From 40adcf71a0d6bdb595824791ea121af97120df24 Mon Sep 17 00:00:00 2001 From: jujiyfb <151103583+jujiyfb@users.noreply.github.com> Date: Wed, 27 Dec 2023 10:25:23 +0800 Subject: [PATCH] Update ConsumeServiceImpl.java --- .../service/impl/ConsumeServiceImpl.java | 65 ++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java index 68659cd..a83054e 100644 --- a/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java +++ b/austin-handler/src/main/java/com/java3y/austin/handler/receiver/service/impl/ConsumeServiceImpl.java @@ -1,10 +1,16 @@ package com.java3y.austin.handler.receiver.service.impl; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.text.CharSequenceUtil; +import cn.hutool.core.text.StrPool; import com.java3y.austin.common.domain.AnchorInfo; +import com.java3y.austin.common.domain.SimpleAnchorInfo; import com.java3y.austin.common.domain.LogParam; import com.java3y.austin.common.domain.RecallTaskInfo; import com.java3y.austin.common.domain.TaskInfo; +import com.java3y.austin.common.constant.AustinConstant; +import com.java3y.austin.stream.utils.LettuceRedisUtils; import com.java3y.austin.common.enums.AnchorState; import com.java3y.austin.handler.handler.HandlerHolder; import com.java3y.austin.handler.pending.Task; @@ -16,11 +22,24 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; +import com.alibaba.fastjson.JSON; +import com.google.common.base.Throwables; +import io.lettuce.core.RedisFuture; +import lombok.extern.slf4j.Slf4j; + import java.util.List; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Date; + + + + /** * @author 3y */ +@Slf4j @Service public class ConsumeServiceImpl implements ConsumeService { private static final String LOG_BIZ_TYPE = "Receiver#consumer"; @@ -41,9 +60,12 @@ public class ConsumeServiceImpl implements ConsumeService { public void consume2Send(List taskInfoLists) { String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator())); for (TaskInfo taskInfo : taskInfoLists) { - logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build()); + AnchorInfo info = AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build(); + logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), info); Task task = context.getBean(Task.class).setTaskInfo(taskInfo); taskPendingHolder.route(topicGroupId).execute(task); + + realTimeData(info); } } @@ -52,4 +74,45 @@ public class ConsumeServiceImpl implements ConsumeService { logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(recallTaskInfo).build()); handlerHolder.route(recallTaskInfo.getSendChannel()).recall(recallTaskInfo); } + + private void realTimeData(AnchorInfo info) { + try { + LettuceRedisUtils.pipeline(redisAsyncCommands -> { + List> redisFutures = new ArrayList<>(); + + /** + * 0.构建messageId维度的链路信息 数据结构list:{key,list} + * key:Austin:MessageId:{messageId},listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] + */ + String redisMessageKey = CharSequenceUtil.join(StrPool.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, info.getMessageId()); + SimpleAnchorInfo messageAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build(); + redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(), JSON.toJSONString(messageAnchorInfo).getBytes())); + redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(), Duration.ofDays(3).toMillis() / 1000)); + + /** + * 1.构建userId维度的链路信息 数据结构list:{key,list} + * key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}] + */ + SimpleAnchorInfo userAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build(); + for (String id : info.getIds()) { + redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(userAnchorInfo).getBytes())); + redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000)); + } + + /** + * 2.构建消息模板维度的链路信息 数据结构hash:{key,hash} + * key:businessId,hashValue:{state,stateCount} + */ + redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(), + String.valueOf(info.getState()).getBytes(), info.getIds().size())); + redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(), + ((DateUtil.offsetDay(new Date(), 30).getTime()) / 1000) - DateUtil.currentSeconds())); + + return redisFutures; + }); + + } catch (Exception e) { + log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e)); + } + } }