eclipse开发spark运行jar包应用程序 spark运行jar包2.1.0 导入哪个jar包

本文分析的源码基于Spark2.1.0版本,如果有理解不当的地方欢迎批评指正。
在之前的一篇文章中我们分析了Spark-submit脚本,发现该脚本会调用spark-class脚本检查参数设置,以及提交任务。最后发现,提交任务的入口类是org.apache.spark.deploy.SparkSubmit 我们接下来深入这个类,看看从提交任务到执行用户jar包之间都发生了什么;
首先找到org.apache.spark.deploy.SparkSubmit类的main方法:
main(args: Array[String]):
val appArgs =
new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
appArgs.action match
case SparkSubmitAction.SUBMIT
=& submit(appArgs)
case SparkSubmitAction.KILL
=& kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS
=& requestStatus(appArgs)
main方法很简单,首先利用参数创建了一个SparkSubmitArguments 这个类是SparkSubmitArgumentsParser的子类,主要工作就是对Spark应用的参数进行解析,以及加载当前和Spark相关的环境变量。
在SparkSubmitArguments中有一个action成员,用于表示spark-submit的动作,一般来说使用spark-submit有三个目的,第一是提交应用(这也是最常用的),第二是可以通过spark-submit杀死某个任务,第三是获取某个正在执行的任务的状态。这个action是通过参数指定的,默认值为submit即提交一个任务。
我们可以跳到submit方法看看,该方法定义如下:
private def submit(args: SparkSubmitArguments): Unit
仅接受一个SparkSubmitArguments实例作为参数,这个方法执行两个步骤,首先是基于集群管理器和部署模式设置合适的classpath、系统属性和应用程序参数,以此为运行用户的main方法做环境准备。
然后,使用第一步准备好的环境来启动main方法,这是通过反射完成的,我们下面再看。
Submit方法最终执行了其内部定义的doRunMain,而doRunMain方法会调用runMain(line 169)
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
在runMain方法中可以看到这三行代码:分别是line 695:
mainClass = Utils.classForName(childMainClass)
这行代码利用反射加载main方法所在类,Utils.classForName方法最终还是调用的Class.forName方法;
line 722:
valmainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
以及line 738:
mainMethod.invoke(null, childArgs.toArray)
上述两行分别获取main方法,然后执行main方法。这些动作都是在driver端完成的。
再理一下思路,spark提交jar的过程如下:
Spark-submit -& Spark-class -& org.apache.spark.deploy.SparkSubmit-& { main -& submit -& doRunMan -& runMain}
org.apache.spark.deploy.SparkSubmit主要负责准备运行环境以及通过反射获取app的main方法并执行。
为了方便理解,接下来我们对Spark利用反射加载运行用户应用程序的main方法做一个简易实现:
新建一个scala工程,在com.load包下新建一个Main类:
package com.load
* Created by hunan on .
object Main {
def main(args: Array[String]): Unit = {
println("Run main method success!")
这个用来模拟我们自己开发的Spark应用,然后打jar包,生成ScalaTest.jar
再创建另一个工程,将刚才的jar作为依赖添加进来,并写如下类:
package com.example
* Created by hunan on .
object Test {
def main(args: Array[String]): Unit = {
var mainClass:Class[_]=null
mainClass = Class.forName("com.load.Main", true, Thread.currentThread().getContextClassLoader)
case e:Exception=&
e.printStackTrace()
if(mainClass!=null){
val mainMethod = mainClass.getMethod("main",new Array[String](0).getClass)
mainMethod.invoke(null,Array[String]())
这个用来模拟Spark本身,运行即可发现输出如下:
这就是Spark提交jar的机制,我们也可以发现这里的main方法仅作为普通方法执行,只不过Spark会检查该mian方法是不是静态的,如果不是静态就抛出异常拒绝执行,如果修改722行的“main”字串也可以实现以任意方法名为Spark app执行入口。
spark-submit 提交参数总结
spark-submit --master yarn-cluster
--num-executors 48 --driver-memory 2g --executor-memory 7g --ex...
spark-submit提交spark任务的具体参数配置说明
spark提交任务常见的两种模式:1,local[k]:本地使用k个worker线程运行saprk程序.这种模式适合小批量数据在本地调试代码用.(若使用本地的文件,需要在前面加上:file://)2....
Spark任务提交-json参数踩坑
Spark提交任务时,需要传递两个参数,其中一个是json字段串json 参数如下:{
&dest_catalog&:&测试文件1&,
&site&:&tencent&,
&song_settings...
Spark2 文件处理和jar包执行
上传数据文件
mkdir -p data/ml/
hadoop fs -mkdir -p /datafile/wangxiao/
hadoop f...
1.为什么要让运行时Jar可以从yarn端访问spark2以后,原有lib目录下的大JAR包被分散成多个小JAR包,原来的spark-assembly-*.jar已经不存在每一次我们运行的时候,如果没...
spark的jar包没必要把所有相关的依赖都打进一个jar包中,因为这样会把spark、hadoop本身相关的jar包也打进去,但是这些依赖在用spark-submit命令运行时,会自动加载部署环境中...
在使用spark时,我们往往在主节点上编写好代码,然后分发到各个从节点进行计算,这里有个问题,如果主节点的代码调用了某个外部jar包,那么是不是在从节点上也要复制该jar包呢?
这篇文章给出了答案
...
第一次在spark环境下运行jar包,耶耶耶!!!心情赞赞哒!!!!
现在开发的工具也主要换成了Intellij,因为方便嘛!!!之前果然不应该用eclipse,简直是强求啊~~Intellij从开发...
没有更多推荐了,
(window.slotbydup=window.slotbydup || []).push({
id: "5865575",
container: s,
size: "300,250",
display: "inlay-fix"【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载。版权所有,侵权必究!
前面已经学习了Spark安装,完成了实验环境的搭建,并且学习了Spark运行架构和RDD设计原理,同时,我们还学习了Scala编程的基本语法,有了这些基础知识作为铺垫,现在我们可以没有障碍地开始编写一个简单的Spark应用程序了——词频统计。
任务:编写一个Spark应用程序,对某个文件中的单词进行词频统计。
准备工作:请进入Linux系统,打开“终端”,进入Shell命令提示符状态,然后,执行如下命令新建目录:
cd /usr/local/spark
mkdir mycode
mkdir wordcount
cd wordcount
然后,在“/usr/local/spark/mycode/wordcount”目录下新建一个包含了一些语句的文本文件word.txt,命令如下:
vim word.txt
你可以在文本文件中随意输入一些单词,用空格隔开,我们会编写Spark程序对该文件进行单词词频统计。然后,按键盘Esc键退出vim编辑状态,输入“:wq”保存文件并退出vim编辑器。
在spark-shell中执行词频统计
启动spark-shell
首先,请登录Linux系统(要注意记住登录采用的用户名,本教程统一采用hadoop用户名进行登录),打开“终端”(可以在Linux系统中使用Ctrl+Alt+T组合键开启终端),进入shell命令提示符状态,然后执行以下命令进入spark-shell:
cd /usr/local/spark
./bin/spark-shell
....//这里省略启动过程显示的一大堆信息
启动进入spark-shell需要一点时间,在进入spark-shell后,我们可能还需要到Linux文件系统中对相关目录下的文件进行编辑和操作(比如要查看spark程序执行过程生成的文件),这个无法在park-shell中完成,因此,这里再打开第二个终端,用来在Linux系统的Shell命令提示符下操作。
加载本地文件
在开始具体词频统计代码之前,需要解决一个问题,就是如何加载文件?
要注意,文件可能位于本地文件系统中,也有可能存放在分布式文件系统HDFS中,所以,下面我们分别介绍如何加载本地文件,以及如何加载HDFS中的文件。
首先,请在第二个终端窗口下操作,用下面命令到达“/usr/local/spark/mycode/wordcount”目录,查看一下上面已经建好的word.txt的内容:
cd /usr/local/spark/mycode/wordcount
cat word.txt
cat命令会把word.txt文件的内容全部显示到屏幕上。
现有让我们切换回到第一个终端,也就是spark-shell,然后输入下面命令:
scala& val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
上面代码中,val后面的是变量textFile,而sc.textFile()中的这个textFile是sc的一个方法名称,这个方法用来加载文件数据。这两个textFile不是一个东西,不要混淆。实际上,val后面的是变量textFile,你完全可以换个变量名称,比如,val lines = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)。这里使用相同名称,就是有意强调二者的区别。
注意,要加载本地文件,必须采用“file:///”开头的这种格式。执行上上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。所以,下面我们执行一条“行动”类型的语句,就可以看到结果:
scala& textFile.first()
first()是一个“行动”(Action)类型的操作,会启动真正的计算过程,从文件中加载数据到变量textFile中,并取出第一行文本。屏幕上会显示很多反馈信息,这里不再给出,你可以从这些结果信息中,找到word.txt文件中的第一行的内容。
正因为Spark采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,spark-shell也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来,比如:
val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
上面我们使用了一个根本就不存在的word123.txt,执行上面语句时,spark-shell根本不会报错,因为,没有遇到“行动”类型的first()操作之前,这个加载操作时不会真正执行的。然后,我们执行一个“行动”类型的操作first(),如下:
scala& textFile.first()
执行上面语句后,你会发现,会返回错误信息,其中有四个醒目的中文文字“拒绝连接”,因为,这个word123.txt文件根本就不存在。
好了,现在我们可以练习一下如何把textFile变量中的内容再次写回到另外一个文本文件wordback.txt中:
val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback")
上面的saveAsTextFile()括号里面的参数是保存文件的路径,不是文件名。saveAsTextFile()是一个“行动”(Action)类型的操作,所以,马上会执行真正的计算过程,从word.txt中加载数据到变量textFile中,然后,又把textFile中的数据写回到本地文件目录“/usr/local/spark/mycode/wordcount/writeback/”下面,现在让我们切换到Linux Shell命令提示符窗口中,执行下面命令:
cd /usr/local/spark/mycode/wordcount/writeback/
执行结果类似下面:
part-00000 _SUCCESS
也就是说,该目录下包含两个文件,我们可以使用cat命令查看一下part-00000文件(注意:part-后面是五个零):
cat part-00000
显示结果,是和上面word.txt中的内容一样的。
加载HDFS中的文件
为了能够读取HDFS中的文件,请首先启动Hadoop中的HDFS组件。注意,之前我们在“Spark安装”这章内容已经介绍了如何安装Hadoop和Spark,所以,这里我们可以使用以下命令直接启动Hadoop中的HDFS组件(由于用不到MapReduce组件,所以,不需要启动MapReduce或者YARN)。请到第二个终端窗口,使用Linux Shell命令提示符状态,然后输入下面命令:
cd /usr/local/hadoop
./sbin/start-dfs.sh
启动结束后,HDFS开始进入可用状态。如果你在HDFS文件系统中,还没有为当前Linux登录用户创建目录(本教程统一使用用户名hadoop登录Linux系统),请使用下面命令创建:
./bin/hdfs dfs -mkdir -p /user/hadoop
也就是说,HDFS文件系统为Linux登录用户开辟的默认目录是“/user/用户名”(注意:是user,不是usr),本教程统一使用用户名hadoop登录Linux系统,所以,上面创建了“/user/hadoop”目录,再次强调,这个目录是在HDFS文件系统中,不在本地文件系统中。创建好以后,下面我们使用命令查看一下HDFS文件系统中的目录和文件:
./bin/hdfs dfs -ls .
上面命令中,最后一个点号“.”,表示要查看Linux当前登录用户hadoop在HDFS文件系统中与hadoop对应的目录下的文件,也就是查看HDFS文件系统中“/user/hadoop/”目录下的文件,所以,下面两条命令是等价的:
./bin/hdfs dfs -ls .
./bin/hdfs dfs -ls /user/hadoop
如果要查看HDFS文件系统根目录下的内容,需要使用下面命令:
./bin/hdfs dfs -ls /
下面,我们把本地文件系统中的“/usr/local/spark/mycode/wordcount/word.txt”上传到分布式文件系统HDFS中(放到hadoop用户目录下):
./bin/hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt .
然后,用命令查看一下HDFS的hadoop用户目录下是否多了word.txt文件,可以使用下面命令列出hadoop目录下的内容:
./bin/hdfs dfs -ls .
可以看到,确实多了一个word.txt文件,我们使用cat命令查看一个HDFS中的word.txt文件的内容,命令如下:
./bin/hdfs dfs -cat ./word.txt
上面命令执行后,就会看到HDFS中word.txt的内容了。
现在,让我们切换回到spark-shell窗口,编写语句从HDFS中加载word.txt文件,并显示第一行文本内容:
scala& val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
scala& textFile.first()
执行上面语句后,就可以看到HDFS文件系统中(不是本地文件系统)的word.txt的第一行内容了。
需要注意的是,sc.textFile(“hdfs://localhost:9000/user/hadoop/word.txt”)中,“hdfs://localhost:9000/”是前面介绍Hadoop安装内容时确定下来的端口地址9000。实际上,也可以省略不写,如下三条语句都是等价的:
val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
val textFile = sc.textFile("/user/hadoop/word.txt")
val textFile = sc.textFile("word.txt")
下面,我们再把textFile的内容写回到HDFS文件系统中(写到hadoop用户目录下):
scala& val textFile = sc.textFile("word.txt")
scala& textFile.saveAsTextFile("writeback")
执行上面命令后,文本内容会被写入到HDFS文件系统的“/user/hadoop/writeback”目录下,我们可以切换到Linux Shell命令提示符窗口查看一下:
./bin/hdfs dfs -ls .
执行上述命令后,在执行结果中,可以看到有个writeback目录,下面我们查看该目录下有什么文件:
./bin/hdfs dfs -ls ./writeback
执行结果中,可以看到存在两个文件:part-00000和_SUCCESS。我们使用下面命令输出part-00000文件的内容(注意:part-00000里面有五个零):
./bin/hdfs dfs -cat ./writeback/part-00000
执行结果中,就可以看到和word.txt文件中一样的文本内容。
有了前面的铺垫性介绍,下面我们就可以开始第一个Spark应用程序:WordCount。
请切换到spark-shell窗口:
scala& val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
scala& val wordCount = textFile.flatMap(line =& line.split(" ")).map(word =& (word, 1)).reduceByKey((a, b) =& a + b)
scala& wordCount.collect()
上面只给出了代码,省略了执行过程中返回的结果信息,因为返回信息很多。
下面简单解释一下上面的语句。
textFile包含了多行文本内容,textFile.flatMap(line => line.split(” “))会遍历textFile中的每行文本内容,当遍历到其中一行文本内容时,会把文本内容赋值给变量line,并执行Lamda表达式line => line.split(” “)。line => line.split(” “)是一个Lamda表达式,左边表示输入参数,右边表示函数里面执行的处理逻辑,这里执行line.split(” “),也就是针对line中的一行文本内容,采用空格作为分隔符进行单词切分,从一行文本切分得到很多个单词构成的单词集合。这样,对于textFile中的每行文本,都会使用Lamda表达式得到一个单词集合,最终,多行文本,就得到多个单词集合。textFile.flatMap()操作就把这多个单词集合“拍扁”得到一个大的单词集合。
然后,针对这个大的单词集合,执行map()操作,也就是map(word => (word, 1)),这个map操作会遍历这个集合中的每个单词,当遍历到其中一个单词时,就把当前这个单词赋值给变量word,并执行Lamda表达式word => (word, 1),这个Lamda表达式的含义是,word作为函数的输入参数,然后,执行函数处理逻辑,这里会执行(word, 1),也就是针对输入的word,构建得到一个tuple,形式为(word,1),key是word,value是1(表示该单词出现1次)。
程序执行到这里,已经得到一个RDD,这个RDD的每个元素是(key,value)形式的tuple。最后,针对这个RDD,执行reduceByKey((a, b) => a + b)操作,这个操作会把所有RDD元素按照key进行分组,然后使用给定的函数(这里就是Lamda表达式:(a, b) => a + b),对具有相同的key的多个value进行reduce操作,返回reduce后的(key,value),比如(“hadoop”,1)和(“hadoop”,1),具有相同的key,进行reduce以后就得到(“hadoop”,2),这样就计算得到了这个单词的词频。
编写独立应用程序执行词频统计
下面我们编写一个Scala应用程序来实现词频统计。
请登录Linux系统(本教程统一采用用户名hadoop进行登录),进入Shell命令提示符状态,然后,执行下面命令:
cd /usr/local/spark/mycode/wordcount/
mkdir -p src/main/scala
//这里加入-p选项,可以一起创建src目录及其子目录
请在“/usr/local/spark/mycode/wordcount/src/main/scala”目录下新建一个test.scala文件,里面包含如下代码:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object WordCount {
def main(args: Array[String]) {
val inputFile =
"file:///usr/local/spark/mycode/wordcount/word.txt"
val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
val textFile = sc.textFile(inputFile)
val wordCount = textFile.flatMap(line =& line.split(" ")).map(word =& (word, 1)).reduceByKey((a, b) =& a + b)
wordCount.foreach(println)
注意,SparkConf().setAppName(“WordCount”).setMaster(“local[2]”)这句语句,也可以删除.setMaster(“local[2]”),只保留 val conf = new SparkConf().setAppName(“WordCount”)。
如果test.scala没有调用SparkAPI,那么,只要使用scalac命令编译后执行即可。但是,这个test.scala程序依赖 Spark API,因此我们需要通过 sbt 进行编译打包(前面的“”这个章节已经介绍过如何使用sbt进行编译打包)。下面我们再演示一次。
请执行如下命令:
cd /usr/local/spark/mycode/wordcount/
vim simple.sbt
通过上面代码,新建一个simple.sbt文件,请在该文件中输入下面代码:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
注意, “org.apache.spark”后面是两个百分号,千万不要少些一个百分号%,如果少了,编译时候会报错。
下面我们使用 sbt 打包 Scala 程序。为保证 sbt 能正常运行,先执行如下命令检查整个应用程序的文件结构:
cd /usr/local/spark/mycode/wordcount/
应该是类似下面的文件结构:
./src/main
./src/main/scala
./src/main/scala/test.scala
./simple.sbt
./word.txt
接着,我们就可以通过如下代码将整个应用程序打包成 JAR(首次运行同样需要下载依赖包 ):
cd /usr/local/spark/mycode/wordcount/
//请一定把这目录设置为当前目录
/usr/local/sbt/sbt package
上面执行过程需要消耗几分钟时间,屏幕上会返回一下信息:
hadoop@dblab-VirtualBox:/usr/local/spark/mycode/wordcount$ /usr/local/sbt/sbt package
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
[info] Set current project to Simple Project (in build file:/usr/local/spark/mycode/wordcount/)
[info] Updating {file:/usr/local/spark/mycode/wordcount/}wordcount...
[info] Resolving jline#2.12.1 ...
[info] Done updating.
[info] Packaging /usr/local/spark/mycode/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar ...
[info] Done packaging.
[success] Total time: 34 s, completed
#屏幕上返回上述信息表明打包成功
生成的 jar 包的位置为 /usr/local/spark/mycode/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar。
最后,通过 spark-submit 运行程序。我们就可以将生成的 jar 包通过 spark-submit 提交到 Spark 中运行了,命令如下:
/usr/local/spark/bin/spark-submit --class "WordCount"
/usr/local/spark/mycode/wordcount/target/scala-2.11/simple-project_2.11-1.0.jar
下面是笔者的word.txt进行词频统计后的结果(你的结果应该和这个类似):
(hadoop,2)【版权声明】博客内容由厦门大学数据库实验室拥有版权,未经允许,请勿转载!
一、方法简介
协同过滤是一种基于一组兴趣相同的用户或项目进行的推荐,它根据邻居用户(与目标用户兴趣相似的用户)的偏好信息产生对目标用户的推荐列表。
关于协同过滤的一个经典的例子就是看电影。如果你不知道哪一部电影是自己喜欢的或者评分比较高的,那么通常的做法就是问问周围的朋友,看看最近有什么好的电影推荐。而在问的时候,肯定都习惯于问跟自己口味差不多的朋友,这就是协同过滤的核心思想。因此,协同过滤是在海量数据中挖掘出小部分与你品味类似的用户,在协同过滤中,这些用户成为邻居,然后根据他们喜欢的东西组织成一个排序的目录推荐给你(如下图所示)。
基于用户的协同过滤推荐机制的基本原理
协同过滤算法主要分为基于用户的协同过滤算法和基于项目的协同过滤算法。MLlib当前支持基于模型的协同过滤,其中用户和商品通过一小组隐语义因子进行表达,并且这些因子也用于预测缺失的元素。Spark MLlib实现了
(ALS) 来学习这些隐性语义因子。
二、隐性反馈 vs 显性反馈
显性反馈行为包括用户明确表示对物品喜好的行为,隐性反馈行为指的是那些不能明确反应用户喜好的行为。在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈,例如页面游览,点击,购买,喜欢,分享等等。
基于矩阵分解的协同过滤的标准方法,一般将用户商品矩阵中的元素作为用户对商品的显性偏好。在 MLlib 中所用到的处理这种数据的方法来源于文献:
。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分,而是与所观察到的用户偏好强度关联起来。然后,这个模型将尝试找到隐语义因子来预估一个用户对一个商品的偏好。
下面的例子中,我们将获取MovieLens数据集,其中每行包含一个用户、一个电影、一个该用户对该电影的评分以及时间戳。我们使用默认的ALS.train() 方法,即显性反馈(默认implicitPrefs 为false)来构建推荐模型并根据模型对评分预测的均方根误差来对模型进行评估。
1. 导入需要的包:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
2. 根据数据结构创建读取规范:
创建一个Rating类型,即[Int, Int, Float, Long];然后建造一个把数据中每一行转化成Rating类的函数。
scala& case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
defined class Rating
scala& def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
parseRating: (str: String)Rating
3. 读取数据:
导入implicits,读取MovieLens数据集,把数据转化成Rating类型;
scala& import spark.implicits._
import spark.implicits._
scala& val ratings = spark.sparkContext.textFile("file:///usr/local/spark/data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()
ratings: org.apache.spark.sql.DataFrame = [userId: int, movieId: int ... 2 more fields]
然后,我们把数据打印看一下:
scala& ratings.show()
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
+------+-------+------+----------+
only showing top 20 rows
3. 构建模型
把MovieLens数据集划分训练集和测试集
scala& val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [userId: int, movieId: int... 2 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [userId: int, movieId: int ... 2 more fields]
使用ALS来建立推荐模型,这里我们构建了两个模型,一个是显性反馈,一个是隐性反馈
scala& val alsExplicit = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId"). setItemCol("movieId").setRatingCol("rating")
alsExplicit: org.apache.spark.ml.recommendation.ALS = als_05fe5d65ffc3
scala& val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true). setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
alsImplicit: org.apache.spark.ml.recommendation.ALS = als_7e9b959fbdae
在 ML 中的实现有如下的参数:
numBlocks 是用于并行化计算的用户和商品的分块个数 (默认为10)。
rank 是模型中隐语义因子的个数(默认为10)。
maxIter 是迭代的次数(默认为10)。
regParam 是ALS的正则化参数(默认为1.0)。
implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本(默认是false,即用显性反馈)。
alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准(默认为1.0)。
nonnegative 决定是否对最小二乘法使用非负的限制(默认为false)。
可以调整这些参数,不断优化结果,使均方差变小。比如:imaxIter越大,regParam越 小,均方差会越小,推荐结果较优。
接下来,把推荐模型放在训练数据上训练:
scala& val modelExplicit = alsExplicit.fit(training)
modelExplicit: org.apache.spark.ml.recommendation.ALSModel = als_05fe5d65ffc3
scala& val modelImplicit = alsImplicit.fit(training)
modelImplicit: org.apache.spark.ml.recommendation.ALSModel = als_7e9b959fbdae
4. 模型预测
使用训练好的推荐模型对测试集中的用户商品进行预测评分,得到预测评分的数据集
scala& val predictionsExplicit = modelExplicit.transform(test)
predictionsExplicit: org.apache.spark.sql.DataFrame = [userId: int, movieId: int ... 3 more fields]
scala& val predictionsImplicit = modelImplicit.transform(test)
predictionsImplicit: org.apache.spark.sql.DataFrame = [userId: int, movieId: int ... 3 more fields]
我们把结果输出,对比一下真实结果与预测结果:
scala& predictionsExplicit.show()
+------+-------+------+----------+------------+
|userId|movieId|rating| timestamp|
prediction|
+------+-------+------+----------+------------+
2.3084288|
1.9081671|
1.6470298|
5.7112412|
2.4970412|
1.9727222|
1.8414592|
3.2290685|
2.8074787|
0.7150749|
1.7827456|
2.3001173|
4.8762875|
1.5465991|
2.6036916|
3.1105173|
1.0042696|
+------+-------+------+----------+------------+
only showing top 20 rows
scala& predictionsImplicit.show()
+------+-------+------+----------+-----------+
|userId|movieId|rating| timestamp| prediction|
+------+-------+------+----------+-----------+
0.2783457|
0.1618208|
+------+-------+------+----------+-----------+
only showing top 20 rows
5. 模型评估
通过计算模型的均方根误差来对模型进行评估,均方根误差越小,模型越准确:
scala& val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating"). setPredictionCol("prediction")
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_bc9d91ae7b1a
scala& val rmseExplicit = evaluator.evaluate(predictionsExplicit)
rmseExplicit: Double = 1.5517
scala& val rmseImplicit = evaluator.evaluate(predictionsImplicit)
rmseImplicit: Double = 1.9165
打印出两个模型的均方根误差 :
scala& println(s"Explicit:Root-mean-square error = $rmseExplicit")
Explicit:Root-mean-square error = 1.5517
scala& println(s"Implicit:Root-mean-square error = $rmseImplicit")
Implicit:Root-mean-square error = 1.9165
可以看到打分的均方差值为1.69和1.80左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。

我要回帖

更多关于 jarofspark 的文章

 

随机推荐