MapReduce Matrix Multiplication: Implementation Code

2014-12-12

Development environment:

java version "1.7.0_40"

Eclipse Kepler

Windows 7 x64

Ubuntu 12.04 LTS

Hadoop 2.2.0

VMware 9.0.0 build-812388

Input data:

Matrix A location: hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa

Matrix A contents:
3 4 6
4 0 8

The matrixa file has already been converted to (x, y, value) format, one matrix element per line (a sketch of this conversion step follows the matrix B listing below):

0 0 3
0 1 4
0 2 6
1 0 4
1 1 0
1 2 8

Matrix B location: hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb

Matrix B contents:
2 3
3 0
4 1

The matrixb file has likewise been converted to (x, y, value) format:

0 0 2
0 1 3
1 0 3
1 1 0
2 0 4
2 1 1
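
The post does not show the conversion itself; here is a minimal sketch of such a pre-processing pass, assuming the raw matrix lives in a local text file with one whitespace-separated row per line (the class name MatrixToTriples is hypothetical, not part of the original code):

package dataguru.matrixmultiply;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.StringTokenizer;

public class MatrixToTriples {
    public static void main(String[] args) throws Exception {
        // read a plain matrix file: one row per line, values separated by whitespace
        BufferedReader reader = new BufferedReader(new FileReader(args[0]));
        String line;
        int row = 0;
        while ((line = reader.readLine()) != null) {
            StringTokenizer tokens = new StringTokenizer(line);
            int col = 0;
            while (tokens.hasMoreTokens()) {
                // emit one (row, column, value) triple per matrix element
                System.out.println(row + " " + col + " " + tokens.nextToken());
                col++;
            }
            row++;
        }
        reader.close();
    }
}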

Implementation code:

There are three classes in total:

Driver class: MMDriver

Mapper class: MMMapper

Reducer class: MMReducer

You can merge them into a single class if you prefer. The idea behind the job: the mappers tag each matrix element with every output cell it contributes to, so that each reduce call can compute one cell C[i][j] as the sum over k of A[i][k] * B[k][j].

package dataguru.matrixmultiply;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MMDriver {

    public static void main(String[] args) throws Exception {
        // set configuration
        Configuration conf = new Configuration();

        // create job (Job.getInstance avoids the deprecated Job constructor)
        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(dataguru.matrixmultiply.MMDriver.class);

        // specify Mapper & Reducer
        job.setMapperClass(dataguru.matrixmultiply.MMMapper.class);
        job.setReducerClass(dataguru.matrixmultiply.MMReducer.class);

        // specify output types of mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // specify input and output directories
        Path inPathA = new Path("hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
        Path inPathB = new Path("hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
        Path outPath = new Path("hdfs://singlehadoop:8020/workspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);

        // delete the output directory if it already exists so the job can be rerun;
        // do not close the FileSystem here: it is cached and still needed to submit the job
        try {
            FileSystem hdfs = outPath.getFileSystem(conf);
            if (hdfs.exists(outPath))
                hdfs.delete(outPath, true); // recursive delete
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
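
Once compiled, the three classes can be packaged into a jar and submitted with the standard hadoop jar command; the jar name below is only a placeholder:

hadoop jar matrixmultiply.jar dataguru.matrixmultiply.MMDriver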
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MMMapper extends Mapper<Object, Text, Text, Text> {

    private String tag;   // which matrix the current split belongs to
    private int crow = 2; // number of rows in matrix A
    private int ccol = 2; // number of columns in matrix B

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // use the parent directory name of the input split ("matrixA" or "matrixB") as the tag
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * The input data consist of two matrix files.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());
        if ("matrixA".equals(tag)) { // left matrix: replicate each element once per column of B
            while (str.hasMoreTokens()) {
                String currentx = str.nextToken(); // x, y, value of the current element
                String currenty = str.nextToken();
                String currentValue = str.nextToken();
                for (int i = 0; i < ccol; i++) {
                    Text outkey = new Text(currentx + "," + i);
                    Text outvalue = new Text("a," + currenty + "," + currentValue);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
            }
        } else if ("matrixB".equals(tag)) { // right matrix: replicate each element once per row of A
            while (str.hasMoreTokens()) {
                String currentx = str.nextToken(); // x, y, value of the current element
                String currenty = str.nextToken();
                String currentValue = str.nextToken();
                for (int i = 0; i < crow; i++) {
                    Text outkey = new Text(i + "," + currenty);
                    Text outvalue = new Text("b," + currentx + "," + currentValue);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
            }
        }
    }
}
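
To see the replication at work: with crow = 2 and ccol = 2, the matrixA record "0 0 3" and the matrixB record "0 0 2" are emitted as the following key | value pairs (the same format the debug println uses):

0,0 | a,0,3
0,1 | a,0,3
0,0 | b,0,2
1,0 | b,0,2

Both copies of key 0,0 end up in the same reduce call, which is exactly how A[0][0] meets B[0][0].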
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MMReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // collect the row of A and the column of B that meet at this output cell,
        // indexed by the shared dimension k
        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();

        for (Text val : values) { // values look like "a,0,4" or "b,0,2"
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixa.put(str.nextToken(), str.nextToken()); // (k, A[i][k])
            }
            if ("b".equals(sourceMatrix)) {
                matrixb.put(str.nextToken(), str.nextToken()); // (k, B[k][j])
            }
        }

        // C[i][j] = sum over k of A[i][k] * B[k][j]
        int result = 0;
        for (String mapkey : matrixa.keySet()) {
            result += Integer.parseInt(matrixa.get(mapkey))
                    * Integer.parseInt(matrixb.get(mapkey));
        }
        context.write(key, new Text(String.valueOf(result)));
    }
}
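
For the key 0,0, for example, the reducer receives a,0,3, a,1,4, a,2,6, b,0,2, b,1,3 and b,2,4, builds matrixa = {0→3, 1→4, 2→6} and matrixb = {0→2, 1→3, 2→4}, and computes 3*2 + 4*3 + 6*4 = 42, which is C[0][0]. Note that the code assumes both maps share the same key set; if matrixb were missing an entry present in matrixa, Integer.parseInt would throw on the null lookup.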
Final output:

0,0	42
0,1	15
1,0	40
1,1	20
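
As a quick sanity check (not part of the original post), the same product can be computed in plain Java; running this against the sample matrices reproduces the four values above:

package dataguru.matrixmultiply;

public class VerifyResult {
    public static void main(String[] args) {
        // the two sample matrices from the input data above
        int[][] a = { { 3, 4, 6 }, { 4, 0, 8 } };
        int[][] b = { { 2, 3 }, { 3, 0 }, { 4, 1 } };
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < b[0].length; j++) {
                int sum = 0;
                for (int k = 0; k < b.length; k++) {
                    sum += a[i][k] * b[k][j]; // C[i][j] = sum over k of A[i][k] * B[k][j]
                }
                // same "i,j <tab> value" layout as the MapReduce output
                System.out.println(i + "," + j + "\t" + sum);
            }
        }
    }
}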

Author: CSDN blog user u014512124