Welcome 微信登录
编程资源 图片资源库 蚂蚁家优选 PDF转换器

首页 / 操作系统 / Linux / 在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数

最近开始使用MapReduce,发现网上大部分例子都是对文本数据进行处理的,也就是说在读取输入数据时直接使用默认的TextInputFormat进行处理即可。对于文本数据处理,这个类还是能满足一部分应用场景。但是如果要处理以二进制形式结构化记录存储的文件时,这些类就不再适合了。本文以一个简单的应用场景为例:对按照二进制格式存储的整数做频数统计。当然,也可以在此基础上实现排序之类的其他应用。实现该应用的主要难点就是如何处理输入数据。参考《权威指南·第三版》得知需要继承FileInputFormat这个类,并实现以下三个方法:class MyInputFormat extends FileInputFormat<Type1, Type2> {
 /*
  * 查询判断当前文件是否可以分块?"true"为可以分块,"false"表示不进行分块
  */
 protected boolean isSplitable(Configuration conf, Path path) {
 
 }
 
 /*
  * MapReduce的客户端调用此方法得到所有的分块,然后将分块发送给MapReduce服务端。
  * 注意,分块中不包含实际的信息,而只是对实际信息的分块信息。具体的说,每个分块中
  * 包含当前分块对应的文件路径,当前分块在该文件中起始位置,当前分块的长度以及对应的
  * 实际数据所在的机器列表。在实现这个函数时,将这些信息填上即可。
  * */
 public List<InputSplit> getSplits(Configuration conf) throws IOException {
 } /*
  * 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的参数有两个:一个分块(split)和作业的配置信息(context).
  * 在Mapper的run函数中可以看到MapReduce框架执行Map的逻辑:
  * public void run(Context context) throws IOException, InterruptedException {
  *   setup(context);
  *   调用RecordReader方法的nextKeyValue,生成新的键值对。如果当前分块(Split)中已经处理完毕了,则nextKeyValue会返回false.退出run函数
  *  while (context.nextKeyValue()) { 
  *   map(context.getCurrentKey(), context.getCurrentValue(), context);
  *  }
  *  cleanup(context);
  * }
  **/
 public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
   throws IOException, InterruptedException {
 }
}--------------------------------------分割线 --------------------------------------Ubuntu 13.04上搭建Hadoop环境 http://www.linuxidc.com/Linux/2013-06/86106.htmUbuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htmUbuntu上搭建Hadoop环境(单机模式+伪分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htmUbuntu下Hadoop环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm单机版搭建Hadoop环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm--------------------------------------分割线 --------------------------------------在RecordReader函数中实现以下几个接口:public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
 /*关闭文件流
  * */
 public void close() {} /*
  * 获取处理进度
  **/
 public float getProgress() {} /*
  * 获取当前的Key
  * */
 public LongWritable getCurrentKey() throws IOException,
 InterruptedException {} /* 获取当前的Value
  * */
 public IntWritable getCurrentValue() throws IOException,InterruptedException {} /*
  * 进行初始化工作,打开文件流,根据分块信息设置起始位置和长度等等
  * */
 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
   throws IOException, InterruptedException {} /*生成下一个键值对
  **/
 public boolean nextKeyValue() throws IOException, InterruptedException {
 }
}更多详情见请继续阅读下一页的精彩内容: http://www.linuxidc.com/Linux/2014-07/104417p2.htm