Merge pull request #65 from xiaoxiamo/redis

添加Redis消息队列
pull/68/head
Java3y 5 months ago committed by GitHub
commit 8782c142be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,102 @@
package com.java3y.austin.handler.receiver.redis;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.RecallTaskInfo;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
*
* Redis
*
* @author xiaoxiamao
* @date 2024/7/4
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS)
public class RedisReceiver implements MessageReceiver {
@Value("${austin.business.topic.name}")
private String sendTopic;
@Value("${austin.business.recall.topic.name}")
private String recallTopic;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private ConsumeService consumeService;
/**
* 线
*/
@PostConstruct
public void init() {
// 创建调度线程池
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2,
r -> new Thread(r, "RedisReceiverThread"));
// 定时调度
scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS);
scheduler.scheduleWithFixedDelay(this::receiveRecallMessage, 0, 1, TimeUnit.SECONDS);
}
/**
*
*/
public void receiveSendMessage() {
receiveMessage(sendTopic, message -> {
log.debug("RedisReceiver#receiveSendMessage message:{}", message);
List<TaskInfo> taskInfoList = JSON.parseArray(message, TaskInfo.class);
consumeService.consume2Send(taskInfoList);
});
}
/**
*
*/
public void receiveRecallMessage() {
receiveMessage(recallTopic, message -> {
log.debug("RedisReceiver#receiveRecallMessage recallTaskInfo:{}", message);
RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class);
consumeService.consume2recall(recallTaskInfo);
});
}
/**
*
*
*
*
* @param topic
* @param consumer
*/
private void receiveMessage(String topic, Consumer<String> consumer) {
try {
while (true) {
// 阻塞操作减少CPUIO消耗
Optional<String> message = Optional.ofNullable(
stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS));
message.ifPresent(consumer);
}
} catch (Exception e) {
log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}",
topic, e.getMessage());
}
}
}

@ -8,6 +8,7 @@ package com.java3y.austin.support.constans;
*/
public class MessageQueuePipeline {
public static final String EVENT_BUS = "eventBus";
public static final String REDIS = "redis";
public static final String KAFKA = "kafka";
public static final String ROCKET_MQ = "rocketMq";
public static final String RABBIT_MQ = "rabbitMq";

@ -0,0 +1,63 @@
package com.java3y.austin.support.mq.redis;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
/**
* Redis
*
* Guava Eventbus Spring EventBus
* Redis 便
*
* @author xiaoxiamao
* @date 2024/7/4
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS)
public class RedisSendMqServiceImpl implements SendMqService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${austin.business.topic.name}")
private String sendTopic;
@Value("${austin.business.recall.topic.name}")
private String recallTopic;
/**
* Redis
*
* @param topic
* @param jsonValue
* @param tagId
*/
@Override
public void send(String topic, String jsonValue, String tagId) {
// 非业务topic抛错不发送
if (!sendTopic.equals(topic) && !recallTopic.equals(topic)) {
log.error("RedisSendMqServiceImpl#send The topic type is not supported! topic:{}, jsonValue:{}, tagId:{}",
topic, jsonValue, tagId);
return;
}
log.debug("RedisSendMqServiceImpl#send topic:{}, jsonValue:{}, tagId:{}", topic, jsonValue, tagId);
stringRedisTemplate.opsForList().leftPush(topic, jsonValue);
}
/**
* Redis
*
* @param topic
* @param jsonValue
*/
@Override
public void send(String topic, String jsonValue) {
send(topic, jsonValue, null);
}
}
Loading…
Cancel
Save