vip 支持 kafka tag 过滤

pull/9/head
3y 2 years ago
parent 144491dbc9
commit 876f79d2c6

@ -13,6 +13,7 @@ import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import com.java3y.austin.support.domain.MessageTemplate;
import com.java3y.austin.support.utils.LogUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
@ -50,10 +51,11 @@ public class Receiver {
/**
*
*
* @param consumerRecord
* @param topicGroupId
*/
@KafkaListener(topics = "#{'${austin.business.topic.name}'}")
@KafkaListener(topics = "#{'${austin.business.topic.name}'}", containerFactory = "filterContainerFactory")
public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
@ -78,7 +80,7 @@ public class Receiver {
*
* @param consumerRecord
*/
@KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}")
@KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}",containerFactory = "filterContainerFactory")
public void recall(ConsumerRecord<?,String> consumerRecord){
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()){

@ -1,15 +1,26 @@
package com.java3y.austin.handler.receiver;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
/**
*
@ -18,9 +29,12 @@ import java.util.List;
* @date 2021/12/4
*/
@Service
@Slf4j
public class ReceiverStart {
@Autowired
private ApplicationContext context;
@Autowired
private ConsumerFactory consumerFactory;
/**
* receiver
@ -63,4 +77,30 @@ public class ReceiverStart {
return attrs;
};
}
/**
* tag
* producer tagheader
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(@Value("${austin.business.tagId.key}") String tagIdKey,
@Value("${austin.business.tagId.value}") String tagIdValue) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(consumerRecord -> {
if (Optional.ofNullable(consumerRecord.value()).isPresent()) {
for (Header header : consumerRecord.headers()) {
if (header.key().equals(tagIdKey) && new String(header.value()).equals(new String(tagIdValue.getBytes(StandardCharsets.UTF_8)))) {
return false;
}
}
}
//返回true将会被丢弃
return true;
});
return factory;
}
}

@ -33,16 +33,19 @@ public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Value("${austin.business.recall.topic.name}")
private String austinRecall;
@Value("${austin.business.tagId.value}")
private String tagId;
@Override
public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel();
try {
if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
kafkaUtils.send(sendMessageTopic, message);
kafkaUtils.send(sendMessageTopic, message, tagId);
} else if (BusinessCode.RECALL.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName});
kafkaUtils.send(austinRecall, message);
kafkaUtils.send(austinRecall, message, tagId);
}
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));

@ -1,10 +1,19 @@
package com.java3y.austin.support.utils;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
/**
* @author 3y
* @date 2022/2/16
@ -16,6 +25,8 @@ public class KafkaUtils {
@Autowired
private KafkaTemplate kafkaTemplate;
@Value("${austin.business.tagId.key}")
private String tagIdKey;
/**
* kafka
@ -24,7 +35,24 @@ public class KafkaUtils {
* @param jsonMessage
*/
public void send(String topicName, String jsonMessage) {
kafkaTemplate.send(topicName, jsonMessage);
kafkaTemplate.send(topicName, jsonMessage, null);
}
/**
* kafka
* tag
*
* @param topicName
* @param jsonMessage
* @param tagId
*/
public void send(String topicName, String jsonMessage, String tagId) {
if (StrUtil.isNotBlank(tagId)) {
List<Header> headers = Arrays.asList(new RecordHeader(tagIdKey, tagId.getBytes(StandardCharsets.UTF_8)));
kafkaTemplate.send(new ProducerRecord(topicName, null, null, null, jsonMessage, headers));
} else {
kafkaTemplate.send(topicName, jsonMessage);
}
}
}

@ -60,6 +60,11 @@ austin.business.recall.topic.name=austinRecall
austin.business.recall.group.name=recallGroupId
austin.business.log.topic.name=austinLog
austin.business.graylog.ip=${austin-grayLog-ip}
# TODO kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy
austin.business.tagId.key=kafka_tag_id
austin.business.tagId.value=com.java3y.austin.3y
# TODO if windows os and need upload file to send message ,replace path !
austin.business.upload.crowd.path=/Users/3y/temp

@ -1,6 +1,6 @@
discardMsgIds = []
deduplicationRule = {"deduplication_10":{"num":1,"time":300},"deduplication_20":{"num":5}}
emailAccount =[{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423423@qq.com","pass":"23423432432423423","from":"234@qq.com","starttlsEnable":true,"auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"22222@163.com","pass":"23432423","from":"234324324234@163.com","starttlsEnable":false,"auth":true,"sslEnable":true}}]
emailAccount = [{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423432@qq.com","pass":"234324324","from":"123123@qq.com","starttlsEnable":"true","auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"23423423@163.com","pass":"234234324","from":"112312312@163.com","starttlsEnable":"false","auth":true,"sslEnable":true}}]
smsAccount = [{"sms_10":{"url":"sms.tencentcloudapi.com","region":"ap-guangzhou","secretId":"234234","secretKey":"234324324","smsSdkAppId":"2343242","templateId":"234234","signName":"Java3y公众号","supplierId":10,"supplierName":"腾讯云"}},{"sms_20":{"url":"https://sms.yunpian.com/v2/sms/tpl_batch_send.json","apikey":"23423432","tpl_id":"23423432","supplierId":20,"supplierName":"云片"}}]
enterpriseWechatAccount = [{"enterprise_wechat_10":{"corpId":"23423423","corpSecret":"-234324234","agentId":1000002,"token":"234234","aesKey":"23423423"}}]
dingDingRobotAccount = [{"ding_ding_robot_10":{"secret":"234324324324","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b68d081f732343243242343247328b0c3003d164715d2c6c6e56"}}]

Loading…
Cancel
Save