feat: 添加Redis做消息队列(一):添加Redis消息队列

pull/65/head
xiaoxiamo 6 months ago
parent 780d769575
commit 171f4ba16c

@ -0,0 +1,89 @@
package com.java3y.austin.handler.receiver.redis;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.RecallTaskInfo;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* Redis
*
* Guava Eventbus Spring EventBus
* Redis 便
*
* @author xiaoxiamao
* @date 2024/7/4
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS)
public class RedisReceiver implements MessageReceiver {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private ConsumeService consumeService;
@Value("${austin.business.topic.name}")
private String sendTopic;
@Value("${austin.business.recall.topic.name}")
private String recallTopic;
/**
*
*/
@Scheduled(fixedDelay = 5000)
public void receiveSendMessage() {
receiveMessage(sendTopic, message -> {
List<TaskInfo> taskInfoList = JSON.parseArray(message, TaskInfo.class);
consumeService.consume2Send(taskInfoList);
});
}
/**
*
*/
@Scheduled(fixedDelay = 5000)
public void receiveRecallMessage() {
receiveMessage(recallTopic, message -> {
RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class);
consumeService.consume2recall(recallTaskInfo);
});
}
/**
*
*
* @param topic
* @param consumer
*/
private void receiveMessage(String topic, Consumer<String> consumer) {
try {
while (true) {
// 阻塞操作减少CPUIO消耗
Optional<String> message = Optional.ofNullable(
stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS));
if (message.isPresent()) {
consumer.accept(message.get());
}
}
} catch (Exception e) {
log.error("Error receiving messages from Redis topic {}: {}", topic, e);
}
}
}

@ -8,6 +8,7 @@ package com.java3y.austin.support.constans;
*/ */
public class MessageQueuePipeline { public class MessageQueuePipeline {
public static final String EVENT_BUS = "eventBus"; public static final String EVENT_BUS = "eventBus";
public static final String REDIS = "redis";
public static final String KAFKA = "kafka"; public static final String KAFKA = "kafka";
public static final String ROCKET_MQ = "rocketMq"; public static final String ROCKET_MQ = "rocketMq";
public static final String RABBIT_MQ = "rabbitMq"; public static final String RABBIT_MQ = "rabbitMq";

@ -0,0 +1,59 @@
package com.java3y.austin.support.mq.redis;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* Redis
*
* @author xiaoxiamao
* @date 2024/7/4
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS)
public class RedisSendMqServiceImpl implements SendMqService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${austin.business.topic.name}")
private String sendTopic;
@Value("${austin.business.recall.topic.name}")
private String recallTopic;
/**
* Redis
*
* @param topic
* @param jsonValue
* @param tagId
*/
@Override
public void send(String topic, String jsonValue, String tagId) {
// 非业务topic抛错不发送
if (!sendTopic.equals(topic) && !recallTopic.equals(topic)) {
log.error("RedisSendMqServiceImpl#The topic type is not supported! topic:{}, jsonValue:{}, tagId:{}",
topic, jsonValue, tagId);
return;
}
stringRedisTemplate.opsForList().leftPush(topic, jsonValue);
}
/**
* Redis
*
* @param topic
* @param jsonValue
*/
@Override
public void send(String topic, String jsonValue) {
send(topic, jsonValue, null);
}
}
Loading…
Cancel
Save