merge vip into master

pull/11/head
3y 2 years ago
commit 36da9a45eb

@ -42,10 +42,11 @@ public class Receiver {
private ConsumeService consumeService;
/**
*
*
* @param consumerRecord
* @param topicGroupId
*/
@KafkaListener(topics = "#{'${austin.business.topic.name}'}")
@KafkaListener(topics = "#{'${austin.business.topic.name}'}", containerFactory = "filterContainerFactory")
public void consumer(ConsumerRecord<?, String> consumerRecord, @Header(KafkaHeaders.GROUP_ID) String topicGroupId) {
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
@ -65,7 +66,7 @@ public class Receiver {
*
* @param consumerRecord
*/
@KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}")
@KafkaListener(topics = "#{'${austin.business.recall.topic.name}'}",groupId = "#{'${austin.business.recall.group.name}'}",containerFactory = "filterContainerFactory")
public void recall(ConsumerRecord<?,String> consumerRecord){
Optional<String> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if(kafkaMessage.isPresent()){

@ -1,17 +1,28 @@
package com.java3y.austin.handler.receiver.kafka;
import com.alibaba.fastjson.JSON;
import com.java3y.austin.handler.utils.GroupIdMappingUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
/**
*
@ -21,10 +32,13 @@ import java.util.List;
*/
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.KAFKA)
@Slf4j
public class ReceiverStart {
@Autowired
private ApplicationContext context;
@Autowired
private ConsumerFactory consumerFactory;
/**
* receiver
@ -67,4 +81,30 @@ public class ReceiverStart {
return attrs;
};
}
/**
* tag
* producer tagheader
* @return
*/
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(@Value("${austin.business.tagId.key}") String tagIdKey,
@Value("${austin.business.tagId.value}") String tagIdValue) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(consumerRecord -> {
if (Optional.ofNullable(consumerRecord.value()).isPresent()) {
for (Header header : consumerRecord.headers()) {
if (header.key().equals(tagIdKey) && new String(header.value()).equals(new String(tagIdValue.getBytes(StandardCharsets.UTF_8)))) {
return false;
}
}
}
//返回true将会被丢弃
return true;
});
return factory;
}
}

@ -36,6 +36,7 @@ public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Value("${austin.business.tagId.value}")
private String tagId;
@Override
public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel();

@ -9,6 +9,8 @@ public class AustinFlinkConstant {
/**
* Kafka
* TODO 使kafka broker ip:port
* (ip,hostsip)
* groupId
*/
public static final String GROUP_ID = "austinLogGroup";
public static final String TOPIC_NAME = "austinLog";
@ -17,6 +19,7 @@ public class AustinFlinkConstant {
/**
* redis
* TODO 使redis ip:port
* (ip,hostsip)
*/
public static final String REDIS_IP = "ip";
public static final String REDIS_PORT = "port";

@ -93,6 +93,12 @@
<groupId>com.aliyun</groupId>
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
</dependencies>
</project>

@ -10,5 +10,6 @@ public interface MessageQueuePipeline {
String EVENT_BUS = "eventBus";
String KAFKA = "kafka";
String ROCKET_MQ = "rocketMq";
String RABBIT_MQ = "rabbitMq";
}

@ -0,0 +1,45 @@
package com.java3y.austin.support.mq.rabbit;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import com.java3y.austin.support.mq.SendMqService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* @Autor xzcawl
* @Date 2022/7/15 17:29
*/
@Slf4j
@Service
@ConditionalOnProperty(name = "austin-mq-pipeline", havingValue = MessageQueuePipeline.RABBIT_MQ)
public class RabbitSendMqServiceImpl implements SendMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${austin.rabbitmq.topic.name}")
private String confTopic;
@Value("${austin.rabbitmq.exchange.name}")
private String exchangeName;
@Override
public void send(String topic, String jsonValue, String tagId) {
if (topic.equals(confTopic)) {
rabbitTemplate.convertAndSend(exchangeName, confTopic, jsonValue);
} else {
log.error("RabbitSendMqServiceImpl send topic error! topic:{},confTopic:{}", topic, confTopic);
}
}
@Override
public void send(String topic, String jsonValue) {
send(topic, jsonValue, null);
}
}

@ -55,9 +55,24 @@ spring.kafka.consumer.auto.offset.reset=earliest
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
# TODO austin support kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy
austin.business.tagId.key=kafka_tag_id
austin.business.tagId.value=com.java3y.austin.3y
##################### Rabbit properties #####################
server.port=8080
spring.application.name=cl
#RabbitMq所在服务器IP
spring.rabbitmq.host=127.0.0.1
#连接端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=root
#用户密码
spring.rabbitmq.password=123456
# 开启发送确认
spring.rabbitmq.publisher-confirm-type=correlated
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=/
austin.rabbitmq.topic.name=austinRabbit
austin.rabbitmq.exchange.name=austin.point
##################### redis properties #####################
spring.redis.host=${austin-redis-ip}
@ -70,6 +85,11 @@ austin.business.recall.topic.name=austinRecall
austin.business.recall.group.name=recallGroupId
austin.business.log.topic.name=austinLog
austin.business.graylog.ip=${austin-grayLog-ip}
# TODO kafka tag filter,if you need, replace tagIdValue ,eg:com.java3y.austin.yyy
austin.business.tagId.key=kafka_tag_id
austin.business.tagId.value=com.java3y.austin.3y
# TODO if windows os and need upload file to send message ,replace path !
austin.business.upload.crowd.path=/Users/3y/temp

@ -1,6 +1,6 @@
discardMsgIds = []
deduplicationRule = {"deduplication_10":{"num":1,"time":300},"deduplication_20":{"num":5}}
emailAccount =[{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423423@qq.com","pass":"23423432432423423","from":"234@qq.com","starttlsEnable":true,"auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"22222@163.com","pass":"23432423","from":"234324324234@163.com","starttlsEnable":false,"auth":true,"sslEnable":true}}]
emailAccount = [{"email_10":{"host":"smtp.qq.com","port":465,"user":"23423432@qq.com","pass":"234324324","from":"123123@qq.com","starttlsEnable":"true","auth":true,"sslEnable":true}},{"email_20":{"host":"smtp.163.com","port":465,"user":"23423423@163.com","pass":"234234324","from":"112312312@163.com","starttlsEnable":"false","auth":true,"sslEnable":true}}]
smsAccount = [{"sms_10":{"url":"sms.tencentcloudapi.com","region":"ap-guangzhou","secretId":"234234","secretKey":"234324324","smsSdkAppId":"2343242","templateId":"234234","signName":"Java3y公众号","supplierId":10,"supplierName":"腾讯云"}},{"sms_20":{"url":"https://sms.yunpian.com/v2/sms/tpl_batch_send.json","apikey":"23423432","tpl_id":"23423432","supplierId":20,"supplierName":"云片"}}]
enterpriseWechatAccount = [{"enterprise_wechat_10":{"corpId":"23423423","corpSecret":"-234324234","agentId":1000002,"token":"234234","aesKey":"23423423"}}]
dingDingRobotAccount = [{"ding_ding_robot_10":{"secret":"234324324324","webhook":"https://oapi.dingtalk.com/robot/send?access_token=8d03b68d081f732343243242343247328b0c3003d164715d2c6c6e56"}}]

@ -153,6 +153,13 @@
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-rabbitmq -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq</artifactId>
<version>1.15.1</version>
</dependency>
<!--微信服务号第三方SDK-->
<dependency>
<groupId>com.github.binarywang</groupId>
@ -187,6 +194,7 @@
<artifactId>alibaba-dingtalk-service-sdk</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</dependencyManagement>

Loading…
Cancel
Save