Flink SQL 知其所以然:SQL 数据类型大全!

网友投稿 1336 2023-06-15

Flink SQL 知其所以然:SQL 数据类型大全!

Flink SQL 知其所以然:SQL 数据类型大全!

SQL 数据类型

在介绍完一些基本概念之后,我们来认识一下,Flink SQL 中的数据类型。

Flink SQL 内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。

总共包含 3 部分:

原子数据类型。 复合数据类型。用户自定义数据类型。

一、原子数据类型

1、字符串类型:

CHAR、CHAR(n):定长字符串,就和 Java 中的 Char 一样,n 代表字符的定长,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。 VARCHAR、VARCHAR(n)、STRING:可变长字符串,就和 Java 中的 String 一样,n 代表字符的最大长度,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。STRING 等同于 VARCHAR(2147483647)。

2、二进制字符串类型:

BINARY、BINARY(n):定长二进制字符串,n 代表定长,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。VARBINARY、VARBINARY(n)、BYTES:可变长二进制字符串,n 代表字符的最大长度,取值范围 [1, 2,147,483,647]。如果不指定 n,则默认为 1。BYTES 等同于 VARBINARY(2147483647)。

3、 精确数值类型:

DECIMAL、DECIMAL(p)、DECIMAL(p, s)、DEC、DEC(p)、DEC(p, s)、NUMERIC、NUMERIC(p)、NUMERIC(p, s):固定长度和精度的数值类型,就和 Java 中的 BigDecima一样,p 代表数值位数(长度),取值范围 [1, 38];s 代表小数点后的位数(精度),取值范围 [0, p]。如果不指定,p 默认为 10,s 默认为 0。 TINYINT:-128 到 127 的 1 字节大小的有符号整数,就和 Java 中的 byte 一样。 SMALLINT:-32,768 to 32,767 的 2 字节大小的有符号整数,就和 Java 中的 short 一样。 INT、INTEGER:-2,147,483,648 to 2,147,483,647 的 4 字节大小的有符号整数,就和 Java 中的 int 一样。 BIGINT:-9,223,372,036,854,775,808 to 9,223,372,036,854,775,807 的 8 字节大小的有符号整数,就和 Java 中的 long 一样。

4、有损精度数值类型:

5、布尔类型:BOOLEAN。

6、NULL 类型:NULL。

7、Raw 类型:RAW('class', 'snapshot') 。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器。

8、日期、时间类型:

DATE:由 年-月-日 组成的 不带时区含义 的日期类型,取值范围 [0000-01-01, 9999-12-31] TIME、TIME(p):由 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的的时间的数据类型,精度高达纳秒,取值范围 [00:00:00.000000000到23:59:59.9999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 0。 TIMESTAMP、TIMESTAMP(p)、TIMESTAMP WITHOUT TIME ZONE、TIMESTAMP(p) WITHOUT TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。 TIMESTAMP WITH TIME ZONE、TIMESTAMP(p) WITH TIME ZONE:由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。 TIMESTAMP_LTZ、TIMESTAMP_LTZ(p):由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型,取值范围 [0000-01-01 00:00:00.000000000 +14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数,取值范围 [0, 9],如果不指定 p,默认为 6。 TIMESTAMP_LTZ 与 TIMESTAMP WITH TIME ZONE 的区别在于:TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的,举例:其输入数据应该是 2022-01-01 00:00:00.000000000 +08:00;TIMESTAMP_LTZ 的时区信息不是携带在数据中的,而是由 Flink SQL 任务的全局配置决定的,我们可以由 table.local-time-zone 参数来设置时区。 INTERVAL YEAR TO MONTH、 INTERVAL DAY TO SECOND:interval 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例,比如给 TIMESTAMP 加、减几天、几个月、几年。INTERVAL 子句总共涉及到的语法种类如下 Flink SQL 案例所示。

CREATE TABLE sink_table ( result_interval_year TIMESTAMP(3), result_interval_year_p TIMESTAMP(3), result_interval_year_p_to_month TIMESTAMP(3), result_interval_month TIMESTAMP(3), result_interval_day TIMESTAMP(3), result_interval_day_p1 TIMESTAMP(3), result_interval_day_p1_to_hour TIMESTAMP(3), result_interval_day_p1_to_minute TIMESTAMP(3), result_interval_day_p1_to_second_p2 TIMESTAMP(3), result_interval_hour TIMESTAMP(3), result_interval_hour_to_minute TIMESTAMP(3), result_interval_hour_to_second TIMESTAMP(3), result_interval_minute TIMESTAMP(3), result_interval_minute_to_second_p2 TIMESTAMP(3), result_interval_second TIMESTAMP(3), result_interval_second_p2 TIMESTAMP(3)) WITH ( 'connector' = 'print');INSERT INTO sink_tableSELECT -- Flink SQL 支持的所有 INTERVAL 子句如下,总体可以分为 `年-月`、`日-小时-秒` 两种 -- 1. 年-月。取值范围为 [-9999-11, +9999-11],其中 p 是指有效位数,取值范围 [1, 4],默认值为 2。比如如果值为 1000,但是 p = 2,则会直接报错。 -- INTERVAL YEAR f1 + INTERVAL '10' YEAR as result_interval_year -- INTERVAL YEAR(p) , f1 + INTERVAL '100' YEAR(3) as result_interval_year_p -- INTERVAL YEAR(p) TO MONTH , f1 + INTERVAL '10-03' YEAR(3) TO MONTH as result_interval_year_p_to_month -- INTERVAL MONTH , f1 + INTERVAL '13' MONTH as result_interval_month -- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, +999999 23:59:59.999999999],其中 p1\p2 都是有效位数,p1 取值范围 [1, 6],默认值为 2;p2 取值范围 [0, 9],默认值为 6 -- INTERVAL DAY , f1 + INTERVAL '10' DAY as result_interval_day -- INTERVAL DAY(p1) , f1 + INTERVAL '100' DAY(3) as result_interval_day_p1 -- INTERVAL DAY(p1) TO HOUR , f1 + INTERVAL '10 03' DAY(3) TO HOUR as result_interval_day_p1_to_hour -- INTERVAL DAY(p1) TO MINUTE , f1 + INTERVAL '10 03:12' DAY(3) TO MINUTE as result_interval_day_p1_to_minute -- INTERVAL DAY(p1) TO SECOND(p2) , f1 + INTERVAL '10 00:00:00.004' DAY TO SECOND(3) as result_interval_day_p1_to_second_p2 -- INTERVAL HOUR , f1 + INTERVAL '10' HOUR as result_interval_hour -- INTERVAL HOUR TO MINUTE , f1 + INTERVAL '10:03' HOUR TO MINUTE as result_interval_hour_to_minute -- INTERVAL HOUR TO SECOND(p2) , f1 + INTERVAL '00:00:00.004' HOUR TO SECOND(3) as result_interval_hour_to_second -- INTERVAL MINUTE , f1 + INTERVAL '10' MINUTE as result_interval_minute -- INTERVAL MINUTE TO SECOND(p2) , f1 + INTERVAL '05:05.006' MINUTE TO SECOND(3) as result_interval_minute_to_second_p2 -- INTERVAL SECOND , f1 + INTERVAL '3' SECOND as result_interval_second -- INTERVAL SECOND(p2) , f1 + INTERVAL '300' SECOND(3) as result_interval_second_p2FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)

二、复合数据类型

数组类型:ARRAY、t ARRAY。数组最大长度为 2,147,483,647。t 代表数组内的数据类型。举例 ARRAY、ARRAY,其等同于 INT ARRAY、STRING ARRAY。 Map 类型:MAP。Map 类型就和 Java 中的 Map 类型一样,key 是没有重复的。举例 Map、Map。集合类型:MULTISET、t MULTISET。就和 Java 中的 List 类型,一样,运行重复的数据。举例 MULTISET,其等同于 INT MULTISET。 对象类型:ROW、ROW、ROW(n0 t0, n1 t1, ...>、ROW(n0 t0 'd0', n1 t1 'd1', ...)。就和 Java 中的自定义对象一样。举例:ROW(myField INT, myOtherField BOOLEAN),其等同于 ROW。

三、用户自定义数据类型

用户自定义类型就是运行用户使用 Java 等语言自定义一个数据类型出来。但是目前数据类型不支持使用 CREATE TABLE 的 DDL 进行定义,只支持作为函数的输入输出参数。如下案例:

第一步,自定义数据类型

第二步,在 UDF 中使用此数据类型

public class UserScalarFunction extends ScalarFunction { // 1. 自定义数据类型作为输出参数 public User eval(long i) { if (i > 0 && i <= 5) { User u = new User(); u.age = (int) i; u.name = "name1"; u.totalBalance = new BigDecimal(1.1d); return u; } else { User u = new User(); u.age = (int) i; u.name = "name2"; u.totalBalance = new BigDecimal(2.2d); return u; } } // 2. 自定义数据类型作为输入参数 public String eval(User i) { if (i.age > 0 && i.age <= 5) { User u = new User(); u.age = 1; u.name = "name1"; u.totalBalance = new BigDecimal(1.1d); return u.name; } else { User u = new User(); u.age = 2; u.name = "name2"; u.totalBalance = new BigDecimal(2.2d); return u.name; } }}

第三步,在 Flink SQL 中使用

-- 1. 创建 UDFCREATE FUNCTION user_scalar_func AS 'flink.examples.sql._12_data_type._02_user_defined.UserScalarFunction';-- 2. 创建数据源表CREATE TABLE source_table ( user_id BIGINT NOT NULL COMMENT '用户 id') WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.min' = '1', 'fields.user_id.max' = '10');-- 3. 创建数据汇表CREATE TABLE sink_table ( result_row_1 ROW, result_row_2 STRING) WITH ( 'connector' = 'print');-- 4. SQL 查询语句INSERT INTO sink_tableselect -- 4.a. 用户自定义类型作为输出 user_scalar_func(user_id) as result_row_1, -- 4.b. 用户自定义类型作为输出及输入 user_scalar_func(user_scalar_func(user_id)) as result_row_2from source_table;-- 5. 查询结果+I[+I[9, name2, 2.20], name2]+I[+I[1, name1, 1.10], name1]+I[+I[5, name1, 1.10], name1]

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:你知道Hive统计函数count(*)为什么不走MR吗?
下一篇:亿级数据自助应用,京东物流基于Doris实现高性能秒级分析
相关文章