|
|
# 缓存和分布式锁
|
|
|
|
|
|
# 一、缓存
|
|
|
|
|
|
## 1. 什么是缓存
|
|
|
|
|
|
  缓存的作用是减低对数据源的访问频率。从而提高我们系统的性能。
|
|
|
|
|
|

|
|
|
|
|
|

|
|
|
|
|
|
缓存的流程图
|
|
|
|
|
|

|
|
|
|
|
|
## 2.缓存的分类
|
|
|
|
|
|
### 2.1 本地缓存
|
|
|
|
|
|
  其实就是把缓存数据存储在内存中(Map `<String,Object>`).在单体架构中肯定没有问题。
|
|
|
|
|
|

|
|
|
|
|
|
单体架构下的缓存处理
|
|
|
|
|
|

|
|
|
|
|
|
### 2.2 分布式缓存
|
|
|
|
|
|
  在分布式环境下,我们原来的本地缓存就不是太使用了,原因是:
|
|
|
|
|
|
* 缓存数据冗余
|
|
|
* 缓存效率不高
|
|
|
|
|
|

|
|
|
|
|
|
  分布式缓存的结构图
|
|
|
|
|
|

|
|
|
|
|
|
## 3.整合Redis
|
|
|
|
|
|
  要整合Redis那么我们在SpringBoot项目中首页来添加对应的依赖
|
|
|
|
|
|
```xml
|
|
|
<dependency>
|
|
|
<groupId>org.springframework.boot</groupId>
|
|
|
<artifactId>spring-boot-starter-data-redis</artifactId>
|
|
|
</dependency>
|
|
|
```
|
|
|
|
|
|
  然后我们需要添加对应的配置信息
|
|
|
|
|
|

|
|
|
|
|
|
测试操作Redis的数据
|
|
|
|
|
|
```java
|
|
|
@Autowired
|
|
|
StringRedisTemplate stringRedisTemplate;
|
|
|
|
|
|
@Test
|
|
|
public void testStringRedisTemplate(){
|
|
|
// 获取操作String类型的Options对象
|
|
|
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
|
|
|
// 插入数据
|
|
|
ops.set("name","bobo"+ UUID.randomUUID());
|
|
|
// 获取存储的信息
|
|
|
System.out.println("刚刚保存的值:"+ops.get("name"));
|
|
|
}
|
|
|
```
|
|
|
|
|
|
查看可以通过Redis的客户端连接查看
|
|
|
|
|
|

|
|
|
|
|
|
也可以通过工具查看
|
|
|
|
|
|

|
|
|
|
|
|
## 4.改造三级分类
|
|
|
|
|
|
  在首页查询二级和三级分类数据的时候我们可以通过Redis来缓存存储对应的数据,来提升检索的效率。
|
|
|
|
|
|
```java
|
|
|
@Override
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSON() {
|
|
|
// 从Redis中获取分类的信息
|
|
|
String catalogJSON = stringRedisTemplate.opsForValue().get("catalogJSON");
|
|
|
if(StringUtils.isEmpty(catalogJSON)){
|
|
|
// 缓存中没有数据,需要从数据库中查询
|
|
|
Map<String, List<Catalog2VO>> catelog2JSONForDb = getCatelog2JSONForDb();
|
|
|
// 从数据库中查询到的数据,我们需要给缓存中也存储一份
|
|
|
String json = JSON.toJSONString(catelog2JSONForDb);
|
|
|
stringRedisTemplate.opsForValue().set("catalogJSON",json);
|
|
|
return catelog2JSONForDb;
|
|
|
}
|
|
|
// 表示缓存命中了数据,那么从缓存中获取信息,然后返回
|
|
|
Map<String, List<Catalog2VO>> stringListMap = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2VO>>>() {
|
|
|
});
|
|
|
return stringListMap;
|
|
|
}
|
|
|
```
|
|
|
|
|
|
  然后对三级分类的数据做压力测试
|
|
|
|
|
|
| 压力测试内容 | 压力测试的线程数 | 吞吐量/s | 90%响应时间 | 99%响应时间 |
|
|
|
| ------------------------ | ---------------- | -------- | ----------- | ----------- |
|
|
|
| Nginx | 50 | 7,385 | 10 | 70 |
|
|
|
| Gateway | 50 | 23,170 | 3 | 14 |
|
|
|
| 单独测试服务 | 50 | 23,160 | 3 | 7 |
|
|
|
| Gateway+服务 | 50 | 8,461 | 12 | 46 |
|
|
|
| Nginx+Gateway | 50 | | | |
|
|
|
| Nginx+Gateway+服务 | 50 | 2,816 | 27 | 42 |
|
|
|
| 一级菜单 | 50 | 1,321 | 48 | 74 |
|
|
|
| 三级分类压测 | 50 | 12 | 4000 | 4000 |
|
|
|
| 三级分类压测(业务优化后) | 50 | 448 | 113 | 227 |
|
|
|
| 三级分类压测(Redis缓存) | 50 | 1163 | 49 | 59 |
|
|
|
|
|
|
  通过对比可以看到Redis缓存加入后的性能提升的效果还是非常明显的。
|
|
|
|
|
|

|
|
|
|
|
|
## 5.缓存穿透
|
|
|
|
|
|
  指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义.
|
|
|
|
|
|

|
|
|
|
|
|
利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃,解决方案也比较简单,直接把null结果缓存,并加入短暂的过期时间
|
|
|
|
|
|

|
|
|
|
|
|
## 6.缓存雪崩
|
|
|
|
|
|
  缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
|
|
|
|
|
|

|
|
|
|
|
|
解决方案:原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
|
|
|
|
|
|

|
|
|
|
|
|
## 7.缓存击穿
|
|
|
|
|
|
  对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。
|
|
|
|
|
|

|
|
|
|
|
|
解决方案:加锁,大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db。
|
|
|
|
|
|

|
|
|
|
|
|
但是当我们压力测试的时候,输出的结果有点出乎我们的意料
|
|
|
|
|
|

|
|
|
|
|
|
做了两次的查询,原因是释放锁和查询结果缓存的时序问题
|
|
|
|
|
|

|
|
|
|
|
|
我们只需要调整下释放锁和结果缓存的时序问题就可以了
|
|
|
|
|
|

|
|
|
|
|
|
然后就是完整的代码处理
|
|
|
|
|
|
```java
|
|
|
/**
|
|
|
* 查询出所有的二级和三级分类的数据
|
|
|
* 并封装为Map<String, Catalog2VO>对象
|
|
|
* @return
|
|
|
*/
|
|
|
@Override
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSON() {
|
|
|
String key = "catalogJSON";
|
|
|
// 从Redis中获取分类的信息
|
|
|
String catalogJSON = stringRedisTemplate.opsForValue().get(key);
|
|
|
if(StringUtils.isEmpty(catalogJSON)){
|
|
|
System.out.println("缓存没有命中.....");
|
|
|
// 缓存中没有数据,需要从数据库中查询
|
|
|
Map<String, List<Catalog2VO>> catelog2JSONForDb = getCatelog2JSONForDb();
|
|
|
if(catelog2JSONForDb == null){
|
|
|
// 那就说明数据库中也不存在 防止缓存穿透
|
|
|
stringRedisTemplate.opsForValue().set(key,"1",5, TimeUnit.SECONDS);
|
|
|
}else{
|
|
|
// 从数据库中查询到的数据,我们需要给缓存中也存储一份
|
|
|
// 防止缓存雪崩
|
|
|
String json = JSON.toJSONString(catelog2JSONForDb);
|
|
|
stringRedisTemplate.opsForValue().set("catalogJSON",json,10,TimeUnit.MINUTES);
|
|
|
}
|
|
|
|
|
|
return catelog2JSONForDb;
|
|
|
}
|
|
|
System.out.println("缓存命中了....");
|
|
|
// 表示缓存命中了数据,那么从缓存中获取信息,然后返回
|
|
|
Map<String, List<Catalog2VO>> stringListMap = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2VO>>>() {
|
|
|
});
|
|
|
return stringListMap;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 从数据库查询的结果
|
|
|
* 查询出所有的二级和三级分类的数据
|
|
|
* 并封装为Map<String, Catalog2VO>对象
|
|
|
* 在SpringBoot中,默认的情况下是单例
|
|
|
* @return
|
|
|
*/
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSONForDb() {
|
|
|
String keys = "catalogJSON";
|
|
|
synchronized (this){
|
|
|
/*if(cache.containsKey("getCatelog2JSON")){
|
|
|
// 直接从缓存中获取
|
|
|
return cache.get("getCatelog2JSON");
|
|
|
}*/
|
|
|
// 先去缓存中查询有没有数据,如果有就返回,否则查询数据库
|
|
|
// 从Redis中获取分类的信息
|
|
|
String catalogJSON = stringRedisTemplate.opsForValue().get(keys);
|
|
|
if(!StringUtils.isEmpty(catalogJSON)){
|
|
|
// 说明缓存命中
|
|
|
// 表示缓存命中了数据,那么从缓存中获取信息,然后返回
|
|
|
Map<String, List<Catalog2VO>> stringListMap = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2VO>>>() {
|
|
|
});
|
|
|
return stringListMap;
|
|
|
}
|
|
|
System.out.println("-----------》查询数据库操作");
|
|
|
|
|
|
// 获取所有的分类数据
|
|
|
List<CategoryEntity> list = baseMapper.selectList(new QueryWrapper<CategoryEntity>());
|
|
|
// 获取所有的一级分类的数据
|
|
|
List<CategoryEntity> leve1Category = this.queryByParenCid(list,0l);
|
|
|
// 把一级分类的数据转换为Map容器 key就是一级分类的编号, value就是一级分类对应的二级分类的数据
|
|
|
Map<String, List<Catalog2VO>> map = leve1Category.stream().collect(Collectors.toMap(
|
|
|
key -> key.getCatId().toString()
|
|
|
, value -> {
|
|
|
// 根据一级分类的编号,查询出对应的二级分类的数据
|
|
|
List<CategoryEntity> l2Catalogs = this.queryByParenCid(list,value.getCatId());
|
|
|
List<Catalog2VO> Catalog2VOs =null;
|
|
|
if(l2Catalogs != null){
|
|
|
Catalog2VOs = l2Catalogs.stream().map(l2 -> {
|
|
|
// 需要把查询出来的二级分类的数据填充到对应的Catelog2VO中
|
|
|
Catalog2VO catalog2VO = new Catalog2VO(l2.getParentCid().toString(), null, l2.getCatId().toString(), l2.getName());
|
|
|
// 根据二级分类的数据找到对应的三级分类的信息
|
|
|
List<CategoryEntity> l3Catelogs = this.queryByParenCid(list,l2.getCatId());
|
|
|
if(l3Catelogs != null){
|
|
|
// 获取到的二级分类对应的三级分类的数据
|
|
|
List<Catalog2VO.Catalog3VO> catalog3VOS = l3Catelogs.stream().map(l3 -> {
|
|
|
Catalog2VO.Catalog3VO catalog3VO = new Catalog2VO.Catalog3VO(l3.getParentCid().toString(), l3.getCatId().toString(), l3.getName());
|
|
|
return catalog3VO;
|
|
|
}).collect(Collectors.toList());
|
|
|
// 三级分类关联二级分类
|
|
|
catalog2VO.setCatalog3List(catalog3VOS);
|
|
|
}
|
|
|
return catalog2VO;
|
|
|
}).collect(Collectors.toList());
|
|
|
}
|
|
|
|
|
|
return Catalog2VOs;
|
|
|
}
|
|
|
));
|
|
|
// 从数据库中获取到了对应的信息 然后在缓存中也存储一份信息
|
|
|
//cache.put("getCatelog2JSON",map);
|
|
|
// 表示缓存命中了数据,那么从缓存中获取信息,然后返回
|
|
|
if(map == null){
|
|
|
// 那就说明数据库中也不存在 防止缓存穿透
|
|
|
stringRedisTemplate.opsForValue().set(keys,"1",5, TimeUnit.SECONDS);
|
|
|
}else{
|
|
|
// 从数据库中查询到的数据,我们需要给缓存中也存储一份
|
|
|
// 防止缓存雪崩
|
|
|
String json = JSON.toJSONString(map);
|
|
|
stringRedisTemplate.opsForValue().set("catalogJSON",json,10,TimeUnit.MINUTES);
|
|
|
}
|
|
|
return map;
|
|
|
} }
|
|
|
```
|
|
|
|
|
|
## 8.本地锁的局限
|
|
|
|
|
|
  本地锁在分布式环境下,是没有办法锁住其他节点的操作的,这种情况肯定是有问题的
|
|
|
|
|
|

|
|
|
|
|
|
针对本地锁的问题,我们需要通过分布式锁来解决,那么是不是意味着本身锁在分布式场景下就不需要了呢?
|
|
|
|
|
|

|
|
|
|
|
|
  显然不是这样的,因为如果分布式环境下的每个节点不控制请求的数量,那么分布式锁的压力会非常大,这时我们需要本地锁来控制每个节点的同步,来降低分布式锁的压力,所以实际开发中我们都是本地锁和分布式锁结合使用的。
|
|
|
|
|
|
# 二、分布式锁
|
|
|
|
|
|
## 1.分布式锁的原理
|
|
|
|
|
|
  分布式锁或者本地锁的本质其实是一样的,都是将并行的操作转换为了串行的操作
|
|
|
|
|
|

|
|
|
|
|
|
## 2.分布式锁的常用解决方案
|
|
|
|
|
|
### 2.1 数据库
|
|
|
|
|
|
```txt
|
|
|
可以利用MySQL隔离性:唯一索引
|
|
|
```
|
|
|
|
|
|
```sql
|
|
|
use test;
|
|
|
CREATE TABLE `DistributedLock` (
|
|
|
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
|
|
|
`name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁名',
|
|
|
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
|
|
|
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
|
|
|
PRIMARY KEY (`id`),
|
|
|
UNIQUE KEY `uidx_name` (`name`)
|
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
|
|
|
|
|
|
//数据库中的每一条记录就是一把锁,利用的mysql唯一索引的排他性
|
|
|
|
|
|
lock(name,desc){
|
|
|
insert into DistributedLock(`name`,`desc`) values (#{name},#{desc});
|
|
|
}
|
|
|
|
|
|
unlock(name){
|
|
|
delete from DistributedLock where name = #{name}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
可以利用拍他说来实现 select .... where ... for update;
|
|
|
|
|
|
乐观锁:乐观的任务数据不会出现数据安全问题,如果出现了就重试一次
|
|
|
|
|
|
```sql
|
|
|
select ...,version;
|
|
|
update table set version+1 where version = xxx
|
|
|
```
|
|
|
|
|
|
### 2.2 Redis
|
|
|
|
|
|
setNX: setNX(key,value) :如果key不存在那么就添加key的值,否则添加失败,Redisson
|
|
|
|
|
|
### 2.3Zookeeper
|
|
|
|
|
|

|
|
|
|
|
|
## 3.Redis实现分布式锁
|
|
|
|
|
|
  在Redis中是通过setNX指令来实现锁的抢占,那么利用这个命令实现分布式锁的基础代码为:
|
|
|
|
|
|
```java
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {
|
|
|
String keys = "catalogJSON";
|
|
|
// 加锁
|
|
|
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
|
|
|
if(lock){
|
|
|
// 加锁成功
|
|
|
Map<String, List<Catalog2VO>> data = getDataForDB(keys);
|
|
|
// 从数据库中获取数据成功后,我们应该要释放锁
|
|
|
stringRedisTemplate.delete("lock");
|
|
|
return data;
|
|
|
}else{
|
|
|
// 加锁失败
|
|
|
// 休眠+重试
|
|
|
// Thread.sleep(1000);
|
|
|
return getCatelog2JSONDbWithRedisLock();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
  上面的代码其实是存在一些问题的,首先如果getDataForDB(keys)这个方法如果出现的异常,那么我们就不会删除该key也就是不会释放锁,从而造成了死锁,针对这个问题,我们可以通过设置过期时间来解决,具体代码如下:
|
|
|
|
|
|
```java
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {
|
|
|
String keys = "catalogJSON";
|
|
|
// 加锁
|
|
|
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
|
|
|
if(lock){
|
|
|
// 给对应的key设置过期时间
|
|
|
stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS);
|
|
|
// 加锁成功
|
|
|
Map<String, List<Catalog2VO>> data = getDataForDB(keys);
|
|
|
// 从数据库中获取数据成功后,我们应该要释放锁
|
|
|
stringRedisTemplate.delete("lock");
|
|
|
return data;
|
|
|
}else{
|
|
|
// 加锁失败
|
|
|
// 休眠+重试
|
|
|
// Thread.sleep(1000);
|
|
|
return getCatelog2JSONDbWithRedisLock();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
  上面虽然解决了getDataForDB方法出现异常的问题,但是如果在expire方法执行之前就中断呢?这样也会出现我们介绍的死锁的问题,那这个问题怎么办?这时我们就希望setNx和设置过期时间的操作能够保证原子性。
|
|
|
|
|
|
这时我们就可以在setIfAbsent方法中同时指定过期时间,保证这个原子性的行为
|
|
|
|
|
|
```java
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {
|
|
|
String keys = "catalogJSON";
|
|
|
// 加锁 在执行插入操作的同时设置了过期时间
|
|
|
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",30,TimeUnit.SECONDS);
|
|
|
if(lock){
|
|
|
// 给对应的key设置过期时间
|
|
|
stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS);
|
|
|
// 加锁成功
|
|
|
Map<String, List<Catalog2VO>> data = getDataForDB(keys);
|
|
|
// 从数据库中获取数据成功后,我们应该要释放锁
|
|
|
stringRedisTemplate.delete("lock");
|
|
|
return data;
|
|
|
}else{
|
|
|
// 加锁失败
|
|
|
// 休眠+重试
|
|
|
// Thread.sleep(1000);
|
|
|
return getCatelog2JSONDbWithRedisLock();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
  如果获取锁的业务执行时间比较长,超过了我们设置的过期时间,那么就有可能业务还没执行完,锁就释放了,然后另一个请求进来了,并创建了key,这时原来的业务处理完成后,再去删除key的时候,那么就有可能删除别人的key,这时怎么办?针对这种情况我们可以查询的锁的信息通过UUID来区分,具体的代码如下:
|
|
|
|
|
|
```java
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {
|
|
|
String keys = "catalogJSON";
|
|
|
// 加锁 在执行插入操作的同时设置了过期时间
|
|
|
String uuid = UUID.randomUUID().toString();
|
|
|
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30,TimeUnit.SECONDS);
|
|
|
if(lock){
|
|
|
// 给对应的key设置过期时间
|
|
|
stringRedisTemplate.expire("lock",20,TimeUnit.SECONDS);
|
|
|
// 加锁成功
|
|
|
Map<String, List<Catalog2VO>> data = getDataForDB(keys);
|
|
|
// 获取当前key对应的值
|
|
|
String val = stringRedisTemplate.opsForValue().get("lock");
|
|
|
if(uuid.equals(val)){
|
|
|
// 说明这把锁是自己的
|
|
|
// 从数据库中获取数据成功后,我们应该要释放锁
|
|
|
stringRedisTemplate.delete("lock");
|
|
|
}
|
|
|
return data;
|
|
|
}else{
|
|
|
// 加锁失败
|
|
|
// 休眠+重试
|
|
|
// Thread.sleep(1000);
|
|
|
return getCatelog2JSONDbWithRedisLock();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
  上面查询key的值和删除key其实不是一个原子性操作,这就会出现我查询出来key之后,时间过期了,然后key被删除了,然后其他的请求创建了一个新的key,然后原来的执行删除了这个key,又出现了删除别人key的情况。这时我们需要保证查询和删除是一个原子性行为。
|
|
|
|
|
|
```java
|
|
|
public Map<String, List<Catalog2VO>> getCatelog2JSONDbWithRedisLock() {
|
|
|
String keys = "catalogJSON";
|
|
|
// 加锁 在执行插入操作的同时设置了过期时间
|
|
|
String uuid = UUID.randomUUID().toString();
|
|
|
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
|
|
|
if(lock){
|
|
|
Map<String, List<Catalog2VO>> data = null;
|
|
|
try {
|
|
|
// 加锁成功
|
|
|
data = getDataForDB(keys);
|
|
|
}finally {
|
|
|
String srcipts = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end ";
|
|
|
// 通过Redis的lua脚本实现 查询和删除操作的原子性
|
|
|
stringRedisTemplate.execute(new DefaultRedisScript<Integer>(srcipts,Integer.class)
|
|
|
,Arrays.asList("lock"),uuid);
|
|
|
}
|
|
|
return data;
|
|
|
}else{
|
|
|
// 加锁失败
|
|
|
// 休眠+重试
|
|
|
// Thread.sleep(1000);
|
|
|
return getCatelog2JSONDbWithRedisLock();
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
https://space.bilibili.com/435498550 分布式锁的实现
|
|
|
|
|
|
## 4.Redisson分布式锁
|
|
|
|
|
|
### 4.1 Redisson的整合
|
|
|
|
|
|
添加对应的依赖
|
|
|
|
|
|
```xml
|
|
|
<dependency>
|
|
|
<groupId>org.redisson</groupId>
|
|
|
<artifactId>redisson</artifactId>
|
|
|
<version>3.16.1</version>
|
|
|
</dependency>
|
|
|
```
|
|
|
|
|
|
添加对应的配置类
|
|
|
|
|
|
```java
|
|
|
@Configuration
|
|
|
public class MyRedisConfig {
|
|
|
|
|
|
@Bean
|
|
|
public RedissonClient redissonClient(){
|
|
|
Config config = new Config();
|
|
|
// 配置连接的信息
|
|
|
config.useSingleServer()
|
|
|
.setAddress("redis://192.168.56.100:6379");
|
|
|
RedissonClient redissonClient = Redisson.create(config);
|
|
|
return redissonClient;
|
|
|
}
|
|
|
}
|
|
|
```
|
|
|
|
|
|
### 4.2 可重入锁
|
|
|
|
|
|
```java
|
|
|
/**
|
|
|
* 1.锁会自动续期,如果业务时间超长,运行期间Redisson会自动给锁重新添加30s,不用担心业务时间,锁自动过去而造成的数据安全问题
|
|
|
* 2.加锁的业务只要执行完成, 那么就不会给当前的锁续期,即使我们不去主动的释放锁,锁在默认30s之后也会自动的删除
|
|
|
* @return
|
|
|
*/
|
|
|
@ResponseBody
|
|
|
@GetMapping("/hello")
|
|
|
public String hello(){
|
|
|
RLock myLock = redissonClient.getLock("myLock");
|
|
|
// 加锁
|
|
|
myLock.lock();
|
|
|
try {
|
|
|
System.out.println("加锁成功...业务处理....." + Thread.currentThread().getName());
|
|
|
Thread.sleep(30000);
|
|
|
}catch (Exception e){
|
|
|
|
|
|
}finally {
|
|
|
System.out.println("释放锁成功..." + Thread.currentThread().getName());
|
|
|
// 释放锁
|
|
|
myLock.unlock();
|
|
|
}
|
|
|
return "hello";
|
|
|
}
|
|
|
```
|
|
|
|
|
|
### 4.3 读写锁
|
|
|
|
|
|
  根据业务操作我们可以分为读写操作,读操作其实不会影响数据,那么如果还对读操作做串行处理,效率会很低,这时我们可以通过读写锁来解决这个问题
|
|
|
|
|
|
```java
|
|
|
@GetMapping("/writer")
|
|
|
@ResponseBody
|
|
|
public String writerValue(){
|
|
|
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
|
|
|
// 加写锁
|
|
|
RLock rLock = readWriteLock.writeLock();
|
|
|
String s = null;
|
|
|
rLock.lock(); // 加写锁
|
|
|
try {
|
|
|
s = UUID.randomUUID().toString();
|
|
|
stringRedisTemplate.opsForValue().set("msg",s);
|
|
|
Thread.sleep(30000);
|
|
|
} catch (InterruptedException e) {
|
|
|
e.printStackTrace();
|
|
|
} finally {
|
|
|
rLock.unlock();
|
|
|
}
|
|
|
return s;
|
|
|
}
|
|
|
|
|
|
@GetMapping("/reader")
|
|
|
@ResponseBody
|
|
|
public String readValue(){
|
|
|
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
|
|
|
// 加读锁
|
|
|
RLock rLock = readWriteLock.readLock();
|
|
|
rLock.lock();
|
|
|
String s = null;
|
|
|
try {
|
|
|
s = stringRedisTemplate.opsForValue().get("msg");
|
|
|
}finally {
|
|
|
rLock.unlock();
|
|
|
}
|
|
|
|
|
|
return s;
|
|
|
}
|
|
|
```
|
|
|
|
|
|
在读写锁中,只有读读的行为是共享锁,相互之间不影响,只要有写的行为存在,那么就是一个互斥锁(排他锁)
|
|
|
|
|
|
### 4.4 闭锁
|
|
|
|
|
|
基于Redisson的Redisson分布式闭锁([CountDownLatch](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RCountDownLatch.html))Java对象 `RCountDownLatch`采用了与 `java.util.concurrent.CountDownLatch`相似的接口和用法。
|
|
|
|
|
|
```java
|
|
|
@GetMapping("/lockDoor")
|
|
|
@ResponseBody
|
|
|
public String lockDoor(){
|
|
|
RCountDownLatch door = redissonClient.getCountDownLatch("door");
|
|
|
door.trySetCount(5);
|
|
|
try {
|
|
|
door.await(); // 等待数量降低到0
|
|
|
} catch (InterruptedException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
return "关门熄灯...";
|
|
|
}
|
|
|
|
|
|
@GetMapping("/goHome/{id}")
|
|
|
@ResponseBody
|
|
|
public String goHome(@PathVariable Long id){
|
|
|
RCountDownLatch door = redissonClient.getCountDownLatch("door");
|
|
|
door.countDown(); // 递减的操作
|
|
|
return id + "下班走人";
|
|
|
}
|
|
|
```
|
|
|
|
|
|
### 4.5 信号量(Semaphore)
|
|
|
|
|
|
基于Redis的Redisson的分布式信号量([Semaphore](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphore.html))Java对象
|
|
|
|
|
|
`RSemaphore`采用了与 `java.util.concurrent.Semaphore`相似的接口和用法。同时还提供了[异步(Async)](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphoreAsync.html)、[反射式(Reactive)](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphoreReactive.html)和[RxJava2标准](http://static.javadoc.io/org.redisson/redisson/3.10.0/org/redisson/api/RSemaphoreRx.html)的接口。
|
|
|
|
|
|
```java
|
|
|
@GetMapping("/park")
|
|
|
@ResponseBody
|
|
|
public String park(){
|
|
|
RSemaphore park = redissonClient.getSemaphore("park");
|
|
|
boolean b = true;
|
|
|
try {
|
|
|
// park.acquire(); // 获取信号 阻塞到获取成功
|
|
|
b = park.tryAcquire();// 返回获取成功还是失败
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
return "停车是否成功:" + b;
|
|
|
}
|
|
|
|
|
|
@GetMapping("/release")
|
|
|
@ResponseBody
|
|
|
public String release(){
|
|
|
RSemaphore park = redissonClient.getSemaphore("park");
|
|
|
park.release();
|
|
|
return "释放了一个车位";
|
|
|
}
|
|
|
```
|
|
|
|
|
|
### 4.6 缓存数据一致性问题
|
|
|
|
|
|

|
|
|
|
|
|

|
|
|
|
|
|

|
|
|
|
|
|

|
|
|
|
|
|
针对于上的两种解决方案我们怎么选择?
|
|
|
|
|
|
1. 缓存的所有数据我们都加上过期时间,数据过期之后主动触发更新操作
|
|
|
2. 使用读写锁来处理,读读的操作是不相互影响的
|
|
|
|
|
|
无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
|
|
|
|
|
|
1. 如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加
|
|
|
上过期时间,每隔一段时间触发读的主动更新即可
|
|
|
2. 如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
|
|
|
3. 缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
|
|
|
4. 通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心
|
|
|
脏数据,允许临时脏数据可忽略)
|
|
|
|
|
|
总结:
|
|
|
|
|
|
* 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。
|
|
|
* 我们不应该过度设计,增加系统的复杂性
|
|
|
* 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
|
|
|
|
|
|
# 三、SpringCache
|
|
|
|
|
|
SpringCache的不足:
|
|
|
|
|
|
1).读模式
|
|
|
|
|
|
* 缓存穿透:查询一个null的数据。可以解决 cache-null-values=true
|
|
|
* 缓存击穿:大量并发进来同时查询一个正好过期的数据。解决方案:分布式锁 sync=true 本地锁
|
|
|
* 缓存雪崩:大量的key同一个时间点失效。解决方案:添加过期时间 time-to-live=60000 指定过期时间
|
|
|
|
|
|
2).写模式
|
|
|
|
|
|
* 读写锁
|
|
|
* 引入canal,监控binlog日志文件来同步更新数据
|
|
|
* 读多写多,直接去数据库中读取数据即可
|
|
|
|
|
|
总结:
|
|
|
|
|
|
* 常规数据(读多写少):而且对及时性和数据的一致性要求不高的情况,我们完全可以使用SpringCache
|
|
|
* 特殊情况:特殊情况特殊处理。
|