diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java new file mode 100644 index 0000000..01318d9 --- /dev/null +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/redis/RedisReceiver.java @@ -0,0 +1,102 @@ +package com.java3y.austin.handler.receiver.redis; + +import com.alibaba.fastjson.JSON; +import com.java3y.austin.common.domain.RecallTaskInfo; +import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.handler.receiver.MessageReceiver; +import com.java3y.austin.handler.receiver.service.ConsumeService; +import com.java3y.austin.support.constans.MessageQueuePipeline; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +/** + * + * Redis 消息队列实现类 + * + * @author xiaoxiamao + * @date 2024/7/4 + */ +@Slf4j +@Component +@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) +public class RedisReceiver implements MessageReceiver { + + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + + @Autowired + private StringRedisTemplate stringRedisTemplate; + @Autowired + private ConsumeService consumeService; + + /** + * 初始化调度线程池 + */ + @PostConstruct + public void init() { + // 创建调度线程池 + ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2, + r -> new Thread(r, "RedisReceiverThread")); + // 定时调度 + scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(this::receiveRecallMessage, 0, 1, TimeUnit.SECONDS); + } + + /** + * 消费发送消息 + */ + public void receiveSendMessage() { + receiveMessage(sendTopic, message -> { + log.debug("RedisReceiver#receiveSendMessage message:{}", message); + List taskInfoList = JSON.parseArray(message, TaskInfo.class); + consumeService.consume2Send(taskInfoList); + }); + } + + /** + * 消费撤回消息 + */ + public void receiveRecallMessage() { + receiveMessage(recallTopic, message -> { + log.debug("RedisReceiver#receiveRecallMessage recallTaskInfo:{}", message); + RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); + consumeService.consume2recall(recallTaskInfo); + }); + } + + /** + * 消息处理方法 + * + * 处理责任链有去重处理,此处暂不做 + * + * @param topic 消息主题 + * @param consumer 消费处理逻辑 + */ + private void receiveMessage(String topic, Consumer consumer) { + try { + while (true) { + // 阻塞操作,减少CPU,IO消耗 + Optional message = Optional.ofNullable( + stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS)); + message.ifPresent(consumer); + } + } catch (Exception e) { + log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", + topic, e.getMessage()); + } + } +} diff --git a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java index 8859bf1..5517508 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java +++ b/austin-support/src/main/java/com/java3y/austin/support/constans/MessageQueuePipeline.java @@ -8,6 +8,7 @@ package com.java3y.austin.support.constans; */ public class MessageQueuePipeline { public static final String EVENT_BUS = "eventBus"; + public static final String REDIS = "redis"; public static final String KAFKA = "kafka"; public static final String ROCKET_MQ = "rocketMq"; public static final String RABBIT_MQ = "rabbitMq"; diff --git a/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java b/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java new file mode 100644 index 0000000..ca5178a --- /dev/null +++ b/austin-support/src/main/java/com/java3y/austin/support/mq/redis/RedisSendMqServiceImpl.java @@ -0,0 +1,63 @@ +package com.java3y.austin.support.mq.redis; + +import com.java3y.austin.support.constans.MessageQueuePipeline; +import com.java3y.austin.support.mq.SendMqService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +/** + * Redis 消息队列实现类 + * + * Guava Eventbus 和 Spring EventBus 只适用于单体服务 + * Redis 适合单体、微服务,且无需单独部署三方消息队列,方便开发与简单应用 + * + * @author xiaoxiamao + * @date 2024/7/4 + */ +@Slf4j +@Service +@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) +public class RedisSendMqServiceImpl implements SendMqService { + + @Autowired + private StringRedisTemplate stringRedisTemplate; + + @Value("${austin.business.topic.name}") + private String sendTopic; + @Value("${austin.business.recall.topic.name}") + private String recallTopic; + + /** + * Redis 发送消息,左进右出 + * + * @param topic + * @param jsonValue + * @param tagId + */ + @Override + public void send(String topic, String jsonValue, String tagId) { + // 非业务topic,抛错不发送 + if (!sendTopic.equals(topic) && !recallTopic.equals(topic)) { + log.error("RedisSendMqServiceImpl#send The topic type is not supported! topic:{}, jsonValue:{}, tagId:{}", + topic, jsonValue, tagId); + return; + } + log.debug("RedisSendMqServiceImpl#send topic:{}, jsonValue:{}, tagId:{}", topic, jsonValue, tagId); + stringRedisTemplate.opsForList().leftPush(topic, jsonValue); + } + + /** + * Redis 发送消息 + * + * @param topic + * @param jsonValue + */ + @Override + public void send(String topic, String jsonValue) { + send(topic, jsonValue, null); + } +}