实时计算Flink集成开源连接器-TiDB CDC Connector的实操

Tiit 682 2024-03-25

TIDB部署(***ECS)

1、系统配置

TIDB官方建议使用CentOS7.3及以上版本:

             Linux 操作系统                                      版本                        
Red Hat Enterprise Linux7.3 及以上
CentOS7.3 及以上

本次实验我们选择CentOS 7.6 64位,考虑网络连通,需将TIDB ECS实例与Flink集群部署在相同VPC网络。

image.png

2、TIDB部署

a、下载并安装 TiUP

curl --proto '=https' --tlsv1.2 -sSf https://tiup-mirrors.pingcap.com/install.sh | sh

image.png

b. 安装 TiUP 的 cluster 组件

声明全局的环境变量,不然找不到tiup命令:

source .bash_profile

执行安装cluster命令:

tiup cluster

image.png

c. 调大 sshd 服务的连接数限制(可选)

#调大sshd服务的连接数限制cat /etc/ssh/sshd_config | grep -n MaxSessions
vim +41 /etc/ssh/sshd_config#改完后重启sshdsystemctl restart sshd.service

d. 创建并启动集群

按配置模板,编辑配置文件(当前目录vim编辑后保存即可),命名为 topo.yaml,其中:

  • host:部署TIDB的服务器i(ECS内网IP)

  • ssh_port默认是22

  • 官方文件的tikv_servers是3个节点,这里设置成了只有1个节点,原因是本地配置多个节点时只有1个节点能启动成功

模板如下:

global:
 user: "tidb"
 ssh_port: 22
 deploy_dir: "/tidb-deploy"
 data_dir: "/tidb-data"monitored:
 node_exporter_port: 9100
 blackbox_exporter_port: 9115server_configs:
 tidb:
   log.slow-threshold: 300
 tikv:
   readpool.storage.use-unified-pool: false
   readpool.coprocessor.use-unified-pool: true
 pd:
   replication.enable-placement-rules: true
   replication.location-labels: ["host"]
 tiflash:
   logger.level: "info"pd_servers:
 - host: 192.168.0.1

tidb_servers:
 - host: 192.168.0.1

tikv_servers:
 - host: 192.168.0.1
   port: 20160
   status_port: 20180
   config:
     server.labels: { host: "laomo-talkservice-tidb-1" }tiflash_servers:
 - host: 192.168.0.1

部署集群:

# cluster-name:集群名称# tidb-version:TiDB版本号,可以通过tiup list tidb这个命令来查看tiup cluster deploy <cluster-name> <tidb-version> ./topo.yaml --user root -p <pwd># 这里使用 v6.0.0,集群名称叫mytidb-cluster来部署tiup cluster deploy mytidb-cluster v6.0.0 ./topo.yaml --user root -p P...D

image.png

启动集群:

#启动刚才部署的集群,集群名称为mytidb-clustertiup cluster start mytidb-cluster

image.png

e. 开启CDC(重要:CDC不开启无法捕获数据变更。另外,本示例单节点,多节点集群需要配置节点访问免密

# 创建scale-out.yaml文件添加 TiCDC 节点信息cdc_servers:
  - host: 192.168.0.1
    gc-ttl: 86400# TiUP运行scale-out命令tiup cluster scale-out mytidb-cluster scale-out.yaml#TiUP 停止和启动 TiCDCtiup cluster stop -R cdc
tiup cluster start -R cdc
tiup cluster restart -R cdc

f. 访问TiDB

在ECS上安装 MySQL 客户端(通过MySQL客户端访问TiDB):

yum -y install mysql

访问TiDB,密码默认为空

mysql -h 192.168.0.1 -P 4000 -u root

image.png

Flink TiDB CDC

TiDB CDC

TiDB CDC connector 是一个 Flink Source connector,它会先读取数据库快照,然后在发生故障时以exactly-once 处理继续读取更改事件,支持exactly-once语义

兼容版本

TiDB CDC Version

Flink Version

TiDB Version

2.2.*

1.13.*, 1.14.*

TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0

2.3.*

1.13.*, 1.14.*, 1.15.*, 1.16.0

TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0

参数配置

参数说明默认值数据类型
connector固定值:tidb-cdcnoneString
database-nameTiDB 服务器的数据库名称noneString
table-nameTiDB 服务器的表名称noneString
scan.startup.mode消费者启动模式,直接读取其底层 TiKV 存储中的 全量数据和增量数据实现数据捕获,读取具体枚举如下: initial:在第一次启动时,会先扫描历史全量数据, 其中全量部分是通过按 key 划分 range进行读取 latest-offset:不会扫描历史全量数据, 增量部分使用 TiDB 提供的 CDC Client 获取增量变更数据initialString
pd-addressesTiKV 集群的 PD 地址noneString
tikv.grpc.timeout_in_ms以毫秒为单位的 TiKV GRPC 超时noneLong
tikv.grpc.scan_timeout_in_ms以毫秒为单位的 TiKV GRPC 扫描超时noneLong
tikv.batch_get_concurrencyTiKV GRPC并发20Integer
tikv.*TiDB 客户端的属性noneString

集成TiDB连接器

实时计算Flink版是一套基于Apache Flink构建的⼀站式实时大数据分析平台,支持作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。本文案例基于***实时计算Flink版进行演示。

1、版本准备

兼容版本可以参考Flink TiDB CDC小结介绍,本次实验选型如下:

❏ Flink全托管版本:vvr-6.0.4-flink-1.15

❏ TiDB版本:v6.0.0

❏ TiDB CDC Connector:flink-connector-tidb-cdc-2.3.0

2、集群创建

登录***控制台,创建全托管VVP集群,注意集群部署网络环境与TiDB保持一致(同一VPC):

image.png

3、注册TiDB连接器

image.png

4创建Session集群

创建一个vvr-6.0.4-flink-1.15版本的Session集群,创建后并启动:

image.png

5、基于Flink SQL开发作业

注意:代码里没有配置scan.startup.mode参数,默认值initial:在第一次启动时,会先扫描历史全量数据(后续会演示相关效果)

CREATE TEMPORARY TABLE students_tidb_source (
  db_name STRING METADATA FROM 'database_name' VIRTUAL,
  table_name STRING METADATA  FROM 'table_name' VIRTUAL,
  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  id INT,
  `name` STRING,
  score INT,
  PRIMARY KEY(id) NOT ENFORCED) WITH (
  'connector' = 'tidb-cdc',
  'tikv.grpc.timeout_in_ms' = '20000',
  'pd-addresses' = '192.168.0.1:2379',
  'database-name' = 'tidb_source',
  'table-name' = 'student');CREATE TEMPORARY TABLE students_print_sink (
  `db_name` STRING,
  `table_name` STRING,
  operation_ts TIMESTAMP_LTZ(3),
  id INT,
  `name` STRING,
  score INT) WITH (
  'connector' = 'print',
  'limit' = '100',
  'logger'='true');insert into students_print_sinkselect * from students_tidb_source;

6、TiDB数据准备

运行作业前,先添加一条数据:

CREATE DATABASE  tidb_source;use  tidb_source;--创建student表CREATE TABLE student(
  id int NOT NULL ,
  name varchar(100) NULL ,
  score int NULL ,
  primary key(id));--插入数据insert into student values(1,"lincloud",100);select * from student;

image.png

7、验证测试

启动作业,测试


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:TiDB的数据对比工具sync-diff-inspector实践指南
下一篇:通过TiOperator备份数据到共享存储的方法
相关文章