# 缓存和分布式锁 # 一、缓存 ## 1. 什么是缓存   缓存的作用是减低对数据源的访问频率。从而提高我们系统的性能。 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/f7b9a0b66af14121aadeb50bffd8e36c.png) ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/cb0dbab831954c11bd7e3873914f7150.png) 缓存的流程图 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/cba94b3b543247feb6d8099a6ccc5d2f.png) ## 2.缓存的分类 ### 2.1 本地缓存   其实就是把缓存数据存储在内存中(Map `<String,Object>`).在单体架构中肯定没有问题。 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/8b6494222e834cc895c5934b15836c95.png) 单体架构下的缓存处理 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/fc2ef64e7b734fa6b4702420be0d5a36.png) ### 2.2 分布式缓存   在分布式环境下,我们原来的本地缓存就不是太使用了,原因是: * 缓存数据冗余 * 缓存效率不高 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/7465ee4dee6b4d0c9f4b61ba93dbfa87.png)   分布式缓存的结构图 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/b252ba1016044eca891ab5b3bdf9d1e0.png) ## 3.整合Redis   要整合Redis那么我们在SpringBoot项目中首页来添加对应的依赖 ```xml org.springframework.boot spring-boot-starter-data-redis ```   然后我们需要添加对应的配置信息 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/753f65df9029411a8145b2477d1e58d3.png) 测试操作Redis的数据 ```java @Autowired StringRedisTemplate stringRedisTemplate; @Test public void testStringRedisTemplate(){ // 获取操作String类型的Options对象 ValueOperations ops = stringRedisTemplate.opsForValue(); // 插入数据 ops.set("name","bobo"+ UUID.randomUUID()); // 获取存储的信息 System.out.println("刚刚保存的值:"+ops.get("name")); } ``` 查看可以通过Redis的客户端连接查看 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/1b678ea26d684404a93217b8bb781164.png) 也可以通过工具查看 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/33e044f827f0470bb9f3a27308f77118.png) ## 4.改造三级分类   在首页查询二级和三级分类数据的时候我们可以通过Redis来缓存存储对应的数据,来提升检索的效率。 ```java @Override public Map> getCatelog2JSON() { // 从Redis中获取分类的信息 String catalogJSON = stringRedisTemplate.opsForValue().get("catalogJSON"); if(StringUtils.isEmpty(catalogJSON)){ // 缓存中没有数据,需要从数据库中查询 Map> catelog2JSONForDb = getCatelog2JSONForDb(); // 从数据库中查询到的数据,我们需要给缓存中也存储一份 String json = JSON.toJSONString(catelog2JSONForDb); stringRedisTemplate.opsForValue().set("catalogJSON",json); return catelog2JSONForDb; } // 表示缓存命中了数据,那么从缓存中获取信息,然后返回 Map> stringListMap = JSON.parseObject(catalogJSON, new TypeReference>>() { }); return stringListMap; } ```   然后对三级分类的数据做压力测试 | 压力测试内容 | 压力测试的线程数 | 吞吐量/s | 90%响应时间 | 99%响应时间 | | ------------------------ | ---------------- | -------- | ----------- | ----------- | | Nginx | 50 | 7,385 | 10 | 70 | | Gateway | 50 | 23,170 | 3 | 14 | | 单独测试服务 | 50 | 23,160 | 3 | 7 | | Gateway+服务 | 50 | 8,461 | 12 | 46 | | Nginx+Gateway | 50 | | | | | Nginx+Gateway+服务 | 50 | 2,816 | 27 | 42 | | 一级菜单 | 50 | 1,321 | 48 | 74 | | 三级分类压测 | 50 | 12 | 4000 | 4000 | | 三级分类压测(业务优化后) | 50 | 448 | 113 | 227 | | 三级分类压测(Redis缓存) | 50 | 1163 | 49 | 59 |   通过对比可以看到Redis缓存加入后的性能提升的效果还是非常明显的。 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/29d741c218ec440e9df345b92f03ca69.png) ## 5.缓存穿透   指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义. ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/dca9e58ad48f41cca7dfbfee4e25275c.png) 利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃,解决方案也比较简单,直接把null结果缓存,并加入短暂的过期时间 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/afd4d9ca881948b9b8f78a5667fb4771.png) ## 6.缓存雪崩   缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/a60f00c6642a4027889d8f3bba9b6a02.png) 解决方案:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/a48b2bce1ed049db9eda217396345759.png) ## 7.缓存击穿   对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/e17782f1bcde4c839989a5906aef9e35.png) 解决方案:加锁,大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db。 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/4f7f4bb074874ae6a6366315cb804e2e.png) 但是当我们压力测试的时候,输出的结果有点出乎我们的意料 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/faf97a0b64ae45d986f6f96151cc6adf.png) 做了两次的查询,原因是释放锁和查询结果缓存的时序问题 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/d25221e899c649ae822d6f9811c0f5e3.png) 我们只需要调整下释放锁和结果缓存的时序问题就可以了 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/931fa32bb41841aba9a33da4ca522160.png) 然后就是完整的代码处理 ```java /** * 查询出所有的二级和三级分类的数据 * 并封装为Map对象 * @return */ @Override public Map> getCatelog2JSON() { String key = "catalogJSON"; // 从Redis中获取分类的信息 String catalogJSON = stringRedisTemplate.opsForValue().get(key); if(StringUtils.isEmpty(catalogJSON)){ System.out.println("缓存没有命中....."); // 缓存中没有数据,需要从数据库中查询 Map> catelog2JSONForDb = getCatelog2JSONForDb(); if(catelog2JSONForDb == null){ // 那就说明数据库中也不存在 防止缓存穿透 stringRedisTemplate.opsForValue().set(key,"1",5, TimeUnit.SECONDS); }else{ // 从数据库中查询到的数据,我们需要给缓存中也存储一份 // 防止缓存雪崩 String json = JSON.toJSONString(catelog2JSONForDb); stringRedisTemplate.opsForValue().set("catalogJSON",json,10,TimeUnit.MINUTES); } return catelog2JSONForDb; } System.out.println("缓存命中了...."); // 表示缓存命中了数据,那么从缓存中获取信息,然后返回 Map> stringListMap = JSON.parseObject(catalogJSON, new TypeReference>>() { }); return stringListMap; } /** * 从数据库查询的结果 * 查询出所有的二级和三级分类的数据 * 并封装为Map对象 * 在SpringBoot中,默认的情况下是单例 * @return */ public Map> getCatelog2JSONForDb() { String keys = "catalogJSON"; synchronized (this){ /*if(cache.containsKey("getCatelog2JSON")){ // 直接从缓存中获取 return cache.get("getCatelog2JSON"); }*/ // 先去缓存中查询有没有数据,如果有就返回,否则查询数据库 // 从Redis中获取分类的信息 String catalogJSON = stringRedisTemplate.opsForValue().get(keys); if(!StringUtils.isEmpty(catalogJSON)){ // 说明缓存命中 // 表示缓存命中了数据,那么从缓存中获取信息,然后返回 Map> stringListMap = JSON.parseObject(catalogJSON, new TypeReference>>() { }); return stringListMap; } System.out.println("-----------》查询数据库操作"); // 获取所有的分类数据 List list = baseMapper.selectList(new QueryWrapper()); // 获取所有的一级分类的数据 List leve1Category = this.queryByParenCid(list,0l); // 把一级分类的数据转换为Map容器 key就是一级分类的编号, value就是一级分类对应的二级分类的数据 Map> map = leve1Category.stream().collect(Collectors.toMap( key -> key.getCatId().toString() , value -> { // 根据一级分类的编号,查询出对应的二级分类的数据 List l2Catalogs = this.queryByParenCid(list,value.getCatId()); List Catalog2VOs =null; if(l2Catalogs != null){ Catalog2VOs = l2Catalogs.stream().map(l2 -> { // 需要把查询出来的二级分类的数据填充到对应的Catelog2VO中 Catalog2VO catalog2VO = new Catalog2VO(l2.getParentCid().toString(), null, l2.getCatId().toString(), l2.getName()); // 根据二级分类的数据找到对应的三级分类的信息 List l3Catelogs = this.queryByParenCid(list,l2.getCatId()); if(l3Catelogs != null){ // 获取到的二级分类对应的三级分类的数据 List catalog3VOS = l3Catelogs.stream().map(l3 -> { Catalog2VO.Catalog3VO catalog3VO = new Catalog2VO.Catalog3VO(l3.getParentCid().toString(), l3.getCatId().toString(), l3.getName()); return catalog3VO; }).collect(Collectors.toList()); // 三级分类关联二级分类 catalog2VO.setCatalog3List(catalog3VOS); } return catalog2VO; }).collect(Collectors.toList()); } return Catalog2VOs; } )); // 从数据库中获取到了对应的信息 然后在缓存中也存储一份信息 //cache.put("getCatelog2JSON",map); // 表示缓存命中了数据,那么从缓存中获取信息,然后返回 if(map == null){ // 那就说明数据库中也不存在 防止缓存穿透 stringRedisTemplate.opsForValue().set(keys,"1",5, TimeUnit.SECONDS); }else{ // 从数据库中查询到的数据,我们需要给缓存中也存储一份 // 防止缓存雪崩 String json = JSON.toJSONString(map); stringRedisTemplate.opsForValue().set("catalogJSON",json,10,TimeUnit.MINUTES); } return map; } } ``` ## 8.本地锁的局限   本地锁在分布式环境下,是没有办法锁住其他节点的操作的,这种情况肯定是有问题的 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/0d462603f2e9461bafbf3bbdb6ed79ca.png) 针对本地锁的问题,我们需要通过分布式锁来解决,那么是不是意味着本身锁在分布式场景下就不需要了呢? ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/d3707cdc47ed4941b6b00b49711937ce.png)   显然不是这样的,因为如果分布式环境下的每个节点不控制请求的数量,那么分布式锁的压力会非常大,这时我们需要本地锁来控制每个节点的同步,来降低分布式锁的压力,所以实际开发中我们都是本地锁和分布式锁结合使用的。 # 二、分布式锁 ## 1.分布式锁的原理   分布式锁或者本地锁的本质其实是一样的,都是将并行的操作转换为了串行的操作 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/62602c5ee39246acbdfb2f9990197e02.png) ## 2.分布式锁的常用解决方案 ### 2.1 数据库 ```txt 可以利用MySQL隔离性:唯一索引 ``` ```sql use test; CREATE TABLE `DistributedLock` (  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',  `name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁名',  `desc` varchar(1024) NOT NULL DEFAULT '备注信息',  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',  PRIMARY KEY (`id`),  UNIQUE KEY `uidx_name` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法'; //数据库中的每一条记录就是一把锁,利用的mysql唯一索引的排他性 lock(name,desc){ insert into DistributedLock(`name`,`desc`) values (#{name},#{desc}); } unlock(name){ delete from DistributedLock where name = #{name} } ``` 可以利用拍他说来实现 select .... where ... for update; 乐观锁:乐观的任务数据不会出现数据安全问题,如果出现了就重试一次 ```sql select ...,version; update table set version+1 where version = xxx ``` ### 2.2 Redis setNX: setNX(key,value) :如果key不存在那么就添加key的值,否则添加失败,Redisson ### 2.3Zookeeper ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/7daa3aeb84844f40ac2ca74ef1f08d52.png) ## 3.Redis实现分布式锁   在Redis中是通过setNX指令来实现锁的抢占,那么利用这个命令实现分布式锁的基础代码为: ```java public Map> getCatelog2JSONDbWithRedisLock() { String keys = "catalogJSON"; // 加锁 Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111"); if(lock){ // 加锁成功 Map> data = getDataForDB(keys); // 从数据库中获取数据成功后,我们应该要释放锁 stringRedisTemplate.delete("lock"); return data; }else{ // 加锁失败 // 休眠+重试 // Thread.sleep(1000); return getCatelog2JSONDbWithRedisLock(); } } ```   上面的代码其实是存在一些问题的,首先如果getDataForDB(keys)这个方法如果出现的异常,那么我们就不会删除该key也就是不会释放锁,从而造成了死锁,针对这个问题,我们可以通过设置过期时间来解决,具体代码如下: ```java public Map> getCatelog2JSONDbWithRedisLock() { String keys = "catalogJSON"; // 加锁 Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111"); if(lock){ // 给对应的key设置过期时间 stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS); // 加锁成功 Map> data = getDataForDB(keys); // 从数据库中获取数据成功后,我们应该要释放锁 stringRedisTemplate.delete("lock"); return data; }else{ // 加锁失败 // 休眠+重试 // Thread.sleep(1000); return getCatelog2JSONDbWithRedisLock(); } } ```   上面虽然解决了getDataForDB方法出现异常的问题,但是如果在expire方法执行之前就中断呢?这样也会出现我们介绍的死锁的问题,那这个问题怎么办?这时我们就希望setNx和设置过期时间的操作能够保证原子性。 这时我们就可以在setIfAbsent方法中同时指定过期时间,保证这个原子性的行为 ```java public Map> getCatelog2JSONDbWithRedisLock() { String keys = "catalogJSON"; // 加锁 在执行插入操作的同时设置了过期时间 Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",30,TimeUnit.SECONDS); if(lock){ // 给对应的key设置过期时间 stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS); // 加锁成功 Map> data = getDataForDB(keys); // 从数据库中获取数据成功后,我们应该要释放锁 stringRedisTemplate.delete("lock"); return data; }else{ // 加锁失败 // 休眠+重试 // Thread.sleep(1000); return getCatelog2JSONDbWithRedisLock(); } } ```   如果获取锁的业务执行时间比较长,超过了我们设置的过期时间,那么就有可能业务还没执行完,锁就释放了,然后另一个请求进来了,并创建了key,这时原来的业务处理完成后,再去删除key的时候,那么就有可能删除别人的key,这时怎么办?针对这种情况我们可以查询的锁的信息通过UUID来区分,具体的代码如下: ```java public Map> getCatelog2JSONDbWithRedisLock() { String keys = "catalogJSON"; // 加锁 在执行插入操作的同时设置了过期时间 String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30,TimeUnit.SECONDS); if(lock){ // 给对应的key设置过期时间 stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS); // 加锁成功 Map> data = getDataForDB(keys); // 获取当前key对应的值 String val = stringRedisTemplate.opsForValue().get("lock"); if(uuid.equals(val)){ // 说明这把锁是自己的 // 从数据库中获取数据成功后,我们应该要释放锁 stringRedisTemplate.delete("lock"); } return data; }else{ // 加锁失败 // 休眠+重试 // Thread.sleep(1000); return getCatelog2JSONDbWithRedisLock(); } } ```   上面查询key的值和删除key其实不是一个原子性操作,这就会出现我查询出来key之后,时间过期了,然后key被删除了,然后其他的请求创建了一个新的key,然后原来的执行删除了这个key,又出现了删除别人key的情况。这时我们需要保证查询和删除是一个原子性行为。 ```java public Map> getCatelog2JSONDbWithRedisLock() { String keys = "catalogJSON"; // 加锁 在执行插入操作的同时设置了过期时间 String uuid = UUID.randomUUID().toString(); Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS); if(lock){ Map> data = null; try { // 加锁成功 data = getDataForDB(keys); }finally { String srcipts = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end "; // 通过Redis的lua脚本实现 查询和删除操作的原子性 stringRedisTemplate.execute(new DefaultRedisScript(srcipts,Integer.class) ,Arrays.asList("lock"),uuid); } return data; }else{ // 加锁失败 // 休眠+重试 // Thread.sleep(1000); return getCatelog2JSONDbWithRedisLock(); } } ``` https://space.bilibili.com/435498550 分布式锁的实现 ## 4.Redisson分布式锁 ### 4.1 Redisson的整合 添加对应的依赖 ```xml org.redisson redisson 3.16.1 ``` 添加对应的配置类 ```java @Configuration public class MyRedisConfig { @Bean public RedissonClient redissonClient(){ Config config = new Config(); // 配置连接的信息 config.useSingleServer() .setAddress("redis://192.168.56.100:6379"); RedissonClient redissonClient = Redisson.create(config); return redissonClient; } } ``` ### 4.2 可重入锁 ```java /** * 1.锁会自动续期,如果业务时间超长,运行期间Redisson会自动给锁重新添加30s,不用担心业务时间,锁自动过去而造成的数据安全问题 * 2.加锁的业务只要执行完成, 那么就不会给当前的锁续期,即使我们不去主动的释放锁,锁在默认30s之后也会自动的删除 * @return */ @ResponseBody @GetMapping("/hello") public String hello(){ RLock myLock = redissonClient.getLock("myLock"); // 加锁 myLock.lock(); try { System.out.println("加锁成功...业务处理....." + Thread.currentThread().getName()); Thread.sleep(30000); }catch (Exception e){ }finally { System.out.println("释放锁成功..." + Thread.currentThread().getName()); // 释放锁 myLock.unlock(); } return "hello"; } ``` ### 4.3 读写锁   根据业务操作我们可以分为读写操作,读操作其实不会影响数据,那么如果还对读操作做串行处理,效率会很低,这时我们可以通过读写锁来解决这个问题 ```java @GetMapping("/writer") @ResponseBody public String writerValue(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock"); // 加写锁 RLock rLock = readWriteLock.writeLock(); String s = null; rLock.lock(); // 加写锁 try { s = UUID.randomUUID().toString(); stringRedisTemplate.opsForValue().set("msg",s); Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; } @GetMapping("/reader") @ResponseBody public String readValue(){ RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock"); // 加读锁 RLock rLock = readWriteLock.readLock(); rLock.lock(); String s = null; try { s = stringRedisTemplate.opsForValue().get("msg"); }finally { rLock.unlock(); } return s; } ``` 在读写锁中,只有读读的行为是共享锁,相互之间不影响,只要有写的行为存在,那么就是一个互斥锁(排他锁) ### 4.4 闭锁 基于Redisson的Redisson分布式闭锁([CountDownLatch](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RCountDownLatch.html))Java对象 `RCountDownLatch`采用了与 `java.util.concurrent.CountDownLatch`相似的接口和用法。 ```java @GetMapping("/lockDoor") @ResponseBody public String lockDoor(){ RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.trySetCount(5); try { door.await(); // 等待数量降低到0 } catch (InterruptedException e) { e.printStackTrace(); } return "关门熄灯..."; } @GetMapping("/goHome/{id}") @ResponseBody public String goHome(@PathVariable Long id){ RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.countDown(); // 递减的操作 return id + "下班走人"; } ``` ### 4.5 信号量(Semaphore) 基于Redis的Redisson的分布式信号量([Semaphore](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphore.html))Java对象 `RSemaphore`采用了与 `java.util.concurrent.Semaphore`相似的接口和用法。同时还提供了[异步(Async)](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphoreAsync.html)、[反射式(Reactive)](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphoreReactive.html)和[RxJava2标准](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphoreRx.html)的接口。 ```java @GetMapping("/park") @ResponseBody public String park(){ RSemaphore park = redissonClient.getSemaphore("park"); boolean b = true; try { // park.acquire(); // 获取信号 阻塞到获取成功 b = park.tryAcquire();// 返回获取成功还是失败 } catch (Exception e) { e.printStackTrace(); } return "停车是否成功:" + b; } @GetMapping("/release") @ResponseBody public String release(){ RSemaphore park = redissonClient.getSemaphore("park"); park.release(); return "释放了一个车位"; } ``` ### 4.6 缓存数据一致性问题 ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/2ab29d3241704c48a77981b80538d037.png) ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/572fcf3f04e1424eb951a40dc7f829fd.png) ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/2e3b5c9978f245119dce2840b1363964.png) ![image.png](https://fynotefile.oss-cn-zhangjiakou.aliyuncs.com/fynote/1462/1647243560000/224bafb6fb644d3db088c61b5338548f.png) 针对于上的两种解决方案我们怎么选择? 1. 缓存的所有数据我们都加上过期时间,数据过期之后主动触发更新操作 2. 使用读写锁来处理,读读的操作是不相互影响的 无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办? 1. 如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加 上过期时间,每隔一段时间触发读的主动更新即可 2. 如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。 3. 缓存数据+过期时间也足够解决大部分业务对于缓存的要求。 4. 通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心 脏数据,允许临时脏数据可忽略) 总结: * 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。 * 我们不应该过度设计,增加系统的复杂性 * 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。 # 三、SpringCache SpringCache的不足: 1).读模式 * 缓存穿透:查询一个null的数据。可以解决 cache-null-values=true * 缓存击穿:大量并发进来同时查询一个正好过期的数据。解决方案:分布式锁 sync=true 本地锁 * 缓存雪崩:大量的key同一个时间点失效。解决方案:添加过期时间 time-to-live=60000 指定过期时间 2).写模式 * 读写锁 * 引入canal,监控binlog日志文件来同步更新数据 * 读多写多,直接去数据库中读取数据即可 总结: * 常规数据(读多写少):而且对及时性和数据的一致性要求不高的情况,我们完全可以使用SpringCache * 特殊情况:特殊情况特殊处理。