首页 / 操作系统 / Linux / Hadoop 2.2.0 fsimage和edit logs的处理逻辑
在类org.apache.Hadoop.hdfs.server.namenode.NNStorageRetentionManager的purgeOldStorage()方法中描述了fsimage和edit logs的处理逻辑:一、找到存在于fsimage中的最小txid,删除比最小txid小的fsimage二、最小txid - dfs.namenode.num.extra.edits.retained = 可以删除txid集合三、可删除txid集合 > dfs.namenode.max.extra.edits.segments.retained 时,删除集合中的最小值相关阅读:Ubuntu 13.04上搭建Hadoop环境 http://www.linuxidc.com/Linux/2013-06/86106.htmUbuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htmUbuntu上搭建Hadoop环境(单机模式+伪分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htmUbuntu下Hadoop环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm单机版搭建Hadoop环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建) http://www.linuxidc.com/Linux/2011-12/48894.htm完整代码: public void purgeOldStorage() throws IOException { FSImageTransactionalStorageInspector inspector = new FSImageTransactionalStorageInspector(); storage.inspectStorageDirs(inspector); long minImageTxId = getImageTxIdToRetain(inspector); purgeCheckpointsOlderThan(inspector, minImageTxId); // If fsimage_N is the image we want to keep, then we need to keep // all txns > N. We can remove anything < N+1, since fsimage_N // reflects the state up to and including N. However, we also // provide a "cushion" of older txns that we keep, which is // handy for HA, where a remote node may not have as many // new images. // // First, determine the target number of extra transactions to retain based // on the configured amount. long minimumRequiredTxId = minImageTxId + 1; long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>(); purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false); Collections.sort(editLogs, new Comparator<EditLogInputStream>() { @Override public int compare(EditLogInputStream a, EditLogInputStream b) { return ComparisonChain.start() .compare(a.getFirstTxId(), b.getFirstTxId()) .compare(a.getLastTxId(), b.getLastTxId()) .result(); } }); // Remove from consideration any edit logs that are in fact required. while (editLogs.size() > 0 && editLogs.get(editLogs.size() - 1).getFirstTxId() >= minimumRequiredTxId) { editLogs.remove(editLogs.size() - 1); } // Next, adjust the number of transactions to retain if doing so would mean // keeping too many segments around. while (editLogs.size() > maxExtraEditsSegmentsToRetain) { purgeLogsFrom = editLogs.get(0).getLastTxId() + 1; editLogs.remove(0); }
// Finally, ensure that we"re not trying to purge any transactions that we // actually need. if (purgeLogsFrom > minimumRequiredTxId) { throw new AssertionError("Should not purge more edits than required to " + "restore: " + purgeLogsFrom + " should be <= " + minimumRequiredTxId); }