首页 / 操作系统 / Linux / MapReduce TotalOrderPartitioner 全局排序
我们知道Mapreduce框架在feed数据给reducer之前会对map output key排序,这种排序机制保证了每一个reducer局部有序,Hadoop 默认的partitioner是HashPartitioner,它依赖于output key的hashcode,使得相同key会去相同reducer,但是不保证全局有序,如果想要获得全局排序结果(比如获取top N, bottom N),就需要用到TotalOrderPartitioner了,它保证了相同key去相同reducer的同时也保证了全局有序。public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }/** * Partitioner effecting a total order by reading split points from * an externally generated source. */ @InterfaceAudience.Public @InterfaceStability.Stable public class TotalOrderPartitioner<K extends WritableComparable<?>,V> extends Partitioner<K,V> implements Configurable { // by construction, we know if our keytype @SuppressWarnings("unchecked") // is memcmp-able and uses the trie public int getPartition(K key, V value, int numPartitions) { return partitions.findPartition(key); } }TotalOrderPartitioner依赖于一个partition file来distribute keys,partition file是一个实现计算好的sequence file,如果我们设置的reducer number是N,那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。InputSampler类的writePartitionFile方法会对input files取样并创建partition file。有三种取样方法:1. RandomSampler 随机取样 2. IntervalSampler 从s个split里面按照一定间隔取样,通常适用于有序数据 3. SplitSampler 从s个split中选取前n条记录取样paritition file可以通过TotalOrderPartitioner.setPartitionFile(conf, partitionFile)来设置,在TotalOrderPartitioner instance创建的时候会调用setConf函数,这时会读入partition file中key值,如果key是BinaryComparable(可以认为是字符串类型)的话会构建trie,时间复杂度是O(n), n是树的深度。如果是非BinaryComparable类型就构建BinarySearchNode,用二分查找,时间复杂度O(log(n)),n是reduce数 boolean natOrder = conf.getBoolean(NATURAL_ORDER, true); if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) { partitions = buildTrie((BinaryComparable[])splitPoints, 0, splitPoints.length, new byte[0], // Now that blocks of identical splitless trie nodes are // represented reentrantly, and we develop a leaf for any trie // node with only one split point, the only reason for a depth // limit is to refute stack overflow or bloat in the pathological // case where the split points are long and mostly look like bytes // iii...iixii...iii . Therefore, we make the default depth // limit large but not huge. conf.getInt(MAX_TRIE_DEPTH, 200)); } else { partitions = new BinarySearchNode(splitPoints, comparator); }示例程序import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.InputSampler; import org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler; import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;public class TotalSortMR {
public static int runTotalSortJob(String[] args) throws Exception { Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); Path partitionFile = new Path(args[2]); int reduceNumber = Integer.parseInt(args[3]);