Spark 中用 Scala 和 java net开发与java开发区别有什么区别

Spark之wordcount程序(Java Scala)
使用java开发本地测试的wordcount程序
@author Administrator
public class WordCountLocal {
public static void main(String[] args) {
// 编写Spark应用程序
// 本地执行,是可以执行在eclipse中的main方法中,执行的
// 第一步:创建SparkConf对象,设置Spark应用的配置信息
// 使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url
// 但是如果设置为local则代表,在本地运行
SparkConf conf = new SparkConf()
.setAppName("WordCountLocal")
.setMaster("local");
// 第二步:创建JavaSparkContext对象
// 在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java、scala,甚至是python编写
// 都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括
// 调度器(DAGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等
// 一句话,SparkContext,是Spark应用中,可以说是最最重要的一个对象
// 但是呢,在Spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,如果使用scala,
// 使用的就是原生的SparkContext对象
// 但是如果使用Java,那么就是JavaSparkContext对象
// 如果是开发Spark SQL程序,那么就是SQLContext、HiveContext
// 如果是开发Spark Streaming程序,那么就是它独有的SparkContext
// 以此类推
JavaSparkContext sc = new JavaSparkContext(conf);
// 第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD
// 输入源中的数据会打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集
// 我们这里呢,因为是本地测试,所以呢,就是针对本地文件
// SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile()方法
// 在Java中,创建的普通RDD,都叫做JavaRDD
// 在这里呢,RDD中,有元素这种概念,如果是hdfs或者本地文件呢,创建的RDD,每一个元素就相当于
// 是文件里的一行
JavaRDD&String& lines = sc.textFile("C:/Users/Think/Desktop/spark.txt");
// 第四步:对初始RDD进行transformation操作,也就是一些计算操作
// 通常操作会通过创建function,并配合RDD的map、flatMap等算子来执行
// function,通常,如果比较简单,则创建指定Function的匿名内部类
// 但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类
// 先将每一行拆分成单个的单词
// FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型
// 我们这里呢,输入肯定是String,因为是一行一行的文本,输出,其实也是String,因为是每一行的文本
// 这里先简要介绍flatMap算子的作用,其实就是,将RDD的一个元素,给拆分成一个或多个元素
JavaRDD&String& words = lines.flatMap(new FlatMapFunction&String, String&() {
private static final long serialVersionUID = 1L;
public Iterable&String& call(String line) throws Exception {
return Arrays.asList(line.split(" "));
// 接着,需要将每一个单词,映射为(单词, 1)的这种格式
// 因为只有这样,后面才能根据单词作为key,来进行每个单词的出现次数的累加
// mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素
// 如果大家还记得scala里面讲的tuple,那么没错,这里的tuple2就是scala类型,包含了两个值
// mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型
// 第二个和第三个泛型参数,代表的输出的Tuple2的第一个值和第二个值的类型
// JavaPairRDD的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型
JavaPairRDD&String, Integer& pairs = words.mapToPair(
new PairFunction&String, String, Integer&() {
private static final long serialVersionUID = 1L;
public Tuple2&String, Integer& call(String word) throws Exception {
return new Tuple2&String, Integer&(word, 1);
// 接着,需要以单词作为key,统计每个单词出现的次数
// 这里要使用reduceByKey这个算子,对每个key对应的value,都进行reduce操作
// 比如JavaPairRDD中有几个元素,分别为(hello, 1) (hello, 1) (hello, 1) (world, 1)
// reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算
// 比如这里的hello,那么就相当于是,首先是1 + 1 = 2,然后再将2 + 1 = 3
// 最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key的value
// reduce之后的结果,相当于就是每个单词出现的次数
JavaPairRDD&String, Integer& wordCounts = pairs.reduceByKey(
new Function2&Integer, Integer, Integer&() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
// 到这里为止,我们通过几个Spark算子操作,已经统计出了单词的次数
// 但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作
// 一个Spark应用中,光是有transformation操作,是不行的,是不会执行的,必须要有一种叫做action
// 接着,最后,可以使用一种叫做action操作的,比如说,foreach,来触发程序的执行
wordCounts.foreach(new VoidFunction&Tuple2&String,Integer&&() {
private static final long serialVersionUID = 1L;
public void call(Tuple2&String, Integer& wordCount) throws Exception {
System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
sc.close();
将java开发的wordcount程序部署到spark集群上运行
@author Administrator
public class WordCountCluster {
public static void main(String[] args) {
// 如果要在spark集群上运行,需要修改的,只有两个地方
// 第一,将SparkConf的setMaster()方法给删掉,默认它自己会去连接
// 第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件
// 实际执行步骤:
// 1、将spark.txt文件上传到hdfs上去
// 2、使用我们最早在pom.xml里配置的maven插件,对spark工程进行打包
// 3、将打包后的spark工程jar包,上传到机器上执行
// 4、编写spark-submit脚本
// 5、执行spark-submit脚本,提交spark应用到集群执行
SparkConf conf = new SparkConf()
.setAppName("WordCountCluster");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD&String& lines = sc.textFile("hdfs://spark1:9000/spark.txt");
JavaRDD&String& words = lines.flatMap(new FlatMapFunction&String, String&() {
private static final long serialVersionUID = 1L;
public Iterable&String& call(String line) throws Exception {
return Arrays.asList(line.split(" "));
JavaPairRDD&String, Integer& pairs = words.mapToPair(
new PairFunction&String, String, Integer&() {
private static final long serialVersionUID = 1L;
public Tuple2&String, Integer& call(String word) throws Exception {
return new Tuple2&String, Integer&(word, 1);
JavaPairRDD&String, Integer& wordCounts = pairs.reduceByKey(
new Function2&Integer, Integer, Integer&() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
wordCounts.foreach(new VoidFunction&Tuple2&String,Integer&&() {
private static final long serialVersionUID = 1L;
public void call(Tuple2&String, Integer& wordCount) throws Exception {
System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
sc.close();
* @author Administrator
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("WordCount");
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://spark1:9000/spark.txt", 1);
val words = lines.flatMap { line =& line.split(" ") }
val pairs = words.map { word =& (word, 1) }
val wordCounts = pairs.reduceByKey { _ + _ }
wordCounts.foreach(wordCount =& println(wordCount._1 + " appeared " + wordCount._2 + " times."))
版权声明:本文内容由互联网用户自发贡献,本社区不拥有所有权,也不承担相关法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至: 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
用云栖社区APP,舒服~
【云栖快讯】数据库技术天团集体亮相,分享一线生产实践经验,告诉你踩过的坑、走过的路,都是老司机,靠谱!干货分享,不可错过!&&
阿里云流计算(Aliyun StreamCompute)是运行在阿里云平台上的流式大数据分析平台,提供给用户在云...
大数据开发套件(Data IDE),提供可视化开发界面、离线任务调度运维、快速数据集成、多人协同工作等功能,为您...
快速、完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更...
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效率,降低 IT 成本...
2017杭州云栖大会火热抢票
Loading...后使用快捷导航没有帐号?
查看: 19772|回复: 51
大家会为了使用spark,而去学习 scala吗?
论坛徽章:21
  Mahout Goodbye MapReduce,转投Spark,这个是学大数据,特别是Mahout的人关心之事。
  Spark主要的编程语言是,选择Scala是因为它的简洁性(Scala可以很方便在交互式下使用)和性能(JVM上的静态强类型语言)。Spark支持编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。但是,Spark内核是由Scala语言开发的,大家会为了使用spark,而去学习 scala吗?
高级会员, 积分 525, 距离下一级还需 475 积分
论坛徽章:4
spark好高端
论坛徽章:45
学一门语言就是想学习一下它的新思维,如果一种语言没有任何新思维的话,就不值得学习了。
金牌会员, 积分 1461, 距离下一级还需 1539 积分
论坛徽章:11
方向调整的很快。
金牌会员, 积分 1017, 距离下一级还需 1983 积分
论坛徽章:22
已经学了,感觉再也不想用java写代码了
论坛徽章:21
zdb_cn 发表于
已经学了,感觉再也不想用java写代码了
scala这么强,令你不想回头?之前JAVA熟练的吗?
金牌会员, 积分 1017, 距离下一级还需 1983 积分
论坛徽章:22
hq333 发表于
scala这么强,令你不想回头?之前JAVA熟练的吗?
之前写了三年java
论坛徽章:21
zdb_cn 发表于
之前写了三年java
scala是初学,用了多久?
新手上路, 积分 15, 距离下一级还需 35 积分
论坛徽章:0
会学的。。
高级会员, 积分 626, 距离下一级还需 374 积分
论坛徽章:3
我就是冲这个学scala的Spark 中用 Scala 和 java 开发有什么区别? - 知乎290被浏览51746分享邀请回答106 条评论分享收藏感谢收起maven环境下使用java、scala混合开发spark应用
maven环境下使用java、scala混合开发spark应用:熟悉java的开发者在开发spark应用时,常常会遇到spark对java的接口文档不完善或者不提供对应的java接口的问题。
这个时候,如果在java项目中能直接使用scala来开发spark应用,同时使用java来处理项目中的其它需求,将在一定程度上降低开发spark项目的难度。下面就来探索一下java、scala、spark、maven这一套开发环境要怎样来搭建。
1、下载scala sdk
http://www.scala-lang.org/download/直接到这里下载sdk,目前最新的稳定版为2.11.7,下载后解压就行
(后面在intellijidea中创建.scala后缀源代码时,ide会智能感知并提示你设置scala sdk,按提示指定sdk目录为解压目录即可)
也可以手动配置scala SDK:ideal =&File =&project struct.. =&library..=& +...
2、下载scala forintellij idea的插件
如上图,直接在plugins里搜索Scala,然后安装即可,如果不具备上网环境,或网速不给力。也可以直接到/plugin/?idea&id=1347手动下载插件的zip包,手动下载时,要特别注意版本号,一定要跟本机的intellij idea的版本号匹配,否则下载后无法安装。下载完成后,在上图中,点击&Install plugin from disk...&,选择插件包的zip即可。
3、如何跟maven整合
使用maven对项目进行打包的话,需要在pom文件中配置scala-maven-plugin这个插件。同时,由于是spark开发,jar包需要打包为可执行java包,还需要在pom文件中配置maven-assembly-plugin和maven-shade-plugin插件并设置mainClass。经过实验摸索,下面贴出一个可用的pom文件,使用时只需要在包依赖上进行增减即可使用。
my-project-groupid
1.0-SNAPSHOT
http://maven.apache.org
repo1.maven.org
http://repo1.maven.org/maven2
repository.jboss.org
http://repository.jboss.org/nexus/content/groups/public/
cloudhopper
Repository for Cloudhopper
/repos/third-party/
Repository maven
Scala Tools
Scala Tools
org.scala-lang
scala-library
${scala.version}
org.scala-lang
scala-compiler
${scala.version}
javax.mail
javax.mail-api
org.apache.spark
spark-core_2.10
${spark.version}
org.apache.spark
spark-sql_2.10
${spark.version}
org.apache.spark
spark-streaming_2.10
${spark.version}
org.apache.spark
spark-mllib_2.10
${spark.version}
org.apache.spark
spark-hive_2.10
${spark.version}
org.apache.spark
spark-graphx_2.10
${spark.version}
mysql-connector-java
com.google.guava
org.apache.hadoop
hadoop-common
org.apache.hadoop
hadoop-client
org.apache.spark
spark-hive_2.10
${spark.version}
com.alibaba
commons-math3
com.google.guava
org.apache.hadoop
hadoop-common
org.apache.hadoop
hadoop-hdfs
redis.clients
org.apache.hbase
hbase-client
0.98.6-hadoop2
org.apache.hbase
0.98.6-hadoop2
org.apache.hbase
hbase-common
0.98.6-hadoop2
org.apache.hbase
hbase-server
0.98.6-hadoop2
org.testng
mysql-connector-java
com.fasterxml.jackson.jaxrs
jackson-jaxrs-json-provider
com.fasterxml.jackson.core
jackson-databind
net.sf.json-lib
javax.mail
javax.mail-api
maven-assembly-plugin
jar-with-dependencies
rrkd.dt.sparkTest.HelloWorld
make-assembly
org.apache.maven.plugins
maven-compiler-plugin
${jdk.version}
${jdk.version}
${project.build.sourceEncoding}
org.apache.maven.plugins
maven-shade-plugin
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
reference.conf
rrkd.dt.sparkTest.HelloWorld
net.alchim31.maven
scala-maven-plugin
compile-scala
add-source
test-compile-scala
test-compile
add-source
testCompile
${scala.version}
主要是build部分的配置,其它的毋须过多关注。
项目的目录结构,大体跟maven的默认约定一样,只是src下多了一个scala目录,主要还是为了便于组织java和scala源码,如下图:
在java目录下建立HelloWorld类HelloWorld.class:
import test.H
* Created by L on .
public class HelloWorld {
public static void main(String[] args){
System.out.print(&test&);
Hello.sayHello(&scala&);
Hello.runSpark();
在scala目录下建立hello类hello.scala:
package test
import org.apache.spark.graphx.{Graph, Edge, VertexId, GraphLoader}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import breeze.linalg.{Vector, DenseVector, squaredDistance}
* Created by L on .
object Hello {
def sayHello(x: String): Unit = {
println(&hello,& + x);
def main(args: Array[String]) {
def runSpark() {
val sparkConf = new SparkConf().setAppName(&SparkKMeans&).setMaster(&local[*]&)
val sc = new SparkContext(sparkConf)
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, (&rxin&, &student&)), (7L, (&jgonzal&, &postdoc&)),
(5L, (&franklin&, &prof&)), (2L, (&istoica&, &prof&)),
(4L, (&peter&, &student&))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, &collab&),
Edge(5L, 3L, &advisor&),
Edge(2L, 5L, &colleague&), Edge(5L, 7L, &pi&),
Edge(4L, 0L, &student&),
Edge(5L, 0L, &colleague&)))
// Define a default user in case there are relationship with missing user
val defaultUser = (&John Doe&, &Missing&)
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet =& triplet.srcAttr._1 + & is the & + triplet.attr + & of & + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) =& attr._2 != &Missing&)
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet =& triplet.srcAttr._1 + & is the & + triplet.attr + & of & + triplet.dstAttr._1
).collect.foreach(println(_))
这样子,在scala项目中调用spark的接口来运行一些spark应用,在java项目中再调用scala。
4、scala项目maven的编译打包
java/scala混合的项目,怎么先编译scala再编译java,可以使用以下maven 命令来进行编译打包:
mvn clean scala:compile assembly:assembly
5、spark项目的jar包的运行问题
在开发时,我们可能会以local模式在IDEA中运行,然后使用了上面的命令进行打包。打包后的spark项目必须要放到spark集群下以spark-submit的方式提交运行。

我要回帖

更多关于 net开发与java开发区别 的文章

 

随机推荐