Spark 中用 Scala 和 java net开发与java开发区别有什么区别

1.RDD介绍:& & RDD,弹性分布式数据集,即分布式的元素集合。在spark中,对所有数据的操作不外乎是创建RDD、转化已有的RDD以及调用RDD操作进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化。& & Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象。& & 用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序中分发驱动器程序中的对象集合,比如list或者set。& & RDD的转化操作都是惰性求值的,这意味着我们对RDD调用转化操作,操作不会立即执行。相反,Spark会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD看做存放着特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。数据读取到RDD中的操作也是惰性的,数据只会在必要时读取。转化操作和读取操作都有可能多次执行。2.创建RDD数据集& & (1)读取一个外部数据集JavaRDD&String& lines=sc.textFile(inputFile);& & (2)分发对象集合,这里以list为例List&String& list=new ArrayList&String&();list.add("a");list.add("b");list.add("c");JavaRDD&String& temp=sc.parallelize(list);//上述方式等价于JavaRDD&String& temp2=sc.parallelize(Arrays.asList("a","b","c"));3.RDD操作(1)转化操作& & 用java实现过滤器转化操作:List&String& list=new ArrayList&String&();//建立列表,列表中包含以下自定义表项list.add("error:a");list.add("error:b");list.add("error:c");list.add("warning:d");list.add("hadppy ending!");//将列表转换为RDD对象JavaRDD&String& lines = sc.parallelize(list);//将RDD对象lines中有error的表项过滤出来,放在RDD对象errorLines中JavaRDD&String& errorLines = lines.filter(
new Function&String, Boolean&() {
public Boolean call(String v1) throws Exception {
return v1.contains("error");
});//遍历过滤出来的列表项List&String& errorList = errorLines.collect();for (String line : errorList)
System.out.println(line);& & & &输出:error:aerror:berror:c可见,列表list中包含词语error的表项都被正确的过滤出来了。(2)合并操作将两个RDD数据集合并为一个RDD数据集接上述程序示例:JavaRDD&String& warningLines=lines.filter(
new Function&String, Boolean&() {
public Boolean call(String v1) throws Exception {
return v1.contains("warning");
});JavaRDD&String& unionLines=errorLines.union(warningLines);for(String line :unionLines.collect())
System.out.println(line);
输出:error:aerror:berror:cwarning:d可见,将原始列表项中的所有error项和warning项都过滤出来了。(3)获取RDD数据集中的部分或者全部元素①获取RDD数据集中的部分元素 & .take(int num) &返回值List&T& &&获取RDD数据集中的前num项。/** * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so * it will be slow if a lot of partitions are required. In that case, use collect() to get the * whole RDD instead. */def take(num: Int): JList[T] 程序示例:接上JavaRDD&String& unionLines=errorLines.union(warningLines);for(String line :unionLines.take(2))
System.out.println(line);输出:error:aerror:b可见,输出了RDD数据集unionLines的前2项②获取RDD数据集中的全部元素 .collect() 返回值 List&T&程序示例:List&String& unions=unionLines.collect();for(String line :unions)
System.out.println(line);遍历输出RDD数据集unions的每一项4.向spark传递函数&①Function&T,R&JavaRDD&String& errorLines=lines.filter(
new Function&String, Boolean&() {
public Boolean call(String v1)throws Exception {
return v1.contains("error");
});过滤RDD数据集中包含error的表项,新建RDD数据集errorLines②FlatMapFunction&T,R&&List&String& strLine=new ArrayList&String&();strLine.add("how are you");strLine.add("I am ok");strLine.add("do you love me")JavaRDD&String& input=sc.parallelize(strLine);JavaRDD&String& words=input.flatMap(
new FlatMapFunction&String, String&() {
public Iterable&String& call(String s) throws Exception {
return Arrays.asList(s.split(" "));
});将文本行的单词过滤出来,并将所有的单词保存在RDD数据集words中。&③&Function&T1,T2,R&List&String& strLine=new ArrayList&String&();strLine.add("how are you");strLine.add("I am ok");strLine.add("do you love me");JavaRDD&String& input=sc.parallelize(strLine);JavaRDD&String& words=input.flatMap(
new FlatMapFunction&String, String&() {
public Iterable&String& call(String s) throws Exception {
return Arrays.asList(s.split(" "));
});JavaPairRDD&String,Integer& counts=words.mapToPair(
new PairFunction&String, String, Integer&() {
public Tuple2&String, Integer& call(String s) throws Exception {
return new Tuple2(s, 1);
});JavaPairRDD &String,Integer& results=counts.reduceByKey(
new org.apache.spark.api.java.function.Function2&Integer, Integer, Integer&() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}) ;上述程序是spark中的wordcount实现方式,其中的reduceByKey操作的Function2函数定义了遇到相同的key时,value是如何reduce的-&直接将两者的value相加。*注意:可以将我们的函数类定义为使用匿名内部类,就像上述程序实现的那样,也可以创建一个具名类,就像这样:class ContainError implements Function&String,Boolean&{
public Boolean call(String v1) throws Exception {
return v1.contains("error");
}}JavaRDD&String& errorLines=lines.filter(new ContainError());for(String line :errorLines.collect())
System.out.println(line);具名类也可以有参数,就像上述过滤出含有”error“的表项,我们可以自定义到底含有哪个词语,就像这样,程序就更有普适性了。5.针对每个元素的转化操作:& & 转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应的元素。关键词:转化& & 转化操作filter()接受一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。关键词:过滤示例图如下所示:①map()计算RDD中各值的平方JavaRDD&Integer& rdd =sc.parallelize(Arrays.asList(1,2,3,4));JavaRDD&Integer& result=rdd.map(
new Function&Integer, Integer&() {
public Integer call(Integer v1) throwsException {
return v1*v1;
});System.out.println( StringUtils.join(result.collect(),","));输出:1,4,9,16filter()②&去除RDD集合中值为1的元素:JavaRDD&Integer& rdd =sc.parallelize(Arrays.asList(1,2,3,4));JavaRDD&Integer& results=rdd.filter(new Function&Integer, Boolean&() {
public Boolean call(Integer v1) throws Exception {
return v1!=1;
});System.out.println(StringUtils.join(results.collect(),","));结果:2,3,4③ 有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫做flatMap()。和map()类似,我们提供给flatMap()的函数被分别应用到了输入的RDD的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可以访问的所有元素的RDD。flatMap()的一个简单用途是将输入的字符串切分成单词,如下所示:&JavaRDD&String& rdd =sc.parallelize(Arrays.asList("hello world","hello you","world i love you"));JavaRDD&String& words=rdd.flatMap(
new FlatMapFunction&String, String&() {
public Iterable&String& call(String s) throws Exception {
return Arrays.asList(s.split(" "));
});System.out.println(StringUtils.join(words.collect(),'\n'));输出:helloworldhelloyouworldiloveyou6.集合操作RDD中的集合操作集合操作对笛卡尔集的处理:RDD1.cartesian(RDD2)返回两个RDD数据集的笛卡尔集程序示例:生成RDD集合{1,2} 和{1,2}的笛卡尔集JavaRDD&Integer& rdd1 = sc.parallelize(Arrays.asList(1,2));JavaRDD&Integer& rdd2 = sc.parallelize(Arrays.asList(1,2));JavaPairRDD&Integer ,Integer& rdd=rdd1.cartesian(rdd2);for(Tuple2&Integer,Integer& tuple:rdd.collect())
System.out.println(tuple._1()+"-&"+tuple._2());输出:1-&11-&22-&12-&27.行动操作(1)reduce操作& & reduce()接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD进行累加。使用reduce(),可以很方便地计算出RDD中所有元素的总和,元素的个数,以及其他类型的聚合操作。& & 以下是求RDD数据集所有元素和的程序示例:JavaRDD&Integer& rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));Integer sum =rdd.reduce(
new Function2&Integer, Integer, Integer&() {
public Integercall(Integer v1, Integer v2) throws Exception {
return v1+v2;
});System.out.println(sum.intValue());输出:55(2)fold()操作& & 接收一个与reduce()接收的函数签名相同的函数,再加上一个初始值来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作的单位元素,也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如+对应的0,*对应的1,或者拼接操作对应的空列表)。& & 程序实例:①计算RDD数据集中所有元素的和:zeroValue=0;//求和时,初始值为0。JavaRDD&Integer& rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));Integer sum =rdd.fold(0,
new Function2&Integer, Integer, Integer&() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
});System.out.println(sum);②计算RDD数据集中所有元素的积:zeroValue=1;//求积时,初始值为1。JavaRDD&Integer& rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));Integer result =rdd.fold(1,
new Function2&Integer, Integer, Integer&() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1*v2;
});System.out.println(result);(3)aggregate()操作& & aggregate()函数返回值类型不必与所操作的RDD类型相同。& & 与fold()类似,使用aggregate()时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。以下是程序实例:public class AvgCount implements Serializable{public int total;
public int num;
public AvgCount(int total,int num){
this.total=total;
this.num=num;}public double avg(){
return total/(double)num;}static Function2&AvgCount,Integer,AvgCount& addAndCount=new Function2&AvgCount, Integer, AvgCount&() {
public AvgCount call(AvgCount a, Integer x) throws Exception {
a.total+=x;
}};static Function2&AvgCount,AvgCount,AvgCount& combine=
new Function2&AvgCount, AvgCount, AvgCount&() {
public AvgCount call(AvgCount a, AvgCount b) throws Exception {
a.total+=b.total;
a.num+=b.num;
public static void main(String args[]){
SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");
JavaSparkContext sc = new JavaSparkContext(conf);
AvgCount intial =new AvgCount(0,0);
JavaRDD&Integer& rdd =sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
AvgCount result=rdd.aggregate(intial,addAndCount,combine);
System.out.println(result.avg());
}}这个程序示例可以实现求出RDD对象集的平均数的功能。其中addAndCount将RDD对象集中的元素合并起来放入AvgCount对象之中,combine提供两个AvgCount对象的合并的实现。我们初始化AvgCount(0,0),表示有0个对象,对象的和为0,最终返回的result对象中total中储存了所有元素的和,num储存了元素的个数,这样调用result对象的函数avg()就能够返回最终所需的平均数,即avg=tatal/(double)num。8.持久化缓存& & 因为Spark RDD是惰性求值的,而有时我们希望能多次使用同一个RDD。如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。& & 为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。& & 出于不同的目的,我们可以为RDD选择不同的持久化级别。默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中& & & & & & & & & & & & & & & & & & & & & & & & & & & &&不同关键字对应的存储级别表级别使用的空间cpu时间是否在内存是否在磁盘备注MEMORY_ONLY高低是否直接储存在内存MEMORY_ONLY_SER低高是否序列化后储存在内存里MEMORY_AND_DISK低中等部分部分如果数据在内存中放不下,溢写在磁盘上MEMORY_AND_DISK_SER低高部分部分数据在内存中放不下,溢写在磁盘中。内存中存放序列化的数据。DISK_ONLY低高否是直接储存在硬盘里面程序示例:将RDD数据集持久化在内存中。JavaRDD&Integer& rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));rdd.persist(StorageLevel.MEMORY_ONLY());System.out.println(rdd.count());System.out.println(StringUtils.join(rdd.collect(),','));RDD还有unpersist()方法,调用该方法可以手动把持久化的RDD从缓存中移除。9.不同的RDD类型& & Java中有两个专门的类JavaDoubleRDD和JavaPairRDD,来处理特殊类型的RDD,这两个类还针对这些类型提供了额外的函数,折让你可以更加了解所发生的一切,但是也显得有些累赘。& & 要构建这些特殊类型的RDD,需要使用特殊版本的类来替代一般使用的Function类。如果要从T类型的RDD创建出一个DoubleRDD,我们就应当在映射操作中使用DoubleFunction&T&来替代Function&T,Double&。程序实例:以下是一个求RDD每个对象的平方值的程序实例,将普通的RDD对象转化为DoubleRDD对象,最后调用DoubleRDD对象的max()方法,返回生成的平方值中的最大值。JavaRDD&Integer& rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));JavaDoubleRDD result=rdd.mapToDouble(
new DoubleFunction&Integer&() {
public double call(Integer integer) throws Exception {
return (double) integer*integer;
});System.out.println(result.max());
阅读(...) 评论()查看: 62019|回复: 1
Spark:用Scala和Java实现WordCount
主题帖子积分
高级会员, 积分 2837, 距离下一级还需 2163 积分
高级会员, 积分 2837, 距离下一级还需 2163 积分
问题导读:
1. 如何使用IDEA?
2.用java编写spark程序?
3.Spark实现wordcount?
为了在IDEA中编写scala,今天安装配置学习了IDEA集成开发环境。IDEA确实很优秀,学会之后,用起来很顺手。关于如何搭建scala和IDEA开发环境,请看文末的参考资料。
用Scala和Java实现WordCount,其中Java实现的JavaWordCount是spark自带的例子($SPARK_HOME/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java)
OS:Red Hat Enterprise Linux Server release 6.4 (Santiago)Hadoop:Hadoop 2.4.1JDK:1.7.0_60Spark:1.1.0Scala:2.11.2集成开发环境:IntelliJ IDEA 13.1.3
注意:需要在客户端windows环境下安装IDEA、Scala、JDK,并且为IDEA下载scala插件。
2.Scala实现单词计数
1 package com.hq
4&&* User: hadoop
5&&* Date:
6&&* Time: 18:59
8 import org.apache.spark.SparkConf
9 import org.apache.spark.SparkContext
10 import org.apache.spark.SparkContext._
11
12 /**
13&&* 统计字符出现次数
14&&*/
15 object WordCount {
16& &def main(args: Array[String]) {
17& &&&if (args.length & 1) {
18& && & System.err.println(&Usage: &file&&)
19& && & System.exit(1)
20& &&&}
21
22& &&&val conf = new SparkConf()
23& &&&val sc = new SparkContext(conf)
24& &&&val line = sc.textFile(args(0))
25
26& &&&line.flatMap(_.split(& &)).map((_, 1)).reduceByKey(_+_).collect().foreach(println)
27
28& &&&sc.stop()
29& &}
30 }复制代码
3.Java实现单词计数
1 package com.
4&&* User: hadoop
5&&* Date:
6&&* Time: 19:26
9 import org.apache.spark.SparkC
10 import org.apache.spark.api.java.JavaPairRDD;
11 import org.apache.spark.api.java.JavaRDD;
12 import org.apache.spark.api.java.JavaSparkC
13 import org.apache.spark.api.java.function.FlatMapF
14 import org.apache.spark.api.java.function.Function2;
15 import org.apache.spark.api.java.function.PairF
16 import scala.Tuple2;
17
18 import java.util.A
19 import java.util.L
20 import java.util.regex.P
21
22 public final class JavaWordCount {
23& &private static final Pattern SPACE = Pattern.compile(& &);
24
25& &public static void main(String[] args) throws Exception {
26
27& &&&if (args.length & 1) {
28& && & System.err.println(&Usage: JavaWordCount &file&&);
29& && & System.exit(1);
30& &&&}
31
32& &&&SparkConf sparkConf = new SparkConf().setAppName(&JavaWordCount&);
33& &&&JavaSparkContext ctx = new JavaSparkContext(sparkConf);
34& &&&JavaRDD&String& lines = ctx.textFile(args[0], 1);
35
36& &&&JavaRDD&String& words = lines.flatMap(new FlatMapFunction&String, String&() {
37& && & @Override
38& && & public Iterable&String& call(String s) {
39& && && &return Arrays.asList(SPACE.split(s));
40& && & }
41& &&&});
42
43& &&&JavaPairRDD&String, Integer& ones = words.mapToPair(new PairFunction&String, String, Integer&() {
44& && & @Override
45& && & public Tuple2&String, Integer& call(String s) {
46& && && &return new Tuple2&String, Integer&(s, 1);
47& && & }
48& &&&});
49
50& &&&JavaPairRDD&String, Integer& counts = ones.reduceByKey(new Function2&Integer, Integer, Integer&() {
51& && & @Override
52& && & public Integer call(Integer i1, Integer i2) {
53& && && &return i1 + i2;
54& && & }
55& &&&});
56
57& &&&List&Tuple2&String, Integer&& output = counts.collect();
58& &&&for (Tuple2&?, ?& tuple : output) {
59& && & System.out.println(tuple._1() + &: & + tuple._2());
60& &&&}
61& &&&ctx.stop();
62& &}
63 }复制代码
4.IDEA打包和运行
4.1 IDEA的工程结构
在IDEA中建立Scala工程,并导入spark api编程jar包(spark-assembly-1.1.0-hadoop2.4.0.jar:$SPARK_HOME/lib/里面)
3-1.png (72.08 KB, 下载次数: 28)
20:43 上传
4.2 打成jar包
File ---& Project Structure
3-2.png (58.68 KB, 下载次数: 21)
20:43 上传
配置完成后,在菜单栏中选择Build-&Build Artifacts...,然后使用Build等命令打包。打包完成后会在状态栏中显示“Compilation completed successfully...”的信息,去jar包输出路径下查看jar包,如下所示。
3-3.png (52.22 KB, 下载次数: 23)
20:43 上传
ScalaTest1848.jar就是我们编程所产生的jar包,里面包含了三个类HelloWord、WordCount、JavaWordCount。
可以用这个jar包在spark集群里面运行java或者scala的单词计数程序。
4.3 以Spark集群standalone方式运行单词计数
上传jar包到服务器,并放置在/home/ebupt/test/WordCount.jar路径下。
上传一个text文本文件到HDFS作为单词计数的输入文件:hdfs://eb170:8020/user/ebupt/text
内容如下
1 import org apache spark api java JavaPairRDD& &
2 import org apache spark api java JavaRDD& &
3 import org apache spark api java JavaSparkContext& &
4 import org apache spark api java function FlatMapFunction& &
5 import org apache spark api java function Function& &
6 import org apache spark api java function Function2& &
7 import org apache spark api java function PairFunction& &
8 import scala Tuple2 复制代码
用spark-submit命令提交任务运行,具体使用查看:spark-submit --help
1 [ebupt@eb174 bin]$ spark-submit --help
2 Spark assembly has been built with Hive, including Datanucleus jars on classpath
3 Usage: spark-submit [options] &app jar | python file& [app options]
4 Options:
5& &--master MASTER_URL& && && &spark://host:port, mesos://host:port, yarn, or local.
6& &--deploy-mode DEPLOY_MODE& &Whether to launch the driver program locally (&client&) or
7& && && && && && && && && && & on one of the worker machines inside the cluster (&cluster&)
8& && && && && && && && && && & (Default: client).
9& &--class CLASS_NAME& && && & Your application's main class (for Java / Scala apps).
10& &--name NAME& && && && && &&&A name of your application.
11& &--jars JARS& && && && && &&&Comma-separated list of local jars to include on the driver
12& && && && && && && && && && & and executor classpaths.
13& &--py-files PY_FILES& && && &Comma-separated list of .zip, .egg, or .py files to place
14& && && && && && && && && && & on the PYTHONPATH for Python apps.
15& &--files FILES& && && && && &Comma-separated list of files to be placed in the working
16& && && && && && && && && && & directory of each executor.
17
18& &--conf PROP=VALUE& && && &&&Arbitrary Spark configuration property.
19& &--properties-file FILE& && &Path to a file from which to load extra properties. If not
20& && && && && && && && && && & specified, this will look for conf/spark-defaults.conf.
21
22& &--driver-memory MEM& && && &Memory for driver (e.g. 1000M, 2G) (Default: 512M).
23& &--driver-java-options& && & Extra Java options to pass to the driver.
24& &--driver-library-path& && & Extra library path entries to pass to the driver.
25& &--driver-class-path& && && &Extra class path entries to pass to the driver. Note that
26& && && && && && && && && && & jars added with --jars are automatically included in the
27& && && && && && && && && && & classpath.
28
29& &--executor-memory MEM& && & Memory per executor (e.g. 1000M, 2G) (Default: 1G).
30
31& &--help, -h& && && && && && &Show this help message and exit
32& &--verbose, -v& && && && && &Print additional debug output
33
34&&Spark standalone with cluster deploy mode only:
35& &--driver-cores NUM& && && & Cores for driver (Default: 1).
36& &--supervise& && && && && &&&If given, restarts the driver on failure.
37
38&&Spark standalone and Mesos only:
39& &--total-executor-cores NUM&&Total cores for all executors.
40
41&&YARN-only:
42& &--executor-cores NUM& && &&&Number of cores per executor (Default: 1).
43& &--queue QUEUE_NAME& && && & The YARN queue to submit to (Default: &default&).
44& &--num-executors NUM& && && &Number of executors to launch (Default: 2).
45& &--archives ARCHIVES& && && &Comma separated list of archives to be extracted into the
46& && && && && && && && && && & working directory of each executor.复制代码
①提交scala实现的单词计数:
[ebupt@eb174 test]$ spark-submit --master spark://eb174:7077 --name WordCountByscala --class com.hq.WordCount --executor-memory 1G --total-executor-cores 2 ~/test/WordCount.jar hdfs://eb170:8020/user/ebupt/text复制代码
②提交java实现的单词计数:
[ebupt@eb174 test]$ spark-submit --master spark://eb174:7077 --name JavaWordCountByHQ --class com.hq.JavaWordCount --executor-memory 1G --total-executor-cores 2 ~/test/WordCount.jar hdfs://eb170:8020/user/ebupt/text复制代码
③2者运行结果类似,所以只写了一个:
1 Spark assembly has been built with Hive, including Datanucleus jars on classpath
&&2 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
&&3 14/10/10 19:24:51 INFO SecurityManager: Changing view acls to: ebupt,
&&4 14/10/10 19:24:51 INFO SecurityManager: Changing modify acls to: ebupt,
&&5 14/10/10 19:24:51 INFO SecurityManager: SecurityManager: aut users with view permissions: Set(ebupt, ); users with modify permissions: Set(ebupt, )
&&6 14/10/10 19:24:52 INFO Slf4jLogger: Slf4jLogger started
&&7 14/10/10 19:24:52 INFO Remoting: Starting remoting
&&8 14/10/10 19:24:52 INFO Remoting: R listening on addresses :[akka.tcp://sparkDriver@eb174:56344]
&&9 14/10/10 19:24:52 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@eb174:56344]
10 14/10/10 19:24:52 INFO Utils: Successfully started service 'sparkDriver' on port 56344.
11 14/10/10 19:24:52 INFO SparkEnv: Registering MapOutputTracker
12 14/10/10 19:24:52 INFO SparkEnv: Registering BlockManagerMaster
13 14/10/10 19:24:52 INFO DiskBlockManager: Created local directory at /tmp/spark-local-52-3398
14 14/10/10 19:24:52 INFO Utils: Successfully started service 'Connection manager for block manager' on port 41110.
15 14/10/10 19:24:52 INFO ConnectionManager: Bound socket to port 41110 with id = ConnectionManagerId(eb174,41110)
16 14/10/10 19:24:52 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
17 14/10/10 19:24:52 INFO BlockManagerMaster: Trying to register BlockManager
18 14/10/10 19:24:52 INFO BlockManagerMasterActor: Registering block manager eb174:41110 with 265.4 MB RAM
19 14/10/10 19:24:52 INFO BlockManagerMaster: Registered BlockManager
20 14/10/10 19:24:52 INFO HttpFileServer: HTTP File server directory is /tmp/spark-8051667e-bfdb-4ecd-b16bb13
21 14/10/10 19:24:52 INFO HttpServer: Starting HTTP Server
22 14/10/10 19:24:52 INFO Utils: Successfully started service 'HTTP file server' on port 48233.
23 14/10/10 19:24:53 INFO Utils: Successfully started service 'SparkUI' on port 4040.
24 14/10/10 19:24:53 INFO SparkUI: Started SparkUI at http://eb174:4040
25 14/10/10 19:24:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26 14/10/10 19:24:53 INFO SparkContext: Added JAR file:/home/ebupt/test/WordCountByscala.jar at http://10.1.69.174:48233/jars/WordCountByscala.jar with timestamp 2
27 14/10/10 19:24:53 INFO AppClient$ClientActor: Connecting to master spark://eb174:7077...
28 14/10/10 19:24:53 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
29 14/10/10 19:24:53 INFO MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=
30 14/10/10 19:24:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 265.3 MB)
31 14/10/10 19:24:53 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-53-0009
32 14/10/10 19:24:53 INFO AppClient$ClientActor: Executor added: app-53-0009/0 on worker-32-eb176-49618 (eb176:49618) with 1 cores
33 14/10/10 19:24:53 INFO SparkDeploySchedulerBackend: Granted executor ID app-53-0009/0 on hostPort eb176:49618 with 1 cores, 1024.0 MB RAM
34 14/10/10 19:24:53 INFO AppClient$ClientActor: Executor added: app-53-0009/1 on worker-32-eb175-56337 (eb175:56337) with 1 cores
35 14/10/10 19:24:53 INFO SparkDeploySchedulerBackend: Granted executor ID app-53-0009/1 on hostPort eb175:56337 with 1 cores, 1024.0 MB RAM
36 14/10/10 19:24:53 INFO AppClient$ClientActor: Executor updated: app-53-0009/0 is now RUNNING
37 14/10/10 19:24:53 INFO AppClient$ClientActor: Executor updated: app-53-0009/1 is now RUNNING
38 14/10/10 19:24:53 INFO MemoryStore: ensureFreeSpace(12633) called with curMem=163705, maxMem=
39 14/10/10 19:24:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.3 KB, free 265.2 MB)
40 14/10/10 19:24:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on eb174:41110 (size: 12.3 KB, free: 265.4 MB)
41 14/10/10 19:24:53 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
42 14/10/10 19:24:54 INFO FileInputFormat: Total input paths to process : 1
43 14/10/10 19:24:54 INFO SparkContext: Starting job: collect at WordCount.scala:26
44 14/10/10 19:24:54 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:26)
45 14/10/10 19:24:54 INFO DAGScheduler: Got job 0 (collect at WordCount.scala:26) with 2 output partitions (allowLocal=false)
46 14/10/10 19:24:54 INFO DAGScheduler: Final stage: Stage 0(collect at WordCount.scala:26)
47 14/10/10 19:24:54 INFO DAGScheduler: Parents of final stage: List(Stage 1)
48 14/10/10 19:24:54 INFO DAGScheduler: Missing parents: List(Stage 1)
49 14/10/10 19:24:54 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[3] at map at WordCount.scala:26), which has no missing parents
50 14/10/10 19:24:54 INFO MemoryStore: ensureFreeSpace(3400) called with curMem=176338, maxMem=
51 14/10/10 19:24:54 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 265.2 MB)
52 14/10/10 19:24:54 INFO MemoryStore: ensureFreeSpace(2082) called with curMem=179738, maxMem=
53 14/10/10 19:24:54 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.0 KB, free 265.2 MB)
54 14/10/10 19:24:54 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on eb174:41110 (size: 2.0 KB, free: 265.4 MB)
55 14/10/10 19:24:54 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
56 14/10/10 19:24:54 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (MappedRDD[3] at map at WordCount.scala:26)
57 14/10/10 19:24:54 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
58 14/10/10 19:24:56 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@eb176:35482/user/Executor#] with ID 0
59 14/10/10 19:24:56 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, eb176, ANY, 1238 bytes)
60 14/10/10 19:24:56 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@eb175:35502/user/Executor#-] with ID 1
61 14/10/10 19:24:56 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 1, eb175, ANY, 1238 bytes)
62 14/10/10 19:24:56 INFO BlockManagerMasterActor: Registering block manager eb176:33296 with 530.3 MB RAM
63 14/10/10 19:24:56 INFO BlockManagerMasterActor: Registering block manager eb175:32903 with 530.3 MB RAM
64 14/10/10 19:24:57 INFO ConnectionManager: Accepted connection from [eb176/10.1.69.176:39218]
65 14/10/10 19:24:57 INFO ConnectionManager: Accepted connection from [eb175/10.1.69.175:55227]
66 14/10/10 19:24:57 INFO SendingConnection: Initiating connection to [eb176/10.1.69.176:33296]
67 14/10/10 19:24:57 INFO SendingConnection: Initiating connection to [eb175/10.1.69.175:32903]
68 14/10/10 19:24:57 INFO SendingConnection: Connected to [eb175/10.1.69.175:32903], 1 messages pending
69 14/10/10 19:24:57 INFO SendingConnection: Connected to [eb176/10.1.69.176:33296], 1 messages pending
70 14/10/10 19:24:57 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on eb175:32903 (size: 2.0 KB, free: 530.3 MB)
71 14/10/10 19:24:57 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on eb176:33296 (size: 2.0 KB, free: 530.3 MB)
72 14/10/10 19:24:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on eb176:33296 (size: 12.3 KB, free: 530.3 MB)
73 14/10/10 19:24:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on eb175:32903 (size: 12.3 KB, free: 530.3 MB)
74 14/10/10 19:24:58 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 1) in 1697 ms on eb175 (1/2)
75 14/10/10 19:24:58 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 0) in 1715 ms on eb176 (2/2)
76 14/10/10 19:24:58 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
77 14/10/10 19:24:58 INFO DAGScheduler: Stage 1 (map at WordCount.scala:26) finished in 3.593 s
78 14/10/10 19:24:58 INFO DAGScheduler: looking for newly runnable stages
79 14/10/10 19:24:58 INFO DAGScheduler: running: Set()
80 14/10/10 19:24:58 INFO DAGScheduler: waiting: Set(Stage 0)
81 14/10/10 19:24:58 INFO DAGScheduler: failed: Set()
82 14/10/10 19:24:58 INFO DAGScheduler: Missing parents for Stage 0: List()
83 14/10/10 19:24:58 INFO DAGScheduler: Submitting Stage 0 (ShuffledRDD[4] at reduceByKey at WordCount.scala:26), which is now runnable
84 14/10/10 19:24:58 INFO MemoryStore: ensureFreeSpace(2096) called with curMem=181820, maxMem=
85 14/10/10 19:24:58 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.0 KB, free 265.2 MB)
86 14/10/10 19:24:58 INFO MemoryStore: ensureFreeSpace(1338) called with curMem=183916, maxMem=
87 14/10/10 19:24:58 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1338.0 B, free 265.2 MB)
88 14/10/10 19:24:58 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on eb174:41110 (size: 1338.0 B, free: 265.4 MB)
89 14/10/10 19:24:58 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
90 14/10/10 19:24:58 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (ShuffledRDD[4] at reduceByKey at WordCount.scala:26)
91 14/10/10 19:24:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
92 14/10/10 19:24:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 2, eb175, PROCESS_LOCAL, 1008 bytes)
93 14/10/10 19:24:58 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 3, eb176, PROCESS_LOCAL, 1008 bytes)
94 14/10/10 19:24:58 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on eb175:32903 (size: 1338.0 B, free: 530.3 MB)
95 14/10/10 19:24:58 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on eb176:33296 (size: 1338.0 B, free: 530.3 MB)
96 14/10/10 19:24:58 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@eb175:59119
97 14/10/10 19:24:58 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 144 bytes
98 14/10/10 19:24:58 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@eb176:39028
99 14/10/10 19:24:58 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 3) in 109 ms on eb176 (1/2)
100 14/10/10 19:24:58 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 2) in 120 ms on eb175 (2/2)
101 14/10/10 19:24:58 INFO DAGScheduler: Stage 0 (collect at WordCount.scala:26) finished in 0.123 s
102 14/10/10 19:24:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
103 14/10/10 19:24:58 INFO SparkContext: Job finished: collect at WordCount.scala:26, took 3. s
104 (scala,1)
105 (Function2,1)
106 (JavaSparkContext,1)
107 (JavaRDD,1)
108 (Tuple2,1)
110 (org,7)
111 (apache,7)
112 (JavaPairRDD,1)
113 (java,7)
114 (function,4)
115 (api,7)
116 (Function,1)
117 (PairFunction,1)
118 (spark,7)
119 (FlatMapFunction,1)
120 (import,8)
121 14/10/10 19:24:58 INFO SparkUI: Stopped Spark web UI at http://eb174:4040
122 14/10/10 19:24:58 INFO DAGScheduler: Stopping DAGScheduler
123 14/10/10 19:24:58 INFO SparkDeploySchedulerBackend: Shutting down all executors
124 14/10/10 19:24:58 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
125 14/10/10 19:24:58 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(eb176,33296)
126 14/10/10 19:24:58 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(eb176,33296)
127 14/10/10 19:24:58 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(eb176,33296) not found
128 14/10/10 19:24:58 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(eb175,32903)
129 14/10/10 19:24:58 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(eb175,32903)
130 14/10/10 19:24:58 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(eb175,32903)
131 14/10/10 19:24:58 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@5e92c11b
132 14/10/10 19:24:58 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@5e92c11b
133 java.nio.channels.CancelledKeyException
134 at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:310)
135 at org.apache.spark.network.ConnectionManager$anon$4.run(ConnectionManager.scala:139)
136 14/10/10 19:24:59 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
137 14/10/10 19:24:59 INFO ConnectionManager: Selector thread was interrupted!
138 14/10/10 19:24:59 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(eb176,33296)
139 14/10/10 19:24:59 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(eb176,33296) not found
140 14/10/10 19:24:59 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(eb176,33296)
141 14/10/10 19:24:59 WARN ConnectionManager: All connections not cleaned up
142 14/10/10 19:24:59 INFO ConnectionManager: ConnectionManager stopped
143 14/10/10 19:24:59 INFO MemoryStore: MemoryStore cleared
144 14/10/10 19:24:59 INFO BlockManager: BlockManager stopped
145 14/10/10 19:24:59 INFO BlockManagerMaster: BlockManagerMaster stopped
146 14/10/10 19:24:59 INFO SparkContext: Successfully stopped SparkContext
147 14/10/10 19:24:59 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
148 14/10/10 19:24:59 INFO RemoteActorRefProvider$RemotingTerminator: Rem proceeding with flushing remote transports.
149 14/10/10 19:24:59 INFO Remoting: Remoting shut down
150 14/10/10 19:24:59 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.复制代码
5.参考资料
关于IDEA的使用:
scala编写WC:
java编写WC:、
欢迎加入about云群 、、 ,云计算爱好者群,亦可关注||
主题帖子积分
中级会员, 积分 527, 距离下一级还需 473 积分
中级会员, 积分 527, 距离下一级还需 473 积分
赞一个,文章不错,思路清晰。
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered by

我要回帖

更多关于 net开发与java开发区别 的文章

 

随机推荐