Redis场景 | 缓存穿透、击穿问题

网友投稿 673 2023-04-17

Redis场景 | 缓存穿透、击穿问题

Redis场景 | 缓存穿透、击穿问题

场景问题及原因

缓存穿透:

原因:客户端请求的数据在缓存和数据库中不存在,这样缓存永远不会生效,请求全部打入数据库,造成数据库连接异常。

解决思路:

缓存空对象对于不存在的数据也在Redis建立缓存,值为空,并设置一个较短的TTL时间问题:实现简单,维护方便,但短期的数据不一致问题

缓存雪崩:

原因:在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决思路:给不同的Key的TTL添加随机值(简单),给缓存业务添加降级限流策略(复杂),给业务添加多级缓存(复杂)

缓存击穿(热点Key):

前提条件:热点Key&在某一时段被高并发访问&缓存重建耗时较长

原因:热点key突然过期,因为重建耗时长,在这段时间内大量请求落到数据库,带来巨大冲击

解决思路:

互斥锁给缓存重建过程加锁,确保重建过程只有一个线程执行,其它线程等待问题:线程阻塞,导致性能下降且有死锁风险逻辑过期热点key缓存永不过期,而是设置一个逻辑过期时间,查询到数据时通过对逻辑过期时间判断,来决定是否需要重建缓存;重建缓存也通过互斥锁保证单线程执行,但是重建缓存利用独立线程异步执行,其它线程无需等待,直接查询到的旧数据即可问题:不保证一致性,有额外内存消耗且实现复杂

场景问题实践解决

分支:20221221-xbhog-cacheBrenkdown

分支:20230110-xbhog-Cache_Penetration_Avalance

缓存穿透:

代码实现:

12345678910111213141516171819202122public Shop queryWithPassThrough(Long id){ //从redis查询商铺信息 String shopInfo = stringRedisTemplate.opsForValue().get(SHOP_CACHE_KEY + id); //命中缓存,返回店铺信息 if(StrUtil.isNotBlank(shopInfo)){ return JSONUtil.toBean(shopInfo, Shop.class); } //redis既没有key的缓存,但查出来信息不为null,则为空字符串 if(shopInfo != null){ return null; } //未命中缓存 Shop shop = getById(id); if(Objects.isNull(shop)){ //将null添加至缓存,过期时间减少 stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,"",5L, TimeUnit.MINUTES); return null; } //对象转字符串 stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,JSONUtil.toJsonStr(shop),30L, TimeUnit.MINUTES); return shop;}

上述流程图和代码非常清晰,由于缓存雪崩简单实现(复杂实践不会)增加随机TTL值,缓存穿透和缓存雪崩不过多解释。

缓存击穿:

缓存击穿逻辑分析:

首先线程1在查询缓存时未命中,然后进行查询数据库并重建缓存。注意上述缓存击穿发生的条件,被高并发访问&缓存重建耗时较长;

由于缓存重建耗时较长,在这时间穿插线程2,3,4进入;那么这些线程都不能从缓存中查询到数据,同一时间去访问数据库,同时的去执行数据库操作代码,对数据库访问压力过大。

互斥锁:

解决方式:加锁;****可以采用**tryLock方法 + double check**来解决这样的问题

在线程2执行的时候,由于线程1加锁在重建缓存,所以线程2被阻塞,休眠等待线程1执行完成后查询缓存。由此造成在重建缓存的时候阻塞进程,效率下降且有死锁的风险。

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455private Shop queryWithMutex(Long id) { //从redis查询商铺信息 String shopInfo = stringRedisTemplate.opsForValue().get(SHOP_CACHE_KEY + id); //命中缓存,返回店铺信息 if(StrUtil.isNotBlank(shopInfo)){ return JSONUtil.toBean(shopInfo, Shop.class); } //redis既没有key的缓存,但查出来信息不为null,则为空字符串 if(shopInfo != null){ return null; } //实现缓存重建 String lockKey = "lock:shop:"+id; Shop shop = null; try { Boolean aBoolean = tryLock(lockKey); if(!aBoolean){ //加锁失败,休眠 Thread.sleep(50); //递归等待 return queryWithMutex(id); } //获取锁成功应该再次检测redis缓存是否还存在,做doubleCheck,如果存在则无需重建缓存。 synchronized (this){ //从redis查询商铺信息 String shopInfoTwo = stringRedisTemplate.opsForValue().get(SHOP_CACHE_KEY + id); //命中缓存,返回店铺信息 if(StrUtil.isNotBlank(shopInfoTwo)){ return JSONUtil.toBean(shopInfoTwo, Shop.class); } //redis既没有key的缓存,但查出来信息不为null,则为“” if(shopInfoTwo != null){ return null; } //未命中缓存 shop = getById(id); // 5.不存在,返回错误 if(Objects.isNull(shop)){ //将null添加至缓存,过期时间减少 stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,"",5L, TimeUnit.MINUTES); return null; } //模拟重建的延时 Thread.sleep(200); //对象转字符串 stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,JSONUtil.toJsonStr(shop),30L, TimeUnit.MINUTES); } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { unLock(lockKey); } return shop;}

在获取锁失败时,证明已有线程在重建缓存,使当前线程休眠并重试(递归实现)。

代码中需要注意的是synchronized关键字的使用,在获取到锁的时候,在判断下缓存是否存在(失效)double-check,该关键字锁的是当前对象。在其关键字{}中是同步处理。

然后进行测试代码,进行压力测试(jmeter),首先去除缓存中的值,模拟缓存失效。

设置1000个线程,多线程执行间隔5s。

所有的请求都是成功的,其qps大约在200,其吞吐量还是比较可观的。然后看下缓存是否成功(只查询一次数据库);

逻辑过期:

思路分析:

当用户开始查询redis时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回redis中的数据,如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。

封装数据:这里我们采用新建实体类来实现

12345678910/** * @author xbhog * @describe: * @date@Datapublic class RedisData { private LocalDateTime expireTime; private Object data;}

使得过期时间和数据有关联关系,这里的数据类型是Object,方便后续不同类型的封装。

123456789101112131415161718192021222324252627282930313233343536373839public Shop queryWithLogicalExpire( Long id ) { String key = CACHE_SHOP_KEY + id; // 1.从redis查询商铺缓存 String json = stringRedisTemplate.opsForValue().get(key); // 2.判断是否存在 if (StrUtil.isBlank(json)) { // 3.存在,直接返回 return null; } // 4.命中,需要先把json反序列化为对象 RedisData redisData = JSONUtil.toBean(json, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); // 5.判断是否过期 if(expireTime.isAfter(LocalDateTime.now())) { // 5.1.未过期,直接返回店铺信息 return shop; } // 5.2.已过期,需要缓存重建 // 6.缓存重建 // 6.1.获取互斥锁 String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); // 6.2.判断是否获取锁成功 if (isLock){ exectorPool().execute(() -> { try { //重建缓存 this.saveShop2Redis(id, 20L); } catch (Exception e) { throw new RuntimeException(e); } finally { unLock(lockKey); } }); } // 6.4.返回过期的商铺信息 return shop;}

当前的执行流程跟互斥锁基本相同,需要注意的是,在获取锁成功后,我们将缓存重建放到线程池中执行,来异步实现。

线程池代码:

12345678910111213141516/** * 线程池的创建 * @return */private static ThreadPoolExecutor exectorPool(){ ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, //根据自己的处理器数量+1 Runtime.getRuntime().availableProcessors()+1, 2L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); return executor;}

缓存重建代码:

1234567891011121314/** * 重建缓存 * @param id 重建ID * @param l 过期时间 */public void saveShop2Redis(Long id, long l){ //查询店铺信息 Shop shop = getById(id); //封装逻辑过期时间 RedisData redisData = new RedisData(); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(l)); stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));}

测试条件:100线程,1s线程间隔时间,缓存失效时间10s。

测试环境:缓存中存在对应的数据,并且在缓存快失效之前修改数据库中的数据,造成缓存与数据库不一致,通过执行压测,来查看相关线程返回的数据情况。

从上述两张图中可以看到,在前几个线程执行过程中店铺name为102,当执行时间从19-20的时候店铺name发生变化为105,满足逻辑过期异步执行缓存重建的需求.​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:TiFlash 源码阅读(五)DeltaTree 存储引擎设计及实现分析 - Part 2
下一篇:TiFlash 面向编译器的自动向量化加速
相关文章