conversationID

test-errcode
wangchuxiao 2 years ago
parent ead25c24ec
commit e283e66f00

@ -2,14 +2,13 @@ package msgtransfer
import ( import (
"context" "context"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"runtime/debug"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@ -159,7 +158,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
} }
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) { func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) {
fmt.Printf("toPushTopic Stack:\n%s\n", debug.Stack())
for _, v := range msgs { for _, v := range msgs {
och.msgDatabase.MsgToPushMQ(ctx, conversationID, v) och.msgDatabase.MsgToPushMQ(ctx, conversationID, v)
} }
@ -168,7 +166,6 @@ func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, c
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) { func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
och.toPushTopic(ctx, conversationID, notStorageList) och.toPushTopic(ctx, conversationID, notStorageList)
if len(storageList) > 0 { if len(storageList) > 0 {
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList) lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
if err != nil && errs.Unwrap(err) != redis.Nil { if err != nil && errs.Unwrap(err) != redis.Nil {
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
@ -288,7 +285,8 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
cMsg = make([]*sarama.ConsumerMessage, 0, 1000) cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
rwLock.Unlock() rwLock.Unlock()
split := 1000 split := 1000
ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) ctx := mcontext.NewCtx(utils.OperationIDGenerator())
ctx = mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
for i := 0; i < len(ccMsg)/split; i++ { for i := 0; i < len(ccMsg)/split; i++ {
//log.Debug() //log.Debug()

@ -62,18 +62,13 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
if err != nil { if err != nil {
return nil, err return nil, err
} }
conversationID := utils.GetConversationIDByMsg(req.MsgData)
isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req) isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if isSend { if isSend {
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, req.MsgData) err = m.MsgDatabase.MsgToMQ(ctx, conversationID, req.MsgData)
if err != nil {
return nil, errs.ErrInternalServer.Wrap("insert to mq")
}
}
if req.MsgData.SendID != req.MsgData.RecvID { //Filter messages sent to yourself
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, req.MsgData)
if err != nil { if err != nil {
return nil, errs.ErrInternalServer.Wrap("insert to mq") return nil, errs.ErrInternalServer.Wrap("insert to mq")
} }

@ -51,7 +51,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
return &p return &p
} }
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
operationID, opUserID, platform, connID, err := mcontext.GetMustCtxInfo(ctx) operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -69,7 +69,7 @@ func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
return mcontext.WithMustInfoCtx(values) // TODO return mcontext.WithMustInfoCtx(values) // TODO
} }
func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) { func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) {
log.ZDebug(ctx, "SendMessage", "key ", key, "msg", m) log.ZDebug(ctx, "SendMessage", "key ", key, "msg", m, "topic", p.topic)
kMsg := &sarama.ProducerMessage{} kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic kMsg.Topic = p.topic
kMsg.Key = sarama.StringEncoder(key) kMsg.Key = sarama.StringEncoder(key)

@ -2,6 +2,7 @@ package mcontext
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
) )
@ -109,8 +110,20 @@ func GetMustCtxInfo(ctx context.Context) (operationID, opUserID, platform, connI
} }
connID, _ = ctx.Value(constant.ConnID).(string) connID, _ = ctx.Value(constant.ConnID).(string)
return return
}
func GetCtxInfos(ctx context.Context) (operationID, opUserID, platform, connID string, err error) {
operationID, ok := ctx.Value(constant.OperationID).(string)
if !ok {
err = errs.ErrArgs.Wrap("ctx missing operationID")
return
} }
opUserID, _ = ctx.Value(constant.OpUserID).(string)
platform, _ = ctx.Value(constant.OpUserPlatform).(string)
connID, _ = ctx.Value(constant.ConnID).(string)
return
}
func WithMustInfoCtx(values []string) context.Context { func WithMustInfoCtx(values []string) context.Context {
ctx := context.Background() ctx := context.Background()
for i, v := range values { for i, v := range values {

Loading…
Cancel
Save