数据湖有新解!Apache Hudi 与 Apache Flink 集成

简介:纵观大数据领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。

Apache Hudi 是由 Uber 开发并开源的数据湖框架,它于 019" target="_blank">2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺利毕业晋升为 Apache 顶级项目。是当前最为热门的数据湖框架之一2 M 6 l F

1. 为何要解耦

Hudi 自诞生至今一直使用 Spark 作为其数据处理引擎。如果用户想使用 Hudi 作为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,使用 Spark 作为大数据处理引擎可以说是很平常甚至是理所当然的Q q B Q事。因为 Spark 既可以进行批处理也可以使用微批模拟流,流批一体,一套引擎解决流、批问题。然而,近年来,随着大数据技术的发展,同为大数据处理引擎的 Flink 逐渐进入人们的视野,并在计算引擎领域获占据了一定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否支持o 0 ] ( 1 ( t使用 Flink 计算引擎的的声音开始逐渐出现,并日渐频繁。所以使 Hudi 支持 Flink 引擎是个有价值的事情,而集成 Flink 引擎的前提是 Hudi 与 Spa$ B n rrk 解耦。

同时,纵观大数据Q { M领域成熟、活跃、有生命力的框架,无一不是设计优雅,能与其他框架相互融合,彼此借力,各专所长。因此将 Hudi 与 Spark 解耦,将其变成一个引擎无关的数据湖框架,无疑是给 Hudi 与其他组件的融合创造了更多的可p p 9 . d ! _能,使得 Hudi 能更好的融入大数据生态圈。

2. 解耦难点

Hudi 内部使用 Spark API 像我们平时开发使用 List 一样稀) z s ` c - D k x松平常。自从数据源读取数据,到最终写出数据到表,无处不是使用 Spark RDD 作为主要数据结构,甚至连普通的工具类,都使i P ( * ( y & P }用 Spark API 实现,可以说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定可] u s谓是深入骨髓。

此外C D 4,此次解耦后集成的首要引擎是 Flink。而 Flink 与 Spark 在核心抽象上差L p i : Z r异很大。Spark 认为数据是有界的,其核心抽象是一个有限的数W E s ^ V据集合。而 Flink 则认为数据的本质是流,其核心抽象 DataStream 中包含的是各种对数据的操作。同时,Hudi 内部还存在多P F s 6 w | Q M处同时操作多个 RDD,以及将一个 RDD 的处理Y o | @结果与另一个 RDU D ID 联合处理的情况,这种抽象上的区别以及实现时对于中间结果的复用,使得 Hudi 在解耦抽象上难O a f D A C K C以使用统一的 API 同时操作 RDD 和 DataStream。

3. 解H 8 Y w ! u耦思路

理论上,Hudi 使用 Spark 作为其计算引擎无非是为了使用 Spark 的分布式计算能力以及 RDD 丰富的算子能力。抛开分布式计算能力外,HV X &udi 更多是把 RDD 作为一个数据结构抽象,而 RDD 本质上又是一个有界数据集,因此,把 RDD 换成 List,在理论上完全可行(当然,可能会牺牲些A 0 @ k ( Q z性能)。为了8 / 4 . T h V (尽可能保证 Hudi Spark 版本的性能和稳定性。我们可以保留将有界数据集作为基本操作单位的设定,Hudi 主要操作 API 不变,将 RDD 抽取为一个泛型,Spark 引擎实现仍旧使用 RDD,其他引擎则根据实际g Q l情况使用 List 或者其他有界B J o W数据集。

解耦原则:

1)统一泛型。Spark Aq g f iPI 用到的 JavaRDD,JavaRDD,JavaRDD 统一使用泛型 I,K,O 代替;

2)去 Spark 化。抽象层所有 API 必须与 Spark 无关。涉及到具体操作难以在抽象层实现的,改写为抽象# C t C # u @ 0方法,引入 Spark 子类实现。

例如:Hu- h f G m G . 7di 内部多处使用到了 JavaSparkContext#map() 方法,去 Spark 化,则需要将 JavaSparkContext 隐藏,针对该问题我们引入了 HoodieEngineContext#map() 方法,该方法会屏蔽 map 的具体实现细节,从而在抽象成实现去 Spark 化。

3), U 3 I ~ / K &抽象层尽量减少改动,保证 Hudi 原版功能和性能;

4)使用 HoodieEngineContext 抽象类替换 JavaSparkContext,提供运行环境上下文。

4.Flink 集成设计

Hudi 的写操作在本质上是批处理,DeltaStreamer 的连续模式是通过循环进行批处理实现的。为使用统一 API,Hudi 集成 Flink 时选择r { d $ + M 9攒一批数据后再进行处理,最后统一进行提交(这里 Flink 我们使用 List 来攒批数据m P E o)。

攒批操作最容易想到的是通过使用时间窗口来实现,然而,使用窗口,在某个窗口没有数据流入时,将g K r没有输出数据,Sink 端难以判断同一批数据是否已经处理完。o & @ N I ] 0 ]因此我们{ H j f h { 使用 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一个批次,当某个子任务中没有数据时,mock 结果数据凑数。这样在 Sink 端,当每个子任务都有结果数据下发时即可认为一批数据已经处理完成,可以执行 commit。

DAG 如下:

数据湖有新解!Apache Hudi 与 Apache Flink 集成

  • source 接收 Kafka 数据,转换成 List;
  • InstantGeneratorOperator 生成全局唯一的 instant.当上一个 instant 未完成或者当 8 r s ( q 1 ~前批次无数g Y C N -据时,X Z t r h Q O ! W不创建新的 instant;
  • KeyBy partitionPath 根据 partitionPath 分区,避免多个子任务写同一个分区;
  • WriteProcessOperator 执行写操作,当当前分区无数据时,向下游发送空的结果数据凑数;
  • CommitSink 接收上游任务的计算结果,当收到 parallelism 个结果时,认为上游子任务全部执行完成,执行 co6 i $ ~ ^mmit.

注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其内部阻塞检查上一个 instant 的状态,保证全局只有一个 inflight(或 requested)状b d G l w Q + b ^态的 instant.WriteProcessOperatD u e # | 4 5 k xor 是实际执行写操作的地方,其写操作在 checkpoint 时触发。

5. 实现示例

1) HoodieTable

/**

* Abstract implementation of a HoodieT( K / ! N t Z h _able.

* @param <T> Sub type o5 o c K = sf HoodieRecordPayload

* @param <I>N u A W q Type of inputs

* @param <K> Type of keys

* @param <O> Type of outputs

*/

public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {- G =

protected final HoodieWriteConfig con0 - I - Sfig;

protected final HoodieTableMetaCd # w X Elient metaClient;

protected final HoodieIndex<T,L / . 1 I, K, O> index;

public# t ( 7 3 z V abstract HoodieWrite7 C x B w 6 n AMetadata&l, E xt;O> upsert(HoodieEngineContext context, String instantTime,

I recorW n A 8 $ds);

public abstract HoodieWriteMetada9 o c ~ 2 bta<O> insert(HoodieEngineContext context, String inst9 c _ d -antTimN ^ * x _ L ie,

I records);

public abstract HI , 8 8 + ; ioodieWz 2 riteMetadata<O&3 L / % m $ @ Ngt; bu E m Q j t 0lkInsert(HoodieEP N ` 5 QngineContext contei : %xt, String instantTime,

I records, Option<BulkInsex x f b , ? -rtPartitioner<I>> bulkInsertPartitioner);

}

HoodieTable 是 Hudi 的核心抽象之一,其中定义了表支持的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输入数据由原先的 JavaRDD inputRdds 换成了 I records, 运行时 JavaSparkf L H $ ^ e =Contex? 6 6 _ T 2t jsc 换成了 HoodieEngineContext context.

从类注释可以看到 T,I,K,O 分别代表了 Hudi 操作的负载数据类型、输入数据类型、主键类型以及输出数据类型。这些泛型将贯穿整个抽象层。

2) H9 { / X 6 J P toodieEngineContext

/**Z B $ 3

* Base class contaf U m - + E ins the context infoh O G T B 5 [ +rmation n h P Geeded by the engine aX g D g # { ?t runtime. I! e B It will be extendedR Y D t - ; w g 2 by different

* engine implementation if needed.

*/

public abstract class HoodiP n + ceEngineContext {

public abstract <I, O> List<O> mapQ - * M k X 9 ,(List<I> data, SerializableFunction<I, O> func, int parallelism);

public abstract <I, O> List<O> flatMap(List<I> data, SerializableFy 5 # ` { R L 1unction<I, Stream<O>> func, iN u 7 v Fnt parallelism);

public abstract <I> void foreach(List<I> data, SerializableConsu) 3 m 5mer<I> consumer, int parallelism);

}

HoodieEngineContexS Q Q }t 扮演了 JavaSpT ; # I s e ) b 0arkContext 的角色,它不仅能提供所有 Jav# k ! @ G X r raSparkContext 能提供的信息,还封装了 map,flatMap,foreA D a @ A 6 { bach 等诸多方法,隐藏了 JavaSparkCC 2 : `ontext#map(),JavaSparkContext#flatMap(),JavaSparkCx d n 3 U ) e 7 yontext#foreach() 等方法的具体实现。

以 map 方法为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 方法如下:

@Override

public <I, O> List<O> map(List<I> data, SerializableFunctL 0 ( }ion<I, O> fup E z 8 7 2nc, int parallelism) {

return javaSparkContext.parallelize(data, paralleliv ) 7sm).map{ x } G n - z L t(func::appT G b V 9ly).collect();

}

在操作 List 的引擎中其实现可以为(不同方法需注意线程安全问题,慎用 parallel()):

@Override

public <I, O> LisM # Y y V xt<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {

return data.stQ F 8 q h @ Kream().parallel().map(func::apply).collect(Collectors.toList());

}

注:map 函数中抛出的异常,可以通过包装 SerializableFuz V s B ( enction func 解决.

这里简要介绍下 SerializableFunctionp j : y R G 3:

@FunctionalInterfac: l / Ze

public interface SerializableFunction<I, O> extends SerJ m + _ 5 ializable {

O apply(I v1) throws Exception;

}

该方法实际上是 java.util.function.Function 的变种,与java.util.function.Function 不同的是 SerializableFunction 可以序列化,可以抛异常。引入该函数是因为 JaH W 0 { c / - *vaSparkContext#map() 函数能接收的入参必须可序列,同时在hudi的逻辑中,有多处需要抛异常,而d 5 2 % c f在 Lambda 表达式中进行 try catce _ - 6 2 k Kh 代码会略显臃肿,不太优雅。

6.现状和后续计划

6.1 工作时间轴

2020 年 4 月,T3 出行(杨华@vinoyang,王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其他小伙伴一起设计、敲定了该解耦方= : r C x S案;

2020 年 4 月,T3 出行(王祥虎@w1 m n 3 .angxianghu)在内部完成了编码实现,并进行了初步验证,得出方案可行的结论| ^ + p a

2020 年 7 月,T3 出行(王祥虎@w A [ Kwangxianghu)将该设计实现和基于新抽象实现的 Spark 版本推向社5 2 v _ s区(HUDI-1089);

2020 年 9 月 26 日,顺丰科技基于 T3 内部分支修改完善的版本在 Apache Flink Meetup) e P C S &(深圳3 p T |站)公开 PR, 使其成为业界第一个在线上使用 F I plink 将数据写 Hudi 的企业。

2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分A # 8 m T 7 ) t支,标志着 Hudi-Spark 解耦完成。

6.2 后续计划

1)推进 Hudi 和 Flink 集成

将 Flink 与 Hudi 的集成尽快推向社区,初期该特性可能只支持 Kafka 数据源。

2)性B 9 p ? 7 ! p c $能优化

为保证 Hudi-Spark 版本的稳定性和性能,此次解耦没有太多考虑 Flin= x i H , } S d :k 版本可能存在的性能问题。

3)类C Y | X T = N & flink-connector-hudi 第三方包开发

将 Hudi-Flink 的绑定做成第三方包,用户可以在 Flink 应用中以编码方式读取任意数据源,通过这个第三方包写入 Hudi。

作者:王祥虎

本文为阿里云原创内容,未经允许不得转载。