如何在Java中利用MultipleOutputs控制reduce的输出路径（2016-04-02）
package com.mr.test;import java.io.IOException;import java.util.Iterator;import java.util.StringTokenizer;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.mapred.lib.MultipleOutputs;public class WordCount {public static class MyMap implementsMapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException {String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens()) {word.set(tokenizer.nextToken());output.collect(word, one);}}public void configure(JobConf arg0) {}public void close() throws IOException {// TODO Auto-generated method stub}} //public static class MyReduce implementsReducer<Text, IntWritable, Text, IntWritable> {private MultipleOutputs mos;private JobConf jobconf;public void reduce(Text key, Iterator<IntWritable> values,OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException {int sum = 0;while (values.hasNext()) {sum += values.next().get();}mos.getCollector(key.toString(), reporter).collect(key, new IntWritable(sum));//mos.getCollector(key.toString(),"test", reporter).collect(key, new IntWritable(sum));}public void configure(JobConf jobconf) {mos = new MultipleOutputs(jobconf);this.jobconf = jobconf;}public void close() 
throws IOException {mos.close();}}public static void main(String[] args) throws Exception {JobConf conf = new JobConf(WordCount.class);conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(MyMap.class);conf.setCombinerClass(MyReduce.class);conf.setReducerClass(MyReduce.class);conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);System.out.println("args1:"+args[0]);System.out.println("args2:"+args[1]);FileInputFormat.setInputPaths(conf, new Path(args[0]));FileOutputFormat.setOutputPath(conf, new Path(args[1]));MultipleOutputs.addNamedOutput(conf, "test1", TextOutputFormat.class, LongWritable.class, Text.class);MultipleOutputs.addNamedOutput(conf, "test2", TextOutputFormat.class, LongWritable.class, Text.class);MultipleOutputs.addNamedOutput(conf, "test3", TextOutputFormat.class, LongWritable.class, Text.class);MultipleOutputs.addNamedOutput(conf, "test4", TextOutputFormat.class, LongWritable.class, Text.class);MultipleOutputs.addNamedOutput(conf, "test5", TextOutputFormat.class, LongWritable.class, Text.class);JobClient.runJob(conf);}}input.txt:test1test2test3test4test5output:-rw-r--r-- 2 test supergroup 0 2014-04-20 11:23 /chukwa/output/0419-10/_SUCCESSdrwxr-xr-x - test supergroup 0 2014-04-20 11:23 /chukwa/output/0419-10/_logs-rw-r--r-- 2 test supergroup 42 2014-04-20 11:23 /chukwa/output/0419-10/part-00000.lzo-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test1-m-00000.lzo-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test2-m-00000.lzo-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test3-m-00000.lzo-rw-r--r-- 2test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test4-m-00001.lzo-rw-r--r-- 2 test supergroup 58 2014-04-20 11:23 /chukwa/output/0419-10/test5-m-00001.lzo