From 876f79d2c600e06c32232a29a914ce34bf9f8c97 Mon Sep 17 00:00:00 2001 From: 3y Date: Tue, 28 Jun 2022 20:40:19 +0800 Subject: [PATCH] =?UTF-8?q?vip=20=E6=94=AF=E6=8C=81=20kafka=20tag=20?= =?UTF-8?q?=E8=BF=87=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../austin/handler/receiver/Receiver.java | 6 ++- .../handler/receiver/ReceiverStart.java | 40 +++++++++++++++++++ .../service/api/impl/action/SendMqAction.java | 7 +++- .../austin/support/utils/KafkaUtils.java | 30 +++++++++++++- .../src/main/resources/application.properties | 5 +++ .../src/main/resources/local.properties | 2 +- 6 files changed, 84 insertions(+), 6 deletions(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java index 29465ea..425f8c3 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/Receiver.java @@ -13,6 +13,7 @@ import com.java3y.austin.handler.utils.GroupIdMappingUtils; import com.java3y.austin.support.domain.MessageTemplate; import com.java3y.austin.support.utils.LogUtils; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableBeanFactory; @@ -50,10 +51,11 @@ public class Receiver { /** * 发送消息 + * * @param consumerRecord * @param topicGroupId */ - @KafkaListener(topics = "#{'${austin.business.topic.name}'}") + @KafkaListener(topics = "#{'${austin.business.topic.name}'}", containerFactory = "filterContainerFactory") public void consumer(ConsumerRecord consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) { Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if (kafkaMessage.isPresent()) { @@ -78,7 +80,7 @@ public class Receiver { * 撤回消息 * @param consumerRecord */ - @KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}") + @KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}",containerFactory = "filterContainerFactory") public void recall(ConsumerRecord consumerRecord){ Optional kafkaMessage = Optional.ofNullable(consumerRecord.value()); if(kafkaMessage.isPresent()){ diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java index a7f02a6..754fb43 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/ReceiverStart.java @@ -1,15 +1,26 @@ package com.java3y.austin.handler.receiver; +import com.alibaba.fastjson.JSON; import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Optional; /** * 启动消费者 @@ -18,9 +29,12 @@ import java.util.List; * @date 2021/12/4 */ @Service +@Slf4j public class ReceiverStart { @Autowired private ApplicationContext context; + @Autowired + private ConsumerFactory consumerFactory; /** * receiver的消费方法常量 @@ -63,4 +77,30 @@ public class ReceiverStart { return attrs; }; } + + /** + * 针对tag消息过滤 + * producer 将tag写进header里 + * @return + */ + @Bean + public ConcurrentKafkaListenerContainerFactory filterContainerFactory(@Value("${austin.business.tagId.key}") String tagIdKey, + @Value("${austin.business.tagId.value}") String tagIdValue) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory); + factory.setAckDiscarded(true); + + factory.setRecordFilterStrategy(consumerRecord -> { + if (Optional.ofNullable(consumerRecord.value()).isPresent()) { + for (Header header : consumerRecord.headers()) { + if (header.key().equals(tagIdKey) && new String(header.value()).equals(new String(tagIdValue.getBytes(StandardCharsets.UTF_8)))) { + return false; + } + } + } + //返回true将会被丢弃 + return true; + }); + return factory; + } } diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index 7711a7e..d82d37c 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -33,16 +33,19 @@ public class SendMqAction implements BusinessProcess { @Value("${austin.business.recall.topic.name}") private String austinRecall; + @Value("${austin.business.tagId.value}") + private String tagId; + @Override public void process(ProcessContext context) { SendTaskModel sendTaskModel = context.getProcessModel(); try { if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) { String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - kafkaUtils.send(sendMessageTopic, message); + kafkaUtils.send(sendMessageTopic, message, tagId); } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) { String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName}); - kafkaUtils.send(austinRecall, message); + kafkaUtils.send(austinRecall, message, tagId); } } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java index 193f1ee..b2a1587 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java @@ -1,10 +1,19 @@ package com.java3y.austin.support.utils; +import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + /** * @author 3y * @date 2022/2/16 @@ -16,6 +25,8 @@ public class KafkaUtils { @Autowired private KafkaTemplate kafkaTemplate; + @Value("${austin.business.tagId.key}") + private String tagIdKey; /** * 发送kafka消息 @@ -24,7 +35,24 @@ public class KafkaUtils { * @param jsonMessage */ public void send(String topicName, String jsonMessage) { - kafkaTemplate.send(topicName, jsonMessage); + kafkaTemplate.send(topicName, jsonMessage, null); } + /** + * 发送kafka消息 + * 支持tag过滤 + * + * @param topicName + * @param jsonMessage + * @param tagId + */ + public void send(String topicName, String jsonMessage, String tagId) { + if (StrUtil.isNotBlank(tagId)) { + List
headers = Arrays.asList(new RecordHeader(tagIdKey, tagId.getBytes(StandardCharsets.UTF_8))); + kafkaTemplate.send(new ProducerRecord(topicName, null, null, null, jsonMessage, headers)); + } else { + kafkaTemplate.send(topicName, jsonMessage); + } + + } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index b3cdaa0..1e7dbc6 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -60,6 +60,11 @@ austin.business.recall.topic.name=austinRecall austin.business.recall.group.name=recallGroupId austin.business.log.topic.name=austinLog austin.business.graylog.ip=${austin-grayLog-ip} + +# TODO kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy +austin.business.tagId.key=kafka_tag_id +austin.business.tagId.value=com.java3y.austin.3y + # TODO if windows os and need upload file to send message ,replace path ! austin.business.upload.crowd.path=/Users/3y/temp diff --git a/austin-web/src/main/resources/local.properties b/austin-web/src/main/resources/local.properties index 8e6f9f2..807c7f2 100644 --- a/austin-web/src/main/resources/local.properties +++ b/austin-web/src/main/resources/local.properties @@ -1,6 +1,6 @@ discardMsgIds = [] deduplicationRule = {"deduplication_10":{"num":1,"time":300},"deduplication_20":{"num":5}} -emailAccount =[{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423423@qq.com","pass":"23423432432423423","from":"234@qq.com","starttlsEnable":true,"auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"22222@163.com","pass":"23432423","from":"234324324234@163.com","starttlsEnable":false,"auth":true,"sslEnable":true}}] +emailAccount = [{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423432@qq.com","pass":"234324324","from":"123123@qq.com","starttlsEnable":"true","auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"23423423@163.com","pass":"234234324","from":"112312312@163.com","starttlsEnable":"false","auth":true,"sslEnable":true}}] smsAccount = [{"sms_10":{"url":"sms.tencentcloudapi.com","region":"ap-guangzhou","secretId":"234234","secretKey":"234324324","smsSdkAppId":"2343242","templateId":"234234","signName":"Java3y公众号","supplierId":10,"supplierName":"腾讯云"}},{"sms_20":{"url":"https://sms.yunpian.com/v2/sms/tpl_batch_send.json","apikey":"23423432","tpl_id":"23423432","supplierId":20,"supplierName":"云片"}}] enterpriseWechatAccount = [{"enterprise_wechat_10":{"corpId":"23423423","corpSecret":"-234324234","agentId":1000002,"token":"234234","aesKey":"23423423"}}] dingDingRobotAccount = [{"ding_ding_robot_10":{"secret":"234324324324","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b68d081f732343243242343247328b0c3003d164715d2c6c6e56"}}]