Abstract
MapReduce is becoming an important parallel programming paradigm for processing Internet scale data. It is widely used to process jobs such as searching, analyzing, and mining on large scale structured and semi-structured data. It is still a problem for the emerging MapReduce-like systems to analyze and eva luate systematically and efficiently.
This paper discussed the issues in performance eva luation for MapReduce runtime system. We designed and chose a series of representative programs and data as benchmark. And then we implement profiling in our homemade MapReduce system which named Tplatform. We did the eva luation experiment for finding the bottleneck of the system. Through the experiment, we found some performance problems such as scheduling and stragglers etc. We implemented backup tasks for improving the problems caused by stragglers. Our simulation results reveal that we improve the performance efficiently.
引言
MapReduce正在成为人们在海量数据上进行并行计算的重要编程模型,比如为大规模的网页做索引、在海量的数据中进行挖掘、庞大的科学计算任务等等。
人们开始关注在普通计算机上实现大规模的并行计算以提供各种服务,Google则无疑是这方面的先驱者。Google使用MapReduce作为日常计算的引擎,将每天处理20PB的数据[ Dean, J. and Ghemawat, S. MapReduce: Simplified Data Processing on Large Clusters. proceedings of OSDI, 2004.]存在底层的存储系统如GFS6、BigTable7中。很多重要的搜索引擎服务,如索引、网页排序、网页消重与去噪、用户日志分析、用户行为预测等等,都可以使用MapReduce的框架来加快程序员在进行相关的处理。
此外,MapReduce也是一个如今很受欢迎的并行计算模型。MapReduce良好的可扩展性使得并行处理变得很容易,人们可以很方便地把MapReduce部署到大规模的廉价机群上使用。它的开源实现版本Hadoop[ Hadoop. The hadoop project. http://lucene.apache.org/hadoop/, 2006.]也得到了广泛的应用。如今很多公司如Yahoo!、FaceBook、Amazon、New York Times,以及部分研究机构和大学如CMU、Cornell等等都开始使用Hadoop进行研究和开发。
为了更好和方便地让程序员使用MapReduce或者类似的并行处理计算框架如Map/Reduce/Merge6,人们在其上架设了一系列的编译系统,并通过高层的语言把计算任务映射为底层的MapReduce任务。这方面的工作如Yahoo! 在Hadoop上实现的Pig[ C. Olston, B. Reed, U. Srivastava, R. Kumar and A. Tomkins. Pig Latin: A Not-So-Foreign Language for Data Processing. proceedings of SIGMOD, 2008.]、Google实现的Sawzall[ Pike, R., Dorward, S., Griesemer, R., and Quinlan, S. Interpreting the data: Parallel analysis with sawzall. Scientific Programming 13, 4 (2005), 277–298.]等等。
类似系统的开发和研究也层出不穷,如微软有自己的Dryad5/SCOPE7/DryadLINQ[ Isard, M., Budiu, M., Yu, Y., Birrell, A., and Fetterly, D. Dryad: distributed data-parallel programs from sequential building blocks. proceedings of the ACM SIGOPS, 2007]系列系统。拥有这样的处理能力无疑成为一个互联网公司的核心竞争力,可以预见在未来的一段时间里面,还有类似的很多系统和研究出现。
人们在使用Hadoop或者类似的其他并行处理计算框架及其上层语言时,众多的使用者对底层大规模并行处理计算框架有自己的需求。比如大学或研究机构使用此类框架进行科学计算时,系统的工作负载可能是偏向计算密集型,人们也关心系统对于计算任务的延迟反应;而大型因特网公司如Google、Yahoo!、Microsoft Live Search等的数据中心中,有若干程序员在同时提交计算任务,程序员不但关心计算任务的延迟,还关心整个中心中负载的调度公平性;而对于此类系统的开发和研究人员来说,他们关心系统的吞吐量、系统中各机器的状态和使用情况等等。所以考虑此类并行处理计算框架特别是MapReduce系统的各项系统指标,并确定评估的程序和方法,对评估类似系统、基于用户希望的系统设计折衷进行系统之间的比较、改进系统等等有很重要的意义。在这个基础上如Berkeley也有一些系统测试的工作如分析网络的性能X-Trace[ R Fonseca, G Porter, RH Katz, S Shenker, I Stoica. X-trace: A pervasive network tracing framework. proceedings of NSDI , 2007.],以及对MapReduce系统和数据库系统性能评估的讨论15。
我们基于MapReduce实现了自己的并行处理计算框架,并在其之上进行了系统的测试和评估。我们提出了测试程序和数据,并基于此在系统中实现了监控和程序性能概要分析框架。通过测试和评估实验,我们总结了系统的性能指标和观察到的问题。我们针对其中的单机落后问题,实现并验证了后备任务策略,并基于此改进系统性能。最后,我们总结并给出了其他工作方向。
论文的剩余部分按如下方式进行组织。第二章对MapReduce的模型和体系结构进行概述,而第三章列出了需要评估的系统目标和我们设计的基准程序和数据集合。为了分析和评估系统,我们在第四章阐述了系统监控框架和程序概要分析的设计和实现细节。之后我们在第五章中列出了实验结果和给出了实验的分析,并在针对其中的落后者问题实现了后备任务策略,在第六章中详细阐述了后备任务策略的实现和实验评估。我们在第七章中对系统可能的优化方向进行了展望并在第八章中进行了总结,最后是致谢。
MapReduce框架
在这一章里面,我们将简单介绍MapReduce框架的模型和我们的系统实现。
MapReduce模型介绍
Google的研究人员受到函数式编程语言(functional language)的启发,在总结大量的大规模分布式处理程序共同特征的基础上,提出了MapReduce并行程序框架。
MapReduce是一大类大规模并行数据处理程序的抽象。这类计算的输入是一个(键,值)对的集合,输出也是一个(键,值)对的集合。用户只需要提供两个操作map和reduce的实现,MapReduce运行时库就可以自动把用户程序并行化。
用户提供Map函数的实现,它接收一个输入对,产生一组中间结果对。MapReduce库会把具有相同键的所有中间结果对聚合到一起,把他们传给Reduce函数。用户提供的Reduce函数,接收中间结果的一个键和具有此键的一组值,处理这些值,产生若干个(键,值)对做为输出。它们的一般形式如下:[ 杨志丰,GFS与MapReduce的实现研究及其应用,北京大学硕士论文,2008.]
Map (k1, v1) -> list (k2, v2)
Reduce (k2, list (v2)) -> list (v2)
MapReduce模型的最大好处是简便性,用户只需要提供这两个接口就可以处理大规模的数据,而不需要太多分布式计算的实现细节。
系统实现
MapReduce的实时运行主要是为并行化和并发执行服务的。为了尽可能的并行化和扩展系统,MapReduce把输入的数据分割到多个机器上。中间数据的传输和序列化处理等由系统来控制。分割的数据由多个Reduce来处理。这两个步骤中Map任务和Reduce都可以同时执行,且它们都具有良好的可扩展性,也即可以方便地增加机器增加并发度。
而在系统实现的层面上,系统需要决定底层的各个细节如数据单元的大小、中间数据的处理、内存的缓存多大、排序的方式、各个任务的调度、机器的失败和容错处理等等。系统自动的把这些细节都掩盖,所以对程序员来说,他只需要知道这个编程模型并编写MapReduce的程序即可。
Google的论文中描述了他们在分布式机群系统上对MapReduce的实现。系统把输入数据划分为M份数据片,这些输入数据片可以在不同的机器上并发的被Map函数处理。所有的中间结果对使用一个分区函数(partitioning function)分为R份。然后,对于每个分区,通过排序把具有相同键的所有(键,值)对聚合到一起,用Reduce函数处理,最后产生R个输出文件。R的值和分区函数可以由用户指定,系统默认的分区函数是hash(key) mod R1。
Google的MapReduce实现是构建在GFS之上的,所有的MapReduce程序的输入和输出都是存储在GFS中的文件。由于GFS中的数据都有多个副本,当执行MapReduce的机群和运行GFS的机群是同一个时,MapReduce库的调度模块会尽量把map任务分配到存储数据的机器上本地运行,这样可以避免输入数据的网络传输,极大的提高性能。此外,用户可以指定函数用来把原始输入数据转换为map函数的输入,用户也可以指定函数用来把reduce的输出结果序列化为输出数据。体系结构图1如下:
数据的流图[ Colby Ranger, Ramanan Raghuraman, A. P. G. B. C. K. eva luating mapreduce for multi-core and multiprocessor systems. proceedings of HPCA, 2007]如下,Worker分别执行本地的任务,可能是Map任务、Transfer任务和Reduce任务。整个过程由Master控制和协调调度。
Tplatform的实现
我们实现的类似平台是Tplatform。我们自己也实现了一个自制的MapReduce,建立在GFS类似的分布式文件系统TFS上[ Zhifeng Yang, Qichen Tu, K. F, L. Z, R. C, B.P. Performance gain with variable chunk size in gfs-like file systems. Journal of Computational Information System 3(2008), 1077-1084]。TFS在设计上与GFS的不同之处在于对底层Chunk大小的设置7。 与Google提供运行时库然后通过一个二进制程序的多个副本扮演不同角色的方式不同,我们的实现提供的是一个执行MapReduce作业的服务,用户把编写好的实现指定接口的动态链接库用系统提供的API提交上来,MapReduce系统就会自动调度和运行相关的任务。服务由一台主控(Master)机器和若干台工作机(Worker)组成,Master负责把用户提交的作业(Job)切分为若干个任务,然后调度他们在各台工作机上执行。相比提供运行时库由用户编译为一个程序的方式,这样做的好处是,系统的改进升级对用户是不可见的。如果系统的实现改变了,只要MapReduce API不改变,用户无需改变代码甚至不需要重新编译生成动态链接库就可以执行MapReduce作业,这给我们未来系统的升级优化带来了极大的便利。不仅如此,在Google的原始实现中,如果同一个机群有多个作业在同时运行,因为作业由主控程序负责调度但一个作业的主控程序是不知道另一个作业的存在的,所以多个作业之间可能产生资源的互相抢占。而在我们的系统中,一个机群只有一个主控程序,主控程序可以综合各个作业的情况对所有任务整体进行调度。
这里需要详细说明我们任务设计的细节。
我们把Worker需要做的任务分成三个类型:Map、Transfer、Reduce,我们把传输任务从原来的Reduce中抽离出来并作为一个可以由Worker单独调度执行的任务。在这里我们对此设计有如下的分析。
在原来的Map、Reduce任务的执行流程和设计下,对于Map执行完生成的中间数据,是由Reduce来到Map机器上通过远程调用取得。这些有可能出现的场景是很多Reducer同时来一台Map机器上进行取数据操作,造成Map机器对硬盘的随机写,而随机写对性能的影响是很大的,这样的数据传输模型可以称之为“拉”。而我们把传输任务独立开来,由Master调度控制,可以控制Mapper传输的时间,同时Reducer在同时接到多个传输任务的数据时可以做缓存,避免随机写的出现。
此外,我们在Worker端通过心跳线程和Master通信,在执行分配的任务时用Exec方式启动一个新的进程来执行具体的Map和Reduce任务。而传输任务使用启动线程用Socket进行传输。
我们在此基础上,实现了MapReduce的系统,我们的设计在实现上有很多和Google不同之处,也不同于开源的Hadoop2。在完成原型的开发和测试后,针对性能和系统的评估成为了我们亟待解决的关键问题。我们由此开始系统地对MapReduce和类似相关系统进行分析和评估,我们相信对于MapReduce和类似系统的研究工作的下一步将是对此类系统的优化。
所以对当前系统的分析和评估成为关键,找到系统使用中的瓶颈所在,针对用户需求的目标进行改进,都是实际应用中的重要问题。我们在Tplatform的实现基础上,开发了一系列的基准程序,细致地分析了系统中可能出现的问题。
我们说明和分析表中的数值。
首先,Map和Reduce的任务的选择度都是1,因为对于PennySort来说,Map做的是把数据简单地读入,然后进行传输和分割,而对Reduce来说,进行完数据的排序后也只需要把数据简单地输出,所以选择度都是1.
然后,对于传输的方式,按记录的生成原则,可以均称地进行hash分割。中间数据比初始读入的数据反而小是因为很多数据Map任务做完后可以在本地直接进行Reduce,利用的数据的空间数据性,所以传输数据变小。
最后Reduce任务需要进行排序,系统实现使用快排,复杂度为nO(logn).
系统监控和程序概要分析
更好地理解和监控云计算的基础设施系统如MapReduce是一个烦人且亟待解决的问题。现有的实现都是比较简单地记录系统的相关性能信息,而且并没有太多关于在此类系统中如何监控和评估的工作。但是在我们的开发和使用过程中,我们发现了系统的性能概要分析很重要,或者说通过更好地理解底层系统,能够更好地改善和优化现有的系统。例如如下的几个场景中,我们将说明这一点:
数据中心中的一个程序员向系统提交了一个用高层语言如Pig Latin描述的任务后,他/她可能想知道他的任务做到什么程度。从性能概要分析的角度来考虑任务监控这个问题,任务在多个机器上的性能分布很重要。这样可以知道任务中最耗时的函数,从来让程序员可以针对此考虑改进自己的程序,或者在系统对任务的编译中进行优化。
失效在数据中心里面是正常的1。MapReduce这样的系统对用户掩盖机器的失效,如果机器发生宕机,系统将处理并调度计算重执行;而对于计算任务的失效,处理方式是重新执行,如果多次失效超过一定次数,将放弃执行。这是因为在数据中心中, 很有可能是用户提交的任务的程序中存在BUG,或者是数据有不满足格式而导致无法读入等等。对于需要进行长任务处理的工作来说,在现有系统的实现下,可能是一件极消耗用户程序员精力的事情。可能的情形是,执行了很久到快结束的时候由于BUG或者存储的问题导致失败而最终放弃。 而实时的监控和交互可以部分地解决这个问题,让用户及时地知道系统里面发生的情况,对于系统无法做出判断的事情(程序有错),交给用户去解决不失为一个可行的方案。
分布式系统中的一个很重要的措施就是要保证负载均衡,这对于并行计算的框架来说,同样意义重大。在计算的过程中记录性能信息和进行监控,可以通知用户或者系统。通过重新的调度或者其他手段使得负载尽可能均衡。
总之,通过监控和程序的性能概要分析,我们可以让系统和用户之间有更多交互。同时给出的数据可以帮助用以评估系统,提供给不同的人如用户或者系统开发人员分析。
实现细节
我们需要记录一个子任务的运行时性能概要信息,通过以下的数据结构来实现。
struct ProfileInfo
{
// for map task
int mapFanIn;
int mapFanOut;
int mapRecordNumber;
int localCombineFanIn;
int localCombineFanOut;
int localCombineRecordNumber;
// for reduce task
int reduceFanIn;
int reduceFanOut;
int reduceRecordNumber
// for transfer task
int transferIO;
int transferRecordNumber;
// cost time, by seconds
int taskCostTime;
};
对于Map阶段,分别记录扇入扇出的数据大小、map的记录个数;以及做localcombine的扇入扇出、记录个数;对于Reduce阶段,记录扇入扇出的数据大小、reduce的记录个数;还有传输任务的传输数据量;最后是各个任务的花费时间。
通过在Worker端执行任务后记录下任务的性能概要情况,然后通过文件管道传递给Worker的心跳进程,然后通过心跳捎带给Master以供分析。
进行捎带处理的心跳使用rpc实现,具体实现如下。
先使用ICE的slice描述rpc的接口。
/**
* report to the master the task is successfully completed.
*
* @param taskID
* @param profileInfo, send the profileInfo piggybackly
*/
idempotent void completeTask(Address workerAddress, int taskID, ProfileInfo taskProfile);