public class WordCount extends Configured implements Tool {
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); } } }
/** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }
/** * The main driver for word count map/reduce program. Invoke this method to * submit the map/reduce job. * * @throws IOException * When there is communication problems with the job tracker. */ public int run(String[] args) throws Exception { JobConf conf = new JobConf(getConf(), WordCount.class); conf.setJobName("wordcount");
// the keys are words (strings) conf.setOutputKeyClass(Text.class); // the values are counts (ints) conf.setOutputValueClass(IntWritable.class);
List<String> other_args = new ArrayList<String>(); for (int i = 0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i - 1]); return printUsage(); } }
// Make sure there are exactly 2 parameters left. if (other_args.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(conf, other_args.get(0)); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf); return 0; }
public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); }
} 2. 保证hadoop集群是配置好了的,单机的也好。新建一个目录,比如 /home/admin/WordCount 编译WordCount.java程序。javac -classpath /home/admin/hadoop/hadoop-0.19.1-core.jar WordCount.java -d /home/admin/WordCount 3. 编译完后在/home/admin/WordCount目录会发现三个class文件 WordCount.class,WordCount$Map.class,WordCount$Reduce.class。 cd 进入 /home/admin/WordCount目录,然后执行:jar cvf WordCount.jar *.class 就会生成 WordCount.jar 文件。 4. 构造一些输入数据 input1.txt和input2.txt的文件里面是一些单词。如下:[admin@host WordCount]$ cat input1.txt Hello, i love china are you ok? [admin@host WordCount]$ cat input2.txt hello, i love word You are ok 在hadoop上新建目录,和put程序运行所需要的输入文件:hadoop fs -mkdir /tmp/input hadoop fs -mkdir /tmp/output hadoop fs -put input1.txt /tmp/input/ hadoop fs -put input2.txt /tmp/input/ 5. 运行程序,会显示job运行时的一些信息。[admin@host WordCount]$ hadoop jar WordCount.jar WordCount /tmp/input /tmp/output 10/09/16 22:49:43 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 10/09/16 22:49:43 INFO mapred.FileInputFormat: Total input paths to process :2 10/09/16 22:49:43 INFO mapred.JobClient: Running job: job_201008171228_76165 10/09/16 22:49:44 INFO mapred.JobClient: map 0% reduce 0% 10/09/16 22:49:47 INFO mapred.JobClient: map 100% reduce 0% 10/09/16 22:49:54 INFO mapred.JobClient: map 100% reduce 100% 10/09/16 22:49:55 INFO mapred.JobClient: Job complete: job_201008171228_76165 10/09/16 22:49:55 INFO mapred.JobClient: Counters: 16 10/09/16 22:49:55 INFO mapred.JobClient: File Systems 10/09/16 22:49:55 INFO mapred.JobClient: HDFS bytes read=62 10/09/16 22:49:55 INFO mapred.JobClient: HDFS bytes written=73 10/09/16 22:49:55 INFO mapred.JobClient: Local bytes read=152 10/09/16 22:49:55 INFO mapred.JobClient: Local bytes written=366 10/09/16 22:49:55 INFO mapred.JobClient: Job Counters 10/09/16 22:49:55 INFO mapred.JobClient: Launched reduce tasks=1 10/09/16 22:49:55 INFO mapred.JobClient: Rack-local map tasks=2 10/09/16 22:49:55 INFO mapred.JobClient: Launched map tasks=2 10/09/16 22:49:55 INFO mapred.JobClient: Map-Reduce Framework 10/09/16 22:49:55 INFO mapred.JobClient: Reduce input groups=11 10/09/16 22:49:55 INFO mapred.JobClient: Combine output records=14 10/09/16 22:49:55 INFO mapred.JobClient: Map input records=4 10/09/16 22:49:55 INFO mapred.JobClient: Reduce output records=11 10/09/16 22:49:55 INFO mapred.JobClient: Map output bytes=118 10/09/16 22:49:55 INFO mapred.JobClient: Map input bytes=62 10/09/16 22:49:55 INFO mapred.JobClient: Combine input records=14 10/09/16 22:49:55 INFO mapred.JobClient: Map output records=14 10/09/16 22:49:55 INFO mapred.JobClient: Reduce input records=14 6. 查看运行结果[admin@host WordCount]$ hadoop fs -ls /tmp/output/ Found 2 items drwxr-x--- - admin admin 0 2010-09-16 22:43 /tmp/output/_logs -rw-r----- 1 admin admin 102 2010-09-16 22:44 /tmp/output/part-00000 [admin@host WordCount]$ hadoop fs -cat /tmp/output/part-00000 Hello, 1 You 1 are 2 china 1 hello, 1 i 2 love 2 ok 1 ok? 1 word 1 you 1