spark学习四 RDD转换是什么以及任务的运行

博客分类:
任务调度以及作业执行流程是Spark的核心,本文不进行源码级别的探究,只是概述Spark的核心组件、它们的作用以及它们如何协作以完成计算作业。
Spark核心组件
SparkContext
DAGScheduler
TaskScheduler
BlockManager
BlockTracker
ShuffleTracker
Spark集群架构概览
在上面这幅图片中,用户将任务提交给Driver,Driver将任务分发到所有的Worker节点(Driver最好跟Work节点在同一个局域网内,以使得任务的分发和结果回送更快)。Worker节点根据Driver提交过来的任务,算出位于本地的那部分数据,然后对它进行计算(这就是数据本地性的概念)。具体的做法是,Workder首先将数据加载到内存(如果内存中没有的话),形成RDD(所以,RDD存在于内存中),然后对RDD进行接下来的计算。
在这个架构中,Driver和Worker构成了Master/Slave架构,Driver负责分发任务,以及等待任务结果
另外一副架构图,在这个图中,Master和Driver是分开的,实际上是否会在一起呢?
spark计算速度快的原因
1.基于内存计算
2.general computation graph --即DAG,Worker对DAG进行优化,然后提交给TaskScheduler去执行。这里的问题是DAG何时构造,由谁构造,DAG的数据结构如何,DAG包含哪些信息,这个暂时放这里。目前要了解的重点是,DAG是提交给TaskScheduler等待调度执行。
上图中,自己编写程序提交到Master上,而Master是由四大部分组成(RDD Graph,Scheduler,Block Tracker以及Shuffle Tracker),启动RDD Graph就是DAG,它会提交给Task Scheduler任务调度器等待调度执行,具体执行时,Task Scheduler会把任务提交到Worker节点上。Block Tracker用于记录计算数据在Worker节点上的块信息。Shuffle Blocker用于记录RDD在计算过程中遇到Shuffle过程时会进行物化,那么Shuffle Tracker用于记录这些物化的RDD的存放信息
上面浅绿色的四个圆柱形框构成一个RDD,每个圆柱框都是一个Partition,每个Partition分配一个任务来执行。浅绿色圆柱框内的绿色矩形框表示实施RDD操作后的数据集,比如对于Task1,先对Partion执行map操作,再执行filter操作得到两个绿色矩形框。
因为map操作或者filter是对RDD进行调用的,所以,RDD中的Partition都会执行相同的动作序列,每个操作结束时,每个Partition都会产生一个数据集,这些数据集对应一个RDD,如MappedRDD,FilteredRDD。这样,就形成了RDD Graph,如上图中的八个绿色框,上面四个框形成一个RDD,下面四个框形成一个RDD。
作业与任务调度
DAGScheduler
1.三个输入元素
1.target RDD是什么RDD是初始RDD还是包含了所有的RDD,比如rdd.map().filter()操作,target RDD是什么
2.针对partition的function指的是什么,比如rdd.map().filter()操作,是map和filter函数都包括吗?
具体的,DAG Scheduler完成下面三个任务:
1.为每个Job(一个Action类型的操作如collect,count)分割Stage,同时决定最佳路径。DAGScheduler会记录哪个RDD或者Stage会被物化,从而寻找一个最佳调度方案。
2.将TaskSet提交给Task Tracker
3.重新提交输出lost的Stage
2. DAGScheduler优化
1.stage的操作是pipleline的
比如,stage内有5个操作,Spark的做法是1+1+1+1+1=5,而对于Hadoop而言,它的做法是1+1=2, 2+1=3,3+1=4,4+1=5,即每计算一步就先存入HDFS,然后后面的操作再从HDFS上都出来,因此IO消耗非常大。
2. 基于Partition选择最小化的join算法,减少Shuffle操作
在Hadoop中,Shuffle相当于Barrier(Join等待合并结果),Reduce操作需要等待Map操作完全执行完
3. 重用RDD Cache过的数据
因为DAGScheduler知道哪些RDD和Stage已经物化过,所以DAGScheduler在执行路径上,会尽可能的使用已经缓存过的数据
从上图中可以看到,AB位于同一个Stage,CDE位于同一个Stage。AB和CDE的结果做Join是产生了一个新的Stage
如下两个阶段一定会产生Stage
1.从数据源加载数据形成RDD时,一定会有Stage的产生
2.进行Shuffle,即有宽依赖的时候一定有Stage的产生,所以上面的DE应该产生一个Stage
Job执行流程
浏览: 399220 次
来自: 北京
关于第一个reduceByKey对应的cache,shuffl ...
看了你的文章,updateStateByKey 这个方式的使用 ...
棒极啦,解决了我的问题。
你好,这个代码生成主要在,那个地方使用。
看楼主这么厉害的样子,请问楼主如何知道类库的版本呢?比如g++ ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'Spark的RDD原理以及2.0特性的介绍(转)
Spark的RDD原理以及2.0特性的介绍
Spark 是什么
Spark 是 Apache 顶级项目里面最火的大数据处理的计算引擎,它目前是负责大数据计算的工作。包括离线计算或交互式查询、数据挖掘算法、流式计算以及图计算等。全世界有许多公司和组织使用或给社区贡献代码,社区的活跃度见 /apache/spark。
2013 年开始 Spark开发团队成立 Databricks,来对 Spark 进行运作和管理,并提供 Cloud 服务。Spark 社区基本保持一个季度一个版本,不出意外的话 Spark 2.0 将在五月底发布。
与 Mapreduce 相比,Spark 具备 DAG 执行引擎以及基于内存的多轮迭代计算等优势,在SQL 层面上,比 Hive/Pig 相比,引入关系数据库的许多特性,以及内存管理技术。另外在 Spark 上所有的计算模型最终都统一基于 RDD 之上运行执行,包括流式和离线计算。Spark 基于磁盘的性能是 MR 的 10 倍,基于内存的性能是 MR 的 100 倍 。
Spark 提供 SQL、机器学习库 MLlib、流计算 Streaming 和图计算 Graphx,同时也支持 Scala、Java、Python 和 R 语言开发的基于 API 的应用程序。
RDD 的原理
RDD,英文全称叫 Resilient Distributed Datasets。
an RDD is a read-only, partitioned collection of records. 字面意思是只读的分布式数据集。
但其实个人觉得可以把 RDD 理解为关系数据库 里的一个个操作,比如 map,filter,Join 等。在 Spark 里面实现了许多这样的 RDD 类,即可以看成是操作类。当我们调用一个 map 接口,底层实现是会生成一个 MapPartitionsRDD 对象,当 RDD 真正执行时,会调用 MapPartitionsRDD 对象里面的 compute 方法来执行这个操作的计算逻辑。但是不同的是,RDD 是 lazy 模式,只有像 count,saveasText 这种 action 动作被调用后再会去触发 runJob 动作。
RDD 分为二类:transformation 和 action。
transformation 是从一个 RDD 转换为一个新的 RDD 或者从数据源生成一个新的 RDD;
action 是触发 job 的执行。所有的 transformation 都是 lazy 执行,只有在 action 被提交的时候才触发前面整个 RDD 的执行图。如下
val file = sc.textFile(args(0))
val words = file.flatMap(line =& line.split(" "))
val wordCounts = words.map(x =& (x, 1)).reduceByKey(_ + _, 2) wordCounts.saveAsTextFile(args(1))
这段代码生成的 RDD 的执行树是如下图所示:
最终在 saveAsTextFile 方法时才会将整个 RDD 的执行图提交给 DAG 执行引擎,根据相关信息切分成一个一个 Stage,每个 Stage 去执行多个 task,最终完成整个 Job 的执行。
还有一个区别就是,RDD 计算后的中间结果是可以被持久化,当下一次需要使用时,可以直接使用之前持久化好的结果,而不是重新计算,并且这些结果被存储在各个结点的 executor 上。下一次使用时,调度器可以直接把 task 分发到存储持久化数据的结点上,减少数据的网络传输开稍。这种场景在数据挖掘迭代计算是经常出现。如下代码
val links = spark.textFile(...).map(...).persist() var ranks = // RDD of (URL, rank) pairs
for (i &- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =&
links.map(dest =& (dest, rank/links.size)) }
// Sum contributions by URL and get new ranks
ranks = contribs.reduceByKey((x,y) =& x+y)
.mapValues(sum =& a/N + (1-a)*sum) }
以上代码生成的 RDD 执行树如下图所示:
计算 contribs-0 时需要使用 links 的计算逻辑,当 links 每个分片计算完后,会将这个结果保存到本地内存或磁盘上,下一次 contribs-1 计算要使用 links 的数据时,直接从上一次保存的内存和磁盘上读取就可以了。这个持久化系统叫做 blockManager,类似于在内部再构建了一个 KV 系统,K 表示每个分区 ID 号,V 表示这个分区计算后的结果。
另外在 streaming 计算时,每个 batch 会去消息队列上拉取这个时间段的数据,每个 Recevier 接收过来数据形成 block 块并存放到 blockManager 上,为了可靠性,这个 block 块可以远程备份,后续的 batch 计算就直接在之前已读取的 block 块上进行计算,这样不断循环迭代来完成流处理。
一个 RDD 一般会有以下四个函数组成。
1. 操作算子的物理执行逻辑
def compute(split: Partition, context: TaskContext): Iterator[T]
如在 MapPartitionsRDD 里的实现是如下:
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
f: (TaskContext, Int, Iterator[T]) =& Iterator[U]
2. 获取分片信息
protected def getPartitions: Array[Partition]
即这个操作的数据划分为多少个分 区。跟 mapreduce 里的 map 上的 split 类似的。
3. 获取父 RDD 的依赖关系
protected def getDependencies: Seq[Dependency[_]]
依赖分二种:如果 RDD 的每个分区最多只能被一个 Child RDD 的一个分区使用,则称之为 narrow dependency;若依赖于多个 Child RDD 分区,则称之为 wide dependency。不同的操作根据其特性,可能会产生不同的依赖 。如下图所示
map 操作前后二个 RDD 操作之间的分区是一对一的关系,故产生 narrow dependency,而 join 操作的分区分别对应于它的二个子操作相对应的分区,故产生 wide dependency。当最后要生成具体的 task 运行时,就需要利用这个依赖关系也生成 Stage 的 DAG 图。
4. 获取该操作对应数据的存放位置信息,主要是针对 HDFS 这类有数据源的 RDD。
protected def getPreferredLocations(split: Partition): Seq[String]
Spark 的执行模式
Spark 的执行模式有 local、Yarn、Standalone、Mesos 四类。后面三个分别有 cluster 和 client 二种。client 和 cluster 的区别就是指 Driver 是在程序提交客户端还是在集群的 AM 上。 比如常见的 Yarn-cluster 模式如下图所示:
一般来说,运行简单测试或 UT 用的是 local 模式运行,其实就是用多线程模似分布式执行。 如果业务部门较少且不需要对部门或组之间的资源做划分和优先级调度的话,可以使用 Standalone 模式来部署。
当如果有多个部门或组,且希望每个组织可以限制固定运行的最大资源,另外组或者任务需要有优先级执行的话,可以选择 Yarn 或 Mesos。
Spark 2.0 的特性
Unifying DataFrames and Datasets in Scala/Java
DataFrame 和 Dataset 的功能是什么?
它们都是提供给用户使用,包括各类操作接口的 API。1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是将二者统一,即保留 Dataset,而把 DataFrame 定义为 Dataset[Row],即是 Dataset 里的元素对象为 Row 的一种(SPARK-13485)。
DataFrame,它就是提供了一系列操作 API,与 RDD API 相比较,DataFrame 里操作的数据都是带有 Schema 信息,所以 DataFrame 里的所有操作是可以享受 Spark SQL Catalyst optimizer 带来的性能提升,比如 code generation 以及 Tungsten等。执行过程如下图所示
但是 DataFrame 出来后发现有些情况下 RDD 可以表达的逻辑用 DataFrame 无法表达。比如 要对 group by 或 join 后的结果用自定义的函数,可能用 SQL 是无法表达的。如下代码:
case class ClassData(a: String, b: Int)
case class ClassNullableData(a: String, b: Integer)
val ds = Seq(ClassData("a", 1), ClassData("a", 2)).toDS()
val agged = ds.groupByKey(d =& ClassNullableData(d.a, null))
.mapGroups {
case (key, values) =& key.a + values.map(_.b).sum
中间处理过程的数据是自定义的类型,并且 groupby 后的聚合逻辑也是自定义的,故用 SQL 比较难以表达,所以提出了 Dataset API。Dataset API 扩展 DataFrame API 支持静态类型和运行已经存在的 Scala 或 Java 语言的用户自定义函数。同时 Dataset 也能享受 Spark SQL 里所有性能 带来的提升。
那么后面发现 Dataset 是包含了 DataFrame 的功能,这样二者就出现了很大的冗余,故在 2.0 时将二者统一,保留 Dataset API,把 DataFrame 表示为 Dataset[Row],即 Dataset 的子集。
因此我们在使用 API 时,优先选择 DataFrame & Dataset,因为它的性能很好,而且以后的优化它都可以享受到,但是为了兼容早期版本的程序,RDD API 也会一直保留着。后续 Spark 上层的库将全部会用 DataFrame,比如 MLlib、Streaming、Graphx 等。
Whole-stage code generation
其中一个例子:
select count(*) from store_sales where ss_item_sk = 1000
那么在翻译成计算引擎的执行计划如下图:
而通常物理计划的代码是这样实现的:
class Filter {
def next(): Boolean = {
var found = false
while (!found && child.next()) {
found = predicate(child.fetch())
return found
def fetch(): InternalRow = {
child.fetch()
但是真正如果我们用 hard code 写的话,代码是这样的:
var count = 0
for (ss_item_sk in store_sales) {
if (ss_item_sk == 1000) {
count += 1
发现二者相关如下图所示:
那么如何使得计算引擎的物理执行速度能达到 hard code 的性能呢?这就提出了 whole-stage code generation,即对物理执行的多次调用转换为代码 for 循环,类似 hard code 方式,减少中间执行的函数调用次数,当数据记录多时,这个调用次数是很大。 最后这个优化带来的性能提升如下图所示:
从 benchmark 的结果可以看出,使用了该特性后各操作的性能都有很大的提升。
Structured Streaming
Spark Streaming 是把流式计算看成一个一个的离线计算来完成流式计算,提供了一套 Dstream 的流 API,相比于其他的流式计算,Spark Streaming 的优点是容错性和吞吐量上要有优势,关于 Spark Streaming 的详细设计思想和分析,可以到 /lw-lin/CoolplaySpark 进行详细学习和了解。
在 2.0 以前的版本,用户在使用时,如果有流计算,又有离线计算,就需要用二套 API 去编写程序,一套是 RDD API,一套是 Dstream API。而且 Dstream API 在易用性上远不如 SQL 或 DataFrame。
为了真正将流式计算和离线计算在编程 API 上统一,同时也让 Streaming 作业能够享受 DataFrame/Dataset 上所带来的优势:性能提升和 API 易用,于是提出了 Structured Streaming。最后我们只需要基于 DataFrame/Dataset 可以开发离线计算和流式计算的程序,很容易使得 Spark 在 API 跟业界所说的 DataFlow 来统一离线计算和流式计算效果一样。
比如在做 Batch Aggregation 时我们可以写成下面的代码
那么对于流式计算时,我们仅仅是调用了 DataFrame/Dataset 的不同函数代码,如下:
最后,在 DataFrame/Dataset 这个 API 上可以完成如下图所示的所有应用:
其他主要性能提升
采用 vectorized Parquet decoder 读取 parquet 上数据。以前是一行一行的读取,然后处理。现在改为一次读取 4096 行记录,不需要每处理一行记录去调用一次 Parquet 获取记录的方法,而是改为一批去调用一次(SPARK-12854)。加上 Parquet 本身是列式存储,这个优化使得 Parquet 读取速度提高 3 倍。
采有 radix sort 来提高 sort 的性能(SPARK-14724)。在某些情况下排序性能可以提高 10-20 倍。
使用 VectorizedHashmap 来代替 Java 的 HashMap 加速 group by 的执行(SPARK-14319)。
将 Hive 中的 Window 函数用 Native Spark Window 实现,因为 Native Spark Window 在内存管理上有优势(SPARK-8641)。
避免复杂语句中的逻辑相同部分在执行时重复计算(SPARK-13523)。
压缩算法默认使用 LZ4(SPARK-12388)。
语句的增强
建立新的语法解析(SPARK-12362)满足所有的 SQL 语法,这样即合并 Hive 和标准 SQL 的语句解析,同时不依赖 Hive 的语法解析 jar(SPARK-14776)。之前版本二者的语法解析是独立的,这样导致在标准 SQL 中无法使用窗口函数或者 Hive 的语法,而在使用 Hive 语法时无法使用标准 SQL 的语法,比如 In/Exists 子句等。在 SQL 编写时,没法在一个 Context 把二者的范围全部支持,然而有了这个特性后,使得 SQL 语句表达更强大,后续要增加任何语法,只需要维护这一个语法解析即可。当然缺点是后续 Hive 版本的新语法,需要手动添加进来。
支持 intersect/except(SPARK-12542)。如 select * from t1 except select * from t2 或者 select * from t1 intersect select * from t2。
支持 uncorrelated scalar subquery(SPARK-13417)。如 select (select min(value) from testData where key = (select max(key) from testData) - 1)。
支持 DDL/DML(SPARK-14118)。之前 DDL/DML 语句是调用 Hive 的 DDL/DML 语句命令来完成,而现在是直接在 Spark SQL 上就可以完成。
支持 multi-insert(SPARK-13924)。
支持 exist(SPARK-12545)和 NOT EXISTS(SPARK-10600),如 select * from (select 1 as a union all select 2 as a) t where exists (select * from (select 1 as b) t2 where b = a and b & 2)。
支持 subqueries 带有 In/Not In 子句(SPARK-4226),如 select * from (select 1 as a union all select 2 as a) t where a in (select b as a from t2 where b & 2)。
支持 select/where/having 中使用 subquery(SPARK-12543),如 select * from t where a = (select max(b) from t2) 或 select max(a) as ma from t having ma = (select max(b) from t2)。
支持 LeftSemi/LeftAnti(SPARK-14853)。
支持在条件表达式 In/Not In 里使用子句(SPARK-14781),如 select * from l where l.a in (select c from r) or l.a in (select c from r where l.b & r.d)。
支持所有的 TPCDS 语句(SPARK-12540)。
与以前版本兼容(SPARK-11806)
不支持运行在 Hadoop 版本 & 2.2 上(SPARK-11807)。
去掉 HTTPBroadcast(SPARK-12588)。
去掉 HashShuffleManager(SPARK-14667)。
去掉 Akka RPC。
简化与完善 accumulators and task metrics(SPARK-14626)。
将 Hive 语法解析以及语法移至 Core 里(SPARK-14825),在没有 Hive 元数据库和 Hive 依赖包时,我们可以像之前版本使用标准 SQL 一样去使用 HiveQL 语句。
1.6 版本严重问题的解决
在 http://geek.csdn.net/news/detail/70162 提到的 1.6 问题中 Spillable 集合内存溢出问题在 SPARK-4452 里已解决,BlockManager 死锁问题在 SPARK-12757 里已解决。
最后 2.0 版本还有一些其他的特性,如:
用 SparkSession 替换掉原来的 SQLContext and HiveContext。
mllib 里的计算用 DataFrame-based API 代替以前的 RDD 计算逻辑。
提供更多的 R 语言算法。
默认使用 Scala 2.11 编译与运行。
原文地址:
版权声明:本文内容由互联网用户自发贡献,本社区不拥有所有权,也不承担相关法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至: 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
用云栖社区APP,舒服~
【云栖快讯】红轴机械键盘、无线鼠标等753个大奖,先到先得,云栖社区首届博主招募大赛9月21日-11月20日限时开启,为你再添一个高端技术交流场所&&
是为多媒体数据提供的转码计算服务。它以经济、弹性和高可扩展的音视频转换方法,将多媒体数据转码成适合在PC、TV以...
构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户...
集音视频上传、自动化转码处理、媒体资源管理、分发加速于一体的一站式音视频点播解决方案。
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效率,降低 IT 成本...
MaxCompute75折抢购
Loading...spark运行原理
弹性之一:自动的进行内存和磁盘数据存储的切换
弹性之二:基于Lineage的高校容错
弹性之三:Task如果失败会自动进行特定次数的重试
弹性之四:Stage如果失败会自动进行特定次数的重试
啥时间做缓存:
缓存:特别耗时、计算链条、Shuffle之后、ChechPoint
Partition size = Block Size
作业:Spark运行原理自我理解
1.首先,spark是基于内存的分布式高效计算框架,采用一栈式管理机制,同时支持流处理,实时交互式出,批处理三种方式,Spark特别支持迭代式计算,因此,他对机器学习,图计算具有较强的支持,为此他提供了机器学习和图计算接口。
2.Spark是基于内存的计算,当然也不仅仅支持内存同时也基于磁盘计算。
3.Spark的核心是RDD,RDD是一种弹性式分布式数据集,他的数据分片分不到各个的节点上。
RDD的弹性具体表现在:
弹性之一:自动的进行内存和磁盘数据存储的切换
弹性之二:基于Lineage的高校容错
弹性之三:Task如果失败会自动进行特定次数的重试
弹性之四:Stage如果失败会自动进行特定次数的重试
4.spark采用集群管理模式
先有集群资源管理服务(Cluster
Manager)和运行作业任务的结点(WorkerNode),然后就是每个应用的任务控制结点Driver和每个机器节点上有具体任务的执行进程(Executor);Executor有二个优点:一个是多线程来执行具体的任务,而不是像MR那样采用进程模型,减少了任务的启动开稍。二个是Executor上会有一个BlockManager存储模块,类似于KV系统(内存和磁盘共同作为存储设备),当需要迭代多轮时,可以将中间过程的数据先放到这个存储系统上,下次需要时直接读该存储上数据,而不需要读写到hdfs等相关的文件系统里,或者在交互式查询场景下,事先将表Cache到该存储系统上,提高读写IO性能。另外Spark在做Shuffle时,在Groupby,Join等场景下去掉了不必要的Sort操作,相比于MapReduce只有Map和Reduce二种模式,Spark还提供了更加丰富全面的运算操作如filter,groupby,join等。
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。第1章 Spark的环境搭建与运行
第1章 Spark的环境搭建与运行
Apache Spark是一个分布式计算框架,旨在简化运行于计算机集群上的并行程序的编写。该框架对资源调度,任务的提交、执行和跟踪,节点间的通信以及数据并行处理的内在底层操作都进行了抽象。它提供了一个更高级别的API用于处理分布式数据。从这方面说,它与Apache Hadoop等分布式处理框架类似。但在底层架构上,Spark与它们有所不同。
Spark起源于加利福利亚大学伯克利分校的一个研究项目。学校当时关注分布式机器学习算法的应用情况。因此,Spark从一开始便为应对迭代式应用的高性能需求而设计。在这类应用中,相同的数据会被多次访问。该设计主要靠利用数据集内存缓存以及启动任务时的低延迟和低系统开销来实现高性能。再加上其容错性、灵活的分布式数据结构和强大的函数式编程接口,Spark在各类基于机器学习和迭代分析的大规模数据处理任务上有广泛的应用,这也表明了其实用性。
关于Spark项目的更多背景信息,包括其开发的核心研究论文,可从项目的历史介绍页面中查到:。
Spark支持四种运行模式。
本地单机模式:所有Spark进程都运行在同一个Java虚拟机(Java Vitural Machine,JVM)中。
集群单机模式:使用Spark自己内置的任务调度框架。
基于Mesos:Mesos是一个流行的开源集群计算框架。
基于YARN:即Hadoop 2,它是一个与Hadoop关联的集群计算和资源调度框架。
本章主要包括以下内容。
下载Spark二进制版本并搭建一个本地单机模式下的开发环境。各章的代码示例都在该环境下运行。
通过Spark的交互式终端来了解它的编程模型及其API。
分别用Scala、Java和Python语言来编写第一个Spark程序。
在Amazon的Elastic Cloud Compute(EC2)平台上架设一个Spark集群。相比本地模式,该集群可以应对数据量更大、计算更复杂的任务。
通过自定义脚本,Spark同样可以运行在Amazon的Elastic MapReduce服务上,但这不在本书讨论范围内。相关信息可参考;本书写作时,这篇文章是基于Spark 1.1.0写的。
如果读者曾构建过Spark环境并有Spark程序编写基础,可以跳过本章。
1.1 Spark的本地安装与配置
Spark能通过内置的单机集群调度器来在本地运行。此时,所有的Spark进程运行在同一个Java虚拟机中。这实际上构造了一个独立、多线程版本的Spark环境。本地模式很适合程序的原型设计、开发、调试及测试。同样,它也适应于在单机上进行多核并行计算的实际场景。
Spark的本地模式与集群模式完全兼容,本地编写和测试过的程序仅需增加少许设置便能在集群上运行。
本地构建Spark环境的第一步是下载其最新的版本包(本书写作时为1.2.0版)。各个版本的版本包及源代码的GitHub地址可从Spark项目的下载页面找到:。
Spark的在线文档涵盖了进一步学习Spark所需的各种资料。强烈推荐读者浏览查阅。
为了访问HDFS(Hadoop Distributed File System,Hadoop分布式文件系统)以及标准或定制的Hadoop输入源,Spark的编译需要与Hadoop的版本对应。上述下载页面提供了针对Hadoop 1、CDH4(Cloudera的Hadoop发行版)、MapR的Hadoop发行版和Hadoop 2(YARN)的预编译二进制包。除非你想构建针对特定版本Hadoop的Spark,否则建议你通过如下链接从Apache镜像下载Hadoop 2.4预编译版本:。
Spark的运行依赖Scala编程语言(本书写作时为2.10.4版)。好在预编译的二进制包中已包含Scala运行环境,我们不需要另外安装Scala便可运行Spark。但是,JRE(Java运行时环境)或JDK(Java开发套件)是要安装的(相应的安装指南可参见本书代码包中的软硬件列表)。
下载完上述版本包后,解压,并在终端进入解压时新建的主目录:
&tar xfvz spark-1.2.0-bin-hadoop2.4.tgz
&cd spark-1.2.0-bin-hadoop2.4
用户运行Spark的脚本在该目录的bin目录下。我们可以运行Spark附带的一个示例程序来测试是否一切正常:
&./bin/run-example org.apache.spark.examples.SparkPi
该命令将在本地单机模式下执行SparkPi这个示例。在该模式下,所有的Spark进程均运行于同一个JVM中,而并行处理则通过多线程来实现。默认情况下,该示例会启用与本地系统的CPU核心数目相同的线程。示例运行完,应可在输出的结尾看到类似如下的提示:
14/11/27 20:58:47 INFO SparkContext: Job finished: reduce at SparkPi.scala:35, took 0.723269s
Pi is roughly 3.1465
要在本地模式下设置并行的级别,以local[N]的格式来指定一个master变量即可。上述参数中的N表示要使用的线程数目。比如只使用两个线程时,可输入如下命令:
&MASTER=local[2] ./bin/run-example org.apache.spark.examples.SparkPi
1.2 Spark集群
Spark集群由两类程序构成:一个驱动程序和多个执行程序。本地模式时所有的处理都运行在同一个JVM内,而在集群模式时它们通常运行在不同的节点上。
举例来说,一个采用单机模式的Spark集群(即使用Spark内置的集群管理模块)通常包括:
一个运行Spark单机主进程和驱动程序的主节点;
各自运行一个执行程序进程的多个工作节点。
在本书中,我们将使用Spark的本地单机模式做概念讲解和举例说明,但所用的代码也可运行在Spark集群上。比如在一个Spark单机集群上运行上述示例,只需传入主节点的URL即可:
&MASTER=spark://IP:PORT ./bin/run-example org.apache.spark.examples.SparkPi
其中的IP和PORT分别是主节点IP地址和端口号。这是告诉Spark让示例程序运行在主节点所对应的集群上。
Spark集群管理和部署的完整方案不在本书的讨论范围内。但是,本章后面会对Amazon EC2集群的设置和使用做简要说明。
Spark集群部署的概要介绍可参见如下链接:
1.3 Spark编程模型
在对Spark的设计进行更全面的介绍前,我们先介绍SparkContext对象以及Spark shell。后面将通过它们来了解Spark编程模型的基础知识。
虽然这里会对Spark的使用进行简要介绍并提供示例,但要想了解更多,可参考下面这些资料。
Spark快速入门:。
针对Scala、Java和Python的《Spark编程指南》:。
1.3.1 SparkContext类与SparkConf类
任何Spark程序的编写都是从SparkContext(或用Java编写时的JavaSparkContext)开始的。SparkContext的初始化需要一个SparkConf对象,后者包含了Spark集群配置的各种参数(比如主节点的URL)。
初始化后,我们便可用SparkContext对象所包含的各种方法来创建和操作分布式数据集和共享变量。Spark shell(在Scala和Python下可以,但不支持Java)能自动完成上述初始化。若要用Scala代码来实现的话,可参照下面的代码:
val conf = new SparkConf()
.setAppName("Test Spark App")
.setMaster("local[4]")
val sc = new SparkContext(conf)
这段代码会创建一个4线程的SparkContext对象,并将其相应的任务命名为Test Spark APP。我们也可通过如下方式调用SparkContext的简单构造函数,以默认的参数值来创建相应的对象。其效果和上述的完全相同:
val sc = new SparkContext("local[4]", "Test Spark App")
下载示例代码你可从下载你账号购买过的Packt书籍所对应的示例代码。若书是从别处购买的,则可在注册,相应的代码会直接发送到你的电子邮箱。
1.3.2 Spark shell
Spark支持用Scala或Python REPL(Read-Eval-Print-Loop,即交互式shell)来进行交互式的程序编写。由于输入的代码会被立即计算,shell能在输入代码时给出实时反馈。在Scala shell里,命令执行结果的值与类型在代码执行完后也会显示出来。
要想通过Scala来使用Spark shell,只需从Spark的主目录执行./bin/spark-shell。它会启动Scala shell并初始化一个SparkContext对象。我们可以通过sc这个Scala值来调用这个对象。该命令的终端输出应该如下图所示:
要想在Python shell中使用Spark,直接运行./bin/pyspark命令即可。与Scala shell类似, Python下的SparkContext对象可以通过Python变量sc来调用。上述命令的终端输出应该如下图所示:
1.3.3 弹性分布式数据集
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark的核心概念之一。一个RDD代表一系列的“记录”(严格来说,某种类型的对象)。这些记录被分配或分区到一个集群的多个节点上(在本地模式下,可以类似地理解为单个进程里的多个线程上)。Spark中的RDD具备容错性,即当某个节点或任务失败时(因非用户代码错误的原因而引起,如硬件故障、网络不通等),RDD会在余下的节点上自动重建,以便任务能最终完成。
1. 创建RDD
RDD可从现有的集合创建。比如在Scala shell中:
val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)
RDD也可以基于Hadoop的输入源创建,比如本地文件系统、HDFS和Amazon S3。基于Hadoop的RDD可以使用任何实现了Hadoop InputFormat接口的输入格式,包括文本文件、其他Hadoop标准格式、HBase、Cassandra等。以下举例说明如何用一个本地文件系统里的文件创建RDD:
val rddFromTextFile = sc.textFile("LICENSE")
上述代码中的textFile函数(方法)会返回一个RDD对象。该对象的每一条记录都是一个表示文本文件中某一行文字的String(字符串)对象。
2. Spark操作
创建RDD后,我们便有了一个可供操作的分布式记录集。在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行SparkContext的那个驱动程序。
Spark的操作通常采用函数式风格。对于那些熟悉用Scala或Python进行函数式编程的程序员来说,这不难掌握。但Spark API其实容易上手,所以那些没有函数式编程经验的程序员也不用担心。
Spark程序中最常用的转换操作便是map操作。该操作对一个RDD里的每一条记录都执行某个函数,从而将输入映射成为新的输出。比如,下面这段代码便对一个从本地文本文件创建的RDD进行操作。它对该RDD中的每一条记录都执行size函数。之前我们曾创建过一个这样的由若干String构成的RDD对象。通过map函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干Int构成的RDD对象。
val intsFromStringsRDD = rddFromTextFile.map(line =& line.size)
其输出应与如下类似,其中也提示了RDD的类型:
intsFromStringsRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at &console&:14
示例代码中的=&是Scala下表示匿名函数的语法。匿名函数指那些没有指定函数名的函数(比如Scala或Python中用def关键字定义的函数)。
匿名函数的具体细节并不在本书讨论范围内,但由于它们在Scala、Python以及Java 8中大量使用(示例或现实应用中都是),列举一些实例仍会有帮助。
语法line =& line.size表示以=&操作符左边的部分作为输入,对其执行一个函数,并以=&操作符右边代码的执行结果为输出。在这个例子中,输入为line,输出则是line.size函数的执行结果。在Scala语言中,这种将一个String对象映射为一个Int的函数被表示为String =& Int。
该语法使得每次使用如map这种方法时,都不需要另外单独定义一个函数。当函数简单且只需使用一次时(像本例一样时),这种方式很有用。
现在我们可以调用一个常见的执行操作count,来返回RDD中的记录数目。
intsFromStringsRDD.count
执行的结果应该类似如下输出:
14/01/29 23:28:28 INFO SparkContext: Starting job: count at &console&:17 ...
14/01/29 23:28:28 INFO SparkContext: Job finished: count at &console&:17, took 0.019227 s
res4: Long = 398
如果要计算这个文本文件里每行字符串的平均长度,可以先使用sum函数来对所有记录的长度求和,然后再除以总的记录数目:
val sumOfRecords = intsFromStringsRDD.sum
val numRecords = intsFromStringsRDD.count
val aveLengthOfRecord = sumOfRecords / numRecords
结果应该如下:
aveLengthOfRecord: Double = 52.69
Spark的大多数操作都会返回一个新RDD,但多数的执行操作则是返回计算的结果(比如上面例子中,count返回一个Long,sum返回一个Double)。这就意味着多个操作可以很自然地前后连接,从而让代码更为简洁明了。举例来说,用下面的一行代码可以得到和上面例子相同的结果:
val aveLengthOfRecordChained = rddFromTextFile.map(line =& line.size).sum / rddFromTextFile.count
值得注意的一点是,Spark中的转换操作是延后的。也就是说,在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序,从而提高了Spark的效率。
这就意味着,如果我们的Spark程序从未调用一个执行操作,就不会触发实际的计算,也不会得到任何结果。比如下面的代码就只是返回一个表示一系列转换操作的新RDD:
val transformedRDD = rddFromTextFile.map(line =& line.size).
filter(size =& size & 10).map(size =& size * 2)
相应的终端输出如下:
transformedRDD: org.apache.spark.rdd.RDD[Int] = MappedRDD[8] at map at &console&:14
注意,这里实际上没有触发任何计算,也没有结果被返回。如果我们现在在新的RDD上调用一个执行操作,比如sum,该计算将会被触发:
val computation = transformedRDD.sum
现在你可以看到一个Spark任务被启动,并返回如下终端输出:
14/11/27 21:48:21 INFO SparkContext: Job finished: sum at &console&:16,
took 0.193513 s
computation: Double = 60468.0
RDD支持的转换和执行操作的完整列表以及更为详细的例子,参见《Spark编程指南》()以及Spark API(Scala)文档()。
3. RDD缓存策略
Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache函数来实现:
rddFromTextFile.cache
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。
如果现在在已缓存了的RDD上调用count或sum函数,应该可以感觉到RDD的确已经载入到了内存中:
val aveLengthOfRecordChained = rddFromTextFile.map(line =& line.size).
sum / rddFromTextFile.count
实际上,从下方的输出我们可以看到,数据在第一次调用cache时便已缓存到内存,并占用了大约62 KB的空间,余下270 MB可用:
14/01/30 06:59:27 INFO MemoryStore: ensureFreeSpace(63454) called with curMem=32960, maxMem=
14/01/30 06:59:27 INFO MemoryStore: Block rdd_2_0 stored as values to memory (estimated size 62.0 KB, free 296.9 MB)
14/01/30 06:59:27 INFO BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_0 in memory on 10.0.0.3:55089 (size: 62.0 KB, free: 296.9 MB)
现在,我们再次求平均长度:
val aveLengthOfRecordChainedFromCached = rddFromTextFile.map(line =& line.size).sum / rddFromTextFile.count
从如下的输出中应该可以看出缓存的数据是从内存直接读出的:
14/01/30 06:59:34 INFO BlockManager: Found block rdd_2_0 locally
Spark支持更为细化的缓存策略。通过persist函数可以指定Spark的数据缓存策略。关于RDD缓存的更多信息可参见:。
1.3.4 广播变量和累加器
Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。
广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即可:
val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
终端的输出表明,广播变量存储在内存中,占用的空间大概是488字节,仍余下270 MB可用空间:
14/01/30 07:13:32 INFO MemoryStore: ensureFreeSpace(488) called with curMem=96414, maxMem=
14/01/30 07:13:32 INFO MemoryStore: Block broadcast_1 stored as values to memory (estimated size 488.0 B, free 296.9 MB)
broadCastAList: org.apache.spark.broadcast.Broadcast[List[String]] = Broadcast(1)
广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value方法:
sc.parallelize(List("1", "2", "3")).map(x =& broadcastAList.value ++ x).collect
这段代码会从{"1", "2", "3"}这个集合(一个Scala List)里,新建一个带有三条记录的RDD。map函数里的代码会返回一个新的List对象。这个对象里的记录由之前创建的那个broadcastAList里的记录与新建的RDD里的三条记录分别拼接而成。
注意,上述代码使用了collect函数。这个函数是一个Spark执行函数,它将整个RDD以Scala(Python或Java)集合的形式返回驱动程序。
通常只在需将结果返回到驱动程序所在节点以供本地处理时,才调用collect函数。
注意,collect函数一般仅在的确需要将整个结果集返回驱动程序并进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,进而导致程序崩溃。
高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便属于这类情况。
从如下结果可以看出,新生成的RDD里包含3条记录,其每一条记录包含一个由原来被广播的List变量附加一个新的元素所构成的新记录(也就是说,新记录分别以1、2、3结尾)。
14/01/31 10:15:39 INFO SparkContext: Job finished: collect at &console&:15, took 0.025806 s
res6: Array[List[Any]] = Array(List(a, b, c, d, e, 1), List(a, b, c, d, e, 2), List(a, b, c, d, e, 3))
累加器(accumulator)也是一种被广播到工作节点的变量。累加器与广播变量的关键不同,是后者只能读取而前者却可累加。但支持的累加操作有一定的限制。具体来说,这种累加必须是一种有关联的操作,即它得能保证在全局范围内累加起来的值能被正确地并行计算以及返回驱动程序。每一个工作节点只能访问和操作其自己本地的累加器,全局累加器则只允许驱动程序访问。累加器同样可以在Spark代码中通过value访问。
关于累加器的更多信息,可参见《Spark编程指南》:。
1.4 Spark Scala编程入门
下面我们用上一节所提到的内容来编写一个简单的Spark数据处理程序。该程序将依次用Scala、Java和Python三种语言来编写。所用数据是客户在我们在线商店的商品购买记录。该数据存在一个CSV文件中,名为UserPurchaseHistory.csv,内容如下所示。文件的每一行对应一条购买记录,从左到右的各列值依次为客户名称、商品名以及商品价格。
John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49
对于Scala程序而言,需要创建两个文件:Scala代码文件以及项目的构建配置文件。项目将使用SBT(Scala Build Tool,Scala构建工具)来构建。为便于理解,建议读者下载示例代码scala-spark-app。该资源里的data目录下包含了上述CSV文件。运行这个示例项目需要系统中已经安装好SBT(编写本书时所使用的版本为0.13.1)。
配置SBT并不在本书讨论范围内,但读者可以从找到更多信息。
我们的SBT配置文件是build.sbt,其内容如下面所示(注意,各行代码之间的空行是必需的):
name := "scala-spark-app"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0 "
最后一行代码是添加Spark到本项目的依赖库。
相应的Scala程序在ScalaApp.scala这个文件里。接下来我们会逐一讲解代码的各个部分。首先,导入所需要的Spark类:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
* 用Scala编写的一个简单的Spark应用
object ScalaApp {
在主函数里,我们要初始化所需的SparkContext对象,并且用它通过textFile函数来访问CSV数据文件。之后对每一行原始字符串以逗号为分隔符进行分割,提取出相应的用户名、产品和价格信息,从而完成对原始文本的映射:
def main(args: Array[String]) {
val sc = new SparkContext("local[2]", "First Spark App")
// 将CSV格式的原始数据转化为(user,product,price)格式的记录集
val data = sc.textFile("data/UserPurchaseHistory.csv")
.map(line =& line.split(","))
.map(purchaseRecord =& (purchaseRecord(0), purchaseRecord(1),
purchaseRecord(2)))
现在,我们有了一个RDD,其每条记录都由(user, product, price)三个字段构成。我们可以对商店计算如下指标:
购买总次数
客户总个数
最畅销的产品
计算方法如下:
// 求购买次数
val numPurchases = data.count()
// 求有多少个不同客户购买过商品
val uniqueUsers = data.map{ case (user, product, price) =& user }.distinct().count()
// 求和得出总收入
val totalRevenue = data.map{ case (user, product, price) =& price.toDouble }.sum()
// 求最畅销的产品是什么
val productsByPopularity = data
.map{ case (user, product, price) =& (product, 1) }
.reduceByKey(_ + _)
.collect()
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)
最后那段计算最畅销产品的代码演示了如何进行Map/Reduce模式的计算,该模式随Hadoop而流行。第一步,我们将(user, product, price)格式的记录映射为(product, 1)格式。然后,我们执行一个reduceByKey操作,它会对各个产品的1值进行求和。
转换后的RDD包含各个商品的购买次数。有了这个RDD后,我们可以调用collect函数,这会将其计算结果以Scala集合的形式返回驱动程序。之后在驱动程序的本地对这些记录按照购买次数进行排序。(注意,在实际处理大量数据时,我们通常通过sortByKey这类操作来对其进行并行排序。)
最后,可在终端上打印出计算结果:
println("Total purchases: " + numPurchases)
println("Unique users: " + uniqueUsers)
println("Total revenue: " + totalRevenue)
println("Most popular product: %s with %d purchases".
format(mostPopular._1, mostPopular._2))
可以在项目的主目录下执行sbt run命令来运行这个程序。如果你使用了IDE的话,也可以从Scala IDE直接运行。最终的输出应该与下面的内容相似:
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp
14/01/30 10:54:40 INFO spark.SparkContext: Job finished: collect at
ScalaApp.scala:25, took 0.045181 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
可以看到,商店总共有4个客户的5次交易,总收入为39.91。最畅销的商品是iPhone Cover,共购买2次。
1.5 Spark Java编程入门
Java API与Scala API本质上很相似。Scala代码可以很方便地调用Java代码,但某些Scala代码却无法在Java里调用,特别是那些使用了隐式类型转换、默认参数和采用了某些Scala反射机制的代码。
一般来说,这些特性在Scala程序中会被广泛使用。这就有必要另外为那些常见的类编写相应的Java版本。由此,SparkContext有了对应的Java版本JavaSparkContext,而RDD则对应JavaRDD。
1.8及之前版本的Java并不支持匿名函数,在函数式编程上也没有严格的语法规范。于是,套用到Spark的Java API上的函数必须要实现一个带有call函数的WrappedFunction接口。这会使得代码冗长,所以我们经常会创建临时类来传递给Spark操作。这些类会实现操作所需的接口以及call函数,以取得和用Scala编写时相同的效果。
Spark提供对Java 8匿名函数(lambda)语法的支持。使用该语法能让Java 8书写的代码看上去很像等效的Scala版。
用Scala编写时,键/值对记录的RDD能支持一些特别的操作(比如reduceByKey和saveAsSequenceFile)。这些操作可以通过隐式类型转换而自动被调用。用Java编写时,则需要特别类型的JavaRDD来支持这些操作。它们包括用于键/值对的JavaPairRDD,以及用于数值记录的JavaDoubleRDD。
我们在这里只涉及标准的Java API语法。关于Java下支持的RDD以及Java 8 lambda表达式支持的更多信息可参见《Spark编程指南》:。
在后面的Java程序中,我们可以看到大部分差异。这些示例代码包含在本章示例代码的java-spark-app目录下。该目录的data子目录下也包含上述CSV数据。
这里会使用Maven构建工具来编译和运行这个项目。我们假设读者已经在其系统上安装好了该工具。
Maven的安装和配置并不在本书讨论范围内。通常它可通过Linux系统中的软件管理器或Mac OS X中的HomeBrew或MacPorts方便地安装。
详细的安装指南参见:。
项目中包含一个名为JavaApp.java的Java源文件:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkC
import org.apache.spark.api.java.function.DoubleF
import org.apache.spark.api.java.function.F
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairF
import scala.Tuple2;
import java.util.C
import java.util.L
* 用Java编写的一个简单的Spark应用
public class JavaApp {
public static void main(String[] args) {
正如在Scala项目中一样,我们首先需要初始化一个上下文对象。值得注意的是,这里所使用的是JavaSparkContext类而不是之前的SparkContext。类似地,调用JavaSparkContext对象,利用textFile函数来访问数据,然后将各行输入分割成多个字段。请注意下面代码的高亮部分是如何使用匿名类来定义一个分割函数的。该函数确定了如何对各行字符串进行分割。
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
// 将CSV格式的原始数据转化为(user,product,price)格式的记录集
JavaRDD&String[]& data =
sc.textFile("data/UserPurchaseHistory.csv")
.map(new Function&String, String[]&() {
public String[] call(String s) throws Exception {
return s.split(",");
现在可以算一下用Scala时计算过的指标。这里有两点值得注意的地方,一是下面Java API中有些函数(比如distinct和count)实际上和在Scala API中一样,二是我们定义了一个匿名类并将其传给map函数。匿名类的定义方式可参见代码的高亮部分。
// 求总购买次数
long numPurchases = data.count();
// 求有多少个不同客户购买过商品
long uniqueUsers = data.map(new Function&String[], String&() {
public String call(String[] strings) throws Exception {
return strings[0];
}).distinct().count();
// 求和得出总收入
double totalRevenue = data.map(new DoubleFunction&String[]&(){
public Double call(String[] strings) throws Exception {
return Double.parseDouble(strings[2]);
下面的代码展现了如何求出最畅销的产品,其步骤与Scala示例的相同。多出的那些代码看似复杂,但它们大多与Java中创建匿名函数有关,实际功能与用Scala时一样:
// 求最畅销的产品是哪个
// 首先用一个PairFunction和Tuple2类将数据映射成为(product,1)格式的记录
// 然后,用一个Function2类来调用reduceByKey操作,该操作实际上是一个求和函数
List&Tuple2&String, Integer&& pairs = data.map(new
PairFunction&String[], String, Integer&() {
public Tuple2&String, Integer& call(String[] strings)
throws Exception {
return new Tuple2(strings[1], 1);
}).reduceByKey(new Function2&Integer, Integer, Integer&() {
public Integer call(Integer integer, Integer integer2)
throws Exception {
return integer + integer2;
}).collect();
// 最后对结果进行排序。注意,这里会需要创建一个Comparator函数来进行降序排列
Collections.sort(pairs, new Comparator&Tuple2&String, Integer&&() {
public int compare(Tuple2&String, Integer& o1,
Tuple2&String, Integer& o2) {
return -(o1._2() - o2._2());
String mostPopular = pairs.get(0)._1();
int purchases = pairs.get(0)._2();
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
System.out.println("Total revenue: " + totalRevenue);
System.out.println(String.format("Most popular product:
%s with %d purchases", mostPopular, purchases));
从前面代码可以看出,Java代码和Scala代码相比虽然多了通过内部类来声明变量和函数的引用代码,但两者的基本结构类似。读者不妨分别练习这两种版本的代码,并比较一下计算同一个指标时两种语言在表达上的异同。
该程序可以通过在项目主目录下执行如下命令运行:
&mvn exec:java -Dexec.mainClass="JavaApp"
可以看到其输出和Scala版的很类似,而且计算结果完全一样:
14/01/30 17:02:43 INFO spark.SparkContext: Job finished: collect at
JavaApp.java:46, took 0.039167 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
1.6 Spark Python编程入门
Spark的Python API几乎覆盖了所有Scala API所能提供的功能,但的确有些特性,比如Spark Streaming和个别的API方法,暂不支持。具体可参见《Spark编程指南》的Python部分:。
与上两节类似,这里将编写一个相同功能的Python版程序。我们假设读者系统中已安装2.6或更高版本的Python(多数Linux系统和Mac OS X已预装Python)。
如下示例代码可以在本章的python-spark-app目录下找到。相应的CSV数据文件也在该目录的data子目录中。项目代码在一个名为pythonapp.py的脚本里,其内容如下:
"""用Python编写的一个简单Spark应用"""
from pyspark import SparkContext
sc = SparkContext("local[2]", "First Spark App")
# 将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:
line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求总购买次数
numPurchases = data.count()
# 求有多少不同客户购买过商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).
reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])
对比Scala版和Python版代码,不难发现语法大致相同。主要不同在于匿名函数的表达方式上,匿名函数在Python语言中亦称lambda函数,lambda也是语法表达上的关键字。用Scala编写时,一个将输入x映射为输出y的匿名函数表示为x =& y,而在Python中则是lambda x : y。在上面代码的高亮部分,我们定义了一个将两个输入映射为一个输出的匿名函数。这两个输入的类型一般相同,这里调用的是相加函数,故写成lambda a, b : a + b。
运行该脚本的最好方法是在脚本目录下运行如下命令:
&$SPARK_HOME/bin/spark-submit pythonapp.py
上述代码中的$SPARK_HOME变量应该被替换为Spark的主目录,也就是在本章开始Spark预编译包解压生成的那个目录。
脚本运行完的输出应该和运行Scala和Java版时的类似,其结果同样也是:
14/01/30 11:43:47 INFO SparkContext: Job finished: collect at pythonapp.
py:14, took 0.050251 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
1.7 在Amazon EC2上运行Spark
Spark项目提供了在Amazon EC2上构建一个Spark集群所需的脚本,位于ec2文件夹下。输入如下命令便可调用该文件夹下的spark-ec2脚本:
&./ec2/spark-ec2
当不带参数直接运行上述代码时,终端会显示该命令的用法信息:
Usage: spark-ec2 [options] &action& &cluster_name&
&action& can be: launch, destroy, login, stop, start, get-master
在创建一个Spark EC2集群前,我们需要一个Amazon账号。
如果没有Amazon Web Service账号,可以在注册。
AWS的管理控制台地址是:。
另外,我们还需要创建一个Amazon EC2密钥对和相关的安全凭证。Spark文档提到了在EC2上部署时的需求。
你要先自己创建一个Amazon EC2密钥对。通过管理控制台登入你的Amazon Web Services账号后,单击左边导航栏中的“Key Pairs”,然后创建并下载相应的私钥文件。通过ssh远程访问EC2时,会需要提交该密钥。该密钥的系统访问权限必须设定为600(即只有你可以读写该文件),否则会访问失败。
当需要使用spark-ec2脚本时,需要设置AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_ KEY两个环境变量。它们分别为你的Amazon EC2访问密钥标识(key ID)和对应的密钥密码(secret access key)。这些信息可以从AWS主页上依次点击“Account | Security Credentials | Access Credentials”获得。
创建一个密钥时,最好选取一个好记的名字来命名。这里假设密钥名为spark,对应的密钥文件的名称为spark.pem。如上面提到的,我们需要确认密钥的访问权限并设定好所需的环境变量:
&chmod 600 spark.pem
&export AWS_ACCESS_KEY_ID="..."
&export AWS_SECRET_ACCESS_KEY="..."
上述下载所得的密钥文件只能下载一次(即在刚创建后),故对其既要安全保存又要避免丢失。
注意,下一节中会启用一个Amazon EC2集群,这会在你的AWS账号下产生相应的费用。
启动一个EC2 Spark集群
现在我们可以启动一个小型Spark集群了。启动它只需进入到ec2目录,然后输入:
&./spark-ec2 -k spark -i spark.pem -s 1 –-instance-type m3.medium --hadoop-major-version 2 launch test-cluster
这将启动一个名为“test-cluster”的新集群,其包含“m3.medium”级别的主节点和从节点各一个。该集群所用的Spark版本适配于Hadoop 2。我们使用的密钥名和密钥文件分别是spark和spark.pem。
集群的完全启动和初始化会需要一些时间。在运行启动代码后,应该会立即看到如下图所示的内容:
如果集群启动成功,最终应可在终端中看到类似如下的输出:
要测试是否能连接到新集群,可以输入如下命令:
&ssh -i spark.pem root@ec2-54-227-pute-
注意该命令中root@后面的IP地址需要替换为你自己的Amazon EC2的公开域名。该域名可在启动集群时的输出中找到。
另外也可以通过如下命令得到集群的公开域名:
&./spark-ec2 –i spark.pem get-master test-cluster
上述ssh命令执行成功后,你会连接到EC2上Spark集群的主节点,同时终端的输入应与如下类似:
如果要测试集群是否已正确配置Spark环境,可以切换到Spark目录后运行一个示例程序:
&MASTER=local[2] ./bin/run-example SparkPi
其输出应该与在自己电脑上的输出类似:
14/01/30 20:20:21 INFO SparkContext: Job finished: reduce at SparkPi.scala:35, took 0. s
Pi is roughly 3.14032
这样就有了包含多个节点的真实集群,可以测试集群模式下的Spark了。我们会在一个从节点的集群上运行相同的示例。运行命令和上面相同,但用主节点的URL作为MASTER的值:
&MASTER=spark://ec2-54-227-pute-:7077 ./bin/run-example SparkPi
注意,你需要将上面代码中的公开域名替换为你自己的。
同样,命令的输出应该和本地运行时的类似。不同的是,这里会有日志消息提示你的驱动程序已连接到Spark集群的主节点。
14/01/30 20:26:17 INFO client.Client$ClientActor: Connecting to master spark://ec2-54-220-189-136.eu-:7077
14/01/30 20:26:17 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-17-0001
14/01/30 20:26:17 INFO client.Client$ClientActor: Executor added: app- 17-0001/0 on worker-49-ip-10-34-137-45.eu-pute.internal-57119 (ip-10-34-137-45.eu-pute.internal:57119) with 1 cores
14/01/30 20:26:17 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-17-0001/0 on hostPort ip-10-34-137-45.eu- pute.internal:57119 with 1 cores, 2.4 GB RAM
14/01/30 20:26:17 INFO client.Client$ClientActor: Executor updated: app- 17-0001/0 is now RUNNING
14/01/30 20:26:18 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:39
读者不妨在集群上自由练习,熟悉一下Scala的交互式终端:
&./bin/spark-shell --master spark://ec2-54-227-pute-:7077
练习完后,输入exit便可退出终端。另外也可以通过如下命令来体验PySpark终端:
&./bin/pyspark --master spark://ec2-54-227-pute-:7077
通过Spark主节点网页界面,可以看到主节点下注册了哪些应用。该界面位于ec2-54-227-pute-:8080(同样,需要将公开域名替换为你自己的)。你应该可以看到类似下面截图的界面,显示了之前运行过的一个程序以及两个已启动的终端任务。
值得注意的是,Amazon会根据集群的使用情况收取费用。所以在集群使用完毕后,记得停止或终止这个测试集群。要终止该集群可以先在你本地系统的ssh会话里输入exit,然后再输入如下命令:
&./ec2/spark-ec2 -k spark -i spark.pem destroy test-cluster
应该可以看到这样的输出:
Are you sure you
want to destroy the cluster test-cluster?
The following ninstances will be terminated:
Searching for existing
cluster test-cluster...
Found 1 master(s), 1 slaves
& ec2-54-227-pute-
& ec2-54-91-pute-
ALL DATA ON ALL NODES WILL BE LOST!!
Destroy cluster test-cluster (y/N): y
Searching for existing cluster test-cluster...
Terminating master...
Terminating slaves...
输入y,然后回车便可终止该集群。
恭喜!现在你已经做到了在云端设置Spark集群,并在它上面运行了一个完全并发的示例程序,最后也终止了这个集群。如果在学习后续章节时你想在集群上运行示例或你自己的程序,都可以再次使用这些脚本并指定想要的集群规模和配置。(留意下费用并记得使用完毕后关闭它们就行。)
本章我们谈到了如何在自己的电脑以及Amazon EC2的云端上配置Spark环境。通过Scala交互式终端,我们学习了Spark编程模型的基础知识并了解了它的API。另外我们还分别用Scala、Java和Python语言,编写了一个简单的Spark程序。
下一章,我们将考虑如何使用Spark来创建一个机器学习系统。

我要回帖

 

随机推荐