一、MapReduce出生地
2004年,Google发表了一篇论文,向全世界的人们介绍了MapReduce。现在已经到处都有人在谈论MapReduce(微软、雅虎等大公司也不例外)。在Google发表论文时,MapReduce的最大成就是重写了Google的索引文件系统。而现在,谁也不知道它还会取得多大的成就。MapReduce被广泛地应用于日志分析、海量数据排序、在海量数据中查找特定模式等场景中。Hadoop根据Google的论文实现了MapReduce这个编程框架,并将源代码完全贡献了出来。本章就是要向大家介绍MapReduce这个流行的编程框架。
二、理解MapReduce
【例1】我们要数图书馆中的所有书,怎么办?非常简单,你数1号架上的书,我数2号书架上的书。我们人越多,数书就越快,这就是Map。
把所有人的统计数加在一起,这就是Reduce。
例1:就是MapReduce最基本的模型。
【例2 】 用通俗易懂的大白话讲解Map/Reduce原理: http://blog.csdn.net/lifuxiangcaohui/article/details/22675437
例2:就是MapReduce运用的实例模型~
三、 为什么要用MapReduce
MapReduce的流行是有理由的。它非常简单、易于实现且扩展性强。大家可以通过它轻易地编写出同时在多台主机上运行的程序,也可以使用Ruby、Python、PHP和C++等非Java类语言编写Map或Reduce程序,还可以在任何安装Hadoop的集群中运行同样的程序,不论这个集群有多少台主机。MapReduce适合处理海量数据,因为它会被多台主机同时处理,这样通常会有较快的速度。
下面来看一个例子。
引文分析是评价论文好坏的一个非常重要的方面,本例只对其中最简单的一部分,即论文的被引用次数进行了统计。假设有很多篇论文(百万级),且每篇论文的引文形式如下所示:
在单机运行时,想要完成这个统计任务,需要先切分出所有论文的名字存入一个Hash表中,然后遍历所有论文,查看引文信息,一一计数。因为文章数量很多,需要进行很多次内外存交换,这无疑会延长程序的执行时间。但在MapReduce中,这是一个WordCount就能解决的问题。(HADOOP版本说明、与MapReduce模型简介)
)
四、MapReduce计算模型
在Hadoop中,用于执行MapReduce任务的机器有两个角色:一个是JobTracker,另一个是TaskTracker。JobTracker是用于管理和调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。
MapReduce Job:
在Hadoop中,每个MapReduce任务都被初始化为一个Job。每个Job又可以分为两个阶段:Map阶段和Reduce阶段。这两个阶段分别用两个函数来表示,即Map函数和Reduce函数。Map函数接收一个<key, value>形式的输入,然后产生同样为<key, value>形式的中间输出,Hadoop会负责将所有具有相同中间key值的value集合到一起传递给Reduce函数,Reduce函数接收一个如<key, (list of values)>形式的输入,然后对这个value集合进行处理并输出结果,Reduce的输出也是<key, value>形式的。
为了方便理解,分别将三个<key, value>对标记为<k1, v1>、<k2, v2>、<k3, v3>,那么上面所述的过程就可以用下面几张图来表示了。
图1:
图2:
图3:
图4:
五、MapReduce 1是如何运行的
来源于:http://blog.csdn.net/summerDG/article/details/16509765
运行作业的方式通常有两种,一种是通过Job的submit()方法来提交任务,另一种是通过waitForCompletion()(如果作业没有提交就提交作业,然后一直等待作业执行完成)。
mapred.job.tracker决定了执行的方法:如果这个配置属性被设为local,那么就使用本地作业运行器,这个运行器使作业运行在单个jvm上,设计它的目的是在小数据集上运行、测试MapReduce项目;如果这个配置被设为host:port形式,这个属性值就被解释为jobTracker地址,运行器就把作业提交到这个地址的jobtracker。
在hadoop2.0有一个新的mapreduce框架被引入了,只要通过设置mapreduce.framework.name属性值,相应的框架就可以执行了,属性值有三种local(用于本地作业运行器),classic(用于老的mapreduce框架),yarn(新框架)。
经典框架:
客户端:用于提交作业
jobtracker:协调作业的运行,jobtracker是一个Java应用程序,主类是JobTracker
tasktracker:运行作业被划分后的任务,tasktracker是一个java应用程序,主类是TaskTracker
分布是文件系统(HDFS),用于在其他实体间共享作业文件。
1、作业提交:
看一下Job的summit()方法,我们就知道其过程了
[java] view plain copy
- public void submit() throws IOException, InterruptedException,
- ClassNotFoundException {
- ensureState(JobState.DEFINE);
- setUseNewAPI();
- // Connect to the JobTracker and submit the job
- connect();
- info = jobClient.submitJobInternal(conf);
- super.setJobID(info.getID());
- state = JobState.RUNNING;
- }
创建一个JobClient(第三版中是JobSummitter,但我用的是1.2.1版本,又改回到了JobClient)实例来调用submitJobInternal,这个方法才真正要开始做正事了,我们一会再具体讲这个函数。
waitForCompletion()则是轮询作业进度(每个一秒查询一次),如果和上次报告的进度不同就向控制台报告进度,作业成功完成,就显示作业计数器,否则就把引起作业失败的错误打印到控制台
[java] view plain copy
- 1282 public boolean waitForCompletion(boolean verbose
- 1283 ) throws IOException, InterruptedException,
- 1284 ClassNotFoundException {
- 1285 if (state == JobState.DEFINE) {
- 1286 submit(); //这里提交任务
- 1287 }
- 1288 if (verbose) {
- 1289 monitorAndPrintJob();
- 1290 } else {
- 1291 // get the completion poll interval from the client.
- 1292 int completionPollIntervalMillis =
- 1293 Job.getCompletionPollInterval(cluster.getConf());
- 1294 while (!isComplete()) {//轮询是否完成
- 1295 try {
- 1296 Thread.sleep(completionPollIntervalMillis);
- 1297 } catch (InterruptedException ie) {
- 1298 }
- 1299 }
- 1300 }
- 1301 return isSuccessful();
- 1302 }
作业提交进程是由submitJobInternal完成的:
[java] view plain copy
- public
- RunningJob submitJobInternal(final JobConf job
- ) throws FileNotFoundException,
- ClassNotFoundException,
- InterruptedException,
- IOException {
- /*
- * configure the command line options correctly on the submitting dfs
- */
- ...
- JobID jobId = jobSubmitClient.getNewJobId();
- Path submitJobDir = new Path(jobStagingArea, jobId.toString());
- jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
- ...
- // Check the output specification
- if (reduces == 0 ? jobCopy.getUseNewMapper() :
- jobCopy.getUseNewReducer()) {
- org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
- ReflectionUtils.newInstance(context.getOutputFormatClass(),
- jobCopy);
- output.checkOutputSpecs(context);
- } else {
- jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
- }
- jobCopy = (JobConf)context.getConfiguration();
- // Create the splits for the job
- FileSystem fs = submitJobDir.getFileSystem(jobCopy);
- int maps = writeSplits(context, submitJobDir);
- jobCopy.setNumMapTasks(maps);
- ...
- // Write job file to JobTracker's fs
- FSDataOutputStream out =
- FileSystem.create(fs, submitJobFile,
- new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
- ...
- // Now, actually submit the job (using the submit name)
- //
- printTokens(jobId, jobCopy.getCredentials());
- status = jobSubmitClient.submitJob(
- jobId, submitJobDir.toString(), jobCopy.getCredentials());
- ...
- }
源码比想象中的长,那我们只看一下重点
[java] view plain copy
- JobID jobId = jobSubmitClient.getNewJobId();
进去第一件事就是获取jobId,用到了jobSubmitClient对象,jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner),由此可判断出jobSubmitClient对应的类要么是JobTracker,要么是LocalJobRunner。那作业是提交到JobTracker去,还是在本地执行?可能就是看这个jobSunmitClient初始化时得到的是哪个类的实例了,我们可以稍稍的先往后看看,你会发现submitJobInternal最后用了
[java] view plain copy
- status = jobSubmitClient.submitJob(
- jobId, submitJobDir.toString(), jobCopy.getCredentials())
来提交作业,再稍稍看看JobTracker和LocalJobRunner的submitJob实现,看来确实是这么回事。好,那我们就先跳回去看看这个jobSubmitClient是如何初始化的。在JobClient的init中我们可以发现jobSubmitClient的初始化语句:
[java] view plain copy
- public void init(JobConf conf) throws IOException {
- String tracker = conf.get("mapred.job.tracker", "local");
- tasklogtimeout = conf.getInt(
- TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
- this.ugi = UserGroupInformation.getCurrentUser();
- if ("local".equals(tracker)) {
- conf.setNumMapTasks(1);
- this.jobSubmitClient = new LocalJobRunner(conf);
- } else {
- this.rpcJobSubmitClient =
- createRPCProxy(JobTracker.getAddress(conf), conf);
- this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
- }
- }
原来是跟conf中的mapred.job.tracker属性有关,如果你没设置,那默认得到的值就是local,jobSubmitClient也就会被赋予LocalJobRunner的实例。平时,我们开发时一般都只是引用lib里面的库,不引用conf文件夹里的配置文件,这里就能解释为什么我们直接Run as Java Application时,作业被提交到Local去运行了,而不是Hadoop Cluster中。
由于现在的水平有限,作业提交的内容暂时只深入到这里,可以看一下这篇博文,介绍的很详细,但不同的版本实现的api不会完全相同。
检查作业的输出说明。例如:没有指定输出目录或者输出目录已经存在就不提交,把错误抛回给mapreduce程序。
计算作业的输入分片
将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输出分片)复制到jobtracker的文件系统中,这个文件系统是在一个以作业id命名的目录下变,也就是我们先前代码
[java] view plain copy
- Path submitJobDir = new Path(jobStagingArea, jobId.toString());
创建的目录路径
这段代码就很好的展现了这一过程:
[java] view plain copy
- // Write job file to JobTracker's fs
- FSDataOutputStream out =
- FileSystem.create(fs, submitJobFile,
- new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
最后就是通过调用Jobtracker的submitJob()方法来告知jobtracker作业准备执行。
[java] view plain copy
- // Now, actually submit the job (using the submit name)
- //
- printTokens(jobId, jobCopy.getCredentials());
- status = jobSubmitClient.submitJob(
- jobId, submitJobDir.toString(), jobCopy.getCredentials());
2、作业初始化:
Jobtracker接收到一个对submitJob()的响应后,它会把作业放入一个内部队列,作业调度器会对它进行调度并初始化。初始化包括创建一个对象来表示正在运行的作业,这个对象会封装任务、记录信息,从而跟踪任务的状态和进度。
想要创建运行的任务列表,作业调度器首先要从共享文件系统检索由客户端计算的输入分片,然后为每个分片创建一个map任务。要创建的reduce任务的数量由mapred.reduce.tasks属性值决定,并通过setNumReduceTasks方法来设定(该方法是通过属性值设定的),调度器创建相应个数的reduce任务运行,任务此时被赋予一个id。
除了map和reduce任务,还有两个任务被创建了:一个作业建立任务,一个作业清理任务。这两个任务由tasktracker运行,并且 在map任务运行前建立作业,在reduce任务执行完后清理作业。我们还为作业配置了一个OutputCommitter来决定要运行的代码,默认情况下是 FileOutputCommitter。对于作业建立任务,它将会为作业创建一个输出目录,并给任务输出创建一个临时工作空间,对于作业清理任务,它会把临时工作空间删除掉。
3、任务分配:
Tasktrackers会运行一个简单的循环来周期性地给jobtracker发送一个“heartbeat”方法响应。“heartbeat”告诉tasktracker现在还在运行(还活着),而且“heartbeat”同时还是tasktracker和jobtracker的信息通道。作为“heartbeat”的一部分,tasktracker将会指明它是否已经准备运行新任务,如果是,jobtracker会分配一个任务给tasktracker,jobrtacker通过“heartbeat”的返回值与tasktracker通信。
在jobtracker为tasktracker选择任务前,jobtracker必须选择一个作业,从而从这个作业中选择任务。调度算法很复杂,默认方法是简单地维护一个作业优先级列表。选择好一个作业后,就可以选择作业了。
Tasktracker有固定数量的任务槽以供map和reduce任务使用,这些任务槽是独立设置的。例如:一个tasktracker也许会被配置成同时运行两个map任务和两个reduce任务(精确的数字依赖于tasktracker的核的数目和内存大小)。在一个给定的任务范围内,默认的调度器会在填满reduce任务槽之前先填满map任务槽。所以如果tasktracker拥有空map任务槽,那么jobtracker将会先选择一个map任务;否则才会选择一个reduce任务。
要选择一个reduce任务,jobtracker仅需要从待运行的reduce任务列表中选择下一个就好,因为不需要考虑本地化数据。对于一个map任务,jobtracker会考虑tasktracker的网络位置,并选取一个离tasktracker尽可能近的任务。最佳情况就是,该任务是本地化数据的,也就是说对应分片正好也在这个节点上。或者,任务可能是本地机架上的:对应分片在同一个机架上,而不是同个节点上。还有一些任务即使不是本地化数据的也不是本地机架的,那么只能从不同的机架上检索他们的数据了。
4、执行任务:
现在tasktracker已经被分配了一个任务了,下一步就是运行这个任务。首先,tasktracker会从共享文件系统中复制作业包到本tasktracker文件系统。它还会从分布式缓存中复制应用需要的文件到本地硬盘。然后,它会为该任务建立一个工作目录,减压jar包的内容到该目录。最后,它会建立一个TaskRunner实例来运行该任务。
TaskRunner会建立一个新的JVM来运行每个任务,从而使用户定义的map和reduce方法产生的bug不会影响tasktracker(例如,导致tasktracker崩溃或挂起)。但是,可以在任务之间重利用JVM。
子进程会通过umbilical接口与父进程进行通信。这样可以通知父进程任务的进度(每隔1秒,直到任务完成)。
每个任务都有建立和清除行为,这些行为会运行在和任务同样的JVM上,而且这些行为由作业的OutputCommitter决定。清理行为用来提交任务,在基于文件的作业的情况下,意味着其输出将会被写入到与任务对应的位置。这个提交协议确保了在推测执行的时候,重复任务(推测执行的概念)只有一个会被提交,其余的都中止。
5、进度与状态更新
当一个任务正在运行,它可以跟踪其进度,也就是任务的完成比例。对于map任务,这是已处理输入数据的比例。对于reduce任务,它是一个更复杂一点,但系统仍然可以估算reduce输入被处理的比例。为此,它把总进度分为三部分,对应到shuffle的三个阶段。例如,如果任务已经执行了reducer一半的输入,那么任务进度就是5/6.因为已经完成了复制与排序阶段(各占1/3),并且已近完成了reduce阶段的一半(1/6)。
6、Streaming和Pipes
Streaming和Pipes都是运行特殊的map和reduce任务来运行用户提供的可执行程序,并与其进行通信。
Streaming任务会利用标准输入输出流与进程通信。另一方面,Pipes任务监听socket并发送该环境的一个端口号给c++进程,这样在开始时,c++进程就建立了一个与父java Pipes任务的持久化socket链接。
两种情况下,java进程都会在任务执行时把输入键值对发送给外部进程,由外部进程运行用户定义的map和reduce方法,然后把输出键值对传回给java进程。从tasktracker来看,这好像是在子进程中运行了map和reduce代码。
7、作业完成:
Jobtracker收到最后一个任务(这是一个特殊的作业清理任务)的完成通知后,辨别作业状态改为“成功”。然后当作业查询状态的时候,就会知道作业已完成,然后打印信息通知用户,返回waitForCompletion()。作业统计与计数值会打印到控制台。
Jobtracker还会发送一个http作业通知(如果配置了的话)。可以通过job.end.notifucation.url属性来配置。
最后,jobtracker清理掉他的工作状态,叫tasktracker也做一样的工作(如清空中间输出)。
六、MapReduce2 是如何运行的
YARN框架:
YARN框架主要是将jobtracker的工作分为单独的实体,从而弥补经典框架的扩展性短板。jobtracker主要负责作业调度和任务进度监视。
YARN将这两个责任分配给两个独立的守护进程:一个是资源管理器(resource manager),用于管理进群上资源的使用,另一个是应用管理器(application master),用于管理集群上应用的生命周期。设计思想就是,application master与resource manager协商集群资源的分配(资源是由一组Container来描述的,每一个container都有一定的内存),然后在这些container上运行特定的应用进程,这些container被node manager监视,以确保该应用不会使用额外的资源。 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。
和jobtracker相比,应用的每一个实例都有一个分配的application master,它运行在应用程序的持续时间内。application master用来调节map和reduce任务的运行。
事实上,mapreduce仅仅是YARN应用的一种类型,这里还有其他YARN应用,如分布式shell,它可以在在集群的一组node上运行一个脚本。YARN设计的亮点在于不同的YARN应用共存于同样的集群上。
此外,用户甚至可以在相同的YARN集群上运行不同版本的MapReduce,这样可以让更新的MapReduce更便于管理。
YARN上的实例:
客户端:提交MapReduce作业
YARN资源管理器(resource manager),调度进群上的计算资源的分配。
YARN节点管理器(node manager),启动并监视进群上机器的计算容器(container)。
MapReduce应用管理器(application master),调度MapReduce作业上运行的任务,application master和MapReduce任务都运行在容器中(由resource manager计划
,并由node manager管理)。
分布式文件系统:
1、作业提交:
在YARN中,作业提交和MapRduce1中所用的api相同。YARN中有个ClientProtocol接口,这个接口在mapreduce.framework.name设置为yarn的时候被激活。提交过程和经典的实现方案很像。新的jobID是由resource manager检索出来的,而不是jobtracker,只是以YARN的命名方式,它是一个应用ID。客户端检查作业的输出规范,计算输入分片(虽然还可以在集群上计算分片,yarn.app.mapreduce.am.compute-splits-in-cluster设置),复制作业资源(包括作业包,配置和分片信息)到HDFS。最后,作业通过调用submitApplication被提交到resource manager。
2、作业初始化:
当resource manager接收到作业的submitApplication()的呼叫,它就会把这个请求传给调度器。调度器分配一个container和resource manager,然后在node manager的管理下启动application master进程。
MapReduce作业的application master是一个java应用程序,它的主类是MRAppMaster。他会通过创建很多记录对象(bookkeeping object)跟踪作业进度的方式来初始化作业,同时它会接收作业的进度和完成情况报告。接下来,application master会从共享文件系统检索由客户端上计算的输入分片。然后Application master会为每个分片创建一个map任务对象,和一些reduce任务对象(数量由mapreduce.job.reduces属性决定)。
Application master接下来做的事情是决定如何运行任务(任务构成MapReduce作业)。如果作业很小,那么application master会选择在同一个JVM(application master所在JVM)上运行这个任务。与在单个节点上顺序运行这些任务相比,如果application master判断分配的开销,以及在新container上运行任务的开销超过了并行所能带来的好处,那么才会在同一个JVM上运行这些任务(与MapReduce1不同,MapReduce1中多个小作业是不会在单个tasktracker上运行的)。这种作业被称做,以uber任务运行的作业。
什么才算是小作业呢?默认情况下,小作业就是只有一个reducer,最多10个mapper,输入量小于一个HDFS块的大小的作业。(这些值是可以通过设定mapreduce.job.ubertask.maxmaps、mapreduce.job.ubertask.maxreduces、mapreduce.job.ubertask.maxbytes的值来改变。)还可以没有uber作业(把mapreduce.job.ubertask.enable设定为false)。在任何任务运行前,作业创建方法就被调用创建了作业输出目录。MapReduce1中这个方法是在一个特定的任务(由tasktracker运行的)中调用的,而在YARN中,这个方法是直接由application master调用的。
3、任务分配:
如果作业不能以uber任务运行,那么application master就会从resource manager请求足够数量的container(能满足作业上所有map和reduce任务)。所有的请求都会搭载在“heartbeat”,这些信息包括每个map任务的数据位置,尤其是主机号和输入分片所在的机架。调度器利用这些信息来做调度(类似于jobtracker的调度器)。调度器会尽量把任务放在相应数据所在结点,但是如果不行,调度器就会把任务放在同一机架(和数据不在同一节点)上。
这些请求还会指定任务的内存需求。默认情况下,map和reduce任务会被分配1024MB的内存,但这可以通过mapreduce.map.memory.mb 和 mapreduce.reduce.memory.mb来设定。
这种内存分配不同于MapReduce1,在MapReduce1上,tasktracker有固定数量的任务槽(在集群配置时设定),每个任务都会运行在单独的任务槽里。任务槽有允许的最大内存,对于集群来说,它还是固定的,这样会导致在任务只使用少量内存时(因为等待其他任务不能使用空闲内存)内存的浪费,还会导致作业不能完成而失败的问题,因为没有足够的内存去正确的运行因此无法完成。
YARN中,资源更加细粒度了,所以这两个问题就可以避免了。特别是,应用可以请求最大分配量和最小分配量之间的任意大小的内存,但这必须是最小分配量的整数倍。默认的内存分配是特定调度器完成的,对于容量调度器,默认最小分配量为1024MB(由yarn.scheduler.capacity.maximum-allocation-mb设定)。因此,作业能请求从1到10G任意大小的内存(必须是整数GB,调度器会计算出最接近的整数值),可以通过适当的设定mapreduce.map.memory.mb和mapreduce.reduce.memory.mb来改变最小分配量。
4、任务执行:
一旦任务通过resource manager调度器分配到了一个container,这个application master就会与node manager联系来启动这个container。任务由一个java应用执行,这个应用的主类是YarnChild。在application master运行任务前,它会把任务需要的数据本地化,这些数据包括作业配置和jar文件,还有分布式缓存的相关文件。最后application master会运行map和reduce任务。
在MapReduce1中tasktracker会为任务酝酿新的JVM,出于这个原因,YarnChild运行在一个专用的JVM上来将用户代码与长期运行的系统守护进程隔离开。和MapReduce不同的是,YARN不支持JVM重用,所以每个任务运行在新的JVM上。
Streaming 和 Pipes工作方式和MapReduce1相同。
5、进度与状态更新:
在YARN下运行时,任务会向application master来报告它的进度和状态,application master会有一个对作业的整体了解。与MapReduce1相比,进度更新流从子进程通过tasktracker到达jobtracker聚集。客户端每秒轮询application master(可以通过mapreduce.client.progressmonitor.pollinterval来改变轮询的间隔时间)来接收进度更新,然后显示给用户。
6、作业完成:
每隔5秒客户端会通过waiForCompletion()检查作业是否完成,这个轮间隔可以通过mapreduce.client.completion.pollinterval属性设置。
作业完成通知还支持http回调的方式(像MapReduce1),在MapReduce2中,application master启动回调函数。
作业完成后,application master和任务container会清理掉工作状态,OutputCommitter的作业调用清理方法。作业信息由作业历史服务器打包以供用户查看。
七、源码分析篇(空间有限,精力有限,只留好文链接~)
hadoop 源码分析(一) jobClient 提交到JobTracker :http://wwangcg.iteye.com/blog/1836567
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker:http://wwangcg.iteye.com/blog/1837254
hadoop 源码分析(三) hadoop RPC 机制:http://wwangcg.iteye.com/blog/1838038
hadoop 源码分析(四)JobTracker 添加job 到schduler 队列中:http://wwangcg.iteye.com/blog/1838933
hadoop 源码分析(五)hadoop 任务调度TaskScheduler:http://wwangcg.iteye.com/blog/1839921
hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程:http://wwangcg.iteye.com/blog/1844646
以上来自: --来自(iteye博客: 黎明lm)
MapReduce源码分析总结:http://blog.csdn.net/HEYUTAO007/article/details/5725379 来自csdn-和大黄
Hadoop-2.4.1源码分析--MapReduce作业(job)提交源码跟踪:http://blog.csdn.net/u010010428/article/details/51416317 来自csdn- Gleemanman
八、MapReduce任务异常处理
典型问题:Hadoop如何判断一个任务失败?失败了怎么做?
分析:实际情况下,用户代码存在软件错误、进程崩溃、机器故障等都会导致失败。Hadoop判断的失败有不同级别类型,针对不同级别的失败有不同的处理对策,这就是MapReduce的容错机制。下面是几个不同级别失败的分类:
1、任务失败
分为3种情况:Task失败、子进程JVM退出、超时检测被关闭。
1.任务失败。最常见的是Map或Reduce任务的失败,即写的本身MR代码导致失败。发生Map或Reduce失败的时候,子任务JVM进程会在退出之前向上一级TaskTracker发送错误报告。错误报告最后悔记录在用户的错误日志里面,TaskTracker会将此次task attempt标记为failed,释放一个任务槽slot用来运行另一个任务。
2. 子进程JVM突然退出。可能由于JVM的bug导致,从而导致MapReduce用户代码执行失败。在这种情况下,TaskTracker会监控到进程以便退出,并将此次尝试标记为“failed”失败。
3. 关闭了超时连接(把超时timeout设置成0)。所以长时间运行的任务永不会被标记failed。在这种情况下,被挂起的任务永远不会释放其所占用的任务槽slot,并随时间推移会降低整个集群的性能。
2、TaskTracker失败
正常情况下,TaskTracker 会通过心跳向 JobTracker 通信,如果发生故障,心跳减少, JobTracker 会将TaskTracker 从等待任务调度的池中移除,安排上一个成功运行的 Map 任务返回。
主要有两种情况:
1.Map 阶段的情况。如果属于未完成的作业,Reduce 阶段无法获取本地 Map 输出的文件结果,任务都需要重新调度和执行,只要是Map阶段失败必然是重新执行这个任务。
2.Reduce 阶段的情况。自然是执行未完成的 Reduce 任务。因为 Reduce 只要执行完了就会把输出写到 HDFS 上。
3、JobTracker失败
最严重的情况就是 JobTracker 失败,并且这情况在单点故障时是最严重的,因为此情况下作业最终失败。
解决方案是启动多个 JobTracker ,只运行主 JobTracker ,其可以通过 ZooKeeper 来协调。
4、任务失败重试
当 MapTask 执行失败后会重试,超过重试次数(默认为4),整个Job会失败。
Hadoop 提供配置参数 mapred.max.ap.failures.percent 解决这个问题。如果一个 Job 有 200 个 MapTask ,参数设置为5,则单个 Job 最多允许 10 个 MapTask 失败(200×5%=10),其可以在配置文件 mapred-site.xml 里修改。
5、 Yarn的失败
对于在YARN中运行的MapReduce程序,需要考虑一下几种实体的失败
任务、application master、节点管理器和资源管理器。
1).任务运行失败
任务运行失败类似于经典的情况。JVM的运行时异常和突然退出被反馈给applitation master,该任务尝试被标记为失败。类似的,通过在umbilical
channel上的ping缺失(由mapreduce.task.time设定超时值),applitationmaster会注意到挂起的任务,任务尝试再次被标记为失败。
确定任务什么时候失败的配置属性和经典情况一样4次尝试后任务标记
为失败(map任务的由mappeduce.map.maxattemps 设置,reducer任务的
由mappeduce.reduce.maxattempts设置)。如果一个作业中超过
mappeduce.map.failures.maxpercent的map任务或超过mapreduce.reduce.failures.maxpercent 的reduce任务运行失败,那么整个作业就失败了!
2).application master运行失败
YARN中的应用程序在运行失败的时候有几次尝试机会,就像MapReduce任务在遇到硬件或网络故障时要进行几次尝试一样。
在默认情况下,只要应用程序运行失败一次就会被标记为失败,但我们可以设置yarn.resourcemanager.am.max一retries属性增加允许失败的次数。
application master向资源管理器发送周期性的心跳,当application master发生故障时,资源管理器将检测到该故障并在一个新的容器(由节点管理器管理)中开始一个新的master实例。Mapreduce application master可以恢复故障应用程序所运行任务的状态,使其不必重新运行。默认情况下是不能恢复的,因此故障applicatibn master将重新运行它们的所有任务,但我们可
以设置yarn.app.mapreduce.am.job.recovery.enable为true,启用这个功能。
客户端向application master轮询进度报告,如果它的application master运行失败,客户端就需要定位新的实例。
在作业初始化期间,客户端向资源管理器询问并缓存application master的地址,使其每次需要向applicationmaster查询时不必重载资源管理器。
但是,如果applicationMaster运行失败,客户端就会在发出状态更新请求时超时,这时客户端会返回资源管理器请求新的application master的地址。
3).节点管理器运行失败
如果节点管理器失败,就会停止向资源管理器发送心跳信息并被移出可用节点资源管理器池。
默认值为600000(10分钟)的属性yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms决定着资源管理器认为节点管理器失败之前的等待时间。
在故障节点管理器上运行的所有任务或application master都用前两节描述的机制进行恢复。
如果应用程序的运行失败次数过高,那么节点管理器可能会被拉黑。由application master管理黑名单,对于MapReduce,如果一个节点管理器上有
超过三个任务失败,application master就会尽量将任务调度到不同的节点上。用户可以通过mapreduce.job.maxtaskfailupes.per.tracker设置该阈值。
注意,在本书写作时,资源管理器节点不执行拉黑操作,因此新作
业中的任务可能被调度到故障节点上,即使这些故障节点已经被运行早期作业的application master 拉黑、
4).资源管理器运行失败
资源管理器失败是非常严重的问题,没有资源管理器,作业和任务容器将无法启动。
资源管理器的设计从一开始就通过使用检查点机制将其状态保存到持久性存储,从而实现从失败中恢复,不过在本书写作的时候,最新版本还没有完全实现该功能。
在资源管理器失败后,由管理员启动一个新的资源管理器实例并恢复到保存的状态。状态由系统中的节点管理器和运行的应用程序组成。
(注意,任务并非资源管理器状态的组成部分,因为它们由application master管理。)
因此,存储的状态数量比jobtracker中的状态量更好管理。
资源管理器使用的存储容量通过yarn.resourcemanager.store.class属性进行配置。
默认值为opg.apache.hadoop.yarn.server.resourcemanager.recover.Memstore,这保存在内存中,因此可操作性不是很高。
然而基于ZooKeeper的存储,以后会支持从资源管理器失败中进行可靠的恢复。
九、MapReduce任务调度简介
MapReduce中作业调度机制主要有3种:
1.先入先出FIFO
Hadoop 中默认的调度器,它先按照作业的优先级高低,再按照到达时间的先后选择被执行的作业。
2.公平调度器(相当于时间片轮转调度)
为任务分配资源的方法,其目的是随着时间的推移,让提交的作业获取等量的集群共享资源,让用户公平地共享集群。具体做法是:当集群上只有一个任务在运行时,它将使用整个集群,当有其他作业提交时,系统会将TaskTracker节点空间的时间片分配给这些新的作业,并保证每个任务都得到大概等量的CPU时间。
配置公平调度器
1)修改mapred-stie.xml 加入如下内容
<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>
</property>
<property>
<name>mapred.fairscheduler.allocation.file</name>
<value>/opt/hadoop/conf/allocations.xml</value>
</property>
<property>
<name>mapred.fairscheduler.poolnameproperty</name>
<value>pool.name</value>
</property>
2) 在 Hadoop conf 下创建allocations.xml内容为:
<?xml version="1.0"?>
<alloctions>
</alloctions>
样例:
<pool name="sample_pool">
<minMaps>5</minMaps>
<minReduces>5</minReduces>
<weight>2.0</weight>
</pool>
<user name="sample_user">
<maxRunningJobs>6</maxRunningJobs>
</user>
<userMaxJobsDefault>3</userMaxJobsDefault>
3). 重启 JobTracker
4). 访问 http://jobTracker:50030/scheduler , 查看 FariScheduler 的 UI
5 ). 提交任务测试
3.容量调度器
支持多个队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略,为 了防止同一个用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择 ,同时考虑用户资源量限制和内存限制。但是不可剥夺式。
十、shuffle和排序
shuffle简介:多个map和reduce任务的时候,每个reduce任务的输入都来自许多map任务, 就会出现数据流的混洗(shuffle),调整混洗参数 对作业总执行时间的影响非常大。
系统执行排序的过程称为shuffle。
shuffle是MapReduce的“心脏”是奇迹发生的地方。
map端
我们知道map产生的输出是临时写到本地磁盘的,但是他并不是简单的写到本地磁盘中,这个过程更为复杂,如图:
他会首先使用缓冲的方式写入到内存中,并且处于效率的考虑进行预排序。每个map都有一个缓冲区用于存储任务输出,这个缓冲区的大小默认为100MB,可以通过io.sort.mb属性调整。一旦缓冲的内容达到预设的阀值(通过io.sort.spill.percent,默认是0.8或80%),一个后台进程便把内容溢出(spill)到磁盘。在溢出过程map输出会继续写到缓冲区,如果在此期间被填满,则发生堵塞,直到写磁盘过程完成。这个溢出写的过程会将数据写到mapreduce.cluster.local.dir指定的目录内。在写硬盘之前会根据输出的reduce进行分区(partition),然后对每个分区内容进行排序,如果有combiner函数,则在排序之后执行combiner。每次内存缓冲区达到溢出的阀值,就会新建一个溢出文件(spill file)。最终会有几个溢出文件,这些溢出文件会被合并成一个已分区且已排序的输出文件,io.sort.factor控制一次最多能够合并多少流,默认是10。如果至少有3个溢出文件(这个值由min.num.spills.for.combine属性设置)则就会在输出文件写到磁盘之前在此运行以此combiner。将输出进行压缩可以减少输出及传递到reduce的网络开销,可以设置mapreduce.compress.map.output设置为true,使用mapreduce.map.output.compression.codec指定压缩方法。
reduce端
reduce任务需要集群中若干个map的输出作为其输入,但是每个map的完成时间并不一样,所以只要有一个map输出,reduce就开始复制其输出,这就是reduce端的复制阶段。reduce有少量的复制线程,默认是5个,这个值由mapreduce.reduce.parallel.copies属性改变。
那么reduce如何知道从哪台机器获取map输出呢?
map任务完成后,会通知其父tasktracker,tasktracker会通知jobtracker(在MR2中是applicationMaster),从而jobtracker(applicationMaster)知道了tasktracker与map的映射关系,reduce中的一个线程会定期向applicationMaster(或者jobtracker)进行询问,以便获取map输出的位置。
复制完成后,reduce开始进入排序阶段(其实是合并节阶段,因为排序是在map端进行的),这个阶段合并map输出,保持其排好的顺序。这个合并是循环进行的,可以设置合并因子io.sort.factor,默认是10,即每趟合并10个文件,假设总共50个map,总共进行5趟,最终有5个中文文件。之后是reduce阶段,直接把数据输入到reduce函数,而不用将这5个文件合并称一个大文件。reduce函数输出直接写到HDFS上。
配置调优
map端的调优属性:
属性名称
|
类型
|
默认值
|
说明
|
io.sort.mb
|
int
|
100
|
map输出所使用的内存缓冲区大小,以MB为单位
|
io.sort.spill.percent
|
float
|
0.80
|
缓冲区预设的阀值,超过这个百分比开始将内容溢到磁盘
|
io.sort.factory
|
int
|
10
|
排序文件时一次最多合并的流数,在reduce端也是用
|
min.num.spills.for.combine
|
int
|
3
|
运行combiner所需要最少溢出文件数
|
mapreduce.compress.map.output
|
Boolean
|
false
|
压缩map输出
|
mapreduce.map.output.compression.codec
|
Class Name
|
org.apache.hadoop.
io.compress.DefaultCodec
|
用于map输出的压缩编码器
|
tasktracker.http.threads
|
int
|
40
|
每个tasktracker运行的线程数,用于将map输出到reduce,在YARN不适用
|
这个过程总的来说就是要为shuffle分配更多的内存,但是这时候可能还需要考虑到map函数和reduce函数能够得到足够运行的内。所以一般map函数和reduce函数在编写的时候尽量少占内存。map端可以通过避免多次溢出写磁盘来获得最佳性能,一次是最佳的情况。
reduce端的调优属性
属性名称
|
类型
|
默认值
|
说明
|
mapreduce.reduce.parallel.copies
|
int
|
5
|
用于把map的输出复制到reduce的线程数
|
mapreduce.reduce.copy.backoff
|
int
|
300
|
在声明失败之前,reducer获取一个map输出所花的最大时间,以秒
为单位,如果失败,reducer可以在此时间内尝试重传
|
io.sort.factor
|
int
|
10
|
排序合并时的合并因子
|
mapreduce.job.shuffle.input.buffer.percent
|
float
|
0.70
|
在shuffle阶段,分配给map输出的缓冲区占堆空间的百分比
|
mapreduce.iob.shuffle.merge.percent
|
float
|
0.66
|
map输出缓冲区(上面定义的那个)的阀值使用比例,用于启动合并输出和磁盘溢出写的过程
|
mapreduce.inmem.merge.threshold
|
int
|
1000
|
启动合并输出和磁盘溢出写过程的map的输出的阀值数。0或更小,意味着没有阀值限制
|
mapreduce.iob.reduce.input.buffer.percent
|
float
|
0.0
|
在reduce过程,在内存中保存map输出的空间占整个堆空间的比例。reduce阶段开始时
,内存中的map输出不能大于这个值
|
十一、MapReduce任务执行
Map任务数量由InputSplit决定,InputSplit分片大小默认是HDFS块大小(hadoop1.x=64mb,hadoop2.x是128mb)。
例如:
MapReduce作业读取HDFS上(hadoop2.x)两个文件,一个是200MB,一个是100MB,这时候就有3个InputSplit,每个InputSplit会起一个Mapper任务读取,通过
RecordReader转换为key,value提供Mapper函数使用
MapReduce作业调优:
1、关闭推测执行
MapReduce模型将作业分解成任务,然后并行的运行任务是作业整体执行时间少于各个任务顺序执行时间。这使作业执行时间多运行缓慢的任务很敏感,因为运行一个缓慢
的任务会使整个作业所用的时间远远长于其他任务的时间。当一个作业由成百上千时可能会出现拖后腿的任务。
任务执行缓慢可能有多种原因,包括硬件老化或软件配置错误,但检测问题具体原因很难,因为任务总能成功执行完,尽管比预计时间长。hadoop不会尝试诊断或修复执行
慢的任务。在一个任务运行比预期慢时,他会尽量检测,并启动另一个相同的任务作为备份,这就是推测执行(speculative execution);如果同时启动两个相同的任务他们
会相互竞争,导致推测执行无法正常工作,这对集群资源是一种浪费。只有在一个作业的所有任务都启动之后才会启动推测任务,并只针对已经运行一段时间(至少一分钟)且
比作业中其他任务平均进度慢的任务。一个任务成功完成后任何正在运行的重复任务都将中止。如果原任务在推测任务之前完成则推测任务就会被中止,同样,如果推测任务先
完成则原任务就会被中止。
推测任务只是一种优化措施,它并不能使作业运行的更加可靠。如果有一些软件缺陷造成的任务挂起或运行速度慢,依靠推测执行是不能成功解决的。默认情况下推测执行
是启用的,可根据集群或每个作业,单独为map任务或reduce任务启用或禁用该功能。
job.getConfiguration().setBoolean(job.MAP_SPECULATIVE, false); //是否对Map Task启用推测执行机制,默认是true
job.getConfiguration().setBoolean(job.REDUCE_SPECULATIVE, false); //是否对Reduce Task启用推测执行机制,默认是true conf.set("mapreduce.map.speculative", "false");
conf.set("mapreduce.reduce.speculative", "false");
2、开启jvm重用
属性:mapred.job.reuse.jvm.num.tasks,默认值是1,在一个taskTracker上对于给定的作业的每个jvm上可以运行任务最大数。-1表示无限制,即同一个jvm可以被该作
业的所有任务使用。
conf.set("mapreduce.job.jvm.numtasks", "-1"); //开启jvm重用
job.getConfiguration().setInt(job.JVM_NUMTASKS_TORUN, -1); //开启jvm重用
3、跳过坏记录
在MRv1中可通过启用skipping mode来跳过损坏的记录(格式错误或字段丢失),当启用后任务将正在处理的记录报告给tasktracker,任务失败时tasktracker会重新运行
该任务,跳过导致任务失败的记录。由于额外的网络流量和记录错误以维护失败记录范围,所有只有在任务失败两次以上才会启用skipping mode。
在MRv2中不再支持skipping mode选项。权威指南中建议在mapper和reducer代码中处理被损坏的记录。运行时检测到坏记录根据业务需求选择忽略或通过抛异常来终止作业
运行;如果业务要求必须处理则可以通过计数器来计算作业中总的坏记录数,看问题影响的范围进行处理。
conf.set("mapreduce.map.skip.maxrecords", "0"); //Map Task跳过损坏记录的最大数,默认0,如果应用能接受任何坏记录则取Long.MAX_VALUE
job.getConfiguration().setLong(job.MAP_SKIP_MAX_RECORDS, Long.MAX_VALUE); //Map Task跳过损坏记录的最大数,默认0。
如果应用能接受任何坏记录则取 Long.MAX_VALUE(9223372036854775807)个坏记录
4、设置Map和Reduce任务失败重试次数
conf.set("mapreduce.map.maxattempts", "4"); //Map Task最大失败尝试次数,默认是4次
conf.set("mapreduce.reduce.maxattempts", "4"); //Reduce Task最大失败尝试次数,默认是4次
十二、MapReduce的类型与格式
http://blog.csdn.net/colin_yjz/article/details/46658921 ---来源CSDN-colin_yjz