Welcome 微信登录
编程资源 图片资源库 蚂蚁家优选 PDF转换器

首页 / 操作系统 / Linux / Python 写的Hadoop小程序

# This program was originally written for Python 2.3 (behaviour differs
# between Python versions); the code below sticks to constructs that run on
# both Python 2.3 and Python 3.
#
# Mapper:
import sys

# Running totals for this mapper's input, updated by the helper functions
# below through ``global`` declarations.
line_number = 0  # input lines seen
tab_number = 0  # field separators counted by compressed_stat
pv_number = 0  # sum of the first number of every pv/click pair
clk_number = 0  # sum of the second number of every pv/click pair
if_compressed_tested = 0  # becomes 1 once the input format has been decided
if_compressed = 0  # 1 when the input uses the compressed record format
# functions:
def compressed_stat(line):
    """Accumulate counters for one compressed-format record.

    The record is split on tabs. The first field is skipped (presumably a
    key). Every following field must hold two whitespace-separated integers,
    which are added to ``pv_number`` and ``clk_number`` respectively.

    Every call counts the line in ``line_number`` and its tab separators in
    ``tab_number``. If any field is malformed, the line is echoed to stdout
    with an ``ERROR`` marker and contributes nothing to the pv/click totals.

    NOTE(review): the tab separator is reconstructed. The published copy of
    this script shows a space here, which cannot work because each field is
    split on spaces again; the counter name ``tab_number`` indicates tabs.
    """
    global line_number
    global tab_number
    global pv_number
    global clk_number
    line_number += 1
    fields = line.split("\t")
    tab_number += len(fields) - 1
    try:
        pairs = [field.split() for field in fields[1:]]
        pvs = [int(pair[0]) for pair in pairs]
        clks = [int(pair[1]) for pair in pairs]
    except (ValueError, IndexError):
        # Non-numeric value, or a field holding fewer than two numbers.
        sys.stdout.write("%s  ERROR\n" % line)
        return
    # Commit only after the whole record parsed, so a bad field can never
    # leave partial pv/click sums behind.
    pv_number += sum(pvs)
    clk_number += sum(clks)
def before_compress_stat(line):
    """Accumulate counters for one uncompressed record of the form ``pv clk``.

    The first two whitespace-separated tokens are parsed as integers and added
    to ``pv_number`` and ``clk_number``; any further tokens are ignored. The
    line is always counted in ``line_number``. If a value is missing or is not
    an integer, the stripped line is echoed to stdout with an ``ERROR`` marker
    and nothing is added to the pv/click totals.

    NOTE(review): splitting on any whitespace accepts either a space or a tab
    between the two numbers. The published copy of this script lost its tab
    characters, so the original delimiter is uncertain.
    """
    global line_number
    global pv_number
    global clk_number
    line_number += 1
    line = line.strip()
    tokens = line.split()
    try:
        pv = int(tokens[0])
        clk = int(tokens[1])
    except (ValueError, IndexError):
        # Non-numeric value, or fewer than two tokens on the line.
        sys.stdout.write("%s  ERROR\n" % line)
        return
    # Both values parsed: commit them together.
    pv_number += pv
    clk_number += clk
# end functions

# Mapper main loop: decide the input format once, then hand every record to
# the matching accumulator. The helpers report bad records themselves.
for line in sys.stdin:
    line = line.strip()
    if if_compressed_tested == 0 and line:
        # Detect the format from the first non-empty record only.
        # NOTE(review): assumes compressed records look like
        # "key<TAB>pv clk<TAB>pv clk..." while plain records hold just two
        # numbers, so only a compressed record contains both a tab and a
        # space. The published script lost its tab characters; confirm
        # against real input.
        if_compressed_tested = 1
        if "\t" in line and " " in line:
            if_compressed = 1
    if if_compressed == 0:
        before_compress_stat(line)
    else:
        compressed_stat(line)

# Emit this mapper's partial totals as one space-separated line; the reducer
# tells the two formats apart by field count (4 = compressed, 3 = plain).
if if_compressed == 1:
    sys.stdout.write("%ld %ld %ld %ld\n"
                     % (line_number, tab_number, pv_number, clk_number))
else:
    sys.stdout.write("%ld %ld %ld\n" % (line_number, pv_number, clk_number))

# Reducer:
import sys

# Grand totals summed over every mapper summary line.
line_number = 0
tab_number = 0
pv_number = 0
clk_number = 0
# Format flags, set by the main loop from the first recognisable line.
if_compressed_tested = 0
if_compressed = 0


def compressed_stat(line):
    """Add one 4-field mapper summary ``"lines tabs pv clk"`` to the totals.

    A line that does not split into exactly four space-separated fields is
    echoed to stdout with an ``ERROR`` marker and ignored.

    Raises:
        ValueError: if a field is not an integer. The totals are left
            unchanged in that case; the caller reports the line.
    """
    global line_number
    global tab_number
    global pv_number
    global clk_number
    fields = line.split(" ")
    if len(fields) != 4:
        sys.stdout.write("%s  ERROR\n" % line)
        return
    # Convert every field before touching the totals so that a bad value
    # cannot leave them partially updated.
    lines, tabs, pvs, clks = [int(field) for field in fields]
    line_number += lines
    tab_number += tabs
    pv_number += pvs
    clk_number += clks


def before_compress_stat(line):
    """Add one 3-field mapper summary ``"lines pv clk"`` to the totals.

    A line that does not split into exactly three space-separated fields is
    echoed to stdout with an ``ERROR`` marker and ignored.

    Raises:
        ValueError: if a field is not an integer. The totals are left
            unchanged in that case; the caller reports the line.
    """
    global line_number
    global pv_number
    global clk_number
    fields = line.split(" ")
    if len(fields) != 3:
        sys.stdout.write("%s  ERROR\n" % line)
        return
    # Parse first, commit second (see compressed_stat).
    lines, pvs, clks = [int(field) for field in fields]
    line_number += lines
    pv_number += pvs
    clk_number += clks
# Reducer main loop: combine every mapper summary line into grand totals.
for line in sys.stdin:
    line = line.strip()
    if not line:
        # Blank lines carry no data; skip them instead of reporting errors.
        continue
    if "ERROR" in line:
        # Error reports produced by the mappers are forwarded unchanged.
        sys.stdout.write(line + "\n")
        continue

    if if_compressed_tested == 0:
        field_count = len(line.split(" "))
        if field_count == 4:
            if_compressed = 1
        elif field_count == 3:
            if_compressed = 0
        else:
            # Unrecognised shape: report it and keep looking for a line that
            # does identify the format instead of locking in a guess.
            sys.stdout.write("%s  ERROR\n" % line)
            continue
        # Only mark the format as decided once a line actually decided it.
        if_compressed_tested = 1

    try:
        if if_compressed == 0:
            before_compress_stat(line)
        else:
            compressed_stat(line)
    except ValueError:
        # A field that is not an integer.
        sys.stdout.write("%s  ERROR\n" % line)

if if_compressed == 0:
    sys.stdout.write("LINE_NUMBER: %d TOTAL_PV_NUMBER: %d TOTAL_CLK_NUMBER: %d\n"
                     % (line_number, pv_number, clk_number))
else:
    sys.stdout.write("LINE_NUMBER: %d TAB_NUMBER: %d TOTAL_PV_NUMBER: %d "
                     "TOTAL_CLK_NUMBER: %d\n"
                     % (line_number, tab_number, pv_number, clk_number))