diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index bebf6819a..4ce015543 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -21,14 +21,13 @@ import ( "net/http" "sync" + "github.com/OpenIMSDK/tools/mw" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/OpenIMSDK/tools/mw" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index b4556634c..b019b0120 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -427,49 +427,62 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( break } } - rwLock := new(sync.RWMutex) log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 0, 1000) - t := time.NewTicker(time.Millisecond * 100) + + split := 1000 + rwLock := new(sync.RWMutex) + messages := make([]*sarama.ConsumerMessage, 0, 1000) + ticker := time.NewTicker(time.Millisecond * 100) + go func() { for { select { - case <-t.C: - if len(cMsg) > 0 { - rwLock.Lock() - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - rwLock.Unlock() - split := 1000 - ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) - log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg)) - for i := 0; i < len(ccMsg)/split; i++ { - // log.Debug() - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split], - }} - } - if (len(ccMsg) % split) > 0 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):], - }} - } - log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg)) + case <-ticker.C: + if len(messages) == 0 { + continue + } + + rwLock.Lock() + buffer := make([]*sarama.ConsumerMessage, 0, len(messages)) + buffer = append(buffer, messages...) + + // reuse slice, set cap to 0 + messages = messages[:0] + rwLock.Unlock() + + start := time.Now() + ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator()) + log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer)) + for i := 0; i < len(buffer)/split; i++ { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + ctx: ctx, cMsgList: buffer[i*split : (i+1)*split], + }} + } + if (len(buffer) % split) > 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):], + }} } + + log.ZDebug(ctx, "timer trigger msg consumer end", + "length", len(buffer), "time_cost", time.Since(start), + ) } } }() + for msg := range claim.Messages() { - rwLock.Lock() - if len(msg.Value) != 0 { - cMsg = append(cMsg, msg) + if len(msg.Value) == 0 { + continue } + + rwLock.Lock() + messages = append(messages, msg) rwLock.Unlock() + sess.MarkMessage(msg, "") } + return nil }