MapReduce执行流程源码深度剖析(二)
1、从一个Demo入手:因为阅读一个源码,需要先找到它的入口,一般计算框架都会提供一些例子类,所以我们就选择从WordCount这个类来入手!1.1 WordCount public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> …
·
1、从一个Demo入手
因为阅读一个源码,需要先找到它的入口,而一般的计算框架都会提供一些例子类,所以我们就选择从WordCount这个类来入手!

1.1 WordCount
/**
 * Word-count example job. The mapper emits {@code (token, 1)} for every token
 * of each input value; the reducer (also registered as the combiner) sums the
 * counts for each token.
 */
public class WordCount {

  /** Tokenizes each input value and writes {@code (token, 1)} per token. */
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    // Reused across calls so that no new Writable is allocated per token.
    private final static IntWritable ONE = new IntWritable(1);
    private Text token = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokens = new StringTokenizer(value.toString());
      while (tokens.hasMoreTokens()) {
        token.set(tokens.nextToken());
        context.write(token, ONE);
      }
    }
  }

  /** Adds up all counts received for a key and writes {@code (key, sum)}. */
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable total = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int runningSum = 0;
      for (IntWritable count : values) {
        runningSum += count.get();
      }
      total.set(runningSum);
      context.write(key, total);
    }
  }

  /**
   * Configures and submits the job. All arguments but the last are input
   * paths; the last argument is the output path.
   */
  public static void main(String[] args) throws Exception {
    // Article note: new Configuration() automatically loads 8 configuration
    // files; normally the four config files from the cluster installation are
    // placed in the project's resources directory.
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }

    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    final int outputIndex = otherArgs.length - 1;
    for (int i = 0; i < outputIndex; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[outputIndex]));

    // Submit; passing true asks for progress information to be printed.
    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
  }
}
1.2 waitForCompletion(true)
/**
 * Submit the job to the cluster (if it is still in the DEFINE state) and wait
 * for it to finish.
 *
 * @param verbose when true, delegate to {@code monitorAndPrintJob()} to print
 *                progress to the user; when false, poll {@code isComplete()}
 *                silently at the client's completion poll interval
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    // Not submitted yet: submit now.
    submit();
  }
  if (!verbose) {
    // get the completion poll interval from the client.
    final int pollMillis = Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException ie) {
        // An interrupt during the sleep is ignored; the loop simply
        // re-checks completion.
      }
    }
  } else {
    monitorAndPrintJob();
  }
  return isSuccessful();
}
2、Job提交的流程
2.1 submit();
/**
 * Submit the job to the cluster and return immediately.
 *
 * <p>Steps, in order: verify the job is in the DEFINE state, switch to the new
 * API, connect to the cluster, build a {@code JobSubmitter}, and run the
 * internal submission as the job's user ({@code ugi}). On return the state is
 * RUNNING and the tracking URL has been logged.
 *
 * @throws IOException
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  // Make sure the job is still in the DEFINE state.
  ensureState(JobState.DEFINE);

  // Enable the new API.
  // Article note: hadoop2.x new API = org.apache.hadoop.mapreduce.xxx, where
  // many former interfaces became regular classes; hadoop1.x old API =
  // org.apache.hadoop.mapred.xxx, where many parent types are interfaces.
  setUseNewAPI();

  // Connect to the cluster. Article note: the key effect of connect() is to
  // build the `cluster` member, which in turn creates its client
  // (YARNRunner for YARN, LocalJobRunner when running locally).
  connect();

  // Build the submitter from the cluster's file system and client.
  final JobSubmitter jobSubmitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

  final PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
            ClassNotFoundException {
          // The actual submission.
          return jobSubmitter.submitJobInternal(Job.this, cluster);
        }
      };
  status = ugi.doAs(submitAction);

  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
2.2 connect();
/**
 * Lazily creates the {@code cluster} member. Does nothing when it is already
 * set; otherwise constructs a {@code Cluster} from this job's configuration
 * while running as {@code ugi}.
 */
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster != null) {
    return;
  }
  // Article note: `cluster` represents the YARN cluster; the Cluster class
  // holds a ClientProtocol member (client), which is the YARNRunner.
  cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
    public Cluster run()
        throws IOException, InterruptedException, ClassNotFoundException {
      return new Cluster(getConfiguration());
    }
  });
}
2.3 Cluster的构造方法的initialize
/**
 * Picks the first {@code ClientProtocolProvider} from {@code frameworkLoader}
 * that yields a non-null {@code ClientProtocol} and stores both the provider
 * and the protocol ({@code client}).
 *
 * @param jobTrackAddr address passed to the provider; when null the provider
 *                     is asked to create the protocol from {@code conf} alone
 * @param conf         configuration handed to each provider
 * @throws IOException if no provider produced a client protocol
 */
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      final String providerName = provider.getClass().getName();
      LOG.debug("Trying ClientProtocolProvider : " + providerName);
      try {
        // The provider builds the client-side request protocol. Article note:
        // there are two implementations, YarnClientProtocolProvider and
        // LocalClientProtocolProvider.
        ClientProtocol candidate = (jobTrackAddr == null)
            ? provider.create(conf)
            : provider.create(jobTrackAddr, conf);
        if (candidate == null) {
          LOG.debug("Cannot pick " + providerName
              + " as the ClientProtocolProvider - returned null protocol");
          continue;
        }
        clientProtocolProvider = provider;
        // Article note: for YARN this client is the YARNRunner.
        client = candidate;
        LOG.debug("Picked " + providerName
            + " as the ClientProtocolProvider");
        break;
      } catch (Exception e) {
        // A failing provider is logged and skipped; the next one is tried.
        LOG.info("Failed to use " + providerName
            + " due to error: " + e.getMessage());
      }
    }
  }

  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
2.4 JobSubmitter的submitJobInternal
// Internal job submission: validates the output spec, registers the MR
// framework in the distributed cache, obtains a new job id, derives the
// per-job submit directory, and hands the job to submitClient.
// NOTE(review): this is an abridged excerpt - it has no return statement and
// `status` is not declared here; confirm against the full Hadoop source.
JobStatus submitJobInternal(Job job, Cluster cluster) {
//validate the jobs output specs
/**
* Checks the working directory and the output directory.
* Article note: if the specified output directory already exists, an error
* is raised.
*/
checkSpecs(job);
// Add the MR framework to the distributed cache.
addMRFrameworkToDistributedCache(conf);
// Generate the job id.
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// Per-job submit directory: <jobStagingArea>/<jobId>
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
/**
* Queue the job runs in (falls back to JobConf.DEFAULT_QUEUE_NAME).
*/
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
/**
* The actual submission.
* jobId: per the article, every job in the YARN cluster has a globally
* unique id.
* submitJobDir: the directory the job is submitted from.
* getCredentials: the job's credentials.
* Article note: when a user submits an MR job, the client first generates the
* required artifacts (a launch script, a job.xml and a jar), which are stored
* in a temporary directory on HDFS.
*/
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
}
2.5 YarnRunner的submitJob
/**
 * Builds the submission context for the MR ApplicationMaster, submits it via
 * {@code resMgrDelegate}, and returns the job status looked up through
 * {@code clientCache}.
 *
 * @throws IOException if the application report is null, the application is
 *         FAILED or KILLED, or a {@code YarnException} occurs (wrapped)
 */
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  addHistoryToken(ts);

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext submissionContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(submissionContext);
    ApplicationReport report =
        resMgrDelegate.getApplicationReport(applicationId);

    String diagnostics;
    if (report == null) {
      diagnostics = "application report is null";
    } else {
      diagnostics = report.getDiagnostics();
    }

    final boolean unusable = report == null
        || report.getYarnApplicationState() == YarnApplicationState.FAILED
        || report.getYarnApplicationState() == YarnApplicationState.KILLED;
    if (unusable) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
2.6 submitApplication
/**
 * Converts the request to its protobuf form, forwards it through
 * {@code proxy}, and wraps the reply in a {@code SubmitApplicationResponsePBImpl}.
 *
 * <p>Article note: "submitting a job" amounts to the RM client proxy telling
 * the ResourceManager that an application was submitted (the message carries
 * items such as the job id and submit dir); from here on YARN takes over.
 */
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException, IOException {
  final SubmitApplicationRequestPBImpl pbRequest =
      (SubmitApplicationRequestPBImpl) request;
  final SubmitApplicationRequestProto requestProto = pbRequest.getProto();
  try {
    return new SubmitApplicationResponsePBImpl(
        proxy.submitApplication(null, requestProto));
  } catch (ServiceException e) {
    // Presumably always throws; the return below satisfies the compiler.
    RPCUtil.unwrapAndThrowException(e);
    return null;
  }
}
2.7 流程图

2.8 Yarn提交流程图

3、MapReduce组件启动
3.1 mrAppMaster
/**
 * Initializes and starts the given application master as the job's user.
 *
 * <p>The current user's credentials are logged, copied onto a remote-user UGI
 * for {@code jobUserName}, stripped of the AM-to-RM token, added to
 * {@code conf}, and then {@code init}/{@code start} run inside that UGI.
 *
 * @param appMaster   the application master to init and start
 * @param conf        job configuration; receives the filtered credentials
 * @param jobUserName user the application master runs as
 * @throws IOException
 * @throws InterruptedException
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName)
    throws IOException, InterruptedException {
  UserGroupInformation.setConfiguration(conf);

  // Security framework already loaded the tokens into current UGI, just use
  // them
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  LOG.info("Executing with tokens:");
  for (Token<?> token : credentials.getAllTokens()) {
    LOG.info(token);
  }

  UserGroupInformation appMasterUgi =
      UserGroupInformation.createRemoteUser(jobUserName);
  appMasterUgi.addCredentials(credentials);

  // Now remove the AM->RM token so tasks don't have it
  for (Iterator<Token<?>> it = credentials.getAllTokens().iterator();
       it.hasNext(); ) {
    if (it.next().getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      it.remove();
    }
  }
  conf.getCredentials().addAll(credentials);

  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      // init() sets up many components before start() is called.
      appMaster.init(conf);
      appMaster.start();
      if (appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}
3.2 mrAppMaster的initAndStartAppMaster
/**
 * Initializes and starts the given application master as the job's user.
 *
 * <p>The current user's credentials are logged, copied onto a remote-user UGI
 * for {@code jobUserName}, stripped of the AM-to-RM token, added to
 * {@code conf}, and then {@code init}/{@code start} run inside that UGI.
 *
 * @param appMaster   the application master to init and start
 * @param conf        job configuration; receives the filtered credentials
 * @param jobUserName user the application master runs as
 * @throws IOException
 * @throws InterruptedException
 */
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName)
    throws IOException, InterruptedException {
  UserGroupInformation.setConfiguration(conf);

  // Security framework already loaded the tokens into current UGI, just use
  // them
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  LOG.info("Executing with tokens:");
  for (Token<?> token : credentials.getAllTokens()) {
    LOG.info(token);
  }

  UserGroupInformation appMasterUgi =
      UserGroupInformation.createRemoteUser(jobUserName);
  appMasterUgi.addCredentials(credentials);

  // Now remove the AM->RM token so tasks don't have it
  for (Iterator<Token<?>> it = credentials.getAllTokens().iterator();
       it.hasNext(); ) {
    if (it.next().getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      it.remove();
    }
  }
  conf.getCredentials().addAll(credentials);

  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      // init() sets up many components before start() is called.
      appMaster.init(conf);
      appMaster.start();
      if (appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}
3.3 YarnChild的main方法
/**
 * Entry point of the task child process (abridged in the article): runs the
 * assigned task as {@code childUGI}.
 * NOTE(review): {@code childUGI}, {@code job}, {@code taskFinal} and
 * {@code umbilical} are set up in code omitted from this excerpt.
 */
public static void main(String[] args) throws Throwable {
  final PrivilegedExceptionAction<Object> runTask =
      new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // use job-specified working directory
          FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
          // This is where the Task (MapTask or ReduceTask) actually executes.
          taskFinal.run(job, umbilical); // run the task
          return null;
        }
      };
  childUGI.doAs(runTask);
}
4. MapTask启动
4.1 MapTask的run方法
/**
 * Runs this map task: registers progress phases, starts the reporter,
 * initializes the task, handles the special cleanup/setup task kinds, and
 * otherwise runs the new- or old-API mapper before reporting completion.
 *
 * @param job       job configuration
 * @param umbilical protocol used to talk to the parent
 */
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // Phases are only registered when this really is a map task.
  if (isMapTask()) {
    final boolean hasReducers = conf.getNumReduceTasks() != 0;
    if (hasReducers) {
      // If there are reducers then the entire attempt's progress will be
      // split between the map phase (67%) and the sort phase (33%).
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase = getProgress().addPhase("sort", 0.333f);
    } else {
      // If there are no reducers then there won't be any sort. Hence the map
      // phase will govern the entire attempt's progress.
      mapPhase = getProgress().addPhase("map", 1.0f);
    }
  }

  TaskReporter reporter = startReporter(umbilical);

  // Whether the new (org.apache.hadoop.mapreduce) mapper API is in use.
  boolean useNewApi = job.getUseNewMapper();

  // Article note: one important effect of initialize() is creating the
  // output format (TextOutputFormat by default).
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }

  if (!useNewApi) {
    // Old API path.
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    // New API path; this is the one taken in the article's walkthrough.
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
4.2 initialize
/**
 * Prepares the task for execution: creates the job and task-attempt contexts,
 * moves an UNASSIGNED task to RUNNING, resolves the output committer, sets the
 * work output path, calls {@code setupTask} on the committer, and initializes
 * the resource-calculator process tree.
 *
 * @param job       job configuration
 * @param id        job id
 * @param reporter  progress reporter passed to the contexts
 * @param useNewApi true to obtain the committer from the new-API output format
 */
public void initialize(JobConf job, JobID id,
                       Reporter reporter,
                       boolean useNewApi)
    throws IOException, ClassNotFoundException, InterruptedException {
  jobContext = new JobContextImpl(job, id, reporter);
  taskContext = new TaskAttemptContextImpl(job, taskId, reporter);

  if (getState() == TaskStatus.State.UNASSIGNED) {
    setState(TaskStatus.State.RUNNING);
  }

  if (!useNewApi) {
    committer = conf.getOutputCommitter();
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("using new api for output committer");
    }
    // Instantiate the configured output format reflectively (per the
    // article, TextOutputFormat by default) and take its committer.
    outputFormat =
        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
    committer = outputFormat.getOutputCommitter(taskContext);
  }

  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    // A FileOutputCommitter supplies a per-attempt path; any other committer
    // works directly in the output path.
    Path workPath = (committer instanceof FileOutputCommitter)
        ? ((FileOutputCommitter) committer).getTaskAttemptPath(taskContext)
        : outputPath;
    FileOutputFormat.setWorkOutputPath(conf, workPath);
  }
  committer.setupTask(taskContext);

  Class<? extends ResourceCalculatorProcessTree> processTreeClass =
      conf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
          null, ResourceCalculatorProcessTree.class);
  pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
      System.getenv().get("JVM_PID"), processTreeClass, conf);
  LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
  if (pTree != null) {
    // Record the starting cumulative CPU time.
    pTree.updateProcessTree();
    initCpuCumulativeTime = pTree.getCumulativeCpuTime();
  }
}
4.3 runNewMapper
/**
 * Runs a new-API mapper for one input split.
 *
 * Builds, in order: a task attempt context, the user's Mapper instance, the
 * InputFormat, the InputSplit, a tracking RecordReader, the output
 * RecordWriter (direct when there are no reduce tasks, collecting otherwise),
 * and the MapContext wrapped for the mapper. It then initializes the reader,
 * runs the mapper, moves to the SORT phase, and closes input and output.
 */
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
/**
* Article note: every MapTask has its own TaskContext object; the components
* configured via job.setXXX() are reachable through this taskContext.
*/
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
/**
* This is an instance of the user-defined Mapper class.
*/
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
// created reflectively from the configured mapper class
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
// build the input component; per the article the default is TextInputFormat.class
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
// Article note: the input is logically split first; each logical split is an
// InputSplit object, and one MapTask is started per logical split.
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
/**
* Article note: the InputFormat defines the input format, while the actual
* reading is done by a RecordReader, a component derived from the InputFormat
* (composition). The RecordReader is created and closed via the InputFormat.
*/
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
/**
* The RecordWriter is responsible for writing data out.
*/
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
/**
* With no reduce tasks  -> NewDirectOutputCollector
* With reduce tasks     -> NewOutputCollector
*/
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
// Default path (reducers present). Article note: without reducers the output
// is written directly; here a MapBuffer (the ring buffer) is prepared instead.
/**
* Call chain per the article:
* context.write()
* output.collect()
*/
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
/**
* MapContext initialization: it carries the reader, writer, committer,
* reporter and split so other components can see them.
* Article note: the context passed to mapper.run(context) offers four key
* methods, which delegate to the methods of this context.
*/
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
// Article note: decorator pattern - mapContext is simply wrapped.
/**
* The four methods on the context seen by mapper.run(context) end up calling
* the same-named methods of mapContext.
*/
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
try {
/**
* Article note: opens the data source (for a file, the input stream is
* created up front) and builds the line-by-line reader. `input` wraps the
* real reader, which is created from the InputFormat
* (real = LineRecordReader for text input).
*/
input.initialize(split, mapperContext);
// Run the map phase: this drives the user's own Mapper instance.
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
// Nulled after a successful close so the finally block does not close again.
input = null;
output.close(mapperContext);
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
4.4. NewOutputCollector(环形缓冲区)的初始化操作
/**
 * Creates the sorting collector and chooses the partitioner.
 *
 * <p>With more than one reduce task the job's configured partitioner is
 * instantiated reflectively; otherwise a trivial partitioner is used that
 * always returns {@code partitions - 1} (partition 0 when there is exactly one
 * reduce task).
 */
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Important: this collector is where context.write() ultimately lands.
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions <= 1) {
    // No real partitioning needed: every record goes to the same partition.
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  } else {
    // More than one reducer: partitioning is required.
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
        ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  }
}
4.4 createSortingCollector
/**
 * Instantiates and initializes the first usable map output collector.
 *
 * <p>The candidate classes come from
 * {@code JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR} (default
 * {@code MapOutputBuffer}, the ring buffer). Each candidate is tried in order;
 * a candidate that is not a {@code MapOutputCollector} or that fails to
 * initialize is logged and skipped.
 *
 * @param job      job configuration
 * @param reporter task reporter handed to the collector context
 * @return the first collector whose {@code init} succeeded
 * @throws IOException if every candidate failed, or if no candidate class was
 *         configured at all
 */
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);

  Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class<?> clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
            " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
          clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Created reflectively; one MapTask owns one MapOutputCollector.
      MapOutputCollector<KEY, VALUE> collector =
          ReflectionUtils.newInstance(subclazz, job);
      // Initializes the collector (for MapOutputBuffer: the ring buffer).
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }

  // With an empty candidate list the loop never runs and lastException stays
  // null; report that explicitly instead of failing with a
  // NullPointerException on lastException.getMessage().
  if (lastException == null) {
    throw new IOException("Initialization of all the collectors failed. " +
        "No map output collector class was configured.");
  }
  throw new IOException("Initialization of all the collectors failed. " +
      "Error in last collector was :" + lastException.getMessage(), lastException);
}
4.5 init 方法
/**
 * Initializes the map output buffer: reads and validates the sort settings,
 * allocates the byte buffer and its int view for metadata, opens the key and
 * value serializers, looks up counters, the optional compression codec and
 * combiner, and finally starts the spill thread and waits until it is running.
 *
 * @param context supplies the JobConf, the reporter and the owning MapTask
 * @throws IOException if the spill percent or sort size is invalid, or the
 *         spill thread fails to initialize
 */
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
job = context.getJobConf();
reporter = context.getReporter();
mapTask = context.getMapTask();
mapOutputFile = mapTask.getMapOutputFile();
sortPhase = mapTask.getSortPhase();
spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
//sanity checks
// Spill threshold as a fraction of the buffer (default 0.8) and buffer size
// in MB (default 100).
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
// spillper must lie in (0.0, 1.0]
if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper);
}
// sortmb must fit in 11 bits (0..2047) so that sortmb << 20 stays a valid int
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
}
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
// buffers and accounting
/**
* Allocates the ring buffer (100 MB with the default sortmb).
* The same byte array holds both the serialized records and, through the
* IntBuffer view kvmeta, the per-record metadata.
*/
// MB -> bytes, rounded down to a multiple of METASIZE
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE;
kvbuffer = new byte[maxMemUsage];
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer();
setEquator(0);
bufstart = bufend = bufindex = equator;
kvstart = kvend = kvindex;
maxRec = kvmeta.capacity() / NMETA;
// Number of bytes after which a spill is triggered.
softLimit = (int)(kvbuffer.length * spillper);
bufferRemaining = softLimit;
LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
LOG.info("soft limit at " + softLimit);
LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
// k/v serialization
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
// output counters
mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter =
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter = reporter
.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
}
// combiner
final Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
final Counters.Counter combineOutputCounter =
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
} else {
combineCollector = null;
}
spillInProgress = false;
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
/**
* Article note: MapOutputBuffer, the manager of this buffer, effectively
* manages two things:
* 1. the byte array (100 MB by default)
* 2. the SpillThread, which spills kvbuffer data once it is 80% full
*/
// Start the spill thread under spillLock and wait on spillDone until it
// reports itself running.
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
// blocks here until the spill thread signals
spillDone.await();
}
} catch (InterruptedException e) {
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw new IOException("Spill thread failed to initialize",
sortSpillException);
}
}
4.6 mapper.run(mapperContext);
这里就开始读取数据了:每读到一条记录,就调用我们自己写的map方法,再通过context把结果写出去
/**
 * Drives the mapper: calls setup once, then map once per key/value pair
 * supplied by the context, and always calls cleanup afterwards (even when the
 * loop throws).
 *
 * @param context supplies the input records and receives the output
 */
public void run(Context context) throws IOException, InterruptedException {
// Usually used for initialization; per the article the default does nothing.
setup(context);
try {
/**
* nextKeyValue() advances to the next key/value pair and reports whether one
* was available.
* Article note: the context parameter of a mapper offers four key methods:
* 1.context.nextKeyValue()
* 2.context.getCurrentKey()
* 3.context.getCurrentValue()
* 4.context.write()
*
* The context holds the objects that actually read the input and write the
* output; the article describes this as a typical decorator pattern.
*/
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
// Usually used for wrap-up work.
cleanup(context);
}
}
4.7 NewOutputCollector的write
/**
 * Asks the partitioner which partition the pair belongs to and forwards the
 * pair, together with that partition number, to the collector.
 */
@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
4.8 MapOutputBuffer的collect方法
NewOutputCollector的write最终调用的是collector(也就是MapOutputBuffer)的collect方法,这里就是我们的环形缓冲区的处理方法
/**
* Serialize the key, value to intermediate storage.
* When this method returns, kvindex must refer to sufficient unused
* storage to store one METADATA.
*
* Flow: validate the key/value classes and the partition number, account for
* one metadata entry, possibly reclaim space or start a spill once the soft
* limit is reached, then serialize key and value into the buffer and record
* their metadata. A record too large for the buffer is spilled on its own.
*
* @throws IOException on a key/value type mismatch, an out-of-range
*         partition, or a pending spill failure
*/
public synchronized void collect(K key, V value, final int partition
) throws IOException {
reporter.progress();
// Exact class match is required, not just assignability.
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
+ key.getClass().getName());
}
if (value.getClass() != valClass) {
throw new IOException("Type mismatch in value from map: expected "
+ valClass.getName() + ", received "
+ value.getClass().getName());
}
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
checkSpillException();
// Account for this record's metadata entry up front.
bufferRemaining -= METASIZE;
if (bufferRemaining <= 0) {
// start spill if the thread is not running and the soft limit has been
// reached
spillLock.lock();
try {
// do { ... } while (false): runs once; `continue` acts as an early exit.
do {
if (!spillInProgress) {
// kvindex/kvend count ints; multiplying by 4 gives byte offsets
// (presumably into kvbuffer - confirm against the field declarations).
final int kvbidx = 4 * kvindex;
final int kvbend = 4 * kvend;
// serialized, unspilled bytes always lie between kvindex and
// bufindex, crossing the equator. Note that any void space
// created by a reset must be included in "used" bytes
final int bUsed = distanceTo(kvbidx, bufindex);
final boolean bufsoftlimit = bUsed >= softLimit;
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
// spill finished, reclaim space
resetSpill();
bufferRemaining = Math.min(
distanceTo(bufindex, kvbidx) - 2 * METASIZE,
softLimit - bUsed) - METASIZE;
continue;
} else if (bufsoftlimit && kvindex != kvend) {
// spill records, if any collected; check latter, as it may
// be possible for metadata alignment to hit spill pcnt
startSpill();
// Average serialized record size so far, used to place the new equator.
final int avgRec = (int)
(mapOutputByteCounter.getCounter() /
mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
Math.max(2 * METASIZE - 1,
Math.min(distkvi / 2,
distkvi / (METASIZE + avgRec) * METASIZE)))
% kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
// metadata max
distanceTo(bufend, newPos),
Math.min(
// serialization max
distanceTo(newPos, serBound),
// soft limit
softLimit)) - 2 * METASIZE;
}
}
} while (false);
} finally {
spillLock.unlock();
}
}
try {
// serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; must make contiguous
bb.shiftBufferedKey();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);
// It's possible for records to have zero length, i.e. the serializer
// will perform no writes. To ensure that the boundary conditions are
// checked and that the kvindex invariant is maintained, perform a
// zero-length write into the buffer. The logic monitoring this could be
// moved into collect, but this is cleaner and inexpensive. For now, it
// is acceptable.
bb.write(b0, 0, 0);
// the record must be marked after the preceding write, as the metadata
// for this record are not yet written
int valend = bb.markRecord();
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(
distanceTo(keystart, valend, bufvoid));
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
// advance kvindex
// (moves back by NMETA ints, wrapping around the metadata buffer)
kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
} catch (MapBufferTooSmallException e) {
// The record does not fit in the in-memory buffer: spill it by itself.
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
4.9 我们现在已经走到流程图的哪里了

未完待续…
更多推荐

所有评论(0)