首页 > 代码库 > Flink - CoGroup

Flink - CoGroup

使用方式,

dataStream.coGroup(otherStream)    .where(0).equalTo(1)    .window(TumblingEventTimeWindows.of(Time.seconds(3)))    .apply (new CoGroupFunction () {...});

 

可以看到coGroup只是产生CoGroupedStreams

    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {        return new CoGroupedStreams<>(this, otherStream);    }

 

而where, equalTo只是添加keySelector,对于两个流需要分别指定

keySelector1,keySelector2

 

window设置双流的窗口,很容易理解

 

apply,

       /**         * Completes the co-group operation with the user function that is executed         * for windowed groups.         *         * <p>Note: This method‘s return type does not support setting an operator-specific parallelism.         * Due to binary backwards compatibility, this cannot be altered. Use the         * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific parallelism.         */        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {            //clean the closure            function = input1.getExecutionEnvironment().clean(function);            UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());            UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);            DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1 //将input1封装成TaggedUnion,很简单,就是赋值到one上                    .map(new Input1Tagger<T1, T2>())                    .setParallelism(input1.getParallelism())                    .returns(unionType);            DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2 //将input2封装成TaggedUnion                    .map(new Input2Tagger<T1, T2>())                    .setParallelism(input2.getParallelism())                    .returns(unionType);            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2); //由于现在双流都是TaggedUnion类型,union成一个流,问题被简化            // we explicitly create the keyed stream to manually pass the key type information in            WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = //创建窗口                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)                    .window(windowAssigner);            if (trigger != null) { //如果有trigger,evictor,设置上                windowOp.trigger(trigger);            }            if (evictor != null) {                windowOp.evictor(evictor);            }            return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); //调用window的apply        }

关键理解,他要把两个流变成一个流,这样问题域就变得很简单了

最终调用到WindowedStream的apply,apply是需要保留window里面的所有原始数据的,和reduce不一样

apply的逻辑,是CoGroupWindowFunction

 

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>            extends WrappingFunction<CoGroupFunction<T1, T2, T>>            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {        private static final long serialVersionUID = 1L;        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {            super(userFunction);        }        @Override        public void apply(KEY key,                W window,                Iterable<TaggedUnion<T1, T2>> values,                Collector<T> out) throws Exception {            List<T1> oneValues = new ArrayList<>();            List<T2> twoValues = new ArrayList<>();            for (TaggedUnion<T1, T2> val: values) {                if (val.isOne()) {                    oneValues.add(val.getOne());                } else {                    twoValues.add(val.getTwo());                }            }            wrappedFunction.coGroup(oneValues, twoValues, out);        }    }}

逻辑也非常的简单,就是将该key所在window里面的value,放到oneValues, twoValues两个列表中

最终调用到用户定义的wrappedFunction.coGroup

 

DataStream.join就是用CoGroup实现的

            return input1.coGroup(input2)                    .where(keySelector1)                    .equalTo(keySelector2)                    .window(windowAssigner)                    .trigger(trigger)                    .evictor(evictor)                    .apply(new FlatJoinCoGroupFunction<>(function), resultType);

 

FlatJoinCoGroupFunction

private static class FlatJoinCoGroupFunction<T1, T2, T>            extends WrappingFunction<FlatJoinFunction<T1, T2, T>>            implements CoGroupFunction<T1, T2, T> {        private static final long serialVersionUID = 1L;        public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {            super(wrappedFunction);        }        @Override        public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {            for (T1 val1: first) {                for (T2 val2: second) {                    wrappedFunction.join(val1, val2, out);                }            }        }    }

可以看出当前join是inner join,必须first和second都有的情况下,才会调到用户的join函数

Flink - CoGroup