黄东旭解析 TiDB 的核心优势
1169
2023-04-07
SpringBoot+Mybatis+自定义注解+Atomikos+实现多源数据库切换和分布式事务
在我们平时的项目开发中,经常会遇到一个系统操作多个数据源的情况。下面介绍一种通过Spring AOP+自定义注解的形式实现多源数据库切换的方式:
实现原理:
jdbc提供了AbstractRoutingDataSource抽象类用来支持多源数据库切换,通过重写determineCurrentLookupKey方法,设定要使用的数据源key即可完成数据源的切换。至于何时切换数据源,采用Aop+自定义注解,在需要切换数据源的方法上添加此注解,利用编写的自定义注解的解析器获取注解中配置的目标数据源,从而进行动态数据源切换。
protected DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } return dataSource; }
项目结构:
测试数据库脚本:
# db01# 创建数据库CREATE DATABASE `db01` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';# 创建数据表CREATE TABLE `tbl_user` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '唯一ID', `name` varchar(30) DEFAULT NULL COMMENT '姓名', `sex` varchar(10) DEFAULT NULL COMMENT '性别', `age` int(11) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=144 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;# db02# 创建数据库CREATE DATABASE `db01` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci';# 创建数据表CREATE TABLE `tbl_goods` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '唯一ID', `name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '商品名称', `price` decimal(10,0) DEFAULT NULL COMMENT '商品价格', PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=52 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
一、创建SpringBoot测试项目
1、编写application.yml
spring: datasource: type: com.alibaba.druid.pool.DruidDataSource # 数据库01 db01: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/db01?useUnicode=true&characterEncoding=utf8&useSSL=false username: root password: root123mysql # 数据库02 db02: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/db02?useUnicode=true&characterEncoding=utf8&useSSL=false username: root password: root123mysql profiles: active: devmybatis: config-location: classpath:mybatis/mybatis-config.xml mapper-locations: classpath:mybatis/mapper/*.xml
2、编写mybatis-config.xml
3、在resources文件夹下添加相关mapper文件
GoodsMapper.xml
UserMapper.xml
二、在项目启动项目录下添加config文件夹,编写多源数据库配置和Swagger接口文档配置
SwaggerConfig.java
在config文件夹下添加datasource文件夹,数据库切换相关配置都写在此包下
1、创建数据源名称枚举DataSourceName
package com.whw.mdb.config.datasource;public enum DataSourceName { DB01("db01"), DB02("db02"); private String name; DataSourceName(String name) { this.name = name; } public String getName() { return name; }}
2、编写自定义数据源切换注解SwitchDataSource.java
package com.whw.mdb.config.datasource;import java.lang.annotation.*;/** * @描述 切换数据源注解 **/@Target({ElementType.TYPE, ElementType.METHOD})//指明此自定义注解只能用在方法上@Retention(RetentionPolicy.RUNTIME)//指明此自定义注解是运行时注解@Documentedpublic @interface SwitchDataSource { DataSourceName value() default DataSourceName.DB01;}
3、编写动态数据源切面DynamicDataSourceAspect.java
package com.whw.mdb.config.datasource;import org.aspectj.lang.JoinPoint;import org.aspectj.lang.annotation.After;import org.aspectj.lang.annotation.Aspect;import org.aspectj.lang.annotation.Before;import org.aspectj.lang.reflect.MethodSignature;import org.springframework.core.annotation.Order;import org.springframework.stereotype.Component;import java.lang.reflect.Method;@Aspect@Order(-1)//切面必须要在事务注解@Transactional之前,由于在开始事务之前就需要确定数据源@Componentpublic class DynamicDataSourceAspect { @Before("@within(SwitchDataSource)||@annotation(SwitchDataSource)") public void changedDataSource(JoinPoint joinpoint) { //获取切入点方法上的注解 Method method = ((MethodSignature) joinpoint.getSignature()).getMethod(); SwitchDataSource dataSourceAnnotation = method.getAnnotation(SwitchDataSource.class); if (dataSourceAnnotation == null) { //如果方法上没有数据源注解,则获取方法所在类上面的注解 dataSourceAnnotation = joinpoint.getTarget().getClass().getAnnotation(SwitchDataSource.class); //如果方法所在类上面也没有数据源注解,则使用默认数据源 if (dataSourceAnnotation == null) { return; } } //如果方法上面或者方法所在类上面有数据源注解,则设置当前线程的数据源为数据源注解指定的数据源 DataSourceName dataSourceName = dataSourceAnnotation.value(); DataSourceSwitcher.setDataSource(dataSourceName.getName()); } @After("@within(SwitchDataSource) || @annotation(SwitchDataSource)") public void clean() { //清理数据源的标签 DataSourceSwitcher.setToDefaultSource(); }}
4、编写数据源切换处理器DataSourceSwitcher
package com.whw.mdb.config.datasource;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;public class DataSourceSwitcher extends AbstractRoutingDataSource { private static final ThreadLocal
5、编写多数据源配置文件DataSourceConfig
package com.whw.mdb.config.datasource;import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;import javax.sql.DataSource;import java.util.HashMap;import java.util.Map;/** * @描述 动态数据源配置类 **/@Configurationpublic class DataSourceConfig { @Bean(name = "dataSourceDB01") @ConfigurationProperties(prefix = "spring.datasource.db01") public DataSource dataSourceDB01() { return DruidDataSourceBuilder.create().build(); } @Bean(name = "dataSourceDB02") @ConfigurationProperties(prefix = "spring.datasource.db02") public DataSource dataSourceDB02() { return DruidDataSourceBuilder.create().build(); } /* * @Primary:自动装配时当出现多个Bean候选者时,被注解为@Primary的Bean将作为首选者,否则将抛出异常 * */ @Primary @Bean("dynamicDataSource") public DataSource dataSource() { DataSource db01 = dataSourceDB01(); DataSource db02 = dataSourceDB02(); Map
三、实现数据库切换,我们只需要在需要切换数据库的方法上添加自定义注解@SwitchDataSource即可
1、编写pojo
Goods.java
package com.whw.mdb.pojo;import io.swagger.annotations.ApiModel;import io.swagger.annotations.ApiModelProperty;@ApiModel("商品实体类")public class Goods { @ApiModelProperty("唯一id") private Integer id; @ApiModelProperty("名称") public String name; @ApiModelProperty("价格") public Double price; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; }}
User.java
package com.whw.mdb.pojo;import io.swagger.annotations.ApiModel;import io.swagger.annotations.ApiModelProperty;@ApiModel("用户实体类")public class User { @ApiModelProperty("唯一id") private Integer id; @ApiModelProperty("姓名") public String name; @ApiModelProperty("性别") public String sex; @ApiModelProperty("年龄") public Integer age; public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; }}
2、编写dao层
GoodsDao.java
package com.whw.mdb.dao;import com.whw.mdb.pojo.Goods;import java.util.List;public interface GoodsDao { int add(Goods goods); List
UserDao.java
package com.whw.mdb.dao;import com.whw.mdb.pojo.User;import org.springframework.stereotype.Repository;import java.util.List;@Repositorypublic interface UserDao { int add(User user); List
3、编写service及实现类impl,在实现类中需要切换数据源的方法上添加自定义注解即可
GoodsService.java
package com.whw.mdb.service;import com.whw.mdb.pojo.Goods;import java.util.List;public interface GoodsService { int add(Goods goods); List
GoodsServiceImpl.java
package com.whw.mdb.service.impl;import com.whw.mdb.config.datasource.DataSourceName;import com.whw.mdb.config.datasource.SwitchDataSource;import com.whw.mdb.dao.GoodsDao;import com.whw.mdb.pojo.Goods;import com.whw.mdb.service.GoodsService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import java.util.List;@Servicepublic class GoodsServiceImpl implements GoodsService { @Autowired GoodsDao goodsDao; @Override @SwitchDataSource(value = DataSourceName.DB02) public int add(Goods goods) { return goodsDao.add(goods); } @SwitchDataSource(value = DataSourceName.DB02) @Override public List
UserService.java
package com.whw.mdb.service;import com.whw.mdb.pojo.User;import java.util.List;public interface UserService { int add(User user); List
UserServiceImpl.java
package com.whw.mdb.service.impl;import com.whw.mdb.dao.UserDao;import com.whw.mdb.pojo.Goods;import com.whw.mdb.pojo.User;import com.whw.mdb.service.GoodsService;import com.whw.mdb.service.UserService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import org.springframework.transaction.annotation.Transactional;import java.util.List;@Servicepublic class UserServiceImpl implements UserService { @Autowired UserDao userDao; @Autowired GoodsService goodsService; @Override public int add(User user) { return userDao.add(user); } @Override public List
4、编写controller层
GoodsController.java
package com.whw.mdb.controller;import com.whw.mdb.pojo.Goods;import com.whw.mdb.service.GoodsService;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import io.swagger.annotations.ApiParam;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.util.List;@Api("商品操作API")@RestController@RequestMapping("/goods/")public class GoodsController { @Autowired GoodsService goodsService; @ApiOperation("获取商品列表") @GetMapping("list") public List
UserController.java
package com.whw.mdb.controller;import com.whw.mdb.pojo.User;import com.whw.mdb.service.UserService;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import io.swagger.annotations.ApiParam;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.util.List;@Api("用户操作API")@RestController@RequestMapping("/user/")public class UserController { @Autowired UserService userService; @ApiOperation("获取人员列表") @GetMapping("list") public List
四、测试
2、测试获取人员接口
3、测试获取商品接口
从上述测试看,系统在访问不同的功能时可以随时切换数据源。
五、存在问题
由于数据库中的事务是针对当前数据库操作的,数据源切换之后会造成事务功能不可用,如果在方法上添加了事务注解,此方法实现又需要切换数据源,则会发现出现异常,数据库无法切换
原因:使用了@Transactional注解。为了保证事物的一致性,它需要保证同一个线程的数据库执行Connection和事物执行的Connection必须保持一致,因此去调用下一个Mapper时仍然保持了上一个Mapper的连接。所以就报错。
从SpringManagedTransaction类中可以看出,事务开启的时候就会确定数据库连接,一个事务中的数据库连接是唯一的
public Connection getConnection() throws SQLException { if (this.connection == null) { this.openConnection(); } return this.connection; } private void openConnection() throws SQLException { this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource); LOGGER.debug(() -> { return "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"; }); }
解决办法:重写SpringManagedTransaction的getConnection()方法
第一步:创建MyTransactionsFactory
import org.apache.ibatis.transaction.Transaction;import org.mybatis.spring.transaction.SpringManagedTransactionFactory;import javax.sql.DataSource;public class MyTransactionsFactory extends SpringManagedTransactionFactory { @Override public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) { return new MyManagedTransaction(dataSource); }}
第二步:重写MyManagedTransaction类中的getConnection()方法
package com.whw.mdbtransaction.config.datasource;import org.mybatis.spring.transaction.SpringManagedTransaction;import javax.sql.DataSource;import java.sql.Connection;import java.sql.SQLException;import java.util.concurrent.ConcurrentHashMap;public class MyManagedTransaction extends SpringManagedTransaction { DataSource dataSource; ConcurrentHashMap
第三步:在DataSourceConfig中指定事务工厂为自定的MyTransactionsFactory
@Beanpublic SqlSessionFactory sqlSessionFactory( @Qualifier("dynamicDataSource") DataSource dynamicDataSource) throws Exception { SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dynamicDataSource); // 此处要指定mapper.xml文件所在位置 sessionFactoryBean.setMapperLocations( new PathMatchingResourcePatternResolver().getResources("classpath:mybatis/mapper/*.xml") ); // 指定自定义的事务工厂 sessionFactoryBean.setTransactionFactory(new MyTransactionsFactory()); return sessionFactoryBean.getObject();}
六、利用Atomikos实现多源数据库事务一致性
1、实现原理
对于单个数据库,通常直接使用Mysql的事务进行事务控制,通过begin,commit和rollback等操作进行开启、回滚或提交事务。但在复杂的应用中,有时会出现同时修改多个数据源数据的情况,为了保证这些数据的能够受事务的控制,则需要使用分布式事务,而XA协议则是分布式事务协议。主流的数据库如Mysql、***、Postgresql、SqlServer等都支持XA协议。 通过xa分布式事务协议可以允许多个数据源加入到一个全局事务中来,加入事务中的资源通常是关系数据库,也有可能是其他数据资源。在一个全局事务中,包含多个数据操作的动作,这些动作在全局事务中要么全部执行,要么全部不执行,一个使用全局事务的应用包含一个或者多个资源管理器和一个事务管理器。
资源管理器RM(resource manager):提供连接事务资源的的功能。一个数据库服务器就是一种资源管理器。资源管理器是事务的参与者,必须要提供提交和回滚事务的功能。 事务管理器TM(transaction manager):事务管理器是全局事务的协调者,他通过与资源管理器通信,协调多个事务的运作。Mysql通过实现XA协议,处理XA事务,让自身成为全局事务中的一个资源管理器。一个连接到Mysql服务器的客户端则充当一个事务管理器的角色。实现全局事务,需要知道哪些参与者参与到事务中,如何将他们运行到一个可以共同提交,或者回滚的点。作为全局事务,还需要考虑网络连接等因素导致的失败。
执行全局事务的过程分二阶段提交(2PC),三阶段提交(3PC)两种实现。后期会进行详细介绍...
2、实现步骤:
Atomikos是SpringBoot推荐使用的一个分布式事务协调工具,使用时我们只需要将DataSource改为XADataSource即可
第一步:引入依赖
第二步:改写DataSourceConfig.java即可
package com.whw.mdbtransaction.config.datasource;import com.atomikos.jdbc.AtomikosDataSourceBean;import com.mysql.cj.jdbc.MysqlXADataSource;import org.apache.ibatis.session.SqlSessionFactory;import org.mybatis.spring.SqlSessionFactoryBean;import org.mybatis.spring.annotation.MapperScan;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;import javax.sql.DataSource;import java.util.HashMap;import java.util.Map;/** * @描述 动态数据源配置类 **/@Configuration@MapperScan(basePackages = {"com.whw.mdbtransaction.dao"})public class DataSourceConfig { @Value("${spring.datasource.db01.url}") private String db01Url; @Value("${spring.datasource.db01.username}") private String db01UserName; @Value("${spring.datasource.db01.password}") private String db01Password; @Value("${spring.datasource.db02.url}") private String db02Url; @Value("${spring.datasource.db02.username}") private String db02UserName; @Value("${spring.datasource.db02.password}") private String db02Password; /** * 配置数据源db01 **/ @Bean(name = "dataSourceDB01") public AtomikosDataSourceBean dataSourceDB01() { MysqlXADataSource mysqlXADataSource = new MysqlXADataSource(); mysqlXADataSource.setUrl(db01Url); mysqlXADataSource.setUser(db01UserName); mysqlXADataSource.setPassword(db01Password); AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); atomikosDataSourceBean.setUniqueResourceName("db01"); atomikosDataSourceBean.setXaDataSource(mysqlXADataSource); atomikosDataSourceBean.setPoolSize(5); atomikosDataSourceBean.setMaxPoolSize(20); return atomikosDataSourceBean; } /** * 配置数据源db02 **/ @Bean(name = "dataSourceDB02") public AtomikosDataSourceBean dataSourceDB02() { MysqlXADataSource mysqlXADataSource = new MysqlXADataSource(); mysqlXADataSource.setUrl(db02Url); mysqlXADataSource.setUser(db02UserName); mysqlXADataSource.setPassword(db02Password); AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean(); atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource"); atomikosDataSourceBean.setUniqueResourceName("db02"); atomikosDataSourceBean.setXaDataSource(mysqlXADataSource); atomikosDataSourceBean.setPoolSize(5); atomikosDataSourceBean.setMaxPoolSize(20); return atomikosDataSourceBean; } /** * 配置动态数据源 **/ @Primary @Bean("dynamicDataSource") public DataSource dataSource( @Qualifier("dataSourceDB01") DataSource dataSourceDB01, @Qualifier("dataSourceDB02") DataSource dataSourceDB02 ) { Map
第三步:测试
1、编写测试接口在Swagger接口文档中测试,放开注释,则两个数据库都能不能添加数据,取消注释,则事务正常提交,两个数据库中都能成功添加。
@Override @Transactional(propagation = Propagation.REQUIRED) public Integer addAll() { User user = new User(); user.setName("A"); user.setSex("男"); user.setAge(20); int count = userDao.add(user); Goods goods = new Goods(); goods.setName("三星"); goods.setPrice(3000d); count += goodsService.add(goods); // int i = 1 / 0; return count; }
写在最后:Atomikos虽然提供了一种分布式事务的解决方案,但对于高并发场景下存在很大的性能问题,而且相关文档说明不是很全,在项目中使用的并不是很多,建议研究学习了解下阿里的开源的分布式事务框架 Seata,提供了完备的解决方案。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。