Redis 分布式锁

v1.4.1
Parker 5 years ago
parent 1c5789269e
commit 37ff6866ff

@ -20,7 +20,6 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId> <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency> </dependency>
<!-- 集成Redis缓存 END --> <!-- 集成Redis缓存 END -->
</dependencies> </dependencies>

@ -1,21 +1,28 @@
package org.opsli.plugins.redis; package org.opsli.plugins.redis;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.opsli.plugins.redis.exception.RedisPluginException; import org.opsli.plugins.redis.exception.RedisPluginException;
import org.opsli.plugins.redis.lock.RedisLock;
import org.opsli.plugins.redis.msg.RedisMsg; import org.opsli.plugins.redis.msg.RedisMsg;
import org.opsli.plugins.redis.pushsub.entity.BaseSubMessage; import org.opsli.plugins.redis.pushsub.entity.BaseSubMessage;
import org.opsli.plugins.redis.scripts.RedisPluginScript; import org.opsli.plugins.redis.scripts.RedisPluginScript;
import org.opsli.plugins.redis.scripts.RedisScriptCache; import org.opsli.plugins.redis.scripts.RedisScriptCache;
import org.opsli.plugins.redis.scripts.enums.RedisScriptsEnum; import org.opsli.plugins.redis.scripts.enums.RedisScriptsEnum;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations; import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript; import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.*; import java.util.*;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -33,6 +40,7 @@ public class RedisPlugin {
@Autowired @Autowired
private RedisScriptCache redisScriptCache; private RedisScriptCache redisScriptCache;
// ===================== 基础相关 ===================== // ===================== 基础相关 =====================
/** /**
@ -83,18 +91,23 @@ public class RedisPlugin {
* @param argv * @param argv
* @return object * @return object
*/ */
public Object callScript(RedisScriptsEnum scriptsEnum, List<String> keys, List<String> argv) { public Object callScript(RedisScriptsEnum scriptsEnum, List<String> keys, Object... argv) {
// 获得Script脚本 // 获得Script脚本
RedisPluginScript script = redisScriptCache.getScript(scriptsEnum); RedisPluginScript script = redisScriptCache.getScript(scriptsEnum);
if(script == null){ if(script == null){
return false; return false;
} }
RedisScript<Object> of = RedisScript.of(script.getScript()); DefaultRedisScript<Long> lockScript = new DefaultRedisScript<>();
return redisTemplate.execute(of,keys,argv); lockScript.setScriptText(script.getScript());
lockScript.setResultType(Long.class);
return redisTemplate.execute(lockScript, keys, argv);
} }
// ===================== Redis 锁相关 ===================== // ===================== Redis 锁相关 =====================
/* /*
* Redis使使
*
* *
* 1tair线 * 1tair线
* 2 * 2
@ -103,22 +116,35 @@ public class RedisPlugin {
/** /**
* Redis * Redis
* @param lockName * @param redisLock
* @param acquireTimeOut
* @param lockTimeOut
* @return identifier * @return identifier
*/ */
public String lockWithTimeOut(String lockName, long acquireTimeOut, int lockTimeOut) { public RedisLock tryLock(RedisLock redisLock) {
String identifier = UUID.randomUUID().toString(); //String identifier = UUID.randomUUID().toString().replaceAll("-","");
List<String> keys = Collections.singletonList("lock:" + lockName); String identifier = "aaa";
List<String> argv = Arrays.asList(identifier,
String.valueOf(lockTimeOut)); redisLock = this.tryLock(redisLock,identifier);
if(redisLock != null){
log.info("分布式锁 - 开启: 锁名称: "+redisLock.getLockName()+" 锁凭证: \""+redisLock.getIdentifier()+"\"");
// 启动看门狗
this.lockDog(redisLock);
}
return redisLock;
}
long acquireTimeEnd = System.currentTimeMillis() + acquireTimeOut; /**
* Redis
* @param redisLock
* @return identifier
*/
private RedisLock tryLock(RedisLock redisLock,String identifier) {
try {
List<String> keys = Collections.singletonList("lock:" + redisLock.getLockName());
long acquireTimeEnd = System.currentTimeMillis() + redisLock.getAcquireTimeOut();
boolean acquired = false; boolean acquired = false;
// 尝试获得锁 // 尝试获得锁
while (!acquired && (System.currentTimeMillis() < acquireTimeEnd)) { while (!acquired && (System.currentTimeMillis() < acquireTimeEnd)) {
Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_LOCK, keys, argv); Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_LOCK, keys, identifier,redisLock.getLockTimeOut());
if(ret == null){ if(ret == null){
break; break;
} }
@ -132,23 +158,82 @@ public class RedisPlugin {
} }
} }
} }
return acquired ? identifier : null; redisLock.setIdentifier(identifier);
return acquired ? redisLock : null;
}catch (Exception e){
log.error(e.getMessage(),e);
return null;
}
} }
/** /**
* Redis * Redis
* @param lockName * @param redisLock
* @param identifier
* @return boolean * @return boolean
*/ */
public boolean unLock(String lockName, String identifier) { public boolean unLock(RedisLock redisLock) {
List<String> keys = Collections.singletonList("lock:" + lockName); try {
List<String> argv = Collections.singletonList(identifier); List<String> keys = Collections.singletonList("lock:" + redisLock.getLockName());
Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_UN_LOCK, keys, argv); Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_UN_LOCK, keys, redisLock.getIdentifier());
// 减去线程锁
redisLock.unThreadLock();
log.info("分布式锁 - 解除: 锁名称: "+redisLock.getLockName()+" 锁凭证: "+redisLock.getIdentifier());
if(ret == null){ if(ret == null){
return false; return false;
} }
return 1 == ret; if(1 == ret){
return true;
}
return false;
}catch (Exception e){
log.error(e.getMessage(),e);
}
return false;
}
/**
* Redis - 使
* @param redisLock
* @return boolean
*/
private void lockDog(RedisLock redisLock) {
Thread t = new Thread(()->{
try {
// 倒计时前续命
long countDownTime = 3000L;
// 锁释放时间
long lockTimeOutEnd = System.currentTimeMillis() + redisLock.getLockTimeOut();
boolean dogFlag = true;
// 看门狗检测 当前线程是否还存活
while (dogFlag) {
int lock = redisLock.getThreadLock();
if(lock <= 0){
dogFlag = false;
// 再一次确定 解锁 防止线程差 最后加锁
this.unLock(redisLock);
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
log.error(ignored.getMessage(),ignored);
}
// 如果 距离倒计时 前 2000 毫秒还没有动作 则执行续命
if((System.currentTimeMillis()+countDownTime) >= lockTimeOutEnd){
Object o = this.tryLock(redisLock,redisLock.getIdentifier());
if(o != null){
lockTimeOutEnd = System.currentTimeMillis() + redisLock.getLockTimeOut();
log.info("分布式锁 - 续命: 锁名称: "+redisLock.getLockName()+" 锁凭证: "+redisLock.getIdentifier());
}
}
}
}catch (Exception e){
log.error(e.getMessage(),e);
}
});
t.setName("分布式锁看门狗:"+" 锁名称: "+redisLock.getLockName()+" 锁凭证: "+redisLock.getIdentifier());
t.start();
} }

@ -1,5 +1,6 @@
package org.opsli.plugins.redis.conf; package org.opsli.plugins.redis.conf;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -13,6 +14,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -29,6 +31,8 @@ import java.util.Set;
@Configuration @Configuration
public class RedisPluginConfig { public class RedisPluginConfig {
private static final FastJsonRedisSerializer<Object> FAST_JSON_REDIS_SERIALIZER = new FastJsonRedisSerializer<>(Object.class);
@Resource @Resource
private LettuceConnectionFactory factory; private LettuceConnectionFactory factory;
@ -40,24 +44,20 @@ public class RedisPluginConfig {
public RedisTemplate<String, Object> redisTemplate() { public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory); template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式 // key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer); template.setKeySerializer(RedisSerializer.string());
// hash的key也采用String的序列化方式 // hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer); template.setHashKeySerializer(RedisSerializer.string());
// value序列化方式采用jackson // value序列化方式采用 json
template.setValueSerializer(jackson2JsonRedisSerializer); template.setValueSerializer(FAST_JSON_REDIS_SERIALIZER);
// hash的value序列化方式采用jackson // hash的value序列化方式采用 json
template.setHashValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(FAST_JSON_REDIS_SERIALIZER);
template.afterPropertiesSet(); template.afterPropertiesSet();
// 开启事务 // 开启事务
template.setEnableTransactionSupport(true); //template.setEnableTransactionSupport(true);
return template; return template;
} }

@ -0,0 +1,43 @@
package org.opsli.plugins.redis.lock;
import lombok.Data;
import lombok.Value;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @BelongsProject: opsli-boot
* @BelongsPackage: org.opsli.plugins.redis.lock
* @Author: Parker
* @CreateTime: 2020-09-16 00:51
* @Description: Redis
*/
@Data
public class RedisLock {
/** 锁名称 */
private String lockName;
/** 尝试获取锁等待时间 */
private Long acquireTimeOut;
/** 锁有效时间 */
private Long lockTimeOut;
/** 锁凭证 */
private String identifier;
/** 线程锁 */
private AtomicInteger atomicInteger = new AtomicInteger(1);
/** 获得线程锁 */
public int getThreadLock(){
return atomicInteger.get();
}
/** 解除线程锁 */
public int unThreadLock(){
return atomicInteger.decrementAndGet();
}
}

@ -22,17 +22,26 @@ public class RedisLockScript implements RedisPluginScript {
@Override @Override
public String getScript() { public String getScript() {
return "local key = KEYS[1]\n" +
"local identifier = ARGV[1]\n" + return "-- 加锁脚本\n"+
"local lockTimeOut = ARGV[2]\n" + "-- key1要加锁的名称 argv1:当前线程或主机的地址 argv2锁存活的时间ms \n"+
"\n" + "local expire_time = tonumber(ARGV[2])\n"+
"if redis.call(\"SETNX\", key, identifier) == 1 then\n" + "if redis.call('exists', KEYS[1]) == 0 then\n"+
" redis.call(\"EXPIRE\", key, lockTimeOut)\n" + " -- 锁不存在创建一把锁存入hash类型的值\n"+
" return 1\n" + " redis.call('hset', KEYS[1], ARGV[1], 1)\n"+
"elseif redis.call(\"TTL\", key) == -1 then\n" + " -- 设置锁的存活时间,防止死锁\n"+
" redis.call(\"EXPIRE\", key, lockTimeOut)\n" + " redis.call('pexpire', KEYS[1], expire_time)\n"+
"end\n" + " return 1\n"+
"return 0"; "end\n"+
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then\n"+
" -- 表示是同一线程重入\n"+
" redis.call('hincrby', KEYS[1], ARGV[1], 1)\n"+
" -- 重新设置锁的过期时间\n"+
" redis.call('pexpire', KEYS[1], expire_time)\n"+
" return 1\n"+
"end\n"+
"-- 没抢到锁返回锁的剩余有效时间ms\n"+
"return 0\n";
} }
} }

@ -22,14 +22,22 @@ public class RedisUnLockScript implements RedisPluginScript {
@Override @Override
public String getScript() { public String getScript() {
return "local key = KEYS[1]\n" +
"local identifier = ARGV[1]\n" + return "-- 解锁脚本\n"+
"\n" + "-- 判断是当前线程持有锁,避免解了其他线程加的锁\n"+
"if redis.call(\"GET\", key) == identifier then\n" + "if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then\n"+
" redis.call(\"DEL\", key)\n" + " -- 重入次数大于1扣减次数\n"+
" return 1\n" + " --if tonumber(redis.call('hget',KEYS[1],ARGV[1])) > 1 then\n"+
"end\n" + " -- return redis.call('hincrby', KEYS[1], ARGV[1], -1);\n"+
"return 0"; " -- 重入次数等于1删除该锁\n"+
" --else\n"+
" redis.call('del', KEYS[1]);\n"+
" return 1;\n"+
" --end\n"+
"-- 判断不是当前线程持有锁,返回解锁失败\n"+
"else\n"+
" return 0;\n"+
"end\n";
} }
} }

Loading…
Cancel
Save