聊聊 Flink SQL增量查询Hudi表

网友投稿 1171 2023-04-22

聊聊 Flink SQL增量查询Hudi表

聊聊 Flink SQL增量查询Hudi表

​官网文档

参数

版本

建表造数:

Hudi 0.9.0Spark 2.4.5

我这里建表造数使用Hudi Spark SQL 0.9.0,目的是为了模拟项目上用Java Client和Spark SQL创建的Hudi表,以验证Hudi Flink SQL增量查询时是否兼容旧版本的Hudi表(大家没有这种需求的,可以使用任何方式正常造数)

查询

建表造数

-- Spark SQL Hudi 0.9.0create table hudi.test_flink_incremental ( id int, name string, price double, ts long, dt string) using hudi partitioned by (dt) options ( primaryKey = 'id', preCombineField = 'ts', type = 'cow');insert into hudi.test_flink_incremental values (1,'a1', 10, 1000, '2022-11-25');insert into hudi.test_flink_incremental values (2,'a2', 20, 2000, '2022-11-25');update hudi.test_flink_incremental set name='hudi2_update' where id = 2;insert into hudi.test_flink_incremental values (3,'a3', 30, 3000, '2022-11-26');insert into hudi.test_flink_incremental values (4,'a4', 40, 4000, '2022-12-26');

2022120515273620221205152723202212051527122022120515270220221205152650

Flink SQL创建Hudi内存表

CREATE TABLE test_flink_incremental ( id int PRIMARY KEY NOT ENFORCED, name VARCHAR(10), price double, ts bigint, dt VARCHAR(10))PARTITIONED BY (dt)WITH ( 'connector' = 'hudi', 'path' = 'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_flink_incremental');

建表时不指定增量查询相关的参数,我们在查询时动态指定,这样比较灵活。动态指定参数方法,在查询语句后面加上如下形式的语句

批读

Flink SQL读Hudi有两种模式:批读和流读。默认批读,先看一下批读的增量查询

验证是否包含起始时间和默认结束时间

结果包含起始时间,不指定结束时间默认读到最新的数据

id name price ts dt 4 a4 40.0 4000 dt=2022-12-26 3 a3 30.0 3000 dt=2022-11-26

验证是否包含结束时间

结果包含结束时间

id name price ts dt 3 a3 30.0 3000 dt=2022-11-26 2 hudi2_update 20.0 2000 dt=2022-11-25

验证默认开始时间

这种情况是指定结束时间,但不指定开始时间,如果都不指定,则读表所有的最新版本的记录。

id name price ts dt 2 hudi2_update 20.0 2000 dt=2022-11-25

时间旅行(查询历史记录)

验证是否可以查询历史记录,我们更新id为2的name,更新前name为a2,更新后为hudi2_update,我们验证一下,是否可以通过FlinkSQL查询Hudi历史记录,预期结果查出id=2,name=a2

结果:可以正确查询历史记录

id name price ts dt 2 a2 20.0 2000 dt=2022-11-25

流读

开启流读的参数:

read.streaming.enabled = true

流读不需要设置结束时间,因为一般的需求是读所有的增量数据,我们只需要验证开始时间就好了

验证默认开始时间

select * from test_flink_incremental /*+ options( 'read.streaming.enabled'='true', 'read.streaming.check-interval' = '4') */

id name price ts dt 4 a4 40.0 4000 dt=2022-12-26

验证指定开始时间

结果:

id name price ts dt 2 hudi2_update 20.0 2000 dt=2022-11-25 3 a3 30.0 3000 dt=2022-11-26 4 a4 40.0 4000 dt=2022-11-26

id name price ts dt 1 a1 10.0 1000 dt=2022-11-25 2 hudi2_update 20.0 2000 dt=2022-11-25 3 a3 30.0 3000 dt=2022-11-26 4 a4 40.0 4000 dt=2022-11-26

验证流读的连续性

验证新的增量数据进来,是否可以持续消费Hudi增量数据,验证数据的准确一致性,为了方便验证,我可以使用Flink SQL增量流读Hudi表然后Sink到MySQL表中,最后通过读取MySQL表中的数据验证数据的准确性

先在MySQL中创建一张Sink表

-- MySQLCREATE TABLE `test_sink` ( `id` int(11), `name` text DEFAULT NULL, `price` int(11), `ts` int(11), `dt` text DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Flink中创建对应的sink表

create table test_sink ( id int, name string, price double, ts bigint, dt string) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8', 'username' = 'root', 'password' = 'root-123', 'table-name' = 'test_sink', 'sink.buffer-flush.max-rows' = '1');

然后流式增量读取Hudi表Sink Mysql

这样会起一个长任务,一直处于running状态,我们可以在yarn-session界面上验证这一点

然后先在MySQL中验证一下历史数据的准确性

再利用Spark SQL往source表插入两条数据

-- Spark SQLinsert into hudi.test_flink_incremental values (5,'a5', 50, 5000, '2022-12-07');insert into hudi.test_flink_incremental values (6,'a6', 60, 6000, '2022-12-07');

我们增量读取的间隔设置的4s,成功插入数据等待4s后,再在MySQL表中验证一下数据

发现新增的数据已经成功Sink到MySQL中了,并且数据没有重复

最后验证一下更新的增量数据,Spark SQL更新Hudi source表

-- Spark SQLupdate hudi.test_flink_incremental set name='hudi5_update' where id = 5;

继续验证结果

结果是更新的增量数据也会insert到MySQL中的sink表,但是不会更新原来的数据

那如果想实现更新的效果呢?我们需要在MySQL和Flink的sink表中加上主键字段,两者缺一不可,如下:

-- MySQLCREATE TABLE `test_sink` ( `id` int(11), `name` text DEFAULT NULL, `price` int(11), `ts` int(11), `dt` text DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Flink SQLcreate table test_sink ( id int PRIMARY KEY NOT ENFORCED, name string, price double, ts bigint, dt string) with ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.468.44.128:3306/hudi?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8', 'username' = 'root', 'password' = 'root-123', 'table-name' = 'test_sink', 'sink.buffer-flush.max-rows' = '1');

将刚才起的长任务关掉,重新执行刚才的insert语句,先跑一下历史数据,最后再验证一下增量效果

-- Spark SQLupdate hudi.test_flink_incremental set name='hudi6_update' where id = 6;insert into hudi.test_flink_incremental values (7,'a7', 70, 7000, '2022-12-07');

可以看到,达到了预期效果,对于id=6的执行更新操作,对于id=7的执行插入操作。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:信创成果持续深化,恒生电子多层次助力金融信创稳步推进
下一篇:如何避免数仓模型 “烟囱式” 建设?
相关文章