后备任务调度策略
问题描述
通过上一章的系统评估和性能分析,我们看到了由于部分落后机器而导致的任务延迟增大问题。在Map或者是Reduce阶段,位于某台机器上的某个子任务,由于各种原因而导致其显著慢于同一个任务同种类型的其他子任务,从而导致整个任务的延迟显著增大。
由此系统评估和分析我们可以知道,本系统中的任务延迟存在此性能问题。接下来我们针对此问题改善系统的单任务延迟,并由此说明系统评估在其中起的重要作用。
对于短任务的用户来说,系统的单任务延迟时间是一个重要的衡量因素。因为短任务的用户都希望系统能够快速地对他们提交的任务做出回答,比如在日志分析、系统监控、商务数据分析等等应用中,这样的任务是很常见的。Google的MapReduce也是主要用于短任务,根据MapReduce的论文,Google在2007年9月中提交的MapReduce任务的平均延迟时间是395秒。延迟时间对于SQL-like的系统和任务来说也很重要,比较在这些系统之上的语言如Pig、SCOPE等等,用它们描述的计算任务基本上都希望有好的延迟。同时,对于EC2这样的应用来说,单任务的延迟就显得更加重要,因为EC2用户的付费是按时间来计算的,如果使用的系统对单任务有更好的延迟,那无疑能够有显著的经济效应。
所以,使用Backup的调度策略,对于落后任务投机地使用后备任务,能够极大地提交系统的单任务延迟时间,对于短任务来说,尤为如此。而对于较长的任务,后备任务的使用的效果就相对比较小。我们这一章的工作中,也将主要考虑改善短任务的延迟时间。
相关工作
MapReduce
Google的论文中说明了他们在工作中这个问题的状况。他们观察到在任务的执行最后过程中,经常有一台机器在运行这最后的几个Map或Reduce任务时花费异常长的时间。他们也分析了可能的原因:这台机器的硬盘状况不好,可能会导致它的读写出现异常;或者是由于调度使得同一台机器的负载过重而使得原先执行的任务变慢;或者是任务程序中的bug问题等等。通常这样的落后机器出现的概率是1/100。
他们也使用了一些机制来启动后备任务。他们的策略是这样的,当MapReduce的任务快结束时,他们对于还在运行的子任务启用后备任务,原子任务或者后备任务中的其中一个结束就标记这个子任务结束。他们同时采取了一些措施使得后备任务不会占用系统资源过多。根据文章的实验,对于sort来说,启用后备任务后,性能提高了44%。
Hadoop
对于Hadoop2中的实现,针对此问题,分析代码细节如下。
如果一台机器空闲的时候,Hadoop将选择一个任务在其之上运行。选择的方式如下:首先,失败的任务优先最高执行,这是Hadoop的错误侦察机制,为了找到由于bug或者其他原因不断失败的任务并最终放弃;其次,没有运行过的任务根据调度策略调度到此空闲机器上执行;最后,如果前两种情况都没有发生,Hadoop选择一个正在运行的任务投机执行,而判断的策略将在下面阐述。
为了挑选出需要投机执行的后备任务,Hadoop使用了一个进程分数的值来进行判断。进程分数是一个在0到1之间的数值,它是由几部分组成的。
对于Map任务来说,进程分数是该任务输入数据占整个任务输入数据的比值。而对于Reduce任务来说,这个值的获取分三个阶段考虑:
1.copy阶段,当Reduce任务正在从Map任务拷贝数据的时候该值计入进程分数,值为copy数据占整个Map数据的比例。
2.sort阶段,当Map输出的数据根据key来排序的时候该值计入进程分数,值为sort数据占合并数据的比例。
3.reduce阶段,当用户定义的reduce函数正在对Map数据进行reduce操作时该值计入进程分数,值为reduce数据占此阶段数据的比例。
然后,Hadoop对每种任务根据它们的平均进程分数定义了一个阈值。Hadoop简单地把阈值设为平均分数减0.2。对于落后者的判断,Hadoop发现如果一个任务的进程分数低于此类任务的阈值,并且已经运行了至少一分钟,则这个任务被标记为一个落后任务,需要执行后备任务以加快任务的延迟。
最后,Hadoop使用一个FIFO的队伍来进行任务的调度。
异构环境中后备任务调度
在OSDI 08的这篇文章中[ Matei Zaharia Andy Konwinski Anthony D. Joseph Randy Katz Ion Stoica. Improving MapReduce Performance in Heterogeneous Environments, proceedings of OSDI, 2008.],作者关注了在异构的环境下后备任务的调度。在日常的应用中,如Amazon的EC2这样的按需式计算服务,或者是小型机构的机房中,异构的机群和环境都是很常见的。而Hadoop的调度基本上是基于同构的环境来设计的。作者分析了Hadoop的设计并给异构的环境中设计了新的调度策略。最后在200台机器的EC2异构机群中,提高了2倍的性能。
实现细节
首先,我们考虑了Tplatform的实际情况:主要用于本实验室小组的搜索引擎的研究信息处理,应用程序的数据基本上是与机群规模匹配的数据集合,而任务集合基本上是搜索引擎相关的如网页消重、网页的实体分析、数据的挖掘等等。而机群的环境都是同构的,所以我们考虑的是怎么在同构的环境在针对这些中短任务进行后备任务策略的优化。如下我们将细致讨论实现细节。
整体框架
我们从高层逻辑来阐述实现的细节。首先需要在Master端解决落后者的判定问题,对于一个正在Worker端执行的子任务,其是否是落后者,需要Master综合各方面信息分析后根据判定策略给出决策;其次,一旦该子任务被判定为落后者,具体的处理措施分析在Master和Worker端进行,需要保证Master端记录任务的数据结构的一致性和效率,以免Master变成整个系统的性能瓶颈,此外还需要保证Worker端的后备任务不会过多成为完全的冗余,避免系统由于后备任务策略的开销反而低效。
设计系统在执行此策略的流程如下:
(1)如果有一个Worker的机器出于空闲状态,它向Master汇报为没有任务在其上执行,并且Master也没有分配新的任务给它,此时Master启动后备任务的策略,决定是否在此机器上执行后备任务。
(2)Master根据落后者的判定策略,选择一个合适的后备任务在该Worker上执行,或者Master根据策略认为当前系统不需要后备任务,这可能是因为没有落后者或者是系统认为此时的系统负载不合适执行过多的后备任务。总之,此处将根据策略决定是否执行,原则是能够改善系统的性能同时保证开销不会过大。
(3)系统进行处理,分别在Master端和Worker端做出反应,直至完成。
我们在下面的小节中详细阐述这三个步骤中的细节。
落后者判定策略
落后者的判定是这个系统优化中最重要的部分,如果系统误将非落后任务判定为落后任务,那将造成不必要的开销最终导致性能变差;如果系统没有识别出落后者,那潜在的性能优化空间并没有得到填充,那优化并没有得到充分发挥。而如何决策使得上述两个方面的折衷能够得到同时满足,一方面是根据最终系统运行的实际应用程序而定的,另一方面是根据系统的实际状况而决定的。根据上述分析,我们的应用场景和系统是这样的:在同构的系统环境下面对中短任务的计算。
首先,我们分析导致落后者出现的原因和表现行为。落后者可能因为多种原因出现,在Google的MapReduce论文中已经有所提及。无论这些原因是什么,落后者出现后,它们的表现行为都是落后于正常子任务,根据短板效应导致最后整个任务需要等待这个最慢的子任务才能算结束,从而增大了延迟。所以对于落后者的判定,要做的就是把它们和普通的正常子任务区分开来,知道哪些子任务的正常的,哪些是落后的。
然后,对于子任务耗时的信息和完成进度和来说,我们可以分别在Master和Worker端获取这些信息。
最后,我们在Master汇总这两方面的信息,确定性地给出判定的结果。
Master端:
我们有一个假设,那就是对于同一个任务的同种类型的子任务(Map或者Reduce,Transfer子任务会在Map做完后开始,我们不考虑对Transfer子任务投机地进行后备执行)可能会在相同的时间内完成。此假设认为1.机群环境同构;2.数据基本上均匀分布;3.计算时间和IO时间不会因为数据的值而发生剧烈的变化。如果这一系列假设不成立时,就有可能出现落后者问题。我们将在下面根据实验说明在测试的基准程序集合中,这样的应用程序负载下此假设是成立的,偶尔会因为各种原因使得假设失败,这些情况就是落后者,需要做后备任务投机执行以改善延迟。
所以,我们将根据系统监控和程序的概要分析记录下任务的Map子任务和Reduce子任务的平均完成时间,如果一个正在执行的子任务比同任务同类型的其他子任务显著地慢,那它可被判定为落后者。
Worker端:
对于正在Worker端执行的子任务来说,Worker具有知道该子任务进行到什么地步的能力。我们用进度分数来描述Worker正在执行的任务处于什么状态,进度分数是一个从0到1的连续值。
在Map阶段,由于TFS的默认设置是一个chunk大小为64M,所以默认的Map输入数据大小为64M。我们根据Mapper读入的数据大小来确定进度分数。
在Reduce阶段,需要先进行一个Sort的过程,因为Sort的完整性,我们简化了此过程中进度分数记录。也就是说,在Sort的过程中,Worker认为Sort还没有完成,进度为0。整个Reduce阶段的进度分数由是否Sort完成和reduce进度两部分组成,这两部分的权值我们进行了简单的权重分配。Sort阶段在大数据时会进行多路归并的外排,而reduce阶段基本上是IO占主要的时间。同时,我们根据实验的经验设置两部分权重比例为1:1。最后根据权重计算出整个Reduce阶段目前的进度分数。
在Worker端记录下各阶段的进度分数后,在本地由文件管道传递给和Master端通信的心跳进程,再由此进程通过心跳把进度和任务的相关情况捎带传给Master。
综合:
对于需要判断的正在执行的子任务来说,一方面,Master通过记录的以前执行的同类型子任务的历史信息可以知道平均耗时,一方面Master通过Worker每次心跳传来的实时进度可以知道此子任务进行到什么进度。如果该子任务已经显著超过平均耗时水平或者根据进度明显慢于同类型任务,那即可判定该子任务为落后者。
系统处理过程
从两方面来描述这一过程中系统的处理。
首先是Master端,Master对一个子任务进行是否为落后者的判定后,需要修改Master端的数据结构,以进行处理。
如果不是落后者,那Master不作处理。
如果落后者判定成功,Master修改数据结构以记录原始的子任务和新的后备子任务。初始化后备子任务的数据结构,在和指定的Worker发送消息时将此新命令发出。
在命令发出之后,Master还需要处理此子任务的结束。如果原始子任务和后备子任务其中一个完成,Master即认为此子任务结束,并发停止执行的命令给还未结束的Worker,以免浪费资源。
然后是Worker端的处理。在这里的实现中,Worker对后备任务这一策略是透明的,如果Master发命令给Worker要求做一个任务,原始任务和后备任务在Worker看来是一样的。
数据结构细节
同样地,我们从Master和Worker两方面来描述数据结构的实现细节。
对于Master,在保证不规模修改实现接口的情况下,进行了如下的实现。
TaskInfo结构:TaskInfo是记录子任务信息的数据结构,在之前的实现上添加了两个域,用以标明和区分后备子任务。
/// if it is backup task: MAP/REDUCE only
bool isBackup;
/// for backup task: original task id
int32_t originalTaskId;
一个是标明此任务是否为后备任务,一个是记录原始任务的ID。
在TaskManager中添加两个辅助的数据结构,用来在添加后备任务以及判定完成情况时处理。
/// task status map: record status of map/reduce tasks with each job, task is completed when either backup task or orginal task completed
std::map<int32_t, std::map<int32_t, bool> > m_jobid2tasksIsCompleted;
/// backup task map: one task only have a single backup task at one time
std::map<int32_t, bool> m_taskid2backing;
第一个map是记录各个job中的task是否完成,如果有后备子任务,只要原始任务和后备子任务中其中一个完成就算此任务完成。
第二个map是记录各个任务是否有后备任务,在这里使用map是因为后备任务的查看和处理过程中需要经常看原始任务的状态,所以使用map避免Master端大规模的扫描任务队列,成为性能瓶颈。此外,这里的一对一映射保证了一个原始子任务最多只有一个后备子任务,这是为了防止造成多个后备任务出现而造成开销太大,或者是后备子任务再次成为落后者引起级联反馈的效果后浪费系统的资源。
后备任务策略评估实验
机群配置和任务准备
我们的机群配置如下。
我们在后备任务策略的评估实验中使用了一台Master、十四台Worker组成的MapReduce系统集群。所有的机器都是Dell 2850服务器,每台机器配置为2颗Intel Xeon处理器,2GB内存,6个7200 rpm SCSI硬盘组成一个RAID-0的逻辑卷。这些机器存放在两个机架中,各有一台Dell 2748 1Gbps交换机,机器通过一个1Gbps的全双工以太网卡与交换机相连接,两个机架通过一个Cisco千兆路由器链接。
我们实验使用的是PennySort程序来进行评估。生成了50M的Record,一共是4.8G大小的数据。
任务耗时趋同性分析
我们首先分析在5.3.2节的设计中做出的假设在我们的环境和工作负载下是否合理。
我们的假设是,对于同一个任务的同种类型的任务基本上会在相同的时间内完成。
我们对与选定的基准程序集合的任务集合的耗时情况进行分析如下表,可以看到它们耗时的标准差和均值的比例并不高,说明这些任务基本上是在相同的时间内完成的。
系统优化方向
根据我们对系统的分析和评估,以及我们在Tplatform平台的日常使用中的经验,除了已经实现的后备任务策略,我们针对分析得出的其他系统优化方向,进行分析和探讨。
网络传输问题
在MapReduce的模型和体系结构的实现中,需要进行网络的传输任务,也就是在reduce阶段需要把map生成的数据传到reduce对应的机器上。在这个阶段很多应用程序可能生成大量的中间数据,而经过我们的之前的分析,网络会成为MapReduce系统的性能瓶颈。所以,如果优化网络的传输,减少不必要的中间数据,也是一个直接和实际的系统优化问题。
在网络传输的问题上,可以有不同的研究方向:
如何优化机群的网络传输,考虑在此环境下机器的路由和网络层的优化问题。已经有一些研究工作在这个方向上进行,如DCell[ Chuanxiong Guo etc., DCell: A Scalable and Fault-Tolerant Network Structure for Data Centers, proceedings of SIGCOMM, 2008.]、Fat Tree[ Al-Fares, Loukissas, Vahdat, A Scalable, Commodity Data Center Network Architecture, proceedings of SIGCOMM, 2008.]。
如何通过优化负载和任务等不同粒度上的调度,来优化系统的网络传输。
如何有效地利用应用程序的数据特征,使得系统可以充分利用数据的时间和空间局部性,从而减少产生的中间数据,最后达到优化网络传输问题的目的。
此外,还有其他方向和角度来考虑这个问题。从我们的日常使用经验来看,网络通常是比硬盘I/O更容易成为瓶颈。
增加用户和系统的交互
在MapReduce的体系框架下,系统对用户掩盖了很多系统细节,同时简单的计算模型也使得大量的并行细节对用户来说并不透明。Google对于MapReduce的此设计目的在于掩盖细节,使得用户只是简单的实现Map函数和Reduce函数,就可以进行大规模的数据处理。但是我们发现,在实际使用中过于简单的模型和不透明也会带来性能问题。用户对系统的不了解可能造成重复计算或者浪费用户的时间,用户白白等待无效的计算等等情况。
所以此方向我们需要研究的是,哪些系统实现细节是有必要对用户掩盖的,哪些系统实现细节如果用户知道能够使得一个专家级的用户更好地控制系统和对应用程序进行优化。同时,我们还需要研究的是,在应用程序层面上来看,哪些信息如果让系统知道,能够更好更高效地执行应用程序;此外,为了更好地让系统了解应用程序,系统应该提供什么样的接口或者配置让用户方便地和系统进行交互。
总之,我们认为,系统和用户不应该是孤立的,系统对用户也不是完全透明的,同时系统对用户的应用程序也不是一无所知的。系统应该多了解用户的行为和应用程序的特点,同时用户也需要更了解系统。用户和系统之间的交互应该增加。
从数据库领域看系统性能的其他提升空间
关于MapReduce和分布式数据库到底有什么不同,是目前人们争论的一个焦点[ Andrew Pavlo. Erik Paulson. Alexander Rasin etc., A Comparison of Approaches to Large-Scale Data Analysis, proceedings of SIGMOD, 2009.]。数据库通过很多年的发展对数据的存储和计算,以及用户使用的语言等等都做了大量的研究并发展了很多成熟的技术。但是在MapReduce这样类似的“云计算”环境下,数据库的技术是否在MapReduce系统的研究中可以参考和借鉴,哪些可以参考和借鉴,什么样的任务是分布式数据库难以胜任的,什么样的任务是MapReduce难以胜任的,他们两种体系的计算引擎的本质区别到底是什么?这些都是亟待解决的问题,也是人们关心和争论的焦点。
我们针对其中的一些问题,可以进行研究。
比如索引的使用,在分布式数据库中是很正常和成熟的技术。MapReduce的系统中是不支持索引的,对于一些任务来说,如果使用MapReduce的框架来进行处理,将是比较低效的21。但是如何在MapReduce这样的体系下使用索引还是一个需要研究的问题。
系统易用性
我们通过日常的使用发现,MapReduce程序的编写还是过于底层,通常一些简单的任务如日志分析等等需要花费比较长的时间来编写。对记录层级的数据进行直接处理和使用文件系统作为底层存储也会对易用性造成一些问题,现在有一些高层语言来处理这些问题,如Pig Latin3等,但是系统的易用性和语言的问题仍然是一个需要不断研究的问题。
总结
我们介绍了我们的Tplatform的设计特点,包括TFS和MapReduce,并探讨了由于我们设计的不同导致系统的性能优化和设计折衷。
然后我们分析了系统的评估目标,包括从单任务延迟、总机器时间、平均结束时间、加速比、公平性、故障恢复稳定性等多个方面来考察系统的性能和其他各方面表现。
同时,我们并设计了一系列的基准程序和数据,从MapReduce的系统体系结构出发,考虑不同的程序和数据如计算密集型、I/O密集型、网络密集型等来衡量MapReduce或者类似系统的上述评估目标。
为了达到分析和评估系统的目的,我们在系统中设计了性能的监控和程序概要分析框架,用来收集系统的相关表现信息。通过对收集的数据进行分析,我们可以得到实验的结果,并对系统进行分析和给出改善意见。我们还在实验中分析了程序概要分析框架的开销,实验结果可以看到开销不大。
我们设计了一系列的实验来对我们的Tplatform进行评估,分析系统中的实际情况。我们发现网络常常成为MapReduce和类似系统的瓶颈,落后者对系统的延迟有很大的影响,系统对短任务的调度并不公平等等问题。
通过实验和分析,我们发现了当前系统的这些问题,然后我们选取了落后者的问题进行改进。我们针对此问题实现了后备任务策略,落后者会显著地造成延迟增大的性能问题。我们的模拟实验表明,我们的后备任务能够有效地改善这一问题。
最后我们总结了在分析和日常使用中发现的问题,并提出了一系列的未来工作方向。