From 171f4ba16c730ba1fe4ee74674319b1850db757e Mon Sep 17 00:00:00 2001 From: xiaoxiamo <82970607@qq.com> Date: Thu, 4 Jul 2024 21:01:17 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0Redis=E5=81=9A?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=EF=BC=88=E4=B8=80=EF=BC=89?= =?UTF-8?q?=EF=BC=9A=E6=B7=BB=E5=8A=A0Redis=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/receiver/redis/RedisReceiver.java | 89 +++++++++++++++++++ .../constans/MessageQueuePipeline.java | 1 + .../mq/redis/RedisSendMqServiceImpl.java | 59 ++++++++++++ 3 files changed, 149 insertions(+) create mode 100644 austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java create mode 100644 austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java new file mode 100644 index 0000000..297a939 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -0,0 +1,89 @@ +package com.java3y.austin.handler.receiver.redis; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.RecallTaskInfo; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.receiver.MessageReceiver; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * Redis 消息队列实现类 + * + * Guava Eventbus 和 Spring EventBus 只适用于单体服务 + * Redis 适合微服务,且无需单独部署三方消息队列,方便开发与简单应用 + * + * @author xiaoxiamao + * @date 2024/7/4 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) +public class RedisReceiver implements MessageReceiver { + + @Autowired + private StringRedisTemplate stringRedisTemplate; + + @Autowired + private ConsumeService consumeService; + + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + + /** + * 消费发送消息 + */ + @Scheduled(fixedDelay = 5000) + public void receiveSendMessage() { + receiveMessage(sendTopic, message -> { + List taskInfoList = JSON.parseArray(message, TaskInfo.class); + consumeService.consume2Send(taskInfoList); + }); + } + + /** + * 消费撤回消息 + */ + @Scheduled(fixedDelay = 5000) + public void receiveRecallMessage() { + receiveMessage(recallTopic, message -> { + RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); + }); + } + + /** + * 消息处理方法 + * + * @param topic 消息主题 + * @param consumer 消费处理逻辑 + */ + private void receiveMessage(String topic, Consumer consumer) { + try { + while (true) { + // 阻塞操作,减少CPU,IO消耗 + Optional message = Optional.ofNullable( + stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS)); + if (message.isPresent()) { + consumer.accept(message.get()); + } + } + } catch (Exception e) { + log.error("Error receiving messages from Redis topic {}: {}", topic, e); + } + } +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java index 8859bf1..5517508 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java +++ b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java @@ -8,6 +8,7 @@ package com.java3y.austin.support.constans; */ public class MessageQueuePipeline { public static final String EVENT_BUS = "eventBus"; + public static final String REDIS = "redis"; public static final String KAFKA = "kafka"; public static final String ROCKET_MQ = "rocketMq"; public static final String RABBIT_MQ = "rabbitMq"; diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java new file mode 100644 index 0000000..c0ae35c --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java @@ -0,0 +1,59 @@ +package com.java3y.austin.support.mq.redis; + +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +/** + * Redis 消息队列实现类 + * + * @author xiaoxiamao + * @date 2024/7/4 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) +public class RedisSendMqServiceImpl implements SendMqService { + + @Autowired + private StringRedisTemplate stringRedisTemplate; + + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + + /** + * Redis 发送消息,左进右出 + * + * @param topic + * @param jsonValue + * @param tagId + */ + @Override + public void send(String topic, String jsonValue, String tagId) { + // 非业务topic,抛错不发送 + if (!sendTopic.equals(topic) && !recallTopic.equals(topic)) { + log.error("RedisSendMqServiceImpl#The topic type is not supported! topic:{}, jsonValue:{}, tagId:{}", + topic, jsonValue, tagId); + return; + } + stringRedisTemplate.opsForList().leftPush(topic, jsonValue); + } + + /** + * Redis 发送消息 + * + * @param topic + * @param jsonValue + */ + @Override + public void send(String topic, String jsonValue) { + send(topic, jsonValue, null); + } +} From 53fea21a4414fb943aea88390f3f4d2983f8787e Mon Sep 17 00:00:00 2001 From: xiaoxiamo <82970607@qq.com> Date: Fri, 5 Jul 2024 16:22:43 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0Redis=E5=81=9A?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=EF=BC=88=E4=BA=8C=EF=BC=89?= =?UTF-8?q?=EF=BC=9A=E6=B7=BB=E5=8A=A0=E6=B3=A8=E9=87=8A=E4=B8=8E=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/receiver/redis/RedisReceiver.java | 16 ++++++++++------ .../support/mq/redis/RedisSendMqServiceImpl.java | 6 +++++- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 297a939..83f0ef3 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -20,10 +20,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** - * Redis 消息队列实现类 * - * Guava Eventbus 和 Spring EventBus 只适用于单体服务 - * Redis 适合微服务,且无需单独部署三方消息队列,方便开发与简单应用 + * Redis 消息队列实现类 * * @author xiaoxiamao * @date 2024/7/4 @@ -46,8 +44,11 @@ public class RedisReceiver implements MessageReceiver { /** * 消费发送消息 + * + * @Scheduled() 程序异常退出后拉起 + * */ - @Scheduled(fixedDelay = 5000) + @Scheduled(fixedDelay = 1000) public void receiveSendMessage() { receiveMessage(sendTopic, message -> { List taskInfoList = JSON.parseArray(message, TaskInfo.class); @@ -58,7 +59,7 @@ public class RedisReceiver implements MessageReceiver { /** * 消费撤回消息 */ - @Scheduled(fixedDelay = 5000) + @Scheduled(fixedDelay = 1000) public void receiveRecallMessage() { receiveMessage(recallTopic, message -> { RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); @@ -69,6 +70,8 @@ public class RedisReceiver implements MessageReceiver { /** * 消息处理方法 * + * 处理责任链有去重处理,此处暂不做 + * * @param topic 消息主题 * @param consumer 消费处理逻辑 */ @@ -78,12 +81,13 @@ public class RedisReceiver implements MessageReceiver { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS)); + log.debug("RedisReceiver#receiveMessage Received message from Redis topic {}: {}", topic, message); if (message.isPresent()) { consumer.accept(message.get()); } } } catch (Exception e) { - log.error("Error receiving messages from Redis topic {}: {}", topic, e); + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", topic, e); } } } diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java index c0ae35c..ca5178a 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java @@ -12,6 +12,9 @@ import org.springframework.stereotype.Service; /** * Redis 消息队列实现类 * + * Guava Eventbus 和 Spring EventBus 只适用于单体服务 + * Redis 适合单体、微服务,且无需单独部署三方消息队列,方便开发与简单应用 + * * @author xiaoxiamao * @date 2024/7/4 */ @@ -39,10 +42,11 @@ public class RedisSendMqServiceImpl implements SendMqService { public void send(String topic, String jsonValue, String tagId) { // 非业务topic,抛错不发送 if (!sendTopic.equals(topic) && !recallTopic.equals(topic)) { - log.error("RedisSendMqServiceImpl#The topic type is not supported! topic:{}, jsonValue:{}, tagId:{}", + log.error("RedisSendMqServiceImpl#send The topic type is not supported! topic:{}, jsonValue:{}, tagId:{}", topic, jsonValue, tagId); return; } + log.debug("RedisSendMqServiceImpl#send topic:{}, jsonValue:{}, tagId:{}", topic, jsonValue, tagId); stringRedisTemplate.opsForList().leftPush(topic, jsonValue); } From da6b5a8ad14b4e844b5750f3e30b37f95119bf2e Mon Sep 17 00:00:00 2001 From: xiaoxiamo <82970607@qq.com> Date: Sat, 13 Jul 2024 23:43:11 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0Redis=E5=81=9A?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=EF=BC=88=E4=B8=89=EF=BC=89?= =?UTF-8?q?=EF=BC=9A=E6=B5=8B=E8=AF=95Redis=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97=E9=80=9A=E8=BF=87=EF=BC=8C=E5=87=8F=E5=B0=91=E9=9D=9E?= =?UTF-8?q?=E5=BF=85=E8=A6=81=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/receiver/redis/RedisReceiver.java | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java index 83f0ef3..01318d9 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -11,11 +11,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.util.List; import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -31,26 +33,35 @@ import java.util.function.Consumer; @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) public class RedisReceiver implements MessageReceiver { + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + @Autowired private StringRedisTemplate stringRedisTemplate; - @Autowired private ConsumeService consumeService; - @Value("${austin.business.topic.name}") - private String sendTopic; - @Value("${austin.business.recall.topic.name}") - private String recallTopic; + /** + * 初始化调度线程池 + */ + @PostConstruct + public void init() { + // 创建调度线程池 + ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + r -> new Thread(r, "RedisReceiverThread")); + // 定时调度 + scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(this::receiveRecallMessage, 0, 1, TimeUnit.SECONDS); + } /** * 消费发送消息 - * - * @Scheduled() 程序异常退出后拉起 - * */ - @Scheduled(fixedDelay = 1000) public void receiveSendMessage() { receiveMessage(sendTopic, message -> { + log.debug("RedisReceiver#receiveSendMessage message:{}", message); List taskInfoList = JSON.parseArray(message, TaskInfo.class); consumeService.consume2Send(taskInfoList); }); @@ -59,9 +70,9 @@ public class RedisReceiver implements MessageReceiver { /** * 消费撤回消息 */ - @Scheduled(fixedDelay = 1000) public void receiveRecallMessage() { receiveMessage(recallTopic, message -> { + log.debug("RedisReceiver#receiveRecallMessage recallTaskInfo:{}", message); RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); consumeService.consume2recall(recallTaskInfo); }); @@ -80,14 +91,12 @@ public class RedisReceiver implements MessageReceiver { while (true) { // 阻塞操作,减少CPU,IO消耗 Optional message = Optional.ofNullable( - stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS)); - log.debug("RedisReceiver#receiveMessage Received message from Redis topic {}: {}", topic, message); - if (message.isPresent()) { - consumer.accept(message.get()); - } + stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); + message.ifPresent(consumer); } } catch (Exception e) { - log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", topic, e); + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", + topic, e.getMessage()); } } }