|
|
|
@ -13,6 +13,7 @@ import io.lettuce.core.RedisFuture;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
|
|
|
|
|
|
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
|
import java.time.Duration;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Date;
|
|
|
|
@ -50,8 +51,8 @@ public class AustinSink implements SinkFunction<AnchorInfo> {
|
|
|
|
|
*/
|
|
|
|
|
String redisMessageKey = CharSequenceUtil.join(StrPool.COLON, AustinConstant.CACHE_KEY_PREFIX, AustinConstant.MESSAGE_ID, info.getMessageId());
|
|
|
|
|
SimpleAnchorInfo messageAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
|
|
|
|
|
redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(), JSON.toJSONString(messageAnchorInfo).getBytes()));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(), Duration.ofDays(3).toMillis() / 1000));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.lpush(redisMessageKey.getBytes(StandardCharsets.UTF_8), JSON.toJSONString(messageAnchorInfo).getBytes(StandardCharsets.UTF_8)));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(redisMessageKey.getBytes(StandardCharsets.UTF_8), Duration.ofDays(3).toMillis() / 1000));
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 1.构建userId维度的链路信息 数据结构list:{key,list}
|
|
|
|
@ -59,17 +60,17 @@ public class AustinSink implements SinkFunction<AnchorInfo> {
|
|
|
|
|
*/
|
|
|
|
|
SimpleAnchorInfo userAnchorInfo = SimpleAnchorInfo.builder().businessId(info.getBusinessId()).state(info.getState()).timestamp(info.getLogTimestamp()).build();
|
|
|
|
|
for (String id : info.getIds()) {
|
|
|
|
|
redisFutures.add(redisAsyncCommands.lpush(id.getBytes(), JSON.toJSONString(userAnchorInfo).getBytes()));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(id.getBytes(), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.lpush(id.getBytes(StandardCharsets.UTF_8), JSON.toJSONString(userAnchorInfo).getBytes(StandardCharsets.UTF_8)));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(id.getBytes(StandardCharsets.UTF_8), (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 2.构建消息模板维度的链路信息 数据结构hash:{key,hash}
|
|
|
|
|
* key:businessId,hashValue:{state,stateCount}
|
|
|
|
|
*/
|
|
|
|
|
redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(),
|
|
|
|
|
String.valueOf(info.getState()).getBytes(), info.getIds().size()));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(),
|
|
|
|
|
redisFutures.add(redisAsyncCommands.hincrby(String.valueOf(info.getBusinessId()).getBytes(StandardCharsets.UTF_8),
|
|
|
|
|
String.valueOf(info.getState()).getBytes(StandardCharsets.UTF_8), info.getIds().size()));
|
|
|
|
|
redisFutures.add(redisAsyncCommands.expire(String.valueOf(info.getBusinessId()).getBytes(StandardCharsets.UTF_8),
|
|
|
|
|
((DateUtil.offsetDay(new Date(), 30).getTime()) / 1000) - DateUtil.currentSeconds()));
|
|
|
|
|
|
|
|
|
|
return redisFutures;
|
|
|
|
|