1、从一个Demo入手

阅读源码首先要找到它的入口。一般的计算框架都会提供一些示例类,所以我们就从 WordCount 这个类入手!

image-20200822115748386

1.1 WordCount

/**
 * Classic word-count job: the mapper emits (token, 1) for every
 * whitespace-separated token of each input value, and the reducer (also
 * registered as the combiner) sums the counts for each token.
 */
public class WordCount {

  /** Splits each input value into tokens and emits (token, 1) per token. */
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable ONE = new IntWritable(1);
    // Reused for every emitted key instead of allocating a new Text per token.
    private Text currentWord = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken();
        currentWord.set(token);
        context.write(currentWord, ONE);
      }
    }
  }

  /** Sums all counts received for a key and emits (key, total). */
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable total = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int count = 0;
      for (IntWritable partial : values) {
        count += partial.get();
      }
      total.set(count);
      context.write(key, total);
    }
  }

  /**
   * Entry point: {@code wordcount <in> [<in>...] <out>}.
   * Exits with 2 on bad usage, 0 if the job succeeds, 1 otherwise.
   */
  public static void main(String[] args) throws Exception {
    // NOTE(review): the walkthrough states that new Configuration() loads eight
    // config files (the core/hdfs/mapred/yarn *-default.xml and *-site.xml
    // pairs), and that the cluster's site files are normally copied into the
    // project's resources directory -- confirm for your Hadoop version.
    Configuration conf = new Configuration();
    // Let GenericOptionsParser consume generic Hadoop options; keep the rest.
    String[] jobArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (jobArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    int outIndex = jobArgs.length - 1;

    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Every argument except the last one is an input path; the last is the output.
    for (int i = 0; i < outIndex; i++) {
      FileInputFormat.addInputPath(job, new Path(jobArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(jobArgs[outIndex]));

    // Submit and block until done; 'true' prints progress while waiting.
    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
  }
}

1.2 waitForCompletion(true)

/**
 * Submit the job to the cluster and wait for it to finish.
 * <p>
 * A job still in {@code DEFINE} state is submitted first. With
 * {@code verbose} the progress is monitored and printed; otherwise the job
 * is polled every {@code Job.getCompletionPollInterval(...)} milliseconds
 * until it completes.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the 
 *         <code>JobTracker</code> is lost
 * @throws InterruptedException if the calling thread is interrupted while
 *         submitting or waiting for the job
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // Submit only if nobody has submitted this job yet.
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = 
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      // InterruptedException is deliberately allowed to propagate (it is
      // declared above): swallowing it would discard the caller's
      // cancellation request and keep the thread polling.
      Thread.sleep(completionPollIntervalMillis);
    }
  }
  return isSuccessful();
}

2、Job提交的流程

2.1 submit();

/**
 * Submit the job to the cluster and return immediately.
 * <p>
 * The job must still be in {@code DEFINE} state. It is switched to the new
 * (org.apache.hadoop.mapreduce) API, the cluster connection is established,
 * and a {@code JobSubmitter} built from the cluster's file system and client
 * submits the job under {@code ugi}. Afterwards the job is marked RUNNING and
 * the tracking URL is logged.
 * @throws IOException
 */
public void submit() 
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  // New API: org.apache.hadoop.mapreduce (the old API lives in org.apache.hadoop.mapred).
  setUseNewAPI();
  // Lazily creates the Cluster, which in turn picks its ClientProtocol client.
  connect();

  final JobSubmitter jobSubmitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
            ClassNotFoundException {
          // The real submission work happens inside JobSubmitter.
          return jobSubmitter.submitJobInternal(Job.this, cluster);
        }
      };
  status = ugi.doAs(submitAction);

  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

2.2 connect();

/**
 * Ensures {@code cluster} is set, creating it on first use as {@code ugi}.
 * Later calls reuse the existing instance.
 */
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster != null) {
    return;
  }
  // The Cluster constructor selects the ClientProtocol (e.g. YARNRunner when
  // running on YARN) from the configuration.
  cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
    public Cluster run()
        throws IOException, InterruptedException, ClassNotFoundException {
      return new Cluster(getConfiguration());
    }
  });
}

2.3 Cluster的构造方法的initialize

/**
 * Chooses the ClientProtocol used by this Cluster.
 * <p>
 * Tries each provider from {@code frameworkLoader} in turn and keeps the
 * first one whose {@code create(...)} returns a non-null protocol. A provider
 * that returns null is logged at DEBUG; one that throws is logged at INFO and
 * skipped.
 * @param jobTrackAddr if non-null, passed to {@code provider.create(addr, conf)};
 *        otherwise {@code provider.create(conf)} is used
 * @param conf configuration handed to each provider
 * @throws IOException if no provider produced a usable protocol
 */
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());

        ClientProtocol clientProtocol = null; 
        try {
          /**
           * Each provider may build the client-side protocol used to talk to
           * the cluster. Per the walkthrough, the two implementations are
           * YarnClientProtocolProvider and LocalClientProtocolProvider.
           */
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            /**
             * First non-null protocol wins; it becomes this Cluster's
             * {@code client} field (YARNRunner on YARN, per the walkthrough).
             */
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }

2.4 JobSubmitter的submitJobInternal

/**
 * Abridged excerpt of JobSubmitter.submitJobInternal.
 * <p>
 * Validates the output spec, adds the MR framework to the distributed cache,
 * obtains a new job id, derives the per-job submission directory, and submits
 * the job through {@code submitClient}.
 * NOTE(review): parts of the original method are omitted from this excerpt,
 * including the declarations of {@code conf}, {@code jobStagingArea} and
 * {@code status} and the return statement, so it does not compile as shown.
 */
JobStatus submitJobInternal(Job job, Cluster cluster)  {

  //validate the jobs output specs
  /**
   * Validates the job's output specification. Per the walkthrough, this fails
   * if the specified output directory already exists.
   */
  checkSpecs(job);

  // Add the MapReduce framework to the distributed cache.
  addMRFrameworkToDistributedCache(conf);

  // Ask the cluster client for a new job id and record it on the job.
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);

  // Per-job submission directory under the staging area, named after the job id.
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
 
  Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    /**
     * Queue the job is submitted to (JobConf.DEFAULT_QUEUE_NAME if unset).
     */
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);


    /**
     * The actual submission.
     * jobId           id of this job in the YARN cluster (unique per the walkthrough)
     * submitJobDir    directory the job's submission files are staged in
     * getCredentials  credentials shipped with the job
     * Per the walkthrough: when a user submits an MR job, the client first
     * prepares the pieces it needs (a launch script, job.xml and the job jar)
     * and stores them in a temporary directory on HDFS.
     */
    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

}

2.5 YarnRunner的submitJob

/**
 * Submits a MapReduce job to YARN: asks the ResourceManager to start the MR
 * ApplicationMaster, checks the resulting application report, and returns
 * the job's status.
 * @param jobId id of the job being submitted
 * @param jobSubmitDir directory holding the job's submission files
 * @param ts credentials for the application (a history token is added first)
 * @return the job status obtained through {@code clientCache}
 * @throws IOException if the application report is missing, the application
 *         is already FAILED or KILLED, or the RM call raises a YarnException
 */
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
  addHistoryToken(ts);

  // Everything the ResourceManager needs to launch the MR ApplicationMaster.
  ApplicationSubmissionContext submissionContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  try {
    // Hand the application to the ResourceManager and read back its report.
    ApplicationId appId = resMgrDelegate.submitApplication(submissionContext);
    ApplicationReport report = resMgrDelegate.getApplicationReport(appId);

    boolean noReport = (report == null);
    String diagnostics;
    if (noReport) {
      diagnostics = "application report is null";
    } else {
      diagnostics = report.getDiagnostics();
    }

    boolean failedEarly = noReport
        || report.getYarnApplicationState() == YarnApplicationState.FAILED
        || report.getYarnApplicationState() == YarnApplicationState.KILLED;
    if (failedEarly) {
      throw new IOException("Failed to run job : " + diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}

2.6 submitApplication

/**
 * Client-side protobuf translator for submitApplication.
 * <p>
 * Unwraps the request's protobuf message, sends it to the ResourceManager
 * through {@code proxy}, and wraps the reply in a response record. Per the
 * walkthrough, this RPC is the point where the client's part of job
 * submission ends and YARN takes over (the request carries information such
 * as the job id and submit directory).
 */
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException,
    IOException {
  SubmitApplicationRequestProto requestProto =
      ((SubmitApplicationRequestPBImpl) request).getProto();
  SubmitApplicationResponse response = null;
  try {
    response = new SubmitApplicationResponsePBImpl(
        proxy.submitApplication(null, requestProto));
  } catch (ServiceException e) {
    // Expected to rethrow the remote cause; if it ever returns, null is returned.
    RPCUtil.unwrapAndThrowException(e);
  }
  return response;
}

2.7 流程图

image-20200822162150710

2.8 Yarn提交流程图

image-20200822171546331

3、MapReduce组件启动

3.1 mrAppMaster

/**
 * Initializes and starts the MRAppMaster as {@code jobUserName}.
 * <p>
 * Logs the current user's tokens, adds them to a new remote-user UGI for
 * {@code jobUserName}, then removes the AM->RM token from {@code credentials}
 * before those credentials are added to {@code conf}, so tasks do not get it.
 * Finally runs {@code init(conf)} and {@code start()} inside that UGI.
 * @param appMaster the ApplicationMaster to initialize and start
 * @param conf job configuration; also receives the filtered credentials
 * @param jobUserName user the ApplicationMaster runs as
 * @throws IOException e.g. when the AppMaster has set errorHappenedShutDown
 *         by the time start() returns
 * @throws InterruptedException
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  UserGroupInformation.setConfiguration(conf);
  // Security framework already loaded the tokens into current UGI, just use
  // them
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  LOG.info("Executing with tokens:");
  for (Token<?> token : credentials.getAllTokens()) {
    LOG.info(token);
  }
  
  UserGroupInformation appMasterUgi = UserGroupInformation
      .createRemoteUser(jobUserName);
  appMasterUgi.addCredentials(credentials);

  // Now remove the AM->RM token so tasks don't have it
  Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
  while (iter.hasNext()) {
    Token<?> token = iter.next();
    if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      iter.remove();
    }
  }
  conf.getCredentials().addAll(credentials);
  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      /**
       * Initialize the AppMaster (per the walkthrough, this sets up many of
       * its components), then start it.
       */
      appMaster.init(conf);
      appMaster.start();
      if(appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}

3.2 mrAppMaster的initAndStartAppMaster

/**
 * Initializes and starts the MRAppMaster as {@code jobUserName}.
 * <p>
 * Logs the current user's tokens, adds them to a new remote-user UGI for
 * {@code jobUserName}, then removes the AM->RM token from {@code credentials}
 * before those credentials are added to {@code conf}, so tasks do not get it.
 * Finally runs {@code init(conf)} and {@code start()} inside that UGI.
 * @param appMaster the ApplicationMaster to initialize and start
 * @param conf job configuration; also receives the filtered credentials
 * @param jobUserName user the ApplicationMaster runs as
 * @throws IOException e.g. when the AppMaster has set errorHappenedShutDown
 *         by the time start() returns
 * @throws InterruptedException
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  UserGroupInformation.setConfiguration(conf);
  // Security framework already loaded the tokens into current UGI, just use
  // them
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  LOG.info("Executing with tokens:");
  for (Token<?> token : credentials.getAllTokens()) {
    LOG.info(token);
  }
  
  UserGroupInformation appMasterUgi = UserGroupInformation
      .createRemoteUser(jobUserName);
  appMasterUgi.addCredentials(credentials);

  // Now remove the AM->RM token so tasks don't have it
  Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
  while (iter.hasNext()) {
    Token<?> token = iter.next();
    if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      iter.remove();
    }
  }
  conf.getCredentials().addAll(credentials);
  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      /**
       * Initialize the AppMaster (per the walkthrough, this sets up many of
       * its components), then start it.
       */
      appMaster.init(conf);
      appMaster.start();
      if(appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}

3.3 YarnChild的main方法

/**
 * Abridged excerpt of YarnChild's entry point: only the part that runs the
 * task is shown.
 * NOTE(review): childUGI, job, taskFinal and umbilical are set up in code that
 * is omitted from this excerpt.
 */
public static void main(String[] args) throws Throwable {
  
    childUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // use job-specified working directory
        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());

        /**
         * Run the actual task (a MapTask or a ReduceTask, per the
         * walkthrough) as childUGI.
         */
        taskFinal.run(job, umbilical); // run the task
        return null;
      }
    });
 
}

4. MapTask启动

4.1 MapTask的run方法

/**
 * Entry point of a MapTask attempt.
 * <p>
 * Registers progress phases (map only, or map 2/3 + sort 1/3 when there are
 * reducers), starts the reporter, initializes the task, and then runs one of:
 * the job-cleanup, job-setup, or task-cleanup task, or the actual mapper
 * (new or old API). Finally reports completion via {@code done}.
 */
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // Progress phases are only registered when this task is a map task.
  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map 
    // phase will govern the entire attempt's progress.
    // Is there a reduce stage at all?
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be 
      // split between the map phase (67%) and the sort phase (33%).
      /**
       * Finishing the map phase accounts for 2/3 of the progress;
       * finishing the sort phase accounts for the remaining 1/3.
       */
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase  = getProgress().addPhase("sort", 0.333f);
    }
  }

  TaskReporter reporter = startReporter(umbilical);

  /**
   * Whether to run the new (org.apache.hadoop.mapreduce) mapper API.
   */
  boolean useNewApi = job.getUseNewMapper();
  /**
   * Task initialization. Among other things, initialize() reflectively creates
   * the job's OutputFormat (TextOutputFormat by default, per the walkthrough)
   * and obtains its output committer.
   */
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (useNewApi) {
    // New API (org.apache.hadoop.mapreduce); per the walkthrough, WordCount runs here.
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    // Old API (org.apache.hadoop.mapred).
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}

4.2 initialize

/**
 * Per-task setup.
 * <p>
 * Builds the job and task-attempt contexts and moves an UNASSIGNED task to
 * RUNNING. It then creates the output committer: from the OutputFormat for the
 * new API, or from the JobConf for the old one. If an output path is set, the
 * work output path becomes the committer's task-attempt path when the
 * committer is a FileOutputCommitter, and the output path itself otherwise.
 * After calling {@code committer.setupTask}, it records the initial cumulative
 * CPU time when a process-tree calculator is available.
 */
public void initialize(JobConf job, JobID id, 
                       Reporter reporter,
                       boolean useNewApi) throws IOException, 
                                                 ClassNotFoundException,
                                                 InterruptedException {
  jobContext = new JobContextImpl(job, id, reporter);
  taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
  if (getState() == TaskStatus.State.UNASSIGNED) {
    setState(TaskStatus.State.RUNNING);
  }
  if (useNewApi) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("using new api for output committer");
    }
    /**
     * Reflectively instantiate the job's OutputFormat class
     * (taskContext.getOutputFormatClass(); TextOutputFormat by default, per
     * the walkthrough) and ask it for the output committer.
     */
    outputFormat = ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
    committer = outputFormat.getOutputCommitter(taskContext);
  } else {
    committer = conf.getOutputCommitter();
  }
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    if ((committer instanceof FileOutputCommitter)) {
      FileOutputFormat.setWorkOutputPath(conf, 
        ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
    } else {
      FileOutputFormat.setWorkOutputPath(conf, outputPath);
    }
  }
  committer.setupTask(taskContext);
  Class<? extends ResourceCalculatorProcessTree> clazz =
      conf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
          null, ResourceCalculatorProcessTree.class);
  pTree = ResourceCalculatorProcessTree
          .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, conf);
  LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
  if (pTree != null) {
    pTree.updateProcessTree();
    initCpuCumulativeTime = pTree.getCumulativeCpuTime();
  }
}

4.3 runNewMapper

/**
 * Runs the user's Mapper through the new (org.apache.hadoop.mapreduce) API.
 * <p>
 * Reflectively builds the Mapper and InputFormat and rebuilds this task's
 * input split. It wraps the reader in a NewTrackingRecordReader and picks the
 * output collector: direct output when there are no reduce tasks, the sorting
 * collector otherwise. After wiring everything into a Mapper.Context, it runs
 * the mapper. The reader and the output are always closed, quietly on the
 * error path.
 */
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  /**
   * Each MapTask gets its own task-attempt context; the components configured
   * through job.setXXX() (mapper class, input format class, ...) are looked
   * up through it.
   */
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                getTaskID(),
                                                                reporter);
  // make a mapper
  /**
   * An instance of the user's own Mapper class.
   */
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
            // created via reflection
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // make the input format
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
            // input component; TextInputFormat by default (per the walkthrough)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the input split
  // The input is divided into logical splits (each an InputSplit), one MapTask
  // per split; this task's split is rebuilt from its stored location/offset.
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  /**
   * The InputFormat defines the input format; the actual reading is done by
   * a RecordReader created by the InputFormat. NewTrackingRecordReader wraps
   * that reader and is built from the split, inputFormat, reporter and context.
   */
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE>
      (split, inputFormat, reporter, taskContext);
  
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  /**
   * RecordWriter that receives the map output.
   */
  org.apache.hadoop.mapreduce.RecordWriter output = null;
  
  // get an output object
  /**
   * No reduce tasks:   NewDirectOutputCollector
   * With reduce tasks: NewOutputCollector
   */
  if (job.getNumReduceTasks() == 0) {
    output =
      new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    // Default path (at least one reducer): NewOutputCollector sets up the
    // map-side sort buffer (the circular buffer, MapOutputBuffer by default).
    /**
     * context.write() ends up in output.write(), which calls
     * collector.collect().
     */
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  /**
   * MapContextImpl bundles the reader, writer, committer, reporter and split
   * so that the mapper can reach them through one context object.
   */
  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = 
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
        input, output, 
        committer, 
        reporter, split);

  // Decorator: WrappedMapper wraps mapContext into a Mapper.Context.
  /**
   * Per the walkthrough, the four context methods used by mapper.run()
   * (nextKeyValue, getCurrentKey, getCurrentValue, write) delegate to the
   * same-named methods of mapContext.
   */
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
      mapperContext = 
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
    /**
     * Initialize the reader for this split; for file input this prepares the
     * stream that reads the file. Per the walkthrough, the wrapped ("real")
     * reader is a LineRecordReader created by the InputFormat.
     */
    input.initialize(split, mapperContext);
    // Run the user's Mapper over every record of the split.

    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

4.4 NewOutputCollector(环形缓冲区)的初始化操作

/**
 * Creates the map-side output collector used when the job has reduce tasks.
 * <p>
 * Builds the sorting collector and picks a partitioner. With more than one
 * reduce task, the job's configured Partitioner class is instantiated.
 * Otherwise every record goes to partition {@code partitions - 1}, which is
 * partition 0 for a single reducer.
 */
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Records passed to write() are handed to this collector's collect().
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();

  boolean needsRealPartitioner = partitions > 1;
  if (!needsRealPartitioner) {
    // Nothing to choose between: route every record to the last (only) partition.
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  } else {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  }
}

4.4.1 createSortingCollector

/**
 * Instantiates and initializes the map output collector for this task.
 * <p>
 * Tries each class listed under
 * {@code JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR} in order; MapOutputBuffer,
 * the circular sort buffer, is the default. Returns the first class that
 * implements MapOutputCollector and whose {@code init(context)} succeeds.
 * Failures are logged and the next class is tried.
 * @param job job configuration
 * @param reporter reporter passed to the collector through its context
 * @return an initialized collector
 * @throws IOException if the configured list contains no classes, or if every
 *         listed class failed (the last failure is attached as the cause)
 */
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);

  // Default collector: MapOutputBuffer (the circular buffer).
  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);

  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Instantiate the collector via reflection.
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      // For MapOutputBuffer this allocates and sets up the circular buffer.
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
  if (lastException == null) {
    // The loop never ran: the property is set but lists no classes (e.g. an
    // empty value). Without this check, lastException.getMessage() below
    // would throw a NullPointerException instead of a meaningful error.
    throw new IOException("No map output collector classes configured in " +
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR);
  }
  throw new IOException("Initialization of all the collectors failed. " +
    "Error in last collector was :" + lastException.getMessage(), lastException);
}

4.5 init 方法

/**
 * Initializes the map-side sort buffer (MapOutputBuffer).
 * <p>
 * Reads and validates the spill percentage and the buffer size, allocates the
 * buffer and its metadata view, and sets up the key/value serializers,
 * counters, optional compression codec and combiner. It then starts the daemon
 * spill thread and waits until that thread reports it is running.
 * @param context supplies the JobConf, reporter and owning MapTask
 * @throws IOException on an invalid MAP_SORT_SPILL_PERCENT or IO_SORT_MB value,
 *         or if the spill thread fails to initialize
 */
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  // Spill threshold as a fraction of the buffer (default 0.8) and buffer size
  // in MB (default 100).
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  // sortmb must fit in 11 bits (0..2047) so that sortmb << 20 below cannot
  // overflow an int.
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // Sorter used for spills: "map.sort.class", QuickSort by default.
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  /**
   * Allocate the circular buffer as a single byte array of IO_SORT_MB
   * megabytes (100 MB by default). Serialized key/value bytes and per-record
   * metadata share this space: kvmeta is an IntBuffer view over the same
   * array.
   */
  int maxMemUsage = sortmb << 20;
  // Round down to a whole multiple of METASIZE (one metadata record).
  maxMemUsage -= maxMemUsage % METASIZE;
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  // Start with the equator at offset 0. NOTE(review): setEquator (not shown)
  // presumably also positions kvindex, which kvstart/kvend copy below.
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  // Once this many bytes are in use, a spill is started (80% by default).
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  comparator = job.getOutputKeyComparator();
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // compression
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // combiner
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  /**
   * This MapOutputBuffer manages two key things:
   * 1. the kvbuffer byte array allocated above (100 MB by default);
   * 2. the SpillThread, which spills buffered data once the soft limit
   *    (spillper, 80% by default) is reached.
   */
  spillLock.lock();
  try {
    spillThread.start();
    while (!spillThreadRunning) {
        // Block on spillDone until the spill thread reports it is running.
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}

4.6 mapper.run(mapperContext);

这里开始读取数据:逐条取出 key/value,调用我们自己写的 map 方法,再通过 context 把结果写出去。

 public void run(Context context) throws IOException, InterruptedException {
    // Setup hook, typically used for per-task initialization.
    setup(context);
    try {
      // Pull records from the context one at a time and hand each to map().
      // The context exposes nextKeyValue / getCurrentKey / getCurrentValue /
      // write and delegates them to the reader and writer it wraps.
      boolean hasRecord = context.nextKeyValue();
      while (hasRecord) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
        hasRecord = context.nextKeyValue();
      }
    } finally {
      // Teardown hook; runs even if map() throws.
      cleanup(context);
    }
  }

4.7 NewOutputCollector的write

/**
 * Routes one map output record: asks the partitioner which reduce partition
 * the record belongs to, then hands the record to the sorting collector.
 */
@Override
public void write(K key, V value) throws IOException, InterruptedException {
  final int targetPartition = partitioner.getPartition(key, value, partitions);
  collector.collect(key, value, targetPartition);
}

4.8 MapOutputBuffer的collect方法

这里就是环形缓冲区(MapOutputBuffer)写入数据的处理方法。

/**
 * Serialize the key, value to intermediate storage.
 * When this method returns, kvindex must refer to sufficient unused
 * storage to store one METADATA.
 * <p>
 * Checks the key/value classes against the configured map output classes and
 * validates the partition. When the remaining budget runs out, it either
 * reclaims space from a finished spill or starts a new spill (moving the
 * equator), all under spillLock. The key and value are then serialized into
 * the buffer and one metadata record (partition, key start, value start,
 * value length) is written. A record too large for the buffer is spilled
 * directly via spillSingleRecord.
 * @throws IOException on a key/value class mismatch, an illegal partition, or
 *         a failure reported by checkSpillException
 */
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  reporter.progress();
  if (key.getClass() != keyClass) {
    throw new IOException("Type mismatch in key from map: expected "
                          + keyClass.getName() + ", received "
                          + key.getClass().getName());
  }
  if (value.getClass() != valClass) {
    throw new IOException("Type mismatch in value from map: expected "
                          + valClass.getName() + ", received "
                          + value.getClass().getName());
  }
  if (partition < 0 || partition >= partitions) {
    throw new IOException("Illegal partition for " + key + " (" +
        partition + ")");
  }
  checkSpillException();
  // Reserve room for this record's metadata; once the budget is exhausted,
  // re-evaluate the buffer state under spillLock.
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      // do { ... } while (false): lets 'continue' act as an early exit.
      do {
        if (!spillInProgress) {
          final int kvbidx = 4 * kvindex;
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            // spill records, if any collected; check latter, as it may
            // be possible for metadata alignment to hit spill pcnt
            startSpill();
            final int avgRec = (int)
              (mapOutputByteCounter.getCounter() /
              mapOutputRecordCounter.getCounter());
            // leave at least half the split buffer for serialization data
            // ensure that kvindex >= bufindex
            final int distkvi = distanceTo(bufindex, kvbidx);
            final int newPos = (bufindex +
              Math.max(2 * METASIZE - 1,
                      Math.min(distkvi / 2,
                               distkvi / (METASIZE + avgRec) * METASIZE)))
              % kvbuffer.length;
            setEquator(newPos);
            bufmark = bufindex = newPos;
            final int serBound = 4 * kvend;
            // bytes remaining before the lock must be held and limits
            // checked is the minimum of three arcs: the metadata space, the
            // serialization space, and the soft limit
            bufferRemaining = Math.min(
                // metadata max
                distanceTo(bufend, newPos),
                Math.min(
                  // serialization max
                  distanceTo(newPos, serBound),
                  // soft limit
                  softLimit)) - 2 * METASIZE;
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }

  try {
    // serialize key bytes into buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    if (bufindex < keystart) {
      // wrapped the key; must make contiguous
      bb.shiftBufferedKey();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    // It's possible for records to have zero length, i.e. the serializer
    // will perform no writes. To ensure that the boundary conditions are
    // checked and that the kvindex invariant is maintained, perform a
    // zero-length write into the buffer. The logic monitoring this could be
    // moved into collect, but this is cleaner and inexpensive. For now, it
    // is acceptable.
    bb.write(b0, 0, 0);

    // the record must be marked after the preceding write, as the metadata
    // for this record are not yet written
    int valend = bb.markRecord();

    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(
        distanceTo(keystart, valend, bufvoid));

    // write accounting info
    kvmeta.put(kvindex + PARTITION, partition);
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
    // advance kvindex
    // (metadata records are laid out backwards: step back NMETA ints, wrapping
    // around the end of kvmeta)
    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value, partition);
    mapOutputRecordCounter.increment(1);
    return;
  }
}

4.9 我们现在已经走到流程图的哪里了

image-20200822185650312

未完待续…

Logo

立足具身智能前沿赛道,致力于搭建全球化、开源化、全栈式技术交流与实践共创平台。

更多推荐