首页 > 代码库 > 浅谈Flink批处理优化器之Join优化

浅谈Flink批处理优化器之Join优化

跟传统的关系型数据库类似,Flink提供了优化器“hint”(提示)以告诉优化器选择一些执行策略。目前优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数:

  • join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.wikipedia.org/wiki/Join_(SQL);
  • outerJoin:外连接,具体细分为left-outer join、right-outer join、full-outer join;
  • cross:交叉连接,求两个数据集的笛卡尔积;

完全展开之后共有五种,这也符合ANSI-standard SQL对连接种类的划分。

下文当我们提及“join”时,主要指equi-join,而当我们想表达outer-join时,我们会直接使用“外连接”,当我们想泛指时,我们将使用“连接”这个词。

常用来实现连接的算法有:hash join、sort-merge join以及nested loop join,下面我们对这三种算法进行简单介绍。首先,当基于hash算法实现连接时,通常划分为两个阶段:

  1. build:为参与连接的两个数据集中较小的数据集准备好哈希表,哈希表中的记录包含着连接的属性以及它对应的行。因为哈希表是通过对连接属性应用一个哈希函数来访问的,因此通过它将比扫描初始的数据集更快地发现给定的连接属性对应的行;
  2. probe:一旦哈希表构建完成,会扫描更大的数据集并通过从更小的数据集匹配哈希表以发现相关的行;

而使用sort-merge算法实现连接时,通常也划分为两个阶段:

  1. sort:对两个数据集在它们的连接键属性上进行排序;
  2. merge:合并排过序的数据集;

nested loop实现连接相对更容易理解,它使用两层嵌套循环分别作用于两个参与连接的数据集。

在Flink的DataSet API中,hash和sort-merge算法都可被选择用于实现join和outerJoin,而nested loop只用于实现cross join。

通过上面的介绍,我们得知当选择hash算法来实现连接时,需要确定以哪个输入端作为build端,哪个输入端作为probe端,这是影响其执行效率的因素之一(因为通常选择数据量较小的数据集作为build端)。因此,以hash算法来实现连接时,而不同的选择显然对应着不同的运算符描述器,列举如下:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • HashLeftOuterJoinBuildFirstDescriptor
  • HashLeftOuterJoinBuildSecondDescriptor
  • HashRightOuterJoinBuildFirstDescriptor
  • HashRightOuterJoinBuildSecondDescriptor
  • HashFullOuterJoinBuildFirstDescriptor
  • HashFullOuterJoinBuildSecondDescriptor

而当以sort-merge算法来实现连接时,不会区分输入端的特殊职责,也就不存在build阶段和probe阶段,因此运算符描述器只有如下四种:

  • SortMergeInnerJoinDescriptor:
  • SortMergeLeftOuterJoinDescriptor:
  • SortMergeRightOuterJoinDescriptor:
  • SortMergeFullOuterJoinDescriptor:

以上这么多运算符描述器,主要是为它们设置不同的执行策略(DriverStrategy),不同的执行策略直接导致了不同的执行成本。

为了理清算法跟参与连接的输入端的关系,Flink将它们区分成两种不同策略的:本地策略以及传输(ship)策略。其中传输策略表示如何移动两个输入端中的数据使得它们具备连接的条件;本地策略则指两个已在本地的输入端数据集所执行的连接算法。

我们来解释一下这两种策略,假设有两个待连接的数据集(R和S)。传输策略有如下两种:

  • Broadcast-Forward strategy (BF):该策略会将一个完整的数据集,比如R,广播到数据集S的每一个分区上,而数据集S的所有数据则一直处于本地,无需网络传输;
  • Repartition-Repartition strategy (RR):以相同的分区函数以及用于连接的键属性分区两个数据集R、S;

正如上面已经提及的,本地策略也即连接的实现算法也有两种:

  • Sort-Merge-Join strategy (SM):首先对两个输入端的数据集在它们的连接键属性上进行排序(排序阶段),然后合并排过序的数据集(合并阶段);
  • Hybrid-Hash-Join strategy (HH):分为构建阶段和探索阶段;

在不指定“Hint”的情况下,Flink在进行批处理优化时会根据成本自动选择传输策略以及本地策略。优化器的一个关键特征是它会根据已经存在的数据属性来进行推理。就连接运算而言,如果某一个输入端的数据量远小于另一输入端,Flink会倾向于选择BF传输策略,将较小的输入端广播给较大的输入端的每一个分区,并在本地策略中选择HH且以较小的输入端作为HH的构建端;如果优化器得知某个(或两个)输入端已排好序,那么生成的候选计划将不再重分区该输入端,此时它更倾向于选择RR传输策略以及SM本地策略。

除了优化器的自动选择,当用户对数据集非常了解的情况下,Flink定义了JoinHint允许用户为join(inner join)指定连接策略给予优化器提示。JoinHint提供了人为选择连接策略的灵活性,其使用方式有两种,一种是直接指定两个输入端的大小:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result1 = input1.joinWithTiny(input2)    //提示优化器第二个数据集比第一个数据集小得多
        .where(0)
        .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result2 = input1.joinWithHuge(input2)    //提示优化器第二个数据集比第一个数据集大得多
        .where(0)
        .equalTo(0);

另一种是直接指定连接策略:

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");

当前有如下的这些策略可供选择:

  • OPTIMIZER_CHOOSES:将选择权交予Flink优化器,相当于没有给提示;
  • BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;
  • BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;
  • REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;
  • REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;
  • REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;

对应到优化器中,JoinHint被用来指定创建何种运算符描述器,由于JoinHint只适应于join,所以它只对应如下这些运算符描述器:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • SortMergeInnerJoinDescriptor

因此,如果用户给出了JoinHint,则数据属性(其实这里主要是DriverStrategy)会通过以上三种运算符描述器来提供:

joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;

switch (joinHint) {
    case BROADCAST_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
        break;
    case BROADCAST_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
        break;
    case REPARTITION_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_SORT_MERGE:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
        break;
    case OPTIMIZER_CHOOSES:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
        break;
    default:
        throw new CompilerException("Unrecognized join hint: " + joinHint);
}

由代码段可见,当将选择权交给优化器时,它会将三种运算符描述器都作为数据属性,供后续生成候选计划时再剔除。

除了针对join的提示外,Flink还提供了针对求交叉连接的提示CrossHint,该提示主要是针对输入端的数据量大小。使用示例如下:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple4<Integer, String, Integer, String>>
    udfResult = input1.crossWithTiny(input2)        //提示第二个数据集非常小
    // apply any Cross function (or projection)
    .with(new MyCrosser());

DataSet<Tuple3<Integer, Integer, String>>
    projectResult = input1.crossWithHuge(input2)    //提示第二个数据集非常大
    // apply a projection (or any Cross function)
    .projectFirst(0,1).projectSecond(1);

不同于Join提示,Cross提示被表述为不同的API。从代码层面上来看,CrossHint有三个枚举值:

  • OPTIMIZER_CHOOSES:将选择权交给Flink优化器;
  • FIRST_IS_SMALL:第一个输入端小于第二个输入端;
  • SECOND_IS_SMALL:第二个输入端数据量小于第一个输入端;

在创建相关运算符描述器CrossHint被用来指定特定的构造参数,比如是允许第一个输入端广播还是第二个输入端广播。交叉连接的实现算法为nested-loop,关于运算符描述器,考虑到以哪个数据集作为内、外层循环以及以阻塞模型还是流模型来处理这两个因素,有四种实现:

  • CrossBlockOuterFirstDescriptor:以第二个输入端作为内循环,第一个输入端作为外循环且以阻塞形式处理;
  • CrossBlockOuterSecondDescriptor:以第一个输入端作为内循环,第二个输入端作为外循环且以阻塞形式处理;
  • CrossStreamOuterFirstDescriptor:以第二个输入端作为内循环,第一个输入端作为外循环且以流模型处理;
  • CrossStreamOuterSecondDescriptor:以第一个输入端作为内循环,第二个输入端作为外循环且以流模型处理;

且需要注意的是,不同的处理模型,哪个输入端作为内外循环是相反的:

else if (hint == CrossHint.SECOND_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterSecondDescriptor(false, true));
    list.add(new CrossStreamOuterFirstDescriptor(false, true));
    this.dataProperties = list;
}
else if (hint == CrossHint.FIRST_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterFirstDescriptor(true, false));
    list.add(new CrossStreamOuterSecondDescriptor(true, false));
    this.dataProperties = list;
}

但广播哪个输入端是一致的。


微信扫码关注公众号:Apache_Flink

技术分享


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

技术分享

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    浅谈Flink批处理优化器之Join优化