diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go
index 83ec00749..65a6b1935 100644
--- a/internal/msgtransfer/init.go
+++ b/internal/msgtransfer/init.go
@@ -18,7 +18,10 @@ import (
     "context"
     "errors"
     "fmt"
-    "log"
+    "os"
+    "os/signal"
+    "syscall"
+    "time"
     "net/http"
     "sync"
 
@@ -30,7 +33,7 @@ import (
     "github.com/prometheus/client_golang/prometheus/promhttp"
     "google.golang.org/grpc"
     "google.golang.org/grpc/credentials/insecure"
-
+    "github.com/OpenIMSDK/tools/log"
     "github.com/openimsdk/open-im-server/v3/pkg/common/config"
     "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
     "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
@@ -51,11 +54,13 @@ func StartTransfer(prometheusPort int) error {
     if err != nil {
         return err
     }
+
     mongo, err := unrelation.NewMongo()
     if err != nil {
         return err
     }
-    if err := mongo.CreateMsgIndex(); err != nil {
+
+    if err = mongo.CreateMsgIndex(); err != nil {
         return err
     }
     client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
@@ -66,6 +71,7 @@ func StartTransfer(prometheusPort int) error {
     if err != nil {
         return err
     }
+
     if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
         return err
     }
@@ -103,26 +109,62 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli
 
 func (m *MsgTransfer) Start(prometheusPort int) error {
     ctx := context.Background()
-    var wg sync.WaitGroup
-    wg.Add(1)
     fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
     if prometheusPort <= 0 {
         return errs.Wrap(errors.New("prometheusPort not correct"))
     }
-    go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH)
-    go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH)
+    var wg sync.WaitGroup
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+
+        m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH)
+    }()
+
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+
+        m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH)
+    }()
 
     if config.Config.Prometheus.Enable {
-        reg := prometheus.NewRegistry()
-        reg.MustRegister(
-            collectors.NewGoCollector(),
-        )
-        reg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...)
-        http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}))
-        log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil))
+        go func() {
+            proreg := prometheus.NewRegistry()
+            proreg.MustRegister(
+                collectors.NewGoCollector(),
+            )
+            proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...)
+            http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg}))
+            err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)
+            if err != nil && err != http.ErrServerClosed {
+                panic(err)
+            }
+        }()
     }
-    ////////////////////////////////////////
-    wg.Wait()
+
+    sigs := make(chan os.Signal, 1)
+    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+    <-sigs
+
+    // gracefully close the kafka consumer groups.
+    go m.historyCH.historyConsumerGroup.Close()
+    go m.historyMongoCH.historyConsumerGroup.Close()
+
+    done := make(chan struct{}, 1)
+    go func() {
+        wg.Wait()
+        close(done)
+    }()
+
+    select {
+    case <-done:
+        log.ZInfo(context.Background(), "msgtransfer exited successfully")
+    case <-time.After(15 * time.Second):
+        log.ZError(context.Background(), "msgtransfer forced to exit, timeout 15s", nil)
+    }
+
     return nil
 }
diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go
index 35af330c9..6678715d4 100644
--- a/internal/msgtransfer/online_history_msg_handler.go
+++ b/internal/msgtransfer/online_history_msg_handler.go
@@ -19,6 +19,7 @@ import (
     "strconv"
     "strings"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -431,16 +432,30 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
     log.ZDebug(context.Background(), "online new session msg come",
         "highWaterMarkOffset",
         claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
-    split := 1000
-    rwLock := new(sync.RWMutex)
-    messages := make([]*sarama.ConsumerMessage, 0, 1000)
-    ticker := time.NewTicker(time.Millisecond * 100)
+    var (
+        split    = 1000
+        rwLock   = new(sync.RWMutex)
+        messages = make([]*sarama.ConsumerMessage, 0, 1000)
+        ticker   = time.NewTicker(time.Millisecond * 100)
+        wg       = sync.WaitGroup{}
+        running  = new(atomic.Bool)
+    )
+    running.Store(true) // the reader loop below only runs while this is true
+
+    wg.Add(1)
     go func() {
+        defer wg.Done()
+
         for {
             select {
             case <-ticker.C:
+                // if the buffer is empty and the reader has stopped, exit the flush loop.
                 if len(messages) == 0 {
+                    if !running.Load() {
+                        return
+                    }
+
                     continue
                 }
@@ -473,17 +488,35 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
         }
     }()
 
-    for msg := range claim.Messages() {
-        if len(msg.Value) == 0 {
-            continue
-        }
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
 
-        rwLock.Lock()
-        messages = append(messages, msg)
-        rwLock.Unlock()
+        for running.Load() {
+            select {
+            case msg, ok := <-claim.Messages():
+                if !ok {
+                    running.Store(false)
+                    return
+                }
 
-        sess.MarkMessage(msg, "")
-    }
+                if len(msg.Value) == 0 {
+                    continue
+                }
+
+                rwLock.Lock()
+                messages = append(messages, msg)
+                rwLock.Unlock()
+
+                sess.MarkMessage(msg, "")
+
+            case <-sess.Context().Done():
+                running.Store(false)
+                return
+            }
+        }
+    }()
+
+    wg.Wait()
     return nil
 }
diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go
index 6e6f83fca..3f444cc1f 100644
--- a/pkg/common/kafka/consumer_group.go
+++ b/pkg/common/kafka/consumer_group.go
@@ -16,18 +16,22 @@ package kafka
 
 import (
     "context"
+    "errors"
     "strings"
 
     "github.com/OpenIMSDK/tools/errs"
     "github.com/OpenIMSDK/tools/log"
     "github.com/openimsdk/open-im-server/v3/pkg/common/config"
 
     "github.com/IBM/sarama"
 )
 
 type MConsumerGroup struct {
+    ctx    context.Context
+    cancel context.CancelFunc
+
     sarama.ConsumerGroup
     groupID string
     topics  []string
@@ -54,7 +58,10 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
     if err != nil {
         return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password)
     }
+
+    ctx, cancel := context.WithCancel(context.Background())
     return &MConsumerGroup{
+        ctx, cancel,
         consumerGroup,
         groupID,
         topics,
@@ -68,7 +75,14 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex
 
 func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID) for { - err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + err := mc.ConsumerGroup.Consume(mc.ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + if mc.ctx.Err() != nil { + return + } + if err != nil { log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) } @@ -77,3 +87,8 @@ func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler } } } + +func (mc *MConsumerGroup) Close() { + mc.cancel() + mc.ConsumerGroup.Close() +}