|
|
|
@ -2,7 +2,11 @@ package msgtransfer
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
|
|
|
|
"runtime/debug"
|
|
|
|
|
"strconv"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
@ -155,6 +159,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) {
|
|
|
|
|
fmt.Printf("toPushTopic Stack:\n%s\n", debug.Stack())
|
|
|
|
|
for _, v := range msgs {
|
|
|
|
|
och.msgDatabase.MsgToPushMQ(ctx, conversationID, v)
|
|
|
|
|
}
|
|
|
|
@ -219,6 +224,11 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
|
|
|
|
|
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, string(consumerMessages[i].Value))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var arr []string
|
|
|
|
|
for i, header := range consumerMessages[i].Headers {
|
|
|
|
|
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
|
|
|
|
|
}
|
|
|
|
|
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", "))
|
|
|
|
|
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
|
|
|
|
|
ctxMsg.message = &msgFromMQ
|
|
|
|
|
log.ZDebug(ctx, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key))
|
|
|
|
|