add分布式锁

pull/1/head
595208882@qq.com 4 years ago
parent 39f094fb47
commit 740fecaab7

@ -2978,9 +2978,7 @@ if (typeof WeixinJSBridge == "undefined"){
# 分布式锁
**库存超卖问题**
## 库存超卖问题
系统A是一个电商系统目前是一台机器部署系统中有一个用户下订单的接口但是用户下订单之前一定要去检查一下库存确保库存足够了才会给用户下单。由于系统有一定的并发所以会预先将商品的库存保存在redis中用户下单的时候会更新redis的库存。此时系统架构如下
@ -2994,242 +2992,6 @@ if (typeof WeixinJSBridge == "undefined"){
**分布式锁的特点**
- **互斥性**:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥
- **可重入性**:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁
- **锁超时**:和本地锁一样支持锁超时,防止死锁
- **高性能和高可用**:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级
- **支持阻塞和非阻塞**和ReentrantLock一样支持lock和trylock以及tryLocklong timeout
- **支持公平锁和非公平锁(可选)**:公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的
**三种方案对比**
- **从理解的难易程度角度(从低到高)**:数据库 > 缓存 > Zookeeper
- **从实现的复杂性角度(从低到高)**Zookeeper >= 缓存 > 数据库
- **从性能角度(从高到低)**:缓存 > Zookeeper >= 数据库
- **从可靠性角度(从高到低)**Zookeeper > 缓存 > 数据库
## MySQL
**适用场景**
`Mysql` 分布式锁一般适用于资源不存在数据库,如果数据库存在比如订单,那么可以直接对这条数据加行锁,不需要我们下面多的繁琐的步骤。如一个订单,可以用 `select * from order_table where id = 'xxx' for update` 进行加行锁,那么其它事务就不能对其进行修改。
**优缺点**
- **优点**简单易于理解不需要维护额外的第三方中间件如RedisZk
- **缺点**:实现起来较为繁琐、需要自己考虑锁超时和加事务、性能局限于数据库
### 表主键唯一 + 乐观锁
利用主键唯一特性,如有多个请求同时提交到数据库,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,当方法执行完毕之后,想要释放锁的话,删除这条数据库记录即可。上述方案简单实现有以下几个问题:
- 强依赖数据库可用性,是一个单点(部署双实例)
- 没有失效时间,一旦解锁失败,就会导致死锁(添加定时任务扫描表)
- 一旦插入失败就会直接报错不会进入排队队列使用while循环成功后才返回
- 是非重入锁,同一线程在没有释放锁之前无法再次获得该锁(添加字段记录机器和线程信息,查询时相同则直接分配)
- 非公平锁(建中间表记录等待锁的线程,根据创建时间排序后进行依次处理)
- 采用主键冲突防重,在大并发情况下有可能会造成锁表现象(采用程序生产主键进行防重)
### 表字段版本号 + 乐观锁
这个策略源于 `mysql``mvcc` 机制,使用这个策略其实本身没有什么问题,唯一的问题就是对数据表侵入较大,我们要为每个表设计一个版本号字段,然后写一条判断 `sql` 每次进行判断,增加了数据库操作的次数,在高并发的要求下,对数据库连接的开销也是无法忍受的。
### 基于数据库排他锁 + 悲观锁
在查询语句后面增加`for update`(会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功),数据库会在查询过程中给数据库表增加排他锁(注意: `InnoDB` 引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁)。当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,通过`connection.commit()`操作来释放锁。依然没有办法直接解决数据库单点和可重入的问题。
- **表结构设计**
![mysqllock](images/Architecture/mysqllock.png)
- **lock()**
![mysqllock-lock-java](images/Architecture/mysqllock-lock-java.png)
![mysqllock-lock-sql](images/Architecture/mysqllock-lock-sql.png)
- **trylock()**
![mysqllock-trylock](images/Architecture/mysqllock-trylock.png)
- **trylock(long timeout)**
![mysqllock-trylock-timeout](images/Architecture/mysqllock-trylock-timeout.png)
- **unlock()**
![mysqllock-unlock](images/Architecture/mysqllock-unlock.png)
## Redis
基于缓存的分布式锁常用方式包括如下:
- 使用**Redis**的`setnx()`用于分布式锁(`setnx`直接设置值为当前时间+超时时间,保持操作原子性)
- 使用**Memcached**的`add()`方法用于分布式锁
- 使用**Tair**的`put()`方法用于分布式锁
**优缺点**
- **优点**:对于`Redis`实现简单,性能对比`ZK`和`Mysql`较好。如果不需要特别复杂的要求,那么自己就可以利用`setNx`进行实现,如果自己需要复杂的需求的话那么可以利用或者借鉴`Redission`。对于一些要求比较严格的场景来说的话可以使用`RedLock`
- **缺点**:需要维护`Redis`集群,如果要实现`RedLock`那么需要维护更多的集群
### LUA+SETNX+EXPIRE
先用`setnx`来抢锁,如果抢到之后,再用`expire`给锁设置一个过期时间,防止锁忘记了释放。
- **setnx()**
`setnx` 的含义就是 `SET if Not Exists`,其主要有两个参数 `setnx(key, value)`,该方法是原子的
- 如果 `key` 不存在,则设置当前 `key` 成功,返回 `1`
- 如果当前 `key` 已经存在,则设置当前 `key` 失败,返回 `0`
- **expire()**
`expire` 设置过期时间,要注意的是 `setnx` 命令不能设置 `key` 的超时时间,只能通过 `expire()` 来对 `key` 设置。
```java
ifjedis.setnx(key_resource_id,lock_value) == 1{ // 加锁
expirekey_resource_id100; // 设置过期时间
try {
do something // 业务请求
}catch(){
}
finally {
jedis.del(key_resource_id); // 释放锁
}
}
```
但该方案中,`setnx`和`expire`两个命令分开了,**「不是原子操作」**。如果执行完`setnx`加锁,正要执行`expire`设置过期时间时进程crash或者要重启维护了那么这个锁就永远不会过期其它线程永远获取不到锁。
**使用Lua脚本(SETNX+EXPIRE)**
实际上我们还可以使用Lua脚本来保证原子性包含setnx和expire两条指令lua脚本如下
```lua
if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then
redis.call('expire',KEYS[1],ARGV[2])
else
return 0
end;
```
加锁代码如下:
```java
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
" redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
Object result = jedis.eval(lua_scripts, Collections.singletonList(key_resource_id), Collections.singletonList(values));
//判断是否成功
return result.equals(1L);
```
### Redisson
### Redlock
## Zookeeper
**实现原理**
**基于zookeeper临时有序节点可以实现的分布式锁**。每个客户端对某个方法加锁时,在`Zookeeper`上与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
![zookeeperlock](images/Architecture/zookeeperlock.png)
**Curator**
`Curator`封装了Zookeeper底层的Api使我们更加容易方便的对Zookeeper进行操作并且它封装了分布式锁的功能。
- **InterProcessMutex**:可重入锁。在可重入锁中还实现了读写锁
- **InterProcessSemaphoreMutex**:不可重入锁
**优缺点**
- **优点**ZK可以不需要关心锁超时时间实现起来有现成的第三方包比较方便并且支持读写锁ZK获取锁会按照加锁的顺序所以其是公平锁。对于高可用利用ZK集群进行保证
- **缺点**ZK需要额外维护增加维护成本性能和Mysql相差不大依然比较差。并且需要开发人员了解ZK是什么
### InterProcessMutex
`InterProcessMutex`是Curator实现的**可重入锁**,我们可以通过下面的一段代码实现我们的可重入锁:
![InterProcessMutex](images/Architecture/InterProcessMutex.png)
**加锁流程:**
- 首先进行可重入的判定:这里的可重入锁记录在`ConcurrentMap<Thread, LockData> threadData`这个Map里面如果`threadData.get(currentThread)`是有值的那么就证明是可重入锁然后记录就会加1。我们之前的Mysql其实也可以通过这种方法去优化可以不需要`count`字段的值,将这个维护在本地可以提高性能
- 然后在我们的资源目录下创建一个节点:比如这里创建一个`/0000000002`这个节点,这个节点需要设置为`EPHEMERAL_SEQUENTIAL`也就是临时节点并且有序
- 获取当前目录下所有子节点,判断自己的节点是否位于子节点第一个
- 如果是第一个,则获取到锁,那么可以返回
- 如果不是第一个,则证明前面已经有人获取到锁,那么需要获取自己节点的前一个节点。`/0000000002`的前一个节点是`/0000000001`,我们获取到该节点后,再上面注册`Watcher`(这里的`watcher`其实调用的是`object.notifyAll()`,用来解除阻塞)
- `object.wait(timeout)`或`object.wait()`:进行阻塞等待这里和我们第5步的`watcher`相对应
**解锁流程:**
- 首先进行可重入锁的判定:如果有可重入锁只需要次数减1即可减1之后加锁次数为0的话继续下面步骤不为0直接返回
- 删除当前节点
- 删除`threadDataMap`里面的可重入锁的数据
### InterProcessReadWriteLock
`Curator`提供了**读写锁**,其实现类是`InterProcessReadWriteLock`,这里的每个节点都会加上前缀:
```java
private static final String READ_LOCK_NAME = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";
```
根据不同的前缀区分是读锁还是写锁,对于读锁,如果发现前面有写锁,那么需要将`Watcher`注册到和自己最近的写锁。写锁的逻辑和`InterProcessMutex`的分析依然保持不变。
### 锁超时
`Zookeeper`不需要配置锁超时,由于我们设置节点是临时节点,我们的每个机器维护着一个`ZK`的`Session`,通过这个`Session``Zookeeper`可以判断机器是否宕机。如果我们的机器挂掉的话,那么这个临时节点对应的就会被删除,所以我们不需要关心锁超时。
# 数据脱敏
先来看看什么是数据脱敏?数据脱敏也叫数据的去隐私化,在我们给定脱敏规则和策略的情况下,对敏感数据比如 `手机号`、`银行卡号` 等信息,进行转换或者修改的一种技术手段,防止敏感数据直接在不可靠的环境下使用。像政府、医疗行业、金融机构、移动运营商是比较早开始应用数据脱敏的,因为他们所掌握的都是用户最核心的私密数据,如果泄露后果是不可估量的。数据脱敏的应用在生活中是比较常见的,比如我们在淘宝买东西订单详情中,商家账户信息会被用 `*` 遮挡,保障了商户隐私不泄露,这就是一种数据脱敏方式。数据脱敏又分为静态数据脱敏(`SDM`)和 动态数据脱敏(`DDM`)。

@ -1088,13 +1088,13 @@ Træfɪk 是一个为了让部署微服务更加便捷而诞生的现代HTTP反
![ServiceDowngrade](images/Solution/ServiceDowngrade.png)
**使用场景**
## 使用场景
服务降级主要用于什么场景呢?当整个微服务架构整体的负载超出了预设的上限阈值或即将到来的流量预计将会超过预设的阈值时,为了保证重要或基本的服务能正常运行,我们可以将一些 **不重要** 或 **不紧急** 的服务或任务进行服务的 **延迟使用** 或 **暂停使用**。
**服务降级要考虑的问题**
## 服务降级要考虑的问题
- 核心和非核心服务
- 是否支持降级,降级策略
@ -1851,6 +1851,801 @@ List< Long > ids = TinyId . nextId( " test " , 10 );
# 分布式锁
**何为分布式锁?**
- 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数
- 用一个状态值表示锁,对锁的占用和释放通过状态值来标识
**分布式锁的特点**
- **互斥性**:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥
- **可重入性**:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁
- **锁超时**:和本地锁一样支持锁超时,防止死锁
- **高性能和高可用**:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级
- **支持阻塞和非阻塞**和ReentrantLock一样支持lock和trylock以及tryLocklong timeout
- **支持公平锁和非公平锁(可选)**:公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的
**三种方案对比**
- **从理解的难易程度角度(从低到高)**:数据库 > 缓存 > Zookeeper
- **从实现的复杂性角度(从低到高)**Zookeeper >= 缓存 > 数据库
- **从性能角度(从高到低)**:缓存 > Zookeeper >= 数据库
- **从可靠性角度(从高到低)**Zookeeper > 缓存 > 数据库
## MySQL
### 基于唯一索引(`insert`)实现
**记录锁的乐观锁方案**。基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含**方法名**等字段,并在**方法名字段上创建唯一索引**,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
#### 优缺点
**优点**
- 实现简单、易于理解
**缺点**
- 没有线程唤醒,获取失败就被丢掉了
- 没有超时保护,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁
- 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用
- 并发量大的时候请求量大,获取锁的间隔,如果较小会给系统和数据库造成压力
- 这把锁只能是非阻塞的因为数据的insert操作一旦插入失败就会直接报错没有获得锁的线程并不会进入排队队列要想再次获得锁就要再次触发获得锁操作
- 这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁,因为数据中数据已经存在了
- 这把锁是非公平锁,所有等待锁的线程凭运气去争夺锁
#### 实现方案
```mysql
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`lock_key` varchar(64) NOT NULL DEFAULT '' COMMENT '锁的键值',
`lock_timeout` datetime NOT NULL DEFAULT NOW() COMMENT '锁的超时时间',
`remarks` varchar(255) NOT NULL COMMENT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_lock_key` (`lock_key`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
```
**① 获取锁**:想要执行某个方法,就使用这个方法名向表中插入数据
```mysql
INSERT INTO method_lock (lock_key, lock_timeout, remarks) VALUES ('methodName', '2021-07-19 18:20:00', '测试的methodName');
```
**② 释放锁**:释放锁的时候就删除记录
```mysql
DELETE FROM method_lock WHERE lock_key ='methodName';
```
#### 问题与解决
- 强依赖数据库可用性,是一个单点(部署双实例)
- 没有失效时间,一旦解锁失败,就会导致死锁(添加定时任务扫描表)
- 一旦插入失败就会直接报错不会进入排队队列使用while循环成功后才返回
- 是非重入锁,同一线程在没有释放锁之前无法再次获得该锁(添加字段记录机器和线程信息,查询时相同则直接分配)
- 非公平锁(建中间表记录等待锁的线程,根据创建时间排序后进行依次处理)
- 采用唯一索引冲突防重,在大并发情况下有可能会造成锁表现象(采用程序生产主键进行防重)
### 基于表字段版本号实现
**版本号对比更新的乐观锁方案**。一般是通过为数据库表添加一个 `version` 字段来实现读取出数据时,将此版本号一同读出。之后更新时,对此版本号加 `1`,在更新过程中,会对版本号进行比较,如果是一致的,没有发生改变,则会成功执行本次操作;如果版本号不一致,则会更新失败。实际就是个`CAS`过程。
#### 优缺点
**缺点**
- 该方式使原本一次的update操作必须变为2次操作select版本号一次、update一次。增加了数据库操作的次数
- 如果业务场景中的一次业务流程中,多个资源都需要用保证数据一致性,那么如果全部使用基于数据库资源表的乐观锁,就要让每个资源都有一张资源表,这个在实际使用场景中肯定是无法满足的。而且这些都基于数据库操作,在高并发的要求下,对数据库连接的开销一定是无法忍受的
- 乐观锁机制往往基于系统中的数据存储逻辑,因此可能会造成脏数据被更新到数据库中
### 基于排他锁(`for update`)实现
**基于排它锁的悲观锁方案**。通过在select语句后增加`for update`来获取锁,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁,我们可以认为获得排它锁的线程即可获得分布式锁。释放锁通过`connection.commit();`操作,提交事务来实现。
#### 优缺点
**优点**
- 实现简单、易于理解
**缺点**
- 排他锁会占用连接,产生连接爆满的问题
- 如果表不大,可能并不会使用行锁
- 同样存在单点问题、并发量问题
#### 实现方案
**建表脚本**
```mysql
CREATE TABLE `methodLock` (
`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` VARCHAR ( 64 ) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` VARCHAR ( 1024 ) NOT NULL DEFAULT '备注信息',
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY ( `id` ),
UNIQUE KEY `uidx_method_name` ( `method_name ` ) USING BTREE
) ENGINE = INNODB DEFAULT CHARSET = utf8 COMMENT = '锁定中的方法';
```
**加解锁操作**
```java
/**
* 加锁
*/
public boolean lock() {
// 开启事务
connection.setAutoCommit(false);
// 循环阻塞,等待获取锁
while (true) {
// 执行获取锁的sql
String sql = "select * from methodLock where method_name = xxx for update";
// 创建prepareStatement对象用于执行SQL
ps = conn.prepareStatement(sql);
// 获取查询结果集
int result = ps.executeQuery();
// 结果非空,加锁成功
if (result != null) {
return true;
}
}
// 加锁失败
return false;
}
/**
* 解锁
*/
public void unlock() {
// 提交事务,解锁
connection.commit();
}
```
## Redis
### LUA+SETNX+EXPIRE
先用`setnx`来抢锁,如果抢到之后,再用`expire`给锁设置一个过期时间,防止锁忘记了释放。
- **setnx(key, value)**
`setnx` 的含义就是 `SET if Not Exists`,该方法是原子的。如果 `key` 不存在,则设置当前 `key``value` 成功,返回 `1`;如果当前 `key` 已经存在,则设置当前 `key` 失败,返回 `0`
- **expire(key, seconds)**
`expire` 设置过期时间,要注意的是 `setnx` 命令不能设置 `key` 的超时时间,只能通过 `expire()` 来对 `key` 设置。
**使用Lua脚本(SETNX+EXPIRE)**
可以使用Lua脚本来保证原子性包含setnx和expire两条指令加解锁代码如下
```java
/**
* 使用Lua脚本脚本中使用setnex+expire命令进行加锁操作
*/
public boolean lock(Jedis jedis, String key, String uniqueId, int seconds) {
String luaScript = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
Object result = jedis.eval(luaScript, Collections.singletonList(key),
Arrays.asList(uniqueId, String.valueOf(seconds)));
return result.equals(1L);
}
/**
* 使用Lua脚本进行解锁操纵解锁的时候验证value值
*/
public boolean unlock(Jedis jedis, String key, String value) {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) else return 0 end";
return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
}
```
**STW**
如果在写文件过程中,发生了 FullGC并且其时间跨度较长 超过了锁超时的时间, 那么分布式就自动释放了。在此过程中client2 抢到锁写了文件。client1 的FullGC完成后也继续写文件**注意,此时 client1 的并没有占用锁**此时写入会导致文件数据错乱发生线程安全问题。这就是STW导致的锁过期问题。STW导致的锁过期问题如下图所示
![STW导致的锁过期问题](images/Solution/STW导致的锁过期问题.png)
STW导致的锁过期问题大概的解决方案有
- **方案一: 模拟CAS乐观锁的方式增加版本号如下图中的token**
![模拟CAS乐观锁的方式-增加版本号](images/Solution/模拟CAS乐观锁的方式-增加版本号.png)
此方案如果要实现,需要调整业务逻辑,与之配合,所以会入侵代码。
- **方案二watch dog自动延期机制**
客户端1加锁的锁key默认生存时间才30秒如果超过了30秒客户端1还想一直持有这把锁怎么办呢简单只要客户端1一旦加锁成功就会启动一个watch dog看门狗**它是一个后台线程会每隔10秒检查一下**如果客户端1还持有锁key那么就会不断的延长锁key的生存时间。Redission采用的就是这种方案 此方案不会入侵业务代码。
### SET-NX-EX
**方案**`SET key value [EX seconds] [PX milliseconds] [NX|XX]`
- `EX second` :设置键的过期时间为 `second` 秒。 `SET key value EX second` 效果等同于 `SETEX key second value`
- `PX millisecond` :设置键的过期时间为 `millisecond` 毫秒。 `SET key value PX millisecond` 效果等同于 `PSETEX key millisecond value`
- `NX` :只在键不存在时,才对键进行设置操作。 `SET key value NX` 效果等同于 `SETNX key value`
- `XX` :只在键已经存在时,才对键进行设置操作
客户端执行以上的命令:
- 如果服务器返回 `OK` ,那么这个客户端获得锁
- 如果服务器返回 `NIL` ,那么客户端获取锁失败,可以在稍后再重试
**① 加锁**使用redis命令 set key value NX EX max-lock-time 实现加锁
```java
Jedis jedis = new Jedis("127.0.0.1", 6379);
private static final String SUCCESS = "OK";
/**
* 加锁操作
* @param key 锁标识
* @param value 客户端标识
* @param timeOut 过期时间
*/
public Boolean lock(String key,String value,Long timeOut){
String var1 = jedis.set(key,value,"NX","EX",timeOut);
if(LOCK_SUCCESS.equals(var1)){
return true;
}
return false;
}
```
- 加锁操作 `jedis.set(key,value,"NX","EX",timeOut)`【保证加锁的原子操作】
- `key`是`redis`的`key`值作为锁的标识,`value`在作为客户端的标识,只有`key-value`都比配才有删除锁的权利【保证安全性】
- 通过`timeout`设置过期时间保证不会出现死锁【避免死锁】
- `NX`:只有这个`key`不存才的时候才会进行操作,`if not exists`
- `EX`:设置`key`的过期时间为秒,具体时间由第`5`个参数决定,过期时间设置的合理有效期需要根据业务具体决定,总的原则是任务执行`time*3`
**② 解锁**使用redis命令 EVAL 实现解锁
```java
Jedis jedis = new Jedis("127.0.0.1", 6379);
private static final String SUCCESS = "OK";
/**
* 加锁操作
* @param key 锁标识
* @param value 客户端标识
* @param timeOut 过期时间
*/
public Boolean lock(String key,String value,Long timeOut){
String var1 = jedis.set(key,value,"NX","EX",timeOut);
if(LOCK_SUCCESS.equals(var1)){
return true;
}
return false;
}
```
- luaScript 这个字符串是个lua脚本代表的意思是如果根据key拿到的value跟传入的value相同就执行del否则就返回0【保证安全性】
- jedis.eval(String,list,list);这个命令就是去执行lua脚本KEYS的集合就是第二个参数ARGV的集合就是第三参数【保证解锁的原子操作】
**③ 重试**
如果在业务中去拿锁如果没有拿到是应该阻塞着一直等待还是直接返回,这个问题其实可以写一个重试机制,根据重试次数和重试时间做一个循环去拿锁,当然这个重试的次数和时间设多少合适,是需要根据自身业务去衡量的。
```java
/**
* 重试机制
* @param key 锁标识
* @param value 客户端标识
* @param timeOut 过期时间
* @param retry 重试次数
* @param sleepTime 重试间隔时间
* @return
*/
public Boolean lockRetry(String key,String value,Long timeOut,Integer retry,Long sleepTime){
Boolean flag = false;
try {
for (int i=0;i<retry;i++){
flag = lock(key,value,timeOut);
if(flag){
break;
}
Thread.sleep(sleepTime);
}
}catch (Exception e){
e.printStackTrace();
}
return flag;
}
```
### Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格In-Memory Data Grid。它不仅提供了一系列的分布式的Java常用对象还实现了可重入锁Reentrant Lock、公平锁Fair Lock、联锁MultiLock、 红锁RedLock、 读写锁ReadWriteLock还提供了许多分布式服务。
![Redisson](images/Solution/Redisson.jpg)
**特性功能**
- 支持 Redis 单节点single模式、哨兵sentinel模式、主从Master/Slave模式以及集群Redis Cluster模式
- 程序接口调用方式采用异步执行和异步流执行两种方式
- 数据序列化Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储
- 单个集合数据分片在集群模式下Redisson 为单个 Redis 集合类型提供了自动分片的功能
- 提供多种分布式对象Object BucketBitsetAtomicLongBloom Filter 和 HyperLogLog 等
- 提供丰富的分布式集合MapMultimapSetSortedSetListDequeQueue 等
- 分布式锁和同步器的实现可重入锁Reentrant Lock公平锁Fair Lock联锁MultiLock红锁Red Lock信号量Semaphonre可过期性信号锁PermitExpirableSemaphore
- 提供先进的分布式服务如分布式远程服务Remote Service分布式实时对象Live Object服务分布式执行服务Executor Service分布式调度任务服务Schedule Service和分布式映射归纳服务MapReduce
**Watch dog**
![Redisson分布式锁](images/Solution/Redisson分布式锁.jpg)
总体的Redisson框架的分布式锁类型大致如下
- **可重入锁**
- **公平锁**
- **联锁**
- **红锁**
- **读写锁**
- **信号量**
- **可过期信号量**
- **闭锁(/倒数闩)**
添加依赖
```xml
<!-- 方式一redisson-java -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.4</version>
</dependency>
<!-- 方式二redisson-springboot -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.4</version>
</dependency>
```
定义接口
```java
import org.redisson.api.RLock;
import java.util.concurrent.TimeUnit;
public interface DistributedLocker {
RLock lock(String lockKey);
RLock lock(String lockKey, int timeout);
RLock lock(String lockKey, TimeUnit unit, int timeout);
boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime);
void unlock(String lockKey);
void unlock(RLock lock);
}
```
实现分布式锁
```java
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
public class RedissonDistributedLocker implements DistributedLocker{
private RedissonClient redissonClient;
@Override
public RLock lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
return lock;
}
@Override
public RLock lock(String lockKey, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(leaseTime, TimeUnit.SECONDS);
return lock;
}
@Override
public RLock lock(String lockKey, TimeUnit unit ,int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
return lock;
}
@Override
public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
try {
return lock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
return false;
}
}
@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
@Override
public void unlock(RLock lock) {
lock.unlock();
}
public void setRedissonClient(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
}
```
**高可用的RedLock红锁原理**
RedLock算法思想是不能只在一个redis实例上创建锁应该是在多个redis实例上创建锁n / 2 + 1必须在大多数redis节点上都成功创建锁才能算这个整体的RedLock加锁成功避免说仅仅在一个redis实例上加锁而带来的问题。
## Zookeeper
### Apache-Curator
![InterProcessMutex](images/Solution/InterProcessMutex.png)
如上借助于临时顺序节点可以避免同时多个节点的并发竞争锁缓解了服务端压力。这种实现方式所有加锁请求都进行排队加锁是公平锁的具体实现。Apache-Curator中提供的常见锁有如下
- **InterProcessMutex**:就是公平锁的实现。可重入、独占锁
- **InterProcessSemaphoreMutex**:不可重入、独占锁
- **InterProcessReadWriteLock**:读写锁
- **InterProcessSemaphoreV2**:共享信号量
- **InterProcessMultiLock**:多重共享锁 (将多个锁作为单个实体管理的容器)
### 使用案例
```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMultiLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
import org.apache.curator.framework.recipes.locks.Lease;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class DistributedLockDemo {
// ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
private final String lockPath = "/distributed-lock";
// ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181),
// 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
private String connectString="127.0.0.1:2181";
// Curator 客户端重试策略
private RetryPolicy retry;
// Curator 客户端对象
private CuratorFramework client1;
// client2 用户模拟其他客户端
private CuratorFramework client2;
// 初始化资源
@Before
public void init() throws Exception {
// 重试策略
// 初始休眠时间为 1000ms, 最大重试次数为 3
retry = new ExponentialBackoffRetry(1000, 3);
// 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间
client1 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
// 创建会话
client1.start();
client2.start();
}
// 释放资源
@After
public void close() {
CloseableUtils.closeQuietly(client1);
}
/**
* InterProcessMutex可重入、独占锁
*/
@Test
public void sharedReentrantLock() throws Exception {
// 创建可重入锁
InterProcessMutex lock1 = new InterProcessMutex(client1, lockPath);
// lock2 用于模拟其他客户端
InterProcessMutex lock2 = new InterProcessMutex(client2, lockPath);
// lock1 获取锁
lock1.acquire();
try {
// lock1 第2次获取锁
lock1.acquire();
try {
// lock2 超时获取锁, 因为锁已经被 lock1 客户端占用, 所以lock2获取锁失败, 需要等 lock1 释放
Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));
} finally {
lock1.release();
}
} finally {
// 重入锁获取与释放需要一一对应, 如果获取 2 次, 释放 1 次, 那么该锁依然是被占用,
// 如果将下面这行代码注释, 那么会发现下面的 lock2
// 获取锁失败
lock1.release();
}
// 在 lock1 释放后, lock2 能够获取锁
Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
lock2.release();
}
/**
* InterProcessSemaphoreMutex 不可重入、独占锁
*/
@Test
public void sharedLock() throws Exception {
InterProcessSemaphoreMutex lock1 = new InterProcessSemaphoreMutex(client1, lockPath);
// lock2 用于模拟其他客户端
InterProcessSemaphoreMutex lock2 = new InterProcessSemaphoreMutex(client2, lockPath);
// 获取锁对象
lock1.acquire();
// 测试是否可以重入
// 因为锁已经被获取, 所以返回 false
Assert.assertFalse(lock1.acquire(2, TimeUnit.SECONDS));// lock1 返回是false
Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));// lock2 返回是false
// lock1 释放锁
lock1.release();
// lock2 尝试获取锁成功, 因为锁已经被释放
Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));// 返回是true
lock2.release();
System.out.println("测试结束");
}
/**
* InterProcessReadWriteLock读写锁.
* 特点:读写锁、可重入
*/
@Test
public void sharedReentrantReadWriteLock() throws Exception {
// 创建读写锁对象, Curator 以公平锁的方式进行实现
InterProcessReadWriteLock lock1 = new InterProcessReadWriteLock(client1, lockPath);
// lock2 用于模拟其他客户端
InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);
// 使用 lock1 模拟读操作
// 使用 lock2 模拟写操作
// 获取读锁(使用 InterProcessMutex 实现, 所以是可以重入的)
final InterProcessLock readLock = lock1.readLock();
// 获取写锁(使用 InterProcessMutex 实现, 所以是可以重入的)
final InterProcessLock writeLock = lock2.writeLock();
/**
* 读写锁测试对象
*/
class ReadWriteLockTest {
// 测试数据变更字段
private Integer testData = 0;
private Set<Thread> threadSet = new HashSet<>();
// 写入数据
private void write() throws Exception {
writeLock.acquire();
try {
Thread.sleep(10);
testData++;
System.out.println("写入数据 \t" + testData);
} finally {
writeLock.release();
}
}
// 读取数据
private void read() throws Exception {
readLock.acquire();
try {
Thread.sleep(10);
System.out.println("读取数据 \t" + testData);
} finally {
readLock.release();
}
}
// 等待线程结束, 防止 test 方法调用完成后, 当前线程直接退出, 导致控制台无法输出信息
public void waitThread() throws InterruptedException {
for (Thread thread : threadSet) {
thread.join();
}
}
// 创建线程方法
private void createThread(final int type) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
if (type == 1) {
write();
} else {
read();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
threadSet.add(thread);
thread.start();
}
// 测试方法
public void test() {
for (int i = 0; i < 5; i++) {
createThread(1);
}
for (int i = 0; i < 5; i++) {
createThread(2);
}
}
}
ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
readWriteLockTest.test();
readWriteLockTest.waitThread();
}
/**
* InterProcessSemaphoreV2 共享信号量
*/
@Test
public void semaphore() throws Exception {
// 创建一个信号量, Curator 以公平锁的方式进行实现
InterProcessSemaphoreV2 semaphore1 = new InterProcessSemaphoreV2(client1, lockPath, 6);
// semaphore2 用于模拟其他客户端
InterProcessSemaphoreV2 semaphore2 = new InterProcessSemaphoreV2(client2, lockPath, 6);
// 获取一个许可
Lease lease1 = semaphore1.acquire();
Assert.assertNotNull(lease1);
// semaphore.getParticipantNodes() 会返回当前参与信号量的节点列表, 俩个客户端所获取的信息相同
Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());
// 超时获取一个许可
Lease lease2 = semaphore2.acquire(2, TimeUnit.SECONDS);
Assert.assertNotNull(lease2);
Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());
// 获取多个许可, 参数为许可数量
Collection<Lease> leases = semaphore1.acquire(2);
Assert.assertTrue(leases.size() == 2);
Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());
// 超时获取多个许可, 第一个参数为许可数量
Collection<Lease> leases2 = semaphore2.acquire(2, 2, TimeUnit.SECONDS);
Assert.assertTrue(leases2.size() == 2);
Assert.assertEquals(semaphore1.getParticipantNodes(), semaphore2.getParticipantNodes());
// 目前 semaphore 已经获取 3 个许可, semaphore2 也获取 3 个许可, 加起来为 6 个, 所以他们无法再进行许可获取
Assert.assertNull(semaphore1.acquire(2, TimeUnit.SECONDS));
Assert.assertNull(semaphore2.acquire(2, TimeUnit.SECONDS));
// 释放一个许可
semaphore1.returnLease(lease1);
semaphore2.returnLease(lease2);
// 释放多个许可
semaphore1.returnAll(leases);
semaphore2.returnAll(leases2);
}
/**
* InterProcessMutex :可重入、独占锁
* InterProcessSemaphoreMutex 不可重入、独占锁
* InterProcessMultiLock 多重共享锁(将多个锁作为单个实体管理的容器)
*/
@Test
public void multiLock() throws Exception {
InterProcessMutex mutex = new InterProcessMutex(client1, lockPath);
InterProcessSemaphoreMutex semaphoreMutex = new InterProcessSemaphoreMutex(client2, lockPath);
//将上面的两种锁入到其中
InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(mutex, semaphoreMutex));
// 获取参数集合中的所有锁
multiLock.acquire();
// 因为存在一个不可重入锁, 所以整个 multiLock 不可重入
Assert.assertFalse(multiLock.acquire(2, TimeUnit.SECONDS));
// mutex 是可重入锁, 所以可以继续获取锁
Assert.assertTrue(mutex.acquire(2, TimeUnit.SECONDS));
// semaphoreMutex 是不可重入锁, 所以获取锁失败
Assert.assertFalse(semaphoreMutex.acquire(2, TimeUnit.SECONDS));
// 释放参数集合中的所有锁
multiLock.release();
// interProcessLock2 中的锁已经释放, 所以可以获取
Assert.assertTrue(semaphoreMutex.acquire(2, TimeUnit.SECONDS));
}
}
```
# 分布式限流
当系统的处理能力不能应对外部请求的突增流量时,为了不让系统奔溃,必须采取限流的措施。

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 139 KiB

Loading…
Cancel
Save