上游sql通过drainer同步到kafka时在kafka中是什么样子的

网友投稿 568 2023-04-10

环境:上游为v5.4.1版本tidb集群,下游为2.12-2.4.1版本kafka集群,使用drainer进行同步数据

上游sql通过drainer同步到kafka时在kafka中是什么样子的

本文对上游中的ddl、dml在下游是如何体现,以及是否会对同步产生影响,做个抛砖引玉的介绍,相关测试过程如下:

drainer的配置

- host: 10.103.236.178 ssh_port: 22 port: 8239 deploy_dir: /data/tidb-deploy/drainer-8239 data_dir: /data/tidb-data/drainer-8239 log_dir: log config: syncer.db-type: kafka syncer.ignore-schemas: INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql,db_name syncer.to.kafka-addrs: 10.xxx.xxx.10:9092,10.xxx.xxx.11:9092,10.xxx.xxx.59:9092 syncer.to.kafka-max-messages: 1024 syncer.to.kafka-version: 2.4.1 syncer.to.topic-name: syk-test-binlog-to-kafka arch: amd64 os: linux

注意:也可以设置replicate-do-db来指定只复制哪些库,但是这个replicate-do-db参数我在测试时,是不成功的,体现在drainer这个服务根本就启动不起来。

1,上游执行create

create table moe_test ( id int(3) auto_increment not null primary key, name char(10) not null, address varchar(50) default beijing, year date );

drainer日志

[2022/08/19 11:17:06.836 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105937] [binlog="tp:Commit start_ts:435389471196708868 commit_ts:435389471196708871 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"create table moe_test\\n(\\n id int(3) auto_increment not null primary key,\\n name char(10) not null,\\n address varchar(50) default beijing,\\n year date\\n)\" ddl_job_id:105937 ddl_schema_state:5 "] [2022/08/19 11:17:06.863 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105937, Type:create table, State:synced, SchemaState:public, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 11:17:05.256 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [2022/08/19 11:17:06.973 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="create table moe_test\n(\n id int(3) auto_increment not null primary key,\n name char(10) not null,\n address varchar(50) default beijing,\n year date\n)"] ["commit ts"=435389471196708871] [shouldSkip=false] [2022/08/19 11:17:06.999 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.xxx.xxx.59:9092] from broker %!s(MISSING)\n"] [2022/08/19 11:17:07.023 +08:00] [INFO] [sarama.go:122] ["[sarama] Connected to broker at [10.xxx.xxx.59:9092] (unregistered)\n"] [2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[2 %!d(string=10.xxx.xxx.10:9092)] at %!s(MISSING)"] [2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[3 %!d(string=10.xxx.xxx.11:9092)] at %!s(MISSING)"] [2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[1 %!d(string=10.xxx.xxx.59:9092)] at %!s(MISSING)"] [2022/08/19 11:17:07.096 +08:00] [INFO] [client.go:902] ["[sarama] client/metadata found some partitions to be leaderless"] [2022/08/19 11:17:07.096 +08:00] [INFO] [client.go:870] ["[sarama] client/metadata retrying after 500ms... (10000 attempts remaining)\n"] [2022/08/19 11:17:07.598 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.xxx.xxx.59:9092] from broker %!s(MISSING)\n"] [2022/08/19 11:17:07.612 +08:00] [INFO] [async_producer.go:744] ["[sarama] producer/broker/2 starting up\n"] [2022/08/19 11:17:07.613 +08:00] [INFO] [async_producer.go:760] ["[sarama] producer/broker/2 state change to [open] on syk-test-binlog-to-kafka/0\n"] [2022/08/19 11:17:07.626 +08:00] [INFO] [sarama.go:122] ["[sarama] Connected to broker at [10.xxx.xxx.10:9092 %!s(int32=2)] (registered as #%!d(MISSING))\n"]

kafka中查询

小结:在drainer日志中create语句、kafka broker信息、topic都体现出来了。查看kafka的内容,发现的是create的sql语句。因此,对于create语句,不会产生大的binlog,也不会引起kafka server: Message was too large

2,上游执行insert

insert into moe_test (name,address,year) values(allen,大连一中,1976-10-10); insert into moe_test (name,address,year) values(jack,大连二中,1975-12-23); insert into moe_test (name,address,year) values(jordan,芝加哥公牛,1984-03-23);

drainer日志

[2022/08/19 12:25:57.918 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390554362609666] [version=130819]

kafka中查询

小结:对于insert,kafka中内容是表结构与行数据都有体现,因此,对于如果是大量insert语句,是可能产生大的binlog,也可能引起kafka server: Message was too large,也可能引起下面错误

[2022/08/18 18:16:30.214 +08:00] [INFO] [pump.go:166] ["receive big size binlog"] [size="624 MB"] [2022/08/18 18:16:30.577 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373417798303755] [version=202807] [2022/08/18 18:17:51.315 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373417811410954] [version=202807] [2022/08/18 18:17:54.373 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373440801177602] [version=202807] [2022/08/18 18:17:57.460 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373441535442957] [version=202807] [2022/08/18 18:33:27.895 +08:00] [ERROR] [syncer.go:533] ["Failed to close syncer"] [error="fail to push msg to kafka after 30s, check if kafka is up and working"] [errorVerbose="fail to push msg to kafka after 30s, check if kafka is up and working\ngith ub.com/pingcap/tidb-binlog/drainer/sync.(*KafkaSyncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/sync/kafka.go:236\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1371"] [2022/08/18 18:33:27.895 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650948] [version=202816] [2022/08/18 18:33:31.311 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650950] [version=202816] [2022/08/18 18:33:37.896 +08:00] [INFO] [server.go:465] ["begin to close drainer server"] [2022/08/18 18:33:37.896 +08:00] [ERROR] [util.go:69] ["Recovered from panic"] [err="\"Waiting too long for `Syncer.run` to quit.\""] ["real stack"="github.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1.1\n\t/home/jenkins/agent/workspace/build- common/go/src/github.com/pingcap/tidb-binlog/drainer/util.go:71\nruntime.gopanic\n\t/usr/local/go/src/runtime/panic.go:965\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb- binlog/drainer/syncer.go:539\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).Start\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/syncer.go:151\ngithub.com/pingcap/tidb-binlog/drainer.(*Server).Start.func4\n \t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/server.go:290\ngithub.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/ drainer/util.go:79"] [name=syncer] [2022/08/18 18:33:37.896 +08:00] [INFO] [util.go:76] [Exit] [name=syncer] [2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:430] ["has already update status"] [id=10.xxx.xxx.59:8249] [2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:469] ["commit status done"] [2022/08/18 18:33:37.927 +08:00] [INFO] [collector.go:136] ["publishBinlogs quit"] [2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=heartbeat] [2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250] [2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250] [2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=collect] [2022/08/18 18:33:37.927 +08:00] [INFO] [main.go:73] ["drainer exit"]

3,上游执行delete

delete from moe_test where id=1;

drainer日志

[2022/08/19 12:30:42.362 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.99.110.11:9092] from broker %!s(MISSING)\n"] [2022/08/19 12:30:45.497 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390629322424322] [version=130819]

kafka中查询

小结:对于delete来说,可能情况跟上面的insert一样。

4,上游执行update

update moe_test set address=xxxxxx where id=2;

drainer日志

[2022/08/19 12:35:46.485 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390708280459266] [version=130819]

kafka中查询

小结:update在kafka中是把更新前的值与更新的值一起体现出来,其他可能的情况跟insert一样。

5,上游执行列操作

alter table moe_test drop column year;

drainer日志

[2022/08/19 12:38:02.341 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105938] [binlog="tp:Commit start_ts:435390744390795265 commit_ts:435390744403902465 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"alter table moe_test drop column year\" ddl_job_id:105938 ddl_schema_state:1 "] [2022/08/19 12:38:03.347 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [2022/08/19 12:38:03.347 +08:00] [INFO] [schema.go:289] ["Got DeleteOnly Job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:delete only, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [2022/08/19 12:38:03.347 +08:00] [INFO] [syncer.go:454] ["Syncer skips DeleteOnly DDL"] [job="ID:105938, Type:drop column, State:synced, SchemaState:delete only, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [ts=435390744403902465] [2022/08/19 12:38:07.549 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105938] [binlog="tp:Commit start_ts:435390744430116865 commit_ts:435390744430116866 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"alter table moe_test drop column year\" ddl_job_id:105938 "] [2022/08/19 12:38:07.551 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [2022/08/19 12:38:07.552 +08:00] [INFO] [schema.go:501] ["Finished dropping column"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [2022/08/19 12:38:07.563 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="alter table moe_test drop column year"] ["commit ts"=435390744430116866] [shouldSkip=false] [2022/08/19 12:38:07.577 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390744430116866] [version=130823] [2022/08/19 12:38:12.697 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390746579697665] [version=130823]

kafka中查询

小结:日志与kafka内容中都体现的是alter语句,不会产生大的binlog,也不会引起kafka server: Message was too large。

6,上游执行truncate

truncate table moe_test;

drainer日志

[2022/08/19 14:05:31.893 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105940] [binlog="tp:Commit start_ts:435392119781720073 commit_ts:435392119781720075 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTID:1059\\37736\\000\\000\\000\\000\\000\\000\\371\" ddl_query:\"truncate table moe_test\" ddl_job_id:105940 ddl_schema_state:5 "] [2022/08/19 14:05:31.897 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105940, Type:truncate table, State:synced, SchemaState:public, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 14:05:28.806 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [2022/08/19 14:05:31.926 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="truncate table moe_test"] ["commit ts"=435392119781720075] [shouldSkip=false] [2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:1011] ["[sarama] producer/broker/2 state change to [closing] because kafka: broker not connected\n"] [2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:611] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [retrying-1]\n"] [2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:621] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 abandoning broker 2\n"] [2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:750] ["[sarama] producer/broker/2 input chan closed\n"] [2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:843] ["[sarama] producer/broker/2 shut down\n"] [2022/08/19 14:05:32.427 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.99.110.10:9092] from broker %!s(MISSING)\n"] [2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:744] ["[sarama] producer/broker/2 starting up\n"] [2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:760] ["[sarama] producer/broker/2 state change to [open] on syk-test-binlog-to-kafka/0\n"] [2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:594] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 selected broker 2\n"] [2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:627] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [flushing-1]\n"] [2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:649] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [normal]\n"] [2022/08/19 14:05:32.454 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392119781720075] [version=130824]

kafka中查询

小结:日志与kafka内容中都体现的是truncate语句,不会产生大的binlog,也不会引起kafka server: Message was too large。

7,上游执行drop

drop table syk_test;

drainer日志

[2022/08/19 14:10:03.681 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105941] [binlog="tp:Commit start_ts:435392191333924865 commit_ts:435392191347032067 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTID:4603\\3777\\000\\000\\000\\000\\000\\000\\000\\370\" ddl_query:\"drop table syk_test\" ddl_job_id:105941 "] [2022/08/19 14:10:03.685 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105941, Type:drop table, State:synced, SchemaState:queueing, SchemaID:5840, TableID:46037, RowCount:0, ArgLen:0, start time: 2022-08-19 14:10:01.606 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [2022/08/19 14:10:03.685 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="drop table syk_test"] ["commit ts"=435392191347032067] [shouldSkip=false] [2022/08/19 14:10:03.700 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392191347032067] [version=130827] [2022/08/19 14:10:09.786 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392192985169921] [version=130827]

kafka中查询

小结:日志与kafka内容中都体现的是drop语句,不会产生大的binlog,也不会引起kafka server: Message was too large。

根据上面测试可以得出结论:

1,只有dml才可能造成大的binlog。

2,大事务可能引起kafka server: Message was too large,此时可以调整下游kafka中参数(参考https://docs.pingcap.com/zh/tidb/v4.0/handle-tidb-binlog-errors#drainer-%E5%90%8C%E6%AD%A5%E6%95%B0%E6%8D%AE%E5%88%B0-kafka-%E6%97%B6%E6%8A%A5%E9%94%99-kafka-server-message-was-too-large-server-rejected-it-to-avoid-allocation-error),但更应该考虑上游业务,是否可以把大事务拆成多个小事务。

3,大binlog会引起下面错误(我们反复测试后发现binlog在500M-1G时会引起)。

[2022/08/18 18:33:27.895 +08:00] [ERROR] [syncer.go:533] ["Failed to close syncer"] [error="fail to push msg to kafka after 30s, check if kafka is up and working"] [errorVerbose="fail to push msg to kafka after 30s, check if kafka is up and working\ngith ub.com/pingcap/tidb-binlog/drainer/sync.(*KafkaSyncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/sync/kafka.go:236\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1371"] [2022/08/18 18:33:27.895 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650948] [version=202816] [2022/08/18 18:33:31.311 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650950] [version=202816] [2022/08/18 18:33:37.896 +08:00] [INFO] [server.go:465] ["begin to close drainer server"] [2022/08/18 18:33:37.896 +08:00] [ERROR] [util.go:69] ["Recovered from panic"] [err="\"Waiting too long for `Syncer.run` to quit.\""] ["real stack"="github.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1.1\n\t/home/jenkins/agent/workspace/build- common/go/src/github.com/pingcap/tidb-binlog/drainer/util.go:71\nruntime.gopanic\n\t/usr/local/go/src/runtime/panic.go:965\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb- binlog/drainer/syncer.go:539\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).Start\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/syncer.go:151\ngithub.com/pingcap/tidb-binlog/drainer.(*Server).Start.func4\n \t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/server.go:290\ngithub.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/ drainer/util.go:79"] [name=syncer] [2022/08/18 18:33:37.896 +08:00] [INFO] [util.go:76] [Exit] [name=syncer] [2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:430] ["has already update status"] [id=10.xxx.xxx.59:8249] [2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:469] ["commit status done"] [2022/08/18 18:33:37.927 +08:00] [INFO] [collector.go:136] ["publishBinlogs quit"] [2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=heartbeat] [2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250] [2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250] [2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=collect] [2022/08/18 18:33:37.927 +08:00] [INFO] [main.go:73] ["drainer exit"]

#end

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:pd-ctl 选项 --jq 格式化语法使用案例详解
下一篇:TiFlash Proxy 模块介绍
相关文章