首页 > 代码库 > Flink批处理优化器之范围分区重写

Flink批处理优化器之范围分区重写

为最终计划应用范围分区重写

Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<Integer, String>> ds = getTupleDataSet(env);
ds.partitionByRange(0).withOrders(Order.ASCENDING, Order.DESCENDING);

在使用范围分区这一特性时,需要尽可能保证各分区所处理的数据集均衡性以最大化利用计算资源并减少作业的执行时间。为此,优化器提供了范围分区重写器(RangePartitionRewriter)来对范围分区的分区策略进行优化,使其尽可能平均地分配数据,避免数据倾斜。要做到这一点需要对数据集的范围有足够的“了解”,RangePartitionRewriter通过对数据集进行采样来得到分区的范围。接下来我们就来分析RangePartitionRewriter的实现细节。

范围分区重写器

范围分区重写器(RangePartitionRewriter)同样遍历的是最终选择的计划并作用于计划节点(PlanNode),其主要用于在后置遍历时对传输策略为范围分区节点的输入端通道的连接情况进行重写,核心逻辑如下:

//提取当前所有的计划节点的输入通道
final Iterable<Channel> inputChannels = node.getInputs();
//遍历输入通道
for (Channel channel : inputChannels) {
    ShipStrategyType shipStrategy = channel.getShipStrategy();
    // 确保优化的通道的数据传输策略为范围分区
    if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {

        if(channel.getDataDistribution() == null) {
            if (node.isOnDynamicPath()) {
                throw new InvalidProgramException("Range Partitioning not supported within iterations " 
                    + " if users do not supply the data distribution.");
            }

            //对该通道的范围分区进行“重写”,并将当前通道从源计划节点的通道中删除,然后加入新的通道集合
            PlanNode channelSource = channel.getSource();
            List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
            channelSource.getOutgoingChannels().remove(channel);
            channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
        }
    }
}

上述代码段的关键在于对rewriteRangePartitionChannel方法的调用,它封装了对最终计划进行改写的逻辑,改写产生的逻辑Dataflow对比示意图如下:

技术分享

由上图可见,改写后的逻辑dataflow被拆分成两个分支:上层分支主要完成的功能是采样跟构建范围边界,我们将其简称为“采样分支”;下层分支则用于对记录分区的索引进行查找、路由以及相关处理,可简称为“数据处理分支”。两分支之间有一个衔接关系在于:采样分支最终会输出“范围边界”,并将其以广播变量的形式传递给数据处理分支(见图中的虚线部分),数据处理分支将依据范围边界为来自source的记录查找该归属的分区编号。你可能会产生疑惑:依照这种表述来看,采样分支和数据处理分支是有前后的时序依赖关系的,而单纯的逻辑Dataflow中从source分拆的两个分支通常没有这种关系。那么Flink是如何保证该依赖关系的呢?答案在于数据处理分支的第一个channel,其数据交换模式(DataExchangeMode)被设置为Batch模式(见图中括号的标注,没有特别备注的数据交换模式默认都是Pipeline),Batch模式将数据生产者跟消费者解耦并使得它们不必时刻互相依赖(当数据都生产完成之后,消费者才消费),这也避免了数据处理分支开始处理数据流时还没有收到来自采样分支的范围边界广播变量。

具体而言,其核心流程可分解为如下六步:

  1. 为每个分区采样固定数目的记录作为样本;
  2. 让中央协调器从每个分区的样本中采样固定数目的样本作为最终的样本;
  3. 基于最终样本数据构建范围边界;
  4. 将范围边界作为广播变量传递同时为每个记录构建<分区编号,记录>的二元组并输出然后以自定义分区来分区记录;
  5. 找到记录的分区之后,分区编号就没有存在的意义了,因此为流中的记录移除分区编号;
  6. 连接目标节点

关于采样算法的细节我们将会在下一小节专门进行分析,因此这里我们先假设已采样完成并从广播变量中得到了范围边界。接下来我们来分析数据处理分支的核心逻辑。当记录到来后需要确定它要落到哪个分区,这需要对范围边界集合进行查找并定位分区编号,优化器提供了一个RangeBoundaries接口,其定义了一个方法来提供该功能:

int getRangeIndex(T record);

其通用实现CommonRangeBoundaries使用二分查找来实现该方法:

public int getRangeIndex(T record) {
    return binarySearch(record);
}

CommonRangeBoundaries将会被应用在一个名为AssignRangeIndex的UDF(扩展自:RichMapPartitionFunction)中。AssignRangeIndex首先获取“范围边界”这一广播变量,然后构建CommonRangeBoundaries的实例,随之遍历当前聚集的分区数据并一一查找其分区编号以构建二元组,然后输出到下游,代码如下:

public void mapPartition(Iterable<IN> values, Collector<Tuple2<Integer, IN>> out) throws Exception {
    List<Object> broadcastVariable = getRuntimeContext().getBroadcastVariable("RangeBoundaries");
    if (broadcastVariable == null || broadcastVariable.size() != 1) {
        throw new RuntimeException("AssignRangeIndex require a single RangeBoundaries as broadcast input.");
    }
    Object[][] boundaryObjects = (Object[][]) broadcastVariable.get(0);
    RangeBoundaries rangeBoundaries = new CommonRangeBoundaries(typeComparator.createComparator(), 
        boundaryObjects);

    Tuple2<Integer, IN> tupleWithPartitionId = new Tuple2<>();

    for (IN record : values) {
        tupleWithPartitionId.f0 = rangeBoundaries.getRangeIndex(record);
        tupleWithPartitionId.f1 = record;
        out.collect(tupleWithPartitionId);
    }
}

以AssignRangeIndex构建的运算符所产生的计划节点连接着自定义的分区器来对为记录路由到指定的分区:

//以下标为0的字段(也即上面查找到的分区索引)作为分区依据
final FieldList keys = new FieldList(0);
partChannel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, keys, 
    idPartitioner, DataExchangeMode.PIPELINED);

当记录(此时已是上面的二元组了)被路由到正确的分区之后,分区编号已没有用了,不需要再往下游传输了,优化器又定义了一个名为RemoveRangeIndex的UDF来移除分区编号,具体的做法是只输出二元组里下标为1的字段。最终将以RemoveRangeIndex构建的运算符所生成的计划节点替换通道原先的source节点并使得其与target节点进行连接。


微信扫码关注公众号:Apache_Flink

技术分享


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

技术分享

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    Flink批处理优化器之范围分区重写