From f91d8050ce68ee1c1620c718a9dbe4dfef67b0e6 Mon Sep 17 00:00:00 2001 From: "Xinwei Xiong (cubxxw)" <3293172751nss@gmail.com> Date: Thu, 29 Feb 2024 17:44:14 +0800 Subject: [PATCH] feat: add openim auto format code --- internal/api/route.go | 91 ++++++++++--------- internal/api/user.go | 56 ++++++------ internal/push/offlinepush/fcm/push.go | 40 +++----- internal/push/push_to_client.go | 2 +- pkg/common/cmd/api.go | 17 +++- pkg/common/cmd/msg_transfer.go | 12 ++- pkg/common/cmd/root.go | 2 +- pkg/common/cmd/rpc.go | 2 +- pkg/common/config/parse.go | 2 +- pkg/common/convert/friend.go | 3 +- pkg/common/db/controller/friend.go | 4 +- pkg/common/db/table/relation/group.go | 4 +- pkg/common/db/table/relation/user.go | 4 +- pkg/common/db/unrelation/msg.go | 37 +++----- .../discoveryregister/direct/directconn.go | 5 +- .../discoveryregister/discoveryregister.go | 3 +- pkg/common/http/http_client.go | 8 +- pkg/common/kafka/producer.go | 4 +- pkg/common/kafka/util.go | 3 +- pkg/common/startrpc/start.go | 2 +- pkg/util/genutil/genutil.go | 8 +- 21 files changed, 157 insertions(+), 152 deletions(-) diff --git a/internal/api/route.go b/internal/api/route.go index b55138691..f1dd1df1a 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -233,51 +233,60 @@ func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc { config.Config.TokenPolicy.Expire, ) return func(c *gin.Context) { - if c.Request.Method != http.MethodPost { - c.Next() - return - } - - token := c.Request.Header.Get(constant.Token) - if token == "" { - handleGinError(c, "header get token error", errs.ErrArgs, "header must have token") - return - } - - claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret()) - if err != nil { - handleGinError(c, "jwt get token error", errs.ErrTokenUnknown, "") - return - } - - m, err := dataBase.GetTokensWithoutError(c, claims.UserID, claims.PlatformID) - if err != nil || len(m) == 0 { - handleGinError(c, "cache get token error", errs.ErrTokenNotExist, "") - return - } - - if v, ok := m[token]; ok { - if v == constant.KickedToken { - handleGinError(c, "cache kicked token error", errs.ErrTokenKicked, "") + switch c.Request.Method { + case http.MethodPost: + token := c.Request.Header.Get(constant.Token) + if token == "" { + log.ZWarn(c, "header get token error", errs.ErrArgs.Wrap("header must have token")) + apiresp.GinError(c, errs.ErrArgs.Wrap("header must have token")) + c.Abort() return - } else if v != constant.NormalToken { - handleGinError(c, "cache unknown token error", errs.ErrTokenUnknown, "") + } + claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret()) + if err != nil { + log.ZWarn(c, "jwt get token error", errs.ErrTokenUnknown.Wrap()) + apiresp.GinError(c, errs.ErrTokenUnknown.Wrap()) + c.Abort() + return + } + m, err := dataBase.GetTokensWithoutError(c, claims.UserID, claims.PlatformID) + if err != nil { + log.ZWarn(c, "cache get token error", errs.ErrTokenNotExist.Wrap()) + apiresp.GinError(c, errs.ErrTokenNotExist.Wrap()) + c.Abort() + return + } + if len(m) == 0 { + log.ZWarn(c, "cache do not exist token error", errs.ErrTokenNotExist.Wrap()) + apiresp.GinError(c, errs.ErrTokenNotExist.Wrap()) + c.Abort() + } + if v, ok := m[token]; ok { + switch v { + case constant.NormalToken: + case constant.KickedToken: + log.ZWarn(c, "cache kicked token error", errs.ErrTokenKicked.Wrap()) + apiresp.GinError(c, errs.ErrTokenKicked.Wrap()) + c.Abort() + return + default: + log.ZWarn(c, "cache unknown token error", errs.ErrTokenUnknown.Wrap()) + apiresp.GinError(c, errs.ErrTokenUnknown.Wrap()) + c.Abort() + return + } + } else { + apiresp.GinError(c, errs.ErrTokenNotExist.Wrap()) + c.Abort() return } - } else { - handleGinError(c, "token does not exist error", errs.ErrTokenNotExist, "") - return } - - c.Set(constant.OpUserPlatform, constant.PlatformIDToName(claims.PlatformID)) - c.Set(constant.OpUserID, claims.UserID) - c.Next() } } -// handleGinError logs and returns an error response through Gin context. -func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) { - wrappedErr := errType.Wrap(detail) - apiresp.GinError(c, wrappedErr) - c.Abort() -} +// // handleGinError logs and returns an error response through Gin context. +// func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) { +// wrappedErr := errType.Wrap(detail) +// apiresp.GinError(c, wrappedErr) +// c.Abort() +// } diff --git a/internal/api/user.go b/internal/api/user.go index 0f3075cc8..901998319 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -64,57 +64,61 @@ func (u *UserApi) GetUsers(c *gin.Context) { a2r.Call(user.UserClient.GetPaginationUsers, u.Client, c) } -// GetUsersOnlineStatus retrieves the online status of users. +// GetUsersOnlineStatus Get user online status. func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { var req msggateway.GetUsersOnlineStatusReq if err := c.BindJSON(&req); err != nil { apiresp.GinError(c, err) return } - conns, err := u.Discov.GetConns(c, config.Config.RpcRegisterName.OpenImMessageGatewayName) if err != nil { apiresp.GinError(c, err) return } - wsResult := make([]*msggateway.GetUsersOnlineStatusResp_SuccessResult, 0) - for _, conn := range conns { - msgClient := msggateway.NewMsgGatewayClient(conn) + var wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult + var respResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult + flag := false + + // Online push message + for _, v := range conns { + msgClient := msggateway.NewMsgGatewayClient(v) reply, err := msgClient.GetUsersOnlineStatus(c, &req) if err != nil { log.ZDebug(c, "GetUsersOnlineStatus rpc error", err) - if apiresp.ParseError(err).ErrCode == errs.NoPermissionError { - apiresp.GinError(c, errs.Wrap(err)) + + parseError := apiresp.ParseError(err) + if parseError.ErrCode == errs.NoPermissionError { + apiresp.GinError(c, err) return } - continue + } else { + wsResult = append(wsResult, reply.SuccessResult...) } - wsResult = append(wsResult, reply.SuccessResult...) } - - respResult := compileResults(req.UserIDs, wsResult) - apiresp.GinSuccess(c, respResult) -} - -// compileResults aggregates online status results for the provided userIDs. -func compileResults(userIDs []string, wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult) []*msggateway.GetUsersOnlineStatusResp_SuccessResult { - respResult := make([]*msggateway.GetUsersOnlineStatusResp_SuccessResult, 0, len(userIDs)) - for _, userID := range userIDs { - res := &msggateway.GetUsersOnlineStatusResp_SuccessResult{ - UserID: userID, - Status: constant.OfflineStatus, // Default to offline - } - for _, result := range wsResult { - if result.UserID == userID { + // Traversing the userIDs in the api request body + for _, v1 := range req.UserIDs { + flag = false + res := new(msggateway.GetUsersOnlineStatusResp_SuccessResult) + // Iterate through the online results fetched from various gateways + for _, v2 := range wsResult { + // If matches the above description on the line, and vice versa + if v2.UserID == v1 { + flag = true + res.UserID = v1 res.Status = constant.OnlineStatus - res.DetailPlatformStatus = append(res.DetailPlatformStatus, result.DetailPlatformStatus...) + res.DetailPlatformStatus = append(res.DetailPlatformStatus, v2.DetailPlatformStatus...) break } } + if !flag { + res.UserID = v1 + res.Status = constant.OfflineStatus + } respResult = append(respResult, res) } - return respResult + apiresp.GinSuccess(c, respResult) } func (u *UserApi) UserRegisterCount(c *gin.Context) { diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index 1bbba4bd0..a60570860 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -16,7 +16,6 @@ package fcm import ( "context" - "fmt" "path/filepath" firebase "firebase.google.com/go" @@ -42,34 +41,23 @@ type Fcm struct { // NewClient initializes a new FCM client using the Firebase Admin SDK. // It requires the FCM service account credentials file located within the project's configuration directory. -// The function returns an Fcm pointer on success, or nil and an error if initialization fails. -func NewClient(cache cache.MsgModel) (*Fcm, error) { - // Attempt to get the project root directory. - projectRoot, err := config.GetProjectRoot() - if err != nil { - return nil, fmt.Errorf("failed to get project root: %w", err) - } - - credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount) - opt := option.WithCredentialsFile(credentialsFilePath) - - // Initialize the Firebase app with the specified service account credentials. - fcmApp, err := firebase.NewApp(context.Background(), nil, opt) - if err != nil { - return nil, fmt.Errorf("failed to initialize Firebase app: %w", err) - } - - // Obtain the messaging client from the Firebase app. - ctx := context.Background() - fcmMsgClient, err := fcmApp.Messaging(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get Firebase messaging client: %w", err) - } +func NewClient(cache cache.MsgModel) *Fcm { + projectRoot, _ := config.GetProjectRoot() + credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount) + opt := option.WithCredentialsFile(credentialsFilePath) + fcmApp, err := firebase.NewApp(context.Background(), nil, opt) + if err != nil { + return nil + } + ctx := context.Background() + fcmMsgClient, err := fcmApp.Messaging(ctx) + if err != nil { + return nil + } - return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}, nil + return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache} } - func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { // accounts->registrationToken allTokens := make(map[string][]string, 0) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 9869da5e5..5fce34e83 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -82,7 +82,7 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { case "getui": offlinePusher = getui.NewClient(cache) case "fcm": - offlinePusher, _ = fcm.NewClient(cache) + offlinePusher = fcm.NewClient(cache) case "jpush": offlinePusher = jpush.NewClient() default: diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 64251a422..7156ce6c4 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -15,6 +15,9 @@ package cmd import ( + "errors" + "fmt" + "github.com/OpenIMSDK/protocol/constant" "github.com/spf13/cobra" @@ -50,11 +53,17 @@ func (a *ApiCmd) AddApi(f func(port int, promPort int) error) { } } -func (a *ApiCmd) GetPortFromConfig(portType string) (int,) { +func (a *ApiCmd) GetPortFromConfig(portType string) (int, error) { if portType == constant.FlagPort { - return config2.Config.Api.OpenImApiPort[0] + if len(config2.Config.Api.OpenImApiPort) > 0 { + return config2.Config.Api.OpenImApiPort[0], nil + } + return 0, errors.New("API port configuration is empty or missing") } else if portType == constant.FlagPrometheusPort { - return config2.Config.Prometheus.ApiPrometheusPort[0] + if len(config2.Config.Prometheus.ApiPrometheusPort) > 0 { + return config2.Config.Prometheus.ApiPrometheusPort[0], nil + } + return 0, errors.New("Prometheus port configuration is empty or missing") } - return 0 + return 0, fmt.Errorf("unknown port type: %s", portType) } diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index e57bab89d..a96550e2a 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -46,14 +46,18 @@ func (m *MsgTransferCmd) Exec() error { return m.Execute() } -func (m *MsgTransferCmd) GetPortFromConfig(portType string) int { +func (m *MsgTransferCmd) GetPortFromConfig(portType string) (int, error) { if portType == constant.FlagPort { - return 0 + return 0, nil } else if portType == constant.FlagPrometheusPort { n := m.getTransferProgressFlagValue() - return config2.Config.Prometheus.MessageTransferPrometheusPort[n] + + if n < len(config2.Config.Prometheus.MessageTransferPrometheusPort) { + return config2.Config.Prometheus.MessageTransferPrometheusPort[n], nil + } + return 0, fmt.Errorf("index out of range for MessageTransferPrometheusPort with index %d", n) } - return 0 + return 0, fmt.Errorf("unknown port type: %s", portType) } func (m *MsgTransferCmd) AddTransferProgressFlag() { diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 726aed05e..3446cddd4 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -133,7 +133,7 @@ func (r *RootCmd) AddPortFlag() { r.Command.Flags().IntP(constant.FlagPort, "p", 0, "server listen port") } -func (r *RootCmd) getPortFlag(cmd *cobra.Command) (int, error) { +func (r *RootCmd) hubgetPortFlag(cmd *cobra.Command) (int, error) { port, err := cmd.Flags().GetInt(constant.FlagPort) if err != nil { // Wrapping the error with additional context diff --git a/pkg/common/cmd/rpc.go b/pkg/common/cmd/rpc.go index 2e9f7c5a0..e27535ab1 100644 --- a/pkg/common/cmd/rpc.go +++ b/pkg/common/cmd/rpc.go @@ -49,7 +49,7 @@ func (a *RpcCmd) Exec() error { var prometheusPort, err = a.getPrometheusPortFlag(cmd) if err != nil { - return errs.Wrap(err, "Failed to get Prometheus port") + return err } a.prometheusPort = prometheusPort diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index 2233f6d0d..06b11a6a5 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -65,7 +65,7 @@ func GetProjectRoot() (string, error) { // Attempt to compute the project root by navigating up from the executable's directory projectRoot, err := genutil.OutDir(filepath.Join(filepath.Dir(executablePath), "../../../../..")) if err != nil { - return "", errs.Wrap(err, "failed to determine project root directory") + return "", err } return projectRoot, nil diff --git a/pkg/common/convert/friend.go b/pkg/common/convert/friend.go index ed4456e35..0e9a05766 100644 --- a/pkg/common/convert/friend.go +++ b/pkg/common/convert/friend.go @@ -85,8 +85,7 @@ func FriendsDB2Pb(ctx context.Context, friendsDB []*relation.FriendModel, getUse } -func FriendRequestDB2Pb( - ctx context.Context, +func FriendRequestDB2Pb(ctx context.Context, friendRequests []*relation.FriendRequestModel, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error), ) ([]*sdkws.FriendRequest, error) { diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index 3b98f5d7b..e3d494c7a 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -126,11 +126,11 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse }) } -// (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可. +// (1) First determine whether it is in the friends list (in or out does not return an error) (2) for not in the friends list can be inserted. func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) { return f.tx.Transaction(ctx, func(ctx context.Context) error { cache := f.cache.NewCache() - // 先find 找出重复的 去掉重复的 + // User find friends fs1, err := f.friend.FindFriends(ctx, ownerUserID, friendUserIDs) if err != nil { return err diff --git a/pkg/common/db/table/relation/group.go b/pkg/common/db/table/relation/group.go index 57d6b1d62..1f969cd4f 100644 --- a/pkg/common/db/table/relation/group.go +++ b/pkg/common/db/table/relation/group.go @@ -46,8 +46,8 @@ type GroupModelInterface interface { Find(ctx context.Context, groupIDs []string) (groups []*GroupModel, err error) Take(ctx context.Context, groupID string) (group *GroupModel, err error) Search(ctx context.Context, keyword string, pagination pagination.Pagination) (total int64, groups []*GroupModel, err error) - // 获取群总数 + // Get Group total quantity CountTotal(ctx context.Context, before *time.Time) (count int64, err error) - // 获取范围内群增量 + // Get Group total quantity every day CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) } diff --git a/pkg/common/db/table/relation/user.go b/pkg/common/db/table/relation/user.go index dbb2ff464..265c3c93a 100644 --- a/pkg/common/db/table/relation/user.go +++ b/pkg/common/db/table/relation/user.go @@ -62,9 +62,9 @@ type UserModelInterface interface { Exist(ctx context.Context, userID string) (exist bool, err error) GetAllUserID(ctx context.Context, pagination pagination.Pagination) (count int64, userIDs []string, err error) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) - // 获取用户总数 + // Get user total quantity CountTotal(ctx context.Context, before *time.Time) (count int64, err error) - // 获取范围内用户增量 + // Get user total quantity every day CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) //CRUD user command AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error diff --git a/pkg/common/db/unrelation/msg.go b/pkg/common/db/unrelation/msg.go index bc9118a9a..526e3c327 100644 --- a/pkg/common/db/unrelation/msg.go +++ b/pkg/common/db/unrelation/msg.go @@ -122,13 +122,7 @@ func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, ind return nil } -func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc( - ctx context.Context, - docID string, - msg *sdkws.MsgData, - seqIndex int, - status int32, -) error { +func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error { msg.Status = status bytes, err := proto.Marshal(msg) if err != nil { @@ -140,7 +134,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc( bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}}, ) if err != nil { - return errs.Wrap(err) + return errs.Wrap(err, fmt.Sprintf("docID is %s, seqIndex is %d", docID, seqIndex)) } return nil } @@ -166,7 +160,7 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex( findOpts, ) if err != nil { - return nil, errs.Wrap(err) + return nil, errs.Wrap(err, fmt.Sprintf("conversationID is %s", conversationID)) } var msgs []table.MsgDocModel err = cursor.All(ctx, &msgs) @@ -222,7 +216,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st } _, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates) if err != nil { - return errs.Wrap(err) + return errs.Wrap(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes)) } return nil } @@ -289,7 +283,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( defer cur.Close(ctx) var msgDocModel []table.MsgDocModel if err := cur.All(ctx, &msgDocModel); err != nil { - return nil, errs.Wrap(err) + return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs)) } if len(msgDocModel) == 0 { return nil, errs.Wrap(mongo.ErrNoDocuments) @@ -316,14 +310,14 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( } data, err := json.Marshal(&revokeContent) if err != nil { - return nil, err + return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs)) } elem := sdkws.NotificationElem{ Detail: string(data), } content, err := json.Marshal(&elem) if err != nil { - return nil, err + return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs)) } msg.Msg.ContentType = constant.MsgRevokeNotification msg.Msg.Content = string(content) @@ -336,17 +330,12 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc( func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) { count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID}) if err != nil { - return false, errs.Wrap(err) + return false, errs.Wrap(err, fmt.Sprintf("docID is %s", docID)) } return count > 0, nil } -func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead( - ctx context.Context, - userID string, - docID string, - indexes []int64, -) error { +func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error { updates := []mongo.WriteModel{} for _, index := range indexes { filter := bson.M{ @@ -366,7 +355,7 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead( updates = append(updates, updateModel) } _, err := m.MsgCollection.BulkWrite(ctx, updates) - return err + return errs.Wrap(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes)) } // RangeUserSendCount @@ -1160,7 +1149,7 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa var msgsDocs []docModel err = cursor.All(ctx, &msgsDocs) if err != nil { - return 0, nil, err + return 0, nil, errs.Wrap(err, "cursor.All msgsDocs") } log.ZDebug(ctx, "query mongoDB", "result", msgsDocs) msgs := make([]*table.MsgInfoModel, 0) @@ -1185,14 +1174,14 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa } data, err := json.Marshal(&revokeContent) if err != nil { - return 0, nil, err + return 0, nil, errs.Wrap(err, "json.Marshal revokeContent") } elem := sdkws.NotificationElem{ Detail: string(data), } content, err := json.Marshal(&elem) if err != nil { - return 0, nil, err + return 0, nil, errs.Wrap(err, "json.Marshal elem") } msgInfo.Msg.ContentType = constant.MsgRevokeNotification msgInfo.Msg.Content = string(content) diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discoveryregister/direct/directconn.go index 84f173ea6..cc59934a3 100644 --- a/pkg/common/discoveryregister/direct/directconn.go +++ b/pkg/common/discoveryregister/direct/directconn.go @@ -105,7 +105,7 @@ func (cd *ConnDirect) GetConns(ctx context.Context, } if len(connections) == 0 { - return nil, fmt.Errorf("no connections found for service: %s", serviceName) + return nil, errs.Wrap(errors.New("no connections found for service"), "serviceName", serviceName) } return connections, nil } @@ -155,10 +155,11 @@ func (cd *ConnDirect) dialService(ctx context.Context, address string, opts ...g conn, err := grpc.DialContext(ctx, cd.resolverDirect.Scheme()+":///"+address, options...) if err != nil { - return nil, err + return nil, errs.Wrap(err, "address", address) } return conn, nil } + func (cd *ConnDirect) dialServiceWithoutResolver(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := grpc.DialContext(ctx, address, options...) diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 23a9e3245..7e15b9d8b 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -24,6 +24,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/OpenIMSDK/tools/discoveryregistry" + "github.com/OpenIMSDK/tools/errs" ) // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. @@ -41,6 +42,6 @@ func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistr case "direct": return direct.NewConnDirect() default: - return nil, errors.New("envType not correct") + return nil, errs.Wrap(erros.new("envType not correct")) } } diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 258ab7126..141284b64 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -66,12 +66,12 @@ func Post(ctx context.Context, url string, header map[string]string, data any, t jsonStr, err := json.Marshal(data) if err != nil { - return nil, err + return nil, errs.Wrap(err, "Post: JSON marshal failed") } req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonStr)) if err != nil { - return nil, err + return nil, errs.Wrap(err, "Post: NewRequestWithContext failed") } if operationID, _ := ctx.Value(constant.OperationID).(string); operationID != "" { @@ -84,13 +84,13 @@ func Post(ctx context.Context, url string, header map[string]string, data any, t resp, err := client.Do(req) if err != nil { - return nil, err + return nil, errs.Wrap(err, "Post: client.Do failed") } defer resp.Body.Close() result, err := io.ReadAll(resp.Body) if err != nil { - return nil, err + return nil, errs.Wrap(err, "Post: ReadAll failed") } return result, nil diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 54d99a043..4b5ce6b52 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -90,7 +90,7 @@ func NewKafkaProducer(addr []string, topic string) (*Producer, error) { for i := 0; i <= maxRetry; i++ { p.producer, err = sarama.NewSyncProducer(p.addr, p.config) if err == nil { - return &p, nil + return &p, errs.Wrap(err) } time.Sleep(1 * time.Second) // Wait before retrying } @@ -178,7 +178,7 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag // Attach context metadata as headers header, err := GetMQHeaderWithContext(ctx) if err != nil { - return 0, 0, errs.Wrap(err) + return 0, 0, err } kMsg.Headers = header diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go index bdbe56fec..578a308ef 100644 --- a/pkg/common/kafka/util.go +++ b/pkg/common/kafka/util.go @@ -20,7 +20,6 @@ import ( "strings" "github.com/IBM/sarama" - "github.com/OpenIMSDK/tools/errs" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/tls" @@ -37,7 +36,7 @@ func SetupTLSConfig(cfg *sarama.Config) error { []byte(config.Config.Kafka.TLS.ClientKeyPwd), ) if err != nil { - return errs.Wrap(err, "SetupTLSConfig: failed to set up TLS config") + return err } cfg.Net.TLS.Config = tlsConfig } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index be9fbd25b..e19456d86 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -70,7 +70,7 @@ func Start( defer listener.Close() client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) if err != nil { - return errs.Wrap(err) + return err } defer client.Close() diff --git a/pkg/util/genutil/genutil.go b/pkg/util/genutil/genutil.go index bd9376d58..95735485d 100644 --- a/pkg/util/genutil/genutil.go +++ b/pkg/util/genutil/genutil.go @@ -18,6 +18,8 @@ import ( "fmt" "os" "path/filepath" + + "github.com/OpenIMSDK/tools/errs" ) // OutDir creates the absolute path name from path and checks path exists. @@ -25,16 +27,16 @@ import ( func OutDir(path string) (string, error) { outDir, err := filepath.Abs(path) if err != nil { - return "", err + return "", errs.Wrap(err, "output directory %s does not exist", path) } stat, err := os.Stat(outDir) if err != nil { - return "", err + return "", errs.Wrap(err, "output directory %s does not exist", outDir) } if !stat.IsDir() { - return "", fmt.Errorf("output directory %s is not a directory", outDir) + return "", errs.Wrap(err, "output directory %s is not a directory", outDir) } outDir += "/" return outDir, nil