From 37ff6866ffa8bdf99a98c81d6b4de2f86bac58d7 Mon Sep 17 00:00:00 2001 From: Parker Date: Wed, 16 Sep 2020 02:32:58 +0800 Subject: [PATCH] =?UTF-8?q?Redis=20=E5=88=86=E5=B8=83=E5=BC=8F=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- opsli-plugins/opsli-plugins-redis/pom.xml | 1 - .../org/opsli/plugins/redis/RedisPlugin.java | 157 ++++++++++++++---- .../plugins/redis/conf/RedisPluginConfig.java | 26 +-- .../opsli/plugins/redis/lock/RedisLock.java | 43 +++++ .../redis/scripts/RedisLockScript.java | 31 ++-- .../redis/scripts/RedisUnLockScript.java | 24 ++- 6 files changed, 213 insertions(+), 69 deletions(-) create mode 100644 opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/lock/RedisLock.java diff --git a/opsli-plugins/opsli-plugins-redis/pom.xml b/opsli-plugins/opsli-plugins-redis/pom.xml index 13123eb..09b39e0 100644 --- a/opsli-plugins/opsli-plugins-redis/pom.xml +++ b/opsli-plugins/opsli-plugins-redis/pom.xml @@ -20,7 +20,6 @@ org.springframework.boot spring-boot-starter-data-redis - diff --git a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/RedisPlugin.java b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/RedisPlugin.java index 62c40f8..e780b78 100644 --- a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/RedisPlugin.java +++ b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/RedisPlugin.java @@ -1,21 +1,28 @@ package org.opsli.plugins.redis; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.opsli.plugins.redis.exception.RedisPluginException; +import org.opsli.plugins.redis.lock.RedisLock; import org.opsli.plugins.redis.msg.RedisMsg; import org.opsli.plugins.redis.pushsub.entity.BaseSubMessage; import org.opsli.plugins.redis.scripts.RedisPluginScript; import org.opsli.plugins.redis.scripts.RedisScriptCache; import org.opsli.plugins.redis.scripts.enums.RedisScriptsEnum; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations; +import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.stereotype.Component; import java.util.*; +import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; /** @@ -33,6 +40,7 @@ public class RedisPlugin { @Autowired private RedisScriptCache redisScriptCache; + // ===================== 基础相关 ===================== /** @@ -83,18 +91,23 @@ public class RedisPlugin { * @param argv 多参数 * @return object */ - public Object callScript(RedisScriptsEnum scriptsEnum, List keys, List argv) { + public Object callScript(RedisScriptsEnum scriptsEnum, List keys, Object... argv) { // 获得Script脚本 RedisPluginScript script = redisScriptCache.getScript(scriptsEnum); if(script == null){ return false; } - RedisScript of = RedisScript.of(script.getScript()); - return redisTemplate.execute(of,keys,argv); + DefaultRedisScript lockScript = new DefaultRedisScript<>(); + lockScript.setScriptText(script.getScript()); + lockScript.setResultType(Long.class); + return redisTemplate.execute(lockScript, keys, argv); } + // ===================== Redis 锁相关 ===================== /* + * 不建议在 Redis集群下使用,且使用时 需要加一个看门口 来自动续命 ,否则会出现锁重入 + * * 分布式锁需要考虑的问题 * 1、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在tair中,其他线程无法再获得到锁。 * 2、这把锁只能是非阻塞的,无论成功还是失败都直接返回。 @@ -103,52 +116,124 @@ public class RedisPlugin { /** * Redis 加分布式锁 - * @param lockName 锁名称 - * @param acquireTimeOut 尝试获取锁等待时间 - * @param lockTimeOut 锁有效时间 + * @param redisLock 锁 * @return identifier 很重要,解锁全靠他 唯一凭证 */ - public String lockWithTimeOut(String lockName, long acquireTimeOut, int lockTimeOut) { - String identifier = UUID.randomUUID().toString(); - List keys = Collections.singletonList("lock:" + lockName); - List argv = Arrays.asList(identifier, - String.valueOf(lockTimeOut)); - - long acquireTimeEnd = System.currentTimeMillis() + acquireTimeOut; - boolean acquired = false; - // 尝试获得锁 - while (!acquired && (System.currentTimeMillis() < acquireTimeEnd)) { - Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_LOCK, keys, argv); - if(ret == null){ - break; - } - if (1 == ret){ - acquired = true; - } else { - try { - Thread.sleep(10); - } catch (InterruptedException ignored) { - log.error(ignored.getMessage(),ignored); + public RedisLock tryLock(RedisLock redisLock) { + //String identifier = UUID.randomUUID().toString().replaceAll("-",""); + String identifier = "aaa"; + + redisLock = this.tryLock(redisLock,identifier); + if(redisLock != null){ + log.info("分布式锁 - 开启: 锁名称: "+redisLock.getLockName()+" 锁凭证: \""+redisLock.getIdentifier()+"\""); + // 启动看门狗 + this.lockDog(redisLock); + } + return redisLock; + } + + /** + * Redis 加分布式锁 + * @param redisLock 锁 + * @return identifier 很重要,解锁全靠他 唯一凭证 + */ + private RedisLock tryLock(RedisLock redisLock,String identifier) { + try { + List keys = Collections.singletonList("lock:" + redisLock.getLockName()); + long acquireTimeEnd = System.currentTimeMillis() + redisLock.getAcquireTimeOut(); + boolean acquired = false; + // 尝试获得锁 + while (!acquired && (System.currentTimeMillis() < acquireTimeEnd)) { + Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_LOCK, keys, identifier,redisLock.getLockTimeOut()); + if(ret == null){ + break; + } + if (1 == ret){ + acquired = true; + } else { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + log.error(ignored.getMessage(),ignored); + } } } + redisLock.setIdentifier(identifier); + return acquired ? redisLock : null; + }catch (Exception e){ + log.error(e.getMessage(),e); + return null; } - return acquired ? identifier : null; } /** * Redis 释放分布式锁 - * @param lockName 锁名称 - * @param identifier 唯一凭证 + * @param redisLock 锁 * @return boolean */ - public boolean unLock(String lockName, String identifier) { - List keys = Collections.singletonList("lock:" + lockName); - List argv = Collections.singletonList(identifier); - Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_UN_LOCK, keys, argv); - if(ret == null){ + public boolean unLock(RedisLock redisLock) { + try { + List keys = Collections.singletonList("lock:" + redisLock.getLockName()); + Long ret = (Long) this.callScript(RedisScriptsEnum.REDIS_UN_LOCK, keys, redisLock.getIdentifier()); + // 减去线程锁 + redisLock.unThreadLock(); + log.info("分布式锁 - 解除: 锁名称: "+redisLock.getLockName()+" 锁凭证: "+redisLock.getIdentifier()); + if(ret == null){ + return false; + } + if(1 == ret){ + return true; + } return false; + }catch (Exception e){ + log.error(e.getMessage(),e); } - return 1 == ret; + return false; + } + + /** + * Redis 分布式锁 - 看门狗 自动续命使用 + * @param redisLock 锁 + * @return boolean + */ + private void lockDog(RedisLock redisLock) { + Thread t = new Thread(()->{ + try { + // 倒计时前续命 + long countDownTime = 3000L; + // 锁释放时间 + long lockTimeOutEnd = System.currentTimeMillis() + redisLock.getLockTimeOut(); + boolean dogFlag = true; + // 看门狗检测 当前线程是否还存活 + while (dogFlag) { + int lock = redisLock.getThreadLock(); + if(lock <= 0){ + dogFlag = false; + // 再一次确定 解锁 防止线程差 最后加锁 + this.unLock(redisLock); + break; + } + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + log.error(ignored.getMessage(),ignored); + } + + // 如果 距离倒计时 前 2000 毫秒还没有动作 则执行续命 + if((System.currentTimeMillis()+countDownTime) >= lockTimeOutEnd){ + Object o = this.tryLock(redisLock,redisLock.getIdentifier()); + if(o != null){ + lockTimeOutEnd = System.currentTimeMillis() + redisLock.getLockTimeOut(); + log.info("分布式锁 - 续命: 锁名称: "+redisLock.getLockName()+" 锁凭证: "+redisLock.getIdentifier()); + } + } + } + }catch (Exception e){ + log.error(e.getMessage(),e); + } + }); + t.setName("分布式锁看门狗:"+" 锁名称: "+redisLock.getLockName()+" 锁凭证: "+redisLock.getIdentifier()); + t.start(); } diff --git a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/conf/RedisPluginConfig.java b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/conf/RedisPluginConfig.java index d966b1c..802e215 100644 --- a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/conf/RedisPluginConfig.java +++ b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/conf/RedisPluginConfig.java @@ -1,5 +1,6 @@ package org.opsli.plugins.redis.conf; +import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; @@ -13,6 +14,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import javax.annotation.Resource; @@ -29,6 +31,8 @@ import java.util.Set; @Configuration public class RedisPluginConfig { + private static final FastJsonRedisSerializer FAST_JSON_REDIS_SERIALIZER = new FastJsonRedisSerializer<>(Object.class); + @Resource private LettuceConnectionFactory factory; @@ -40,24 +44,20 @@ public class RedisPluginConfig { public RedisTemplate redisTemplate() { RedisTemplate template = new RedisTemplate<>(); template.setConnectionFactory(factory); - Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); - ObjectMapper om = new ObjectMapper(); - om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); - om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); - jackson2JsonRedisSerializer.setObjectMapper(om); - StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key采用String的序列化方式 - template.setKeySerializer(stringRedisSerializer); + template.setKeySerializer(RedisSerializer.string()); // hash的key也采用String的序列化方式 - template.setHashKeySerializer(stringRedisSerializer); - // value序列化方式采用jackson - template.setValueSerializer(jackson2JsonRedisSerializer); - // hash的value序列化方式采用jackson - template.setHashValueSerializer(jackson2JsonRedisSerializer); + template.setHashKeySerializer(RedisSerializer.string()); + // value序列化方式采用 json + template.setValueSerializer(FAST_JSON_REDIS_SERIALIZER); + // hash的value序列化方式采用 json + template.setHashValueSerializer(FAST_JSON_REDIS_SERIALIZER); + template.afterPropertiesSet(); // 开启事务 - template.setEnableTransactionSupport(true); + //template.setEnableTransactionSupport(true); + return template; } diff --git a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/lock/RedisLock.java b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/lock/RedisLock.java new file mode 100644 index 0000000..1c45166 --- /dev/null +++ b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/lock/RedisLock.java @@ -0,0 +1,43 @@ +package org.opsli.plugins.redis.lock; + +import lombok.Data; +import lombok.Value; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @BelongsProject: opsli-boot + * @BelongsPackage: org.opsli.plugins.redis.lock + * @Author: Parker + * @CreateTime: 2020-09-16 00:51 + * @Description: Redis 锁 + */ +@Data +public class RedisLock { + + /** 锁名称 */ + private String lockName; + + /** 尝试获取锁等待时间 */ + private Long acquireTimeOut; + + /** 锁有效时间 */ + private Long lockTimeOut; + + /** 锁凭证 */ + private String identifier; + + /** 线程锁 */ + private AtomicInteger atomicInteger = new AtomicInteger(1); + + /** 获得线程锁 */ + public int getThreadLock(){ + return atomicInteger.get(); + } + + /** 解除线程锁 */ + public int unThreadLock(){ + return atomicInteger.decrementAndGet(); + } + +} diff --git a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisLockScript.java b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisLockScript.java index 05ecd7e..4974b51 100644 --- a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisLockScript.java +++ b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisLockScript.java @@ -22,17 +22,26 @@ public class RedisLockScript implements RedisPluginScript { @Override public String getScript() { - return "local key = KEYS[1]\n" + - "local identifier = ARGV[1]\n" + - "local lockTimeOut = ARGV[2]\n" + - "\n" + - "if redis.call(\"SETNX\", key, identifier) == 1 then\n" + - " redis.call(\"EXPIRE\", key, lockTimeOut)\n" + - " return 1\n" + - "elseif redis.call(\"TTL\", key) == -1 then\n" + - " redis.call(\"EXPIRE\", key, lockTimeOut)\n" + - "end\n" + - "return 0"; + + return "-- 加锁脚本\n"+ + "-- key1:要加锁的名称 argv1:当前线程或主机的地址 argv2:锁存活的时间ms \n"+ + "local expire_time = tonumber(ARGV[2])\n"+ + "if redis.call('exists', KEYS[1]) == 0 then\n"+ + " -- 锁不存在,创建一把锁,存入hash类型的值\n"+ + " redis.call('hset', KEYS[1], ARGV[1], 1)\n"+ + " -- 设置锁的存活时间,防止死锁\n"+ + " redis.call('pexpire', KEYS[1], expire_time)\n"+ + " return 1\n"+ + "end\n"+ + "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then\n"+ + " -- 表示是同一线程重入\n"+ + " redis.call('hincrby', KEYS[1], ARGV[1], 1)\n"+ + " -- 重新设置锁的过期时间\n"+ + " redis.call('pexpire', KEYS[1], expire_time)\n"+ + " return 1\n"+ + "end\n"+ + "-- 没抢到锁,返回锁的剩余有效时间ms\n"+ + "return 0\n"; } } diff --git a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisUnLockScript.java b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisUnLockScript.java index c9341bb..87f54c5 100644 --- a/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisUnLockScript.java +++ b/opsli-plugins/opsli-plugins-redis/src/main/java/org/opsli/plugins/redis/scripts/RedisUnLockScript.java @@ -22,14 +22,22 @@ public class RedisUnLockScript implements RedisPluginScript { @Override public String getScript() { - return "local key = KEYS[1]\n" + - "local identifier = ARGV[1]\n" + - "\n" + - "if redis.call(\"GET\", key) == identifier then\n" + - " redis.call(\"DEL\", key)\n" + - " return 1\n" + - "end\n" + - "return 0"; + + return "-- 解锁脚本\n"+ + "-- 判断是当前线程持有锁,避免解了其他线程加的锁\n"+ + "if redis.call('hexists',KEYS[1],ARGV[1]) == 1 then\n"+ + " -- 重入次数大于1,扣减次数\n"+ + " --if tonumber(redis.call('hget',KEYS[1],ARGV[1])) > 1 then\n"+ + " -- return redis.call('hincrby', KEYS[1], ARGV[1], -1);\n"+ + " -- 重入次数等于1,删除该锁\n"+ + " --else\n"+ + " redis.call('del', KEYS[1]);\n"+ + " return 1;\n"+ + " --end\n"+ + "-- 判断不是当前线程持有锁,返回解锁失败\n"+ + "else\n"+ + " return 0;\n"+ + "end\n"; } }