diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 8ef3efd83..1766a5419 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -20,8 +20,6 @@ import ( "fmt" "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/log" - util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "net/http" @@ -118,11 +116,8 @@ func (m *MsgTransfer) Start(prometheusPort int) error { netErr error ) - onError := func(ctx context.Context, err error, errInfo string) { - log.ZWarn(ctx, errInfo, err) - } - go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH, onError) - go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH, onError) + go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) + go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) if config.Config.Prometheus.Enable { go func() { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 6f0ee7706..393ec7a75 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -438,6 +438,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( wg = sync.WaitGroup{} running = new(atomic.Bool) ) + running.Store(true) wg.Add(1) go func() { diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go index 572afe0eb..daaa37e8a 100644 --- a/internal/push/consumer_init.go +++ b/internal/push/consumer_init.go @@ -16,8 +16,6 @@ package push import ( "context" - - "github.com/OpenIMSDK/tools/log" ) type Consumer struct { @@ -36,9 +34,6 @@ func NewConsumer(pusher *Pusher) (*Consumer, error) { } func (c *Consumer) Start() { - onError := func(ctx context.Context, err error, errInfo string) { - log.ZWarn(ctx, errInfo, err) - } - go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh, onError) + go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh) } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 5245c6a6f..d63527620 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -17,8 +17,6 @@ package kafka import ( "context" "errors" - "fmt" - "github.com/IBM/sarama" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" @@ -67,18 +65,19 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex return GetContextWithMQHeader(cMsg.Headers) } -func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler, onError func(context.Context, error, string)) { +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { log.ZDebug(ctx, "register consumer group", "groupID", mc.groupID) for { err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) - if errors.Is(err, sarama.ErrClosedConsumerGroup) || errors.Is(err, context.Canceled) { + if errors.Is(err, sarama.ErrClosedConsumerGroup) { return } - if err != nil { - errInfo := fmt.Sprintf("consume err: %v, topic: %v, groupID: %s", err, strings.Join(mc.topics, ", "), mc.groupID) - onError(ctx, err, errInfo) // 调用回调函数处理错误 + if errors.Is(err, context.Canceled) { return } + if err != nil { + log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) + } } }