|
|
@ -1,5 +1,6 @@
|
|
|
|
package com.java3y.austin.handler.receiver.kafka;
|
|
|
|
package com.java3y.austin.handler.receiver.kafka;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
|
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
|
|
|
|
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
|
|
|
|
import com.java3y.austin.support.constans.MessageQueuePipeline;
|
|
|
|
import com.java3y.austin.support.constans.MessageQueuePipeline;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
@ -68,7 +69,7 @@ public class ReceiverStart {
|
|
|
|
public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer() {
|
|
|
|
public static KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer groupIdEnhancer() {
|
|
|
|
return (attrs, element) -> {
|
|
|
|
return (attrs, element) -> {
|
|
|
|
if (element instanceof Method) {
|
|
|
|
if (element instanceof Method) {
|
|
|
|
String name = ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName();
|
|
|
|
String name = ((Method) element).getDeclaringClass().getSimpleName() + StrUtil.DOT + ((Method) element).getName();
|
|
|
|
if (RECEIVER_METHOD_NAME.equals(name)) {
|
|
|
|
if (RECEIVER_METHOD_NAME.equals(name)) {
|
|
|
|
attrs.put("groupId", groupIds.get(index++));
|
|
|
|
attrs.put("groupId", groupIds.get(index++));
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -81,7 +82,7 @@ public class ReceiverStart {
|
|
|
|
* 针对tag消息过滤
|
|
|
|
* 针对tag消息过滤
|
|
|
|
* producer 将tag写进header里
|
|
|
|
* producer 将tag写进header里
|
|
|
|
*
|
|
|
|
*
|
|
|
|
* @return
|
|
|
|
* @return true 消息将会被丢弃
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
@Bean
|
|
|
|
@Bean
|
|
|
|
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(@Value("${austin.business.tagId.key}") String tagIdKey,
|
|
|
|
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(@Value("${austin.business.tagId.key}") String tagIdKey,
|
|
|
@ -98,7 +99,6 @@ public class ReceiverStart {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//返回true将会被丢弃
|
|
|
|
|
|
|
|
return true;
|
|
|
|
return true;
|
|
|
|
});
|
|
|
|
});
|
|
|
|
return factory;
|
|
|
|
return factory;
|
|
|
|