MapReduce: Simplifified Data Processing on Large Clusters 论文链接

MapReduce整体介绍

MapReduce是一个针对处理大数据集的编程模型,主要有Map函数和Reduce函数两个核心概念。我们使用Map函数来从输入文件中产生intermediate(这里翻译成“中间”) key/value键值对,然后使用Reduce函数来合并所有属于相同key的value。

具体来说,用户编写Map函数接受原始key/value键值对,然后产生一系列的中间key/value,MapReduce会将属于同一个key的所有value聚合成value列表,然后将它们传给Reduce函数。用户编写的Reduce函数接收中间key以及属于该key的value列表,然后将属于同一个key的value合并输出。根据论文描述,intermediate value是通过迭代器的形式提供给Reduce函数的,这样做的目的是为了应对value列表太大,无法全部放入内存的情况。

以这种风格编写的程序会在大型机器集群上自动并行执行,其中,运行时系统负责以下工作:

  • 输入数据划分的相关细节
  • 多机器上的任务调度
  • 处理机器故障
  • 管理集群内机器间的通信

在这里插入图片描述

MapReduce实际部署与执行

通过自动将输入数据划分成M份,Map函数的调用被分布到多台机器上。划分的M份数据可以在多台机器上并行处理。Reduce函数调用通过使用划分函数partition function,例如$hash(key)$将中间键划分为R份,划分的数量和划分函数由用户指定。

Execution Overview

上图是MapReduce执行的整个流程,当用户调用MapReduce函数时,将发生以下行为:

  • 1,用户程序中的MapReduce库首先将用户输入划分成M份,每一份的大小通常在16MB到64MB之间,可以由用户控制。然后将在集群机器上开启程序的多个副本。
  • 2,副本中有一个特殊副本—master,剩下的是由master分配任务的workers。共有M个map任务和R个reduce任务需要划分。master会选择空闲的workers并为其分配一个map或者reduce任务。
  • 3,被分配map任务的worker将读取对应的输入数据内容。它解析输入数据中的key/value对并将每一个对传输给用户定义的Map函数。由Map函数产生的中间key/value对将被缓存在内存中。
  • 4,被缓存的键值对被周期性的写入磁盘,并由partition函数划分成R份,它们的磁盘地址将被传输给master,master随后负责将这些地址传输给reduce workers。
  • 5,当一个reduce worker从master处获得上述中间key/value的磁盘地址,它将通过远程过程调用读取map workers磁盘上对应的中间键值对数据。读取完成后,reduce worker将对中间key/value进行排序以使得相同key的键值对可以被分到同一组。排序是需要的,因为很多不同的key会被映射到统一个reduce任务。如果中间键值对的数量太多以至于无法装入内存,将使用外部排序。
  • 6,reduce worker将遍历排序后的键值对,对于每一个不同的key,将该key和其对应的value列表(由5中排序生成)传输给用户定义的Reduce函数。Reduce函数的输出将被附加到该reduce分区的最终输出文件中。
  • 7,当所有的map任务和reduce任务全部完成,master唤醒用户程序,此刻MapReduce函数调用将返回到用户程序。

当程序成功执行后,mapreduce的输出数据将保存在R份文件中,通常用户不需要将这R份文件合并成一份文件,通常这些文件将作为其他MapReduce的输入,或者通过其他支持处理分区文件的应用系统来使用这些文件。

Master Data Structures

master中保存并维护了诸多数据结构,对于每一个map任务和reduce任务,master存储其状态如:$idel、in-progress、completed$,并且保存了处于非空闲状态的worker机器的标识。

中间文件的地址通过master从map任务传输到reduce任务,因此对于每一个已经完成的map任务,master存储其地址以及由map任务产生的R个中间文件区域的大小。当map任务完成时对这些信息进行更新,这些信息被逐步推送给拥有运行状态reduce任务的workers。

Fault Tolerance

由于MapReduce是为大规模机器集群设计的,因此机器故障必须被优雅地解决。

  • worker failure:master周期性地ping每一个worker,如果一定时间内master没有收到回复则将worker标记为故障。worker完成地map任务将被重设为空闲状态得以在其他worker上进行调度。一个failure worker上处于in-progress状态地map任务或者reduce任务也会被重设为空闲状态,有资格进行重调度。

    处于failure状态的已完成的map任务需要被重新执行因为其输出文件被存储在故障机器的磁盘无法访问。处于failure状态的已完成的reduce任务不需要重执行,因为其输出保存在一个全局文件系统中。

    当map任务起初在worker A执行但是由于A故障而后在worker B执行,所有正在执行reduce任务的worker都会被通知这次重执行。所有尚未从worker A读取数据的reduce任务将从worker B读取数据。

  • master failure:通过周期性地将master中的数据结构写入checkpoint,当master挂掉,可以利用最新的checkpoint创建副本,但是考虑到只有一个master,一般不太可能让其失败。如果master失败,则停止MapReduce,客户端可以检查此状态并重试其定义的MapReduce操作。

  • Semantics in the Presence of Failures:当用户提供的map和reduce运算符是确定性函数时,我们所实现的分布式系统在任何情况下的输出都和所有程序在没有任何错误、并且按照顺序生成的输出是一样的。

    我们依赖于map任务和 reduce任务输出的原子性提交来实现这个特性。每个正在执行的任务会将它的输出写入到私有的临时文件中去。每个Reduce任务会生成这样一个文件,每个Map任务则会生成R个这样的文件(一个Reduce任务对应一个文件)。当一个map任务完成时,该map任务对应的worker会向master发送信息,该信息中包含了R个临时文件的名字。如果master从一个已经完成的map工作的worker处又收到这个完成信息,master就会将该信息忽略。否则,它会将这R个文件名记录在master的数据结构中。

    当Reduce任务完成时,reduce worker会以原子的方式将临时输出文件重命名为最终输出文件。如果多台机器执行同一个reduce任务,那么对同一个输出文件会进行多次重命名。我们依赖于底层文件系统所提供的原子性重命名操作来保证最终的文件系统状态仅包含一个Reduce任务所产生的数据。

    我们的map和reduce运算符绝大多数情况下是确定性的,在这种情况下我们的语义就代表了程序的执行顺序,这使得能够轻易地理解其程序的行为。当map 和reduce运算都是非确定性的情况下,我们会提供一种稍弱但依旧合理的语义。当在进行一个非确定性操作时,Reduce任务R1的输出等同于一个非确定性程序按顺序执行产生的输出。但是另一个Reduce任务R2的输出可能符合一个不同的非确定顺序程序执行产生的R2的输出。

    (知秋注:输出的结果可以由A来处理,也可以由B来处理,比如A处理{a,b,c},B处理{a,d,e},现在map下发了一个a,那这个a既可以交由A,也可以交由B进行处理,又好比编译原理中的词法分析,if可以被identify处理,也可以被keywords处理,只不过我们在其中设定了优先级,那弱语义就变为了强语义)

    考虑下这种情况,我们有一个Map任务M,两个Reduce任务,$R_1$和$R_2$。假设$e(R_i)$是$R_i$已经提交的执行过程(此处的e代表execution)(有且只有这样一次的提交)。当$e(R_1)$已经读取了由M产生的一次输出,并且$e(R_2)$读取了由M产生的另一次输出,这就会导致较弱语义的发生。

    (知秋注:结合上一个注,如果map下发了两次a,第一次A处理了,第二次B处理了,这就是所谓的较弱语义的发生)。

Locality

GFS将每个文件划分为64MB的块,并在不同的机器上存储每个块的几个副本(通常是3个副本)。MapReduce主服务器会考虑到输入文件的位置信息,并尝试在包含相应输入数据副本的机器上调度映射任务。如果失败,它将试图在该任务的输入数据的副本附近调度一个映射任务(例如,在与包含该数据的机器在相同的网络交换机上的工作机器上)。

Task Granularity

如前所述,我们将映射阶段细分为M个,减少阶段细分为R。理想情况下,M和R应该远远大于工作机器的数量。让每个worker执行许多不同的任务可以改善动态负载平衡,并在worker失败时加速恢复:它已经完成的许多映射任务可以分散到所有其他worker机器上。在我们的实现中,对于M和R的大小有实际的界限,因为主服务器必须做出$O(M+R)$调度决策,并在内存中保持$O(M∗R)$状态。

在实践中,我们倾向于选择M使得每个单独的任务输入数据的大小为16MB到64MB(以便使得上述的局部性优化最有效),我们将R设置为使用的工作机器数量的小倍数。我们经常使用M=200,000,R=5,000,使用2,000台机器。

Backup Tasks

由于单个机器可能存在磁盘损坏等因素从而延长整个MapReduce任务的执行时间,当一个MapReduce操作将要完成时,master为剩下的in-progress状态的任务启用备份执行,当原任务或者备份任务完成时将任务状态设置为完成。

优化

上述描述对于大多数需求已经足够,但是论文仍然提出了一些有用的扩展。

Partition 函数

中间键值对通过partition函数被划分,默认的函数为$hash(key) mod R$,产生的划分也相对平衡。在一些特定应用中也可以采用不同的分区函数来更好的满足应用需求,例如有时输出中间键为URLs,我们希望含有相同hostname的URL出现在同一份输出文件中,则分区函数可以设置为:

Ordering Guarantees

在一个给定分区中的中间键值对将按照键的顺序进行处理,有助于在每个分区生成排序输出文件,文件按key排序有助于按键进行的随机查找。

Combiner 函数

在某些情况下,由每个map任务生成的中间键可能存在大量重复,而用户指定的reduce函数与其相关联。例如在wordcount实例中map任务将产生大量的$$键值对,所有的这些计数将通过网络传输到reduce任务然后通过reduce函数累加为一个数。我们允许用户指定一个Combiner函数,Combiner函数可以在键值对数据通过网络传输前进行对属于相同key的value进行部分合并操作。

Combiner函数是在执行map任务的机器上执行的,通常Combiner函数和Reduce函数的代码相同。唯一的区别在于MapReduce如何处理函数的输出。reduce函数的输出被写入最终的输出文件,combiner函数的输出将被写入中间文件,这些中间文件随后将被传输到reduce任务。

对于某些任务,combiner函数可以起到很好的加速作用。

Input and Output Types

MapReduce支持多种不同形式的数据输入。用户可以自己实现Reader接口来添加输入数据类型。

Side-effects

在某些情况下,MapReduce的用户发现从他们的map或者reduce任务中生成辅助文件作为额外的输出很方便。我们依靠应用程序编写者来使这种副作用成为原子的和幂等的。通常,应用程序会写入临时文件,并在文件完全生成后原子重命名。

Skipping Bad Records

有时,一些特定记录导致的bug将使得整个MapReduce失败,有时忽略一些记录也是可以接受的,例如在大型数据集上进行统计分析。我们提供了一种可选的执行模式,其中MapReduce库检测哪些记录会导致确定性崩溃,并跳过这些记录,以便继续前进。

每个工作进程安装一个信号处理程序,以捕获分段违规和总线错误。在调用用户映射或减少操作之前,MapReduce库将参数的序列号存储在全局变量中。如果用户代码产生一个信号,信号处理程序向MapReduce主服务器发送一个包含序列号的“最后喘息”UDP包。当主服务器在特定记录上看到多个失败时,它表示在下次重新发出对应的“映射”任务或“减少”任务时,应跳过该记录。

Local Execution

Map或Reduce函数中的调试问题可能很棘手,因为实际的计算发生在一个分布式系统中,通常发生在几千台机器上,并由主机动态地做出工作分配决策。为了帮助促进调试、分析和小规模测试,我们开发了MapReduce库的另一种实现,该实现可以在本地机器上依次执行MapReduce操作的所有工作。

Status Information

主服务器运行一个内部HTTP服务器,并导出一组状态页以供人类使用。状态页面显示计算的进度,如已经完成了多少任务、多少正在进行、输入的字节、中间数据的字节、输出的字节、处理速率等。这些页面还包含了链向标准错误信息和输出文件。用户可以利用这些数据来预测计算将花费多长时间以及是否应该为计算增加更多的资源。这些页面也可以被用来洞察何时计算比预期慢。

此外,顶级状态页面显示了哪些工人失败了,以及哪些工人映射并减少了他们在失败时正在处理的任务。当试图诊断用户代码中的错误时,此信息非常有用。

Counters

MapReduce库提供了一个计数器工具来计算各种事件的发生次数。例如,用户代码可能希望计算已处理的单词总数或被索引的德语文档数。要使用此工具,用户代码将创建一个命名的计数器对象,然后在“Map/Reduce”函数中适当地递增计数器。如:

1
2
3
4
5
6
7
8
Counter* uppercase;
uppercase = GetCounter("uppercase");

map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");

来自单个工作机器的计数器值会定期传播到主服务器(可以包含在ping响应中)。主服务器聚合成功映射中的计数器值并减少任务,并在MapReduce操作完成后将它们返回给用户代码。当前计数器值也显示在主状态页面上,以便人类可以观察实时计算的进度。在聚合计数器值时,主会消除重复执行同一映射的影响,或减少任务以避免重复计数。

一些计数器值由MapReduce库自动维护,例如处理的输入键/值对的数量和产生的输出键/值对的数量。用户发现该计数器工具对完整性检查MapReduce操作的行为很有用。例如,在某些MapReduce操作中,用户代码可能希望确保产生的输出对的数量完全等于处理的输入对的数量,或者处理的德国文档的比例在处理的文档总数的一些可容忍的比例内。