From 455ac082efc386631bb85259dfbcdee41ea98ebc Mon Sep 17 00:00:00 2001 From: luhaoling <2198702716@qq.com> Date: Sun, 18 Feb 2024 18:19:13 +0800 Subject: [PATCH] fix: fix the unsolve error --- internal/msggateway/init.go | 3 +++ internal/msgtransfer/init.go | 8 ++++++-- internal/push/consumer_init.go | 10 ++++++++-- pkg/common/kafka/consumer_group.go | 14 +++++++------- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 321407f7e..f2bedbc9f 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -49,6 +49,9 @@ func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error { if err != nil { netDone <- err } + if err == nil { + fmt.Println("review SIGNAL but err is nil") + } }() return hubServer.LongConnServer.Run(netDone) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index abf20e677..062017f44 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "github.com/OpenIMSDK/tools/mw" @@ -113,8 +114,11 @@ func (m *MsgTransfer) Start(prometheusPort int) error { netErr error ) - go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) - go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) + onError := func(ctx context.Context, err error, errInfo string) { + log.ZWarn(ctx, errInfo, err) + } + go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH, onError) + go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH, onError) if config.Config.Prometheus.Enable { go func() { diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go index ceab86165..80478de99 100644 --- a/internal/push/consumer_init.go +++ b/internal/push/consumer_init.go @@ -14,7 +14,10 @@ package push -import "context" +import ( + "context" + "github.com/OpenIMSDK/tools/log" +) type Consumer struct { pushCh ConsumerHandler @@ -32,6 +35,9 @@ func NewConsumer(pusher *Pusher) (*Consumer, error) { } func (c *Consumer) Start() { + onError := func(ctx context.Context, err error, errInfo string) { + log.ZWarn(ctx, errInfo, err) + } + go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh, onError) - go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh) } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 7c0b4ef26..908b8f088 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -17,6 +17,7 @@ package kafka import ( "context" "errors" + "fmt" "github.com/IBM/sarama" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" @@ -64,18 +65,17 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex return GetContextWithMQHeader(cMsg.Headers) } -func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler, onError func(context.Context, error, string)) { log.ZDebug(ctx, "register consumer group", "groupID", mc.groupID) for { err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) - if errors.Is(err, sarama.ErrClosedConsumerGroup) { - return nil - } - if errors.Is(err, context.Canceled) { - return nil + if errors.Is(err, sarama.ErrClosedConsumerGroup) || errors.Is(err, context.Canceled) { + return } if err != nil { - log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) + errInfo := fmt.Sprintf("consume err: %v, topic: %v, groupID: %s", err, strings.Join(mc.topics, ", "), mc.groupID) + onError(ctx, err, errInfo) // 调用回调函数处理错误 + return } } }