Flink SQL 知其所以然:SQL DDL!

网友投稿 1547 2023-06-15

Flink SQL 知其所以然:SQL DDL!

Flink SQL 知其所以然:SQL DDL!

SQL 语法篇

一、DDL:Create 子句

大家好,我是老羊,今天来学一波 Flink SQL 中的 DDL。

CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。

目前 Flink SQL 支持下列 CREATE 语句:

CREATE TABLE。 CREATE DATABASE。CREATE VIEW。 CREATE FUNCTION。

此节重点介绍建表,建数据库、视图和 UDF 会在后面的扩展章节进行介绍。

1、建表语句

下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同名表已经在 catalog 中存在了,则无法注册。

2、表中的列

常规列(即物理列)

物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。

举一个仅包含常规列的表的案例:

CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING) WITH ( ...);

元数据列

元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。

例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。

举例:

CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 读取 kafka 本身自带的时间戳 `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp') WITH ( 'connector' = 'kafka' ...);

元数据列可以用于后续数据的处理,或者写入到目标表中。

举例:

INSERT INTO MyTable SELECT user_id , name , record_time + INTERVAL '1' SECOND FROM MyTable;

如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。

举例:

CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 读取 kafka 本身自带的时间戳 `timestamp` TIMESTAMP_LTZ(3) METADATA) WITH ( 'connector' = 'kafka' ...);

如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致的话,程序运行时会自动 cast 强转。但是这要求两种数据类型是可以强转的。举例如下:

CREATE TABLE MyTable ( `user_id` BIGINT, `name` STRING, -- 将时间戳强转为 BIGINT `timestamp` BIGINT METADATA) WITH ( 'connector' = 'kafka' ...);

默认情况下,Flink SQL planner 认为 metadata 列是可以 读取 也可以 写入 的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。

那么在往一个表写入的场景下,我们就可以使用 VIRTUAL 关键字来标识某个元数据列不写入到外部存储中(不持久化)。

以 Kafka 举例:

CREATE TABLE MyTable ( -- sink 时会写入 `timestamp` BIGINT METADATA, -- sink 时不写入 `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `name` STRING,) WITH ( 'connector' = 'kafka' ...);

在上面这个案例中,Kafka 引擎的 offset 是只读的。所以我们在把 MyTable 作为数据源(输入)表时,schema 中是包含 offset 的。在把 MyTable 作为数据汇(输出)表时,schema 中是不包含 offset 的。如下:

-- 当做数据源(输入)的 schemaMyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)-- 当做数据汇(输出)的 schemaMyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)

所以这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset 列,否则 Flink SQL 任务会直接报错。

计算列

计算列其实就是在写建表的 DDL 时,可以拿已有的一些列经过一些自定义的运算生成的新列。这些列本身是没有以物理形式存储到数据源中的。

举例:

CREATE TABLE MyTable ( `user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, -- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity `cost` AS price * quanitity,) WITH ( 'connector' = 'kafka' ...);

注意!!!计算列可以包含其他列、常量或者函数,但是不能写一个子查询进去。

小伙伴萌这时会问到一个问题,既然只能包含列、常量或者函数计算,我就直接在 DML query 代码中写就完事了呗,为啥还要专门在 DDL 中定义呢?

结论:没错,如果只是简单的四则运算的话直接写在 DML 中就可以,但是计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义)。比如要把输入数据的时间格式标准化。处理时间、事件时间分别举例如下:

注意!!!和虚拟 metadata 列是类似的,计算列也是只能读不能写的。

也就是说,我们在把 MyTable 作为数据源(输入)表时,schema 中是包含 cost 的。

在把 MyTable 作为数据汇(输出)表时,schema 中是不包含 cost 的。举例:

-- 当做数据源(输入)的 schemaMyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)-- 当做数据汇(输出)的 schemaMyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)

3、定义 Watermark

Watermark 是在 Create Table 中进行定义的。具体 SQL 语法标准是 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。

其中:

rowtime_column_name:表的事件时间属性字段。该列必须是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 类,这个时间可以是一个计算列。watermark_strategy_expression:定义 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。

注意:如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由 pipeline.auto-watermark-interval 进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,然如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。

Flink SQL 提供了几种 WATERMARK 生产策略:

有界无序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。此类策略就可以用于设置最大乱序时间,假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND,则生成的是运行 5s 延迟的 Watermark。。一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。 严格升序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column。一般基本不用这种方式。如果你能保证你的数据源的时间戳是严格升序的,那就可以使用这种方式。严格升序代表 Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。 递增:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。

4、Create Table With 子句

先看一个案例:

CREATE TABLE KafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, `ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv')

可以看到 DDL 中 With 子句就是在建表时,描述数据源、数据汇的具体外部存储的元数据信息的。

一般 With 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的 With 配置项都是不同的。

从这里也可以看出来 With 中具体要配置哪些配置项都是和每种 Connector 决定的。

5、Create Table Like 子句

Like 子句是 Create Table 子句的一个延伸。举例:

下面定义了一张 Orders 表:

CREATE TABLE Orders ( `user` BIGINT, product STRING, order_time TIMESTAMP(3)) WITH ( 'connector' = 'kafka', 'scan.startup.mode' = 'earliest-offset');

但是忘记定义 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定义一张带 Watermark 的新表:

上面这个语句的效果就等同于:

CREATE TABLE Orders_with_watermark ( `user` BIGINT, product STRING, order_time TIMESTAMP(3), WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'scan.startup.mode' = 'latest-offset');

不过这种不常使用。就不过多介绍了。如果小伙伴萌感兴趣,直接去官网参考具体注意事项:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:图数据科学助力精准预测,引领人工智能实现跨越发展
下一篇:讲讲Redis各个数据类型的底层数据结构
相关文章