数据, 术→技巧, 法→原理

大数据通识:Google MapReduce

钱魏Way · · 583 次浏览

Google,作为全球最大的搜索引擎公司,其伟大之处不仅在于建立了一个强大的搜索引擎,还在于它创造了3项革命性的技术,即:GFS、MapReduce 和 BigTable。作为 Google 早期三驾马车,这三项革命性的技术不仅在大数据领域广为人知,更直接或间接性的推动了大数据、云计算、乃至如今火爆的人工智能领域的发展。

2004年12月5日,Google 在美国旧金山召开的第6届操作系统设计与实现研讨会(Operating Systems Design and Implementation,OSDI)上,发表了论文《MapReduce: Simplified Data Processing on Large Clusters》(MapReduce:超大集群的简单数据处理),向全世界介绍了 MapReduce 系统的编程模式、实现、技巧、性能和经验。基于 MapReduce 编写的程序是在成千上万的普通PC机上被并行分布式自动执行的。它将所有服务器中的处理器有效地利用起来计算保存在谷歌文件系统中的海量数据并得到想要的结果。

Google MapReduce论文译文

摘要(Abstract)

MapReduce准确的来说应该是一个编程模型,或者说是一个用来处理和生产大量数据集的相关实现。用户通过指定一个map函数来处理一个K/V键值对,这个键值对会被map生成一系列的中间键值对集。通过reduce函数,我们再将那些相同的中间key所关联的value值合并。这篇论文会展示在该模型下的一些真实场景下的任务。

那些用函数式的编写的程序会被自动的,并行的执行在一个很大的集群上。这个运行时系统会自己处理好如何分割输入的数据,程序执行资源的调度,机器失败的处理,管理必要的机器间的通信等这些细节问题。这让那些没有任何并行计算和分布式系统工作经验的程序员可以轻松的利用好一个很大的分布式系统的资源。

我们的集群是高度可扩展的:一个典型的MapReduce程序可以在上千台机器上来处理TB级的数据。程序员会觉得这个系统非常好用:在Google的每天都有数百个MapReduce程序被实现,并且上千个MapReduce任务运行在集群上。

1. 简介(Introduction)

在过去的五年中,作者和许多Google的其他人已经实现了数百个用于特殊目的的计算:这些计算来处理大量原始数据,如爬虫抓取的文档,Web请求日志等,从而计算各种衍生的数据,例如倒排索引,Web文档的各种分析图像表示,每个主机爬取的页面数的结果汇总,在给定日期中最频繁请求的集合等。大多数此类计算从概念上讲是很直接。但是,输入数据量通常很大,为了在合理的时间内完成,这些计算必须分布在数百或数千台机器上。如何并行化计算,数据分布以及故障处理这些问题需要大量复杂的代码来处理他,这就让原来的简单的计算不那么容易了。

为了应对这种复杂性,我们设计了一个新的抽象,它允许我们只需要表达我想要执行的最简单的计算,但是隐藏了底层中对计算并行化,容错,数据分发和负载均衡的复杂的实现细节。这种抽象的灵感来自于Lisp和许多其他函数语言中的map和reduce原语。我们意识到,大部分我们的计算都涉及到使用map操作来作用于输入数据中每一个逻辑上的“记录”来生成一些列中间临时的kv键值对,然后我们在对这些键值对中相同key的值做reduce操作来适当的组合出派生数据。使用一个由用户来指定的map和reduce操作的这样一个函数式模型,让我们可以容易的并行化处理大型计算并可以使用重新执行来作为主要容错机制。

我们这项工作的主要贡献点是提供了简单而强大的接口,可以让大规模计算的自动并行化和分发,并通过此接口的实现,可以在大量的商用PC集群上获取高性能。

本文第2节描述了基本的编程模型,并给出了几个例子。第3节描述了一个针对基于群集的计算环境定制的MapReduce接口的实现。第4节描述了几个我们发现比较有用的编程模型。第5节使用各种任务的对我们的实现进行性能测试。第6节探讨了MapReduce在Google中的使用,包括我们使用它来重写生产索引系统的经验。第7节讨论相关以及未来的工作。

2. 编程模型(Programming Model)

MapReduce编程模型的原理是:利用一个输入 key/value pair 集合来产生一个输出的 key/value pair 集合。MapReduce 库的用户用两个函数表达这个计算: Map 和 Reduce 。

用户自定义的 Map 函数接受一个输入的 key/value pair 值,然后产生一个中间 key/value pair 值的集合。MapReduce 库把所有具有相同中间 key 值 I 的中间 value 值集合在一起后传递给 reduce 函数。

用户自定义的 Reduce 函数接受一个中间 key 的值 I 和相关的一个 value 值的集合。 Reduce 函数合并这些value 值,形成一个较小的 value 值的集合。一般的,每次 Reduce 函数调用只产生 0 或 1 个输出 value 值。通常我们通过一个迭代器把中间 value 值提供给 Reduce 函数,这样我们就可以处理无法全部放入内存中的大量的 value 值的集合。

2.1 例子(Example)

例如,计算一个大的文档集合中每个单词出现的次数,下面是伪代码段:

map(String key, String value): 
    // key: document name 
    // value: document contents     
    for each word w in value: 
        EmitIntermediate(w, “1″); 
 
reduce(String key, Iterator values): 
    // key: a word 
    // values: a list of counts     
    int result = 0; 
    for each v in values: 
        result += ParseInt(v);     
    Emit(AsString(result)); 

Map函数输出文档中的每个词、以及这个词的出现次数 在这个简单的例子里就是 1) 。 Reduce 函数把 Map函数产生的每一个特定的词的计数累加起来。

另外,用户编写代码,使用输入和输出文件的名字、可选的调节参数来完成一个符合MapReduce 模型规范的对象,然后调用 MapReduce 函数,并把这个规范对象传递给它。用户的代码和 MapReduce 库链接在一起用 C++ 实现 。附录 A 包含了这个实例的全部程序代码。

2.2 类型(Types)

尽管在前面例子的伪代码中使用了以字符串表示的输入输出值,但是在概念上,用户定义的Map 和 Reduce函数都有相关联的类型:

map(k1,v1) -> list(k2,v2)
reduce(k2,list(v2)) -> list(v2)

比如,输入的key和value值与输出的key和value值在类型上推导的域不同。此外,中间key和value值与输出key和value值在类型上推导的域相同。

我们的C++中使用字符串类型作为用户自定义函数的输入输出,用户在自己的代码中对字符串进行适当的类型转换。

2.3 更多例子(More Examples)

这里有一些有趣程序的简单示例,可以很容易地表现MapReduce计算。

  • 分布式的grep:如果一行数据匹配所给定的模式map函数就会将改行发出来,然后reduce函数是一个标识函数只负责将中间的数据拷贝到结果输出。
  • 计算URL访问频率:map函数处理web网页的请求日志并输出{URL,1}。然后reduce函数对一样的URL的值相加,并发出{URL,总和}的键值对。
  • 反转Web链接图:map函数将一个页面中所有能找到的叫source的链接和它的目标URL输出成{target,source}。然后reduce函数对相同target的source们聚合然后发出这个新的键值对{target, list(source)}。
  • 每个主机的Term-向量: Term-向量是一个{word, frequency}键值对的一个列表,它总结了一个文档或者一组文档里面出现的最重要的单词。map函数对每个输入文档(其主机名是从文档的URL中抽取出来的)处理为{hostname, term vector}并发 出去。reduce函数传递一个主机所有文档的term-向量,然后将这些向量累加起来,丢弃不常用的term,最后发出{hostname, term vector}键值对。
  • 倒排索引:map函数解析每个文档,并发出一系列{word, 文档ID}的键值对,reduce函数会对一个给定的单词接受所有的键值对,然后对该单词的 文档ID进行排序并发出{word, list(文档ID)}的键值对。所有输出的键值对集合就形成了一个简单的倒排索引。通过增加这个计算来跟踪单词的位置是很容易的。
  • 分布式排序:map函数从每个记录中提取key,并发出{key,记录}这个键值对。 reduce函数会不做修改的发送所有的键值对。此计算要依赖第1节中描述的分区工具和第4.2节中描述的排序属性。

3 实现(Implementation)

MapReduce接口的可以有很多不同的实现。正确的选择取决于环境。例如,一种实现方式可能适用于小型共享存储器机器,另一种实现方式适用于大型NUMA多处理器,而另一种实现方式适用于甚至更大的联网机器集。

本节描述了一个在Google被广泛使用的计算环境的实现:一天用交换式以太网连接在一起的大型商用PC集群。在我们的环境中:

  • 一般机器都是运行了Linux系统的双处理器x86处理器,每台机器有2-4 GB的内存。
  • 使用商用网络硬件 – 一般在单机器级别上为100M/秒或1G/秒,但在整体上带宽的平均值来要比这个带宽中位数要少得多。
  • 集群由数百或数千台机器组成,因此机器故障很常见。
  • 存储由廉价的直连在各个机器上的IDE磁盘提供。内部开发的分布式文件系统[8]用于管理存储在这些磁盘上的数据。文件系统通过复制在不可靠的硬件之上提供可用性和可靠性。
  • 用户将作业提交给调度系统。每个作业由一组任务组成,并由调度程序映射到集群中的一组可用计算机。

3.1 执行概述(Execution Overview)

Map函数的调用将被分布在多太机器上来自动将输入的数据分割为M份。这些被分割的输入可以在不同的机器上被并行处理。Reduce函数的调用也会被分布在多台机子上,它通过一个分区函数(比如hash(key) mod R)将map中生成的临时key来分片为R个片段。分区数量(R)以及分区函数由用户来指定。

图1显示了我们实现中MapReduce操作的总体流程。当用户程序调用MapReduce函数时,会发生以下一系列操作(图1中的编号标签对应于下面列表中的数字):

  • 用户程序中的MapReduce库首先将输入文件分成每块通常16M到64MB的M个块(这个可由用户通过可选参数控制)。然后,MapReduce会在一组计算机上启动该程序的许多副本【fork的过程】。
  • 该程序的其中一个副本是特殊的-master。其余都是由主人分配工作的worker。有M个map任务和R个reduce任务要分配。master会挑选闲置的worker并为每个人分配一个map任务或reduce任务。
  • 被分配了Map任务的worker要读取相应的被拆分的输入块的内容。它从输入数据中解析出键/值对,并将该键值对传递给用户定义的Map函数。Map函数生成的中间键/值对是缓存在内存中的。
  • 周期性地,缓存的键值对会被写入本地磁盘,通过分区函数将数据划分为R个区域。这些被缓存的数据序列化到本地磁盘上这的磁盘位置将传回给master,master再负责将这些位置转发给reduce的workers。
  • 当主服务器通知reduce workers这些位置时,它使用远程过程调用(RPC)从map的workers的本地磁盘读取缓冲数据。当reduce工作者读取了所有中间数据时,它会通过键来对这些数据进行排序,以便将所有出现的相同键组合在一起。之所以需要排序,因为通常很多不同的键都被映射到同一个reduce任务上。如果中间数据量太大而无法全存储在内存中,则使用外部排序。
  • reduce的worker对已排序的中间数据进行迭代,并且对于每个遇到的唯一key,它将key相应的中间value值集合传递给用户的Reduce函数。Reduce函数的输出结果被附加到这部分reduce分区上的一个最终输出文件上去。
  • 当完成所有map任务和reduce任务后,master会唤醒用户程序。此时,用户程序中的MapReduce调用会return返回到用户代码中。

成功完成后,MapReduce的执行结果就在那R个输出文件中(每个reduce任务一个,文件名由用户指定)。通常,用户不需要将这R个输出文件合并成一个文件,它们通常将这些文件作为输入传递给另一个MapReduce过程的调用,或者从另一个能够处理被分区为多个文件的来作为输入的分布式应用程序中使用它们。

3.2 Master的数据结构(Master Data Structures)

master拥有几中数据结构。对于每个map任务和reduce任务,它存储他们的状态(空闲,正在进行或已完成)以及正在工作机器的标识(用于非空闲任务)。

master是将这些文件区域的位置从map任务传播到reduce任务的管道。因此,对每一个完成的map任务,master会存储这些由map任务生成的R个中间文件块的位置和大小。当map任务完成是,这些文件位置和大小的变更也会被master接受。信息将逐步推送给正在进行reduce任务的workers。

3.3 容错(Fault Tolerance)

由于MapReduce库旨在帮助使用数百或数千台计算机处理大量数据,因此程序库必须能够优雅地容忍计算机故障。

Worker故障

master周期性的 ping 每个 worker 。如果 在一个约定的时间范围内没有收到 worker 返回的信息, master 将把这个 worker 标记为失效。所有由这个失效的 worker 完成的 Map 任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的 worker 。同样的, worker 失效时正在运行的 Map 或 Reduce 任务也将被重新置为空闲状态,等待重新调度。

当 worker 故障时,由于已经完成的 Map 任务的输出存储在这台机器上, Map 任务的输出已不可访问了,因此必须重新执行。而已经完成的Reduce 任务的输出存储在全局文件系统上,因此不需要再次执行。

当一个 Map 任务首先被 worker A 执行,之后由于 worker A 失效了又被调度到 worker B 执行,这个“重新执行”的动作会被通知给所有执行 Reduce 任务的 worker 。任何还没有从 worker A 读取数据的 Reduce 任务将从 worker B 读取数据。

MapReduce 可以处理大规模 worker 失效的情况。比如,在一个 MapReduce 操作执行期间,在正在运行的集群上进行网络维护引起 80 台机器在几分钟内不可访问了, MapReduce master 只需要简单的再次执行那些不可访问的 worker 完成 的工作,之后继续执行未完成的任务,直到最终完成这个 MapReduce 操作。

Master失败

一个简单的解决办法是让master 周期性的将上面描述的数据结构的写入磁盘,即检查点( checkpoint )。如果这个 master 任务失效了,可以从最后一个检查点( checkpoint )开始启动另一个master 进程。然而,由于只有一个 master 进程, master 失效后再恢复是比较麻烦的,因此我们现在的实现是如果 master 失效,就中止 MapReduce 运算。客户可以检查到这个状态,并且可 以根据需要重新执行 MapReduce操作。

在失效方面的处理机制

当用户提供的Map和Reduce操作是输入确定性函数(即相同的输入产生相同的输出)时,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。

我们依赖对Map和Reduce任务的输出是原子提交的来完成这个特性。每个工作中的任务把它的输出写到私有的临时文件中。每个Reduce任务生成一个这样的文件,而每个Map任务则生成R个这样的文件(一个Reduce任务对应一个文件)。当一个Map任务完成的时,worker发送一个包含R个临时文件名的完成消息给master。如果master从一个已经完成的Map任务再次接收到到一个完成消息,master将忽略这个消息;否则,master将这R个文件的名字记录在数据结构里。

当Reduce任务完成时,Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个Reduce任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。

使用MapReduce模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的Map和Reduce操作是确定性的,而且存在这样的一个事实:我们的失效处理机制等价于一个顺序的执行的操作。当Map或/和Reduce操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。当使用非确定操作的时候,一个Reduce任务R1的输出等价于一个非确定性程序顺序执行产生时的输出。但是,另一个Reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的R2的输出。

考虑Map任务M和Reduce任务R1、R2的情况。我们设定e(Ri)是Ri已经提交的执行过程(有且仅有一个这样的执行过程)。当e(R1)读取了由M一次执行产生的输出,而e(R2)读取了由M的另一次执行产生的输出,导致了较弱的失效处理。

3.4 存储位置(Locality)

在我们的计算运行环境中,网络带宽是一个相当匮乏的资源。我们通过尽量把输入数据(由GFS管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)。当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。

3.5 任务粒度(Task Granularity)

如上所述,我们把map拆分成了M个片段、把Reduce拆分成R个片段执行。理想情况下,M和R要比集群中worker的机器数量大的多。在每个worker机器执行大量的不同任务有助于集群的动态的负载均衡,并且一个worker失败是可以加快恢复速度:该失效机器上已经完成的许多map任务可以分布到任意其他的worker机器上去执行。

我们的具体实现中M和R可以取多大是有一定的实际界限的,因为master必须执行复杂度为O(M+R)次调度,并且在内存中保存O(MxR)个状态(但是对内存使用的影响因子还是很小的:O(MxR)块状态,大概每对Map任务/Reduce任务差不多只有1个字节)。此外,R值通常是用户指定的,因为每个Reduce任务最终都会生成一个单独的输出文件。实践中,我们也倾向于选择合适的M值,以便每一个独立任务处理的都是大约16M到64M的输入数据(这样上面说的存储位置优化才最有效),并且我们让R设为我们打算使用的worker机器数比较小的倍数。我们通常会用这样的比例来执行MapReduce计算:M=200,000,R=5,000,使用2000台worker机器。

3.6 备份任务(Backup Tasks)

一个延长MapReduce总执行时间的常见原因是因为一个“落伍者”:在计算过程中一台机器花了很长的时间才完成最后几个Map或Reduce任务。出现“落伍者”的原因有很多。比如:如果一个机器的硬盘有问题就可能导致频繁的纠错操作,而让读取数据的速度从30M/s降低到1M/s。集群的调度系统可能给这台机器上又调度了其他的任务,由于CPU、内存、本地硬盘和网络带宽等问题,导致这台机器执行MapReduce代码的更慢。我们最近遇到的一个问题是因为机器的初始化代码有bug,导致处理器缓存被禁用:受影响的计算机的计算速度减慢了一百多倍。

我们有一个通用的机制来减少“落伍者”出现。当一个MapReduce操作快要完成的时候,master调度备份任务进程来执行剩下的还在处理中的任务。无论是最初的执行、还是备份任务执行结束都标记这个任务已经完成。我们已经调整了这个机制,让它所带来额外的计算资源不会超过几个百分点。我们发现这显着缩短了完成大型MapReduce操作的时间。例如,在5.3节描述的排序任务,在关闭掉备用任务的情况下要多花44%的时间才能完成排序任务。

4 改进(Refinements)

虽然简单的Map和Reduce函数提供的基本功能已经能够满足大部分的计算需要,我们还是发掘出了一些有价值的扩展功能。本节将描述这些扩展功能。

4.1 分区功能(Partitioning Function)

MapReduce的使用者通常会指定他们想要的Reduce任务以及输出文件的数量(R)。数据会使用中间key通过分区函数被分区。一个默认的分区函数是使用hash分区(比如,hash(key) mod R)进行分区。hash分区能产生十分平衡的分区。然而,在某些情况下,使用其它的一些分区函数对key值进行的分区也非常有用。比如,输出的key结果是URLs,我们希望每个主机的所有记录都保存到同一个输出文件中。为了支持类似的情况,MapReduce库的用户可以提供一个专门的分区函数。例如,使用hash(Hostname(urlkey)) mod R作为分区函数让来自所有的来自同一个主机的URLs最终保存在同一个输出文件中。

4.2 排序保证(Ordering Guarantees)

我们保证在给定的分区中,中间键值对数据会按照key值增量顺序来处理。这个顺序处理保证了可以很容易的让每个分区生成的输出文件也是有序的,当输出文件的格式需要能够支持高效的按key的随机访问查找的时候,将会变得很有用,亦或者是输出文件的使用者会发现对结果的数据集进行排序会很方便。

4.3 组合功能(Combiner Function)

在某些情况下,map函数产生的中间key值的重复数据会占很大的比重,并且用户给定的reduce函数满足交换律和结合律。2.1节中统计词频的例子就是个很好的例子。由于词频倾向于一个Zipf分布(齐夫分布),每个map任务将产生成百上千<the,1>格式的记录。所有的这些计数将通过网络被发送到一个单独的reduce任务,然后通过reduce函数将这些值累加起来生成一个数字。我们允许用户指定一个可选的Combiner函数,可以让这些数据在发送到网络之前可以先部分进行合并。

Combiner函数是在每台执行Map任务的机器上执行的。通常Combiner和Reduce函数的实现使用的代码是一样的。唯一的区别是MapReduce库怎样处理函数的输出。reduce函数的输出被写入到最终输出文件里,而Combiner函数的输出是被写到中间文件里,然后会再被发送给一个reduce任务。

部分合并可以显著的加速一些MapReduce操作。附录A包含一个使用combiner函数的例子。

4.4 输入输出类型(Input and Output Types)

MapReduce库支持几种不同的输入数据的格式。比如,文本模式下输入数据的每一行被视为是一个键值对:key是该行在文件中偏移量,value是那一行的内容。另外一种常见的格式是存储了一个按照key排序的键值对序列。每种输入类型的实现都需要能够把输入数据分割成数据片段,以使得可以让单独的map任务来进行后续处理(例如,文本模式分割的后范围必须确保只会在每行的边界进行范围分割)。尽管大多数使用者大部分情况下都只需要使用MapReduce预定义的少部分输入类型中的一个,但是使用者依然可以通过提供一个Reader接口简单的实现就可以对一个新的输入类型提供支持。

一个reader并不一定要从文件中读取数据,比如,我们可以很容易定义一个reader,让他从一个数据库里读取记录,或者从内存中的数据结构读取数据。

以类似的方式,我们提供了一系列预定义的输出数据的类型用来可以产生不同格式的数据,并且用户代码可以很容易增加对新的输出类型的支持。

4.5 副作用(Side-effects)

在某些情况下,MapReduce的使用者发现,如果在Map和/或Reduce操作过程中增加辅助的输出文件会比较省事。我们依靠程序writer把这种“副作用”变成原子的和幂等的3。通常应用程序首先把输出结果写到一个临时文件中,在输出全部数据之后,在使用系统级的原子操作rename重新命名这个临时文件。如果一个任务产生了多个输出文件,我们没有提供类似两阶段提交的原子操作支持这种情况。因此,对于会产生多个输出文件、并且对于跨文件有一致性要求的任务,都必须是确定性的任务。但是在实际应用过程中,这个限制还没有给我们带来过麻烦。

4.6 跳过损坏的记录(Skipping Bad Records)

有时候,用户程序中的bug导致Map或者Reduce函数在处理某些记录的时候crash掉,MapReduce操作无法顺利完成。惯常的做法是修复bug后再次执行MapReduce操作,但是,有时候找出这些bug并修复它们不是一件容易的事情;这些bug也许是在第三方库里边,而我们手头没有这些库的源代码。而且在很多时候,忽略一些有问题的记录也是可以接受的,比如在一个巨大的数据集上进行统计分析的时候。我们提供了一种执行模式,在这种模式下,为了保证保证整个处理能继续进行,MapReduce会检测哪些记录导致确定性的crash,并且跳过这些记录不处理。

每个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号。如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。当master看到在处理某条特定记录不止失败一次时,master就标志着条记录需要被跳过,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。

4.7 本地执行(Local Execution)

调试Map和Reduce函数的问题可能非常的棘手,因为实际计算是在分布式系统中执行的,通常在几千台计算机上执行,具体执行任务的分配是由master进行动态调度决定的。为了简化调试、分析和小规模测试,我们额外开发了一套MapReduce库,可以让MapReduce任务在本机上顺序执行所有任务。控制权交给用户,让用户可以把计算限制到特定的Map任务上。用户通过设定特殊的标志来本地调用他们的程序,这样可以很容易的使用任意调试和测试工具(比如gdb)。

4.8 状态信息(Status Information)

master运行了一个内嵌的HTTP服务器并暴露一组状态信息页供人监控。状态页显示着计算的进度,比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。页面还包含了每个任务所生成的标准错误和标准输出文件的链接。用户利用这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源。这些页面也可以被用来分析什么时候计算执行的比预期的要慢。

另外,处于最顶层的状态页显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务。这些信息在用户尝试诊断用户代码中的bug的时候十分有用。

4.9 计数器(Counters)

MapReduce库提供了一个计数器来统计各种事件发生次数。比如,用户可能想统计已经处理单词的总数、或者已经索引的德国文档的数量等等。

为了使用这个功能,用户要在程序中创建并命名一个的计数器对象,并在适当的时候在Map和Reduce函数中增加计数器的值。例如:

Counter * uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
    for each word w in contents:
    if (IsCapitalized(w)):
        uppercase - > Increment();
EmitIntermediate(w, "1");

这些计数器的值周期性的从各个单独的worker机器上传送到master(附加在ping的响应包中)。master将成功的Map和Reduce任务的计数器值聚合,并在MapReduce操作完成之后,返回给用户代码。当前计数器的值也会显示在master的状态页面上,这样用户就可以观察当前活着的计算的进度。在聚合计数器的值时,master要消除重复运行的Map或者Reduce任务的影响,避免重复累加(备份任务的使用和失效后重新执行任务这两种情况会导致重复执行)。

有些计数器的值是由MapReduce库自动维持的,比如已经处理的输入的键值对数量、输出的键值对的数量等。

使用者会发现计数器功能对于检查MapReduce操作的行为非常有用。比如,在某些MapReduce操作中,用户需要确保输出的键值对数量完全等于输入的键值对数量,或者处理的德国文档数量在处理的整个文档数量在可容忍的范围。

5 性能(Performance)

在这节我们在一个大型集群上运行的两个计算来测量MapReduce的性能。一个计算来在大约1TB的数据中搜索出特定模式匹配的数据,另一个计算对大约1TB的数据进行排序。

这两个程序在大量MapReduce使用者的真实程序中是很有代表性— 一类是将数据从一种表现形式转换为另外一种表现形式;另一类是从大的数据集中抽取少量用户感兴趣的数据。

5.1 集群配置(Cluster Configuration)

所有的程序都是在一个由大约1800台机器组成的集群上执行的。每个机器都配备两个2GHz的Intel Xeon处理器,支持超线程,4GB内存,两个160GB的IDE磁盘和一个千兆以太网卡。这些机器被部署在一个两层的树形交换网络中,在根节点大约有100-200 Gbps的总带宽可用。所有这些机器都采用相同的部署,因此任何一对机器之间的网络往返时间小于1毫秒。

在4GB的内存中,大约有1-1.5GB是由集群上运行的其他任务保留的。测试的程序是在周末下午执行的,这时的cpu、磁盘和网络大部分处于空闲状态。

5.2 测试1 – 查找(Grep)

grep程序会扫描大概10的10次方个单个100个字节组成的记录,查找一个相对少见的3个字符的模式(这个模式在92337个记录有出现)。输入数据被分为差不多64M一片的若干分片(M=15000),整个输出数据存放在一个文件中(R=1)。

图2显示了这个运算随时间的处理过程。Y轴表示输入数据的处理速度。处理速度随着分配给MapReduce计算的机器数量的增加而增加,当1764台worker被分配参与计算的时,处理速度达到了30GB/s。当Map任务结束的时候,即在计算开始后80秒,输入的处理速度降到0。整个计算从开始到结束一共花了大概150秒。这包括了大概一分钟的初始启动阶段。初始启动阶段时间是因为要把这个程序传送到各个worker机器上的时间、等待GFS文件系统打开1000个输入文件集合的时间、获取相关的文件本地位置优化信息的时间。

5.3 测试2 – 排序(Sort)

排序程序会排序10的10次方单个100个字节组成的记录(大概1TB的数据)。这个程序是模仿TeraSort的benchmark[10]。

排序程序由不到50行代码组成。只有三行的Map函数从文本行中抽取出一个10字节的key值作为排序的key,并且把这个key和原始文本行作为中间的键值对输出。我们使用了一个内置的恒等函数作为Reduce操作函数。这个函数把中间的键值对不作任何改变输出。最终排序结果输出到两路复制的GFS系统文件中(也就是说,程序输出2TB的数据)。

如前面提到的,输入数据被了分成64MB的片(M=15000)。我们把排序后的输出结果分区后存储到4000个文件中(R=4000)。分区函数使用key的原始字节来把数据分区到R个片段中。

我们这个benchmark测试中分区函数知道key的分布情况。对于一个通常的排序程序来说,我们会增加一个预处理的MapReduce操作来采样key的分布情况,通过采样key的分布来计算对最终排序处理的分区点。

图三(a)显示了这个排序程序一个正常的执行过程。左上图显示的是输入数据读取的速度。数据读取速度峰值会达到13GB/s,并且所有Map任务完成之后,即大约200秒之后迅速滑落到0。注意一点,排序程序输入数据读取速度小于上面的grep程序。这是因为排序程序的Map任务花了大约一半的处理时间和I/O带宽把中间输出结果写到本地硬盘。而对应的分布式grep程序的中间结果输出几乎可以忽略不计。

左边中间的图显示了中间数据从Map任务发送到Reduce任务的网络速度。这个过程从第一个Map任务完成之后就开始缓慢启动了。图中的第一个峰值是启动了第一批大概1700个Reduce任务(整个MapReduce分配到大概1700台机器上,每台机器1次最多执行1个reduce任务)。计算运行差不多300秒后,一些第一批启动的Reduce任务完成了,我们开始为剩下的Reduce任务shuffle数据。所有的处理在大约600秒后结束。

左下图表示Reduce任务把排序后的数据写到最终的输出文件的速度。在第一个排序阶段结束和数据开始写入磁盘之间有一个小的延时,这是因为worker机器正在忙于排序中间数据。写入的速度在2-4GB/s持续一段时间。整个写入时间大约持续850秒。再包括初始启动时间,整个运算消耗了891秒。这和TeraSort benchmark[18]的最好纪录1057秒很接近了。

还有一些东西要提一下:输入数据的读取速度比数据shuffle速度和输出数据写入磁盘速度要高,这是因为我们的输入数据本地化优化策略—大部分数据都是从本地硬盘读取的,从而绕过了网络带宽的限制。排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份(为了保证数据可靠性和可用性我们使用了双备份的GFS文件系统)。我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。在输出数据写入磁盘的时候如果底层文件系统使用编码容错技术(erasure coding)[14]的方式而不是复制的方式保证数据的可靠性和可用性,那么就可以减少对网络带宽的需求。

5.4 备份任务的影响(Effect of Backup Tasks)

图三(b)显示了关闭备份任务后排序程序执行情况。执行的过程和图3(a)很相似,除了输出数据写磁盘的动作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。在960秒后,除了5个reduce任务其他任务都完成。这些拖后腿的任务又执行了300秒才完成。整个计算消耗了1283秒,多了44%的执行时间。

5.5 机器故障(Machine Failures)

在图三(c)中演示的排序程序执行的过程中,在几分钟中内我们故意的杀死了1746个worker中的200个。集群底层的调度立刻在这些机器上重新开始新的worker处理进程(因为只是worker机器上的worker进程被kill了,机器本身还在工作)。

worker的死亡显示了一个负的输入数据读取率,因为一些以前完成的map任务丢失了了(因为相应的map任务的worker被kill了),需要重新执行这些任务。这些Map任务很快就被会被重新执行。整个计算过程在933秒内完成,这包括启动开销(只比正常执行时间增加5%)。

6 经验(Experience)

我们在2003年1月写了首个版本的MapReduce库,在2003年8月做了有了显著的增强,这包括了数据本地优化、worker机器之间任务的动态负载均衡等等。从那以后,我们惊讶的发现,MapReduce库能广泛应用于我们日常工作中遇到的各类问题。它现在广泛的被Google内部各个领域广泛的使用,包括:

  • 大规模机器学习问题,
  • Google News和Froogle产品的集群问题,
  • 从流行的查询记录中抽取数据来生成报告(比如Google的Zeitgeist),
  • 从大量的新应用和新产品的网页中提取有用信息(比如,从大量的位置搜索网页中抽取地理位置信息),
  • 大规模的图计算。

图四显示了在我们的源代码管理系统中,这段时间内独立的MapReduce程序数量的巨大增长。从2003年早些时候的0个增长到2004年9月份的差不多900个不同的实例。MapReduce的成功是因为MapReduce库可以在不到半个小时时间内写出一个简单的程序,并能够有效的在上千台机器的组成的集群上运行,这大大的加速了开发和原形制作的周期。另外,MapReduce让完全没有分布式和/或并行系统开发经验的程序员很容易的利用大量的资源。

在每个任务结束的时候,MapReduce库日志分析出计算资源的使用状况。在表1,我们展示了2004年8月份在Google运行的MapReduce任务所占用的统计数据的一部分子集数据。

6.1 大规模的索引(Large-Scale Indexing)

目前MapReduce一个最成功的使用就是重写了Google网络搜索服务所使用到的索引构建系统。索引系统通过网络爬虫抓取回来的海量的文档作为输入数据,这些数据保存在GFS中。这些文档的原始内容有超过20TB的数据。索引程序是通过一系列大约5到10次的MapReduce操作来建立索引。使用MapReduce(而不是先前版本的分布式索引系统)带来了这些好处:

  • 索引的代码更加简单、小巧、并且容易理解,因为处理容错、分布式以及并行计算的代码都隐藏在MapReduce库中。例如,使用MapReduce库,一个计算阶段的代码行数从原来的3800行C++代码减少到大概700行代码。
  • MapReduce库的性能已经足够好了,因此我们可以把在概念上不相关的计算步骤分开,而不是混在一起来避免额外传递数据的开销。这样做也使得我们可以很容易去改变索引处理方式。比如,对之前的索引系统的一个修改可能要花好几个月的时间,但MapReduce只需要花几天时间就可以实现了。
  • 索引过程变得更容易操作,因为由机器、机器运行缓慢和网络故障引起的问题都是由MapRe- duce库自动处理的,不需要操作员的干预。此外,通过向in- dexing集群中添加新机器,可以很容易地提高索引过程的性能。
  • 构建索引的过程也变得更加容易操作了。因为大多数由机器故障、机器运行速度缓慢、以及网络故障阻塞等引起的问题都已经由MapReduce库解决了,不需要操作员的干预。此外,我们只需要通过向索引集群中添加新机器,就可以很容易地提高索引构建的性能。

7 相关工作(Related Work)

很多系统都提供了严格的编程模式,并且通过对编程的严格限制来实现并行计算。例如,一个结合函数可以通过把N个元素的数组的前缀在N个处理器上使用并行前缀算法,在log N的时间内计算完。MapReduce可以看作是我们结合在真实环境下处理海量数据的经验,对这些经典模型进行简化和萃取的成果。更加值得骄傲的是,我们还实现了基于上千台处理器的集群的容错处理。相比而言,大部分并发处理系统都只在小规模的集群上实现,并且把容错处理交给了程序员。

Bulk Synchronous Programming[17]和一些MPI原语[11]提供了更高级别的并行处理抽象,可以更容易写出并行处理的程序。MapReduce和这些系统的关键不同之处在于,MapReduce利用限制性编程模式实现了用户程序的自动并发处理,并且提供了透明的容错处理。

我们数据本地优化策略的灵感来源于active disks[12,15]等技术,在active disks中,计算任务是尽量推送到数据存储的节点处理6,这样就减少了网络和IO子系统的吞吐量。我们在挂载几个硬盘的普通机器上执行我们的运算,而不是在磁盘处理器上执行我们的工作,但是达到的目的一样的。

我们的备用任务机制和Charlotte System[3]提出的eager调度机制比较类似。Eager调度机制的一个缺点是如果一个任务反复失效,那么整个计算就不能完成。我们通过忽略引起故障的记录的方式在某种程度上解决了这个问题。

MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上分布和运行用户任务。虽然这个不是本论文的重点,但是有必要提一下,这个集群管理系统在理念上和其它系统,如Condor[16]是一样。

MapReduce库的排序机制和NOW-Sort[1]的操作上很类似。读取输入源的机器(map workers)把待排序的数据进行分区后,发送到R个Reduce worker中的一个进行处理。每个Reduce worker在本地对数据进行排序(尽可能在内存中排序)。当然,NOW-Sort没有给用户自定义的Map和Reduce函数的机会,因此不具备MapReduce库广泛的实用性。

River[2]提供了一个编程模型:处理进程通过分布式队列传送数据的方式进行互相通讯。和MapReduce类似,River系统尝试在不对等的硬件环境下,或者在系统颠簸的情况下也能提供近似平均的性能。River是通过精心调度硬盘和网络的通讯来平衡任务的完成时间。MapReduce库采用了其它的方法。通过对编程模型进行限制,MapReduce框架把问题分解成为大量的“小”任务。这些任务在可用的worker集群上动态的调度,这样快速的worker就可以执行更多的任务。通过对编程模型进行限制,我们可用在工作接近完成的时候调度备用任务,缩短在硬件配置不均衡的情况下缩小整个操作完成的时间(比如有的机器性能差、或者机器被某些操作阻塞了)。

BAD-FS[5]采用了和MapReduce完全不同的编程模式,它是面向广域网的。

不过,这两个系统有两个基础功能很类似。(1)两个系统采用重新执行的方式来防止由于失效导致的数据丢失。(2)两个都使用数据本地化调度策略,减少网络通讯的数据量。

TACC[7]是一个用于简化构造高可用性网络服务的系统。和MapReduce一样,它也依靠重新执行机制来实现的容错处理。

8 结束语(Conclusions)

MapReduce编程模型在Google内部成功应用于多个领域。我们把这成功归结为几个原因:首先,模型使用简单,即便对于完全没有并行或者分布式系统开发经验的程序员而言很很容易上手,因为MapReduce封装隐藏了并行处理、容错处理、数据本地化优化、负载均衡等底层实现细节。其次,大量各种类型的问题都可以通过MapReduce的计算形式来表示。比如,MapReduce用于在Google的web搜索服务所需的数据生成、用于排序、用于数据挖掘、用于机器学习,以及很多其它的系统;第三,我们开发了一·个MapReduce的实现,可以运行在一个包含了数千台机器组成的大规模集群。这个实现可以更加充分的利用这些丰富的计算资源,因此很适合用来解决Google遇到的那些需要大量计算的问题。

我们也从这份工作中学到了一些东西。首先,约束编程模式可以使并行和分布式计算变得容易,也易于让这样的计算可以容错;其次,网络带宽是稀有资源。我们系统中大量的系统都是为了减少网络的数据传输:本地优化策略使大量的数据从本地磁盘读取,并且写一份中间文件的副本在本地磁盘上也节省了网络带宽;第三,冗余的执行任务可以减少性能慢的机器带来的负面影响,并且解决由于机器失败导致的数据丢失问题。

Google MapReduce 论文学习

MapReduce 是什么?用来做什么?

MapReduce 的本质是一种编程模型,是一种编程方法,抽象理论,其主要用于大规模数据集的并行运算。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

MapReduce 在以下背景下产生:单机系统无法处理海量数据,在各大互联网公司已经广泛采用分布式系统,但是缺少同一的范式来简化问题,导致只有少数的技术大牛才能写出、搭建出良好的分布式系统。

MapReduce 主要由两个概念组成:

  • Map 就是将大数据拆解(映射)为数据集的过程,比如说有辆红色的小汽车,有一群工人,把它拆成零件了,这就是 Map。
  • Reduce 就是组合(聚集),我们有很多汽车零件,还有很多其他各种装置零件,把它们一起拼装,变成变形金刚,这就是 Reduce。

MapReduce 的核心思想是:对于 PB 级别的数据,它们一定是分布式地存储于不同主机的磁盘上。传统单机的 “输入 –> 计算 –> 输出” 模型已经不能满足分布式计算的需求,因为传统单机的输入是本地磁盘,而分布式的输入可以是其他磁盘,因此涉及不稳定且带宽受限的网络。如果我们将数据通过网络 I/O 收集到部分主机上进行计算,那么计算的性能主要受限于网络带宽,而不是 CPU、内存。MapReduce 深刻地洞察了这个缺陷,其巧妙地避免了数据在计算时的大批量网络 I/O:既然数据是庞大的,而程序要比数据小得多,将数据输入给程序是不划算的,那么就反其道而行之,将程序分发到数据所在的地方进行计算,也就是所谓的移动计算比移动数据更划算。

另一方面 Map 和 Reduce 概念首次提出并不是在 Google 的这篇论文上,而是在 Lisp 语言中提出。Lisp 是一门古老而有先进的语言,起源于 1958 年,显得古老。但是其推崇函数式编程,到现在看起来,理念还是非常先进。Map 和 Reduce 实际上也是推荐使用函数式编程的方式进行处理,因为通常我们会面临不同结构、内容的初始数据集,但是具体处理的方法逻辑是相同的,函数式风格(functional style)就能很好地将相同的方法逻辑抽象出来。因此 Map、Reduce 只需要专注于单个处理元素,比如一份 html 文档、一个日志文件,而非 TB 级别的数据。

事实上,Java 在 JDK8 中也提出 Map/Reduce函数式编程模型,因为 Map/Reduce 本身就源于函数式编程模型,下面是两个例子。

//java.util.stream.Stream#map 将 String 元素映射为其长度
Object[] lens =  Stream.of("hello","world","spongecaptain").map(String::length).toArray();
for (Object len : lens) {
  System.out.print(len + " ");
}
//仅仅为了换行
System.out.println();

//java.util.stream.Stream#reduce 计算流中所有元素的和
int sum = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).reduce(0, (acc, n) -> acc + n);
System.out.println(sum);

MapReduce 用户编程范式

用户编程范式指的是非架构程序员,即业务程序员该如何来编写 MapReduce 代码,这部分代码被称为 User Program。

我们先来看看论文是怎么说的。在第一个节我们提到了 Map、Reduce 都分别对应着一个函数式编程中的方法体,这里我们用框图来表示它们的输入和输出。

首先是作为一个 MapReduce 整体,其输如和输入如下图所示:

输入输出都是一系列键值对,或者说是元素类型为键值对的集合。这通常被称为 Job。

其次,对于 Map 的函数式编程模型如下:

输入:一对键值对 key-value;输出:键值对集合 a set of key-value;Map 方法执行都被称为 Task。

最后,对于 Reduce 的函数式编程模型如下:

输入:键值对集合 a set of key-value,且他们的 key 要求是相同的;输出:一对键值对 key-value,如果干脆就什么都输出。Reduce 方法执行也被称为 Task。

注意事项:key-value 数据集的样式相同并不代表他们内容相同,我在这主要是为了图方便,使部分框格的样式相同。

Map 与 Reduce 的关系是由 MapReduce 分布式系统来协调和确定的:

  • 首先,分布式系统中的不同机器使用 Map 函数来处理由 MapReduce 系统分发来的不同 key-value 键值对,然后分别产生不尽相同的键值对集合,这被称为:a set of intermediate key/value pairs;
  • 其次,MapReduce 系统负责统一接管由多个 Map 运算得到的多个 键值对集合,然后对这些集合进行聚合处理,产生拥有着相同 key 的 key-value 键值对集合交给 Reduce 来处理;
  • 最后,Reduce 负责接收一个有着相同 key 的 key-value 键值对集合,然后进行处理,最终生产一个 key-value 键值对,或者干脆就不生产;

事实上为了取出 key 的冗余存储,相同 key 的键值对会优化为单个 key 以及一个 List 或者 Iterator 迭代器。

仅仅用语言描述可能有点抽象,下面用论文中的伪代码来解释 MapReduce。

场景 1:我们有数以百万计的 html 文档(比如它们都是关于 NBA 的),我们要求对这些文档中所有出现过的单词做一个统计,要求:

  • 系统输入为:数以百万计的 html 文档;
  • 系统输出为:a set of key-value,key 为单词字符串,value 为单词在上述文档中出现的次数;

Map 的伪码如下:

// key: docomenmt name 文档名
// value: document contents 文档的内容
map(String key, String value):
  for each word w in value:
    EmitIntermediate(w,"1");
// key: a word 一个单词
// value: a list of counts 单词出现次数对应的 list 的迭代器
reduce(String key,Iterator values):
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(results));

一种通用的解决方案是在整个 MapReduce 计算快要结束时,master 对当前处于 in-progress 状态的 task 进行备份,无论是原来的 task 执行完毕还是备份的 task 执行完毕,那么就认为该 task 完成了。

导致整个 MapReduce 计算过程被延迟的原因之一是过多的时间花费在最后几个 map task 或 reduce task 上。导致这个问题的原因由很多,可能是因为 task 所在的节点硬盘的读写速度非常慢,同时 master 又有可能把新的 task 分配给了该节点,所以引入了更加激烈的 CPU 竞争、内存竞争。

Backup Tasks

通常情况下,R的大小是由用户指定的,而对M的选择要保证每个task的输入数据大小,即一个输入分片在16MB~64MB之间,这样可以最大化的利用数据本地性。

在具体的实现中,M和R的大小是有实际限制的,因为master至少要做O(M+R)次的调度决策,并且需要保持O(M\*R)个状态。

从上文我们可以得知,map阶段被划分成M个task,reduce阶段被划分成R个task,M和R一般会比集群中节点的个数大得多。每个节点运行多个task有利于动态的负载均衡,加速worker从失败中恢复。

Task Granularity

输入数据由GFS来管理,GFS把数据存储在集群节点的本地磁盘上,GFS将文件分割为64MB大小的块,并且针对每个块会做冗余(一般冗余2份)。master利用输入数据的位置信息,将map task分配给输入数据所在的节点。

如果在计算过程中出现了失败的情况,那么master会把任务调度给离输入数据较近的节点。

网络带宽在计算环境中属于一种非常稀缺的资源,利用输入数据的特性可以减小网络带宽。

Locality

可以通过定期建立检查点的方式来保存master的状态。但是,Google当时的做法是考虑到只有一个master,所以master出现故障的概率很小,如果出现故障了,重新开始整个MapReduce计算。

Master Failure

如果一个map task一开始运行在worker A上,接着由于worker A failed导致该map task迁移到worker B上。那么读取该map task输出数据并且处于正在执行的reduce worker会收到重新执行reduce task的通知,任何还未开始读取数据的reduce task也会收到通知。reduce worker接下来会从worker B上读取数据。

对于已完成的map task,也需要重新被执行。因为map task的输出是在worker的本地磁盘上,因为worker已经失联了,所以map task的输出数据自然也获取不到。对于已完成的reduce task,不再需要重新执行。因为reduce task的输出是在全局的文件系统(GFS)上。

master会定期ping worker,如果worker没有响应并且超过了一定的次数,那么master就认为worker已经failed了。因此,所有在该worker上完成的task的状态将会被重置为初始的idle状态,并且这些task需要被重新分配到其它的worker上去。类似的,该worker上处于in-progress状态的task也会被重置为最初的idle状态,并被重新分配到其它worker上去。

Worker Failure

正是因为集群中主机数量多,因此一天总会有两三台主机出现故障,因此分布式系统的管理策略很重要。这里还是把 MapReduce 的系统架构图放在这。

MapReduce 分布式系统的管理应当取决于具体的集群设备性能,Google 实现的 MapReduce 其内部有着成百上千的个人主机(2004 年,现在一定不止这个规模),每一个主机仅仅有着 2-4 GB 的内存,网络带宽也仅仅是商用网络,分布式存储则是由 GFS 文件系统来管理。

MapReduce 分布式系统的管理

下面还是拿第二节中为所有文档中的单词计数,不过这里将问题简化为只有 3 篇文档,只不过它们都要分别交给不同的 Worker 进行处理:

在执行完成后,实际上产生了 Reduce 节点个数的输出文件,每个reduce worker对应一个。这些文件暂时不需要不需要合并,因为它们往往是下一个 MapReduce 处理逻辑的输入数据。而在下一个 MapReduce 上会进行 Shuffle 操作,即从分布在不同磁盘上的有着相同 key 的 key-value pairs 聚集到一个磁盘上,这通常是通过网络来完成的,因此这也是 MapReduce 代价最高的一部分。

 

  • Input 和 Split 过程:MapReduce 库函数将输入数据切分为 M 个分片(分片的大小一般为 16~64 MB,用户可以设置分片大小),并把用户程序拷贝到集群中的多个节点(为了分区容错)。
  • 节点说明:图中有分为两种节点:Master 节点用于给空闲的 Worker 节点分配具体的任务,具体的任务可以是 Map 也就是 Reduce。Worker 即可以做 Map 也可以做 Reduce,其可以执行交替 Map 和 Reduce 方法;
  • 被分配 Map 任务的 Worker 的工作为:
    • 首先加载分片 split 数据;
    • 然后将 spilit 数据解析为 a set of key-value pairs;
    • 接着对每一个key-value 用 map 函数进行处理,处理的结果是 a set of intermediate key-value pairs;
    • 处理结果优先存放到内存中的缓存,除非容量不足;
    • 缓存在内存中的key/value pairs 会被划分为 R 个分区,并定期写入到本地磁盘中。写入磁盘的位置会被推送给 master 节点,master 节点会将磁盘的位置信息转发给下一阶段执行 reduce 任务的节点(reduce worker)。
  • 被分配 Reduce 任务的 Worker 的工作为:
    • 接收 Master 传来的 a set of intermediate key-value pairs 所处的磁盘位置信息;
    • 读取相应的磁盘中的数据,当所有的数据读取完毕后,在内存中按照key 将所有的 key/value pairs 进行一次排序。
    • 排序后对进行遍历,执行逻辑是:对于相同 key 的 key-value 作为一个整体传入 reduce 方法;
    • 最终将 reduce 方法的执行结果 append 追加在输出文件;

MapReduce 大致上可以分为 6 个过程:Input,Split,Map,Shuffle,Reduce,Finalize。

  • split:任务的分发,MapReduce 将输入的海量数据进行逻辑切片,一片对应一个 Map 任务。分布式系统的优势之一就是可以并行地处理海量数据,自然需要将数据这块大蛋糕交给不同的主机吃下;
  • Intermediate files:在这里是将中间键值对暂存在运行 Map 的主机的磁盘上,事实上现在很多基于或改进于 MapReduce 的相关处理机制会改进这一步,因为磁盘读写速度效率比较差;注意事项:Google 实际上自己实现了一个软件文件系统:GFS,因此这里的文件系统并不是很简单。

我们写的 Map 以及 Reduce 还仅仅是一个单机运行代码,那么 MapReduce 是如何将它们进行分布式地并行运行地呢?

MapReduce 内部运行架构

  • 统计网站访问人次的计数:每一个服务器可能运行着拥有几个 URL 地址的 Web 应用,同一个 Web 应用又分布式运行在不同的服务器上,利用 Map 将它们分别的计数得到,最终利用 Reduce 进行相同 URL 的统计;
  • 倒排索引:Map 函数的输入键值对为 key- html 文档 id : value-文档内容,输出的一系列键值对为 key-索引值 : value-html 文档 id。Reduce 输入键值对为key – 索引值 : value-有着相同索引的 html 文档id,输出键值对和输入值结构是类似的;

论文中还指出了其他可以 MapReduce 模型的案例:

Reduce 就是用于将相同 key 的 key-value 集合进行汇聚为一个结果,这里相同 key 的 key-value 集合用一个键值对表示,其 key 含义不变,value 为一个 List 实例对应的 Iterator 实例,List 存储着多个 value,Iterator 为了方便遍历。

最总输出一个总次数。这里没有输出一对 key-value 键值对作为结果,因为我们图中锁指的键值对都是广义的,并不是非要局限于 key-value 键值对。

Reduce 的伪码如下:

Map 做的事情就是对于每一个文章的所有单词进行创建中间键值对,比如 “foo bar is foo bar”,那么产生 5 个键值对,它们的 value 都是 1:“foo”:1、“bar”:1、“is”:1、“foo”:1、“bar”:1。

参考链接:

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注