|
|
@ -18,14 +18,14 @@ import (
|
|
|
|
"context"
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"errors"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
|
|
|
"github.com/openimsdk/tools/discovery"
|
|
|
|
"github.com/openimsdk/tools/discovery"
|
|
|
|
"strconv"
|
|
|
|
"github.com/openimsdk/tools/mq"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"sync"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/IBM/sarama"
|
|
|
|
|
|
|
|
"github.com/go-redis/redis"
|
|
|
|
"github.com/go-redis/redis"
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
|
|
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
|
|
@ -37,7 +37,6 @@ import (
|
|
|
|
"github.com/openimsdk/tools/errs"
|
|
|
|
"github.com/openimsdk/tools/errs"
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
"github.com/openimsdk/tools/log"
|
|
|
|
"github.com/openimsdk/tools/mcontext"
|
|
|
|
"github.com/openimsdk/tools/mcontext"
|
|
|
|
"github.com/openimsdk/tools/mq/kafka"
|
|
|
|
|
|
|
|
"github.com/openimsdk/tools/utils/stringutil"
|
|
|
|
"github.com/openimsdk/tools/utils/stringutil"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
)
|
|
|
|
)
|
|
|
@ -64,9 +63,7 @@ type userHasReadSeq struct {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
type OnlineHistoryRedisConsumerHandler struct {
|
|
|
|
type OnlineHistoryRedisConsumerHandler struct {
|
|
|
|
historyConsumerGroup *kafka.MConsumerGroup
|
|
|
|
redisMessageBatches *batcher.Batcher[ConsumerMessage]
|
|
|
|
|
|
|
|
|
|
|
|
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
msgTransferDatabase controller.MsgTransferDatabase
|
|
|
|
msgTransferDatabase controller.MsgTransferDatabase
|
|
|
|
conversationUserHasReadChan chan *userHasReadSeq
|
|
|
|
conversationUserHasReadChan chan *userHasReadSeq
|
|
|
@ -76,12 +73,13 @@ type OnlineHistoryRedisConsumerHandler struct {
|
|
|
|
conversationClient *rpcli.ConversationClient
|
|
|
|
conversationClient *rpcli.ConversationClient
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
|
|
|
|
type ConsumerMessage struct {
|
|
|
|
kafkaConf := config.KafkaConfig
|
|
|
|
Ctx context.Context
|
|
|
|
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false)
|
|
|
|
Key string
|
|
|
|
if err != nil {
|
|
|
|
Value []byte
|
|
|
|
return nil, err
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase, historyConsumer mq.Consumer) (*OnlineHistoryRedisConsumerHandler, error) {
|
|
|
|
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
|
|
|
|
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
return nil, err
|
|
|
@ -97,7 +95,7 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.
|
|
|
|
och.conversationClient = rpcli.NewConversationClient(conversationConn)
|
|
|
|
och.conversationClient = rpcli.NewConversationClient(conversationConn)
|
|
|
|
och.wg.Add(1)
|
|
|
|
och.wg.Add(1)
|
|
|
|
|
|
|
|
|
|
|
|
b := batcher.New[sarama.ConsumerMessage](
|
|
|
|
b := batcher.New[ConsumerMessage](
|
|
|
|
batcher.WithSize(size),
|
|
|
|
batcher.WithSize(size),
|
|
|
|
batcher.WithWorker(worker),
|
|
|
|
batcher.WithWorker(worker),
|
|
|
|
batcher.WithInterval(interval),
|
|
|
|
batcher.WithInterval(interval),
|
|
|
@ -109,16 +107,15 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.
|
|
|
|
hashCode := stringutil.GetHashCode(key)
|
|
|
|
hashCode := stringutil.GetHashCode(key)
|
|
|
|
return int(hashCode) % och.redisMessageBatches.Worker()
|
|
|
|
return int(hashCode) % och.redisMessageBatches.Worker()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
b.Key = func(consumerMessage *sarama.ConsumerMessage) string {
|
|
|
|
b.Key = func(consumerMessage *ConsumerMessage) string {
|
|
|
|
return string(consumerMessage.Key)
|
|
|
|
return consumerMessage.Key
|
|
|
|
}
|
|
|
|
}
|
|
|
|
b.Do = och.do
|
|
|
|
b.Do = och.do
|
|
|
|
och.redisMessageBatches = b
|
|
|
|
och.redisMessageBatches = b
|
|
|
|
och.historyConsumerGroup = historyConsumerGroup
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return &och, nil
|
|
|
|
return &och, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[ConsumerMessage]) {
|
|
|
|
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())
|
|
|
|
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())
|
|
|
|
ctxMessages := och.parseConsumerMessages(ctx, val.Val())
|
|
|
|
ctxMessages := och.parseConsumerMessages(ctx, val.Val())
|
|
|
|
ctx = withAggregationCtx(ctx, ctxMessages)
|
|
|
|
ctx = withAggregationCtx(ctx, ctxMessages)
|
|
|
@ -189,7 +186,7 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context,
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*ConsumerMessage) []*ContextMsg {
|
|
|
|
var ctxMessages []*ContextMsg
|
|
|
|
var ctxMessages []*ContextMsg
|
|
|
|
for i := 0; i < len(consumerMessages); i++ {
|
|
|
|
for i := 0; i < len(consumerMessages); i++ {
|
|
|
|
ctxMsg := &ContextMsg{}
|
|
|
|
ctxMsg := &ContextMsg{}
|
|
|
@ -199,16 +196,9 @@ func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.
|
|
|
|
log.ZWarn(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
|
|
|
|
log.ZWarn(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
var arr []string
|
|
|
|
ctxMsg.ctx = consumerMessages[i].Ctx
|
|
|
|
for i, header := range consumerMessages[i].Headers {
|
|
|
|
|
|
|
|
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
log.ZDebug(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers),
|
|
|
|
|
|
|
|
"header", strings.Join(arr, ", "))
|
|
|
|
|
|
|
|
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
|
|
|
|
|
|
|
|
ctxMsg.message = msgFromMQ
|
|
|
|
ctxMsg.message = msgFromMQ
|
|
|
|
log.ZDebug(ctx, "message parse finish", "message", msgFromMQ, "key",
|
|
|
|
log.ZDebug(ctx, "message parse finish", "message", msgFromMQ, "key", consumerMessages[i].Key)
|
|
|
|
string(consumerMessages[i].Key))
|
|
|
|
|
|
|
|
ctxMessages = append(ctxMessages, ctxMsg)
|
|
|
|
ctxMessages = append(ctxMessages, ctxMsg)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return ctxMessages
|
|
|
|
return ctxMessages
|
|
|
@ -383,7 +373,9 @@ func (och *OnlineHistoryRedisConsumerHandler) Close() {
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
|
|
|
for _, v := range msgs {
|
|
|
|
for _, v := range msgs {
|
|
|
|
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
|
|
|
|
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
|
|
|
|
_, _, _ = och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
|
|
|
|
if err := och.msgTransferDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message); err != nil {
|
|
|
|
|
|
|
|
log.ZError(ctx, "msg to push topic error", err, "msg", v.message.String())
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -401,35 +393,10 @@ func withAggregationCtx(ctx context.Context, values []*ContextMsg) context.Conte
|
|
|
|
return mcontext.SetOperationID(ctx, allMessageOperationID)
|
|
|
|
return mcontext.SetOperationID(ctx, allMessageOperationID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) HandlerRedisMessage(ctx context.Context, key string, value []byte) error { // a instance in the consumer group
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
|
|
|
|
err := och.redisMessageBatches.Put(ctx, &ConsumerMessage{Ctx: ctx, Key: key, Value: value})
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
|
|
|
|
|
|
|
|
claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
|
|
|
|
|
|
|
|
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
|
|
|
|
|
|
|
|
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
|
|
|
|
|
|
|
|
och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) {
|
|
|
|
|
|
|
|
session.MarkMessage(lastMessage, "")
|
|
|
|
|
|
|
|
session.Commit()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
|
|
|
case msg, ok := <-claim.Messages():
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if len(msg.Value) == 0 {
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
err := och.redisMessageBatches.Put(context.Background(), msg)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
log.ZWarn(context.Background(), "put msg to error", err, "msg", msg)
|
|
|
|
log.ZWarn(ctx, "put msg to error", err, "key", key, "value", value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case <-session.Context().Done():
|
|
|
|
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|