diff --git a/cmd/openim-rpc/openim-rpc-auth/main.go b/cmd/openim-rpc/openim-rpc-auth/main.go index b29efd484..de09de93d 100644 --- a/cmd/openim-rpc/openim-rpc-auth/main.go +++ b/cmd/openim-rpc/openim-rpc-auth/main.go @@ -17,6 +17,7 @@ package main import ( "fmt" "os" + "path/filepath" "github.com/openimsdk/open-im-server/v3/internal/rpc/auth" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" @@ -31,7 +32,9 @@ func main() { panic(err.Error()) } if err := authCmd.StartSvr(config.Config.RpcRegisterName.OpenImAuthName, auth.Start); err != nil { - fmt.Fprintf(os.Stderr, "\n\nexit -1: \n%+v\n\n", err) + progName := filepath.Base(os.Args[0]) + fmt.Fprintf(os.Stderr, "\n\n%s exit -1: \n%+v\n\n", progName, err) os.Exit(-1) } + } diff --git a/go.mod b/go.mod index 3da7c3ecd..ab138e68c 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible - github.com/OpenIMSDK/protocol v0.0.48 + github.com/OpenIMSDK/protocol v0.0.55 github.com/OpenIMSDK/tools v0.0.33 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 @@ -155,5 +155,3 @@ require ( golang.org/x/crypto v0.17.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) - -replace github.com/OpenIMSDK/protocol v0.0.47 => github.com/AndrewZuo01/protocol v0.0.0-20240112093520-fd9c53e27b94 diff --git a/go.sum b/go.sum index 84620fe7d..94a516366 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= -github.com/OpenIMSDK/protocol v0.0.48 h1:8MIMjyzJRsruYhVv2ZKArFiOveroaofDOb3dlAdgjsw= -github.com/OpenIMSDK/protocol v0.0.48/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.55 h1:eBjg8DyuhxGmuCUjpoZjg6MJJJXU/xJ3xJwFhrn34yA= +github.com/OpenIMSDK/protocol v0.0.55/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.33 h1:rvFCxXaXxLv1MJFC4qcoWRGwKBnV+hR68UN2N0/zZhE= github.com/OpenIMSDK/tools v0.0.33/go.mod h1:wBfR5CYmEyvxl03QJbTkhz1CluK6J4/lX0lviu8JAjE= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/internal/api/auth.go b/internal/api/auth.go index 44a97a013..88539f63a 100644 --- a/internal/api/auth.go +++ b/internal/api/auth.go @@ -33,6 +33,10 @@ func (o *AuthApi) UserToken(c *gin.Context) { a2r.Call(auth.AuthClient.UserToken, o.Client, c) } +func (o *AuthApi) GetUserToken(c *gin.Context) { + a2r.Call(auth.AuthClient.GetUserToken, o.Client, c) +} + func (o *AuthApi) ParseToken(c *gin.Context) { a2r.Call(auth.AuthClient.ParseToken, o.Client, c) } diff --git a/internal/api/route.go b/internal/api/route.go index 10907d086..24ed5f6bb 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -150,6 +150,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive { a := NewAuthApi(*authRpc) authRouterGroup.POST("/user_token", a.UserToken) + authRouterGroup.POST("/get_user_token", ParseToken, a.GetUserToken) authRouterGroup.POST("/parse_token", a.ParseToken) authRouterGroup.POST("/force_logout", ParseToken, a.ForceLogout) } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 83ec00749..65a6b1935 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,7 +18,10 @@ import ( "context" "errors" "fmt" - "log" + "os" + "os/signal" + "syscall" + "time" "net/http" "sync" @@ -30,7 +33,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" @@ -51,11 +54,13 @@ func StartTransfer(prometheusPort int) error { if err != nil { return err } + mongo, err := unrelation.NewMongo() if err != nil { return err } - if err := mongo.CreateMsgIndex(); err != nil { + + if err = mongo.CreateMsgIndex(); err != nil { return err } client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) @@ -66,6 +71,7 @@ func StartTransfer(prometheusPort int) error { if err != nil { return err } + if err := client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil { return err } @@ -103,26 +109,62 @@ func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcCli func (m *MsgTransfer) Start(prometheusPort int) error { ctx := context.Background() - var wg sync.WaitGroup - wg.Add(1) fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) if prometheusPort <= 0 { return errs.Wrap(errors.New("prometheusPort not correct")) } - go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH) - go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH) + }() + + wg.Add(1) + go func() { + defer wg.Done() + + m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH) + }() if config.Config.Prometheus.Enable { - reg := prometheus.NewRegistry() - reg.MustRegister( - collectors.NewGoCollector(), - ) - reg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...) - http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil)) + go func() { + proreg := prometheus.NewRegistry() + proreg.MustRegister( + collectors.NewGoCollector(), + ) + proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer")...) + http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) + err := http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), nil) + if err != nil && err != http.ErrServerClosed { + panic(err) + } + }() } - //////////////////////////////////////// - wg.Wait() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + <-sigs + + // graceful close kafka client. + go m.historyCH.historyConsumerGroup.Close() + go m.historyMongoCH.historyConsumerGroup.Close() + + done := make(chan struct{}, 1) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + log.ZInfo(context.Background(), "msgtrasfer exit successfully") + case <-time.After(15 * time.Second): + log.ZError(context.Background(), "msgtransfer force to exit, timeout 15s", nil) + } + return nil } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 35af330c9..6678715d4 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" @@ -431,16 +432,29 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) - split := 1000 - rwLock := new(sync.RWMutex) - messages := make([]*sarama.ConsumerMessage, 0, 1000) - ticker := time.NewTicker(time.Millisecond * 100) + var ( + split = 1000 + rwLock = new(sync.RWMutex) + messages = make([]*sarama.ConsumerMessage, 0, 1000) + ticker = time.NewTicker(time.Millisecond * 100) + wg = sync.WaitGroup{} + running = new(atomic.Bool) + ) + + wg.Add(1) go func() { + defer wg.Done() + for { select { case <-ticker.C: + // if the buffer is empty and running is false, return loop. if len(messages) == 0 { + if !running.Load() { + return + } + continue } @@ -473,17 +487,35 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim( } }() - for msg := range claim.Messages() { - if len(msg.Value) == 0 { - continue - } + wg.Add(1) + go func() { + defer wg.Done() - rwLock.Lock() - messages = append(messages, msg) - rwLock.Unlock() + for running.Load() { + select { + case msg, ok := <-claim.Messages(): + if !ok { + running.Store(false) + return + } - sess.MarkMessage(msg, "") - } + if len(msg.Value) == 0 { + continue + } + + rwLock.Lock() + messages = append(messages, msg) + rwLock.Unlock() + + sess.MarkMessage(msg, "") + + case <-sess.Context().Done(): + running.Store(false) + return + } + } + }() + wg.Wait() return nil } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index ee8ead194..eaf63f868 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -80,6 +80,28 @@ func (s *authServer) UserToken(ctx context.Context, req *pbauth.UserTokenReq) (* return &resp, nil } +func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenReq) (*pbauth.GetUserTokenResp, error) { + if err := authverify.CheckAdmin(ctx); err != nil { + return nil, err + } + resp := pbauth.GetUserTokenResp{} + + if authverify.IsManagerUserID(req.UserID) { + return nil, errs.ErrNoPermission.Wrap("don't get Admin token") + } + + if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil { + return nil, err + } + token, err := s.authDatabase.CreateToken(ctx, req.UserID, int(req.PlatformID)) + if err != nil { + return nil, err + } + resp.Token = token + resp.ExpireTimeSeconds = config.Config.TokenPolicy.Expire * 24 * 60 * 60 + return &resp, nil +} + func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) { claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret()) if err != nil { diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 40803089c..3317359e5 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -51,6 +51,11 @@ type conversationServer struct { conversationNotificationSender *notification.ConversationNotificationSender } +func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context.Context, req *pbconversation.GetConversationNotReceiveMessageUserIDsReq) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) { + //TODO implement me + panic("implement me") +} + func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis() if err != nil { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index abc271651..1d068b1b2 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -108,6 +108,11 @@ type groupServer struct { msgRpcClient rpcclient.MessageRpcClient } +func (s *groupServer) GetJoinedGroupIDs(ctx context.Context, req *pbgroup.GetJoinedGroupIDsReq) (*pbgroup.GetJoinedGroupIDsResp, error) { + //TODO implement me + panic("implement me") +} + func (s *groupServer) NotificationUserInfoUpdate(ctx context.Context, req *pbgroup.NotificationUserInfoUpdateReq) (*pbgroup.NotificationUserInfoUpdateResp, error) { defer log.ZDebug(ctx, "NotificationUserInfoUpdate return") members, err := s.db.FindGroupMemberUser(ctx, nil, req.UserID) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index e5567f436..6f9e2949f 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -61,6 +61,11 @@ type userServer struct { RegisterCenter registry.SvcDiscoveryRegistry } +func (s *userServer) GetGroupOnlineUser(ctx context.Context, req *pbuser.GetGroupOnlineUserReq) (*pbuser.GetGroupOnlineUserResp, error) { + //TODO implement me + panic("implement me") +} + func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis() if err != nil { diff --git a/pkg/authverify/token.go b/pkg/authverify/token.go index 97bb03391..b951bf219 100644 --- a/pkg/authverify/token.go +++ b/pkg/authverify/token.go @@ -48,8 +48,7 @@ func CheckAccessV3(ctx context.Context, ownerUserID string) (err error) { } func IsAppManagerUid(ctx context.Context) bool { - return (len(config.Config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID)) || - utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) + return (len(config.Config.Manager.UserID) > 0 && utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID)) || utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) } func CheckAdmin(ctx context.Context) error { diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 6e6f83fca..3f444cc1f 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -16,18 +16,18 @@ package kafka import ( "context" + "errors" + "github.com/IBM/sarama" "strings" - "github.com/OpenIMSDK/tools/errs" - "github.com/OpenIMSDK/tools/log" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - - "github.com/IBM/sarama" ) type MConsumerGroup struct { + ctx context.Context + cancel context.CancelFunc + sarama.ConsumerGroup groupID string topics []string @@ -54,7 +54,10 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str if err != nil { return nil, errs.Wrap(err, strings.Join(topics, ","), strings.Join(addrs, ","), groupID, config.Config.Kafka.Username, config.Config.Kafka.Password) } + + ctx, cancel := context.WithCancel(context.Background()) return &MConsumerGroup{ + ctx, cancel, consumerGroup, groupID, topics, @@ -68,7 +71,14 @@ func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) contex func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { log.ZDebug(context.Background(), "register consumer group", "groupID", mc.groupID) for { - err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + err := mc.ConsumerGroup.Consume(mc.ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + if mc.ctx.Err() != nil { + return + } + if err != nil { log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) } @@ -77,3 +87,8 @@ func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler } } } + +func (mc *MConsumerGroup) Close() { + mc.cancel() + mc.ConsumerGroup.Close() +} diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 8b1c7d4af..3921c0c2a 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "strconv" "sync" "syscall" @@ -134,10 +135,10 @@ func Start( sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGUSR1) - select { case <-sigs: - print("receive process terminal SIGUSR1 exit\n") + progName := filepath.Base(os.Args[0]) + print("\n\n%s receive process terminal SIGUSR1 exit 0\n\n", progName) ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() if err := gracefulStopWithCtx(ctx, srv.GracefulStop); err != nil { diff --git a/scripts/lib/logging.sh b/scripts/lib/logging.sh index 2fa77dd83..8f2bb33cf 100755 --- a/scripts/lib/logging.sh +++ b/scripts/lib/logging.sh @@ -131,16 +131,24 @@ openim::log::error_exit() { exit "${code}" } -# Log an error but keep going. Don't dump the stack or exit. +# Log an error but keep going. Don't dump the stack or exit. openim::log::error() { + # Define red color + red='\033[0;31m' + # No color (reset) + nc='\033[0m' # No Color + timestamp=$(date +"[%Y-%m-%d %H:%M:%S %Z]") - echo_log "!!! ${timestamp} ${1-}" >&2 + # Apply red color for error message + echo_log "${red}!!! ${timestamp} ${1-}${nc}" >&2 shift for message; do - echo_log " ${message}" >&2 + # Apply red color for subsequent lines of the error message + echo_log "${red} ${message}${nc}" >&2 done } + # Print an usage message to stderr. The arguments are printed directly. openim::log::usage() { echo_log >&2 diff --git a/scripts/lib/util.sh b/scripts/lib/util.sh index b93f45205..cace53645 100755 --- a/scripts/lib/util.sh +++ b/scripts/lib/util.sh @@ -486,7 +486,7 @@ openim::util::stop_services_on_ports() { local pid=$(echo $line | awk '{print $2}') # Try to stop the service by killing its process. - if kill -TERM $pid; then + if kill -10 $pid; then stopped+=($port) else not_stopped+=($port) @@ -563,7 +563,7 @@ openim::util::stop_services_with_name() { # If there's a Process ID, it means the service with the name is running. if [[ -n $pid ]]; then # Try to stop the service by killing its process. - if kill -TERM $pid 2>/dev/null; then + if kill -10 $pid 2>/dev/null; then stopped_this_time=true fi fi @@ -1541,12 +1541,8 @@ openim::util::check_ports() { if [[ "$OSTYPE" == "linux-gnu"* ]]; then if command -v ss > /dev/null 2>&1; then info=$(ss -ltnp | grep ":$port" || true) - openim::color::echo $COLOR_RED "!!!!!!!! port=$port" - openim::color::echo $COLOR_RED "!!!!!!!! info=$info" else info=$(netstat -ltnp | grep ":$port" || true) - openim::color::echo $COLOR_RED "!!!!!!!! port=$port" - openim::color::echo $COLOR_RED "!!!!!!!! info=$info" fi elif [[ "$OSTYPE" == "darwin"* ]]; then # For macOS, use lsof @@ -1726,7 +1722,7 @@ openim::util::stop_services_on_ports() { local pid=$(echo $line | awk '{print $2}') # Try to stop the service by killing its process. - if kill -TERM $pid; then + if kill -10 $pid; then stopped+=($port) else not_stopped+=($port) @@ -1803,7 +1799,7 @@ openim::util::stop_services_with_name() { # If there's a Process ID, it means the service with the name is running. if [[ -n $pid ]]; then # Try to stop the service by killing its process. - if kill -TERM $pid 2>/dev/null; then + if kill -10 $pid 2>/dev/null; then stopped_this_time=true fi fi diff --git a/scripts/start-all.sh b/scripts/start-all.sh index 5f34cbdbe..ca03f0c3c 100755 --- a/scripts/start-all.sh +++ b/scripts/start-all.sh @@ -82,4 +82,4 @@ execute_scripts openim::log::info "\n## Post Starting OpenIM services" ${TOOLS_START_SCRIPTS_PATH} openim::tools::post-start -openim::log::success "✨ All OpenIM services have been successfully started!" \ No newline at end of file +openim::color::echo $COLOR_BLUE "✨ All OpenIM services have been successfully started!" \ No newline at end of file