友情链接
伴随互联网、移动互联网、物联网、5G等信息通信技术及产业的发展,全球数据量呈现爆发式增长的趋势,数据的生成速度呈指数级增长,这些因素给数据处理和分析带来了巨大挑战。
那么,如何从采集到的大量数据集中挖掘到更深层次的信息并进行深度分析来赋予其价值?
使用分布式计算框架并行计算处理。
MapReduce最早是Google提出的一个分布式计算框架,Apache Spark基于MapReduce框架做了优化改进,算法与MR一致,都是旨在使用并行算法来处理和分析大型数据集,加快大数据任务的处理效率。MapReduce框架采用split-apply-combine的方法将一个job划分为几个独立的task来并行处理大量数据,最终为应用程序提供统一输出,主要分为以下三个阶段:
MapReduce框架的核心思想是分而治之(map分,reduce合),并行计算。适用于海量数据查询、数据批处理计算等场景,具有以下优势:
在MapReduce框架出现之前行业内大多是通过MPP(Massive Parallel Programming)的方式来增强系统的计算能力,一般是通过复杂且昂贵的硬件来加速计算,譬如高性能计算机和数据库一体机等。
而MapReduce可以水平扩展,能够处理跨数千个节点的海量数据集。随着数据集大小的增加,只需数个廉价的硬件就可以实现可扩展的、高并行的计算能力,可支持PB级别的弹性化数据计算。
在容错性方面,由于MapReduce的分布式架构设计,在设计之初即设定了硬件故障的常态性,因此其计算模型设计了大量的容错逻辑,如任务心跳、重试、故障检测、重分布、任务黑/灰名单、磁盘故障处理等机制,覆盖了从JobTracker、TaskTracker到Job、Task和Record级别的从大到小各个层级的故障处理。
比如说每个worker只处理一个file split,而Map和Reduce过程之间通过硬盘进行数据交换,如果某一节点在处理过程中发生故障或出现其他问题,框架可以将相关任务重新分配给另一个节点,确保作业继续进行,无需人工干预。MapReduce也会定期向主节点报告每个节点的状态。如果某个节点没有按预期做出反应,主节点会将相关任务重新分配给其他可用的集群节点。
上述机制保障了系统良好的容错性和鲁棒性。
MapReduce框架抽象了分布式计算的复杂性,提供了一个简化的编程接口设计,与MPP领域流行的MPI等编程接口不同,MapReduce不需要开发者自己处理并行度、数据分布策略、节点间信息交换、传输机制等复杂问题,只需要关注于实现Map和Reduce对应的业务逻辑即可,大大简化了开发过程。
MapReduce的计算基于key-value的数据对,value域可以包含各种类型的数据,如结构化数据或图片、文件类非结构化数据,因此MapReduce计算框架能够很好地支持非结构化数据的处理。此外,该框架也支持多种编程语言,包括Java、Python和Ruby等等,程序开发变得更加简单。
这些核心优势后续也被一些计算框架有效继承(如Spark等),但是随着企业数据分析类需求的逐渐深入,MapReduce计算框架的架构问题逐渐暴露出来,比如不能处理实时类数据,无法满足一些快速发展的互联网场景需求,如实时推荐、实时调度、准实时分析等等。后续Spark基于MR框架做了进一步的优化,解决了MapReduce计算框架的不足,基于内存和DAG的计算模式有效的减少了数据shuffle落磁盘的IO和子过程数量,实现了性能的数量级上的提升。
Spark介绍参考:Spark计算框架
虽然Spark框架的优化使得Spark的性能比MapReduce快一个数量级以上,但是本质上仍然是一个MapReduce的计算模式。
为了能让您更进一步的了解MapReduce的工作原理,接下来将讲述当启动一个MR程序的一些执行组件。
文件是MapReduce程序中涉及的数据的存储池,输入文件通常位于分布式文件系统中。这些文件的格式是任意的,我们可以使用基于行的日志文件,也可以使用二进制格式,多行输入记录或使用其它的一些格式。
[Input Files--比如在HDFS中,NameNode扮演着十分重要的角色,其一大功能是负责进行元数据的存储与管理。NN(Master)节点数据分成块存入磁盘(Input Files)]
InputFormat 定义了如何拆分和读取Input Files。InputFormat将Input Files拆分为一个或多个InputSplit并分配给单独的Mapper。
InputFormat提供以下功能:
InputSplits由InputFormat创建,在逻辑上表示为单个map任务的一个单元。默认情况下FileInputFormat会将文件分成 128MB 的块(用户可以根据MapReduce程序中数据的大小来控制Split的大小,比如可以在mapred-site.xml文件内设置mapred.min.split.size参数)。
由于InputSplit实际上并不包含实际数据,只是在MapReduce 程序或其他处理技术中的数据处理过程中使用,因此接下来Splits会被拆分为records也就是键值对。
RecordReader主要是与InputSplit进行通信,用来加载数据并将数据转换为适合mapper读取的键值对。比如常见的TextInputFormat的输入格式,该格式提供了一个LineRecordReader,这个类会把输入文件的每一行都作为一个新的值,关联到每一行的key则是该行在文件中的字节偏移量。RecordReader会在输入块上被重复的调用直到处理完整个输入块,然后这些键值对会被发送到mapper进行进一步处理。
Mapper处理每条输入记录(来自RecordReader)并生成新的键值对,但是这里需要注意,Mapper所生成的这个键值对与前面通过RecordReader生成的键值对完全不同。Mapper的输出也称为中间输出(写入本地磁盘),不会存储在HDFS上,因为这是临时数据,如果存储在HDFS上的话会创建不必要的副本。此时Mappers的输出会被传递到combiner 进行进一步处理。
Combiner 也被称为“迷你mapper”,因为MapReduce Combiner会对mapper的输出执行本地聚合,可以最大限度地减少mapper和reducer之间的数据传输所使用的带宽(reducer的概念后面会提出)。执行combiner后,输出将被传递至partitioner以进行进一步的工作。
有着相同key的数值无论是否来自同一个mapper都会一起被reduce,所有的map节点必须针对不同的中间输出(intermediate (k,v) pairs)发往哪个reducer达成一致。Partitioner类就是用来决定给定键值对的去向。
Partitioner获取combiner的输出,然后计算MapReduce中key的哈希值,然后基于这个结果进行分区、排序,具有相同key值的记录将放置同一个分区,然后将这个分区中的数据发送到一个reducer,分区总数等于reduce任务的数量。
总结一下前面的步骤:在 MapReduce 的作业执行中,它获取输入数据集并生成键值对列表。 这些键值对就是map阶段的结果。其中输入的数据会被分割,每个任务处理分割和每个映射,生成新的键值对列表(intermediate (k,v) pairs)。然后,根据key对其进行分区, 将不同分区下的数据分别发送给不同的reducer。
那么如何发送?下面进入到shuffle阶段。
把map任务的中间输出(键值对)发送到reducer的过程叫做shuffle。
当所有的mappers完成,并且中间输出被shuffle到了reducer节点后(这是一个普通的从节点,但reduce阶段将在这里运行,因此称为reducer节点),接下来这个中间输出将被合并和排序,然后作为输入提供给reduce阶段。
每个reduce任务都会创建一个Reducer实例。Reducer将mapper生成的中间键值对集合作为输入,然后对每个键值对运行一个reduce()函数生成输出。通常,在Reducer中会进行聚合、过滤和组合这些数据(键、值)的计算。
Reducer的reduce()方法只会调用一次,它会接收一个key以及关联到这个key的所有值的一个迭代器,这个迭代器会以一个未定义的顺序返回关联到同一个键的所有值。
Reducer处理完数据后它会产生一组新的输出,它的输出是最终的输出,会存储在HDFS中(也可能是内存或者磁盘,基于框架决定)。
它将这些来自Reducer 阶段的输出键值对写入输出文件。
在Reducer处理完数据后,会以OutputCollector.collect() 方法将reduce任务的输出写入文件系统(或内存,取决于框架)。RecordWriter将这些提供给OutputCollector的键值对写入输出文件的方式则是由OutputFormat决定的。
总的来说,MapReduce运行的流程是将输入的数据集拆分成多个独立的部分并将它们分散到不同设备上进行处理。输入数据被分解为键值对后以并行的方式映射到不同的处理节点来处理分块数据。处理完成后基于不同的shuffle方式将数据进行排序,基于key分发至不同的节点。Reduce阶段将结果组合成一组特定的键值对输出,将结果数据输出到外部存储文件系统中或内存。
MapReduce支持多种编程语言的特性使程序开发变得更加简单,通过MR框架来管理系统各个部分之间的通信和数据传输,除了降低了通信成本之外,集群的计算速度、容错能力以及可靠性也得到了提升。对于用户来说只需要将自定义代码(业务逻辑代码)按照MapReduce的方式处理,无需考虑数据存储、节点间信息交换、传输机制等其他问题,交由引擎处理即可。
但是,尽管MR框架因为其高灵活性、容错性等优势被广泛应用,但是这类型计算框架在计算进程中存在一些限制:
这些限制可能会导致单个task上需要处理大量的数据,一些处理节点会比其他节点需要更长的时间运行才能完成数据计算,这样既限制了并行处理的效率,也造成了空闲处理节点的资源浪费,系统将无法充分利用节点进行并行处理,十分影响性能和效率。这个现象就是数据倾斜。
下一篇:分布式计算框架系列文章(二)数据倾斜现象诱因、原理、影响,以及星环对此的应对策略
友情链接
伴随互联网、移动互联网、物联网、5G等信息通信技术及产业的发展,全球数据量呈现爆发式增长的趋势,数据的生成速度呈指数级增长,这些因素给数据处理和分析带来了巨大挑战。
那么,如何从采集到的大量数据集中挖掘到更深层次的信息并进行深度分析来赋予其价值?
使用分布式计算框架并行计算处理。
MapReduce最早是Google提出的一个分布式计算框架,Apache Spark基于MapReduce框架做了优化改进,算法与MR一致,都是旨在使用并行算法来处理和分析大型数据集,加快大数据任务的处理效率。MapReduce框架采用split-apply-combine的方法将一个job划分为几个独立的task来并行处理大量数据,最终为应用程序提供统一输出,主要分为以下三个阶段:
MapReduce框架的核心思想是分而治之(map分,reduce合),并行计算。适用于海量数据查询、数据批处理计算等场景,具有以下优势:
在MapReduce框架出现之前行业内大多是通过MPP(Massive Parallel Programming)的方式来增强系统的计算能力,一般是通过复杂且昂贵的硬件来加速计算,譬如高性能计算机和数据库一体机等。
而MapReduce可以水平扩展,能够处理跨数千个节点的海量数据集。随着数据集大小的增加,只需数个廉价的硬件就可以实现可扩展的、高并行的计算能力,可支持PB级别的弹性化数据计算。
在容错性方面,由于MapReduce的分布式架构设计,在设计之初即设定了硬件故障的常态性,因此其计算模型设计了大量的容错逻辑,如任务心跳、重试、故障检测、重分布、任务黑/灰名单、磁盘故障处理等机制,覆盖了从JobTracker、TaskTracker到Job、Task和Record级别的从大到小各个层级的故障处理。
比如说每个worker只处理一个file split,而Map和Reduce过程之间通过硬盘进行数据交换,如果某一节点在处理过程中发生故障或出现其他问题,框架可以将相关任务重新分配给另一个节点,确保作业继续进行,无需人工干预。MapReduce也会定期向主节点报告每个节点的状态。如果某个节点没有按预期做出反应,主节点会将相关任务重新分配给其他可用的集群节点。
上述机制保障了系统良好的容错性和鲁棒性。
MapReduce框架抽象了分布式计算的复杂性,提供了一个简化的编程接口设计,与MPP领域流行的MPI等编程接口不同,MapReduce不需要开发者自己处理并行度、数据分布策略、节点间信息交换、传输机制等复杂问题,只需要关注于实现Map和Reduce对应的业务逻辑即可,大大简化了开发过程。
MapReduce的计算基于key-value的数据对,value域可以包含各种类型的数据,如结构化数据或图片、文件类非结构化数据,因此MapReduce计算框架能够很好地支持非结构化数据的处理。此外,该框架也支持多种编程语言,包括Java、Python和Ruby等等,程序开发变得更加简单。
这些核心优势后续也被一些计算框架有效继承(如Spark等),但是随着企业数据分析类需求的逐渐深入,MapReduce计算框架的架构问题逐渐暴露出来,比如不能处理实时类数据,无法满足一些快速发展的互联网场景需求,如实时推荐、实时调度、准实时分析等等。后续Spark基于MR框架做了进一步的优化,解决了MapReduce计算框架的不足,基于内存和DAG的计算模式有效的减少了数据shuffle落磁盘的IO和子过程数量,实现了性能的数量级上的提升。
Spark介绍参考:Spark计算框架
虽然Spark框架的优化使得Spark的性能比MapReduce快一个数量级以上,但是本质上仍然是一个MapReduce的计算模式。
为了能让您更进一步的了解MapReduce的工作原理,接下来将讲述当启动一个MR程序的一些执行组件。
文件是MapReduce程序中涉及的数据的存储池,输入文件通常位于分布式文件系统中。这些文件的格式是任意的,我们可以使用基于行的日志文件,也可以使用二进制格式,多行输入记录或使用其它的一些格式。
[Input Files--比如在HDFS中,NameNode扮演着十分重要的角色,其一大功能是负责进行元数据的存储与管理。NN(Master)节点数据分成块存入磁盘(Input Files)]
InputFormat 定义了如何拆分和读取Input Files。InputFormat将Input Files拆分为一个或多个InputSplit并分配给单独的Mapper。
InputFormat提供以下功能:
InputSplits由InputFormat创建,在逻辑上表示为单个map任务的一个单元。默认情况下FileInputFormat会将文件分成 128MB 的块(用户可以根据MapReduce程序中数据的大小来控制Split的大小,比如可以在mapred-site.xml文件内设置mapred.min.split.size参数)。
由于InputSplit实际上并不包含实际数据,只是在MapReduce 程序或其他处理技术中的数据处理过程中使用,因此接下来Splits会被拆分为records也就是键值对。
RecordReader主要是与InputSplit进行通信,用来加载数据并将数据转换为适合mapper读取的键值对。比如常见的TextInputFormat的输入格式,该格式提供了一个LineRecordReader,这个类会把输入文件的每一行都作为一个新的值,关联到每一行的key则是该行在文件中的字节偏移量。RecordReader会在输入块上被重复的调用直到处理完整个输入块,然后这些键值对会被发送到mapper进行进一步处理。
Mapper处理每条输入记录(来自RecordReader)并生成新的键值对,但是这里需要注意,Mapper所生成的这个键值对与前面通过RecordReader生成的键值对完全不同。Mapper的输出也称为中间输出(写入本地磁盘),不会存储在HDFS上,因为这是临时数据,如果存储在HDFS上的话会创建不必要的副本。此时Mappers的输出会被传递到combiner 进行进一步处理。
Combiner 也被称为“迷你mapper”,因为MapReduce Combiner会对mapper的输出执行本地聚合,可以最大限度地减少mapper和reducer之间的数据传输所使用的带宽(reducer的概念后面会提出)。执行combiner后,输出将被传递至partitioner以进行进一步的工作。
有着相同key的数值无论是否来自同一个mapper都会一起被reduce,所有的map节点必须针对不同的中间输出(intermediate (k,v) pairs)发往哪个reducer达成一致。Partitioner类就是用来决定给定键值对的去向。
Partitioner获取combiner的输出,然后计算MapReduce中key的哈希值,然后基于这个结果进行分区、排序,具有相同key值的记录将放置同一个分区,然后将这个分区中的数据发送到一个reducer,分区总数等于reduce任务的数量。
总结一下前面的步骤:在 MapReduce 的作业执行中,它获取输入数据集并生成键值对列表。 这些键值对就是map阶段的结果。其中输入的数据会被分割,每个任务处理分割和每个映射,生成新的键值对列表(intermediate (k,v) pairs)。然后,根据key对其进行分区, 将不同分区下的数据分别发送给不同的reducer。
那么如何发送?下面进入到shuffle阶段。
把map任务的中间输出(键值对)发送到reducer的过程叫做shuffle。
当所有的mappers完成,并且中间输出被shuffle到了reducer节点后(这是一个普通的从节点,但reduce阶段将在这里运行,因此称为reducer节点),接下来这个中间输出将被合并和排序,然后作为输入提供给reduce阶段。
每个reduce任务都会创建一个Reducer实例。Reducer将mapper生成的中间键值对集合作为输入,然后对每个键值对运行一个reduce()函数生成输出。通常,在Reducer中会进行聚合、过滤和组合这些数据(键、值)的计算。
Reducer的reduce()方法只会调用一次,它会接收一个key以及关联到这个key的所有值的一个迭代器,这个迭代器会以一个未定义的顺序返回关联到同一个键的所有值。
Reducer处理完数据后它会产生一组新的输出,它的输出是最终的输出,会存储在HDFS中(也可能是内存或者磁盘,基于框架决定)。
它将这些来自Reducer 阶段的输出键值对写入输出文件。
在Reducer处理完数据后,会以OutputCollector.collect() 方法将reduce任务的输出写入文件系统(或内存,取决于框架)。RecordWriter将这些提供给OutputCollector的键值对写入输出文件的方式则是由OutputFormat决定的。
总的来说,MapReduce运行的流程是将输入的数据集拆分成多个独立的部分并将它们分散到不同设备上进行处理。输入数据被分解为键值对后以并行的方式映射到不同的处理节点来处理分块数据。处理完成后基于不同的shuffle方式将数据进行排序,基于key分发至不同的节点。Reduce阶段将结果组合成一组特定的键值对输出,将结果数据输出到外部存储文件系统中或内存。
MapReduce支持多种编程语言的特性使程序开发变得更加简单,通过MR框架来管理系统各个部分之间的通信和数据传输,除了降低了通信成本之外,集群的计算速度、容错能力以及可靠性也得到了提升。对于用户来说只需要将自定义代码(业务逻辑代码)按照MapReduce的方式处理,无需考虑数据存储、节点间信息交换、传输机制等其他问题,交由引擎处理即可。
但是,尽管MR框架因为其高灵活性、容错性等优势被广泛应用,但是这类型计算框架在计算进程中存在一些限制:
这些限制可能会导致单个task上需要处理大量的数据,一些处理节点会比其他节点需要更长的时间运行才能完成数据计算,这样既限制了并行处理的效率,也造成了空闲处理节点的资源浪费,系统将无法充分利用节点进行并行处理,十分影响性能和效率。这个现象就是数据倾斜。
下一篇:分布式计算框架系列文章(二)数据倾斜现象诱因、原理、影响,以及星环对此的应对策略