Inceptor的后台执行引擎是Spark,Spark的基本执行单位是Task,Task的数目是由RDD的分区数决定的。Inceptor中把一个SQL的执行划分为map阶段和reduce阶段,这两个阶段的task数目受到很多因素影响。
Map阶段
数据集主要是存储在磁盘中,在启动一个MR程序后,数据会被拆分成一个或多个InputSplits并分配给单独的Mapper生成对应数量的map tasks,数据的拆分方式取决于不同的数据源,受很多因素影响。
如果是开源的格式,例如txt,orc等,那么InputFormat则定义了如何拆分和读取Input Files以及RecordReader。
举个例子,如果数据源是txt格式,那么数据在输入时会基于设定的块大小(比如128MB)进行切分。然后TextInputFormat会把输入文件的每一行作为单独的一个记录,按行读取每条记录,此格式对应的key是行的字节偏移量,value则是行的内容。然后后续将键值对发送到mapper进行进一步处理,一个block则对应一个task。
如果是csv格式的话,每个csv文件对应一个task,单个csv文件本身不会被拆分为多个task处理。
再比如说如果数据源是orc格式,那么对应的则是OrcInputFormat,OrcInputFormat的算法逻辑是由参数hive.exec.orc.split.strategy控制在读取ORC表时生成split的策略,策略分为BI、ETL、HYBRID等等,不同策略划分方式不一样。比如BI策略则是以文件为粒度进行split划分。
在星环的计算框架中Inputformat 主要用于和一些开源的格式适配,它的缺点是速度比较慢,而且接口比较固定不灵活。
所以如果数据源是星环自研的格式,例如holodesk、timelyre、stelallardb等等,则使用的是星环自研的stargate接口。该接口定义了如何分块,不同格式算法不一样,会针对map处理的数据量以及map总数量做一个权衡。
当两个算子之间需要进行数据的重分布时会需要重新计算下一个Stage的task数,reduce task的数量决定了最终输出文件的数目。
这个阶段的task数量主要是由map 的数量以及和operator类型设置的阈值决定的,计算方式为: