feat: add openim auto format code

pull/1906/head
Xinwei Xiong (cubxxw) 2 years ago
parent 8a81768b56
commit f91d8050ce

@ -233,51 +233,60 @@ func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
config.Config.TokenPolicy.Expire, config.Config.TokenPolicy.Expire,
) )
return func(c *gin.Context) { return func(c *gin.Context) {
if c.Request.Method != http.MethodPost { switch c.Request.Method {
c.Next() case http.MethodPost:
return
}
token := c.Request.Header.Get(constant.Token) token := c.Request.Header.Get(constant.Token)
if token == "" { if token == "" {
handleGinError(c, "header get token error", errs.ErrArgs, "header must have token") log.ZWarn(c, "header get token error", errs.ErrArgs.Wrap("header must have token"))
apiresp.GinError(c, errs.ErrArgs.Wrap("header must have token"))
c.Abort()
return return
} }
claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret()) claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret())
if err != nil { if err != nil {
handleGinError(c, "jwt get token error", errs.ErrTokenUnknown, "") log.ZWarn(c, "jwt get token error", errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
c.Abort()
return return
} }
m, err := dataBase.GetTokensWithoutError(c, claims.UserID, claims.PlatformID) m, err := dataBase.GetTokensWithoutError(c, claims.UserID, claims.PlatformID)
if err != nil || len(m) == 0 { if err != nil {
handleGinError(c, "cache get token error", errs.ErrTokenNotExist, "") log.ZWarn(c, "cache get token error", errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
c.Abort()
return return
} }
if len(m) == 0 {
log.ZWarn(c, "cache do not exist token error", errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
c.Abort()
}
if v, ok := m[token]; ok { if v, ok := m[token]; ok {
if v == constant.KickedToken { switch v {
handleGinError(c, "cache kicked token error", errs.ErrTokenKicked, "") case constant.NormalToken:
case constant.KickedToken:
log.ZWarn(c, "cache kicked token error", errs.ErrTokenKicked.Wrap())
apiresp.GinError(c, errs.ErrTokenKicked.Wrap())
c.Abort()
return return
} else if v != constant.NormalToken { default:
handleGinError(c, "cache unknown token error", errs.ErrTokenUnknown, "") log.ZWarn(c, "cache unknown token error", errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
c.Abort()
return return
} }
} else { } else {
handleGinError(c, "token does not exist error", errs.ErrTokenNotExist, "") apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
c.Abort()
return return
} }
}
c.Set(constant.OpUserPlatform, constant.PlatformIDToName(claims.PlatformID))
c.Set(constant.OpUserID, claims.UserID)
c.Next()
} }
} }
// handleGinError logs and returns an error response through Gin context. // // handleGinError logs and returns an error response through Gin context.
func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) { // func handleGinError(c *gin.Context, logMessage string, errType errs.CodeError, detail string) {
wrappedErr := errType.Wrap(detail) // wrappedErr := errType.Wrap(detail)
apiresp.GinError(c, wrappedErr) // apiresp.GinError(c, wrappedErr)
c.Abort() // c.Abort()
} // }

@ -64,57 +64,61 @@ func (u *UserApi) GetUsers(c *gin.Context) {
a2r.Call(user.UserClient.GetPaginationUsers, u.Client, c) a2r.Call(user.UserClient.GetPaginationUsers, u.Client, c)
} }
// GetUsersOnlineStatus retrieves the online status of users. // GetUsersOnlineStatus Get user online status.
func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) { func (u *UserApi) GetUsersOnlineStatus(c *gin.Context) {
var req msggateway.GetUsersOnlineStatusReq var req msggateway.GetUsersOnlineStatusReq
if err := c.BindJSON(&req); err != nil { if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return
} }
conns, err := u.Discov.GetConns(c, config.Config.RpcRegisterName.OpenImMessageGatewayName) conns, err := u.Discov.GetConns(c, config.Config.RpcRegisterName.OpenImMessageGatewayName)
if err != nil { if err != nil {
apiresp.GinError(c, err) apiresp.GinError(c, err)
return return
} }
wsResult := make([]*msggateway.GetUsersOnlineStatusResp_SuccessResult, 0) var wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult
for _, conn := range conns { var respResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult
msgClient := msggateway.NewMsgGatewayClient(conn) flag := false
// Online push message
for _, v := range conns {
msgClient := msggateway.NewMsgGatewayClient(v)
reply, err := msgClient.GetUsersOnlineStatus(c, &req) reply, err := msgClient.GetUsersOnlineStatus(c, &req)
if err != nil { if err != nil {
log.ZDebug(c, "GetUsersOnlineStatus rpc error", err) log.ZDebug(c, "GetUsersOnlineStatus rpc error", err)
if apiresp.ParseError(err).ErrCode == errs.NoPermissionError {
apiresp.GinError(c, errs.Wrap(err)) parseError := apiresp.ParseError(err)
if parseError.ErrCode == errs.NoPermissionError {
apiresp.GinError(c, err)
return return
} }
continue } else {
}
wsResult = append(wsResult, reply.SuccessResult...) wsResult = append(wsResult, reply.SuccessResult...)
} }
respResult := compileResults(req.UserIDs, wsResult)
apiresp.GinSuccess(c, respResult)
}
// compileResults aggregates online status results for the provided userIDs.
func compileResults(userIDs []string, wsResult []*msggateway.GetUsersOnlineStatusResp_SuccessResult) []*msggateway.GetUsersOnlineStatusResp_SuccessResult {
respResult := make([]*msggateway.GetUsersOnlineStatusResp_SuccessResult, 0, len(userIDs))
for _, userID := range userIDs {
res := &msggateway.GetUsersOnlineStatusResp_SuccessResult{
UserID: userID,
Status: constant.OfflineStatus, // Default to offline
} }
for _, result := range wsResult { // Traversing the userIDs in the api request body
if result.UserID == userID { for _, v1 := range req.UserIDs {
flag = false
res := new(msggateway.GetUsersOnlineStatusResp_SuccessResult)
// Iterate through the online results fetched from various gateways
for _, v2 := range wsResult {
// If matches the above description on the line, and vice versa
if v2.UserID == v1 {
flag = true
res.UserID = v1
res.Status = constant.OnlineStatus res.Status = constant.OnlineStatus
res.DetailPlatformStatus = append(res.DetailPlatformStatus, result.DetailPlatformStatus...) res.DetailPlatformStatus = append(res.DetailPlatformStatus, v2.DetailPlatformStatus...)
break break
} }
} }
if !flag {
res.UserID = v1
res.Status = constant.OfflineStatus
}
respResult = append(respResult, res) respResult = append(respResult, res)
} }
return respResult apiresp.GinSuccess(c, respResult)
} }
func (u *UserApi) UserRegisterCount(c *gin.Context) { func (u *UserApi) UserRegisterCount(c *gin.Context) {

@ -16,7 +16,6 @@ package fcm
import ( import (
"context" "context"
"fmt"
"path/filepath" "path/filepath"
firebase "firebase.google.com/go" firebase "firebase.google.com/go"
@ -42,34 +41,23 @@ type Fcm struct {
// NewClient initializes a new FCM client using the Firebase Admin SDK. // NewClient initializes a new FCM client using the Firebase Admin SDK.
// It requires the FCM service account credentials file located within the project's configuration directory. // It requires the FCM service account credentials file located within the project's configuration directory.
// The function returns an Fcm pointer on success, or nil and an error if initialization fails. func NewClient(cache cache.MsgModel) *Fcm {
func NewClient(cache cache.MsgModel) (*Fcm, error) { projectRoot, _ := config.GetProjectRoot()
// Attempt to get the project root directory.
projectRoot, err := config.GetProjectRoot()
if err != nil {
return nil, fmt.Errorf("failed to get project root: %w", err)
}
credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount) credentialsFilePath := filepath.Join(projectRoot, "config", config.Config.Push.Fcm.ServiceAccount)
opt := option.WithCredentialsFile(credentialsFilePath) opt := option.WithCredentialsFile(credentialsFilePath)
// Initialize the Firebase app with the specified service account credentials.
fcmApp, err := firebase.NewApp(context.Background(), nil, opt) fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize Firebase app: %w", err) return nil
} }
// Obtain the messaging client from the Firebase app.
ctx := context.Background() ctx := context.Background()
fcmMsgClient, err := fcmApp.Messaging(ctx) fcmMsgClient, err := fcmApp.Messaging(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get Firebase messaging client: %w", err) return nil
} }
return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}, nil return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}
} }
func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error {
// accounts->registrationToken // accounts->registrationToken
allTokens := make(map[string][]string, 0) allTokens := make(map[string][]string, 0)

@ -82,7 +82,7 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
case "getui": case "getui":
offlinePusher = getui.NewClient(cache) offlinePusher = getui.NewClient(cache)
case "fcm": case "fcm":
offlinePusher, _ = fcm.NewClient(cache) offlinePusher = fcm.NewClient(cache)
case "jpush": case "jpush":
offlinePusher = jpush.NewClient() offlinePusher = jpush.NewClient()
default: default:

@ -15,6 +15,9 @@
package cmd package cmd
import ( import (
"errors"
"fmt"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -50,11 +53,17 @@ func (a *ApiCmd) AddApi(f func(port int, promPort int) error) {
} }
} }
func (a *ApiCmd) GetPortFromConfig(portType string) (int,) { func (a *ApiCmd) GetPortFromConfig(portType string) (int, error) {
if portType == constant.FlagPort { if portType == constant.FlagPort {
return config2.Config.Api.OpenImApiPort[0] if len(config2.Config.Api.OpenImApiPort) > 0 {
return config2.Config.Api.OpenImApiPort[0], nil
}
return 0, errors.New("API port configuration is empty or missing")
} else if portType == constant.FlagPrometheusPort { } else if portType == constant.FlagPrometheusPort {
return config2.Config.Prometheus.ApiPrometheusPort[0] if len(config2.Config.Prometheus.ApiPrometheusPort) > 0 {
return config2.Config.Prometheus.ApiPrometheusPort[0], nil
}
return 0, errors.New("Prometheus port configuration is empty or missing")
} }
return 0 return 0, fmt.Errorf("unknown port type: %s", portType)
} }

@ -46,14 +46,18 @@ func (m *MsgTransferCmd) Exec() error {
return m.Execute() return m.Execute()
} }
func (m *MsgTransferCmd) GetPortFromConfig(portType string) int { func (m *MsgTransferCmd) GetPortFromConfig(portType string) (int, error) {
if portType == constant.FlagPort { if portType == constant.FlagPort {
return 0 return 0, nil
} else if portType == constant.FlagPrometheusPort { } else if portType == constant.FlagPrometheusPort {
n := m.getTransferProgressFlagValue() n := m.getTransferProgressFlagValue()
return config2.Config.Prometheus.MessageTransferPrometheusPort[n]
if n < len(config2.Config.Prometheus.MessageTransferPrometheusPort) {
return config2.Config.Prometheus.MessageTransferPrometheusPort[n], nil
} }
return 0 return 0, fmt.Errorf("index out of range for MessageTransferPrometheusPort with index %d", n)
}
return 0, fmt.Errorf("unknown port type: %s", portType)
} }
func (m *MsgTransferCmd) AddTransferProgressFlag() { func (m *MsgTransferCmd) AddTransferProgressFlag() {

@ -133,7 +133,7 @@ func (r *RootCmd) AddPortFlag() {
r.Command.Flags().IntP(constant.FlagPort, "p", 0, "server listen port") r.Command.Flags().IntP(constant.FlagPort, "p", 0, "server listen port")
} }
func (r *RootCmd) getPortFlag(cmd *cobra.Command) (int, error) { func (r *RootCmd) hubgetPortFlag(cmd *cobra.Command) (int, error) {
port, err := cmd.Flags().GetInt(constant.FlagPort) port, err := cmd.Flags().GetInt(constant.FlagPort)
if err != nil { if err != nil {
// Wrapping the error with additional context // Wrapping the error with additional context

@ -49,7 +49,7 @@ func (a *RpcCmd) Exec() error {
var prometheusPort, err = a.getPrometheusPortFlag(cmd) var prometheusPort, err = a.getPrometheusPortFlag(cmd)
if err != nil { if err != nil {
return errs.Wrap(err, "Failed to get Prometheus port") return err
} }
a.prometheusPort = prometheusPort a.prometheusPort = prometheusPort

@ -65,7 +65,7 @@ func GetProjectRoot() (string, error) {
// Attempt to compute the project root by navigating up from the executable's directory // Attempt to compute the project root by navigating up from the executable's directory
projectRoot, err := genutil.OutDir(filepath.Join(filepath.Dir(executablePath), "../../../../..")) projectRoot, err := genutil.OutDir(filepath.Join(filepath.Dir(executablePath), "../../../../.."))
if err != nil { if err != nil {
return "", errs.Wrap(err, "failed to determine project root directory") return "", err
} }
return projectRoot, nil return projectRoot, nil

@ -85,8 +85,7 @@ func FriendsDB2Pb(ctx context.Context, friendsDB []*relation.FriendModel, getUse
} }
func FriendRequestDB2Pb( func FriendRequestDB2Pb(ctx context.Context,
ctx context.Context,
friendRequests []*relation.FriendRequestModel, friendRequests []*relation.FriendRequestModel,
getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error), getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error),
) ([]*sdkws.FriendRequest, error) { ) ([]*sdkws.FriendRequest, error) {

@ -126,11 +126,11 @@ func (f *friendDatabase) AddFriendRequest(ctx context.Context, fromUserID, toUse
}) })
} }
// (1)先判断是否在好友表 (在不在都不返回错误) (2)对于不在好友列表的 插入即可. // (1) First determine whether it is in the friends list (in or out does not return an error) (2) for not in the friends list can be inserted.
func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) { func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error) {
return f.tx.Transaction(ctx, func(ctx context.Context) error { return f.tx.Transaction(ctx, func(ctx context.Context) error {
cache := f.cache.NewCache() cache := f.cache.NewCache()
// 先find 找出重复的 去掉重复的 // User find friends
fs1, err := f.friend.FindFriends(ctx, ownerUserID, friendUserIDs) fs1, err := f.friend.FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil { if err != nil {
return err return err

@ -46,8 +46,8 @@ type GroupModelInterface interface {
Find(ctx context.Context, groupIDs []string) (groups []*GroupModel, err error) Find(ctx context.Context, groupIDs []string) (groups []*GroupModel, err error)
Take(ctx context.Context, groupID string) (group *GroupModel, err error) Take(ctx context.Context, groupID string) (group *GroupModel, err error)
Search(ctx context.Context, keyword string, pagination pagination.Pagination) (total int64, groups []*GroupModel, err error) Search(ctx context.Context, keyword string, pagination pagination.Pagination) (total int64, groups []*GroupModel, err error)
// 获取群总数 // Get Group total quantity
CountTotal(ctx context.Context, before *time.Time) (count int64, err error) CountTotal(ctx context.Context, before *time.Time) (count int64, err error)
// 获取范围内群增量 // Get Group total quantity every day
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
} }

@ -62,9 +62,9 @@ type UserModelInterface interface {
Exist(ctx context.Context, userID string) (exist bool, err error) Exist(ctx context.Context, userID string) (exist bool, err error)
GetAllUserID(ctx context.Context, pagination pagination.Pagination) (count int64, userIDs []string, err error) GetAllUserID(ctx context.Context, pagination pagination.Pagination) (count int64, userIDs []string, err error)
GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error) GetUserGlobalRecvMsgOpt(ctx context.Context, userID string) (opt int, err error)
// 获取用户总数 // Get user total quantity
CountTotal(ctx context.Context, before *time.Time) (count int64, err error) CountTotal(ctx context.Context, before *time.Time) (count int64, err error)
// 获取范围内用户增量 // Get user total quantity every day
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
//CRUD user command //CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string, ex string) error

@ -122,13 +122,7 @@ func (m *MsgMongoDriver) UpdateMsgContent(ctx context.Context, docID string, ind
return nil return nil
} }
func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc( func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error {
ctx context.Context,
docID string,
msg *sdkws.MsgData,
seqIndex int,
status int32,
) error {
msg.Status = status msg.Status = status
bytes, err := proto.Marshal(msg) bytes, err := proto.Marshal(msg)
if err != nil { if err != nil {
@ -140,7 +134,7 @@ func (m *MsgMongoDriver) UpdateMsgStatusByIndexInOneDoc(
bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}}, bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", seqIndex): bytes}},
) )
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err, fmt.Sprintf("docID is %s, seqIndex is %d", docID, seqIndex))
} }
return nil return nil
} }
@ -166,7 +160,7 @@ func (m *MsgMongoDriver) GetMsgDocModelByIndex(
findOpts, findOpts,
) )
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err, fmt.Sprintf("conversationID is %s", conversationID))
} }
var msgs []table.MsgDocModel var msgs []table.MsgDocModel
err = cursor.All(ctx, &msgs) err = cursor.All(ctx, &msgs)
@ -222,7 +216,7 @@ func (m *MsgMongoDriver) DeleteMsgsInOneDocByIndex(ctx context.Context, docID st
} }
_, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates) _, err := m.MsgCollection.UpdateMany(ctx, bson.M{"doc_id": docID}, updates)
if err != nil { if err != nil {
return errs.Wrap(err) return errs.Wrap(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
} }
return nil return nil
} }
@ -289,7 +283,7 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
defer cur.Close(ctx) defer cur.Close(ctx)
var msgDocModel []table.MsgDocModel var msgDocModel []table.MsgDocModel
if err := cur.All(ctx, &msgDocModel); err != nil { if err := cur.All(ctx, &msgDocModel); err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
} }
if len(msgDocModel) == 0 { if len(msgDocModel) == 0 {
return nil, errs.Wrap(mongo.ErrNoDocuments) return nil, errs.Wrap(mongo.ErrNoDocuments)
@ -316,14 +310,14 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
} }
data, err := json.Marshal(&revokeContent) data, err := json.Marshal(&revokeContent)
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
} }
elem := sdkws.NotificationElem{ elem := sdkws.NotificationElem{
Detail: string(data), Detail: string(data),
} }
content, err := json.Marshal(&elem) content, err := json.Marshal(&elem)
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err, fmt.Sprintf("docID is %s, seqs is %v", docID, seqs))
} }
msg.Msg.ContentType = constant.MsgRevokeNotification msg.Msg.ContentType = constant.MsgRevokeNotification
msg.Msg.Content = string(content) msg.Msg.Content = string(content)
@ -336,17 +330,12 @@ func (m *MsgMongoDriver) GetMsgBySeqIndexIn1Doc(
func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) { func (m *MsgMongoDriver) IsExistDocID(ctx context.Context, docID string) (bool, error) {
count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID}) count, err := m.MsgCollection.CountDocuments(ctx, bson.M{"doc_id": docID})
if err != nil { if err != nil {
return false, errs.Wrap(err) return false, errs.Wrap(err, fmt.Sprintf("docID is %s", docID))
} }
return count > 0, nil return count > 0, nil
} }
func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead( func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error {
ctx context.Context,
userID string,
docID string,
indexes []int64,
) error {
updates := []mongo.WriteModel{} updates := []mongo.WriteModel{}
for _, index := range indexes { for _, index := range indexes {
filter := bson.M{ filter := bson.M{
@ -366,7 +355,7 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(
updates = append(updates, updateModel) updates = append(updates, updateModel)
} }
_, err := m.MsgCollection.BulkWrite(ctx, updates) _, err := m.MsgCollection.BulkWrite(ctx, updates)
return err return errs.Wrap(err, fmt.Sprintf("docID is %s, indexes is %v", docID, indexes))
} }
// RangeUserSendCount // RangeUserSendCount
@ -1160,7 +1149,7 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
var msgsDocs []docModel var msgsDocs []docModel
err = cursor.All(ctx, &msgsDocs) err = cursor.All(ctx, &msgsDocs)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, errs.Wrap(err, "cursor.All msgsDocs")
} }
log.ZDebug(ctx, "query mongoDB", "result", msgsDocs) log.ZDebug(ctx, "query mongoDB", "result", msgsDocs)
msgs := make([]*table.MsgInfoModel, 0) msgs := make([]*table.MsgInfoModel, 0)
@ -1185,14 +1174,14 @@ func (m *MsgMongoDriver) searchMessage(ctx context.Context, req *msg.SearchMessa
} }
data, err := json.Marshal(&revokeContent) data, err := json.Marshal(&revokeContent)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, errs.Wrap(err, "json.Marshal revokeContent")
} }
elem := sdkws.NotificationElem{ elem := sdkws.NotificationElem{
Detail: string(data), Detail: string(data),
} }
content, err := json.Marshal(&elem) content, err := json.Marshal(&elem)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, errs.Wrap(err, "json.Marshal elem")
} }
msgInfo.Msg.ContentType = constant.MsgRevokeNotification msgInfo.Msg.ContentType = constant.MsgRevokeNotification
msgInfo.Msg.Content = string(content) msgInfo.Msg.Content = string(content)

@ -105,7 +105,7 @@ func (cd *ConnDirect) GetConns(ctx context.Context,
} }
if len(connections) == 0 { if len(connections) == 0 {
return nil, fmt.Errorf("no connections found for service: %s", serviceName) return nil, errs.Wrap(errors.New("no connections found for service"), "serviceName", serviceName)
} }
return connections, nil return connections, nil
} }
@ -155,10 +155,11 @@ func (cd *ConnDirect) dialService(ctx context.Context, address string, opts ...g
conn, err := grpc.DialContext(ctx, cd.resolverDirect.Scheme()+":///"+address, options...) conn, err := grpc.DialContext(ctx, cd.resolverDirect.Scheme()+":///"+address, options...)
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err, "address", address)
} }
return conn, nil return conn, nil
} }
func (cd *ConnDirect) dialServiceWithoutResolver(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { func (cd *ConnDirect) dialServiceWithoutResolver(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) options := append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.DialContext(ctx, address, options...) conn, err := grpc.DialContext(ctx, address, options...)

@ -24,6 +24,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper" "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/errs"
) )
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
@ -41,6 +42,6 @@ func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistr
case "direct": case "direct":
return direct.NewConnDirect() return direct.NewConnDirect()
default: default:
return nil, errors.New("envType not correct") return nil, errs.Wrap(erros.new("envType not correct"))
} }
} }

@ -66,12 +66,12 @@ func Post(ctx context.Context, url string, header map[string]string, data any, t
jsonStr, err := json.Marshal(data) jsonStr, err := json.Marshal(data)
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err, "Post: JSON marshal failed")
} }
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonStr)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonStr))
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err, "Post: NewRequestWithContext failed")
} }
if operationID, _ := ctx.Value(constant.OperationID).(string); operationID != "" { if operationID, _ := ctx.Value(constant.OperationID).(string); operationID != "" {
@ -84,13 +84,13 @@ func Post(ctx context.Context, url string, header map[string]string, data any, t
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err, "Post: client.Do failed")
} }
defer resp.Body.Close() defer resp.Body.Close()
result, err := io.ReadAll(resp.Body) result, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return nil, err return nil, errs.Wrap(err, "Post: ReadAll failed")
} }
return result, nil return result, nil

@ -90,7 +90,7 @@ func NewKafkaProducer(addr []string, topic string) (*Producer, error) {
for i := 0; i <= maxRetry; i++ { for i := 0; i <= maxRetry; i++ {
p.producer, err = sarama.NewSyncProducer(p.addr, p.config) p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
if err == nil { if err == nil {
return &p, nil return &p, errs.Wrap(err)
} }
time.Sleep(1 * time.Second) // Wait before retrying time.Sleep(1 * time.Second) // Wait before retrying
} }
@ -178,7 +178,7 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
// Attach context metadata as headers // Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx) header, err := GetMQHeaderWithContext(ctx)
if err != nil { if err != nil {
return 0, 0, errs.Wrap(err) return 0, 0, err
} }
kMsg.Headers = header kMsg.Headers = header

@ -20,7 +20,6 @@ import (
"strings" "strings"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/tls" "github.com/openimsdk/open-im-server/v3/pkg/common/tls"
@ -37,7 +36,7 @@ func SetupTLSConfig(cfg *sarama.Config) error {
[]byte(config.Config.Kafka.TLS.ClientKeyPwd), []byte(config.Config.Kafka.TLS.ClientKeyPwd),
) )
if err != nil { if err != nil {
return errs.Wrap(err, "SetupTLSConfig: failed to set up TLS config") return err
} }
cfg.Net.TLS.Config = tlsConfig cfg.Net.TLS.Config = tlsConfig
} }

@ -70,7 +70,7 @@ func Start(
defer listener.Close() defer listener.Close()
client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery) client, err := kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil { if err != nil {
return errs.Wrap(err) return err
} }
defer client.Close() defer client.Close()

@ -18,6 +18,8 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"github.com/OpenIMSDK/tools/errs"
) )
// OutDir creates the absolute path name from path and checks path exists. // OutDir creates the absolute path name from path and checks path exists.
@ -25,16 +27,16 @@ import (
func OutDir(path string) (string, error) { func OutDir(path string) (string, error) {
outDir, err := filepath.Abs(path) outDir, err := filepath.Abs(path)
if err != nil { if err != nil {
return "", err return "", errs.Wrap(err, "output directory %s does not exist", path)
} }
stat, err := os.Stat(outDir) stat, err := os.Stat(outDir)
if err != nil { if err != nil {
return "", err return "", errs.Wrap(err, "output directory %s does not exist", outDir)
} }
if !stat.IsDir() { if !stat.IsDir() {
return "", fmt.Errorf("output directory %s is not a directory", outDir) return "", errs.Wrap(err, "output directory %s is not a directory", outDir)
} }
outDir += "/" outDir += "/"
return outDir, nil return outDir, nil

Loading…
Cancel
Save