fix: fix tools erros code

pull/2100/head
Xinwei Xiong (cubxxw) 2 years ago
parent 65427a9592
commit e503229f6b

@ -38,6 +38,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m
} else { } else {
conversationIDs = req.ConversationIDs conversationIDs = req.ConversationIDs
} }
hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, conversationIDs) hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, conversationIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@ -207,14 +208,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
return &msg.MarkConversationAsReadResp{}, nil return &msg.MarkConversationAsReadResp{}, nil
} }
func (m *msgServer) sendMarkAsReadNotification( func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversationID string, sessionType int32, sendID, recvID string, seqs []int64, hasReadSeq int64) error {
ctx context.Context,
conversationID string,
sessionType int32,
sendID, recvID string,
seqs []int64,
hasReadSeq int64,
) error {
tips := &sdkws.MarkAsReadTips{ tips := &sdkws.MarkAsReadTips{
MarkAsReadUserID: sendID, MarkAsReadUserID: sendID,
ConversationID: conversationID, ConversationID: conversationID,

@ -30,18 +30,21 @@ import (
) )
type ( type (
// MessageInterceptorChain defines a chain of message interceptor functions.
MessageInterceptorChain []MessageInterceptorFunc MessageInterceptorChain []MessageInterceptorFunc
// msgServer encapsulates dependencies required for message handling.
msgServer struct { msgServer struct {
RegisterCenter discoveryregistry.SvcDiscoveryRegistry RegisterCenter discoveryregistry.SvcDiscoveryRegistry // Service discovery registry for service registration.
MsgDatabase controller.CommonMsgDatabase MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
Conversation *rpcclient.ConversationRpcClient Conversation *rpcclient.ConversationRpcClient // RPC client for conversation service.
UserLocalCache *rpccache.UserLocalCache UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
FriendLocalCache *rpccache.FriendLocalCache FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
GroupLocalCache *rpccache.GroupLocalCache GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data.
ConversationLocalCache *rpccache.ConversationLocalCache ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
config *config.GlobalConfig config *config.GlobalConfig // Global configuration settings.
} }
) )
@ -61,24 +64,24 @@ func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorF
//} //}
func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(config) rdb, err := cache.NewRedis(&config.Redis)
if err != nil { if err != nil {
return err return err
} }
mongo, err := unrelation.NewMongo(config) mongo, err := unrelation.NewMongo(&config.Mongo)
if err != nil { if err != nil {
return err return err
} }
if err := mongo.CreateMsgIndex(); err != nil { if err := mongo.CreateMsgIndex(); err != nil {
return err return err
} }
cacheModel := cache.NewMsgCacheModel(rdb, config) cacheModel := cache.NewMsgCacheModel(rdb, config.MsgCacheTimeout, &config.Redis)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database)) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase(config.Mongo.Database))
conversationClient := rpcclient.NewConversationRpcClient(client, config) conversationClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.RpcRegisterName.OpenImGroupName)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.RpcRegisterName.OpenImFriendName)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel, config) msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel, &config.Kafka)
if err != nil { if err != nil {
return err return err
} }
@ -92,7 +95,8 @@ func Start(config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryReg
FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, rdb), FriendLocalCache: rpccache.NewFriendLocalCache(friendRpcClient, rdb),
config: config, config: config,
} }
s.notificationSender = rpcclient.NewNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))
s.notificationSender = rpcclient.NewNotificationSender(&config.Notification, rpcclient.WithLocalSendMsg(s.SendMsg))
s.addInterceptorHandler(MessageHasReadEnabled) s.addInterceptorHandler(MessageHasReadEnabled)
msg.RegisterMsgServer(server, s) msg.RegisterMsgServer(server, s)
return nil return nil

@ -27,10 +27,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
) )
func (m *msgServer) PullMessageBySeqs( func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
ctx context.Context,
req *sdkws.PullMessageBySeqsReq,
) (*sdkws.PullMessageBySeqsResp, error) {
resp := &sdkws.PullMessageBySeqsResp{} resp := &sdkws.PullMessageBySeqsResp{}
resp.Msgs = make(map[string]*sdkws.PullMsgs) resp.Msgs = make(map[string]*sdkws.PullMsgs)
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)
@ -89,7 +86,7 @@ func (m *msgServer) PullMessageBySeqs(
} }
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
if err := authverify.CheckAccessV3(ctx, req.UserID, m.config); err != nil { if err := authverify.CheckAccessV3(ctx, req.UserID, &m.config.Manager, &m.config.IMAdmin); err != nil {
return nil, err return nil, err
} }
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID) conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)

@ -26,6 +26,7 @@ import (
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
var ExcludeContentType = []int{constant.HasReadReceipt} var ExcludeContentType = []int{constant.HasReadReceipt}
@ -111,7 +112,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
groupMemberInfo, err := m.GroupLocalCache.GetGroupMember(ctx, data.MsgData.GroupID, data.MsgData.SendID) groupMemberInfo, err := m.GroupLocalCache.GetGroupMember(ctx, data.MsgData.GroupID, data.MsgData.SendID)
if err != nil { if err != nil {
if errs.ErrRecordNotFound.Is(err) { if errs.ErrRecordNotFound.Is(err) {
return errs.ErrNotInGroupYet.Wrap(err.Error()) return errs.ErrNotInGroupYet.WrapMsg(err.Error())
} }
return err return err
} }
@ -178,16 +179,11 @@ func (m *msgServer) encapsulateMsgData(msg *sdkws.MsgData) {
} }
func GetMsgID(sendID string) string { func GetMsgID(sendID string) string {
t := time.Now().Format("2006-01-02 15:04:05") t := genutil.GetCurrentTimeFormatted()
return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int())) return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int()))
} }
func (m *msgServer) modifyMessageByUserMessageReceiveOpt( func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, userID, conversationID string, sessionType int, pb *msg.SendMsgReq) (bool, error) {
ctx context.Context,
userID, conversationID string,
sessionType int,
pb *msg.SendMsgReq,
) (bool, error) {
defer log.ZDebug(ctx, "modifyMessageByUserMessageReceiveOpt return") defer log.ZDebug(ctx, "modifyMessageByUserMessageReceiveOpt return")
opt, err := m.UserLocalCache.GetUserGlobalMsgRecvOpt(ctx, userID) opt, err := m.UserLocalCache.GetUserGlobalMsgRecvOpt(ctx, userID)
if err != nil { if err != nil {

@ -83,7 +83,7 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
} }
func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq) (*third.DeleteLogsResp, error) { func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq) (*third.DeleteLogsResp, error) {
if err := authverify.CheckAdmin(ctx, t.config); err != nil { if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil {
return nil, err return nil, err
} }
userID := "" userID := ""
@ -96,7 +96,7 @@ func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq)
logIDs = append(logIDs, log.LogID) logIDs = append(logIDs, log.LogID)
} }
if ids := utils2.Single(req.LogIDs, logIDs); len(ids) > 0 { if ids := utils2.Single(req.LogIDs, logIDs); len(ids) > 0 {
return nil, errs.ErrRecordNotFound.Wrap(fmt.Sprintf("logIDs not found%#v", ids)) return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("logIDs not found%#v", ids))
} }
err = t.thirdDatabase.DeleteLogs(ctx, req.LogIDs, userID) err = t.thirdDatabase.DeleteLogs(ctx, req.LogIDs, userID)
if err != nil { if err != nil {
@ -124,7 +124,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo {
} }
func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) (*third.SearchLogsResp, error) { func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) (*third.SearchLogsResp, error) {
if err := authverify.CheckAdmin(ctx, t.config); err != nil { if err := authverify.CheckAdmin(ctx, &t.config.Manager, &t.config.IMAdmin); err != nil {
return nil, err return nil, err
} }
var ( var (

@ -59,7 +59,7 @@ func CheckAdmin(ctx context.Context, manager *config.Manager, imAdmin *config.IM
if utils.Contain(mcontext.GetOpUserID(ctx), imAdmin.UserID...) { if utils.Contain(mcontext.GetOpUserID(ctx), imAdmin.UserID...) {
return nil return nil
} }
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
} }
func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error { func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error {
if utils.Contain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID...) { if utils.Contain(mcontext.GetOpUserID(ctx), config.IMAdmin.UserID...) {
@ -68,7 +68,7 @@ func CheckIMAdmin(ctx context.Context, config *config.GlobalConfig) error {
if len(config.Manager.UserID) > 0 && utils.Contain(mcontext.GetOpUserID(ctx), config.Manager.UserID...) { if len(config.Manager.UserID) > 0 && utils.Contain(mcontext.GetOpUserID(ctx), config.Manager.UserID...) {
return nil return nil
} }
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx))) return errs.ErrNoPermission.WrapMsg(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx)))
} }
func ParseRedisInterfaceToken(redisToken any, secret string) (*tokenverify.Claims, error) { func ParseRedisInterfaceToken(redisToken any, secret string) (*tokenverify.Claims, error) {
@ -85,10 +85,10 @@ func WsVerifyToken(token, userID, secret string, platformID int) error {
return err return err
} }
if claim.UserID != userID { if claim.UserID != userID {
return errs.ErrTokenInvalid.Wrap(fmt.Sprintf("token uid %s != userID %s", claim.UserID, userID)) return errs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token uid %s != userID %s", claim.UserID, userID))
} }
if claim.PlatformID != platformID { if claim.PlatformID != platformID {
return errs.ErrTokenInvalid.Wrap(fmt.Sprintf("token platform %d != %d", claim.PlatformID, platformID)) return errs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token platform %d != %d", claim.PlatformID, platformID))
} }
return nil return nil
} }

@ -18,6 +18,7 @@ import (
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"time"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
) )
@ -27,21 +28,25 @@ import (
func OutDir(path string) (string, error) { func OutDir(path string) (string, error) {
outDir, err := filepath.Abs(path) outDir, err := filepath.Abs(path)
if err != nil { if err != nil {
return "", errs.Wrap(err, "output directory %s does not exist", path) return "", errs.WrapMsg(err, "output directory %s does not exist", path)
} }
stat, err := os.Stat(outDir) stat, err := os.Stat(outDir)
if err != nil { if err != nil {
return "", errs.Wrap(err, "output directory %s does not exist", outDir) return "", errs.WrapMsg(err, "output directory %s does not exist", outDir)
} }
if !stat.IsDir() { if !stat.IsDir() {
return "", errs.Wrap(err, "output directory %s is not a directory", outDir) return "", errs.WrapMsg(err, "output directory %s is not a directory", outDir)
} }
outDir += "/" outDir += "/"
return outDir, nil return outDir, nil
} }
func GetCurrentTimeFormatted() string {
return time.Now().Format("2006-01-02 15:04:05")
}
func ExitWithError(err error) { func ExitWithError(err error) {
progName := filepath.Base(os.Args[0]) progName := filepath.Base(os.Args[0])
fmt.Fprintf(os.Stderr, "%s exit -1: %+v\n", progName, err) fmt.Fprintf(os.Stderr, "%s exit -1: %+v\n", progName, err)

@ -19,10 +19,9 @@ import (
"fmt" "fmt"
"os/exec" "os/exec"
"runtime" "runtime"
"time"
"github.com/fatih/color" "github.com/fatih/color"
//"github.com/openimsdk/open-im-server/v3/pkg/common/version" "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
) )
func ExecuteCommand(cmdName string, args ...string) (string, error) { func ExecuteCommand(cmdName string, args ...string) (string, error) {
@ -40,8 +39,7 @@ func ExecuteCommand(cmdName string, args ...string) (string, error) {
} }
func printTime() string { func printTime() string {
currentTime := time.Now() formattedTime := genutil.GetCurrentTimeFormatted()
formattedTime := currentTime.Format("2006-01-02 15:04:05")
return fmt.Sprintf("Current Date & Time: %s", formattedTime) return fmt.Sprintf("Current Date & Time: %s", formattedTime)
} }

Loading…
Cancel
Save