功能02-商铺查询缓存02
知识补充
(1)缓存穿透
缓存穿透(cache penetration)是指用户访问的数据既不在缓存当中,也不在数据库中。出于容错的考虑,如果从底层数据库查询不到数据,则不写入缓存。这就导致每次请求都会到底层数据库进行查询,缓存也失去了意义。当高并发或有人利用不存在的Key频繁攻击时,数据库的压力骤增,甚至崩溃,这就是缓存穿透问题。
简单地说,缓存穿透是指用户请求的数据在缓存和数据库中都不存在,则每次请求都会打到数据库中,给数据库带来巨大压力。
常见的两种解决方案
(1)缓存空对象:是指在持久层没有命中的情况下,对key进行set (key,null)。
缓存空对象会有两个问题:
-
value为null 不代表不占用内存空间,空值做了缓存,意味着缓存层中存了更多的键,需要更多的内存空间,比较有效的方法是针对这类数据设置一个较短的过期时间,让其自动剔除。
-
缓存层和存储层的数据会有一段时间窗口的不一致,可能会对业务有一定影响。例如过期时间设置为5分钟,如果此时存储层添加了这个数据,那此段时间就会出现缓存层和存储层数据的不一致,此时可以利用消息系统或者其他方式清除掉缓存层中的空对象。
(2)布隆过滤器:
在访问缓存层和存储层之前,将存在的key用布隆过滤器提前保存起来,做第一层拦截,当收到一个对key请求时,先用布隆过滤器验证是key否存在,如果存在再进入缓存层、存储层。
可以使用bitmap做布隆过滤器。这种方法适用于数据命中不高、数据相对固定、实时性低的应用场景,代码维护较为复杂,但是缓存空间占用少。
布隆过滤器实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
布隆过滤器拦截的算法描述:
初始状态时,BloomFilter是一个长度为m的位数组,每一位都置为0。添加元素x时,x使用k个hash函数得到k个hash值,对m取余,对应的bit位设置为1。
判断y是否属于这个集合,对y使用k个哈希函数得到k个哈希值,对m取余,所有对应的位置都是1,则认为y属于该集合(哈希冲突,可能存在误判),否则就认为y不属于该集合。可以通过增加哈希函数和增加二进制位数组的长度来降低错报率
两种方案的比较:
缓存穿透的方案 | 使用场景 | 维护成本 |
---|---|---|
缓存空对象 | 1.数据命中率不高 2.数据频繁变化实时性高 | 1.代码维护简单 2.需要过多的缓存空间 3.数据不一致 |
布隆过滤器 | 1.数据命中不高 2.数据相对固定实时性低 | 1.代码维护复杂 2.缓存空间占用少 |
缓存穿透的解决方案还有:
(2)缓存雪崩
缓存雪崩
在使用缓存时,通常会对缓存设置过期时间,一方面目的是保持缓存与数据库数据的一致性,另一方面是减少冷缓存占用过多的内存空间。但当缓存中大量热点缓存采用了相同的实效时间,就会导致缓存在某一个时刻同时实效,请求全部转发到数据库,从而导致数据库压力骤增,甚至宕机。从而形成一系列的连锁反应,造成系统崩溃等情况,这就是缓存雪崩(Cache Avalanche)。
简单地说,缓存雪崩是指在同一时间段大量的热点key同时失效,或者Redis服务宕机,导致大量请求到达数据库,给数据库带来巨大压力。
解决方案
- 给不同的key的TTL添加随机值(比如随机1-5分钟),让key均匀地失效
- 利用redis集群提高服务的可用性(提高高可用性)
- 给缓存业务添加熔断、降级、限流策略
- 给业务添加多级缓存
(3)缓存击穿
缓存击穿
如果有一个热点key,在不停的扛着大并发,在这个key失效的瞬间,持续的大并发请求就会击破缓存,直接请求到数据库,好像蛮力击穿一样。这种情况就是缓存击穿(Cache Breakdown)。
缓存击穿问题也叫做热点key问题,简单来说,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问在瞬间给数据库带来巨大的冲击。
从定义上可以看出,缓存击穿和缓存雪崩很类似,只不过是缓存击穿是一个热点key失效,而缓存雪崩是大量热点key失效。因此,可以将缓存击穿看作是缓存雪崩的一个子集。
解决方案
方案一:使用互斥锁(Mutex Key),只让一个线程构建缓存,其他线程等待构建缓存执行完毕,重新从缓存中获取数据。单机通过synchronized或lock来处理,分布式环境采用分布式锁。
方案二:逻辑过期。热点数据不设置过期时间,只在value中设置逻辑上的过期时间。后台异步更新缓存,适用于不严格要求缓存一致性的场景。
两种方案的对比:
3.功能02-商铺查询缓存
3.4查询商铺id的缓存穿透问题
3.4.3需求分析
解决查询商铺查询可能存在的缓存穿透问题:当访问不存在的店铺时,请求会直接打到数据库上,并且redis缓存永远不会生效。
这里使用缓存空对象的方式来解决。
3.4.4代码实现
(1)修改ShopServiceImpl.java的queryById方法
@Override
public Result queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
//1.从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断缓存是否命中
if (StrUtil.isNotBlank(shopJson)) {
//2.1若命中,直接返回商铺信息
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//判断命中的是否是redis的空值
if (shopJson != null) {
return Result.fail("店铺不存在!");
}
//2.2未命中,根据id查询数据库,判断商铺是否存在数据库中
Shop shop = getById(id);
if (shop == null) {
//2.2.1不存在,防止缓存穿透,将空值存入redis,TTL设置为2min
stringRedisTemplate.opsForValue().set(key, "",
CACHE_NULL_TTL, TimeUnit.MINUTES);
//返回错误信息
return Result.fail("店铺不存在!");
}
//2.2.2存在,则将商铺数据写入redis中
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
(2)测试,访问一个缓存和数据库都不存在的数据:
可以看到redis已经缓存了一个空值
之后再访问该数据,只要redis的空值对没有过期,就不会访问到数据库,从而起到保护数据库的作用。
3.5查询商铺id的缓存击穿问题
当查询店铺id时,可能会出现该店铺id对应的缓存失效,从而大量请求发送到数据库的情况,这里使用两种方案分别解决该问题。
3.5.1基于互斥锁方案解决
3.5.1.1需求分析
修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题。
如下,当出现缓存击穿问题,首先需要判断当前的线程是否能够获取锁:
- 若可以,则进行缓存重建(将数据库数据重新写入缓存中),然后释放锁。
- 如果不能,则线程等待一段时间,然后再判断缓存是否能命中。
- 如果未命中,则重复获取锁的流程,直到缓存命中,或者获得锁,重建缓存。
根据redis的setnx命令,当setnx设置某个key之后,如果该key存在,则其他线程无法设置该key。
我们可以根据这个特性,作为一个lock的逻辑标志,当一个线程setnx某个key后,代表获取了“锁”。当删除这个key时,代表释放“锁”,这样其他线程就可以重新获取“锁”。此外,可以对该key设置一个有效期,防止删除key失败,产生“死锁”。
3.5.1.2代码实现
(1)修改 ShopServiceImpl.java
package com.hmdp.service.impl;
import ...
/**
* 服务实现类
*
* @author 李
* @version 1.0
*/
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop>
implements IShopService {
@Resource
StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
Shop shop = queryWithMutex(id);
if (shop == null) {
return Result.fail("店铺不存在!");
}
return Result.ok(shop);
}
//缓存穿透(存储空对象)+缓存击穿解决(互斥锁解决)
public Shop queryWithMutex(Long id) {
String key = CACHE_SHOP_KEY + id;
//从redis中查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//判断缓存是否命中
if (StrUtil.isNotBlank(shopJson)) {
//命中,直接返回商铺信息
return JSONUtil.toBean(shopJson, Shop.class);
}
//判断命中的是否是redis的空值(缓存击穿解决)
if (shopJson != null) {
return null;
}
//未命中,尝试获取互斥锁
String lockKey = "lock:shop:" + id;
boolean isLock = false;
Shop shop = null;
try {
//获取互斥锁
isLock = tryLock(lockKey);
//判断是否获取成功
if (!isLock) {//失败
//等待并重试
Thread.sleep(50);
//直到缓存命中,或者获取到锁
return queryWithMutex(id);
}
//获取锁成功,开始重建缓存
//根据id查询数据库,判断商铺是否存在数据库中
shop = getById(id);
//模拟重建缓存的延迟-----------
Thread.sleep(200);
if (shop == null) {
//不存在,防止缓存穿透,将空值存入redis,TTL设置为2min
stringRedisTemplate.opsForValue().set(key, "",
CACHE_NULL_TTL, TimeUnit.MINUTES);
//返回错误信息
return null;
}
//存在,则将商铺数据写入redis中
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//释放互斥锁
unLock(lockKey);
}
//返回从缓存或数据库中查到的数据
return shop;
}
//缓存穿透方案
// public Shop queryWithPassThrough(Long id) {
// String key = CACHE_SHOP_KEY + id;
// //1.从redis中查询商铺缓存
// String shopJson = stringRedisTemplate.opsForValue().get(key);
// //2.判断缓存是否命中
// if (StrUtil.isNotBlank(shopJson)) {
// //2.1若命中,直接返回商铺信息
// return JSONUtil.toBean(shopJson, Shop.class);
// }
// //判断命中的是否是redis的空值
// if (shopJson != null) {
// return null;
// }
// //2.2未命中,根据id查询数据库,判断商铺是否存在数据库中
// Shop shop = getById(id);
// if (shop == null) {
// //2.2.1不存在,防止缓存穿透,将空值存入redis,TTL设置为2min
// stringRedisTemplate.opsForValue().set(key, "",
// CACHE_NULL_TTL, TimeUnit.MINUTES);
// //返回错误信息
// return null;
// }
// //2.2.2存在,则将商铺数据写入redis中
// stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),
// CACHE_SHOP_TTL, TimeUnit.MINUTES);
// return shop;
// }
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unLock(String key) {
stringRedisTemplate.delete(key);
}
@Override
@Transactional
public Result update(Shop shop) {
Long id = shop.getId();
if (id == null) {
return Result.fail("店铺id不能为空");
}
//1.更新数据库
updateById(shop);
//2.删除redis缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}
}
(2)使用jemeter模拟高并发的情况:
5秒发起1000个请求线程:
模拟http请求:
全部请求成功,获取到数据:
在服务器的控制台中可以看到:对于数据库的请求只触发了一次,证明在高并发的场景下,只有一个线程对数据库发起请求,并对redis对应的缓存重新设置。
3.5.2基于逻辑过期方案解决
3.5.2.1需求分析
需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题。
如下,当查询商铺id的请求发送到服务器时:
- 首先从redis查询缓存是否命中(作为热点key,是事先预存在缓存中的。如果未命中,则说明不是热点key,按照普通流程即可)。
- 如果命中了,需要判断该缓存是否过期(逻辑过期的做法是将热点key设置为永不过期,只在逻辑上进行判断是否“过期”)。
- 如果缓存“过期”了,则说明给需要进行逻辑上的缓存重建,为了防止并发情况,需要通过互斥锁来解决
- 判断当前线程是否能获取互斥锁,如果不行,说明有其他线程正在进行缓存重建,当前线程直接返回旧的缓存数据;如果可以,则开启一个新的独立的线程,到数据库获取新数据,将新数据写入redis并设置逻辑过期时间,释放互斥锁。同时,获取锁的线程,直接返回旧的缓存数据。
3.5.2.2代码实现
(1)RedisData,因为要设置逻辑过期时间,为了不改变源代码,这里对相关的实体类做一个“包装”,带上expireTime
package com.hmdp.utils;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author 李
* @version 1.0
*/
@Data
public class RedisData {
//逻辑过期时间
private LocalDateTime expireTime;
//存入redis的数据
private Object data;
}
(2)热点key需要提前设置到redis中
(2.1)修改ShopServiceImpl,添加方法:该方法用于对热点key增加逻辑过期时间,并存入redis中
public void saveShop2Redis(Long id, Long expireSeconds) {
//查询店铺数据
Shop shop = getById(id);
//封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
//逻辑过期时间=当前时间+expireSeconds秒
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//写入redis
stringRedisTemplate.opsForValue()
.set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
(2.2)在测试类中调用上述方法,将数据设置到redis中
package com.hmdp;
import ...
/**
* @author 李
* @version 1.0
*/
@SpringBootTest
public class HmDianPingApplicationTests {
@Resource
private ShopServiceImpl shopService;
@Test
public void testSaveShop() {
shopService.saveShop2Redis(1L, 10L);
}
}
成功设置热点key信息,并设置逻辑过期时间
(3)ShopServiceImpl,增加queryWithLogicalExpire方法,该方法完成:对热点key逻辑过期判断,以及重建热点缓存工作。
package com.hmdp.service.impl;
import ...
/**
* 服务实现类
*
* @author 李
* @version 1.0
*/
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop>
implements IShopService {
@Resource
StringRedisTemplate stringRedisTemplate;
@Override
public Result queryById(Long id) {
//缓存击穿方案(逻辑过期)
Shop shop = queryWithLogicalExpire(id);
if (shop == null) {
return Result.fail("店铺不存在!");
}
return Result.ok(shop);
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR =
Executors.newFixedThreadPool(10);
//缓存击穿方案(逻辑过期)
public Shop queryWithLogicalExpire(Long id) {
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//这里不再考虑缓存穿透问题,因为key永不过期
if (StrUtil.isBlank(shopJson)) {//如果未命中,说明不是热点key
//直接返回null
return null;
}
//如果命中
//先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
//判断是否逻辑过期
if (expireTime.isAfter(LocalDateTime.now())) {
//未过期,直接返回店铺信息
return shop;
}
//过期,获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
if (isLock) {//成功获取互斥锁
//开启独立线程,重建热点缓存
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
this.saveShop2Redis(id, 20L);
} catch (Exception e) {
throw new RuntimeException(e);
}
//释放互斥锁
unLock(lockKey);
});
}
//如果未获取互斥锁,直接返回旧数据
return shop;
}
//....
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unLock(String key) {
stringRedisTemplate.delete(key);
}
//....
public void saveShop2Redis(Long id, Long expireSeconds) {
//查询店铺数据
Shop shop = getById(id);
//封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
//逻辑过期时间=当前时间+expireSeconds秒
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//写入redis
stringRedisTemplate.opsForValue()
.set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
}
(4)测试目标:测试在高并发的情况下,1.会不会出现大家一起进行重建热点缓存的情况 2.数据一致性问题。
我们首先在数据库中对对应的数据进行修改,看看会不会出现数据一致性问题。
数据库:
redis:
使用jemeter测试,一秒100并发
测试结果:
服务端控制台发出了一次对数据库的操作,说明对没有出现并发重建热点缓存的情况。
在大约第30个线程左右时,对缓存进行了重建
文章来源: 博客园
- 还没有人评论,欢迎说说您的想法!