fix: reduce lock msg transfer (#1308)

* fix: reduce lock msg transfer

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

* fix: reduce lock msg transfer

Signed-off-by: rfyiamcool <rfyiamcool@163.com>

---------

Signed-off-by: rfyiamcool <rfyiamcool@163.com>
pull/1348/head
fengyun.rui 1 year ago committed by GitHub
parent 69eb24f702
commit fd42c6dced
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -21,14 +21,13 @@ import (
"net/http"
"sync"
"github.com/OpenIMSDK/tools/mw"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/OpenIMSDK/tools/mw"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"

@ -427,49 +427,62 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
break
}
}
rwLock := new(sync.RWMutex)
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
cMsg := make([]*sarama.ConsumerMessage, 0, 1000)
t := time.NewTicker(time.Millisecond * 100)
split := 1000
rwLock := new(sync.RWMutex)
messages := make([]*sarama.ConsumerMessage, 0, 1000)
ticker := time.NewTicker(time.Millisecond * 100)
go func() {
for {
select {
case <-t.C:
if len(cMsg) > 0 {
rwLock.Lock()
ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)
for _, v := range cMsg {
ccMsg = append(ccMsg, v)
case <-ticker.C:
if len(messages) == 0 {
continue
}
cMsg = make([]*sarama.ConsumerMessage, 0, 1000)
rwLock.Lock()
buffer := make([]*sarama.ConsumerMessage, 0, len(messages))
buffer = append(buffer, messages...)
// reuse slice, set cap to 0
messages = messages[:0]
rwLock.Unlock()
split := 1000
start := time.Now()
ctx := mcontext.WithTriggerIDContext(context.Background(), utils.OperationIDGenerator())
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(ccMsg))
for i := 0; i < len(ccMsg)/split; i++ {
// log.Debug()
log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer))
for i := 0; i < len(buffer)/split; i++ {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: ccMsg[i*split : (i+1)*split],
ctx: ctx, cMsgList: buffer[i*split : (i+1)*split],
}}
}
if (len(ccMsg) % split) > 0 {
if (len(buffer) % split) > 0 {
och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{
ctx: ctx, cMsgList: ccMsg[split*(len(ccMsg)/split):],
ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):],
}}
}
log.ZDebug(ctx, "timer trigger msg consumer end", "length", len(ccMsg))
}
log.ZDebug(ctx, "timer trigger msg consumer end",
"length", len(buffer), "time_cost", time.Since(start),
)
}
}
}()
for msg := range claim.Messages() {
rwLock.Lock()
if len(msg.Value) != 0 {
cMsg = append(cMsg, msg)
if len(msg.Value) == 0 {
continue
}
rwLock.Lock()
messages = append(messages, msg)
rwLock.Unlock()
sess.MarkMessage(msg, "")
}
return nil
}

Loading…
Cancel
Save