【论文阅读笔记】MapReduce: Simplified Data Processing on Large Clusters
1 概念
MapReduce
是一种用于在大型集群上进行简化数据处理的编程模型和计算框架。它最初由 Google 公司设计用于解决大规模数据处理问题,后来被广泛应用于各种大数据处理场景。
MapReduce
模型的核心思想是将大规模的数据集分解成多个小的数据块,然后分配给集群中的多个计算节点进行并行处理,最终将结果合并成最终的输出。
2 编程模型
MapReduce 编程模型由两个主要阶段组成:map 阶段和 reduce 阶段。
- map 阶段:在 map 阶段,输入数据被分割成若干个数据块,并由不同的计算节点进行并行处理。每个计算节点都会执行用户定义的 map 函数,将输入数据转换为键值对的形式,并发出中间结果。
- reduce 阶段:在 reduce 阶段,会将中间结果按照键进行分组,并由不同的计算节点进行并行处理。每个计算节点都会执行用户定义的 reduce 函数,对相同键的数据进行合并和处理,最终生成最终的输出结果。
对于用户(MapReduce的使用者)而言:MapReduce是一种抽象化的编程模型,它隐藏了分布式数据处理的细节,仅对外暴露map
和reduce
的抽象,用户来实现具体的map
和reduce
功能。MapReduce自身关注的是并行计算、容错、分布式数据、负载均衡等一系列问题,并且保证分布计算的结果和无错误的串形计算的结果一致。
形式化地说,由用户提供的 map
函数和 reduce
函数应有如下类型:
$$
\begin{align*}
\text{map} &\quad (k_1, v_1)\quad\quad\quad\rightarrow\quad\text{list}(k_2, v_2)\
\text{reduce} &\quad (k_2,\text{list}(v_2))\quad\rightarrow\quad\text{list}(v_2)
\end{align*}
$$
其中,输入的 key
和 value
值与输出的 key
和 value
值在类型上推导的域不同。此外,中间结果 key
和 value
值与输出 key
和 value
值在类型上推导的域相同。
例如,计算一个大的文档集合中每个单词出现的次数,下面是伪代码段:
|
|
map
函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是 1)。reduce
函数把 map
函数产生的每一个特定的词的计数累加起来。
值得注意的是,在实际的实现中 MapReduce
框架使用 Iterator
来代表作为输入的集合,主要是为了避免集合过大,无法被完整地放入到内存中。
3 实现
3.1 MapReduce执行流程
下图展示了MapReduce
操作的全部流程。当用户调用 MapReduce
函数时,将发生下面的一 系列动作(下面的序号和图中的序号一一对应):
- 用户程序首先调用的
MapReduce
库将输入文件分成 $M$ 个数据片度,每个数据片段的大小一般从 $16\text{ MB}$ 到$64\text{ MB}$(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在集群中创建大量的程序副本。 - 这些程序副本中的有一个特殊的程序—
master
。副本中其它的程序都是worker
程序,由master
分配 任务。有 $M$ 个map
任务和 $R$ 个reduce
任务将被分配,master
将一个map
任务或reduce
任务分配给一个空闲的worker
。 - 被分配了
map
任务的 worker 程序读取相关的输入数据片段,从输入的数据片段中解析出键值对,然后把键值对传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间结果键值对,并缓存在内存中。 - 缓存中的键值对通过分区函数(可由用户指定,默认为
hasy(key) mod R
)分成 $R$ 个区域,之后周期性的写入到本地磁盘上。缓存的键值对在本地磁盘上的存储位置将被回传给master
,由master
负责把这些存储位置再传送给reduce worker
。 - 当
reduce worker
程序接收到master
程序发来的数据存储位置信息后,使用RPC
从Map worker
所在主机的磁盘上读取这些缓存数据。当reduce worker
读取了所有的中间数据后,通过对key
进行排序后使得具有相同key
值的数据聚合在一起。由于许多不同的key
值会映射到相同的reduce
任务上, 因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。 reduce worker
程序遍历排序后的中间数据,对于每一个唯一的中间key
值,reduce worker
程序将这 个key
值和它相关的中间结果value
值的集合传递给用户自定义的reduce
函数。reduce
函数的输出被追加到所属分区的输出文件。- 当所有的
map
和reduce
任务都完成之后,master
唤醒用户程序。在这个时候,在用户程序里的对MapReduce
调用才返回。
在成功完成任务之后,MapReduce
的输出存放在 $R$ 个输出文件中(对应每个 reduce
任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这 $R$ 个输出文件合并成一个文件—他们经常把这些文件作为另外一个 MapReduce
的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。
3.2 master数据结构
master
持有一些数据结构,它存储每一个 map
和 reduce
任务的状态(空闲、工作中或完成),以及 worker
机器(非空闲任务的机器)的标识。
master
就像一个数据管道,中间文件存储区域的位置信息通过这个管道从 map
传递到 reduce
。因此, 对于每个已经完成的 map
任务,master
存储了 map
任务产生的 $R$ 个中间文件存储区域的大小和位置。当 map
任务完成时,master
接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的 reduce
任务。
3.3 容错机制
3.3.1 worker故障
故障判定
master
周期性的ping
每个worker
。如果在一个约定的时间范围内没有收到worker
返回的信息,master
将 把这个worker
标记为失效。故障处理
- 正在运行:正在运行的
map
或reduce
任务将被重新置为空闲状态,等待重新调度。 - 已完成:所有由这个故障的
worker
完成的map
任务也会被重设为初始的空闲状态,等待重新调度,因为该worker
不可用也意味着存储在该worker
本地磁盘上的中间结果也不可用了;已经完成的reduce
任务的输出存储在全局文件系统(eg. Google File System)上,因此不需要重新执行。
- 正在运行:正在运行的
当一个 map
任务首先被 worker A
执行,之后由于 worker A
故障了又被调度到 worker B
执行,这个“重新执行”的动作会被通知给所有执行 reduce
任务的 worker
。任何还没有从 worker A
读取数据的 reduce
任务 将从 worker B
读取数据。
3.3.2 master故障
一个简单的解决办法是让 master
周期性的将上面描述的master数据结构的写入磁盘,即检查点(checkpoint)。如果这个 master 任务失败了,可以从最后一个检查点(checkpoint)开始启动另一个 master
进程。
然而,由于只有一个 master
进程,master
失效后再恢复是比较麻烦的,因此现在的实现是如果 master
故障,就中止MapReduce
运算。用户可以检查到这个状态,并且可以根据需要重新执行 MapReduce
操作。
3.3.3 出现故障时的语义
当用户提供的 map
和 reduce
操作是输入确定性函数(即相同的输入产生相同的输出)时,MapReduce保证任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。 这依赖对 map
和 reduce
任务的输出是原子提交的来完成这个特性。
- 每个工作中的任务把它的输出写到私有的临时文件中。
- 每个
reduce
任务生成一个这样的文件,而每个map
任务则生成 $R$ 个这样的文件(一 个reduce
任务对应一个文件)。 - 当一个
map
任务完成的时,worker
发送一个包含R
个临时文件名的完成消息给master
。如果master
从一个已经完成的map
任务再次接收到到一个完成消息,master
将忽略这个消息;否 则,master
将这 $R$ 个文件的名字记录在数据结构里。 - 当
reduce
任务完成时,reduce worker
进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个reduce
任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。这就依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个reduce
任务产生的数据。
使用 MapReduce 模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的 map
和 reduce
操作是确定性的,而且存在这样的一个事实:我们的语义(也可以理解为处理机制)等价于一个顺序的执行的操作。
当 map
and/or reduce
操作是不确定性的时候,MapReduce提供虽然较弱但是依然合理的语义。当使用非确定操作的时候, 一个 reduce
任务 $R_1$ 的输出等价于一个非确定性程序顺序执行产生时的输出。但是,另一个 reduce
任务 $R_2$的输出也许符合一个不同的非确定程序顺序执行产生的 $R_2$ 的输出。
考虑 map
任务 $M$ 和 reduce
任务 $R_1$、$R_2$ 的情况。我们设定 $e(R_i)$是 $R_i$ 已经提交的执行过程(有且仅有一个这样的执行过程)。出现较弱语义是因为 $e(R_1)$可能读取了$M$ 一次执行产生的输出,而 $e(R_2)$可能读取了 $M$ 的另一次执行产生的输出。
3.4 存储位置
核心思想:尽量把输入数据(由 GFS 管理)存储在集群中机器的本地磁盘上来节省网络带宽。
GFS 把每个文件按 64MB 一个 Block 分隔,每个 Block 保存 在多台机器上,环境中就存放了多份拷贝(一般是 3 个拷贝)。MapReduce 的 master
在调度 map
任务时会考虑输入文件的位置信息,尽量将一个 map
任务调度在包含相关输入数据拷贝的机器上执行;
如果上述努力失败 了,master
将尝试在保存有输入数据拷贝的机器附近的机器上执行 map
任务(例如,分配到一个和包含输入数据的机器在一个 switch 里的 worker 机器上执行)。当在一个足够大的 cluster 集群上运行大型 MapReduce 操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。
3.5 任务粒度
理想情况下,$M$ 和 $R$ 应当比集群中 worker
的机器数量要多得多。在每台 worker
机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量 map
任务都可以分布到所有其他的 worker
机器上去执行。
但是实际上,在具体实现中对 $M$ 和 $R$ 的取值都有一定的客观限制,因为 master
必须执行 $O(M+R) $次调度,并且在内存中保存 $O(M\times R)$个状态(对影响内存使用的因素还是比较小的:$O(M\times R)$块状态,大概每对 map
任务/reduce
任务 1 个字节就可以了)。
更进一步,$R$ 值通常是由用户指定的,因为每个 reduce
任务最终都会生成一个独立的输出文件。实际使用时我们也倾向于选择合适的 $M$ 值,以使得每一个独立任务都是处理大约 16M 到 64M 的输入数据(这样, 上面描写的输入数据本地存储优化策略才最有效),另外,我们把 $R$ 值设置为我们想使用的 worker
机器数量的小的倍数。
所以我们通常会用这样的比例来执行 MapReduce:$M=200000$,$R=5000$,使用 $2000$ 台 worker
机器。
3.6 备用任务
如果集群中有某个 worker
花了特别长的时间来完成最后的几个 map
或 reduce
任务,整个 MapReduce 计算任务的耗时就会因此被拖长,这样的 worker
也就成了落后者(Straggler)。
因此,论文提出一个通用的机制来减少“落伍者”出现的情况。当一个 MapReduce 操作接近完成的时候,master
会调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。
4 扩展技巧
4.1 分区函数
MapReduce 的使用者通常会指定 reduce
任务和 reduce
任务输出文件的数量($R$)。我们在中间结果key
上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。
一个缺省的分区函数是使用 hash
方法(比如, hash(key) mod R
)进行分区。hash
方法能产生非常平衡的分区。然而,有的时候,其它的一些分区函数对 key
值进行的分区将非常有用。
比如,输出的 key
值是 URLs,我们希望每个主机的所有条目保持在同一个输出文件中。为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。例如,使用hash(Hostname(urlkey)) mod R
作为分区函数就可以把所有来自同一个主机的 URLs 保存在同一个输出文件中。
4.2 顺序保证
在给定的分区$R$中,MapReduce保证所有中间键值对数据的处理顺序是按照 key
值增量顺序处理的。
4.3 Combiner函数
在某些情况下,map
函数产生的中间 key
值的重复数据会占很大的比重,并且,用户自定义的 reduce
函数满足结合律和交换律。词数统计程序是个很好的例子。由于词频率倾向于一个 zipf 分布,每个 map
任务将产生成千上万个这样的记录。所有的这些记录将通过网络被发送到一个单独的 reduce
任务,然后由这个reduce
任务把所有这些记录累加起来产生一个数字。
MapReduce允许用户指定一个可选的 combiner
函数,combiner
函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出 去。 combiner
函数在每台执行 Map 任务的机器上都会被执行一次。一般情况下,combiner
和 reduce
函数是 一样的。combiner
函数和 Reduce 函数之间唯一的区别是 MapReduce 库怎样控制函数的输出。
reduce
函数的输出被保存在最终的输出文件里,而 combiner
函数的输出被写到中间文件里,然后被发送给 reduce
任务。 部分的合并中间结果可以显著的提高一些 MapReduce 操作的速度。
4.4 输入和输出的类型
MapReduce库支持几种不同的格式的输入数据。比如文本模式中,key
是文件的偏移量,value
是该行内容。
程序员可以定义Reader
接口来适应不同的输入类型,程序员需要保证必须能把输入数据切分成数据片段,且这些数据片段能够由单独的Map任务来处理就行了。Reader
的数据源可能是数据库,可能是文本文件,甚至是内存等。
同样,用户采用类似添加新的输入数据类型的方式增加新的输出类型(定义Writer
接口)。
4.5 副作用
程序员在写map
和/或reduce
操作的时候,可能会因为方便,定义很多额外功能,比如增加辅助的输出文件等。但应当时刻记住,map
和reduce
操作应当保证原子性和幂等性。
比如,一个任务生成了多个输出文件,但是我们没有原子化多段commit的操作。这就需要程序员自己保证生成多个输出的任务是确定性任务。
4.6 跳过损坏的记录
有时候,用户程序中的 bug 导致 map
或者 reduce
函数在处理某些记录的时候 crash 掉,MapReduce 操作 无法顺利完成。相较于修复无法执行的 Bug,跳过引发 Bug 的记录可能更为明智。因此,我们希望 MapReduce 检测哪些记录导致确定性的crash, 并且跳过这些记录不处理。
MapReduce 如何自动检测这种情况呢?首先,每个worker
进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。 在执行 map
或者 reduce
操作之前,MapReduce 库通过全局变量保存记录序号。如果用户程序触发了一个系统信号,信号处理函数将用“最后一口气”通过 UDP 包向 master
发送处理的最后一条记录的序号。当 master
看到在处理某条特定记录不止失败一次时,master
就标志这条记录需要被跳过,并且在下次重新执行相关的map
或者 reduce
任务的时候跳过这条记录。
4.7 本地执行
调试 map
和 reduce
函数的 bug 非常困难,因为它们在分布式系统中执行,并且通常跨多台计算机执行,由 master
动态调度。为了简化调试、性能分析和小规模测试,Google开发了本地版本的 MapReduce 库。这个本地版本可以让 MapReduce 操作在单台计算机上顺序执行。用户可以控制操作的执行,并且可以将其限制在特定的 map
任务上。通过设置特殊标志,用户可以在本地执行他们的程序,并且轻松使用本地调试和测试工具(如 gdb
)。
4.8 状态信息
在 master
内部,设有一个内置的 HTTP 服务器,用于展示一系列状态信息页面。这些页面会显示计算进度,例如已完成的任务数量、正在执行的任务数量、输入、中间数据和输出的字节数,以及处理速率等。
这些页面还包含了指向每个任务的stderr
和stdout
文件的链接。用户可以利用这些数据来预测计算完成所需的时间,以及是否需要增加更多资源。当计算花费的时间超过预期时,这些页面还可以帮助用户找出执行速度缓慢的原因。
另外,顶层状态页面还会显示出现故障的worker
及其故障时正在执行的 map
和 reduce
任务。这些信息对于调试用户代码中的 bug 非常有帮助。
很多分布式系统架构都会提供可视化监控界面,这是提升分布式系统的可维护性的重要手段。
4.9 计数器
MapReduce 库提供计数器机制,用来统计不同操作发生次数。比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇 German 文档等等。
要想使用这个特性,用户需要创建Counter对象,然后在map
和reduce
函数中以正确的方式增加counter
。这些计数器的值周期性的从各个单独的worker
机器上传递给master
(附加在ping的应答包中传递)。master
把执行成功的 map
和 reduce
任务的计数器值进行累计,当 MapReduce 操作完成之后,返回给用户代码。 计数器当前的值也会显示在 master
的状态页面上,这样用户就可以看到当前计算的进度。
当累加这些counter
的值时,master
会去掉那些重复执行的相同map
或者reduce
操作的次数,以此避免重复计数(之前提到的备用任务和故障后重新执行任务,这两种情况会导致相同的任务被多次执行)。
有些counter
值是由MapReduce库自动维护的,例如已经处理过的输入键值对的数量以及生成的输出键值对的数量等等。
5 应用场景
5.1 论文中提出的应用场景
- 分布式的 Grep:
map
函数输出匹配某个模式的一行,reduce
函数是一个恒等函数,即把中间数据复制到输出。 - 计算 URL 访问频率:
map
函数处理日志中 web 页面请求的记录,然后输出 (URL,1)。reduce
函数把相同 URL 的 value 值都累加起来,产生 (URL, 记录总数)结果。 - 倒转网络链接图:
map
函数在源页面(source)中搜索所有的链接目标(target)并输出为(target,source)。reduce
函数把给定链接目标(target)的链接组合成一个列表,输出(target,list(source))。 - 每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。
map
函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的 URL。reduce
函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。 - 倒排索引:
map
函数分析每个文档输出一个(词,文档号)的列表,reduce
函数的输入是一个给定词的所有 (词,文档号),排序所有的文档号,输出(词,list(文档号))。所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。 - 分布式排序:
map
函数从每个记录提取key
,输出(key,record)。reduce
函数不改变任何的值。这个运算依赖分区机制和排序属性。 - 重建索引系统:重写了 Google 网络搜索服务所使用的索引系统。这个索引系统的输入数据是网络爬虫抓取回来的大量文档,这些文档数据保存在 GFS 文件系统中,其原始内容超过了 20TB。通过一系列的 MapReduce 操作(大约 5 到 10 次),来建立索引。使用 MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:
- 简化的代码:索引部分的代码变得简单、小巧、易于理解;
- 灵活性:MapReduce 库的性能已经足够好,因此可以将概念上不相关的计算步骤分开处理,减少数据传递的额外开销;
- 操作管理的简化:因为由机器失效、机器处理速度缓慢、以及网络的瞬间阻塞等引起的绝大部分问题都已经由 MapReduce 库解决了,不再需要操作人员的介入了。另外,我们可以通过在索引系统集群中增加机器的简单方法提高整体处理性能。
5.2 其他应用场景
- 数据清洗和预处理:MapReduce 可以用于处理大规模数据集的清洗和预处理,包括数据去重、数据过滤、数据格式转换等操作;
- 日志分析和异常检测:MapReduce 可以用于分析大规模日志数据,检测异常行为、故障事件和系统性能问题;
- 图算法和社交网络分析:MapReduce 可以应用于图算法和社交网络分析,包括图的遍历、最短路径计算、社区发现等操作;
- 文本挖掘和信息抽取:MapReduce 可以用于处理文本数据,进行信息抽取、实体识别、主题建模等自然语言处理任务。
6 FAQ
当你调用emit时,数据会发生什么变化?emit函数在哪运行?
首先看,这些函数在哪运行。这里可以看MapReduce论文的图1。现实中,MapReduce运行在大量的服务器之上,我们称之为worker服务器或者worker。同时,也会有一个Master节点来组织整个计算过程。这里实际发生的是,Master服务器知道有多少输入文件,例如5000个输入文件,之后它将Map函数分发到不同的worker。所以,它会向worker服务器发送一条消息说,请对这个输入文件执行Map函数吧。之后,MapReduce框架中的worker进程会读取文件的内容,调用Map函数并将文件名和文件内容作为参数传给Map函数。worker进程还需要实现emit,这样,每次Map函数调用emit,worker进程就会将数据写入到本地磁盘的文件中。所以,Map函数中调用emit的效果是在worker的本地磁盘上创建文件,这些文件包含了当前worker的Map函数生成的所有的key和value。
所以,Map阶段结束时,我们看到的就是Map函数在worker上生成的一些文件。之后,MapReduce的worker会将这些数据移动到Reduce所需要的位置。对于一个典型的大型运算,Reduce的入参包含了所有Map函数对于特定key的输出。通常来说,每个Map函数都可能生成大量key。所以通常来说,在运行Reduce函数之前。运行在MapReduce的worker服务器上的进程需要与集群中每一个其他服务器交互来询问说,看,我需要对key=a运行Reduce,请看一下你本地磁盘中存储的Map函数的中间输出,找出所有key=a,并通过网络将它们发给我。所以,Reduce worker需要从每一个worker获取特定key的实例。这是通过由Master通知到Reduce worker的一条指令来触发。一旦worker收集完所有的数据,它会调用Reduce函数,Reduce函数运算完了会调用自己的emit,这个emit与Map函数中的emit不一样,它会将输出写入到一个Google使用的共享文件服务中。
有关输入和输出文件的存放位置,这是我之前没有提到的,它们都存放在文件中,但是因为我们想要灵活的在任意的worker上读取任意的数据,这意味着我们需要某种网络文件系统(network file system)来存放输入数据。所以实际上,MapReduce论文谈到了GFS(Google File System)。GFS是一个共享文件服务,并且它也运行在MapReduce的worker集群的物理服务器上。GFS会自动拆分你存储的任何大文件,并且以64MB的块存储在多个服务器之上。所以,如果你有了10TB的网页数据,你只需要将它们写入到GFS,甚至你写入的时候是作为一个大文件写入的,GFS会自动将这个大文件拆分成64MB的块,并将这些块平均的分布在所有的GFS服务器之上,而这是极好的,这正是我们所需要的。如果我们接下来想要对刚刚那10TB的网页数据运行MapReduce Job,数据已经均匀的分割存储在所有的服务器上了。如果我们有1000台服务器,我们会启动1000个Map worker,每个Map worker会读取1/1000输入数据。这些Map worker可以并行的从1000个GFS文件服务器读取数据,并获取巨大的读取吞吐量,也就是1000台服务器能提供的吞吐量。