diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java index 49bd953..0ba8242 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/kafka/ReceiverStart.java @@ -1,11 +1,9 @@ package com.java3y.austin.handler.receiver.kafka; -import com.alibaba.fastjson.JSON; import com.java3y.austin.handler.utils.GroupIdMappingUtils; +import com.java3y.austin.support.constans.MessageQueuePipeline; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; -import com.java3y.austin.support.constans.MessageQueuePipeline; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -14,8 +12,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; @@ -40,6 +36,9 @@ public class ReceiverStart { @Autowired private ConsumerFactory consumerFactory; + @Value("${austin.nacos.enabled}") + private Boolean nacosEnabled; + /** * receiver的消费方法常量 */ @@ -60,7 +59,13 @@ public class ReceiverStart { */ @PostConstruct public void init() { - for (int i = 0; i < groupIds.size(); i++) { + int total = groupIds.size(); + if (nacosEnabled) { + // 当nacos开启时 会导致Receiver提前加载 所以这里getBean次数-1 + // nacos issue: https://github.com/nacos-group/nacos-spring-project/issues/249 + total -= 1; + } + for (int i = 0; i < total; i++) { context.getBean(Receiver.class); } } @@ -74,8 +79,7 @@ public class ReceiverStart { if (element instanceof Method) { String name = ((Method) element).getDeclaringClass().getSimpleName() + "." + ((Method) element).getName(); if (RECEIVER_METHOD_NAME.equals(name)) { - attrs.put("groupId", groupIds.get(index)); - index++; + attrs.put("groupId", groupIds.get(index++)); } } return attrs; diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index ed01385..85e80c9 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -36,6 +36,9 @@ public class SendMqAction implements BusinessProcess { @Value("${austin.business.tagId.value}") private String tagId; + @Value("${austin.mq.pipeline}") + private String mqPipeline; + @Override public void process(ProcessContext context) { @@ -50,7 +53,7 @@ public class SendMqAction implements BusinessProcess { } } catch (Exception e) { context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR)); - log.error("send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) + log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e) , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator()))); } } diff --git a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java index f26a476..b41f5e6 100644 --- a/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java +++ b/austin-stream/src/main/java/com/java3y/austin/stream/constants/AustinFlinkConstant.java @@ -14,15 +14,15 @@ public class AustinFlinkConstant { */ public static final String GROUP_ID = "austinLogGroup"; public static final String TOPIC_NAME = "austinTraceLog"; - public static final String BROKER = "ip:port"; + public static final String BROKER = "austin.kafka"; /** * redis 配置 * TODO 使用前配置redis ip:port * (真实网络ip,这里不能用配置的hosts,看语雀文档得到真实ip) */ - public static final String REDIS_IP = "ip"; - public static final String REDIS_PORT = "port"; + public static final String REDIS_IP = "austin.redis"; + public static final String REDIS_PORT = "5003"; public static final String REDIS_PASSWORD = "austin"; diff --git a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java index 0cfc723..93be36b 100644 --- a/austin-web/src/main/java/com/java3y/austin/AustinApplication.java +++ b/austin-web/src/main/java/com/java3y/austin/AustinApplication.java @@ -12,13 +12,12 @@ public class AustinApplication { public static void main(String[] args) { /** - * TODO 【optional】 - * 如果你需要动态配置 + * 如果你需要启动Apollo动态配置 * 1、启动apollo * 2、将application.properties配置文件的 austin.apollo.enabled 改为true * 3、下方的property替换真实的ip和port */ - System.setProperty("apollo.config-service", "http://ip:port"); + System.setProperty("apollo.config-service", "http://austin.apollo.config:5001"); SpringApplication.run(AustinApplication.class, args); } } diff --git a/austin-web/src/main/resources/application.properties b/austin-web/src/main/resources/application.properties index 23d01a7..2de4250 100644 --- a/austin-web/src/main/resources/application.properties +++ b/austin-web/src/main/resources/application.properties @@ -3,13 +3,13 @@ # TODO please replace 【must】 config value # todo [database] ip/port/username/password 【must】 -austin.database.ip=localhost -austin.database.port=3306 +austin.database.ip=austin.mysql +austin.database.port=5004 austin.database.username=root austin.database.password=root123_A # todo [redis] ip/port/password【must】 -austin.redis.ip=localhost +austin.redis.ip=austin.redis austin.redis.port=5003 austin.redis.password=austin @@ -17,8 +17,8 @@ austin.redis.password=austin austin.mq.pipeline=eventBus # todo [kafka] ip/port【optional】, if austin.mq.pipeline=kafka 【must】 -austin.kafka.ip= -austin.kafka.port= +austin.kafka.ip=austin.kafka +austin.kafka.port=9092 # todo [rocketMq] 【optional】, if austin.mq.pipeline=rocketMq【must】 austin.rocketmq.nameserver.ip= @@ -38,7 +38,7 @@ austin.apollo.enabled=false austin.nacos.enabled=false # todo [grayLog] ip 【optional】 -austin.grayLog.ip=127.0.0.1 +austin.grayLog.ip=austin.graylog ##################### system properties ##################### server.shutdown=graceful @@ -131,6 +131,7 @@ austin.nacos.server= austin.nacos.dataId=austin austin.nacos.group=DEFAULT_GROUP austin.nacos.namespace=9537c674-f3a6-4203-b286-ef0c36bfacb2 +nacos.config.enabled=${austin.nacos.enabled} ##################### httpUtils properties ##################### ok.http.connect-timeout=30