在分布式计算系统中,计算任务(Task)的数量对于整个系统的性能和效率至关重要。比如:
综上所述,计算任务数量的合理配置对于分布式计算系统的性能、效率、稳定性和扩展性都有着直接的影响。
Inceptor的后台执行引擎是Spark,Spark的基本执行单位是Task,Task的数目是由RDD的分区数决定的。Inceptor中把一个SQL的执行划分为map阶段和reduce阶段,这两个阶段的task数目受到很多因素影响。
其次ArgoDB的任务分配是以Task为粒度执行的,每一个Task同时只会执行在一个Executor上,是用一个vCore资源,因此如果要充分利用上CPU,就必须要干预Task数量。
数据集主要是存储在磁盘中,在启动一个MR程序后,数据会被拆分成一个或多个InputSplits并分配给单独的Mapper生成对应数量的map tasks,数据的拆分方式取决于不同的数据源,受很多因素影响。
如果是开源的格式,例如text,orc等,那么InputFormat则定义了如何拆分和读取Input Files以及RecordReader。举个例子:
如果数据源是text格式,那么数据在输入时会基于设定的块大小(比如128MB)进行切分。然后TextInputFormat会把输入文件的每一行作为单独的一个记录,按行读取每条记录,此格式对应的key是行的字节偏移量,value则是行的内容。然后后续将键值对发送到mapper进行进一步处理,一个block则对应一个task。
也就是说处理text表的task数目是由HDFS文件的文件数数和split决定的,一个split对应一个task。一个大文件,HDFS 默认根据block size (TDH默认128MB)来分split。
注意:有些压缩文件,是没法分split的,一个文件只能一个task处理
如果是csv格式的话,每个csv文件对应一个task,单个csv文件本身不会被拆分为多个task处理。
再比如说如果数据源是orc格式,那么对应的则是OrcInputFormat,OrcInputFormat的算法逻辑是由参数hive.exec.orc.split.strategy控制在读取ORC表时生成split的策略,策略分为BI、ETL、HYBRID等等,不同策略划分方式不一样。比如BI策略则是以文件为粒度进行split划分。
通常默认是256MB一个split,但是由于切分实际并不准确,经常会有超过256MB的文件不能分split。
在星环的计算框架中Inputformat 主要用于和一些开源的格式适配,它的缺点是速度比较慢,而且接口比较固定不灵活。
所以如果数据源是星环自研的格式,例如holodesk、timelyre、stelallardb等等,则使用的是星环自研的stargate接口。该接口定义了如何分块,不同格式算法不一样,会针对map处理的数据量以及map总数量做一个权衡。
Holodesk表默认是256MB一个split,切分相对较准。
分桶表
一般分桶数就是task数目。Holodesk表超过256MB会切。实际观察下来,TORC一般不会切。
分区分桶表
当两个算子之间需要进行数据的重分布时会需要重新计算下一个Stage的task数,reduce task的数量决定了最终输出文件的数目。
这个阶段的task数量主要是由map 的数量以及和operator类型设置的阈值决定的,计算方式为:
因为星环计算引擎是以Shuffle来切分Stage的,因此只有两个算子之间需要进行数据的重分布(比如ArgoDB中使用Hash Shuffle)时,会需要重新计算下一个Stage的task数,而计算的方式为:
0.6的比值由 hive.groupby.aggregateratio 参数决定
单独计算不同部分的map&reduce task数目,然后相加
当进行limit时会进行预估,裁剪掉不必要的task。默认会先2个Task执行(参数 ngmr.num.parts.try.limit 决定,limit不够再起新的task),因此建议用户不要进行大数据量的limit,否则性能会差很多。
如果需要向目标表中插入数据,数据插入时,Stage的task数取决于是否向分桶表插入数据。
这是因为分桶的逻辑是根据数据的Hash值进行数据的分布计算,将数据写到不同的Tablet中,因此入库时的task数量一般是:
一般跟max(map task数)一致
通过开启AutoMerge,让一个Task读取多个Small Block,来实现减少Task数的效果
手动设置以下参数,会影响读表stage以外的stage的task数,可以增大或减少task数
mapred.reduce.tasks 设置reduce 个数
mapred.minreduce.tasks 设置最小reduce个数
对分桶表插入,如果mapred.reduce.tasks跟桶数不一致,结果可能有问题,所以分桶表插入时,不要设置mapred.reduce.tasks。
主要是通过ngmr.topo.mapred.limit,来控制任意两个Stage之间Map数量和Reduce数量乘积,如果大于这个值则会报错,默认值:1000000000(10亿)
在分布式计算系统中,计算任务(Task)的数量对于整个系统的性能和效率至关重要。比如:
综上所述,计算任务数量的合理配置对于分布式计算系统的性能、效率、稳定性和扩展性都有着直接的影响。
Inceptor的后台执行引擎是Spark,Spark的基本执行单位是Task,Task的数目是由RDD的分区数决定的。Inceptor中把一个SQL的执行划分为map阶段和reduce阶段,这两个阶段的task数目受到很多因素影响。
其次ArgoDB的任务分配是以Task为粒度执行的,每一个Task同时只会执行在一个Executor上,是用一个vCore资源,因此如果要充分利用上CPU,就必须要干预Task数量。
数据集主要是存储在磁盘中,在启动一个MR程序后,数据会被拆分成一个或多个InputSplits并分配给单独的Mapper生成对应数量的map tasks,数据的拆分方式取决于不同的数据源,受很多因素影响。
如果是开源的格式,例如text,orc等,那么InputFormat则定义了如何拆分和读取Input Files以及RecordReader。举个例子:
如果数据源是text格式,那么数据在输入时会基于设定的块大小(比如128MB)进行切分。然后TextInputFormat会把输入文件的每一行作为单独的一个记录,按行读取每条记录,此格式对应的key是行的字节偏移量,value则是行的内容。然后后续将键值对发送到mapper进行进一步处理,一个block则对应一个task。
也就是说处理text表的task数目是由HDFS文件的文件数数和split决定的,一个split对应一个task。一个大文件,HDFS 默认根据block size (TDH默认128MB)来分split。
注意:有些压缩文件,是没法分split的,一个文件只能一个task处理
如果是csv格式的话,每个csv文件对应一个task,单个csv文件本身不会被拆分为多个task处理。
再比如说如果数据源是orc格式,那么对应的则是OrcInputFormat,OrcInputFormat的算法逻辑是由参数hive.exec.orc.split.strategy控制在读取ORC表时生成split的策略,策略分为BI、ETL、HYBRID等等,不同策略划分方式不一样。比如BI策略则是以文件为粒度进行split划分。
通常默认是256MB一个split,但是由于切分实际并不准确,经常会有超过256MB的文件不能分split。
在星环的计算框架中Inputformat 主要用于和一些开源的格式适配,它的缺点是速度比较慢,而且接口比较固定不灵活。
所以如果数据源是星环自研的格式,例如holodesk、timelyre、stelallardb等等,则使用的是星环自研的stargate接口。该接口定义了如何分块,不同格式算法不一样,会针对map处理的数据量以及map总数量做一个权衡。
Holodesk表默认是256MB一个split,切分相对较准。
分桶表
一般分桶数就是task数目。Holodesk表超过256MB会切。实际观察下来,TORC一般不会切。
分区分桶表
当两个算子之间需要进行数据的重分布时会需要重新计算下一个Stage的task数,reduce task的数量决定了最终输出文件的数目。
这个阶段的task数量主要是由map 的数量以及和operator类型设置的阈值决定的,计算方式为:
因为星环计算引擎是以Shuffle来切分Stage的,因此只有两个算子之间需要进行数据的重分布(比如ArgoDB中使用Hash Shuffle)时,会需要重新计算下一个Stage的task数,而计算的方式为:
0.6的比值由 hive.groupby.aggregateratio 参数决定
单独计算不同部分的map&reduce task数目,然后相加
当进行limit时会进行预估,裁剪掉不必要的task。默认会先2个Task执行(参数 ngmr.num.parts.try.limit 决定,limit不够再起新的task),因此建议用户不要进行大数据量的limit,否则性能会差很多。
如果需要向目标表中插入数据,数据插入时,Stage的task数取决于是否向分桶表插入数据。
这是因为分桶的逻辑是根据数据的Hash值进行数据的分布计算,将数据写到不同的Tablet中,因此入库时的task数量一般是:
一般跟max(map task数)一致
通过开启AutoMerge,让一个Task读取多个Small Block,来实现减少Task数的效果
手动设置以下参数,会影响读表stage以外的stage的task数,可以增大或减少task数
mapred.reduce.tasks 设置reduce 个数
mapred.minreduce.tasks 设置最小reduce个数
对分桶表插入,如果mapred.reduce.tasks跟桶数不一致,结果可能有问题,所以分桶表插入时,不要设置mapred.reduce.tasks。
主要是通过ngmr.topo.mapred.limit,来控制任意两个Stage之间Map数量和Reduce数量乘积,如果大于这个值则会报错,默认值:1000000000(10亿)