弃用 kafkaUtils,写日志的时候判断是否实现为Kafka,是才发送到topic

pull/11/head
3y 2 years ago
parent 92dc438450
commit 07276acc24

@ -6,12 +6,12 @@ import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.support.config.SupportThreadPoolConfig;
import com.java3y.austin.support.utils.KafkaUtils;
import com.java3y.austin.support.utils.RedisUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.Arrays;
@ -31,7 +31,7 @@ public class NightShieldLazyPendingHandler {
private static final String NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY = "night_shield_send";
@Autowired
private KafkaUtils kafkaUtils;
private KafkaTemplate kafkaTemplate;
@Value("${austin.business.topic.name}")
private String topicName;
@Autowired
@ -48,7 +48,7 @@ public class NightShieldLazyPendingHandler {
String taskInfo = redisUtils.lPop(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY);
if (StrUtil.isNotBlank(taskInfo)) {
try {
kafkaUtils.send(topicName, JSON.toJSONString(Arrays.asList(JSON.parseObject(taskInfo, TaskInfo.class))
kafkaTemplate.send(topicName, JSON.toJSONString(Arrays.asList(JSON.parseObject(taskInfo, TaskInfo.class))
, new SerializerFeature[]{SerializerFeature.WriteClassName}));
} catch (Exception e) {
log.error("nightShieldLazyJob send kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e), taskInfo);

@ -4,24 +4,18 @@ import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.base.Throwables;
import com.google.common.eventbus.EventBus;
import com.java3y.austin.common.domain.TaskInfo;
import com.java3y.austin.common.enums.RespStatusEnum;
import com.java3y.austin.common.vo.BasicResultVO;
import com.java3y.austin.service.api.enums.BusinessCode;
import com.java3y.austin.service.api.impl.domain.SendTaskModel;
import com.java3y.austin.support.mq.SendMqService;
import com.java3y.austin.support.mq.eventbus.EventBusListener;
import com.java3y.austin.support.pipeline.BusinessProcess;
import com.java3y.austin.support.pipeline.ProcessContext;
import com.java3y.austin.support.utils.KafkaUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author 3y
* MQ

@ -1,30 +0,0 @@
package com.java3y.austin.support.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* @author 3y
* @date 2022/2/16
* Kafka
*/
@Component
@Slf4j
public class KafkaUtils {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* kafka
*
* @param topicName
* @param jsonMessage
*/
public void send(String topicName, String jsonMessage) {
kafkaTemplate.send(topicName, jsonMessage);
}
}

@ -6,9 +6,11 @@ import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import com.java3y.austin.common.domain.AnchorInfo;
import com.java3y.austin.common.domain.LogParam;
import com.java3y.austin.support.constans.MessageQueuePipeline;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
@ -20,8 +22,11 @@ import org.springframework.stereotype.Component;
@Component
public class LogUtils extends CustomLogListener {
@Value("${austin-mq-pipeline}")
private String mqPipeline;
@Autowired
private KafkaUtils kafkaUtils;
private KafkaTemplate kafkaTemplate;
@Value("${austin.business.log.topic.name}")
private String topicName;
@ -49,13 +54,15 @@ public class LogUtils extends CustomLogListener {
anchorInfo.setTimestamp(System.currentTimeMillis());
String message = JSON.toJSONString(anchorInfo);
log.info(message);
try {
kafkaUtils.send(topicName, message);
} catch (Exception e) {
log.error("LogUtils#print kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e)
, JSON.toJSONString(anchorInfo));
if (MessageQueuePipeline.KAFKA.equals(mqPipeline)) {
try {
kafkaTemplate.send(topicName, message);
} catch (Exception e) {
log.error("LogUtils#print kafka fail! e:{},params:{}", Throwables.getStackTraceAsString(e)
, JSON.toJSONString(anchorInfo));
}
}
}
/**

Loading…
Cancel
Save