首页 / 操作系统 / Linux / Hadoop--两个简单的MapReduce程序
这周在学习Hadoop编程,以前看过《Hadoop权威指南》这本书,但是看完了HDFS这一章之后,后面的内容就难以再看懂了,说实话,之前一直对MapReduce程序敬而远之,毫不理解这种类型的程序的执行过程。这一周花了些时间看了Hadoop的实战,现在能够看懂简单的MapReduce程序,也能自己动手写几个简单的例子程序。相关阅读:Hadoop权威指南(中文版-带目录索引)PDF http://www.linuxidc.com/Linux/2013-05/84948.htm
Hadoop权威指南(中文第2版)PDF http://www.linuxidc.com/Linux/2012-07/65972.htm下面是两个简单的MapReduce程序,用到了一些简单的Hadoop知识点,总结如下文。源码下载:**************************************************************下载在Linux公社的1号FTP服务器里,下载地址:FTP地址:ftp://www.linuxidc.com/用户名:www.6688.cc密码:www.linuxidc.com在 2013年LinuxIDC.com8月Hadoop--两个简单的MapReduce程序下载方法见 http://www.linuxidc.net/thread-1187-1-1.html**************************************************************例子一 求最大数问题描述是这样的,从一系列数中,求出最大的那一个。这个需求应该说是很简单的,如果不用MapReduce来实现,普通的Java程序要实现这个需求,应该说是轻而易举的,几行代码就能搞定。这里用这个例子是想说说Hadoop中的Combiner的用法。我们知道,Hadoop使用Mapper函数将数据处理成一个一个的<key, value>键值对,再在网络节点间对这些键值对进行整理(shuffle),然后使用Reducer函数处理这些键值对,并最终将结果输出。那么可以这样想,如果我们有1亿个数据(Hadoop就是为大数据而生),Mapper函数将会产生1亿个键值对在网络中进行传输,如果我们只是要求出这1亿个数当中的最大值,那么显然,Mapper只需要输出它所知道的最大值即可。这样一来可以减轻网络带宽的压力,二来,可以减轻Reducer的压力,提高程序的效率。如果Reducer只是运行简单的诸如求最大值、最小值、计数,那么我们可以使用Combiner,但是,如果是求一组数的平均值,千万别用Combiner,道理很简单,你自己分析看。Combiner可以看作是Reducer的帮手,或者看成是Mapper端的Reducer,它能减少Mapper函数的输出从而减少网络数据传输并能减少Reducer上的负载。下面是Combiner的例子程序。程序的输入是这样的:12
5
9
21
43
99
65
32
10MapReduce程序需要找到这一组数字中的最大值99,Mapper函数是这样的:public class MyMapper extends Mapper<Object, Text, Text, IntWritable>{
@Override
protected void map(Object key, Text value,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
context.write(new Text(), new IntWritable(Integer.parseInt(value.toString())));
}
}Mapper函数非常简单,它是负责读取HDFS中的数据的,负责将这些数据组成<key, value>对,然后传输给Reducer函数。Reducer函数如下:public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
int temp = Integer.MIN_VALUE;
for(IntWritable value : values){
if(value.get() > temp){
temp = value.get();
}
}
context.write(new Text(), new IntWritable(temp));
}
}Reducer函数也很简单,就是负责找到从Mapper端传来的数据中找到最大值。那么在Mapper函数与Reducer函数之间,有个Combiner,它的代码是这样的:public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { @Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
// TODO Auto-generated method stub
int temp = Integer.MIN_VALUE;
for(IntWritable value : values){
if(value.get() > temp){
temp = value.get();
}
}
context.write(new Text(), new IntWritable(temp));
}
}我们可以看到,combiner也是继承了Reducer类,其写法与写reduce函数一样,reduce和combiner对外的功能是一样的,只是使用时的位置和上下文(Context)不一样而已。定义好了自己的Combiner函数之后,需要在Job类中加入一行代码,告诉Job你使用要在Mapper端使用Combiner:job.setCombinerClass(MyCombiner.class);那么这个求最大数的例子的Job类是这样的:public class MyMaxNum {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf,"My Max Num");
job.setJarByClass(MyMaxNum.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setCombinerClass(MyCombiner.class);
FileInputFormat.addInputPath(job, new Path("/huhui/nums.txt"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}当然你还可以对输出进行压缩。只要在函数中添加两行代码,就能对Reducer函数的输出结果进行压缩。当然这里没有必要对结果进行压缩,只是作为一个知识点而已。//对输出进行压缩
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);接下来请看第2页精彩内容: http://www.linuxidc.com/Linux/2013-08/88631p2.htm