|
|
@ -40,6 +40,8 @@ import (
|
|
|
|
"github.com/openimsdk/tools/utils/timeutil"
|
|
|
|
"github.com/openimsdk/tools/utils/timeutil"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
|
|
|
|
"math/rand"
|
|
|
|
|
|
|
|
"strconv"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
@ -130,6 +132,9 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s
|
|
|
|
c.onlineCache.Cond.Wait()
|
|
|
|
c.onlineCache.Cond.Wait()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
c.onlineCache.Lock.Unlock()
|
|
|
|
c.onlineCache.Lock.Unlock()
|
|
|
|
|
|
|
|
ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
|
|
|
|
|
|
|
|
log.ZInfo(ctx, "begin consume messages")
|
|
|
|
|
|
|
|
|
|
|
|
for msg := range claim.Messages() {
|
|
|
|
for msg := range claim.Messages() {
|
|
|
|
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
|
|
|
|
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
|
|
|
|
c.handleMs2PsChat(ctx, msg.Value)
|
|
|
|
c.handleMs2PsChat(ctx, msg.Value)
|
|
|
|