Flink 双流 Join 的3种操作示例

简介:在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双+ L k * D g ) W #join,分别是:1、join();2、coGroup();3、intervalJot l 0 6 - + ( 0 Bin()

在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需) M n X要在两条流上做 join 以B A }获得更丰富的信息。Fl| [ * - 5 hink D} q 4 M = [ 9 4 WataStream API 为用* n e | F _ i J 0户提供了3个算子来实现双流 join,分别是:

  • join()
  • coGroup()
  • intervalJoin()

本文举例说明它们的使用方法,顺便聊聊比较特殊的 interval join 的原理。

准备数据

从 KN x C S | i J I /afka 分别接入点击流和订单流,并转化为 POJO。

DataStream<String> clie 9 2 *ckSourceStream = env .addSource(new FlinkKafkaConsumer011<>( "ods_anam P e n { Slytics_access_log", new SimpleStringSchema(), kafkaPO U ( ] C 6 [ uropsz . m } d ; k ).setStartFromLatest()); DataStream<Stri@ m 7 %ng> orderSourceStream = env .addSoO A V Y ? 1 3urce(new FlinkKafkaConsumer011<>( "ods_ms_order_done", new SimpleStringSc, x ^ c R p W : ohema()| ^ = { e * ^, kafkaProps ).setStartFromLatest()); DataStreamX ` &<AnalyticsAccessLogRecord> clickRecordStream = clickSourceStream .map(message -> JSON.parseObject(message, AnalyticsAccessLogRecord.class)); DataStream<OrderDoneLogRecord> orderRecordStream = orderSourceSt+ p m & 3 Wream .map(message -> JSON.parseObject(message, OrderDoneLogRecord.class));

join()

join() 算子提供的语义为"Window join",V f * I . u 1即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间S V 3两种时间特征。以下示例以10秒滚动窗口,将两个流通过商品 ID 关联,取得订单流中的售价相关字段。

Flink 双流 Join 的3种操作示例

clickRecordStream .join(or$ z c XderRecordStreak y bm) .where(record -> rec] 1 x bord.getMerchandiseId()) .equalTo(record -> record.get 6 8MerchandiseId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>([ O L m ; I ~ / I) { @Override public String join(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord) throws Exception { return StringUtils.join(Arrays.asList( acc6 P { : ~ *essRecord.get4 7 Z 5 k v ~MerchandiseId(), orderRecord.getPrice(), orderRecord.getCouponMoney(), orderRecord.getRebateAmount() ), '\t'); } }) .print().setParallelism(1);

简单易用。

coGroup()

只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就p S #是利用 coGroup() 算子。它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunct{ r M L % i 9 6ion 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流A f W v P的数据并输出。

以下的例子就实现了点击流 left join 订单流的功能,是很朴素的 nested loop join 思想(二重循环)。

clickRecordStr! } * % A Peam .P } 2 x ScoGroup(orderRecordStream) .where(record ->q 8 o; record.getMerchandiseId()) .equalTo(record -W n q / | & N 6>S Z } : u 2 + record.getMerchandise5 u MId())= ^ 6 @ p & ] U ] .window(TumblingProcessingTimeWindowJ 1 ] : + T y ks.of(Time.seconds(10))) .apply(new CoGroupFunction<AnalyticsAccessLogRecord, OrderDoneLogRes t _ V I 1 r Icord, Tuple2<Stw T d s ^ring, Long>>W ^ x *() { @Override public void coGroup(Iterable<AnalyticsAccessLogRecord> accessRecords,p ) w z x Iterable<OrderDoneLogRecord> orderRecords, Collector<Tuple2<String, Long>1 + 8 X ! s , d;> collector) throws Exception { for (AnalyticsAccess% j ] ~ ~ Q i |LogRecord aci W N % 8cessRecord : accessRecor| G U rds) { boolean isMatched = false; for (OrderDoneLogRecordU i i c = orderRecord : orderRecords) { //o C q t 右流中有对应的记录 collector.collect(new Tuple2<& 7 i g j g N>(accessRecord.getL J ,MerchandiseName(), orderRecord.getPrice())); isMatched = true; } if (!isMatched) { // 右n $ A [ S y ,流中没有对应的记录 collecto} + | Vr.collect(neX 1 s m Q 4 )w Tuple2<>(a{ h w &ccessRecord.getMerchandiseName(), null)); } } } }) .print().setParallelism(1);

intervalJoin()

join() 和 coGroup() 都( 5是基于窗口做关联的。但是在某些情况下,两条流的数: ` % $ . n j ^据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,A ~ b z c l如果用窗口来圈定,很容] X z d w P c -易 joik M ? P O l [ ^n 不上。所以 Flink 又提供了"Intervalc k I j D join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:

right.timestamp ∈ [left$ X 9 3 ^ 7 [.timestamp + lowerBound; left.timestamp + upperBound]

Flink 双流 Join 的3种操作示例

interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。

示例代码如下。注意在运行之前,W k R ] g I 5需要分别在两个流上应用 assignTimesD ! z 4tampsAndWatermarks; { * h } r [() 方法获取事件时间戳和水印。

clickRecordStream .keyBy(recoj * U 0 crd -> record.getMerchandiseId())9 q s p S ? f .intervalJoin(orderRecordStream.keyBy(record -> reB A L { 4 W ~ _cord.getMerchandiseId())) .between(Time.seconds(-30), Time.seconds(30)) .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() { @) C 6 l ~ C 9 &Override pU O u E w u U o Aublic void processElement(AnalyticsAccessLogRecord acces g ] )sRecord, OrderDoneL& S ( K H ; 7 x @ogRecord orderRecord, Context context, Collector<String> collec% Q = ! : 2 w ;tor) throws Exception { collector.collect(StringUtils.join(Arras ; - { J 3 2 =ys.asList( accessRecord.getMerchandiseId(), orderRecZ H ~ t u K 7ord.getPric6 T = ( & oe(), orderRecord.getCoupM E h -onMoney(), orderRecord.getRebateAmount() ), '\t')); } }) .print().setParallelism(B @ C D V ( |1);

由上可见,interval join 与 window jA i 9 soin 不同,是两个 KeyedStr) g % J U 9 geam 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开~ d O U c区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。

interval join 的实现原理

以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方* u P法的逻辑。

public <OU3 z g ? 3 J AT> SingleOutputStre| s ! -amOperator<OUT> process( ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<u e V rOUT> outputType) { Preconditions.chk ; j h MeckNotNull(processJoinFunc$ J _ z K _tion); Preconditions.checkNotNull(output1 ( y Y * s F T jType); final ProcessJoinFunction<IN1,n , c M INK _ / + W d @2,R Z ; k 0 OUT> cleanedUdf = left.getExecuI ( z 2 e ntionEnvironment().clean(processJoinFunction); final IntervalZ ! E s g vJoinOperator<KEY, IN1, IN2, OUT/ : Q> operap g m ~ ctor = new IntervalJoinOperator<>( lowerBi 5 N T e d N tound, upperBound, lowerBoundInclusive, upperBoundInclusive, left.getType().createSerialize0 * = f $ ^r(left.getExecutionCos b M W 3nfig()), right.getTZ b G 8 1 P ) - Type().createSerializer(right.getExecutionConfig()), cleanedUdf ); return left .connect(right) .keyBy(keySelector1, keySee l ) l ? v W t 0lector2) .transform("Interval J# : Soin", outputType,m ] 0 operator); }

本文转载自简书,作者:LittleMap T H 8gic

原文链接

本文为阿里云- Y z D % + 2 K原创内容,未经允许不得转载