From 9610da9123fea0fb3954553be7ebd138a7beb8d7 Mon Sep 17 00:00:00 2001 From: Xinwei Xiong <3293172751NSS@gmail.com> Date: Sun, 4 Feb 2024 20:05:53 +0800 Subject: [PATCH 1/5] fix(main): fix openim scripts start rpc log (#1877) * Update start.go * Update start.go --- pkg/common/startrpc/start.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index f7b547834..f6cda2ffb 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -17,7 +17,6 @@ package startrpc import ( "errors" "fmt" - "log" "net" "net/http" "os" @@ -117,7 +116,8 @@ func Start( // Create a HTTP server for prometheus. httpServer := &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} if err := httpServer.ListenAndServe(); err != nil { - log.Fatal("Unable to start a http server. ", err.Error(), "PrometheusPort:", prometheusPort) + fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v PrometheusPort: %d \n\n", err, prometheusPort) + os.Exit(-1) } } return nil From 311d42283b2a0a022f8827fb0927ca30601fbb73 Mon Sep 17 00:00:00 2001 From: Xinwei Xiong <3293172751NSS@gmail.com> Date: Sun, 4 Feb 2024 20:06:34 +0800 Subject: [PATCH 2/5] feat: fix openim logs and ci (#1878) --- scripts/lib/logging.sh | 14 +++++++++++--- scripts/lib/util.sh | 4 ---- scripts/start-all.sh | 2 +- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/scripts/lib/logging.sh b/scripts/lib/logging.sh index 2fa77dd83..8f2bb33cf 100755 --- a/scripts/lib/logging.sh +++ b/scripts/lib/logging.sh @@ -131,16 +131,24 @@ openim::log::error_exit() { exit "${code}" } -# Log an error but keep going. Don't dump the stack or exit. +# Log an error but keep going. Don't dump the stack or exit. openim::log::error() { + # Define red color + red='\033[0;31m' + # No color (reset) + nc='\033[0m' # No Color + timestamp=$(date +"[%Y-%m-%d %H:%M:%S %Z]") - echo_log "!!! ${timestamp} ${1-}" >&2 + # Apply red color for error message + echo_log "${red}!!! ${timestamp} ${1-}${nc}" >&2 shift for message; do - echo_log " ${message}" >&2 + # Apply red color for subsequent lines of the error message + echo_log "${red} ${message}${nc}" >&2 done } + # Print an usage message to stderr. The arguments are printed directly. openim::log::usage() { echo_log >&2 diff --git a/scripts/lib/util.sh b/scripts/lib/util.sh index b93f45205..eaefaf22a 100755 --- a/scripts/lib/util.sh +++ b/scripts/lib/util.sh @@ -1541,12 +1541,8 @@ openim::util::check_ports() { if [[ "$OSTYPE" == "linux-gnu"* ]]; then if command -v ss > /dev/null 2>&1; then info=$(ss -ltnp | grep ":$port" || true) - openim::color::echo $COLOR_RED "!!!!!!!! port=$port" - openim::color::echo $COLOR_RED "!!!!!!!! info=$info" else info=$(netstat -ltnp | grep ":$port" || true) - openim::color::echo $COLOR_RED "!!!!!!!! port=$port" - openim::color::echo $COLOR_RED "!!!!!!!! info=$info" fi elif [[ "$OSTYPE" == "darwin"* ]]; then # For macOS, use lsof diff --git a/scripts/start-all.sh b/scripts/start-all.sh index 5f34cbdbe..ca03f0c3c 100755 --- a/scripts/start-all.sh +++ b/scripts/start-all.sh @@ -82,4 +82,4 @@ execute_scripts openim::log::info "\n## Post Starting OpenIM services" ${TOOLS_START_SCRIPTS_PATH} openim::tools::post-start -openim::log::success "✨ All OpenIM services have been successfully started!" \ No newline at end of file +openim::color::echo $COLOR_BLUE "✨ All OpenIM services have been successfully started!" \ No newline at end of file From ee245157614af085525b88888daae0c51826276b Mon Sep 17 00:00:00 2001 From: Brabem <69128477+luhaoling@users.noreply.github.com> Date: Sun, 4 Feb 2024 20:13:17 +0800 Subject: [PATCH 3/5] feat: add getUserToken api and add ex field in getSortedConversationListResp (#1880) * fix: del the manager config and manger init statement * fix: fix the Manger judge condition * fix: fix revokeMsg error * fix: find erors * fix: find error * fix: fix the AdminAccount error * fix: del the debug statement * fix: fix the component check func * fix: fix the get zkAddress error * fix: fix the kafka client close error * fix: add env in minio connected * fix: del the minio env * fix: fix the go.mod tools version * fix: del get env in minio conneted * feat: add GetUserToken api and add ex field in GetSortedConversationList resp * fix: fix the go.mod version * fix: add lack method * fix: add a method * fix: add lack implement * fix: fix the tools pkg version * fix: del the unuser pkg * fix: add Limiting judgement of get admin token --- go.mod | 4 +--- go.sum | 4 ++-- internal/api/auth.go | 4 ++++ internal/api/route.go | 1 + internal/rpc/auth/auth.go | 22 ++++++++++++++++++++++ internal/rpc/conversation/conversaion.go | 5 +++++ internal/rpc/group/group.go | 5 +++++ internal/rpc/user/user.go | 5 +++++ pkg/authverify/token.go | 3 +-- 9 files changed, 46 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 3da7c3ecd..ab138e68c 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible - github.com/OpenIMSDK/protocol v0.0.48 + github.com/OpenIMSDK/protocol v0.0.55 github.com/OpenIMSDK/tools v0.0.33 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 @@ -155,5 +155,3 @@ require ( golang.org/x/crypto v0.17.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) - -replace github.com/OpenIMSDK/protocol v0.0.47 => github.com/AndrewZuo01/protocol v0.0.0-20240112093520-fd9c53e27b94 diff --git a/go.sum b/go.sum index 84620fe7d..94a516366 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= -github.com/OpenIMSDK/protocol v0.0.48 h1:8MIMjyzJRsruYhVv2ZKArFiOveroaofDOb3dlAdgjsw= -github.com/OpenIMSDK/protocol v0.0.48/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA= +github.com/OpenIMSDK/protocol v0.0.55/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.33 h1:rvFCxXaXxLv1MJFC4qcoWRGwKBnV+hR68UN2N0/zZhE= github.com/OpenIMSDK/tools v0.0.33/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/api/auth.go b/internal/api/auth.go index 44a97a013..88539f63a 100644 --- a/internal/api/auth.go +++ b/internal/api/auth.go @@ -33,6 +33,10 @@ func (o *AuthApi) UserToken(c *gin.Context) { a2r.Call(auth.AuthClient.UserToken, o.Client, c) } +func (o *AuthApi) GetUserToken(c *gin.Context) { + a2r.Call(auth.AuthClient.GetUserToken, o.Client, c) +} + func (o *AuthApi) ParseToken(c *gin.Context) { a2r.Call(auth.AuthClient.ParseToken, o.Client, c) } diff --git a/internal/api/route.go b/internal/api/route.go index 10907d086..24ed5f6bb 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -150,6 +150,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive { a := NewAuthApi(*authRpc) authRouterGroup.POST("/user_token", a.UserToken) + authRouterGroup.POST("/get_user_token", ParseToken, a.GetUserToken) authRouterGroup.POST("/parse_token", a.ParseToken) authRouterGroup.POST("/force_logout", ParseToken, a.ForceLogout) } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index ee8ead194..eaf63f868 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -80,6 +80,28 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (* return &resp, nil } +func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenReq) (*pbauth.GetUserTokenResp, error) { + if err := authverify.CheckAdmin(ctx); err != nil { + return nil, err + } + resp := pbauth.GetUserTokenResp{} + + if authverify.IsManagerUserID(req.UserID) { + return nil, errs.ErrNoPermission.Wrap("don't get Admin token") + } + + if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil { + return nil, err + } + token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID)) + if err != nil { + return nil, err + } + resp.Token = token + resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60 + return &resp, nil +} + func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret()) if err != nil { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 40803089c..3317359e5 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -51,6 +51,11 @@ type conversationServer struct { conversationNotificationSender *notification.ConversationNotificationSender } +func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context.Context, req *pbconversation.GetConversationNotReceiveMessageUserIDsReq) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) { + //TODO implement me + panic("implement me") +} + func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis() if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index abc271651..1d068b1b2 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -108,6 +108,11 @@ type groupServer struct { msgRpcClient rpcclient.MessageRpcClient } +func (s *groupServer) GetJoinedGroupIDs(ctx context.Context, req *pbgroup.GetJoinedGroupIDsReq) (*pbgroup.GetJoinedGroupIDsResp, error) { + //TODO implement me + panic("implement me") +} + func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgroup.NotificationUserInfoUpdateReq) (*pbgroup.NotificationUserInfoUpdateResp, error) { defer log.ZDebug(ctx, "NotificationUserInfoUpdate return") members, err := s.db.FindGroupMemberUser(ctx, nil, req.UserID) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index e5567f436..6f9e2949f 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -61,6 +61,11 @@ type userServer struct { RegisterCenter registry.SvcDiscoveryRegistry } +func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGroupOnlineUserReq) (*pbuser.GetGroupOnlineUserResp, error) { + //TODO implement me + panic("implement me") +} + func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis() if err != nil { diff --git a/pkg/authverify/token.go b/pkg/authverify/token.go index 97bb03391..b951bf219 100644 --- a/pkg/authverify/token.go +++ b/pkg/authverify/token.go @@ -48,8 +48,7 @@ func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) { } func IsAppManagerUid(ctx context.Context) bool { - return (len(config.Config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID)) || - utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) + return (len(config.Config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID)) || utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) } func CheckAdmin(ctx context.Context) error { From 0865eb65b1eaed0b5a699c8571255b3fd6fb09a3 Mon Sep 17 00:00:00 2001 From: Xinwei Xiong <3293172751NSS@gmail.com> Date: Mon, 5 Feb 2024 10:08:55 +0800 Subject: [PATCH 4/5] fix: kill 10 process optimization (#1883) --- scripts/lib/util.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/lib/util.sh b/scripts/lib/util.sh index eaefaf22a..cace53645 100755 --- a/scripts/lib/util.sh +++ b/scripts/lib/util.sh @@ -486,7 +486,7 @@ openim::util::stop_services_on_ports() { local pid=$(echo $line | awk '{print $2}') # Try to stop the service by killing its process. - if kill -TERM $pid; then + if kill -10 $pid; then stopped+=($port) else not_stopped+=($port) @@ -563,7 +563,7 @@ openim::util::stop_services_with_name() { # If there's a Process ID, it means the service with the name is running. if [[ -n $pid ]]; then # Try to stop the service by killing its process. - if kill -TERM $pid 2>/dev/null; then + if kill -10 $pid 2>/dev/null; then stopped_this_time=true fi fi @@ -1722,7 +1722,7 @@ openim::util::stop_services_on_ports() { local pid=$(echo $line | awk '{print $2}') # Try to stop the service by killing its process. - if kill -TERM $pid; then + if kill -10 $pid; then stopped+=($port) else not_stopped+=($port) @@ -1799,7 +1799,7 @@ openim::util::stop_services_with_name() { # If there's a Process ID, it means the service with the name is running. if [[ -n $pid ]]; then # Try to stop the service by killing its process. - if kill -TERM $pid 2>/dev/null; then + if kill -10 $pid 2>/dev/null; then stopped_this_time=true fi fi From 31381935f188210bc905d8cbe5c0959b8584fb61 Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Mon, 5 Feb 2024 10:37:53 +0800 Subject: [PATCH 5/5] fix: graceful exit for kafka consumer of msgtransfer (#1483) * fix: graceful exit for kafka consumer of msgtransfer Signed-off-by: rfyiamcool * Update init.go * Update init.go --------- Signed-off-by: rfyiamcool Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> --- internal/msgtransfer/init.go | 74 +++++++++++++++---- .../msgtransfer/online_history_msg_handler.go | 58 +++++++++++---- pkg/common/kafka/consumer_group.go | 27 +++++-- 3 files changed, 124 insertions(+), 35 deletions(-) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 83ec00749..65a6b1935 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,7 +18,10 @@ import ( "context" "errors" "fmt" - "log" + "os" + "os/signal" + "syscall" + "time" "net/http" "sync" @@ -30,7 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" @@ -51,11 +54,13 @@ func StartTransfer(prometheusPort int) error { if err != nil { return err } + mongo, err := unrelation.NewMongo() if err != nil { return err } - if err := mongo.CreateMsgIndex(); err != nil { + + if err = mongo.CreateMsgIndex(); err != nil { return err } client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) @@ -66,6 +71,7 @@ func StartTransfer(prometheusPort int) error { if err != nil { return err } + if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { return err } @@ -103,26 +109,62 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli func (m *MsgTransfer) Start(prometheusPort int) error { ctx := context.Background() - var wg sync.WaitGroup - wg.Add(1) fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) if prometheusPort <= 0 { return errs.Wrap(errors.New("prometheusPort not correct")) } - go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH) - go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH) + }() + + wg.Add(1) + go func() { + defer wg.Done() + + m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH) + }() if config.Config.Prometheus.Enable { - reg := prometheus.NewRegistry() - reg.MustRegister( - collectors.NewGoCollector(), - ) - reg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...) - http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)) + go func() { + proreg := prometheus.NewRegistry() + proreg.MustRegister( + collectors.NewGoCollector(), + ) + proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...) + http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) + err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) + if err != nil && err != http.ErrServerClosed { + panic(err) + } + }() } - //////////////////////////////////////// - wg.Wait() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-sigs + + // graceful close kafka client. + go m.historyCH.historyConsumerGroup.Close() + go m.historyMongoCH.historyConsumerGroup.Close() + + done := make(chan struct{}, 1) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + log.ZInfo(context.Background(), "msgtrasfer exit successfully") + case <-time.After(15 * time.Second): + log.ZError(context.Background(), "msgtransfer force to exit, timeout 15s", nil) + } + return nil } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 35af330c9..6678715d4 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -431,16 +432,29 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) - split := 1000 - rwLock := new(sync.RWMutex) - messages := make([]*sarama.ConsumerMessage, 0, 1000) - ticker := time.NewTicker(time.Millisecond * 100) + var ( + split = 1000 + rwLock = new(sync.RWMutex) + messages = make([]*sarama.ConsumerMessage, 0, 1000) + ticker = time.NewTicker(time.Millisecond * 100) + wg = sync.WaitGroup{} + running = new(atomic.Bool) + ) + + wg.Add(1) go func() { + defer wg.Done() + for { select { case <-ticker.C: + // if the buffer is empty and running is false, return loop. if len(messages) == 0 { + if !running.Load() { + return + } + continue } @@ -473,17 +487,35 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( } }() - for msg := range claim.Messages() { - if len(msg.Value) == 0 { - continue - } + wg.Add(1) + go func() { + defer wg.Done() - rwLock.Lock() - messages = append(messages, msg) - rwLock.Unlock() + for running.Load() { + select { + case msg, ok := <-claim.Messages(): + if !ok { + running.Store(false) + return + } - sess.MarkMessage(msg, "") - } + if len(msg.Value) == 0 { + continue + } + + rwLock.Lock() + messages = append(messages, msg) + rwLock.Unlock() + + sess.MarkMessage(msg, "") + + case <-sess.Context().Done(): + running.Store(false) + return + } + } + }() + wg.Wait() return nil } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 6e6f83fca..3f444cc1f 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -16,18 +16,18 @@ package kafka import ( "context" + "errors" + "github.com/IBM/sarama" "strings" - "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/log" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - - "github.com/IBM/sarama" ) type MConsumerGroup struct { + ctx context.Context + cancel context.CancelFunc + sarama.ConsumerGroup groupID string topics []string @@ -54,7 +54,10 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str if err != nil { return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password) } + + ctx, cancel := context.WithCancel(context.Background()) return &MConsumerGroup{ + ctx, cancel, consumerGroup, groupID, topics, @@ -68,7 +71,14 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID) for { - err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + err := mc.ConsumerGroup.Consume(mc.ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + if mc.ctx.Err() != nil { + return + } + if err != nil { log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) } @@ -77,3 +87,8 @@ func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler } } } + +func (mc *MConsumerGroup) Close() { + mc.cancel() + mc.ConsumerGroup.Close() +}