Oakactionmapper作用在实际中发挥作用了吗

hadoop(2)
hello tom1
hello tom2
hello tom3
hello tom4
hello tom5
hello tom6
hello tom7
hello tom8
hello tom9
hello tom10
hello tom11
hello tom12
hello tom13
hello tom14
hello tom15
hello tom16
hello tom17
hello tom18
hello tom19
hello tom20
hello tom21
hello tom22
hello tom23
hello tom24
hello tom25
hello tom26
hello tom27
hello tom28
hello tom29
hello tom30
先写一个能产生数据倾斜的MapperReduce代码,如下:
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.M
import java.io.IOE
public class WCSkewMapper extends Mapper&LongWritable,Text,Text,IntWritable& {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(& &);
Text keyOut = new Text();
IntWritable valueOut = new IntWritable();
for (String s : arr) {
keyOut.set(s);
valueOut.set(1);
context.write(keyOut, valueOut);
reduce 端:
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.R
import java.io.IOE
public class WCSkewReducer extends Reducer&Text,IntWritable,Text,IntWritable&{
protected void reduce(Text key, Iterable&IntWritable& values, Context context) throws IOException, InterruptedException {
int count =0 ;
for(IntWritable iw : values){
count = count
+ iw.get();
context.write(key,new IntWritable(count));
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import java.io.IOE
* Created on .
public class WCSkewApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(&fs.defaultFS&,&file:///&);
Job job = Job.getInstance(conf);
//设置job 的各种属性
job.setJobName(&WCAkewApp&);
job.setJarByClass(WCSkewApp.class);
job.setInputFormatClass(TextInputFormat.class);
//添加输入路径
FileInputFormat.addInputPath(job,new Path(&g:/comp/skew&));
FileOutputFormat.setOutputPath(job,new
Path(&g:/comp/out&));
//设置合成类
job.setCombinerClass(WCSkewReducer.class);
//设置任务类
job.setMapperClass(WCSkewMapper.class);
job.setReducerClass(WCSkewReducer.class);
//reduce任务数
job.setNumReduceTasks(4);
//设置kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
执行以上代码,则会在g:/comp/out/下产生数据的数据。
30个hello 都进到一个reduce 执行,以下为解决办法:
利用随机分区解决:
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.M
import java.io.IOE
* Created on .
public class WCSkewMapper extends Mapper&LongWritable,Text,Text,IntWritable& {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(& &);
Text keyOut = new Text();
IntWritable valueOut = new IntWritable();
for (String s : arr) {
keyOut.set(s);
valueOut.set(1);
context.write(keyOut, valueOut);
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.R
import java.io.IOE
public class WCSkewReducer extends Reducer&Text,IntWritable,Text,IntWritable&{
protected void reduce(Text key, Iterable&IntWritable& values, Context context) throws IOException, InterruptedException {
int count =0 ;
for(IntWritable iw : values){
count = count
+ iw.get();
context.write(key,new IntWritable(count));
RandomPartitioner 端
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.P
import java.util.R
public class RandomPartitioner extends Partitioner&Text,IntWritable&{
public int getPartition(Text text, IntWritable intWritable, int numPartitioner) {
return new Random().nextInt(numPartitioner);
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
public class WCSkewApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set(&fs.defaultFS&,&file:///&);
Job job = Job.getInstance(conf);
//设置job 的各种属性
job.setJobName(&WCAkewApp&);
job.setJarByClass(WCSkewApp.class);
job.setInputFormatClass(TextInputFormat.class);
//添加输入路径
FileInputFormat.addInputPath(job,new Path(&g:/comp/skew&));
FileOutputFormat.setOutputPath(job,new
Path(&g:/comp/out&));
//设置合成类
job.setPartitionerClass(RandomPartitioner.class);//设置分区类
job.setCombinerClass(WCSkewReducer.class);
//设置任务类
job.setMapperClass(WCSkewMapper.class);
job.setReducerClass(WCSkewReducer.class);
//reduce任务数
job.setNumReduceTasks(4);
//设置kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.waitForCompletion(true);
利用随机分区,使hello分散在不同的reduceTask上进行计算,但执行到这步还没结束,因为此时产生的结果不是我想要的,因为hello是分散在不同的part-上,我们真正要的结果是每个单词出现的次数,所以我们还要进行一次Job(Mapper Reduce)任务, 如下:
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.M
import java.io.IOE
public class WCSkewMapper extends Mapper&LongWritable,Text,Text,IntWritable&{
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(&\t&);
context.write(new Text(arr[0]), new IntWritable(Integer.parseInt(arr[1])));
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.R
import java.io.IOE
* Created on .
public class WCSkewReducer extends Reducer&Text,IntWritable,Text,IntWritable&{
protected void reduce(Text key, Iterable&IntWritable& values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable iw : values) {
count = count + iw.get();
context.write(key, new IntWritable(count));
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.TextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
*解决数据倾斜问题
public class WCSkewApp {
public static void main(String[] args) throws Exception {
//加载配置文件
Configuration conf = new Configuration();
//设置本地文件系统
conf.set(&fs.defaultFS&, &file:///&);
//创建job对象
Job job = Job.getInstance(conf);
//设置job的属性
job.setJobName(&WCSkewApp&);
job.setJarByClass(WCSkewApp.class);
//设置文件输入格式
job.setInputFormatClass(TextInputFormat.class);
//设置Mapper Reduce类
job.setMapperClass(WCSkewMapper.class);
job.setReducerClass(WCSkewReducer.class);
//设置map reduce 的kv 建输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置文件输入和输出路径
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00000&));
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00001&));
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00002&));
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00003&));
FileOutputFormat.setOutputPath(job, new Path(&g:/comp/out8&));
//设置reduce 个数
job.setNumReduceTasks(4);
job.waitForCompletion(true);
}执行此job任务之后,得到的hello 为其总数,即为我们想要的 此时数据倾斜问题得到解决。。
在上述解决数据倾斜问题的第二个job任务中,在App端的输入格式还可以设置成为
KeyValueTextInputFormat泛型为&Text,Text&(本来是TextInputformat),需要注意的是此时map 端的输入输出均为Text 类型
代码如下:
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.M
import java.io.IOE
* Created on .
public class WCSkewMapper extends Mapper&Text,Text,Text,IntWritable&{
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, new IntWritable(Integer.parseInt(value.toString())));
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.R
import java.io.IOE
public class WCSkewReducer extends Reducer&Text,IntWritable,Text,IntWritable&{
protected void reduce(Text key, Iterable&IntWritable& values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable iw : values) {
count = count + iw.get();
context.write(key, new IntWritable(count));
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
*解决数据倾斜问题
public class WCSkewApp {
public static void main(String[] args) throws Exception {
//加载配置文件
Configuration conf = new Configuration();
//设置本地文件系统
conf.set(&fs.defaultFS&, &file:///&);
//创建job对象
Job job = Job.getInstance(conf);
//设置job的属性
job.setJobName(&WCSkewApp&);
job.setJarByClass(WCSkewApp.class);
//设置文件输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
//设置Mapper Reduce类
job.setMapperClass(WCSkewMapper.class);
job.setReducerClass(WCSkewReducer.class);
//设置map reduce 的kv 建输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置文件输入和输出路径
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00000&));
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00001&));
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00002&));
FileInputFormat.addInputPath(job, new Path(&g:/comp/out/part-r-00003&));
FileOutputFormat.setOutputPath(job, new Path(&g:/comp/out8&));
//设置reduce 个数
job.setNumReduceTasks(4);
job.waitForCompletion(true);
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:909次
排名:千里之外
原创:29篇
转载:11篇
(5)(12)(17)(6)TimeMapper:在高雄事件中发挥重要作用的开源地图软件
发表于 11:27|
作者CSDN CODE
摘要:台湾高雄气爆事件之后,Timemap 时间地图开发者 hychen 整理媒体的报导,加到他开发的时间地图上,让人可以用时间轴的方式看到整个事件。
TimeMapper是开放知识基础实验室(
)的一个开源项目,可以利用谷歌电子表格(Spreadsheet)快速创建优雅的时间表及时间地图。TimeMapper是基于许多开源项目开发而成的,包括
项目主页:&代码托管地址:&
使用步骤:
1. 创建一个电子表格
在你的Google电子数据表上添加日期和地点
2. 连接和定制
将TimeMapper连接到你的电子表格并定制结果
3. 发布、嵌入和分享
发布您的TimeMap url,然后分享或嵌入在你的网站上应用实例:台湾高雄气爆事件之后,开发者
整理媒体的报导,加到他开发的时间地图上,让人可以用时间轴的方式看到整个事件,也可以看到事件在空间上的分布,呈现整个事件全貌。时间地图就是改自开源的TimeMapper
套件。与开放知识基金会的版本相比,时间地图具有套叠台南古地图功能,使用移动设备浏览时间地图只会显示地图,支持时间轴标记功能,地图宽度较大。
相关新闻:
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章太阳能发电系统对企业的好处-行业动态-德兰明海官网
&当前位置:
太阳能发电系统对企业的好处
&&&&&|&&&& &
太阳能发电系统对企业的好处
随着太阳能发电越来越普遍,它涌入全国各地的社区。你可能已经看到在你的城镇的房子介绍太阳能电池板近年来。他们已经走了很长的路,并帮助数百万家庭节省了他们的能源账单,并促进环境的绿色实践。但它不会停在那里。太阳能电池板不仅用于家庭和社区,而且有多种商业太阳能发电系统的选择。
太阳能电池板实际上是在商业世界中发挥作用。商业地产是使用太阳能的一些最新发展,它只会继续增长。业主发现,切换到太阳能电池板可以获得多个好处。
为什么你的企业应该使用太阳能发电系统
实用工具节省。这也是房主选择太阳能电池板的原因。公用事业费用是一笔巨大的开支,它们往往随着建筑物的规模而增长。你的业务能做什么用额外的钱,是不是一个昂贵的电费账单?这可能看起来微不足道,但如果你累加每年的成本,你会惊讶。
存储/销售额外的能源更有可能是商业房地产。商业往往有更多的屋顶面积和可访问性。这意味着企业可以容纳更多的太阳能电池板,利用更多的太阳能并存储以备将来使用。在许多州和社区,当地电网将从您那里购买多余的能源,因此太阳能电池板实际上可以赚取您的商业资金。
绿色倡议 - 太阳能绿色是您公司的优秀遗产。有许多客户和其他业务,将使您的使用太阳能留下深刻的印象和激励。它可以打开许多门,为您带来业务,并创造尊重的声誉。使用太阳能电池板的企业将永远被视为聪明和全球意识。
在曲线前面。在未来,每个人都将使用太阳能电池板用于能源。所有家庭和企业迟早都会采用太阳能技术。你可以超越曲线,显示你的实力作为一个关注未来的企业,并准备好处理它带来的任何。想想互联网 - 公司缓慢采用互联网失去了业务,甚至落在路边!那些学习利用新技术的人会发现自己准备好迎接未来并发展业务。
税收优惠。前期成本似乎令人望而生畏,但你知道为您的企业安装太阳能电池板可能是一个巨大的税收优势吗?太阳能电池板的购买和安装由国家政府补贴,您收到巨大的税收抵免和回报,以切换到太阳能电池板。
无论你是小型企业还是大型企业,选择太阳能电池板都会有很大的区别。不仅可以消除您的能源费用,而且您还可以最终赚钱,并加强您作为一个环保和技术精明的公司的声誉。电话或联系德兰明海太阳能看看太阳能电池板是否适合你的企业发展需求。
上一篇&&:&&nbsp
下一篇&&:&&nbsp

我要回帖

更多关于 mapperscan 注解作用 的文章

 

随机推荐