增加Redis消息订阅 - 用户缓存同步

v1.4.1
Parker 5 years ago
parent d5b0eaec34
commit 77c18d3ea1

@ -24,6 +24,13 @@
<artifactId>opsli-common</artifactId> <artifactId>opsli-common</artifactId>
<version>${version}</version> <version>${version}</version>
</dependency> </dependency>
<!-- 引入Redis模块 -->
<dependency>
<groupId>org.opsliframework.boot</groupId>
<artifactId>opsli-plugins-redis</artifactId>
<version>${plugins.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -0,0 +1,21 @@
package org.opsli.core.cache.pushsub.enums;
/**
* @BelongsProject: opsli-boot
* @BelongsPackage: org.opsli.core.cache.pushsub.enums
* @Author: Parker
* @CreateTime: 2020-09-15 16:15
* @Description:
*/
public enum MsgArgsType {
/** 字典Key */
DICT_KEY,
/** 字典Field */
DICT_FIELD,
/** 字典Value */
DICT_VALUE,
;
}

@ -0,0 +1,23 @@
package org.opsli.core.cache.pushsub.enums;
/**
* @BelongsProject: opsli-boot
* @BelongsPackage: org.opsli.core.cache.pushsub.enums
* @Author: Parker
* @CreateTime: 2020-09-15 16:15
* @Description:
*/
public enum PushSubType {
/** 字典类型 */
DICT,
;
PushSubType(){
}
}

@ -0,0 +1,25 @@
package org.opsli.core.cache.pushsub.handler;
import com.alibaba.fastjson.JSONObject;
import org.opsli.core.cache.pushsub.enums.PushSubType;
/**
* @BelongsProject: opsli-boot
* @BelongsPackage: org.opsli.core.cache.pushsub.handler
* @Author: Parker
* @CreateTime: 2020-09-15 16:24
* @Description:
*/
public class DictHandler implements RedisPushSubHandler{
@Override
public PushSubType getType() {
return PushSubType.DICT;
}
@Override
public void handler(JSONObject msgJson) {
System.out.println(msgJson.toJSONString());
}
}

@ -0,0 +1,24 @@
package org.opsli.core.cache.pushsub.handler;
import com.alibaba.fastjson.JSONObject;
import org.opsli.core.cache.pushsub.enums.PushSubType;
/**
* @BelongsProject: opsli-boot
* @BelongsPackage: org.opsli.core.cache.pushsub.receiver
* @Author: Parker
* @CreateTime: 2020-09-15 15:11
* @Description:
*/
public interface RedisPushSubHandler {
PushSubType getType();
/**
*
* @param msgJson
*/
void handler(JSONObject msgJson);
}

@ -0,0 +1,44 @@
package org.opsli.core.cache.pushsub.msgs;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.experimental.Accessors;
import org.opsli.core.cache.pushsub.enums.MsgArgsType;
import org.opsli.core.cache.pushsub.enums.PushSubType;
import org.opsli.core.cache.pushsub.receiver.RedisPushSubReceiver;
import org.opsli.plugins.redis.pushsub.entity.BaseSubMessage;
/**
* @BelongsProject: opsli-boot
* @BelongsPackage: org.opsli.core.cache.pushsub.msgs
* @Author: Parker
* @CreateTime: 2020-09-15 16:50
* @Description:
*/
@Data
@Accessors(chain = true)
public final class DictMsgFactory extends BaseSubMessage{
/** 通道 */
private static final String CHANNEL = RedisPushSubReceiver.BASE_CHANNEL+RedisPushSubReceiver.CHANNEL;
private DictMsgFactory(){}
/**
*
*/
public static BaseSubMessage createMsg(String key, String field, Object value){
BaseSubMessage baseSubMessage = new BaseSubMessage();
// 数据
JSONObject jsonObj = new JSONObject();
jsonObj.put(MsgArgsType.DICT_KEY.toString(),key);
jsonObj.put(MsgArgsType.DICT_FIELD.toString(),field);
jsonObj.put(MsgArgsType.DICT_VALUE.toString(),value);
// DICT 字典
baseSubMessage.build(CHANNEL,PushSubType.DICT.toString(),jsonObj);
return baseSubMessage;
}
}

@ -0,0 +1,84 @@
package org.opsli.core.cache.pushsub.receiver;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.opsli.common.utils.PackageUtil;
import org.opsli.core.cache.pushsub.enums.PushSubType;
import org.opsli.core.cache.pushsub.handler.RedisPushSubHandler;
import org.opsli.core.msg.CoreMsg;
import org.opsli.plugins.redis.pushsub.entity.BaseSubMessage;
import org.opsli.plugins.redis.pushsub.receiver.BaseReceiver;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @BelongsProject: opsli-boot
* @BelongsPackage: org.opsli.plugins.redis.entity
* @Author: Parker
* @CreateTime: 2020-09-15 14:50
* @Description: Redis
*
*
*
*/
@Slf4j
public class RedisPushSubReceiver extends BaseReceiver {
/** 监听信道 */
public static final String CHANNEL = "opsli";
/** 处理方法集合 */
private static final Map<PushSubType, RedisPushSubHandler> HANDLER_MAP = new HashMap<>();
static {
// 拿到state包下 实现了 SystemEventState 接口的,所有子类
Set<Class<?>> clazzSet = PackageUtil.listSubClazz(RedisPushSubHandler.class.getPackage().getName(),
true,
RedisPushSubHandler.class
);
for (Class<?> aClass : clazzSet) {
// 位运算 去除抽象类
if((aClass.getModifiers() & Modifier.ABSTRACT) != 0){
continue;
}
try {
RedisPushSubHandler handler = (RedisPushSubHandler) aClass.newInstance();
// 加入集合
HANDLER_MAP.put(handler.getType(),handler);
} catch (Exception e){
log.error(CoreMsg.REDIS_EXCEPTION_PUSH_SUB.getMessage());
}
}
}
public RedisPushSubReceiver() {
super(CHANNEL);
}
@Override
public void receiveMessage(String msg) {
if(msg == null || "".equals(msg)){
return;
}
// 替换 转意符
String replaceAll = msg.replaceAll("\\\\", "");
String substring = replaceAll.substring(1, replaceAll.length() - 1);
JSONObject msgJson = JSONObject.parseObject(substring);
String type = (String) msgJson.get(BaseSubMessage.BASE_TYPE);
PushSubType pt = PushSubType.valueOf(type);
RedisPushSubHandler redisPushSubHandler = HANDLER_MAP.get(pt);
if(redisPushSubHandler == null){
return;
}
redisPushSubHandler.handler(msgJson);
}
}

@ -0,0 +1,64 @@
package org.opsli.core.conf;
import lombok.extern.slf4j.Slf4j;
import org.opsli.core.cache.pushsub.receiver.RedisPushSubReceiver;
import org.opsli.plugins.redis.pushsub.receiver.BaseReceiver;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import javax.annotation.Resource;
/**
* @author : parker
* @date : 2020-09-15
*
*
*
**/
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.redis.pushsub.enable", havingValue = "true")
public class RedisMessageListener {
@Resource
private LettuceConnectionFactory factory;
/**
* redis
* redis
*
* @return
*/
@Bean
public RedisMessageListenerContainer container() {
RedisPushSubReceiver receiver = new RedisPushSubReceiver();
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
//订阅了的通道
container.addMessageListener(listenerAdapter(new RedisPushSubReceiver()), new PatternTopic(receiver.getListenerChannel()));
return container;
}
/**
*
*
*
*
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisPushSubReceiver baseReceiver) {
//这个地方 是给messageListenerAdapter 传入一个消息接受的处理器利用反射的方法调用“receiveMessage”
//也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
//receiveMessage就是对应消费者那边的消费方法吗,而Receiver是自己弄的一个消费者类
return new MessageListenerAdapter(baseReceiver, "receiveMessage");
}
}

@ -1,6 +1,5 @@
package org.opsli.core.msg; package org.opsli.core.msg;
import io.swagger.models.auth.In;
import org.opsli.common.base.msg.BaseMsg; import org.opsli.common.base.msg.BaseMsg;
/** /**
@ -12,11 +11,27 @@ import org.opsli.common.base.msg.BaseMsg;
*/ */
public enum CoreMsg implements BaseMsg { public enum CoreMsg implements BaseMsg {
/**
* Mybatis-Plus
*/
/** Mybatis-Plus 乐观锁 */
MYBATIS_OPTIMISTIC_LOCKER(10100,"当前数据已被更改,请刷新重试!"),
/**
* Redis
*/
REDIS_EXCEPTION_PUSH_SUB(10200,"Redis 订阅通道失败!")
; ;
private Integer code; private int code;
private String message; private String message;
CoreMsg(int code,String message){
this.code = code;
this.message = message;
}
@Override @Override
public Integer getCode() { public Integer getCode() {
return this.code; return this.code;

Loading…
Cancel
Save