feat: 添加Redis做消息队列(三):测试Redis消息队列通过,减少非必要日志打印

pull/65/head
xiaoxiamo 6 months ago
parent 53fea21a44
commit da6b5a8ad1

@ -11,11 +11,13 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -31,26 +33,35 @@ import java.util.function.Consumer;
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS) @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.REDIS)
public class RedisReceiver implements MessageReceiver { public class RedisReceiver implements MessageReceiver {
@Value("${austin.business.topic.name}")
private String sendTopic;
@Value("${austin.business.recall.topic.name}")
private String recallTopic;
@Autowired @Autowired
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
@Autowired @Autowired
private ConsumeService consumeService; private ConsumeService consumeService;
@Value("${austin.business.topic.name}") /**
private String sendTopic; * 线
@Value("${austin.business.recall.topic.name}") */
private String recallTopic; @PostConstruct
public void init() {
// 创建调度线程池
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2,
r -> new Thread(r, "RedisReceiverThread"));
// 定时调度
scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS);
scheduler.scheduleWithFixedDelay(this::receiveRecallMessage, 0, 1, TimeUnit.SECONDS);
}
/** /**
* *
*
* @Scheduled() 退
*
*/ */
@Scheduled(fixedDelay = 1000)
public void receiveSendMessage() { public void receiveSendMessage() {
receiveMessage(sendTopic, message -> { receiveMessage(sendTopic, message -> {
log.debug("RedisReceiver#receiveSendMessage message:{}", message);
List<TaskInfo> taskInfoList = JSON.parseArray(message, TaskInfo.class); List<TaskInfo> taskInfoList = JSON.parseArray(message, TaskInfo.class);
consumeService.consume2Send(taskInfoList); consumeService.consume2Send(taskInfoList);
}); });
@ -59,9 +70,9 @@ public class RedisReceiver implements MessageReceiver {
/** /**
* *
*/ */
@Scheduled(fixedDelay = 1000)
public void receiveRecallMessage() { public void receiveRecallMessage() {
receiveMessage(recallTopic, message -> { receiveMessage(recallTopic, message -> {
log.debug("RedisReceiver#receiveRecallMessage recallTaskInfo:{}", message);
RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class); RecallTaskInfo recallTaskInfo = JSON.parseObject(message, RecallTaskInfo.class);
consumeService.consume2recall(recallTaskInfo); consumeService.consume2recall(recallTaskInfo);
}); });
@ -80,14 +91,12 @@ public class RedisReceiver implements MessageReceiver {
while (true) { while (true) {
// 阻塞操作减少CPUIO消耗 // 阻塞操作减少CPUIO消耗
Optional<String> message = Optional.ofNullable( Optional<String> message = Optional.ofNullable(
stringRedisTemplate.opsForList().rightPop(topic, 0, TimeUnit.SECONDS)); stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS));
log.debug("RedisReceiver#receiveMessage Received message from Redis topic {}: {}", topic, message); message.ifPresent(consumer);
if (message.isPresent()) {
consumer.accept(message.get());
}
} }
} catch (Exception e) { } catch (Exception e) {
log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}", topic, e); log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}",
topic, e.getMessage());
} }
} }
} }

Loading…
Cancel
Save