|
|
|
@ -14,6 +14,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
|
import javax.annotation.PreDestroy;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
@ -22,7 +23,6 @@ import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
*
|
|
|
|
|
* Redis 消息队列实现类
|
|
|
|
|
*
|
|
|
|
|
* @author xiaoxiamao
|
|
|
|
@ -43,13 +43,18 @@ public class RedisReceiver implements MessageReceiver {
|
|
|
|
|
@Autowired
|
|
|
|
|
private ConsumeService consumeService;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 调度线程池
|
|
|
|
|
*/
|
|
|
|
|
private ScheduledExecutorService scheduler;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 初始化调度线程池
|
|
|
|
|
*/
|
|
|
|
|
@PostConstruct
|
|
|
|
|
public void init() {
|
|
|
|
|
// 创建调度线程池
|
|
|
|
|
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2,
|
|
|
|
|
this.scheduler = new ScheduledThreadPoolExecutor(2,
|
|
|
|
|
r -> new Thread(r, "RedisReceiverThread"));
|
|
|
|
|
// 定时调度
|
|
|
|
|
scheduler.scheduleWithFixedDelay(this::receiveSendMessage, 0, 1, TimeUnit.SECONDS);
|
|
|
|
@ -80,23 +85,39 @@ public class RedisReceiver implements MessageReceiver {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 消息处理方法
|
|
|
|
|
*
|
|
|
|
|
* <p>
|
|
|
|
|
* 处理责任链有去重处理,此处暂不做
|
|
|
|
|
*
|
|
|
|
|
* @param topic 消息主题
|
|
|
|
|
* @param topic 消息主题
|
|
|
|
|
* @param consumer 消费处理逻辑
|
|
|
|
|
*/
|
|
|
|
|
private void receiveMessage(String topic, Consumer<String> consumer) {
|
|
|
|
|
try {
|
|
|
|
|
while (true) {
|
|
|
|
|
while (true) {
|
|
|
|
|
try {
|
|
|
|
|
// 阻塞操作,减少CPU,IO消耗
|
|
|
|
|
Optional<String> message = Optional.ofNullable(
|
|
|
|
|
stringRedisTemplate.opsForList().rightPop(topic, 20, TimeUnit.SECONDS));
|
|
|
|
|
message.ifPresent(consumer);
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}",
|
|
|
|
|
topic, e.getMessage());
|
|
|
|
|
try {
|
|
|
|
|
TimeUnit.SECONDS.sleep(10);
|
|
|
|
|
} catch (InterruptedException ex) {
|
|
|
|
|
log.error("RedisReceiver#receiveMessage interrupted: {}", e.getMessage());
|
|
|
|
|
Thread.currentThread().interrupt();
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("RedisReceiver#receiveMessage Error receiving messages from Redis topic {}: {}",
|
|
|
|
|
topic, e.getMessage());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 销毁调用
|
|
|
|
|
*/
|
|
|
|
|
@PreDestroy
|
|
|
|
|
public void onDestroy() {
|
|
|
|
|
scheduler.shutdown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|