麒麟v10 上部署 TiDB v5.1.2 生产环境优化实践
668
2023-10-30
本篇内容主要讲解“redis中的分布式锁有哪些特点”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“redis中的分布式锁有哪些特点”吧!
1.独占性
不论在任何情况下都只能有一个线程持有锁。
2.高可用
redis集群环境不能因为某一个节点宕机而出现获取锁或释放锁失败。
3.防死锁
必须有超时控制机制或者撤销操作。
4.不乱抢
自己加锁,自己释放。不能释放别人加的锁。
5.重入性
同一线程可以多次加锁。
一般情况下都是使用setnx+lua脚本实现。
直接贴代码
packagecom.fandf.test.redis;import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.RandomUtil; importlombok.extern.slf4j.Slf4j;import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript;import org.springframework.stereotype.Service; import javax.annotation.Resource;import java.util.Collections; import java.util.concurrent.TimeUnit; /** * redis 单机锁 * *@author fandongfeng * @date 2023/3/29 06:52 */ @Slf4j @Service public class RedisLock { @Resource RedisTemplate<String, Object> redisTemplate; privatestaticfinal String SELL_LOCK = "kill:"; /** * 模拟秒杀 * * @return 是否成功 */ public String kill() { String productId = "123"; String key = SELL_LOCK + productId;//锁value,解锁时 用来判断当前锁是否是自己加的String value = IdUtil.fastSimpleUUID();//加锁 十秒钟过期 防死锁 Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, 10, TimeUnit.SECONDS);if (!flag) { return "加锁失败"; } try { String productKey = "good123"; //获取商品库存 Integer stock = (Integer) redisTemplate.opsForValue().get(productKey); if (stock == null) {//模拟录入数据, 实际应该加载时从数据库读取 redisTemplate.opsForValue().set(productKey, 100); stock =100; } if (stock <= 0) { return "卖完了,下次早点来吧"; } //扣减库存, 模拟随机卖出数量 int randomInt = RandomUtil.randomInt(1, 10); redisTemplate.opsForValue().decrement(productKey, randomInt);// 修改db,可以丢到队列里慢慢处理 return "成功卖出" + randomInt + "个,库存剩余"+ redisTemplate.opsForValue().get(productKey) + "个"; } finally { // //这种方法会存在删除别人加的锁的可能 // redisTemplate.delete(key); // if(value.equals(redisTemplate.opsForValue().get(key))){ // //因为if条件的判断和 delete不是原子性的, // //if条件判断成功后,恰好锁到期自己解锁 // //此时别的线程如果持有锁了,就会把别人的锁删除掉 // redisTemplate.delete(key); // } //使用lua脚本保证判断和删除的原子性 String luaScript = "if (redis.call(get,KEYS[1]) == ARGV[1]) then " + "return redis.call(del,KEYS[1]) " + "else " + "return 0 " + "end"; redisTemplate.execute(new DefaultRedisScript<>(luaScript,Boolean.class), Collections.singletonList(key), value); } } }进行单元测试,模拟一百个线程同时进行秒杀
package com.fandf.test.redis; import org.junit.jupiter.api.DisplayName; importorg.junit.jupiter.api.RepeatedTest;import org.junit.jupiter.api.Test; importorg.junit.jupiter.api.parallel.Execution;import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource;import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; /** * @Description: * @author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class SignServiceTest { @Resource RedisLock redisLock; @RepeatedTest(100) @Execution(CONCURRENT) public void redisLock() { String result = redisLock.kill(); if("加锁失败".equals(result)) { }else { System.out.println(result); } } }只有三个线程抢到了锁
成功卖出5个,库存剩余95个 成功卖出8个,库存剩余87个 成功卖出7个,库存剩余80个总的来说有两个:
1.无法重入。
2.我们为了防止死锁,加锁时都会加上过期时间,这个时间大部分情况下都是根据经验对现有业务评估得出来的,但是万一程序阻塞或者异常,导致执行了很长时间,锁过期就会自动释放了。此时如果别的线程拿到锁,执行逻辑,就有可能出现问题。
那么这两个问题有没有办法解决呢?有,接下来我们就来讲讲Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
springboot集成Redisson集成很简单,只需两步
pom引入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> </dependency>application.yml增加redis配置
spring: application: name: test redis: host: 127.0.0.1 port: 6379使用也很简单,只需要注入RedissonClient即可
package com.fandf.test.redis; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; importorg.redisson.api.RedissonClient;import org.springframework.stereotype.Component; importjavax.annotation.Resource;/** * @author fandongfeng */ @Component @Slf4j public class RedissonTest { @Resource RedissonClient redissonClient; public void test() { RLock rLock = redissonClient.getLock("anyKey"); //rLock.lock(10, TimeUnit.SECONDS); rLock.lock(); try { // do something } catch(Exception e) { log.error("业务异常", e); } finally{ rLock.unlock(); } } }可能不了解redisson的小伙伴会不禁发出疑问。
what?加锁时不需要加过期时间吗?这样会不会导致死锁啊。解锁不需要判断是不是自己持有吗?
哈哈,别着急,我们接下来一步步揭开redisson的面纱。Redisson lock()源码跟踪我们来一步步跟着lock()方法看下源码(本地redisson版本为3.20.0)
//RedissonLock.class @Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw newIllegalStateException(); } }查看lock(-1, null, false);方法
private void lock(long leaseTime, TimeUnit unit, booleaninterruptibly) throws InterruptedException { //获取当前线程id long threadId = Thread.currentThread().getId(); //加锁代码块, 返回锁的失效时间 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry;if(interruptibly) { entry = commandExecutor.getInterrupted(future); }else{ entry = commandExecutor.get(future); }try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; }// waiting for message if (ttl >= 0) { try{ entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); }catch(InterruptedException e) {if (interruptibly) { throwe; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } }else { if(interruptibly) { entry.getLatch().acquire(); }else{ entry.getLatch().acquireUninterruptibly(); } } } }finally { unsubscribe(entry, threadId); } // get(lockAsync(leaseTime, unit)); }我们看下它是怎么上锁的,也就是tryAcquire方法
private Long tryAcquire(long waitTime, longleaseTime, TimeUnit unit,long threadId) { //真假加锁方法 tryAcquireAsync return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); }public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name);this.commandExecutor = commandExecutor; this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); }private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture;if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); }else { //waitTime和leaseTime都是-1,所以走这里 //过期时间internalLockLeaseTime初始化的时候赋值commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout(); //跟进去源码发现默认值是30秒, private long lockWatchdogTimeout = 30 * 1000;ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture); ttlRemainingFuture = new CompletableFutureWrapper<>(s);//加锁成功,开启子线程进行续约CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // lock acquired if(ttlRemaining ==null) { if (leaseTime > 0) { //如果指定了过期时间,则不续约internalLockLeaseTime = unit.toMillis(leaseTime); }else { //没指定过期时间,或者小于0,在这里实现锁自动续约scheduleExpirationRenewal(threadId); } }return ttlRemaining; }); returnnew CompletableFutureWrapper<>(f); }上面代码里面包含加锁和锁续约的逻辑,我们先来看看加锁的代码
<T> RFuture<T> tryLockInnerAsync(long waitTime, longleaseTime, TimeUnit unit,long threadId, RedisStrictCommand<T> command) { returnevalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"if ((redis.call(exists, KEYS[1]) == 0) " + "or (redis.call(hexists, KEYS[1], ARGV[2]) == 1)) then " + "redis.call(hincrby, KEYS[1], ARGV[2], 1); " + "redis.call(pexpire, KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call(pttl, KEYS[1]);", Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId)); }这里就看的很明白了吧,redisson使用了lua脚本来保证了命令的原子性。
redis.call(hexists, KEYS[1], ARGV[2]) 查看 key value 是否存在。Redis Hexists 命令用于查看哈希表的指定字段是否存在。
如果哈希表含有给定字段,返回 1 。 如果哈希表不含有给定字段,或 key 不存在,返回 0 。127.0.0.1:6379> hexists 123 uuid (integer) 0 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 1 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 2 127.0.0.1:6379> hincrby 123 uuid 1 (integer) 3 127.0.0.1:6379> hexists 123 uuid (integer) 1 127.0.0.1:6379> hgetall 123 1) "uuid" 2) "3" 127.0.0.1:6379>当key不存在,或者已经含有给定字段(也就是已经加过锁了,这里是为了实现重入性),直接对字段的值+1
这个字段的值,也就是ARGV[2], 取得是getLockName(threadId)方法,我们再看看这个字段的值是什么 protected String getLockName(long threadId) { return id + ":"+ threadId; }public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name);this.commandExecutor = commandExecutor; this.id = commandExecutor.getServiceManager().getId();this.internalLockLeaseTime = commandExecutor.getServiceManager().getCfg().getLockWatchdogTimeout();this.entryName = id + ":" + name; } //commandExecutor.getServiceManager() 的id默认值 private final String id = UUID.randomUUID().toString();这里就明白了,字段名称是 uuid + : + threadId
接下来我们看看锁续约的代码scheduleExpirationRenewal(threadId);
protected void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry =new ExpirationEntry(); //判断该实例是否加过锁ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);if (oldEntry != null) { //重入次数+1oldEntry.addThreadId(threadId); }else { //第一次加锁 entry.addThreadId(threadId); try { //锁续约核心代码 renewExpiration(); } finally { if (Thread.currentThread().isInterrupted()) { //如果线程异常终止,则关闭锁续约线程cancelExpirationRenewal(threadId); } } } }我们看看renewExpiration()方法
privatevoid renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if(ee ==null) { return; } //新建一个线程执行Timeout task = commandExecutor.getServiceManager().newTimeout(new TimerTask() {@Override publicvoid run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; }Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //设置锁过期时间为30秒 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> {if (e != null) { log.error("Cant update lock {} expiration", getRawName(), e); EXPIRATION_RENEWAL_MAP.remove(getEntryName());return; } //检查锁是还否存在 if (res) { // reschedule itself 10后调用自己renewExpiration(); }else { //关闭续约 cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime /3, TimeUnit.MILLISECONDS);//注意上行代码internalLockLeaseTime / 3, //internalLockLeaseTime默认30s,那么也就是10s检查一次ee.setTimeout(task); }//设置锁过期时间为internalLockLeaseTime 也就是30s lua脚本保证原子性 protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {returnevalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call(hexists, KEYS[1], ARGV[2]) == 1) then " + "redis.call(pexpire, KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }OK,分析到这里我们已经知道了,lock(),方法会默认加30秒过期时间,并且开启一个新线程,每隔10秒检查一下,锁是否释放,如果没释放,就将锁过期时间设置为30秒,如果锁已经释放,那么就将这个新线程也关掉。
我们写个测试类看看
package com.fandf.test.redis; import org.junit.jupiter.api.Test; import org.redisson.api.RLock; importorg.redisson.api.RedissonClient;import org.springframework.boot.test.context.SpringBootTest; importjavax.annotation.Resource;/** * @Description: * @author: fandongfeng * @date: 2023-3-2416:45 */ @SpringBootTest class RedissonTest { @Resource privateRedissonClient redisson;@Test public void watchDog() throws InterruptedException { RLock lock = redisson.getLock("123"); lock.lock(); Thread.sleep(1000000); } }查看锁的过期时间,及是否续约
127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 26 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> ttl 123 (integer) 22 127.0.0.1:6379> ttl 123 (integer) 21 127.0.0.1:6379> ttl 123 (integer) 20 127.0.0.1:6379> ttl 123 (integer) 30 127.0.0.1:6379> ttl 123 (integer) 28 127.0.0.1:6379>我们再改改代码,看看是否可重入和字段名称是否和我们预期一致
package com.fandf.test.redis; importorg.junit.jupiter.api.Test;import org.redisson.api.RLock; import org.redisson.api.RedissonClient; importorg.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource; /** * @Description: *@author: fandongfeng * @date: 2023-3-24 16:45 */ @SpringBootTest class RedissonTest { @Resource private RedissonClient redisson; @Test public void watchDog() throwsInterruptedException{ RLock lock = redisson.getLock("123"); lock.lock(); lock.lock(); lock.lock();//加了三次锁,此时重入次数为3 Thread.sleep(3000); //解锁一次,此时重入次数变为3 lock.unlock(); Thread.sleep(1000000); } }127.0.0.1:6379> keys * 1) "123" 127.0.0.1:6379> 127.0.0.1:6379> ttl 123 (integer) 24 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "3" 127.0.0.1:6379> 127.0.0.1:6379> hgetall 123 1) "df7f4c71-b57b-455f-acee-936ad8475e01:12" 2) "2" 127.0.0.1:6379>我们加锁了三次,重入次数是3,字段值也是 uuid+:+threadId,和我们预期结果是一致的。
redisson是基于Redlock算法实现的,那么什么是Redlock算法呢?
假设当前集群有5个节点,那么运行redlock算法的客户端会一次执行下面步骤
1.客户端记录当前系统时间,以毫秒为单位
2.依次尝试从5个redis实例中,使用相同key获取锁
当redis请求获取锁时,客户端会设置一个网络连接和响应超时时间,避免因为网络故障等原因导致阻塞。3.客户端使用当前时间减去开始获取锁时间(步骤1的时间),得到获取锁消耗的时间
只有当半数以上redis节点加锁成功,并且加锁消耗的时间要小于锁失效时间,才算锁获取成功4.如果获取到了锁,key的真正有效时间等于锁失效时间 减去 获取锁消耗的时间
5.如果获取锁失败,所有的redis实例都会进行解锁
防止因为服务端响应消息丢失,但是实际数据又添加成功导致数据不一致问题这里有下面几个点需要注意:
1.我们都知道单机的redis是cp的,但是集群情况下redis是ap的,所以运行Redisson的节点必须是主节点,不能有从节点,防止主节点加锁成功未同步从节点就宕机,而客户端却收到加锁成功,导致数据不一致问题。
2.为了提高redis节点宕机的容错率,可以使用公式2N(n指宕机数量)+1,假设宕机一台,Redisson还要继续运行,那么至少要部署2*1+1=3台主节点。
到此,相信大家对“redis中的分布式锁有哪些特点”有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。