监听器初始化Job,JobTracker响应TaskTracker心跳,调度器分配task分析2015-01-28监听器初始化Job,JobTracker响应TaskTracker心跳,调度器分配task的源码级分析JobTracker和TaskTracker分别启动之后(JobTracker启动流程源码级分析,TaskTracker启动过程源码级分析),taskTracker会通过心跳与JobTracker通信,并获取分配给它的任务。用户将作业提交到JobTracker之后,放入相应的数据结构中,静等被分配。mapreduce job提交流程源码级分析(三)这篇文章已经分析了用户提交作业的最后步骤,主要是构造作业对应的JobInProgress并加入jobs,告知所有的JobInProgressListener。默认调度器创建了两个Listener:JobQueueJobInProgressListener和EagerTaskInitializationListener,用户提交的作业被封装成JobInProgress job加入这两个Listener。一、JobQueueJobInProgressListener.jobAdded(job)会将此JobInProgress放入Map<JobSchedulingInfo, JobInProgress> jobQueue中。二、EagerTaskInitializationListener.jobAdded(job)会将此 JobInProgress放入List<JobInProgress> jobInitQueue中,然后调用resortInitQueue()对这个列表进行排序:先按优先级,优先级相同则按开始时间;然后唤醒在此对象监视器上等待的所有线程jobInitQueue.notifyAll()。EagerTaskInitializationListener.start()方法已经在调度器start时运行,会创建一个线程JobInitManager implements Runnable,它的run方法主要是监控jobInitQueue列表,一旦发现不为空就获取第一个JobInProgress,然后创建一个 InitJob implements Runnable初始化线程并放入线程池ExecutorService threadPool(这个线程池在构建EagerTaskInitializationListener对象时由构造方法实现),InitJob线程的 run方法就一句话ttm.initJob(job),调用的是JobTracker的initJob(job)方法对JIP进行初始化,实际调用 JobInProgress.initTasks()对job进行初始化,initTasks()方法代码如下:
/** * Construct the splits, etc.This is invoked from an async * thread so that split-computation doesn"t block anyone. *///任务Task分两种: MapTask 和reduceTask,它们的管理对象都是TaskInProgress 。public synchronized void initTasks() throws IOException, KillInterruptedException, UnknownHostException {if (tasksInited || isComplete()) {return;}synchronized(jobInitKillStatus){if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {return;}jobInitKillStatus.initStarted = true;}LOG.info("Initializing " + jobId);final long startTimeFinal = this.startTime;// log job info as the user running the jobtry {userUGI.doAs(new PrivilegedExceptionAction<Object>() {@Overridepublic Object run() throws Exception {JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile, startTimeFinal, hasRestarted());return null;}});} catch(InterruptedException ie) {throw new IOException(ie);}// log the job prioritysetPriority(this.priority);//// generate security keys needed by Tasks//generateAndStoreTokens();//// read input splits and create a map per a split//TaskSplitMetaInfo[] splits = createSplits(jobId);if (numMapTasks != splits.length) {throw new IOException("Number of maps in JobConf doesn"t match number of " +"recieved splits for job " + jobId + "! 
" +"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);}numMapTasks = splits.length;//map task的个数就是input split的个数// Sanity check the locations so we don"t create/initialize unnecessary tasksfor (TaskSplitMetaInfo split : splits) {NetUtils.verifyHostnames(split.getLocations());}jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);maps = new TaskInProgress[numMapTasks]; //为每个map tasks生成一个TaskInProgress来处理一个input split for(int i=0; i < numMapTasks; ++i) {inputLength += splits[i].getInputDataLength();maps[i] = new TaskInProgress(jobId, jobFile, //类型是map task splits[i],jobtracker, conf, this, i, numSlotsPerMap);}LOG.info("Input size for job " + jobId + " = " + inputLength+ ". Number of splits = " + splits.length);// Set localityWaitFactor before creating cachelocalityWaitFactor = conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);/* 对于map task,将其放入nonRunningMapCache,是一个Map<Node,List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其inputsplit所在的Node上。在此,Node代表一个datanode或者机架或者数据中心。nonRunningMapCache将在JobTracker向TaskTracker分配map task的 时候使用。*/if (numMapTasks > 0) { //通过createCache()方法为这些TaskInProgress对象产生一个未执行任务的Map缓存nonRunningMapCache。//slave端的TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。nonRunningMapCache = createCache(splits, maxLevel);}// set the launch timethis.launchTime = jobtracker.getClock().getTime();//// Create reduce tasks////其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,//缺省只创建1个Reduce任务。监控和调度Reduce任务的是TaskInProgress类,不过构造方法有所不同,//TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地,//initTasks()也会通过createCache()方法产生nonRunningReduceCache成员。this.reduces = new TaskInProgress[numReduceTasks];for (int i = 0; i < numReduceTasks; i++) {reduces[i] = new TaskInProgress(jobId, jobFile, //这是reduce tasknumMapTasks, i, 
jobtracker, conf, this, numSlotsPerReduce);/*reducetask放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。*/nonRunningReduces.add(reduces[i]);}// Calculate the minimum number of maps to be complete before // we should start scheduling reducescompletedMapsForReduceSlowstart = (int)Math.ceil((conf.getFloat("mapred.reduce.slowstart.completed.maps",DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *numMapTasks));// ... use the same for estimating the total output of all mapsresourceEstimator.setThreshhold(completedMapsForReduceSlowstart);// create cleanup two cleanup tips, one map and one reduce.//创建两个cleanup task,一个用来清理map,一个用来清理reduce. cleanup = new TaskInProgress[2];// cleanup map tip. This map doesn"t use any splits. Just assign an empty// split.TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1);cleanup[0].setJobCleanupTask();// cleanup reduce tip.cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this, 1);cleanup[1].setJobCleanupTask();// create two setup tips, one map and one reduce.//创建两个初始化 task,一个初始化map,一个初始化reduce. setup = new TaskInProgress[2];// setup map tip. This map doesn"t use any split. 
Just assign an empty// split.setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1);setup[0].setJobSetupTask();// setup reduce tip.setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this, 1);setup[1].setJobSetupTask();synchronized(jobInitKillStatus){jobInitKillStatus.initDone = true;if(jobInitKillStatus.killed) {throw new KillInterruptedException("Job " + jobId + " killed in init");}}//JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,//然后再调用JobHistory.JobInfo.logInited()记录job的执行日志。tasksInited = true;JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,numMapTasks, numReduceTasks); // Log the number of map and reduce tasks LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks+ " map tasks and " + numReduceTasks + " reduce tasks.");}