黄东旭解析 TiDB 的核心优势
827
2023-06-13
一篇带给你跨数据源实现数据同步
场景
在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。
解决方案
1、 在代码逻辑中,有相关A服务数据写操作时,以调用接口的方式,调用B服务接口,B服务再将数据写到新的数据库中。这种方式看似简单,但其实“坑”很多。在A服务代码逻辑中会增加大量这种调用接口同步的代码,增加了项目代码的复杂度,以后会越来越难维护。并且,接口调用的方式并不是一个稳定的方式,没有重试机制,没有同步位置记录,接口调用失败了怎么处理,突然的大量接口调用会产生的问题等,这些都要考虑并且在业务中处理。这里会有不少工作量。想到这里,就将这个方案排除了。
2、通过数据库的binlog进行同步。这种解决方案,与A服务是独立的,不会和A服务有代码上的耦合。可以直接TCP连接进行传输数据,优于接口调用的方式。 这是一套成熟的生产解决方案,也有不少binlog同步的中间件工具,所以我们关注的就是哪个工具能够更好的构建稳定、性能满足且易于高可用部署的方案。
经过调研,我们选择了canal[
原理简图
工作流程
1.Canal连接到A数据库,模拟slave
2.canal-client与Canal建立连接,并订阅对应的数据库表
3.A数据库发生变更写入到binlog,Canal向数据库发送dump请求,获取binlog并解析,发送解析后的数据给canal-client
4.canal-client收到数据,将数据同步到新的数据库
安装canal
下载canal
修改配置/conf/canal.properties
然后配置instance,找到
/conf/example/instance.properties配置文件:
## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canal.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0
启动zookeeper和kafka
zookeeper-server-start.bat ../../config/zookeeper.propertieskafka-server-start.bat ../../config/server.properties
启动 canal
canal/bin/start.bat
编写读取消息的相关代码
kafka相关配置
@Configuration@EnableKafkapublic class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.retries}") private String retries; @Value("${spring.kafka.producer.batch-size}") private Integer batchSize; @Value("${spring.kafka.producer.buffer-memory}") private Integer bufferMemory; /** * 生产者配置信息 */ @Bean public Map
读取消息
/** * 如何解决topic指定对应的表(一个topic对应一个表即可解决此问题) * @param record * @param ack * @param topic */ @KafkaListener(topics = KafkaConstants.CANAL_TOPIC, groupId = KafkaConstants.DISPATCH_GROUP) public void canalConsumer(ConsumerRecord, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { String messageStr = (String) message.get(); CanalDto
canal实体信息
public class CanalDto
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。