Flink SQL 知其所以然:SQL 的时间语义!

网友投稿 501 2023-06-15

Flink SQL 知其所以然:SQL 的时间语义!

Flink SQL 知其所以然:SQL 的时间语义!

SQL 的时间语义

hello,我是老羊,今天跟着老羊的思路学习 Flink SQL 的时间语义:

与离线处理中常见的时间分区字段一样,在实时处理中,时间属性也是一个核心概念。Flink 支持 处理时间、事件时间、摄入时间 三种时间语义。下文会分别介绍三种时间语义的应用场景及案例。三种时间在生产环境的使用频次 事件时间(SQL 常用) > 处理时间(SQL 几乎不用,DataStream 少用) > 摄入时间(不用)

一、Flink 三种时间属性简介

time

事件时间:指的是数据本身携带的时间,这个时间是在事件产生时的时间,而且在 Flink SQL 触发计算时,也使用数据本身携带的时间。这就叫做 事件时间。目前生产环境中用的最多。 处理时间:指的是具体算子计算数据执行时的机器时间(例如在算子中 Java 取 System.currentTimeMillis()) ),在生产环境中用的次多。 摄入时间:指的是数据从数据源进入 Flink 的时间。摄入时间用的最少,可以说基本不使用。

小伙伴萌要注意到:

上述的三种时间概念不是由于有了数据而诞生的,而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子,如果只是数据携带了时间,Flink 也消费了这个数据,但是在 Flink 中没有使用数据的这个时间作为计算的触发条件,也不能把这个 Flink 任务叫做事件时间的任务。 其次,要认识到,一般一个 Flink 任务只会有一个时间属性,所以时间属性通常认为是一个任务粒度的。举例:我们可以说 A 任务是事件时间语义的任务,B 任务是处理时间语义的任务。当然了,一个任务也可以存在多个时间属性。

二、Flink 三种时间属性的应用场景

讲到这里,xdm 会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?

先说结论,在 Flink 中时间的作用:

主要体现在包含时间窗口的计算中:用于标识任务的时间进度,来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。 次要体现在自定义时间语义的计算中:举个例子,比如用户可以自定义每隔 10s 的本地时间,或者消费到的数据的时间戳每增大 10s,就把计算结果输出一次,时间在此类应用中也是一种标识任务进度的作用。

博主以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。

1. 事件时间案例:还是以之前的 clicks 表拿来举例。

tumble window

后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。

2.处理时间案例:还是以之前的 clicks 表拿来举例。

还是上面那个案例,但是这次需求方不需要按照数据上的时间戳划分数据(划分滚动窗口),只需要数据来了之后, 在 Flink 机器上的时间作为一小时窗口结束的书法条件并计算。

那么这种触发机制就是处理时间。

3. 摄入时间案例:在 Flink 从外部数据源读取到数据时,给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合,不过这种几乎不使用。

三、SQL 指定时间属性的两种方式

那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式:

CREATE TABLE DDL 创建表的时候指定。 可以在 DataStream 中指定,在后续的 DataStream 转的 Table 中使用。

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。

四、SQL 事件时间案例

来看看 Flink 中如何指定事件时间。

1. CREATE TABLE DDL 指定时间戳的方式。

从上面这条语句可以看到,如果想使用事件时间,那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。很多小伙伴会想到,我们的时间戳一般不都是秒或者是毫秒(BIGINT 类型)嘛,那这种情况怎么办?

解决方案必须要有啊。如下。

2.DataStream 中指定事件时间。

之前介绍了 Table 和 DataStream 可以互转,那么 Flink 也提供了一个能力,就是在 Table 转为 DataStream 时,指定时间戳字段。如下案例:

public class DataStreamSourceEventTimeTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); // 1. 分配 watermark DataStream r = env.addSource(new UserDefinedSource()) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(0L)) { @Override public long extractTimestamp(Row element) { return (long) element.getField("f2"); } }); // 2. 使用 f2.rowtime 的方式将 f2 字段指为事件时间时间戳 Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2.rowtime"); tEnv.createTemporaryView("source_table", sourceTable); // 3. 在 tumble window 中使用 f2 String tumbleWindowSql = "SELECT TUMBLE_START(f2, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n" + "FROM source_table\n" + "GROUP BY TUMBLE(f2, INTERVAL '5' SECOND)" ; Table resultTable = tEnv.sqlQuery(tumbleWindowSql); tEnv.toDataStream(resultTable, Row.class).print(); env.execute(); } private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable { private volatile boolean isCancel; @Override public void run(SourceContext sourceContext) throws Exception { int i = 0; while (!this.isCancel) { sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis())); Thread.sleep(10L); i++; } } @Override public void cancel() { this.isCancel = true; } @Override public TypeInformation getProducedType() { return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class), TypeInformation.of(Long.class)); } }}

五、SQL 处理时间案例

来看看 Flink SQL 中如何指定处理时间。

1.CREATE TABLE DDL 指定时间戳的方式。

⭐ DataStream 中指定处理时间。

public class DataStreamSourceProcessingTimeTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); // 1. 分配 watermark DataStream r = env.addSource(new UserDefinedSource()); // 2. 使用 proctime.proctime 的方式将 f2 字段指为处理时间时间戳 Table sourceTable = tEnv.fromDataStream(r, "f0, f1, f2, proctime.proctime"); tEnv.createTemporaryView("source_table", sourceTable); // 3. 在 tumble window 中使用 f2 String tumbleWindowSql = "SELECT TUMBLE_START(proctime, INTERVAL '5' SECOND), COUNT(DISTINCT f0)\n" + "FROM source_table\n" + "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND)" ; Table resultTable = tEnv.sqlQuery(tumbleWindowSql); tEnv.toDataStream(resultTable, Row.class).print(); env.execute(); } private static class UserDefinedSource implements SourceFunction, ResultTypeQueryable { private volatile boolean isCancel; @Override public void run(SourceContext sourceContext) throws Exception { int i = 0; while (!this.isCancel) { sourceContext.collect(Row.of("a" + i, "b", System.currentTimeMillis())); Thread.sleep(10L); i++; } } @Override public void cancel() { this.isCancel = true; } @Override public TypeInformation getProducedType() { return new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class), TypeInformation.of(Long.class)); } }}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:细数 Mycat 中的那些坑
下一篇:假如数据库运维拥有了可观测性能力……
相关文章