feat: 抽象消息接收者接口,方便管理与拓展

pull/63/head
xiaoxiamo 2 weeks ago
parent 0fba9367c9
commit 4c884afdb5

@ -0,0 +1,11 @@
package com.java3y.austin.handler.receiver;
/**
*
*
* @author xiaoxiamao
* @date 2024/06/16
*/
public interface MessageReceiver {
}

@ -3,6 +3,7 @@ package com.java3y.austin.handler.receiver.eventbus;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.RecallTaskInfo;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.eventbus.EventBusListener; import com.java3y.austin.support.mq.eventbus.EventBusListener;
@ -17,7 +18,7 @@ import java.util.List;
*/ */
@Component @Component
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.EVENT_BUS) @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.EVENT_BUS)
public class EventBusReceiver implements EventBusListener { public class EventBusReceiver implements EventBusListener, MessageReceiver {
@Autowired @Autowired
private ConsumeService consumeService; private ConsumeService consumeService;

@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.RecallTaskInfo;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.constans.MessageQueuePipeline;
@ -29,7 +30,7 @@ import java.util.Optional;
@Component @Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.KAFKA) @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.KAFKA)
public class Receiver { public class Receiver implements MessageReceiver {
@Autowired @Autowired
private ConsumeService consumeService; private ConsumeService consumeService;

@ -3,6 +3,7 @@ package com.java3y.austin.handler.receiver.rabbit;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.RecallTaskInfo;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -25,7 +26,7 @@ import java.util.List;
*/ */
@Component @Component
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ) @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ)
public class RabbitMqReceiver { public class RabbitMqReceiver implements MessageReceiver {
private static final String MSG_TYPE_SEND = "send"; private static final String MSG_TYPE_SEND = "send";
private static final String MSG_TYPE_RECALL = "recall"; private static final String MSG_TYPE_RECALL = "recall";

@ -2,6 +2,7 @@ package com.java3y.austin.handler.receiver.rocketmq;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -27,7 +28,7 @@ import java.util.List;
selectorType = SelectorType.TAG, selectorType = SelectorType.TAG,
selectorExpression = "${austin.business.tagId.value}" selectorExpression = "${austin.business.tagId.value}"
) )
public class RocketMqBizReceiver implements RocketMQListener<String> { public class RocketMqBizReceiver implements RocketMQListener<String>, MessageReceiver {
@Autowired @Autowired
private ConsumeService consumeService; private ConsumeService consumeService;

@ -2,6 +2,7 @@ package com.java3y.austin.handler.receiver.rocketmq;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.RecallTaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.handler.receiver.service.ConsumeService; import com.java3y.austin.handler.receiver.service.ConsumeService;
import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -25,7 +26,7 @@ import org.springframework.stereotype.Component;
selectorType = SelectorType.TAG, selectorType = SelectorType.TAG,
selectorExpression = "${austin.business.tagId.value}" selectorExpression = "${austin.business.tagId.value}"
) )
public class RocketMqRecallReceiver implements RocketMQListener<String> { public class RocketMqRecallReceiver implements RocketMQListener<String>, MessageReceiver {
@Autowired @Autowired
private ConsumeService consumeService; private ConsumeService consumeService;

@ -3,6 +3,7 @@ package com.java3y.austin.handler.receiver.springeventbus;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.RecallTaskInfo;
import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.handler.receiver.MessageReceiver;
import com.java3y.austin.support.constans.MessageQueuePipeline; import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.springeventbus.AustinSpringEventBusEvent; import com.java3y.austin.support.mq.springeventbus.AustinSpringEventBusEvent;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -19,7 +20,7 @@ import org.springframework.stereotype.Service;
*/ */
@Service @Service
@ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.SPRING_EVENT_BUS) @ConditionalOnProperty(name = "austin.mq.pipeline", havingValue = MessageQueuePipeline.SPRING_EVENT_BUS)
public class SpringEventBusReceiverListener implements ApplicationListener<AustinSpringEventBusEvent> { public class SpringEventBusReceiverListener implements ApplicationListener<AustinSpringEventBusEvent>, MessageReceiver {
@Autowired @Autowired
private SpringEventBusReceiver springEventBusReceiver; private SpringEventBusReceiver springEventBusReceiver;

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save