一篇带给你跨数据源实现数据同步

网友投稿 695 2023-06-13

一篇带给你跨数据源实现数据同步

一篇带给你跨数据源实现数据同步

场景

在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。

解决方案

1、 在代码逻辑中,有相关A服务数据写操作时,以调用接口的方式,调用B服务接口,B服务再将数据写到新的数据库中。这种方式看似简单,但其实“坑”很多。在A服务代码逻辑中会增加大量这种调用接口同步的代码,增加了项目代码的复杂度,以后会越来越难维护。并且,接口调用的方式并不是一个稳定的方式,没有重试机制,没有同步位置记录,接口调用失败了怎么处理,突然的大量接口调用会产生的问题等,这些都要考虑并且在业务中处理。这里会有不少工作量。想到这里,就将这个方案排除了。

2、通过数据库的binlog进行同步。这种解决方案,与A服务是独立的,不会和A服务有代码上的耦合。可以直接TCP连接进行传输数据,优于接口调用的方式。 这是一套成熟的生产解决方案,也有不少binlog同步的中间件工具,所以我们关注的就是哪个工具能够更好的构建稳定、性能满足且易于高可用部署的方案。

经过调研,我们选择了canal[

原理简图

工作流程

1.Canal连接到A数据库,模拟slave

2.canal-client与Canal建立连接,并订阅对应的数据库表

3.A数据库发生变更写入到binlog,Canal向数据库发送dump请求,获取binlog并解析,发送解析后的数据给canal-client

4.canal-client收到数据,将数据同步到新的数据库

安装canal

下载canal

修改配置/conf/canal.properties

然后配置instance,找到

/conf/example/instance.properties配置文件:

## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)# canal.instance.mysql.slaveId=0# position infocanal.instance.master.address=127.0.0.1:3306# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlogcanal.instance.master.journal.name=mysql-bin.000006canal.instance.master.position=4596# 账号密码canal.instance.dbUsername=canalcanal.instance.dbPassword=Canal@****canal.instance.connectionCharset = UTF-8#MQ队列名称canal.mq.topic=canaltopic#单队列模式的分区下标canal.mq.partition=0

启动zookeeper和kafka

zookeeper-server-start.bat ../../config/zookeeper.propertieskafka-server-start.bat ../../config/server.properties

启动 canal

canal/bin/start.bat

编写读取消息的相关代码

kafka相关配置

@Configuration@EnableKafkapublic class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.retries}") private String retries; @Value("${spring.kafka.producer.batch-size}") private Integer batchSize; @Value("${spring.kafka.producer.buffer-memory}") private Integer bufferMemory; /** * 生产者配置信息 */ @Bean public Map producerConfigs() { Map props = new HashMap(); //重试,0为不启用重试机制 props.put(ProducerConfig.ACKS_CONFIG, "all"); //连接地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG, 2); //控制批处理大小,单位为字节 props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); //批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); //生产者可以使用的总内存字节来缓冲等待发送到服务器的记录 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); //键的序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //值的序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); return props; } /** kafka无事务模式 * @return */ /* @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); }*/ /** * 开启kafka事务 * * @return */ @Bean public ProducerFactory producerFactory() { DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); //在producerFactory中开启事务功能 factory.transactionCapable(); //TransactionIdPrefix是用来生成Transactional.id的前缀 factory.setTransactionIdPrefix("tran-"); return factory; } @Bean public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) { KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory); return manager; } @Bean public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }}

读取消息

/** * 如何解决topic指定对应的表(一个topic对应一个表即可解决此问题) * @param record * @param ack * @param topic */ @KafkaListener(topics = KafkaConstants.CANAL_TOPIC, groupId = KafkaConstants.DISPATCH_GROUP) public void canalConsumer(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { String messageStr = (String) message.get(); CanalDto canalDto = JSONObject.parseObject(messageStr, CanalDto.class); LOGGER.info("canalConsumer 消费了: Topic:{},Message:{}", topic, messageStr); LOGGER.info(canalDto.toString()); boolean isDdl = canalDto.isDdl(); if(!isDdl){ String type = canalDto.getType(); List data = canalDto.getData(); if("INSERT".equals(type)){ mongodbBase.batchSave(data,OrderTbl.class); }else if ("UPDATE".equals(type)) {// mongodbBase.updateFirst(); }else { //删除语句 for (OrderTbl orderTbl : data) { mongodbBase.remove(orderTbl); } } } ack.acknowledge(); } }

canal实体信息

public class CanalDto implements Serializable { private static final long serialVersionUID = 3652575521269639607L; //数据 private List data; //数据库名称 private String database; private long es; //递增,从1开始 private int id; //是否是DDL语句 private boolean isDdl; //表结构的字段类型 private MysqlType mysqlType; //UPDATE语句,旧数据 private String old; //主键名称 private List pkNames; //sql语句 private String sql; private SqlTypeDto sqlType; //表名 private String table; private long ts; //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 private String type; }

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:MySQL 中如何定位 Ddl 被阻塞的问题
下一篇:图数据库基础,我们为什么需要图数据库?
相关文章