From 630b75a1fbe114c9be13369e6aacdd541faf296f Mon Sep 17 00:00:00 2001
From: rfyiamcool
Date: Wed, 29 Nov 2023 10:57:12 +0800
Subject: [PATCH] fix: graceful exit for kafka consumer of msgtransfer

Signed-off-by: rfyiamcool
---
 internal/msgtransfer/init.go                   | 96 ++++++++++++++-----
 .../msgtransfer/online_history_msg_handler.go  | 58 ++++++++---
 pkg/common/kafka/consumer_group.go             | 29 +++++-
 3 files changed, 140 insertions(+), 43 deletions(-)

diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go
index 8436317ee..2f7ddef3d 100644
--- a/internal/msgtransfer/init.go
+++ b/internal/msgtransfer/init.go
@@ -15,17 +15,24 @@
 package msgtransfer
 
 import (
+	"context"
 	"errors"
 	"fmt"
-	"github.com/OpenIMSDK/tools/mw"
+	"net/http"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
-	"log"
-	"net/http"
-	"sync"
+
+	"github.com/OpenIMSDK/tools/log"
+	"github.com/OpenIMSDK/tools/mw"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
 	"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
@@ -50,18 +57,22 @@ func StartTransfer(prometheusPort int) error {
 	if err != nil {
 		return err
 	}
-	if err := db.AutoMigrate(&relationtb.ChatLogModel{}); err != nil {
+
+	if err = db.AutoMigrate(&relationtb.ChatLogModel{}); err != nil {
 		fmt.Printf("gorm: AutoMigrate ChatLogModel err: %v\n", err)
 	}
+
 	rdb, err := cache.NewRedis()
 	if err != nil {
 		return err
 	}
+
 	mongo, err := unrelation.NewMongo()
 	if err != nil {
 		return err
 	}
-	if err := mongo.CreateMsgIndex(); err != nil {
+
+	if err = mongo.CreateMsgIndex(); err != nil {
 		return err
 	}
 	client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
@@ -72,9 +83,11 @@ func StartTransfer(prometheusPort int) error {
 	if err != nil {
 		return err
 	}
+
 	if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
 		return err
 	}
+
 	client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
 	msgModel := cache.NewMsgCacheModel(rdb)
 	msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
@@ -98,35 +111,68 @@ func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
 }
 
 func (m *MsgTransfer) Start(prometheusPort int) error {
-	var wg sync.WaitGroup
-	wg.Add(1)
 	fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
 	if prometheusPort <= 0 {
 		return errors.New("prometheusPort not correct")
 	}
+
 	if config.Config.ChatPersistenceMysql {
 		// go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
 	} else {
 		fmt.Println("msg transfer not start mysql consumer")
 	}
-	go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
-	go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
-	// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
-	/*err := prome.StartPrometheusSrv(prometheusPort)
-	if err != nil {
-		return err
-	}*/
-	////////////////////////////
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
+	}()
+
 	if config.Config.Prometheus.Enable {
-		reg := prometheus.NewRegistry()
-		reg.MustRegister(
-			collectors.NewGoCollector(),
-		)
-		reg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...)
-		http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
-		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil))
+		go func() {
+			proreg := prometheus.NewRegistry()
+			proreg.MustRegister(
+				collectors.NewGoCollector(),
+			)
+			proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...)
+			http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
+			err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
+			if err != nil && err != http.ErrServerClosed {
+				panic(err)
+			}
+		}()
+	}
+
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+	<-sigs
+
+	// gracefully close the kafka clients.
+	go m.historyCH.historyConsumerGroup.Close()
+	go m.historyMongoCH.historyConsumerGroup.Close()
+
+	done := make(chan struct{}, 1)
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+		log.ZInfo(context.Background(), "msgtransfer exit successfully")
+	case <-time.After(15 * time.Second):
+		log.ZError(context.Background(), "msgtransfer forced to exit, timeout 15s", nil)
 	}
-	////////////////////////////////////////
-	wg.Wait()
+
 	return nil
 }
diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go
index eb8e500fe..ca75a8182 100644
--- a/internal/msgtransfer/online_history_msg_handler.go
+++ b/internal/msgtransfer/online_history_msg_handler.go
@@ -19,6 +19,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -430,16 +431,29 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
 	log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
 		claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
 
-	split := 1000
-	rwLock := new(sync.RWMutex)
-	messages := make([]*sarama.ConsumerMessage, 0, 1000)
-	ticker := time.NewTicker(time.Millisecond * 100)
+	var (
+		split    = 1000
+		rwLock   = new(sync.RWMutex)
+		messages = make([]*sarama.ConsumerMessage, 0, 1000)
+		ticker   = time.NewTicker(time.Millisecond * 100)
+		wg       = sync.WaitGroup{}
+		running  = new(atomic.Bool)
+	)
+	running.Store(true)
+
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
+
 		for {
 			select {
 			case <-ticker.C:
+				// if the buffer is empty and the consumer has stopped, exit the loop.
 				if len(messages) == 0 {
+					if !running.Load() {
+						return
+					}
+
 					continue
 				}
@@ -472,17 +486,35 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
 		}
 	}()
 
-	for msg := range claim.Messages() {
-		if len(msg.Value) == 0 {
-			continue
-		}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
 
-		rwLock.Lock()
-		messages = append(messages, msg)
-		rwLock.Unlock()
+		for running.Load() {
+			select {
+			case msg, ok := <-claim.Messages():
+				if !ok {
+					running.Store(false)
+					return
+				}
 
-		sess.MarkMessage(msg, "")
-	}
+				if len(msg.Value) == 0 {
+					continue
+				}
+
+				rwLock.Lock()
+				messages = append(messages, msg)
+				rwLock.Unlock()
+
+				sess.MarkMessage(msg, "")
+
+			case <-sess.Context().Done():
+				running.Store(false)
+				return
+			}
+		}
+	}()
+	wg.Wait()
 
 	return nil
 }
diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go
index 1eb7b522a..9abe262cf 100644
--- a/pkg/common/kafka/consumer_group.go
+++ b/pkg/common/kafka/consumer_group.go
@@ -16,15 +16,19 @@ package kafka
 
 import (
 	"context"
+	"errors"
+
+	"github.com/IBM/sarama"
 
 	"github.com/OpenIMSDK/tools/log"
 
 	"github.com/openimsdk/open-im-server/v3/pkg/common/config"
-
-	"github.com/IBM/sarama"
 )
 
 type MConsumerGroup struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+
 	sarama.ConsumerGroup
 	groupID string
 	topics  []string
@@ -51,7 +55,10 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
 	if err != nil {
 		panic(err.Error())
 	}
+
+	ctx, cancel := context.WithCancel(context.Background())
 	return &MConsumerGroup{
+		ctx, cancel,
 		consumerGroup,
 		groupID,
 		topics,
@@ -64,11 +71,23 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex
 
 func (mc *MConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupHandler) {
 	log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID)
-	ctx := context.Background()
 	for {
-		err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
+		err := mc.ConsumerGroup.Consume(mc.ctx, mc.topics, handler)
+		if errors.Is(err, sarama.ErrClosedConsumerGroup) {
+			return
+		}
+		if mc.ctx.Err() != nil {
+			return
+		}
+
 		if err != nil {
-			panic(err.Error())
+			log.ZError(context.Background(), "kafka consume error", err)
+			return
 		}
 	}
 }
+
+func (mc *MConsumerGroup) Close() {
+	mc.cancel()
+	mc.ConsumerGroup.Close()
+}
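
For context on the shutdown sequence the patch introduces in Start: the process now blocks on SIGINT/SIGTERM/SIGQUIT, asks both consumer groups to Close (which cancels mc.ctx and closes the sarama consumer group), and then waits on the WaitGroup with a 15-second ceiling. The sketch below is a minimal, self-contained illustration of that signal-then-drain pattern; it is not code from this repository, and the worker function and its timings are made-up stand-ins for RegisterHandleAndConsumer.

package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	stop := make(chan struct{})
	var wg sync.WaitGroup

	// Stand-in for a consumer goroutine; in the patch this role is played by
	// historyConsumerGroup.RegisterHandleAndConsumer running in its own goroutine.
	worker := func(id int) {
		defer wg.Done()
		for {
			select {
			case <-stop:
				fmt.Println("worker", id, "drained, exiting")
				return
			case <-time.After(200 * time.Millisecond):
				// pretend to consume a batch of messages
			}
		}
	}

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go worker(i)
	}

	// Block until a termination signal, as Start does after launching the consumers.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-sigs

	// Ask the workers to stop; in the patch this is Close() on each MConsumerGroup.
	close(stop)

	// Wait for the workers, but never longer than the shutdown budget.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("exit successfully")
	case <-time.After(15 * time.Second):
		fmt.Println("forced to exit, timeout 15s")
	}
}

The timeout matters because a handler stuck inside ConsumeClaim would otherwise block process exit indefinitely; with the 15-second ceiling, shutdown is bounded even if a consumer never finishes draining.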