|
|
|
@ -1,11 +1,9 @@
|
|
|
|
|
package com.java3y.austin.handler.receiver.kafka;
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
|
|
|
|
|
import com.java3y.austin.support.constans.MessageQueuePipeline;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
|
import org.apache.kafka.common.header.Header;
|
|
|
|
|
import com.java3y.austin.support.constans.MessageQueuePipeline;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
|
@ -14,8 +12,6 @@ import org.springframework.context.annotation.Bean;
|
|
|
|
|
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
|
|
|
|
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
|
|
|
|
import org.springframework.kafka.core.ConsumerFactory;
|
|
|
|
|
import org.springframework.kafka.listener.ContainerProperties;
|
|
|
|
|
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
@ -40,6 +36,9 @@ public class ReceiverStart {
|
|
|
|
|
@Autowired
|
|
|
|
|
private ConsumerFactory consumerFactory;
|
|
|
|
|
|
|
|
|
|
@Value("${austin.nacos.enabled}")
|
|
|
|
|
private Boolean nacosEnabled;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* receiver的消费方法常量
|
|
|
|
|
*/
|
|
|
|
@ -60,7 +59,13 @@ public class ReceiverStart {
|
|
|
|
|
*/
|
|
|
|
|
@PostConstruct
|
|
|
|
|
public void init() {
|
|
|
|
|
for (int i = 0; i < groupIds.size(); i++) {
|
|
|
|
|
int total = groupIds.size();
|
|
|
|
|
if (nacosEnabled) {
|
|
|
|
|
// 当nacos开启时 会导致Receiver提前加载 所以这里getBean次数-1
|
|
|
|
|
// nacos issue: https://github.com/nacos-group/nacos-spring-project/issues/249
|
|
|
|
|
total -= 1;
|
|
|
|
|
}
|
|
|
|
|
for (int i = 0; i < total; i++) {
|
|
|
|
|
context.getBean(Receiver.class);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -74,8 +79,7 @@ public class ReceiverStart {
|
|
|
|
|
if (element instanceof Method) {
|
|
|
|
|
String name = ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName();
|
|
|
|
|
if (RECEIVER_METHOD_NAME.equals(name)) {
|
|
|
|
|
attrs.put("groupId", groupIds.get(index));
|
|
|
|
|
index++;
|
|
|
|
|
attrs.put("groupId", groupIds.get(index++));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return attrs;
|
|
|
|
|