如何通过Java程序java yarn 提交任务yarn的MapReduce计算任务

分布式计算 MapReduce与yarn工作机制
一、第一代hadoop组成与结构
第一代Hadoop,由分布式存储HDFS和分布式计算框架MapReduce组成,其中,HDFS由一个NameNode和多个DataNode组成,MapReduce由一个JobTracker和多个TaskTracker组成,对应Hadoop版本为Hadoop 1.x和0.21.X,0.22.x。
1、MapReduce角色分配
Client :作业提交发起者。
JobTracker: 初始化作业,分配作业,与TaskTracker通信,协调整个作业。
TaskTracker:保持JobTracker通信,在分配的数据片段上执行MapReduce任务。
2、MapReduce执行流程
(1)提交作业
在作业提交之前,需要对作业进行配置
程序代码,主要是自己书写的MapReduce程序。
输入输出路径
其他配置,如输出压缩等。
配置完成后,通过JobClinet来提交
(2)作业的初始化
客户端提交完成后,JobTracker会将作业加入队列,然后进行调度,默认的调度方法是FIFO调试方式。
(3)任务的分配
TaskTracker和JobTracker之间的通信与任务的分配是通过心跳机制完成的。
TaskTracker会主动向JobTracker询问是否有作业要做,如果自己可以做,那么就会申请到作业任务,这个任务可以使Map也可能是Reduce任务。
(4)任务的执行
申请到任务后,TaskTracker会做如下事情:
拷贝代码到本地
拷贝任务的信息到本地
启动JVM运行任务
(5)状态与任务的更新
任务在运行过程中,首先会将自己的状态汇报给TaskTracker,然后由TaskTracker汇总告之JobTracker。
任务进度是通过计数器来实现的。
(6)作业的完成
JobTracker是在接受到最后一个任务运行完成后,才会将任务标志为成功。
此时会做删除中间结果等善后处理工作。
二、第二代hadoop组成与结构
第二代Hadoop,为克服Hadoop 1.0中HDFS和MapReduce存在的各种问题而提出的。针对Hadoop 1.0中的单NameNode制约HDFS的扩展性问题,提出了HDFS Federation,它让多个NameNode分管不同的目录进而实现访问隔离和横向扩展;针对Hadoop 1.0中的MapReduce在扩展性和多框架支持方面的不足,提出了全新的资源管理框架YARN(Yet Another Resource Negotiator),它将JobTracker中的资源管理和作业控制功能分开,分别由ResourceManager和ApplicationMaster实现,其中,ResourceManager负责所有应用程序的资源分配,而ApplicationMaster仅负责管理一个应用程序。对应Hadoop版本为Hadoop 0.23.x和2.x。
1、 yarn运行架构
YARN 是下一代Hadoop计算平台,如下所示:
在 YARN 架构中,一个全局ResourceManager 以主要后台进程的形式运行,它通常在一台独立机器上运行,在各种竞争的应用程序之间仲裁可用的集群资源。
ResourceManager会追踪集群中有多少可用的活动节点和资源,协调用户提交的哪些应用程序应该在何时获取这些资源。ResourceManager是惟一拥有此信息的进程,所以它可通过某种共享的、安全的、多租户的方式制定分配(或者调度)决策(例如,依据应用程序优先级、队列容量、ACLs、数据位置等)。
在用户提交一个应用程序时,一个称为ApplicationMaster的轻量型进程实例会启动来协调应用程序内的所有任务的执行。这包括监视任务,重新启动失败的任务,推测性地运行缓慢的任务,以及计算应用程序计数器值的总和。这些职责以前是分配给单个 JobTracker来完成的。ApplicationMaster和属于它的应用程序的任务,在受NodeManager控制的资源容器中运行。
NodeManager是TaskTracker的一种更加普通和高效的版本。没有固定数量的 map 和 reduce slots,NodeManager 拥有许多动态创建的资源容器。容器的大小取决于它所包含的资源量,比如内存、CPU、磁盘和网络 IO。目前,仅支持内存和 CPU (YARN-3)。一个节点上的容器数量,由节点资源总量(比如总CPU数和总内存)共同决定。
需要说明的是:ApplicationMaster可在容器内运行任何类型的任务。例如,MapReduce ApplicationMaster请求一个容器来启动map或reduce 任务,而 Giraph ApplicationMaster请求一个容器来运行Giraph任务。
我们还可以实现一个自定义的 ApplicationMaster 来运行特定的任务,进而发明出一种全新的分布式应用程序框架,改变大数据格局。
在YARN中,MapReduce降级为一个分布式应用程序的一个角色(但仍是一个非常流行且有用的角色),现在称为 MRv2。MRv2 是经典MapReduce引擎(称为 MRv1)的重现,运行在YARN之上。
2、YARN可运行任何分布式应用程序
ResourceManager、NodeManager 和容器都不关心应用程序或任务的类型。所有特定于应用程序框架的代码都会转移到ApplicationMaster,以便任何分布式框架都可以受 YARN 支持。
得益于这个一般性的方法,Hadoop YARN集群可以运行许多不同分布式计算模型,例如:MapReduce、Giraph、Storm、Spark、Tez/Impala、MPI等。
3、YARN中提交应用程序
下面讨论在应用程序提交到YARN集群时,ResourceManager、ApplicationMaster、NodeManagers和容器如何相互交互。下图显示了一个例子。
假设用户采用与MRv1中相同的方式键入hadoop jar命令,将应用程序提交到 ResourceManager。ResourceManager维护在集群上运行的应用程序列表,以及每个活动的 NodeManager上的可用资源列表。
ResourceManager 需要确定哪个应用程序接下来应该获得一部分集群资源。该决策受到许多限制,比如队列容量、ACL 和公平性。ResourceManager 使用一个可插拔的 Scheduler。Scheduler 仅执行调度;它管理谁在何时获取集群资源(以容器的形式),但不会对应用程序内的任务执行任何监视,所以它不会尝试重新启动失败的任务。
在 ResourceManager接受一个新应用程序提交时,Scheduler制定的第一个决策是选择将用来运行ApplicationMaster的容器。在 ApplicationMaster启动后,它将负责此应用程序的整个生命周期。首先也是最重要的是,它将资源请求发送到 ResourceManager,请求运行应用程序的任务所需的容器。
资源请求是对一些容器的请求,用以满足一些资源需求,比如:
一定量的资源,目前使用MB内存和CPU份额来表示
一个首选的位置,由主机名、机架名称指定
此应用程序中的一个优先级,而不是跨多个应用程序
如果可能的话,ResourceManager 会分配一个满足ApplicationMaster在资源请求中所请求的容器(表达为容器 ID和主机名)。该容器允许应用程序使用特定主机上给定的资源量。分配一个容器后,ApplicationMaster会要求NodeManager(管理分配容器的主机)使用这些资源来启动一个特定于应用程序的任务。此任务可以是在任何框架中编写的任何进程(比如一个 MapReduce 任务或一个Giraph任务)。
NodeManager 不会监视任务;它仅监视容器中的资源使用情况,例如,如果一个容器消耗的内存比最初分配的更多,它会结束该容器。
ApplicationMaster会竭尽全力协调容器,启动所有需要的任务来完成它的应用程序。它还监视应用程序及其任务的进度,在新请求的容器中重新启动失败的任务,以及向提交应用程序的客户端报告进度。
应用程序完成后,ApplicationMaster 会关闭自己并释放自己的容器。
尽管ResourceManager不会对应用程序内的任务执行任何监视,但它会检查 ApplicationMaster的健康状况。如果 ApplicationMaster失败,ResourceManager 可在一个新容器中重新启动它。我们可以认为ResourceManager负责管理ApplicationMaster,而 ApplicationMasters负责管理任务。6101人阅读
hadoop(37)
http://blog.csdn.net/mercedesqq/article/details/#
在Hadoop上运行MapReduce任务的标准做法是把代码打包到jar里面,上传到服务器,然后用命令行启动。如果你是从一个Java应用中想要启动一个MapReduce,那么这个方法真是又土又麻烦。
其实YARN是可以通过Java程序向Hadoop集群提交MapReduce任务的。与普通的任务不同的是,远程提交的Job由于读不到服务器上的mapred-site.xml和yarn-site.xml,所以在Job的Configuration里面需要添加一些设置,然后再提交就可以了。
贴上一个示例代码,大家一看就明白了:
public class RemoteMapReduceService {
public static String startJob() throws Exception {
Job job = Job.getInstance();
job.setJobName(&xxxx&);
/***************************
*在这里,和普通的MapReduce一样,设置各种需要的东西
***************************/
//下面为了远程提交添加设置:
Configuration conf = job.getConfiguration();
conf.set(&mapreduce.framework.name&, &yarn&);
conf.set(&hbase.zookeeper.quorum&, &MASTER:2181&);
conf.set(&fs.default.name&, &hdfs://MASTER:8020&);
conf.set(&yarn.resourcemanager.resource-tracker.address&, &MASTER:8031&);
conf.set(&yarn.resourcemanager.address&, &MASTER:8032&);
conf.set(&yarn.resourcemanager.scheduler.address&, &MASTER:8030&);
conf.set(&yarn.resourcemanager.admin.address&, &MASTER:8033&);
conf.set(&yarn.application.classpath&, &$HADOOP_CONF_DIR,&
+&$HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*,&
+&$HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,&
+&$HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*,&
+&$YARN_HOME/*,$YARN_HOME/lib/*,&
+&$HBASE_HOME/*,$HBASE_HOME/lib/*,$HBASE_HOME/conf/*&);
conf.set(&mapreduce.jobhistory.address&, &MASTER:10020&);
conf.set(&mapreduce.jobhistory.webapp.address&, &MASTER:19888&);
conf.set(&mapred.child.java.opts&, &-Xmx1024m&);
job.submit();
//提交以后,可以拿到JobID。根据这个JobID可以打开网页查看执行进度。
return job.getJobID().toString();
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:1578643次
积分:15599
积分:15599
排名:第679名
原创:299篇
转载:139篇
评论:272条
(4)(4)(2)(2)(2)(4)(3)(7)(3)(3)(2)(4)(9)(8)(2)(17)(7)(8)(14)(2)(6)(1)(2)(6)(2)(5)(5)(12)(9)(6)(18)(10)(12)(3)(5)(10)(22)(14)(4)(11)(1)(6)(1)(3)(1)(4)(12)(1)(1)(4)(3)(22)(1)(10)(9)(3)(3)(4)(1)(1)(2)(1)(1)(1)(3)(2)(2)(1)(1)(3)(1)(3)(1)(1)(5)(2)(5)(8)(2)(9)(16)(2)(6)如何通过Java程序提交yarn的MapReduce计算任务_Linux编程_Linux公社-Linux系统门户网站
你好,游客
如何通过Java程序提交yarn的MapReduce计算任务
来源:Linux社区&
作者:until_v
由于项目需求,需要通过Java程序提交Yarn的MapReduce的计算任务。与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务需要有点小变动,详见以下代码。
以下为MapReduce主程序,有几点需要提一下:
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。
2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的&key,value&不同,这里变为了&TextPair,Value&,TextPair的格式为&key1,key2&。
3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用。
package web.;
import java.io.IOE
import org.apache.hadoop.conf.Cimport org.apache.hadoop.fs.Pimport org.apache.hadoop.io.BytesWimport org.apache.hadoop.io.WritableCimport org.apache.hadoop.io.WritableCimport org.apache.hadoop.mapred.JobCimport org.apache.hadoop.mapred.JobCimport org.apache.hadoop.mapred.JobSimport org.apache.hadoop.mapreduce.Jimport org.apache.hadoop.mapreduce.Pimport org.apache.hadoop.mapreduce.lib.input.FileInputFimport org.apache.hadoop.mapreduce.lib.output.FileOutputFimport org.apache.hadoop.mapreduce.lib.output.NullOutputF
import util.U
public class GEMIMain {&&public GEMIMain(){& job =&}&&public J&public static class NamePartitioner extends& &Partitioner&TextPair, BytesWritable& {& @Override& public int getPartition(TextPair key, BytesWritable value,& & int numPartitions) {& &return Math.abs(key.getFirst().hashCode() * 127) % numP& }&}
&/**& * 分组设置类,只要两个TextPair的第一个key相同,他们就属于同一组。他们的Value就放到一个Value迭代器中,& * 然后进入Reducer的reduce方法中。& * & * @author hduser& * & */&public static class GroupComparator extends WritableComparator {& public GroupComparator() {& &super(TextPair.class, true);& }
& @Override& public int compare(WritableComparable a, WritableComparable b) {& &TextPair t1 = (TextPair)& &TextPair t2 = (TextPair)& &// 比较相同则返回0,比较不同则返回-1& &return t1.getFirst().compareTo(t2.getFirst()); // 只要是第一个字段相同的就分成为同一组& }&}&&&public& boolean runJob(String[] args) throws IOException,& &ClassNotFoundException, InterruptedException {& & Configuration conf = new Configuration();& // 在conf中设置outputath变量,以在reduce函数中可以获取到该参数的值& conf.set("outputPath", args[args.length - 1].toString());& //设置HDFS中,每次任务生成产品的质量文件所在文件夹。args数组的倒数第二个原数为质量文件所在文件夹& conf.set("qualityFolder", args[args.length - 2].toString());& //如果在Server中运行,则需要获取web项目的根路径;如果以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件& //MapReduceProgress mprogress = new MapReduceProgress();& //String rootPath= mprogress.rootP& String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";& conf.addResource(new Path(rootPath+"yarn-site.xml"));& conf.addResource(new Path(rootPath+"core-site.xml"));& conf.addResource(new Path(rootPath+"hdfs-site.xml"));& conf.addResource(new Path(rootPath+"mapred-site.xml"));& this.job = new Job(conf);& & job.setJobName("Job name:" + args[0]);& job.setJarByClass(GEMIMain.class);
& job.setMapperClass(GEMIMapper.class);& job.setMapOutputKeyClass(TextPair.class);& job.setMapOutputValueClass(BytesWritable.class);& // 设置partition& job.setPartitionerClass(NamePartitioner.class);& // 在分区之后按照指定的条件分组& job.setGroupingComparatorClass(GroupComparator.class);
& job.setReducerClass(GEMIReducer.class);
& job.setInputFormatClass(WholeFileInputFormat.class);& job.setOutputFormatClass(NullOutputFormat.class);& // job.setOutputKeyClass(NullWritable.class);& // job.setOutputValueClass(Text.class);& job.setNumReduceTasks(8);& & & // 设置计算输入数据的路径& for (int i = 1; i & args.length - 2; i++) {& &FileInputFormat.addInputPath(job, new Path(args[i]));& }& // args数组的最后一个元素为输出路径& FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));& boolean flag = job.waitForCompletion(true);&&}&&@SuppressWarnings("static-access")&public static void main(String[] args) throws ClassNotFoundException,& &IOException, InterruptedException {&& & String[] inputPaths = new String[] { "normalizeJob",& & "hdfs://192.168.168.101:9000/user/hduser/red1/",& & "hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",& & "hdfs://192.168.168.101:9000/user/hduser/test" };& GEMIMain test = new GEMIMain();& boolean result = test.runJob(inputPaths);& & &
以下为TextPair类
public class TextPair implements WritableComparable&TextPair& {&private T&private T
&public TextPair() {& set(new Text(), new Text());&}
&public TextPair(String first, String second) {& set(new Text(first), new Text(second));&}
&public TextPair(Text first, Text second) {& set(first, second);&}
&public void set(Text first, Text second) {& this.first =& this.second =&}
&public Text getFirst() {&&}
&public Text getSecond() {&&}
&@Override&public void write(DataOutput out) throws IOException {& first.write(out);& second.write(out);&}
&@Override&public void readFields(DataInput in) throws IOException {& first.readFields(in);& second.readFields(in);&}
&@Override&public int hashCode() {& return first.hashCode() * 163 + second.hashCode();&}
&@Override&public boolean equals(Object o) {& if (o instanceof TextPair) {& &TextPair tp = (TextPair)& &return first.equals(tp.first) && second.equals(tp.second);& }&&}
&@Override&public String toString() {& return first + "\t" +&}&&@Override&/**A.compareTo(B)& * 如果比较相同,则比较结果为0& * 如果A大于B,则比较结果为1& * 如果A小于B,则比较结果为-1& * & */&public int compareTo(TextPair tp) {& int cmp = pareTo(tp.first);& if (cmp != 0) {& && }& //此时实现的是升序排列& pareTo(tp.second);&}}
以下为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分
package web.
import java.io.IOE&
import org.apache.hadoop.fs.P& import org.apache.hadoop.io.BytesW& import org.apache.hadoop.io.T& import org.apache.hadoop.mapreduce.InputS& import org.apache.hadoop.mapreduce.JobC& import org.apache.hadoop.mapreduce.RecordR& import org.apache.hadoop.mapreduce.TaskAttemptC& import org.apache.hadoop.mapreduce.lib.input.FileInputF
public class WholeFileInputFormat extends FileInputFormat&Text, BytesWritable& {& &
& & @Override& & & public RecordReader&Text, BytesWritable& createRecordReader(& & & & & & & InputSplit arg0, TaskAttemptContext arg1) throws IOException,& & & & & & & InterruptedException {& & & & & // TODO Auto-generated method stub& & & & & return new WholeFileRecordReader();& & & }& & & & @Override& & & protected boolean isSplitable(JobContext context, Path filename) {& & & & & // TODO Auto-generated method stub& & & & && & & }& }&
以下为WholeFileRecordReader类
package web.
import java.io.IOE
import org.apache.hadoop.conf.Cimport org.apache.hadoop.fs.FSDataInputSimport org.apache.hadoop.fs.FileSimport org.apache.hadoop.fs.Pimport org.apache.hadoop.io.BytesWimport org.apache.hadoop.io.IOUimport org.apache.hadoop.io.Timport org.apache.hadoop.mapreduce.InputSimport org.apache.hadoop.mapreduce.RecordRimport org.apache.hadoop.mapreduce.TaskAttemptCimport org.apache.hadoop.mapreduce.lib.input.FileS
public class WholeFileRecordReader extends RecordReader&Text, BytesWritable& {
&private FileSplit fileS&private FSDataInputS
&private Text key =&private BytesWritable value =
&private boolean processed =
&@Override&public void close() throws IOException {& // TODO Auto-generated method stub& // fis.close();&}
&@Override&public Text getCurrentKey() throws IOException, InterruptedException {& // TODO Auto-generated method stub& return this.&}
&@Override&public BytesWritable getCurrentValue() throws IOException,& &InterruptedException {& // TODO Auto-generated method stub& return this.&}
&@Override&public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)& &throws IOException, InterruptedException {
& fileSplit = (FileSplit) inputS& Configuration job = tacontext.getConfiguration();& Path file = fileSplit.getPath();& FileSystem fs = file.getFileSystem(job);& fis = fs.open(file);&}
&@Override&public boolean nextKeyValue() {
& if (key == null) {& &key = new Text();& }
& if (value == null) {& &value = new BytesWritable();& }
& if (!processed) {& &byte[] content = new byte[(int) fileSplit.getLength()];
& &Path file = fileSplit.getPath();
& &System.out.println(file.getName());& &key.set(file.getName());
& &try {& & IOUtils.readFully(fis, content, 0, content.length);& & // value.set(content, 0, content.length);& & value.set(new BytesWritable(content));& &} catch (IOException e) {& & // TODO Auto-generated catch block& & e.printStackTrace();& &} finally {& & IOUtils.closeStream(fis);& &}
& &processed =& && }
&@Override&public float getProgress() throws IOException, InterruptedException {& // TODO Auto-generated method stub& return processed ? fileSplit.getLength() : 0;&}
Spark 颠覆 MapReduce 保持的排序记录&
数据库中实现 MapReduce&
MapReduce实现矩阵乘法--实现代码
基于MapReduce的图算法 PDF&
Hadoop的HDFS和MapReduce&
MapReduce 计数器简介
Hadoop技术内幕:深入解析MapReduce架构设计与实现原理 PDF高清扫描版
本文永久更新链接地址:
相关资讯 & & &
& (01月04日)
& (02/27/:06)
& (06月03日)
& (11/29/:20)
& (05/26/:47)
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款

我要回帖

更多关于 spark 提交到yarn 的文章

 

随机推荐