From 07276acc24e85174944db7554ece9e88962bf3c6 Mon Sep 17 00:00:00 2001 From: 3y Date: Wed, 13 Jul 2022 20:27:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=83=E7=94=A8=20kafkaUtils=EF=BC=8C?= =?UTF-8?q?=E5=86=99=E6=97=A5=E5=BF=97=E7=9A=84=E6=97=B6=E5=80=99=E5=88=A4?= =?UTF-8?q?=E6=96=AD=E6=98=AF=E5=90=A6=E5=AE=9E=E7=8E=B0=E4=B8=BAKafka?= =?UTF-8?q?=EF=BC=8C=E6=98=AF=E6=89=8D=E5=8F=91=E9=80=81=E5=88=B0topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../NightShieldLazyPendingHandler.java | 6 ++-- .../service/api/impl/action/SendMqAction.java | 6 ---- .../austin/support/utils/KafkaUtils.java | 30 ------------------- .../java3y/austin/support/utils/LogUtils.java | 21 ++++++++----- 4 files changed, 17 insertions(+), 46 deletions(-) delete mode 100644 austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java diff --git a/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java b/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java index c7452bb..0ee2191 100644 --- a/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java +++ b/austin-cron/src/main/java/com/java3y/austin/cron/handler/NightShieldLazyPendingHandler.java @@ -6,12 +6,12 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.support.config.SupportThreadPoolConfig; -import com.java3y.austin.support.utils.KafkaUtils; import com.java3y.austin.support.utils.RedisUtils; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.Arrays; @@ -31,7 +31,7 @@ public class NightShieldLazyPendingHandler { private static final String NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY = "night_shield_send"; @Autowired - private KafkaUtils kafkaUtils; + private KafkaTemplate kafkaTemplate; @Value("${austin.business.topic.name}") private String topicName; @Autowired @@ -48,7 +48,7 @@ public class NightShieldLazyPendingHandler { String taskInfo = redisUtils.lPop(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY); if (StrUtil.isNotBlank(taskInfo)) { try { - kafkaUtils.send(topicName, JSON.toJSONString(Arrays.asList(JSON.parseObject(taskInfo, TaskInfo.class)) + kafkaTemplate.send(topicName, JSON.toJSONString(Arrays.asList(JSON.parseObject(taskInfo, TaskInfo.class)) , new SerializerFeature[]{SerializerFeature.WriteClassName})); } catch (Exception e) { log.error("nightShieldLazyJob send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e), taskInfo); diff --git a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java index fda58c7..e95ccda 100644 --- a/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java +++ b/austin-service-api-impl/src/main/java/com/java3y/austin/service/api/impl/action/SendMqAction.java @@ -4,24 +4,18 @@ import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.base.Throwables; -import com.google.common.eventbus.EventBus; -import com.java3y.austin.common.domain.TaskInfo; import com.java3y.austin.common.enums.RespStatusEnum; import com.java3y.austin.common.vo.BasicResultVO; import com.java3y.austin.service.api.enums.BusinessCode; import com.java3y.austin.service.api.impl.domain.SendTaskModel; import com.java3y.austin.support.mq.SendMqService; -import com.java3y.austin.support.mq.eventbus.EventBusListener; import com.java3y.austin.support.pipeline.BusinessProcess; import com.java3y.austin.support.pipeline.ProcessContext; -import com.java3y.austin.support.utils.KafkaUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.util.List; - /** * @author 3y * 将消息发送到MQ diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java deleted file mode 100644 index 193f1ee..0000000 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/KafkaUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.java3y.austin.support.utils; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -/** - * @author 3y - * @date 2022/2/16 - * Kafka工具类 - */ -@Component -@Slf4j -public class KafkaUtils { - - @Autowired - private KafkaTemplate kafkaTemplate; - - /** - * 发送kafka消息 - * - * @param topicName - * @param jsonMessage - */ - public void send(String topicName, String jsonMessage) { - kafkaTemplate.send(topicName, jsonMessage); - } - -} diff --git a/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java b/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java index 055f065..1b542d1 100644 --- a/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java +++ b/austin-support/src/main/java/com/java3y/austin/support/utils/LogUtils.java @@ -6,9 +6,11 @@ import com.alibaba.fastjson.JSON; import com.google.common.base.Throwables; import com.java3y.austin.common.domain.AnchorInfo; import com.java3y.austin.common.domain.LogParam; +import com.java3y.austin.support.constans.MessageQueuePipeline; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; /** @@ -20,8 +22,11 @@ import org.springframework.stereotype.Component; @Component public class LogUtils extends CustomLogListener { + @Value("${austin-mq-pipeline}") + private String mqPipeline; + @Autowired - private KafkaUtils kafkaUtils; + private KafkaTemplate kafkaTemplate; @Value("${austin.business.log.topic.name}") private String topicName; @@ -49,13 +54,15 @@ public class LogUtils extends CustomLogListener { anchorInfo.setTimestamp(System.currentTimeMillis()); String message = JSON.toJSONString(anchorInfo); log.info(message); - - try { - kafkaUtils.send(topicName, message); - } catch (Exception e) { - log.error("LogUtils#print kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) - , JSON.toJSONString(anchorInfo)); + if (MessageQueuePipeline.KAFKA.equals(mqPipeline)) { + try { + kafkaTemplate.send(topicName, message); + } catch (Exception e) { + log.error("LogUtils#print kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e) + , JSON.toJSONString(anchorInfo)); + } } + } /**