How can I avoid errors when computing the mean squared error via matrix multiplication in R?

A matrix is an R object whose elements are arranged in a two-dimensional rectangular layout. All of its elements must be of the same atomic type. Matrices containing only characters or only logical values can be created, but they are of little use; here we are interested in matrices with numeric elements, used in mathematical calculations.
A matrix is created with the matrix() function.
The basic syntax for creating a matrix in R is:
matrix(data, nrow, ncol, byrow, dimnames)
The parameters are:
data - the input vector whose elements become the elements of the matrix.
nrow - the number of rows to be created.
ncol - the number of columns to be created.
byrow - a logical value. If TRUE, the elements of the input vector are arranged by row.
dimnames - the names assigned to the rows and columns.
Create a matrix taking a numeric vector as input:
# Elements are arranged sequentially by row.
M <- matrix(c(3:14), nrow=4, byrow=TRUE)
print(M)
# Elements are arranged sequentially by column.
N <- matrix(c(3:14), nrow=4, byrow=FALSE)
print(N)
# Define the column and row names.
rownames = c("row1", "row2", "row3", "row4")
colnames = c("col1", "col2", "col3")
P <- matrix(c(3:14), nrow=4, byrow=TRUE, dimnames=list(rownames, colnames))
print(P)
When the code above is executed, it produces the following result:
     [,1] [,2] [,3]
[1,]    3    4    5
[2,]    6    7    8
[3,]    9   10   11
[4,]   12   13   14
     [,1] [,2] [,3]
[1,]    3    7   11
[2,]    4    8   12
[3,]    5    9   13
[4,]    6   10   14
     col1 col2 col3
row1    3    4    5
row2    6    7    8
row3    9   10   11
row4   12   13   14
Accessing the elements of a matrix
Elements of a matrix can be accessed using their row and column indices. Consider the matrix P defined above and retrieve some specific entries:
# Define the column and row names.
rownames = c("row1", "row2", "row3", "row4")
colnames = c("col1", "col2", "col3")
# Create the matrix.
P <- matrix(c(3:14), nrow=4, byrow=TRUE, dimnames=list(rownames, colnames))
# Access the element at 3rd column and 1st row.
print(P[1,3])
# Access the element at 2nd column and 4th row.
print(P[4,2])
# Access only the 2nd row.
print(P[2,])
# Access only the 3rd column.
print(P[,3])
When the code above is executed, it produces the following result:
[1] 5
[1] 13
col1 col2 col3
   6    7    8
row1 row2 row3 row4
   5    8   11   14
Various mathematical operations can be performed on matrices using R operators. The result of such an operation is again a matrix.
The dimensions (numbers of rows and columns) must be the same for all the matrices involved in the operation.
Matrix addition and subtraction
# Create two 2x3 matrices.
matrix1 <- matrix(c(3, 9, -1, 4, 2, 6), nrow=2)
print(matrix1)
matrix2 <- matrix(c(5, 2, 0, 9, 3, 4), nrow=2)
print(matrix2)
# Add the matrices.
result <- matrix1 + matrix2
cat("Result of addition","\n")
print(result)
# Subtract the matrices.
result <- matrix1 - matrix2
cat("Result of subtraction","\n")
print(result)
When the code above is executed, it produces the following result:
     [,1] [,2] [,3]
[1,]    3   -1    2
[2,]    9    4    6
     [,1] [,2] [,3]
[1,]    5    0    3
[2,]    2    9    4
Result of addition
     [,1] [,2] [,3]
[1,]    8   -1    5
[2,]   11   13   10
Result of subtraction
     [,1] [,2] [,3]
[1,]   -2   -1   -1
[2,]    7   -5    2
Element-wise matrix multiplication and division
# Create two 2x3 matrices.
matrix1 <- matrix(c(3, 9, -1, 4, 2, 6), nrow=2)
print(matrix1)
matrix2 <- matrix(c(5, 2, 0, 9, 3, 4), nrow=2)
print(matrix2)
# Multiply the matrices (element by element).
result <- matrix1 * matrix2
cat("Result of multiplication","\n")
print(result)
# Divide the matrices (element by element).
result <- matrix1 / matrix2
cat("Result of division","\n")
print(result)
When the code above is executed, it produces the following result:
     [,1] [,2] [,3]
[1,]    3   -1    2
[2,]    9    4    6
     [,1] [,2] [,3]
[1,]    5    0    3
[2,]    2    9    4
Result of multiplication
     [,1] [,2] [,3]
[1,]   15    0    6
[2,]   18   36   24
Result of division
          [,1]      [,2]      [,3]
[1,] 0.6000000      -Inf 0.6666667
[2,] 4.5000000 0.4444444 1.5000000
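Note that the * and / operators above work element by element; they are not matrix multiplication in the linear-algebra sense. True matrix multiplication in R uses the %*% operator, which is what the rest of this page is about. A minimal sketch of the difference, reusing matrix1 and matrix2 from above (the variable names elementwise and matproduct are mine):
# Element-wise product: both operands must have the same shape.
elementwise <- matrix1 * matrix2
print(elementwise)        # still a 2x3 matrix
# True matrix product: the column count of the left operand must equal the row
# count of the right one, so multiply the 2x3 matrix1 by the 3x2 t(matrix2).
matproduct <- matrix1 %*% t(matrix2)
print(matproduct)         # a 2x2 matrix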
The small bug I ran into:
Following what others had done online, I checked class(A) and class(B) and realised that the data had been read in with the wrong type: A and B were not of class matrix, which is what caused the problem.
Converting them with as.matrix() fixed it.
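A minimal sketch of that fix (the data frames A and B below are hypothetical stand-ins for data read in with read.csv() or similar, and the observed matrix obs is made up for illustration): check the class first, coerce with as.matrix(), and only then apply %*%; the mean squared error afterwards is an ordinary element-wise calculation.
# A and B stand in for data read from files, which arrives as data frames.
A <- data.frame(V1 = c(1, -1), V2 = c(0, 3), V3 = c(2, 1))   # 2x3
B <- data.frame(V1 = c(3, 2, 1), V2 = c(1, 1, 0))            # 3x2
print(class(A))           # "data.frame", not "matrix": A %*% B would fail here

A <- as.matrix(A)         # coerce to numeric matrices first
B <- as.matrix(B)
P <- A %*% B              # now the matrix product works and gives a 2x2 result
print(P)

# Mean squared error against an observed matrix of the same shape:
obs <- matrix(c(5, 4, 2, 2), nrow = 2)
mse <- mean((obs - P)^2)
print(mse)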
Implementing Matrix Multiplication with MapReduce
This post is part of the Hadoop family series, which introduces the Hadoop family of products: commonly used projects such as Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari and Chukwa, as well as newer additions such as YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch and Hue.
Since 2011 China has entered a fast-moving big-data era, and the Hadoop family of software has taken a large share of big-data processing. Open-source projects and vendors alike have aligned their data software with Hadoop, and Hadoop has gone from a niche area to the de facto standard for big-data development. On top of the original Hadoop technology, a whole family of products keeps innovating around the "big data" concept and pushing the technology forward.
As developers in the IT industry, we should keep up with the pace, seize the opportunity, and rise together with Hadoop!
About the author:
Zhang Dan (Conan), programmer: Java, R, PHP, Javascript
weibo: @Conan_Z
blog: http://blog.fens.me
Please credit the source when reposting:
MapReduce opened the door to parallel computing and gave individual developers the ability to process big data. But using MapReduce well, and parallelizing an algorithm that was originally written for a single machine, is not easy. In many cases we have to ask whether the single-machine algorithm can be reformulated in terms of matrices, so matrix operations become the foundation of algorithm parallelization.
The collaborative filtering algorithm behind recommender systems, for example, is parallelized on MapReduce through exactly this kind of matrix formulation.
Matrix multiplication in R
Matrix multiplication with MapReduce
Sparse matrix multiplication with MapReduce
1. Introduction to matrices
A matrix: in mathematics, an m×n matrix is a rectangular array of elements arranged in m rows and n columns. Its elements may be numbers, symbols, or mathematical expressions. For example, a 2×3 matrix consists of 6 numeric elements arranged in 2 rows and 3 columns.
Matrices of the same size (the same numbers of rows and columns) can be added and subtracted; the operation is applied element by element.
Example: adding two matrices

[1 3 1]   [0 0 5]   [1+0 3+0 1+5]   [1 3 6]
[1 0 0] + [7 5 0] = [1+7 0+5 0+0] = [8 5 0]

Two matrices can be multiplied if and only if the number of columns of the first equals the number of rows of the second. Matrix multiplication is associative and distributive, but it is not commutative.
Example: multiplying two matrices

[ 1 0 2]   [3 1]   [ 1*3 + 0*2 + 2*1    1*1 + 0*1 + 2*0]   [5 1]
[-1 3 1] x [2 1] = [-1*3 + 3*2 + 1*1   -1*1 + 3*1 + 1*0] = [4 2]
           [1 0]
2. Matrix multiplication in R
> m1 <- matrix(c(1,0,2,-1,3,1), nrow=2, byrow=TRUE); m1
     [,1] [,2] [,3]
[1,]    1    0    2
[2,]   -1    3    1
> m2 <- matrix(c(3,1,2,1,1,0), nrow=3, byrow=TRUE); m2
     [,1] [,2]
[1,]    3    1
[2,]    2    1
[3,]    1    0
> m3 <- m1 %*% m2; m3
     [,1] [,2]
[1,]    5    1
[2,]    4    2
Matrix multiplication is very easy to do in R.
3. Matrix multiplication with MapReduce
Implementation approach: the mapper emits, for every cell (i,j) of the result matrix, the elements of row i of A (tagged "A:") and of column j of B (tagged "B:") under the key "i,j"; the reducer then joins the A and B values on their shared index and sums the products. The development steps:
Create two matrix data files: m1.csv, m2.csv
Create the driver program: MainRun.java
Create the MR program: MartrixMultiply.java
1). Create the two matrix data files m1.csv and m2.csv (their likely contents are reconstructed below)
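The file contents were lost from this copy of the post, but they can be reconstructed from the R example above and from the reduce-stage debug output in the job log further down. With the comma delimiter the job accepts (it splits on tab or comma), m1.csv and m2.csv would look like this:
m1.csv:
1,0,2
-1,3,1
m2.csv:
3,1
2,1
1,0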
2). Create the driver program: MainRun.java
package org.conan.myhadoop.matrix;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class MainRun {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        martrixMultiply();
    }

    public static void martrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/m1.csv");// local data files
        path.put("m2", "logfile/matrix/m2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS directory
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            MartrixMultiply.run(path);// run the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// remote configuration of the Hadoop cluster
        JobConf conf = new JobConf(MainRun.class);
        conf.setJobName("MartrixMultiply");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
3). Create the MR program: MartrixMultiply.java
The MapReduce program:
package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class MartrixMultiply {

    public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// m1 or m2

        private int rowNum = 2;// number of rows of matrix A
        private int colNum = 2;// number of columns of matrix B
        private int rowIndexA = 1;// current row of matrix A
        private int rowIndexB = 1;// current row of matrix B

        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// which dataset this split belongs to
        }

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 1; j <= tokens.length; j++) {
                        Text v = new Text("A:" + j + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + "  " + v.toString());
                    }
                }
                rowIndexA++;

            } else if (flag.equals("m2")) {
                for (int i = 1; i <= tokens.length; i++) {
                    for (int j = 1; j <= colNum; j++) {
                        Text k = new Text(i + "," + j);
                        Text v = new Text("B:" + rowIndexB + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + "  " + v.toString());
                    }
                }
                rowIndexB++;
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");

                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                    // System.out.println("A:" + kv[0] + "," + kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(mapB.get(mapk));
            }
            context.write(key, new IntWritable(result));
            System.out.println();
            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load the two input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

Console output from running the job:
Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/m1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/m2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
10:48:03 org.apache.hadoop.util.NativeCodeLoader <clinit>
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.
User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
10:48:03 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 2
10:48:03 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
警告: Snappy native library not loaded
10:48:04 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
10:48:04 org.apache.hadoop.mapred.Task initialize
Using ResourceCalculatorPlugin : null
10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 14720
10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 680
10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
10:48:04 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_ is done. And is in the process of commiting
10:48:05 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
map 0% reduce 0%
10:48:07 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
10:48:07 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_' done.
10:48:07 org.apache.hadoop.mapred.Task initialize
Using ResourceCalculatorPlugin : null
10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 14720
10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 680
10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
10:48:07 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_ is done. And is in the process of commiting
10:48:08 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
map 100% reduce 0%
10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
10:48:10 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_' done.
10:48:10 org.apache.hadoop.mapred.Task initialize
Using ResourceCalculatorPlugin : null
10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 294 bytes
10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
1,1:(B:1,3)(B:2,2)(B:3,1)(A:1,1)(A:2,0)(A:3,2)
1,2:(A:1,1)(A:2,0)(A:3,2)(B:1,1)(B:2,1)(B:3,0)
2,1:(B:1,3)(B:2,2)(B:3,1)(A:1,-1)(A:2,3)(A:3,1)
2,2:(A:1,-1)(A:2,3)(A:3,1)(B:1,1)(B:2,1)(B:3,0)
10:48:10 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_ is done. And is in the process of commiting
10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
10:48:10 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_ is allowed to commit now
10:48:10 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
10:48:13 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
10:48:13 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_' done.
10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
map 100% reduce 100%
10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
10:48:14 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
10:48:14 org.apache.hadoop.mapred.Counters log
File Output Format Counters
10:48:14 org.apache.hadoop.mapred.Counters log
Bytes Written=24
10:48:14 org.apache.hadoop.mapred.Counters log
FileSystemCounters
10:48:14 org.apache.hadoop.mapred.Counters log
FILE_BYTES_READ=1713
10:48:14 org.apache.hadoop.mapred.Counters log
HDFS_BYTES_READ=75
10:48:14 org.apache.hadoop.mapred.Counters log
FILE_BYTES_WRITTEN=125314
10:48:14 org.apache.hadoop.mapred.Counters log
HDFS_BYTES_WRITTEN=114
10:48:14 org.apache.hadoop.mapred.Counters log
File Input Format Counters
10:48:14 org.apache.hadoop.mapred.Counters log
Bytes Read=30
10:48:14 org.apache.hadoop.mapred.Counters log
Map-Reduce Framework
10:48:14 org.apache.hadoop.mapred.Counters log
Map output materialized bytes=302
10:48:14 org.apache.hadoop.mapred.Counters log
Map input records=5
10:48:14 org.apache.hadoop.mapred.Counters log
Reduce shuffle bytes=0
10:48:14 org.apache.hadoop.mapred.Counters log
Spilled Records=48
10:48:14 org.apache.hadoop.mapred.Counters log
Map output bytes=242
10:48:14 org.apache.hadoop.mapred.Counters log
Total committed heap usage (bytes)=
10:48:14 org.apache.hadoop.mapred.Counters log
SPLIT_RAW_BYTES=220
10:48:14 org.apache.hadoop.mapred.Counters log
Combine input records=0
10:48:14 org.apache.hadoop.mapred.Counters log
Reduce input records=24
10:48:14 org.apache.hadoop.mapred.Counters log
Reduce input groups=4
10:48:14 org.apache.hadoop.mapred.Counters log
Combine output records=0
10:48:14 org.apache.hadoop.mapred.Counters log
Reduce output records=4
10:48:14 org.apache.hadoop.mapred.Counters log
Map output records=24
4. Sparse matrix multiplication with MapReduce
When we use matrices on real data, they are usually very sparse; to save storage space, normally only the non-zero entries are stored.
Let us now work through a sparse matrix example:
Matrix multiplication in R
Create two sparse matrix data files: sm1.csv, sm2.csv
Modify the driver program: MainRun.java
Create the MR program: SparseMartrixMultiply.java
1). Matrix multiplication in R
> m1 <- matrix(c(1,0,0,3,2,5,0,4,0,0,0,1,4,7,1,2), nrow=4, byrow=TRUE); m1
     [,1] [,2] [,3] [,4]
[1,]    1    0    0    3
[2,]    2    5    0    4
[3,]    0    0    0    1
[4,]    4    7    1    2
> m2 <- matrix(c(5,0,0,2,0,0,3,1), nrow=4, byrow=TRUE); m2
     [,1] [,2]
[1,]    5    0
[2,]    0    2
[3,]    0    0
[4,]    3    1
> m3 <- m1 %*% m2; m3
     [,1] [,2]
[1,]   14    3
[2,]   22   14
[3,]    3    1
[4,]   26   16
2). Create the two sparse matrix data files sm1.csv and sm2.csv
Only non-zero entries are stored, in three columns: the first column holds the row index in the original matrix, the second the column index, and the third the value.
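The sm1.csv and sm2.csv contents are not shown in this copy of the post either, but they follow directly from the matrices in the R example above. As a sketch, the triplets can be generated in R like this (the helper name to_triplets is my own):
# A small helper (name is mine) that turns a dense matrix into
# (row, column, value) triplets of its non-zero entries.
to_triplets <- function(m) {
  idx <- which(m != 0, arr.ind = TRUE)   # positions of the non-zero entries
  data.frame(row = idx[, 1], col = idx[, 2], value = m[idx])
}
write.table(to_triplets(m1), "sm1.csv", sep = ",",
            row.names = FALSE, col.names = FALSE)
write.table(to_triplets(m2), "sm2.csv", sep = ",",
            row.names = FALSE, col.names = FALSE)
# m1 has 10 non-zero entries and m2 has 4, which matches the
# "Map input records=14" counter in the job log below.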
3). Modify the driver program: MainRun.java
Add a launch method for SparseMartrixMultiply:
    public static void main(String[] args) {
        sparseMartrixMultiply();
    }

    public static void sparseMartrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/sm1.csv");// local data files
        path.put("m2", "logfile/matrix/sm2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS directory
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            SparseMartrixMultiply.run(path);// run the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
4). Create the MR program: SparseMartrixMultiply.java
The map function changes; the reduce function is essentially unchanged, except that an entry missing from B is now treated as 0.
The variables that tracked the current row of each matrix are removed, because every record of the sparse format already carries its own row and column indices.
package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class SparseMartrixMultiply {

    public static class SparseMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// m1 or m2

        private int rowNum = 4;// number of rows of matrix A
        private int colNum = 2;// number of columns of matrix B

        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// which dataset this split belongs to
        }

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];

                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(row + "," + i);
                    Text v = new Text("A:" + col + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());
                }

            } else if (flag.equals("m2")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];

                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(i + "," + col);
                    Text v = new Text("B:" + row + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());
                }
            }
        }
    }

    public static class SparseMatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");

                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                    // System.out.println("A:" + kv[0] + "," + kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                String bVal = mapB.containsKey(mapk) ? mapB.get(mapk) : "0";// missing entries of B are zero
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(bVal);
            }
            context.write(key, new IntWritable(result));
            System.out.println();
            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SparseMatrixMapper.class);
        job.setReducerClass(SparseMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load the two input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

Console output from running the job:
Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/sm1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/sm2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
11:57:31 org.apache.hadoop.util.NativeCodeLoader <clinit>
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.
User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
11:57:31 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 2
11:57:31 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
警告: Snappy native library not loaded
11:57:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
11:57:31 org.apache.hadoop.mapred.Task initialize
Using ResourceCalculatorPlugin : null
11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 14720
11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 680
11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
11:57:31 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_ is done. And is in the process of commiting
11:57:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
map 0% reduce 0%
11:57:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
11:57:34 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_' done.
11:57:34 org.apache.hadoop.mapred.Task initialize
Using ResourceCalculatorPlugin : null
11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 14720
11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 680
11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
11:57:34 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_ is done. And is in the process of commiting
11:57:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
map 100% reduce 0%
11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
11:57:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_' done.
11:57:37 org.apache.hadoop.mapred.Task initialize
Using ResourceCalculatorPlugin : null
11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 436 bytes
11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
1,1:(B:1,5)(B:4,3)(A:1,1)(A:4,3)
1,2:(A:1,1)(A:4,3)(B:2,2)(B:4,1)
2,1:(B:1,5)(B:4,3)(A:1,2)(A:2,5)(A:4,4)
2,2:(A:1,2)(A:2,5)(A:4,4)(B:4,1)(B:2,2)
3,1:(B:1,5)(B:4,3)(A:4,1)
3,2:(A:4,1)(B:2,2)(B:4,1)
4,1:(B:4,3)(B:1,5)(A:1,4)(A:2,7)(A:3,1)(A:4,2)
4,2:(A:1,4)(A:2,7)(A:3,1)(A:4,2)(B:2,2)(B:4,1)
11:57:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_ is done. And is in the process of commiting
11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
11:57:37 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_ is allowed to commit now
11:57:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
11:57:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
11:57:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_' done.
11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
map 100% reduce 100%
11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
11:57:41 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
11:57:41 org.apache.hadoop.mapred.Counters log
File Output Format Counters
11:57:41 org.apache.hadoop.mapred.Counters log
Bytes Written=53
11:57:41 org.apache.hadoop.mapred.Counters log
FileSystemCounters
11:57:41 org.apache.hadoop.mapred.Counters log
FILE_BYTES_READ=2503
11:57:41 org.apache.hadoop.mapred.Counters log
HDFS_BYTES_READ=266
11:57:41 org.apache.hadoop.mapred.Counters log
FILE_BYTES_WRITTEN=126274
11:57:41 org.apache.hadoop.mapred.Counters log
HDFS_BYTES_WRITTEN=347
11:57:41 org.apache.hadoop.mapred.Counters log
File Input Format Counters
11:57:41 org.apache.hadoop.mapred.Counters log
Bytes Read=98
11:57:41 org.apache.hadoop.mapred.Counters log
Map-Reduce Framework
11:57:41 org.apache.hadoop.mapred.Counters log
Map output materialized bytes=444
11:57:41 org.apache.hadoop.mapred.Counters log
Map input records=14
11:57:41 org.apache.hadoop.mapred.Counters log
Reduce shuffle bytes=0
11:57:41 org.apache.hadoop.mapred.Counters log
Spilled Records=72
11:57:41 org.apache.hadoop.mapred.Counters log
Map output bytes=360
11:57:41 org.apache.hadoop.mapred.Counters log
Total committed heap usage (bytes)=
11:57:41 org.apache.hadoop.mapred.Counters log
SPLIT_RAW_BYTES=220
11:57:41 org.apache.hadoop.mapred.Counters log
Combine input records=0
11:57:41 org.apache.hadoop.mapred.Counters log
Reduce input records=36
11:57:41 org.apache.hadoop.mapred.Counters log
Reduce input groups=8
11:57:41 org.apache.hadoop.mapred.Counters log
Combine output records=0
11:57:41 org.apache.hadoop.mapred.Counters log
Reduce output records=8
11:57:41 org.apache.hadoop.mapred.Counters log
Map output records=36
The full source code has been uploaded to github:
With this, matrix multiplication is implemented as a MapReduce program. With matrix computation as a building block, we can now go on to do much more!