|
|
|
@ -1,10 +1,16 @@
|
|
|
|
|
package com.java3y.austin.handler.receiver.service.impl;
|
|
|
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollUtil;
|
|
|
|
|
import cn.hutool.core.date.DateUtil;
|
|
|
|
|
import cn.hutool.core.text.CharSequenceUtil;
|
|
|
|
|
import cn.hutool.core.text.StrPool;
|
|
|
|
|
import com.java3y.austin.common.domain.AnchorInfo;
|
|
|
|
|
import com.java3y.austin.common.domain.SimpleAnchorInfo;
|
|
|
|
|
import com.java3y.austin.common.domain.LogParam;
|
|
|
|
|
import com.java3y.austin.common.domain.RecallTaskInfo;
|
|
|
|
|
import com.java3y.austin.common.domain.TaskInfo;
|
|
|
|
|
import com.java3y.austin.common.constant.AustinConstant;
|
|
|
|
|
import com.java3y.austin.stream.utils.LettuceRedisUtils;
|
|
|
|
|
import com.java3y.austin.common.enums.AnchorState;
|
|
|
|
|
import com.java3y.austin.handler.handler.HandlerHolder;
|
|
|
|
|
import com.java3y.austin.handler.pending.Task;
|
|
|
|
@ -16,11 +22,24 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.context.ApplicationContext;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
import com.google.common.base.Throwables;
|
|
|
|
|
import io.lettuce.core.RedisFuture;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Date;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* @author 3y
|
|
|
|
|
*/
|
|
|
|
|
@Slf4j
|
|
|
|
|
@Service
|
|
|
|
|
public class ConsumeServiceImpl implements ConsumeService {
|
|
|
|
|
private static final String LOG_BIZ_TYPE = "Receiver#consumer";
|
|
|
|
@ -41,9 +60,12 @@ public class ConsumeServiceImpl implements ConsumeService {
|
|
|
|
|
public void consume2Send(List<TaskInfo> taskInfoLists) {
|
|
|
|
|
String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
|
|
|
|
|
for (TaskInfo taskInfo : taskInfoLists) {
|
|
|
|
|
logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
|
|
|
|
|
AnchorInfo info = AnchorInfo.builder().bizId(taskInfo.getBizId()).messageId(taskInfo.getMessageId()).ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build();
|
|
|
|
|
logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), info);
|
|
|
|
|
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
|
|
|
|
|
taskPendingHolder.route(topicGroupId).execute(task);
|
|
|
|
|
|
|
|
|
|
realTimeData(info);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -52,4 +74,45 @@ public class ConsumeServiceImpl implements ConsumeService {
|
|
|
|
|
logUtils.print(LogParam.builder().bizType(LOG_BIZ_RECALL_TYPE).object(recallTaskInfo).build());
|
|
|
|
|
handlerHolder.route(recallTaskInfo.getSendChannel()).recall(recallTaskInfo);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void realTimeData(AnchorInfo info) {
|
|
|
|
|
try {
|
|
|
|
|
LettuceRedisUtils.pipeline(redisAsyncCommands -> {
|
|
|
|
|
List<RedisFuture<?>> redisFutures = new ArrayList<>();
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 0.构建messageId维度的链路信息 数据结构list:{key,list}
|
|
|
|
|
* key:Austin:MessageId:{messageId},listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
|
|
|
|
|
*/
|
|
|
|
|
String redisMessageKey = CharSequenceUtil.join(StrPool.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, info.getMessageId());
|
|
|
|
|
SimpleAnchorInfo messageAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
|
|
|
|
|
redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(), JSON.toJSONString(messageAnchorInfo).getBytes()));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(), Duration.ofDays(3).toMillis() / 1000));
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 1.构建userId维度的链路信息 数据结构list:{key,list}
|
|
|
|
|
* key:userId,listValue:[{timestamp,state,businessId},{timestamp,state,businessId}]
|
|
|
|
|
*/
|
|
|
|
|
SimpleAnchorInfo userAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
|
|
|
|
|
for (String id : info.getIds()) {
|
|
|
|
|
redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(userAnchorInfo).getBytes()));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 2.构建消息模板维度的链路信息 数据结构hash:{key,hash}
|
|
|
|
|
* key:businessId,hashValue:{state,stateCount}
|
|
|
|
|
*/
|
|
|
|
|
redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
|
|
|
|
|
String.valueOf(info.getState()).getBytes(), info.getIds().size()));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(),
|
|
|
|
|
((DateUtil.offsetDay(new Date(), 30).getTime()) / 1000) - DateUtil.currentSeconds()));
|
|
|
|
|
|
|
|
|
|
return redisFutures;
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("AustinSink#invoke error: {}", Throwables.getStackTraceAsString(e));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|