From e283e66f0051592947dd4fbda7028501389ad4a3 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 10 May 2023 17:18:04 +0800 Subject: [PATCH] conversationID --- internal/msgtransfer/online_history_msg_handler.go | 10 ++++------ internal/rpc/msg/send_pull.go | 9 ++------- pkg/common/kafka/producer.go | 4 ++-- pkg/common/mcontext/ctx.go | 13 +++++++++++++ 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 3159c118b..208791475 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -2,14 +2,13 @@ package msgtransfer import ( "context" - "fmt" - "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - "runtime/debug" "strconv" "strings" "sync" "time" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" @@ -159,7 +158,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con } func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) { - fmt.Printf("toPushTopic Stack:\n%s\n", debug.Stack()) for _, v := range msgs { och.msgDatabase.MsgToPushMQ(ctx, conversationID, v) } @@ -168,7 +166,6 @@ func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, c func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) { och.toPushTopic(ctx, conversationID, notStorageList) if len(storageList) > 0 { - lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList) if err != nil && errs.Unwrap(err) != redis.Nil { log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) @@ -288,7 +285,8 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG cMsg = make([]*sarama.ConsumerMessage, 0, 1000) rwLock.Unlock() split := 1000 - ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) + ctx := mcontext.NewCtx(utils.OperationIDGenerator()) + ctx = mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) for i := 0; i < len(ccMsg)/split; i++ { //log.Debug() diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index f9f565ffd..b2ff41e35 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -62,18 +62,13 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) if err != nil { return nil, err } + conversationID := utils.GetConversationIDByMsg(req.MsgData) isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req) if err != nil { return nil, err } if isSend { - err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, req.MsgData) - if err != nil { - return nil, errs.ErrInternalServer.Wrap("insert to mq") - } - } - if req.MsgData.SendID != req.MsgData.RecvID { //Filter messages sent to yourself - err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, req.MsgData) + err = m.MsgDatabase.MsgToMQ(ctx, conversationID, req.MsgData) if err != nil { return nil, errs.ErrInternalServer.Wrap("insert to mq") } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 9da96d430..6c6d6f149 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -51,7 +51,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer { return &p } func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { - operationID, opUserID, platform, connID, err := mcontext.GetMustCtxInfo(ctx) + operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) if err != nil { return nil, err } @@ -69,7 +69,7 @@ func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { return mcontext.WithMustInfoCtx(values) // TODO } func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) { - log.ZDebug(ctx, "SendMessage", "key ", key, "msg", m) + log.ZDebug(ctx, "SendMessage", "key ", key, "msg", m, "topic", p.topic) kMsg := &sarama.ProducerMessage{} kMsg.Topic = p.topic kMsg.Key = sarama.StringEncoder(key) diff --git a/pkg/common/mcontext/ctx.go b/pkg/common/mcontext/ctx.go index 448521ad5..5dee1341a 100644 --- a/pkg/common/mcontext/ctx.go +++ b/pkg/common/mcontext/ctx.go @@ -2,6 +2,7 @@ package mcontext import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" ) @@ -109,8 +110,20 @@ func GetMustCtxInfo(ctx context.Context) (operationID, opUserID, platform, connI } connID, _ = ctx.Value(constant.ConnID).(string) return +} +func GetCtxInfos(ctx context.Context) (operationID, opUserID, platform, connID string, err error) { + operationID, ok := ctx.Value(constant.OperationID).(string) + if !ok { + err = errs.ErrArgs.Wrap("ctx missing operationID") + return + } + opUserID, _ = ctx.Value(constant.OpUserID).(string) + platform, _ = ctx.Value(constant.OpUserPlatform).(string) + connID, _ = ctx.Value(constant.ConnID).(string) + return } + func WithMustInfoCtx(values []string) context.Context { ctx := context.Background() for i, v := range values {