|
|
@ -20,10 +20,8 @@ import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* Redis 消息队列实现类
|
|
|
|
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* Guava Eventbus 和 Spring EventBus 只适用于单体服务
|
|
|
|
* Redis 消息队列实现类
|
|
|
|
* Redis 适合微服务,且无需单独部署三方消息队列,方便开发与简单应用
|
|
|
|
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* @author xiaoxiamao
|
|
|
|
* @author xiaoxiamao
|
|
|
|
* @date 2024/7/4
|
|
|
|
* @date 2024/7/4
|
|
|
@ -46,8 +44,11 @@ public class RedisReceiver implements MessageReceiver {
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 消费发送消息
|
|
|
|
* 消费发送消息
|
|
|
|
|
|
|
|
*
|
|
|
|
|
|
|
|
* @Scheduled() 程序异常退出后拉起
|
|
|
|
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@Scheduled(fixedDelay = 5000)
|
|
|
|
@Scheduled(fixedDelay = 1000)
|
|
|
|
public void receiveSendMessage() {
|
|
|
|
public void receiveSendMessage() {
|
|
|
|
receiveMessage(sendTopic, message -> {
|
|
|
|
receiveMessage(sendTopic, message -> {
|
|
|
|
List<TaskInfo> taskInfoList = JSON.parseArray(message, TaskInfo.class);
|
|
|
|
List<TaskInfo> taskInfoList = JSON.parseArray(message, TaskInfo.class);
|
|
|
@ -58,7 +59,7 @@ public class RedisReceiver implements MessageReceiver {
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 消费撤回消息
|
|
|
|
* 消费撤回消息
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@Scheduled(fixedDelay = 5000)
|
|
|
|
@Scheduled(fixedDelay = 1000)
|
|
|
|
public void receiveRecallMessage() {
|
|
|
|
public void receiveRecallMessage() {
|
|
|
|
receiveMessage(recallTopic, message -> {
|
|
|
|
receiveMessage(recallTopic, message -> {
|
|
|
|
RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class);
|
|
|
|
RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class);
|
|
|
@ -69,6 +70,8 @@ public class RedisReceiver implements MessageReceiver {
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 消息处理方法
|
|
|
|
* 消息处理方法
|
|
|
|
*
|
|
|
|
*
|
|
|
|
|
|
|
|
* 处理责任链有去重处理,此处暂不做
|
|
|
|
|
|
|
|
*
|
|
|
|
* @param topic 消息主题
|
|
|
|
* @param topic 消息主题
|
|
|
|
* @param consumer 消费处理逻辑
|
|
|
|
* @param consumer 消费处理逻辑
|
|
|
|
*/
|
|
|
|
*/
|
|
|
@ -78,12 +81,13 @@ public class RedisReceiver implements MessageReceiver {
|
|
|
|
// 阻塞操作,减少CPU,IO消耗
|
|
|
|
// 阻塞操作,减少CPU,IO消耗
|
|
|
|
Optional<String> message = Optional.ofNullable(
|
|
|
|
Optional<String> message = Optional.ofNullable(
|
|
|
|
stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS));
|
|
|
|
stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS));
|
|
|
|
|
|
|
|
log.debug("RedisReceiver#receiveMessage Received message from Redis topic {}: {}", topic, message);
|
|
|
|
if (message.isPresent()) {
|
|
|
|
if (message.isPresent()) {
|
|
|
|
consumer.accept(message.get());
|
|
|
|
consumer.accept(message.get());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (Exception e) {
|
|
|
|
} catch (Exception e) {
|
|
|
|
log.error("Error receiving messages from Redis topic {}: {}", topic, e);
|
|
|
|
log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", topic, e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|