分布式事务锁及分布式事务锁的实现方式

Yanyan 806 2023-10-23

微服务的流行,使得现在基本都是分布式开发,也就是同一份代码会在多台机器上部署运行,此时若多台机器需要同步访问同一个资源(同一时间只能有一个节点机器在运行同一段代码),就需要使用到分布式锁。

分布式事务锁及分布式事务锁的实现方式

1.1 什么是分布式锁

在非分布式系统中(单机应用)一个共享的变量或者一个方法进行多线程同步访问,可以使用简单加锁(synchronized)方式实现,让同一时刻,只有一个线程执行。随着互联网发展,单机应用已经满足不了需求,分布式系统就出现了,假如不同系统或者是同一系统的不同节点(满足高并发实现集群),需要有共享资源(共享变量或者业务方法),控制他们同步访问的方式就叫分布式锁。通俗的说,就是在同一时刻,大量先线程访问分布式中的共享资源时,为了防止相互干扰,排斥其他线程,只让一个线程对共享变量进行更改访问,或者进行业务方法操作。

1.2 为什么要使用分布式锁

为了保证一个方法在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

1.3 分布式锁应具备哪些条件

高可用 ,高性能获取和释放锁

具备锁的失效机制,防止死锁

具备非阻塞锁的特性,就是没有获取到锁的时候直接返回获取锁失败。

1.4 分布式锁常见实现方式

借助于数据库实现

要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。

除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。 还可以通过数据库的排他锁来实现分布式锁。 基于MySql的InnoDB引擎,可以使用查询语句加上for update实现加锁操作。

这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。

基于数据库的锁设计存在一下问题:

l 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。

l 这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。

l 这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。

l 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

使用redis实现

相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点。而且很多缓存是可以集群部署的,可以解决单点问题。

本章下面内容就是基于redis实现,在此不再做单独讲解。

使用zookeeper实现

基于zookeeper临时有序节点可以实现的分布式锁。

实现思路:

首先zookeeper中我们可以创建一个/distributed_lock持久化节点

然后再在/distributed_lock节点下创建自己的临时顺序节点,比如:/distributed_lock/task_00000000000, task_00000000001…

获取所有的/distributed_lock下的所有子节点,并排序

判读自己创建的节点是否最小值(第一位)

如果是,则获取得到锁,执行自己的业务逻辑,最后删除这个临时节点。(如果出错可以让会话断开,创建临时节点消失,有效防止锁定或者死锁)

如果不是最小值,则需要监听自己创建节点前一位节点的数据变化,并阻塞。

当前一位节点被删除时,我们需要通过递归来判断自己创建的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)

1.5 Redission简介

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

1.6 Springcloud+Redisson+redis集群实现分布式锁

引入JAR包

父项目中:

<!--分布式锁需要jar-->
  <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
     <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
         <version>3.12.0</version>
     </dependency>

common项目引入:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
    </dependency>
micro_servieces中引入jar:
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-redis</artifactId>
   </dependency>
   <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson</artifactId>
   </dependency>

在common项目中编写相关工具类

package com.aaa.common.util;

import java.util.concurrent.TimeUnit;
/**
 * fileName:DistributedLocker
 * description:
 * author:zz
 * createTime:2020/1/29 10:48
 * version:1.0.0
 */
public interface DistributedLocker {
    /**
     * 不带时间的锁定方法
     * @param lockKey
     */
    void lock(String lockKey);
    
    /**
     * 带时间的锁定方法,默认为秒(实现类中自己定义)
     * @param lockKey
     * @param timeout
     */
    void lock(String lockKey, int timeout);
    /**
     * 带时间和单位的锁定方法
     * @param lockKey
     * @param unit
     * @param timeout
     */
    void lock(String lockKey, TimeUnit unit , int timeout);
    /**
     * 解锁方法
     * @param lockKey
     */
    void unlock(String lockKey);
}

接口实现类:RedissonDistributedLocker

package com.aaa.common.util;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
/**
 * fileName:RedissonDistributedLocker
 * description:
 * author:zz
 * createTime:2020/1/29 10:49
 * version:1.0.0
 */
public class RedissonDistributedLocker implements DistributedLocker {
    private RedissonClient redissonClient;
    @Override
    public void lock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock();
    }
    @Override
    public void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.unlock();
    }
    @Override
    public void lock(String lockKey, int timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, TimeUnit.SECONDS);
    }
    @Override
    public void lock(String lockKey, TimeUnit unit, int timeout) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock(timeout, unit);
    }
    /**
     * 手动注入redissonClient方法
     * @param redissonClient
     */
    public void setRedissonClient(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
    }
}

工具类调用上面接口的进行锁定或者解锁操作RedissLockUtil:

package com.aaa.common.util;

import java.util.concurrent.TimeUnit;
/**
 * fileName:RedissLockUtil
 * description:
 * author:zz
 * createTime:2020/1/29 10:56
 * version:1.0.0
 */
public class RedissLockUtil {
    private static DistributedLocker redissLock;
    public static void setLocker(DistributedLocker locker) {
        redissLock = locker;
    }
    public static void lock(String lockKey) {
        redissLock.lock(lockKey);
    }
    public static void unlock(String lockKey) {
        redissLock.unlock(lockKey);
    }
    /**
     * 带超时的锁
     * @param lockKey
     * @param timeout 超时时间   单位:秒
     */
    public static void lock(String lockKey, int timeout) {
        redissLock.lock(lockKey, timeout);
    }
    /**
     * 带超时的锁
     * @param lockKey
     * @param unit 时间单位
     * @param timeout 超时时间
     */
    public static void lock(String lockKey, TimeUnit unit , int timeout) {
        redissLock.lock(lockKey, unit, timeout);
    }
}

加载配置文件的配置类 RedissonProperties:

 package com.aaa.common.util;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * fileName:RedissonProperties
 * description:
 * author:zz
 * createTime:2020/1/29 10:54
 * version:1.0.0
 */
@Configuration
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {
    private int timeout;
    private String address;
    private String password;
    private int connectionPoolSize;
    private int connectionMinimumIdleSize;
    private int slaveConnectionPoolSize;
    private int masterConnectionPoolSize;
    private String[] sentinelAddresses;
    private String masterName;
    public int getTimeout() {
        return timeout;
    }
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }
    public String getAddress() {
        return address;
    }
    public void setAddress(String address) {
        this.address = address;
    }
    public String getPassword() {
        return password;
    }
    public void setPassword(String password) {
        this.password = password;
    }
    public int getConnectionPoolSize() {
        return connectionPoolSize;
    }
    public void setConnectionPoolSize(int connectionPoolSize) {
        this.connectionPoolSize = connectionPoolSize;
    }
    public int getConnectionMinimumIdleSize() {
        return connectionMinimumIdleSize;
    }
    public void setConnectionMinimumIdleSize(int connectionMinimumIdleSize) {
        this.connectionMinimumIdleSize = connectionMinimumIdleSize;
    }
    public int getSlaveConnectionPoolSize() {
        return slaveConnectionPoolSize;
    }
    public void setSlaveConnectionPoolSize(int slaveConnectionPoolSize) {
        this.slaveConnectionPoolSize = slaveConnectionPoolSize;
    }
    public int getMasterConnectionPoolSize() {
        return masterConnectionPoolSize;
    }
    public void setMasterConnectionPoolSize(int masterConnectionPoolSize) {
        this.masterConnectionPoolSize = masterConnectionPoolSize;
    }
    public String[] getSentinelAddresses() {
        return sentinelAddresses;
    }
    public void setSentinelAddresses(String[] sentinelAddresses) {
        this.sentinelAddresses = sentinelAddresses;
    }
    public String getMasterName() {
        return masterName;
    }
    public void setMasterName(String masterName) {
        this.masterName = masterName;
    }
}

在微服务项目中真正使用redisson做分布式锁

application.properties中添加下面配置:

#redisson配置   官网地址:https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95

#配置redis链接地址

redisson.address=redis://127.0.0.1:6379
#密码
redisson.password=
#命令等待超时
redisson.timeout = 3000
#连接池大小
redisson.connectionPoolSize=64
#最小空闲连接数
redisson.connectionMinimumIdleSize=10
#从节点连接池大小
redisson.slaveConnectionPoolSize = 250
#主节点连接池大小
redisson.masterConnectionPoolSize = 250

在微服务中编写RedissonAutoConfiguration:

package com.aaa.pay.config;

import com.aaa.common.util.DistributedLocker;
import com.aaa.common.util.RedissLockUtil;
import com.aaa.common.util.RedissonDistributedLocker;
import com.aaa.common.util.RedissonProperties;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SentinelServersConfig;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * fileName:RedissonAutoConfiguration
 * description:
 * author:zz
 * createTime:2020/1/29 11:03
 * version:1.0.0
 */
@Configuration
@ConditionalOnClass(Config.class)
/**
 * @ConditionalOnBean         //   当给定的在bean存在时,则实例化当前Bean
 @ConditionalOnMissingBean  //   当给定的在bean不存在时,则实例化当前Bean
 @ConditionalOnClass        //   当给定的类名在类路径上存在,则实例化当前Bean
 @ConditionalOnMissingClass //   当给定的类名在类路径上不存在,则实例化当前Bean
 */
@EnableConfigurationProperties(RedissonProperties.class)
public class RedissonAutoConfiguration {
    @Autowired
    private RedissonProperties redssionProperties;
    /**
     * 哨兵模式自动装配
     * @return
     */
   // @Bean
   // @ConditionalOnProperty(name="redisson.master-name")
    RedissonClient redissonSentinel() {
        Config config = new Config();
        SentinelServersConfig serverConfig = config.useSentinelServers().addSentinelAddress(redssionProperties.getSentinelAddresses())
                .setMasterName(redssionProperties.getMasterName())
                .setTimeout(redssionProperties.getTimeout())
                .setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize())
                .setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize());
        if(redssionProperties.getPassword() != null && !"".equals(redssionProperties.getPassword())) {
            serverConfig.setPassword(redssionProperties.getPassword());
        }
        return Redisson.create(config);
    }
    /**
     * 单机模式自动装配
     * @return
     */
    @Bean
    @ConditionalOnProperty(prefix = "redisson",name="address") //ConditionalOnProperty 可以控制类或者方法是否生效
    RedissonClient redissonSingle() {
        Config config = new Config();
       // System.out.println(redssionProperties.getAddress()+","+redssionProperties.getTimeout()+"...");
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(redssionProperties.getAddress())
                .setTimeout(redssionProperties.getTimeout())
                .setConnectionPoolSize(redssionProperties.getConnectionPoolSize())
                .setConnectionMinimumIdleSize(redssionProperties.getConnectionMinimumIdleSize());
        if(redssionProperties.getPassword() != null && !"".equals(redssionProperties.getPassword())) {
            serverConfig.setPassword(redssionProperties.getPassword());
        }
        return Redisson.create(config);
    }
    /**
     * 装配locker类,并将实例注入到RedissLockUtil中
     * @return
     */
    @Bean
    DistributedLocker distributedLocker(RedissonClient redissonClient) {
        RedissonDistributedLocker locker = new RedissonDistributedLocker();
        locker.setRedissonClient(redissonClient);
        RedissLockUtil.setLocker(locker);
        return locker;
    }
}

分布式锁的三种实现方式

Java中的锁主要包括synchronized锁和JUC包中的锁,这些锁都是针对单个JVM实例上的锁,对于分布式环境如果我们需要加锁就显得无能为力。在单个JVM实例上,锁的竞争者通常是一些不同的线程,而在分布式环境中,锁的竞争者通常是一些不同的线程或者进程。如何实现在分布式环境中对一个对象进行加锁呢?答案就是分布式锁。

分布式锁实现方案

目前分布式锁的实现方案主要包括三种:

基于数据库(唯一索引)

基于缓存(Redis,memcached,tair)

基于Zookeeper

基于数据库实现分布式锁:主要是利用数据库的唯一索引来实现,唯一索引天然具有排他性,这刚好符合我们对锁的要求:同一时刻只能允许一个竞争者获取锁。加锁时我们在数据库中插入一条锁记录,利用业务id进行防重。当第一个竞争者加锁成功后,第二个竞争者再来加锁就会抛出唯一索引冲突,如果抛出这个异常,我们就判定当前竞争者加锁失败。防重业务id需要我们自己来定义,例如我们的锁对象是一个方法,则我们的业务防重id就是这个方法的名字,如果锁定的对象是一个类,则业务防重id就是这个类名。

基于缓存实现分布式锁:理论上来说使用缓存来实现分布式锁的效率最高,加锁速度最快,因为Redis几乎都是纯内存操作,而基于数据库的方案和基于Zookeeper的方案都会涉及到磁盘文件IO,效率相对低下。一般使用Redis来实现分布式锁都是利用Redis的SETNX key value这个命令,只有当key不存在时才会执行成功,如果key已经存在则命令执行失败。

基于Zookeeper:Zookeeper一般用作配置中心,其实现分布式锁的原理和Redis类似,我们在Zookeeper中创建瞬时节点,利用节点不能重复创建的特性来保证排他性。

在实现分布式锁的时候我们需要考虑一些问题,例如:分布式锁是否可重入,分布式锁的释放时机,分布式锁服务端是否有单点问题等。

基于数据库实现分布式锁

上面已经分析了基于数据库实现分布式锁的基本原理:通过唯一索引保持排他性,加锁时插入一条记录,解锁是删除这条记录。下面我们就简要实现一下基于数据库的分布式锁。

表设计

CREATE TABLE `distributed_lock` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `unique_mutex` varchar(255) NOT NULL COMMENT '业务防重id',
  `holder_id` varchar(255) NOT NULL COMMENT '锁持有者id',
  `create_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mutex_index` (`unique_mutex`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

id字段是数据库的自增id,unique_mutex字段就是我们的防重id,也就是加锁的对象,此对象唯一。在这张表上我们加了一个唯一索引,保证unique_mutex唯一性。holder_id代表竞争到锁的持有者id。

加锁

insert into distributed_lock(unique_mutex, holder_id) values (‘unique_mutex’, ‘holder_id’);

如果当前sql执行成功代表加锁成功,如果抛出唯一索引异常(DuplicatedKeyException)则代表加锁失败,当前锁已经被其他竞争者获取。

解锁

delete from methodLock where unique_mutex=‘unique_mutex’ and holder_id=‘holder_id’;

解锁很简单,直接删除此条记录即可。

分析

是否可重入:就以上的方案来说,我们实现的分布式锁是不可重入的,即是是同一个竞争者,在获取锁后未释放锁之前再来加锁,一样会加锁失败,因此是不可重入的。解决不可重入问题也很简单:加锁时判断记录中是否存在unique_mutex的记录,如果存在且holder_id和当前竞争者id相同,则加锁成功。这样就可以解决不可重入问题。

锁释放时机:设想如果一个竞争者获取锁时候,进程挂了,此时distributed_lock表中的这条记录就会一直存在,其他竞争者无法加锁。为了解决这个问题,每次加锁之前我们先判断已经存在的记录的创建时间和当前系统时间之间的差是否已经超过超时时间,如果已经超过则先删除这条记录,再插入新的记录。另外在解锁时,必须是锁的持有者来解锁,其他竞争者无法解锁。这点可以通过holder_id字段来判定。

数据库单点问题:单个数据库容易产生单点问题:如果数据库挂了,我们的锁服务就挂了。对于这个问题,可以考虑实现数据库的高可用方案,例如MySQL的MHA高可用解决方案。

基于Zookeeper实现分布式锁

前置知识:

Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做Znode。

Znode分为四种类型:

持久节点(PERSISTENT):默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在 。

持久节点顺序节点(PERSISTENT_SEQUENTIAL): 所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号:

临时节点(EPHEMERAL) :和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

临时顺序节点(EPHEMERAL_SEQUENTIAL) :顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与Zookeeper断开连接后,临时节点会被删除。

Zookeeper分布式锁恰恰应用了临时顺序节点。具体如何实现呢?让我们来看一看详细步骤:

加锁和解锁流程

获取锁

首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点 Lock1。

之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。

这时候,如果再有一个客户端 Client2 前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock2。

Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。

于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。

这时候,如果又有一个客户端Client3前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock3。

Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。

于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态。

这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。这恰恰形成了一个等待队列,很像是Java当中ReentrantLock(可重入锁)所依赖的AQS(AbstractQueuedSynchronizer)。

获得锁的过程大致就是这样,那么Zookeeper如何释放锁呢?

释放锁的过程很简单,只需要释放对应的子节点就好。

释放锁

释放锁分为两种情况:

1.任务完成,客户端显示释放

当任务完成时,Client1会显示调用删除节点Lock1的指令。

2.任务执行过程中,客户端崩溃

获得锁的Client1在任务执行过程中,如果Duang的一声崩溃,则会断开与Zookeeper服务端的链接。根据临时节点的特性,相关联的节点Lock1会随之自动删除。

由于Client2一直监听着Lock1的存在状态,当Lock1节点被删除,Client2会立刻收到通知。这时候Client2会再次查询ParentLock下面的所有节点,确认自己创建的节点Lock2是不是目前最小的节点。如果是最小,则Client2顺理成章获得了锁。

同理,如果Client2也因为任务完成或者节点崩溃而删除了节点Lock2,那么Client3就会接到通知。

最终,Client3成功得到了锁。

使用Zookeeper实现分布式锁的大致流程就是这样。

分析

解决不可重入:客户端加锁时将主机和线程信息写入锁中,下一次再来加锁时直接和序列最小的节点对比,如果相同,则加锁成功,锁重入。

锁释放时机:由于我们创建的节点是顺序临时节点,当客户端获取锁成功之后突然session会话断开,ZK会自动删除这个临时节点。

单点问题:ZK是集群部署的,主要一半以上的机器存活,就可以保证服务可用性。

利用curator实现

Zookeeper第三方客户端curator中已经实现了基于Zookeeper的分布式锁。利用curator加锁和解锁的代码如下:

@Autowired
private CuratorFramework curatorFramework;
// 加锁,支持超时,可重入
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    //
    InterProcessMutex interProcessMutex= new InterProcessMutex(curatorFramework, "/ParenLock");
    try {
        return interProcessMutex.acquire(timeout, unit);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return true;
}
// 解锁
public boolean unlock() {
InterProcessMutex interProcessMutex= new InterProcessMutex(curatorFramework,  "/ParenLock");
    try {
        interProcessMutex.release();
    } catch (Throwable e) {
        log.error(e.getMessage(), e);
    } finally {
        executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
    }
    return true;
}

最常用的锁:

InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁

基于缓存实现分布式锁,以Redis为例

加锁

public class RedisTool {
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    /**
     * 加锁
     * @param stringRedisTemplate Redis客户端
     * @param lockKey 锁的key
     * @param requestId 竞争者id
     * @param expireTime 锁超时时间,超时之后锁自动释放
     * @return 
     */
    public static boolean getDistributedLock(StringRedisTemplate stringRedisTemplate, String lockKey, String requestId, int expireTime) {
        return stringRedisTemplate.opsForValue().setIfAbsent(lockKey, requestId, 30, TimeUnit.SECONDS);
    }
}

可以看到,我们加锁就一行代码:

stringRedisTemplate.opsForValue().setIfAbsent(lockKey, requestId, 30, TimeUnit.SECONDS);

1

这个setIfAbsent()方法一共五个形参:

第一个为key,我们使用key来当锁,因为key是唯一的。

第二个为value,这里写的是锁竞争者的id,在解锁时,我们需要判断当前解锁的竞争者id是否为锁持有者。

第三个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期时间的设置,具体时间由第五个参数决定;

第四个参数为time,与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的setIfAbsent()方法就只会导致两种结果:

1.当前没有锁(key不存在),那么就进行加锁操作,并对锁设置一个有效期,同时value表示加锁的客户端。

2.已经有锁存在,不做任何操作。上述解锁请求中,缓存超时机制保证了即使一个竞争者加锁之后挂了,也不会产生死锁问题:超时之后其他竞争者依然可以获取锁。通过设置value为竞争者的id,保证了只有锁的持有者才能来解锁,否则任何竞争者都能解锁,那岂不是乱套了。

解锁

public class RedisTool {
    private static final Long RELEASE_SUCCESS = 1L;
    /**
     * 释放分布式锁
     * @param stringRedisTemplate Redis客户端
     * @param lockKey 锁
     * @param requestId 锁持有者id
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(StringRedisTemplate stringRedisTemplate, String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Long result = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Collections.singletonList(lockKey), requestId);
        return RELEASE_SUCCESS.equals(result);
    }
}

解锁的步骤:

1、判断当前解锁的竞争者id是否为锁的持有者,如果不是直接返回失败,如果是则进入第2步。

2、删除key,如果删除成功,返回解锁成功,否则解锁失败。

注意到这里解锁其实是分为2个步骤,涉及到解锁操作的一个原子性操作问题。这也是为什么我们解锁的时候用Lua脚本来实现,因为Lua脚本可以保证操作的原子性。那么这里为什么需要保证这两个步骤的操作是原子操作呢?

设想:假设当前锁的持有者是竞争者1,竞争者1来解锁,成功执行第1步,判断自己就是锁持有者,这是还未执行第2步。这是锁过期了,然后竞争者2对这个key进行了加锁。加锁完成后,竞争者1又来执行第2步,此时错误产生了:竞争者1解锁了不属于自己持有的锁。可能会有人问为什么竞争者1执行完第1步之后突然停止了呢?这个问题其实很好回答,例如竞争者1所在的JVM发生了GC停顿,导致竞争者1的线程停顿。这样的情况发生的概率很低,但是请记住即使只有万分之一的概率,在线上环境中完全可能发生。因此必须保证这两个步骤的操作是原子操作。

分析

是否可重入:以上实现的锁是不可重入的,如果需要实现可重入,在SET_IF_NOT_EXIST之后,再判断key对应的value是否为当前竞争者id,如果是返回加锁成功,否则失败。

锁释放时机:加锁时我们设置了key的超时,当超时后,如果还未解锁,则自动删除key达到解锁的目的。如果一个竞争者获取锁之后挂了,我们的锁服务最多也就在超时时间的这段时间之内不可用。

Redis单点问题:如果需要保证锁服务的高可用,可以对Redis做高可用方案:Redis集群+主从切换。目前都有比较成熟的解决方案。

redis分布式锁,更详细的可以参考:分布式锁(Redisson)原理分析

三种方案比较

方案 理解难易程度 实现的复杂度 性能 可靠性 优点 缺点

基于数据库 容易 复杂 差 不可靠

基于缓存(Redis) 一般 一般 高 可靠 Set和Del指令性能较高 1.实现复杂,需要考虑超时,原子性,误删等情形。

2.没有等待锁的队列,只能在客户端自旋来等待,效率低下。

(但是现在有Redisson这两缺点就相当于没有了)

基于Zookeeper 难 简单 一般 一般 1.有封装好的框架,容易实现

2.有等待锁的队列,大大提升抢锁效率。 添加和删除节点性能较低

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:服务器上的数据备份与恢复及具体实施方案
下一篇:云原生数据库前世今生 云数据库和普通数据库的区别
相关文章