Implementing sorting with MapReduce
2014-12-10 · CSDN blog · 青春张开

While writing this I ran into a case where the value could not be converted to an int; I worked around it with a try/catch (see the mapper below).

The input data, with the key and value on each line separated by a tab:

str2	2
str1	1
str3	3
str1	4
str4	7
str2	5
str3	9

The expected result:

str1	1,4
str2	2,5
str3	3,9
str4	7

My map and reduce are written as separate classes here, and a custom key (IntPair) is used.
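Before the Hadoop classes, here is a small plain-Java sketch (my own illustration, not part of the original post; the class name ExpectedResult is made up) of the result the job should produce: group the values by key, keep the keys in sorted order, and sort the values inside each group.

package com.kane.mr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ExpectedResult {
    public static void main(String[] args) {
        // The sample input from above, key and value separated by a tab.
        String[] lines = { "str2\t2", "str1\t1", "str3\t3", "str1\t4",
                           "str4\t7", "str2\t5", "str3\t9" };
        // TreeMap keeps the keys sorted.
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (String line : lines) {
            String[] kv = line.split("\t");
            List<Integer> vals = grouped.get(kv[0]);
            if (vals == null) {
                vals = new ArrayList<Integer>();
                grouped.put(kv[0], vals);
            }
            vals.add(Integer.parseInt(kv[1]));
        }
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            Collections.sort(e.getValue());
            StringBuilder sb = new StringBuilder();
            for (int v : e.getValue()) {
                if (sb.length() > 0) {
                    sb.append(",");
                }
                sb.append(v);
            }
            // Prints e.g. "str1    1,4"
            System.out.println(e.getKey() + "\t" + sb);
        }
    }
}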
IntPair.java — the custom composite key (firstKey is the text key, secondKey the numeric value):

package com.kane.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Sample records:
// str2 2
// str1 1
// str3 3
// str1 4
// str4 7
// str2 5
// str3 9
public class IntPair implements WritableComparable<IntPair> {

    private String firstKey;  // e.g. "str1"
    private int secondKey;    // e.g. 1

    public String getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(String firstKey) {
        this.firstKey = firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(firstKey);
        out.writeInt(secondKey);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        firstKey = in.readUTF();
        secondKey = in.readInt();
    }

    // The comparison is against another IntPair; this sorts the keys.
    @Override
    public int compareTo(IntPair o) {
        // int first = o.getFirstKey().compareTo(this.firstKey);
        // if (first != 0) {
        //     return first;
        // } else {
        //     return o.getSecondKey() - this.secondKey;
        // }
        return o.getFirstKey().compareTo(this.getFirstKey());
    }
}

SortMapper.java — builds the composite key and emits it with the numeric value:

package com.kane.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<Object, Text, IntPair, IntWritable> {

    public IntPair intPair = new IntPair();
    public IntWritable intWritable = new IntWritable(0);

    @Override
    protected void map(Object key, Text value, // e.g. key = str1, value = 1
            Context context) throws IOException, InterruptedException {
        // String[] values = value.toString().split("\t");
        System.out.println(value);
        int intValue;
        try {
            intValue = Integer.parseInt(value.toString());
        } catch (NumberFormatException e) {
            // Without the try/catch, converting the value to int kept failing.
            intValue = 6;
        }
        intPair.setFirstKey(key.toString());
        intPair.setSecondKey(intValue);
        intWritable.set(intValue);
        context.write(intPair, intWritable); // key = (str2, 2), value = 2
    }
}

SortReducer.java — all values of one key arrive together and are joined with commas:

package com.kane.mr;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<IntPair, IntWritable, Text, Text> {

    @Override
    protected void reduce(IntPair key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        StringBuffer combineValue = new StringBuffer();
        Iterator<IntWritable> itr = values.iterator();
        while (itr.hasNext()) {
            int value = itr.next().get();
            if (combineValue.length() > 0) {
                combineValue.append(","); // separator only between values, so no trailing comma
            }
            combineValue.append(value);
        }
        context.write(new Text(key.getFirstKey()), new Text(combineValue.toString()));
    }
}

PartionTest.java — the partitioner; numPartitions is the number of reducers:

package com.kane.mr;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartionTest extends Partitioner<IntPair, IntWritable> {

    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        // Parentheses matter here: mask off the sign bit first, then take the modulo,
        // so the result always falls in [0, numPartitions).
        return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

TextComparator.java — the grouping comparator, which compares only the first key:

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TextComparator extends WritableComparator {

    public TextComparator() {
        super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair o1 = (IntPair) a;
        IntPair o2 = (IntPair) b;
        return o1.getFirstKey().compareTo(o2.getFirstKey());
    }
}

TextIntCompartor.java — the sort comparator: compare the first key, then the numeric value:

package com.kane.mr;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

@SuppressWarnings("rawtypes")
public class TextIntCompartor extends WritableComparator {

    protected TextIntCompartor() {
        super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair o1 = (IntPair) a;
        IntPair o2 = (IntPair) b;
        int first = o1.getFirstKey().compareTo(o2.getFirstKey());
        if (first != 0) {
            return first;
        } else {
            return o1.getSecondKey() - o2.getSecondKey();
        }
    }
}

SortMain.java — the driver that wires everything together:

package com.kane.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortMain {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Sort");
        job.setJarByClass(SortMain.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class); // each input line is key<TAB>value
        job.setMapperClass(SortMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setSortComparatorClass(TextIntCompartor.class);
        job.setGroupingComparatorClass(TextComparator.class); // group by the first key only
        job.setPartitionerClass(PartionTest.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // input path: the first argument after the class name in "hadoop jar"
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path: the second argument
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
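As a quick local sanity check (again my own sketch, not part of the original post; ComparatorCheck is a made-up class name), the sample keys can be run through TextIntCompartor directly. The printed order is the order in which the shuffle delivers the records to the reducer, and TextComparator then makes all records sharing a firstKey land in a single reduce() call.

package com.kane.mr;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ComparatorCheck {
    public static void main(String[] args) {
        String[][] samples = { {"str2", "2"}, {"str1", "1"}, {"str3", "3"},
                               {"str1", "4"}, {"str4", "7"}, {"str2", "5"}, {"str3", "9"} };
        List<IntPair> keys = new ArrayList<IntPair>();
        for (String[] s : samples) {
            IntPair p = new IntPair();
            p.setFirstKey(s[0]);
            p.setSecondKey(Integer.parseInt(s[1]));
            keys.add(p);
        }
        // Same package, so the protected constructor is visible.
        final TextIntCompartor sortComparator = new TextIntCompartor();
        Collections.sort(keys, new Comparator<IntPair>() {
            @Override
            public int compare(IntPair a, IntPair b) {
                return sortComparator.compare(a, b);
            }
        });
        // Prints str1 1, str1 4, str2 2, str2 5, str3 3, str3 9, str4 7
        for (IntPair p : keys) {
            System.out.println(p.getFirstKey() + "\t" + p.getSecondKey());
        }
    }
}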
Export the jar and put it under the Hadoop directory, then copy sort.txt into HDFS, and run the job with: hadoop jar KaneTest/sort.jar com.kane.mr.SortMain /kane/sort.txt /kane/output
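Roughly, the commands look like this (a sketch using the paths from the post; hadoop fs -put is one way to copy the file into HDFS, and part-r-00000 is the default name of the reducer output file):

hadoop fs -mkdir /kane
hadoop fs -put sort.txt /kane/sort.txt
hadoop jar KaneTest/sort.jar com.kane.mr.SortMain /kane/sort.txt /kane/output
hadoop fs -cat /kane/output/part-r-00000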