From e503229f6bd5d842b7dff45554da08dff5a1785d Mon Sep 17 00:00:00 2001 From: "Xinwei Xiong (cubxxw)" <3293172751nss@gmail.com> Date: Fri, 15 Mar 2024 12:24:59 +0800 Subject: [PATCH] fix: fix tools erros code --- internal/rpc/msg/as_read.go | 10 ++---- internal/rpc/msg/server.go | 42 ++++++++++++++------------ internal/rpc/msg/sync_msg.go | 7 ++--- internal/rpc/msg/verify.go | 12 +++----- internal/rpc/third/log.go | 6 ++-- pkg/authverify/token.go | 8 ++--- pkg/util/genutil/genutil.go | 11 +++++-- tools/versionchecker/versionchecker.go | 6 ++-- 8 files changed, 48 insertions(+), 54 deletions(-) diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index c6041d912..faa1ad411 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -38,6 +38,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m } else { conversationIDs = req.ConversationIDs } + hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, conversationIDs) if err != nil { return nil, err @@ -207,14 +208,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon return &msg.MarkConversationAsReadResp{}, nil } -func (m *msgServer) sendMarkAsReadNotification( - ctx context.Context, - conversationID string, - sessionType int32, - sendID, recvID string, - seqs []int64, - hasReadSeq int64, -) error { +func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error { tips := &sdkws.MarkAsReadTips{ MarkAsReadUserID: sendID, ConversationID: conversationID, diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index cc4792e87..f30cd8ab2 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -30,18 +30,21 @@ import ( ) type ( + // MessageInterceptorChain defines a chain of message interceptor functions. MessageInterceptorChain []MessageInterceptorFunc - msgServer struct { - RegisterCenter discoveryregistry.SvcDiscoveryRegistry - MsgDatabase controller.CommonMsgDatabase - Conversation *rpcclient.ConversationRpcClient - UserLocalCache *rpccache.UserLocalCache - FriendLocalCache *rpccache.FriendLocalCache - GroupLocalCache *rpccache.GroupLocalCache - ConversationLocalCache *rpccache.ConversationLocalCache - Handlers MessageInterceptorChain - notificationSender *rpcclient.NotificationSender - config *config.GlobalConfig + + // msgServer encapsulates dependencies required for message handling. + msgServer struct { + RegisterCenter discoveryregistry.SvcDiscoveryRegistry // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. + Conversation *rpcclient.ConversationRpcClient // RPC client for conversation service. + UserLocalCache *rpccache.UserLocalCache // Local cache for user data. + FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. + GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. + ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. + Handlers MessageInterceptorChain // Chain of handlers for processing messages. + notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. + config *config.GlobalConfig // Global configuration settings. } ) @@ -61,24 +64,24 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF //} func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(config) + rdb, err := cache.NewRedis(&config.Redis) if err != nil { return err } - mongo, err := unrelation.NewMongo(config) + mongo, err := unrelation.NewMongo(&config.Mongo) if err != nil { return err } if err := mongo.CreateMsgIndex(); err != nil { return err } - cacheModel := cache.NewMsgCacheModel(rdb, config) + cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database)) - conversationClient := rpcclient.NewConversationRpcClient(client, config) + conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config) - friendRpcClient := rpcclient.NewFriendRpcClient(client, config) - msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel, config) + groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName) + friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName) + msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka) if err != nil { return err } @@ -92,7 +95,8 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, rdb), config: config, } - s.notificationSender = rpcclient.NewNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg)) + + s.notificationSender = rpcclient.NewNotificationSender(&config.Notification, rpcclient.WithLocalSendMsg(s.SendMsg)) s.addInterceptorHandler(MessageHasReadEnabled) msg.RegisterMsgServer(server, s) return nil diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 542b31ea6..2f25b0a11 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -27,10 +27,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" ) -func (m *msgServer) PullMessageBySeqs( - ctx context.Context, - req *sdkws.PullMessageBySeqsReq, -) (*sdkws.PullMessageBySeqsResp, error) { +func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { resp := &sdkws.PullMessageBySeqsResp{} resp.Msgs = make(map[string]*sdkws.PullMsgs) resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) @@ -89,7 +86,7 @@ func (m *msgServer) PullMessageBySeqs( } func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { - if err := authverify.CheckAccessV3(ctx, req.UserID, m.config); err != nil { + if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Manager, &m.config.IMAdmin); err != nil { return nil, err } conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID) diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 5bfd7013e..53510d058 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -26,6 +26,7 @@ import ( "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) var ExcludeContentType = []int{constant.HasReadReceipt} @@ -111,7 +112,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe groupMemberInfo, err := m.GroupLocalCache.GetGroupMember(ctx, data.MsgData.GroupID, data.MsgData.SendID) if err != nil { if errs.ErrRecordNotFound.Is(err) { - return errs.ErrNotInGroupYet.Wrap(err.Error()) + return errs.ErrNotInGroupYet.WrapMsg(err.Error()) } return err } @@ -178,16 +179,11 @@ func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) { } func GetMsgID(sendID string) string { - t := time.Now().Format("2006-01-02 15:04:05") + t := genutil.GetCurrentTimeFormatted() return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int())) } -func (m *msgServer) modifyMessageByUserMessageReceiveOpt( - ctx context.Context, - userID, conversationID string, - sessionType int, - pb *msg.SendMsgReq, -) (bool, error) { +func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, userID, conversationID string, sessionType int, pb *msg.SendMsgReq) (bool, error) { defer log.ZDebug(ctx, "modifyMessageByUserMessageReceiveOpt return") opt, err := m.UserLocalCache.GetUserGlobalMsgRecvOpt(ctx, userID) if err != nil { diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go index 07ff9076f..4ef1cd076 100644 --- a/internal/rpc/third/log.go +++ b/internal/rpc/third/log.go @@ -83,7 +83,7 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq) } func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq) (*third.DeleteLogsResp, error) { - if err := authverify.CheckAdmin(ctx, t.config); err != nil { + if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil { return nil, err } userID := "" @@ -96,7 +96,7 @@ func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq) logIDs = append(logIDs, log.LogID) } if ids := utils2.Single(req.LogIDs, logIDs); len(ids) > 0 { - return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("logIDs not found%#v", ids)) + return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("logIDs not found%#v", ids)) } err = t.thirdDatabase.DeleteLogs(ctx, req.LogIDs, userID) if err != nil { @@ -124,7 +124,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo { } func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) (*third.SearchLogsResp, error) { - if err := authverify.CheckAdmin(ctx, t.config); err != nil { + if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil { return nil, err } var ( diff --git a/pkg/authverify/token.go b/pkg/authverify/token.go index 139881678..7bc839dd3 100644 --- a/pkg/authverify/token.go +++ b/pkg/authverify/token.go @@ -59,7 +59,7 @@ func CheckAdmin(ctx context.Context, manager *config.Manager, imAdmin *config.IM if utils.Contain(mcontext.GetOpUserID(ctx), imAdmin.UserID...) { return nil } - return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) + return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) } func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error { if utils.Contain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID...) { @@ -68,7 +68,7 @@ func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error { if len(config.Manager.UserID) > 0 && utils.Contain(mcontext.GetOpUserID(ctx), config.Manager.UserID...) { return nil } - return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx))) + return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx))) } func ParseRedisInterfaceToken(redisToken any, secret string) (*tokenverify.Claims, error) { @@ -85,10 +85,10 @@ func WsVerifyToken(token, userID, secret string, platformID int) error { return err } if claim.UserID != userID { - return errs.ErrTokenInvalid.Wrap(fmt.Sprintf("token uid %s != userID %s", claim.UserID, userID)) + return errs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token uid %s != userID %s", claim.UserID, userID)) } if claim.PlatformID != platformID { - return errs.ErrTokenInvalid.Wrap(fmt.Sprintf("token platform %d != %d", claim.PlatformID, platformID)) + return errs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token platform %d != %d", claim.PlatformID, platformID)) } return nil } diff --git a/pkg/util/genutil/genutil.go b/pkg/util/genutil/genutil.go index 95735485d..01b5dfe1f 100644 --- a/pkg/util/genutil/genutil.go +++ b/pkg/util/genutil/genutil.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/OpenIMSDK/tools/errs" ) @@ -27,21 +28,25 @@ import ( func OutDir(path string) (string, error) { outDir, err := filepath.Abs(path) if err != nil { - return "", errs.Wrap(err, "output directory %s does not exist", path) + return "", errs.WrapMsg(err, "output directory %s does not exist", path) } stat, err := os.Stat(outDir) if err != nil { - return "", errs.Wrap(err, "output directory %s does not exist", outDir) + return "", errs.WrapMsg(err, "output directory %s does not exist", outDir) } if !stat.IsDir() { - return "", errs.Wrap(err, "output directory %s is not a directory", outDir) + return "", errs.WrapMsg(err, "output directory %s is not a directory", outDir) } outDir += "/" return outDir, nil } +func GetCurrentTimeFormatted() string { + return time.Now().Format("2006-01-02 15:04:05") +} + func ExitWithError(err error) { progName := filepath.Base(os.Args[0]) fmt.Fprintf(os.Stderr, "%s exit -1: %+v\n", progName, err) diff --git a/tools/versionchecker/versionchecker.go b/tools/versionchecker/versionchecker.go index 12254b58e..5dd25ddc4 100644 --- a/tools/versionchecker/versionchecker.go +++ b/tools/versionchecker/versionchecker.go @@ -19,10 +19,9 @@ import ( "fmt" "os/exec" "runtime" - "time" "github.com/fatih/color" - //"github.com/openimsdk/open-im-server/v3/pkg/common/version" + "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" ) func ExecuteCommand(cmdName string, args ...string) (string, error) { @@ -40,8 +39,7 @@ func ExecuteCommand(cmdName string, args ...string) (string, error) { } func printTime() string { - currentTime := time.Now() - formattedTime := currentTime.Format("2006-01-02 15:04:05") + formattedTime := genutil.GetCurrentTimeFormatted() return fmt.Sprintf("Current Date & Time: %s", formattedTime) }