diff --git a/opsli-core/pom.xml b/opsli-core/pom.xml index 354df75a..480f7dd9 100644 --- a/opsli-core/pom.xml +++ b/opsli-core/pom.xml @@ -24,6 +24,13 @@ opsli-common ${version} + + + + org.opsliframework.boot + opsli-plugins-redis + ${plugins.version} + \ No newline at end of file diff --git a/opsli-core/src/main/java/org/opsli/core/cache/pushsub/enums/MsgArgsType.java b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/enums/MsgArgsType.java new file mode 100644 index 00000000..78dcf686 --- /dev/null +++ b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/enums/MsgArgsType.java @@ -0,0 +1,21 @@ +package org.opsli.core.cache.pushsub.enums; + +/** + * @BelongsProject: opsli-boot + * @BelongsPackage: org.opsli.core.cache.pushsub.enums + * @Author: Parker + * @CreateTime: 2020-09-15 16:15 + * @Description: 消息具体类型 + */ +public enum MsgArgsType { + + /** 字典Key */ + DICT_KEY, + /** 字典Field */ + DICT_FIELD, + /** 字典Value */ + DICT_VALUE, + + ; + +} diff --git a/opsli-core/src/main/java/org/opsli/core/cache/pushsub/enums/PushSubType.java b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/enums/PushSubType.java new file mode 100644 index 00000000..dd1707d7 --- /dev/null +++ b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/enums/PushSubType.java @@ -0,0 +1,23 @@ +package org.opsli.core.cache.pushsub.enums; + +/** + * @BelongsProject: opsli-boot + * @BelongsPackage: org.opsli.core.cache.pushsub.enums + * @Author: Parker + * @CreateTime: 2020-09-15 16:15 + * @Description: 发布订阅类型 + */ +public enum PushSubType { + + /** 字典类型 */ + DICT, + + ; + + + + PushSubType(){ + + } + +} diff --git a/opsli-core/src/main/java/org/opsli/core/cache/pushsub/handler/DictHandler.java b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/handler/DictHandler.java new file mode 100644 index 00000000..0244813f --- /dev/null +++ b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/handler/DictHandler.java @@ -0,0 +1,25 @@ +package org.opsli.core.cache.pushsub.handler; + +import com.alibaba.fastjson.JSONObject; +import org.opsli.core.cache.pushsub.enums.PushSubType; + +/** + * @BelongsProject: opsli-boot + * @BelongsPackage: org.opsli.core.cache.pushsub.handler + * @Author: Parker + * @CreateTime: 2020-09-15 16:24 + * @Description: 字典消息处理 + */ +public class DictHandler implements RedisPushSubHandler{ + + @Override + public PushSubType getType() { + return PushSubType.DICT; + } + + @Override + public void handler(JSONObject msgJson) { + System.out.println(msgJson.toJSONString()); + } + +} diff --git a/opsli-core/src/main/java/org/opsli/core/cache/pushsub/handler/RedisPushSubHandler.java b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/handler/RedisPushSubHandler.java new file mode 100644 index 00000000..85edfba4 --- /dev/null +++ b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/handler/RedisPushSubHandler.java @@ -0,0 +1,24 @@ +package org.opsli.core.cache.pushsub.handler; + +import com.alibaba.fastjson.JSONObject; +import org.opsli.core.cache.pushsub.enums.PushSubType; + +/** + * @BelongsProject: opsli-boot + * @BelongsPackage: org.opsli.core.cache.pushsub.receiver + * @Author: Parker + * @CreateTime: 2020-09-15 15:11 + * @Description: 标示类 用于获得 消息未知 + */ +public interface RedisPushSubHandler { + + + PushSubType getType(); + + /** + * 消息处理 + * @param msgJson + */ + void handler(JSONObject msgJson); + +} diff --git a/opsli-core/src/main/java/org/opsli/core/cache/pushsub/msgs/DictMsgFactory.java b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/msgs/DictMsgFactory.java new file mode 100644 index 00000000..45815c76 --- /dev/null +++ b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/msgs/DictMsgFactory.java @@ -0,0 +1,44 @@ +package org.opsli.core.cache.pushsub.msgs; + +import com.alibaba.fastjson.JSONObject; +import lombok.Data; +import lombok.experimental.Accessors; +import org.opsli.core.cache.pushsub.enums.MsgArgsType; +import org.opsli.core.cache.pushsub.enums.PushSubType; +import org.opsli.core.cache.pushsub.receiver.RedisPushSubReceiver; +import org.opsli.plugins.redis.pushsub.entity.BaseSubMessage; + +/** + * @BelongsProject: opsli-boot + * @BelongsPackage: org.opsli.core.cache.pushsub.msgs + * @Author: Parker + * @CreateTime: 2020-09-15 16:50 + * @Description: 字典消息 + */ + +@Data +@Accessors(chain = true) +public final class DictMsgFactory extends BaseSubMessage{ + + /** 通道 */ + private static final String CHANNEL = RedisPushSubReceiver.BASE_CHANNEL+RedisPushSubReceiver.CHANNEL; + + private DictMsgFactory(){} + + /** + * 构建消息 + */ + public static BaseSubMessage createMsg(String key, String field, Object value){ + BaseSubMessage baseSubMessage = new BaseSubMessage(); + // 数据 + JSONObject jsonObj = new JSONObject(); + jsonObj.put(MsgArgsType.DICT_KEY.toString(),key); + jsonObj.put(MsgArgsType.DICT_FIELD.toString(),field); + jsonObj.put(MsgArgsType.DICT_VALUE.toString(),value); + + // DICT 字典 + baseSubMessage.build(CHANNEL,PushSubType.DICT.toString(),jsonObj); + return baseSubMessage; + } + +} diff --git a/opsli-core/src/main/java/org/opsli/core/cache/pushsub/receiver/RedisPushSubReceiver.java b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/receiver/RedisPushSubReceiver.java new file mode 100644 index 00000000..da56d4fb --- /dev/null +++ b/opsli-core/src/main/java/org/opsli/core/cache/pushsub/receiver/RedisPushSubReceiver.java @@ -0,0 +1,84 @@ +package org.opsli.core.cache.pushsub.receiver; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.opsli.common.utils.PackageUtil; +import org.opsli.core.cache.pushsub.enums.PushSubType; +import org.opsli.core.cache.pushsub.handler.RedisPushSubHandler; +import org.opsli.core.msg.CoreMsg; +import org.opsli.plugins.redis.pushsub.entity.BaseSubMessage; +import org.opsli.plugins.redis.pushsub.receiver.BaseReceiver; + +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * @BelongsProject: opsli-boot + * @BelongsPackage: org.opsli.plugins.redis.entity + * @Author: Parker + * @CreateTime: 2020-09-15 14:50 + * @Description: Redis 消息订阅 更新本地缓存 + * + * 字典缓存更新 + * + */ +@Slf4j +public class RedisPushSubReceiver extends BaseReceiver { + + /** 监听信道 */ + public static final String CHANNEL = "opsli"; + + /** 处理方法集合 */ + private static final Map HANDLER_MAP = new HashMap<>(); + + static { + // 拿到state包下 实现了 SystemEventState 接口的,所有子类 + Set> clazzSet = PackageUtil.listSubClazz(RedisPushSubHandler.class.getPackage().getName(), + true, + RedisPushSubHandler.class + ); + + for (Class aClass : clazzSet) { + // 位运算 去除抽象类 + if((aClass.getModifiers() & Modifier.ABSTRACT) != 0){ + continue; + } + + try { + RedisPushSubHandler handler = (RedisPushSubHandler) aClass.newInstance(); + + // 加入集合 + HANDLER_MAP.put(handler.getType(),handler); + + } catch (Exception e){ + log.error(CoreMsg.REDIS_EXCEPTION_PUSH_SUB.getMessage()); + } + } + } + + + public RedisPushSubReceiver() { + super(CHANNEL); + } + + @Override + public void receiveMessage(String msg) { + if(msg == null || "".equals(msg)){ + return; + } + // 替换 转意符 + String replaceAll = msg.replaceAll("\\\\", ""); + String substring = replaceAll.substring(1, replaceAll.length() - 1); + JSONObject msgJson = JSONObject.parseObject(substring); + String type = (String) msgJson.get(BaseSubMessage.BASE_TYPE); + PushSubType pt = PushSubType.valueOf(type); + RedisPushSubHandler redisPushSubHandler = HANDLER_MAP.get(pt); + if(redisPushSubHandler == null){ + return; + } + redisPushSubHandler.handler(msgJson); + } + +} diff --git a/opsli-core/src/main/java/org/opsli/core/conf/RedisMessageListener.java b/opsli-core/src/main/java/org/opsli/core/conf/RedisMessageListener.java new file mode 100644 index 00000000..6cd2171c --- /dev/null +++ b/opsli-core/src/main/java/org/opsli/core/conf/RedisMessageListener.java @@ -0,0 +1,64 @@ +package org.opsli.core.conf; + +import lombok.extern.slf4j.Slf4j; +import org.opsli.core.cache.pushsub.receiver.RedisPushSubReceiver; +import org.opsli.plugins.redis.pushsub.receiver.BaseReceiver; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; + +import javax.annotation.Resource; + +/** + * @author : parker + * @date : 2020-09-15 + * + * 消息订阅 配置 + * + **/ +@Slf4j +@Configuration +@ConditionalOnProperty(name = "spring.redis.pushsub.enable", havingValue = "true") +public class RedisMessageListener { + + @Resource + private LettuceConnectionFactory factory; + + /** + * redis消息监听器容器 + * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 + * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 + * @return + */ + + @Bean + public RedisMessageListenerContainer container() { + RedisPushSubReceiver receiver = new RedisPushSubReceiver(); + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(factory); + + //订阅了的通道 + container.addMessageListener(listenerAdapter(new RedisPushSubReceiver()), new PatternTopic(receiver.getListenerChannel())); + + return container; + } + /** + * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 + * + * 想要处理消息 需要重写 消息接受方法 + * + * @return + */ + @Bean + public MessageListenerAdapter listenerAdapter(RedisPushSubReceiver baseReceiver) { + //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” + //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看 + //receiveMessage就是对应消费者那边的消费方法吗,而Receiver是自己弄的一个消费者类 + return new MessageListenerAdapter(baseReceiver, "receiveMessage"); + } + +} diff --git a/opsli-core/src/main/java/org/opsli/core/msg/CoreMsg.java b/opsli-core/src/main/java/org/opsli/core/msg/CoreMsg.java index 617d1996..df5818a8 100644 --- a/opsli-core/src/main/java/org/opsli/core/msg/CoreMsg.java +++ b/opsli-core/src/main/java/org/opsli/core/msg/CoreMsg.java @@ -1,6 +1,5 @@ package org.opsli.core.msg; -import io.swagger.models.auth.In; import org.opsli.common.base.msg.BaseMsg; /** @@ -12,11 +11,27 @@ import org.opsli.common.base.msg.BaseMsg; */ public enum CoreMsg implements BaseMsg { + /** + * Mybatis-Plus + */ + /** Mybatis-Plus 乐观锁 */ + MYBATIS_OPTIMISTIC_LOCKER(10100,"当前数据已被更改,请刷新重试!"), + + /** + * Redis + */ + REDIS_EXCEPTION_PUSH_SUB(10200,"Redis 订阅通道失败!") + ; - private Integer code; + private int code; private String message; + CoreMsg(int code,String message){ + this.code = code; + this.message = message; + } + @Override public Integer getCode() { return this.code;