fix: api and msg gateway update.

pull/2100/head
Gordon 2 years ago
parent 2f5786ae34
commit 177c8b3d59

@ -30,7 +30,10 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/timeutil"
)
type MessageApi struct {
@ -49,10 +52,10 @@ func NewMessageApi(msgRpcClient *rpcclient.Message, userRpcClient *rpcclient.Use
}
func (MessageApi) SetOptions(options map[string]bool, value bool) {
utils.SetSwitchFromOptions(options, constant.IsHistory, value)
utils.SetSwitchFromOptions(options, constant.IsPersistent, value)
utils.SetSwitchFromOptions(options, constant.IsSenderSync, value)
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
datautil.SetSwitchFromOptions(options, constant.IsHistory, value)
datautil.SetSwitchFromOptions(options, constant.IsPersistent, value)
datautil.SetSwitchFromOptions(options, constant.IsSenderSync, value)
datautil.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
}
func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg) *msg.SendMsgReq {
@ -61,8 +64,8 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg)
switch params.ContentType {
case constant.OANotification:
notification := sdkws.NotificationElem{}
notification.Detail = utils.StructToJsonString(params.Content)
newContent = utils.StructToJsonString(&notification)
notification.Detail = jsonutil.StructToJsonString(params.Content)
newContent = jsonutil.StructToJsonString(&notification)
case constant.Text:
fallthrough
case constant.Picture:
@ -76,19 +79,19 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg)
case constant.File:
fallthrough
default:
newContent = utils.StructToJsonString(params.Content)
newContent = jsonutil.StructToJsonString(params.Content)
}
if params.IsOnlineOnly {
m.SetOptions(options, false)
}
if params.NotOfflinePush {
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
datautil.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
}
pbData := msg.SendMsgReq{
MsgData: &sdkws.MsgData{
SendID: params.SendID,
GroupID: params.GroupID,
ClientMsgID: utils.GetMsgID(params.SendID),
ClientMsgID: idutil.GetMsgIDByMD5(params.SendID),
SenderPlatformID: params.SenderPlatformID,
SenderNickname: params.SenderNickname,
SenderFaceURL: params.SenderFaceURL,
@ -96,7 +99,7 @@ func (m MessageApi) newUserSendMsgReq(_ *gin.Context, params *apistruct.SendMsg)
MsgFrom: constant.SysMsgType,
ContentType: params.ContentType,
Content: []byte(newContent),
CreateTime: utils.GetCurrentTimestampByMill(),
CreateTime: timeutil.GetCurrentTimestampByMill(),
SendTime: params.SendTime,
Options: options,
OfflinePushInfo: params.OfflinePushInfo,
@ -269,8 +272,8 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
MsgData: &sdkws.MsgData{
SendID: req.SendUserID,
RecvID: req.RecvUserID,
Content: []byte(utils.StructToJsonString(&sdkws.NotificationElem{
Detail: utils.StructToJsonString(&struct {
Content: []byte(jsonutil.StructToJsonString(&sdkws.NotificationElem{
Detail: jsonutil.StructToJsonString(&struct {
Key string `json:"key"`
Data string `json:"data"`
}{Key: req.Key, Data: req.Data}),
@ -278,8 +281,8 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
MsgFrom: constant.SysMsgType,
ContentType: constant.BusinessNotification,
SessionType: constant.SingleChatType,
CreateTime: utils.GetCurrentTimestampByMill(),
ClientMsgID: utils.GetMsgID(mcontext.GetOpUserID(c)),
CreateTime: timeutil.GetCurrentTimestampByMill(),
ClientMsgID: idutil.GetMsgIDByMD5(mcontext.GetOpUserID(c)),
Options: config.GetOptionsByNotification(config.NotificationConf{
IsSendMsg: false,
ReliabilityLevel: 1,

@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/discovery"
"net"
"net/http"
"os"
@ -42,7 +44,6 @@ import (
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mw"
@ -61,7 +62,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i
return err
}
var client discoveryregistry.SvcDiscoveryRegistry
var client discovery.SvcDiscoveryRegistry
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config)
@ -133,7 +134,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, port int, proPort i
return nil
}
func newGinRouter(disCov discoveryregistry.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *config.GlobalConfig) *gin.Engine {
func newGinRouter(disCov discovery.SvcDiscoveryRegistry, rdb redis.UniversalClient, config *config.GlobalConfig) *gin.Engine {
disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
gin.SetMode(gin.ReleaseMode)
r := gin.New()
@ -326,26 +327,26 @@ func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.H
case http.MethodPost:
token := c.Request.Header.Get(constant.Token)
if token == "" {
log.ZWarn(c, "header get token error", errs.ErrArgs.WrapMsg("header must have token"))
apiresp.GinError(c, errs.ErrArgs.WrapMsg("header must have token"))
log.ZWarn(c, "header get token error", servererrs.ErrArgs.WrapMsg("header must have token"))
apiresp.GinError(c, servererrs.ErrArgs.WrapMsg("header must have token"))
c.Abort()
return
}
claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(config.Secret))
if err != nil {
log.ZWarn(c, "jwt get token error", errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, servererrs.ErrTokenUnknown.Wrap())
c.Abort()
return
}
m, err := dataBase.GetTokensWithoutError(c, claims.UserID, claims.PlatformID)
if err != nil {
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, servererrs.ErrTokenNotExist.Wrap())
c.Abort()
return
}
if len(m) == 0 {
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, servererrs.ErrTokenNotExist.Wrap())
c.Abort()
return
}
@ -353,16 +354,16 @@ func GinParseToken(rdb redis.UniversalClient, config *config.GlobalConfig) gin.H
switch v {
case constant.NormalToken:
case constant.KickedToken:
apiresp.GinError(c, errs.ErrTokenKicked.Wrap())
apiresp.GinError(c, servererrs.ErrTokenKicked.Wrap())
c.Abort()
return
default:
apiresp.GinError(c, errs.ErrTokenUnknown.Wrap())
apiresp.GinError(c, servererrs.ErrTokenUnknown.Wrap())
c.Abort()
return
}
} else {
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
apiresp.GinError(c, servererrs.ErrTokenNotExist.Wrap())
c.Abort()
return
}

@ -16,6 +16,7 @@ package msggateway
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@ -156,7 +157,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, req.MsgData)
if err != nil {
userPlatform.ResultCode = int64(errs.ErrPushMsgErr.Code())
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
resp = append(resp, userPlatform)
} else {
if _, ok := s.pushTerminal[client.PlatformID]; ok {
@ -165,7 +166,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga
}
}
} else {
userPlatform.ResultCode = int64(errs.ErrIOSBackgroundPushErr.Code())
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
resp = append(resp, userPlatform)
}
}

@ -17,7 +17,7 @@ package msggateway
import (
"context"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/goassist"
"github.com/openimsdk/tools/utils/jsonutil"
"sync"
"github.com/go-playground/validator/v10"
@ -46,7 +46,7 @@ func (r *Req) String() string {
tReq.SendID = r.SendID
tReq.OperationID = r.OperationID
tReq.MsgIncr = r.MsgIncr
return goassist.StructToJsonString(tReq)
return jsonutil.StructToJsonString(tReq)
}
var reqPool = sync.Pool{
@ -86,7 +86,7 @@ func (r *Resp) String() string {
tResp.OperationID = r.OperationID
tResp.ErrCode = r.ErrCode
tResp.ErrMsg = r.ErrMsg
return goassist.StructToJsonString(tResp)
return jsonutil.StructToJsonString(tResp)
}
type MessageHandler interface {

@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/stringutil"
"net/http"
@ -427,21 +428,21 @@ func (ws *WsServer) ParseWSArgs(r *http.Request) (args *WSArgs, err error) {
query := r.URL.Query()
v.MsgResp, _ = strconv.ParseBool(query.Get(MsgResp))
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
return nil, errs.ErrConnOverMaxNumLimit.WrapMsg("over max conn num limit")
return nil, servererrs.ErrConnOverMaxNumLimit.WrapMsg("over max conn num limit")
}
if v.Token = query.Get(Token); v.Token == "" {
return nil, errs.ErrConnArgsErr.WrapMsg("token is empty")
return nil, servererrs.ErrConnArgsErr.WrapMsg("token is empty")
}
if v.UserID = query.Get(WsUserID); v.UserID == "" {
return nil, errs.ErrConnArgsErr.WrapMsg("sendID is empty")
return nil, servererrs.ErrConnArgsErr.WrapMsg("sendID is empty")
}
platformIDStr := query.Get(PlatformID)
if platformIDStr == "" {
return nil, errs.ErrConnArgsErr.WrapMsg("platformID is empty")
return nil, servererrs.ErrConnArgsErr.WrapMsg("platformID is empty")
}
platformID, err := strconv.Atoi(platformIDStr)
if err != nil {
return nil, errs.ErrConnArgsErr.WrapMsg("platformID is not int")
return nil, servererrs.ErrConnArgsErr.WrapMsg("platformID is not int")
}
v.PlatformID = platformID
if err = authverify.WsVerifyToken(v.Token, v.UserID, ws.globalConfig.Secret, platformID); err != nil {
@ -461,12 +462,12 @@ func (ws *WsServer) ParseWSArgs(r *http.Request) (args *WSArgs, err error) {
switch v {
case constant.NormalToken:
case constant.KickedToken:
return nil, errs.ErrTokenKicked.Wrap()
return nil, servererrs.ErrTokenKicked.Wrap()
default:
return nil, errs.ErrTokenUnknown.WrapMsg(fmt.Sprintf("token status is %d", v))
return nil, servererrs.ErrTokenUnknown.WrapMsg(fmt.Sprintf("token status is %d", v))
}
} else {
return nil, errs.ErrTokenNotExist.Wrap()
return nil, servererrs.ErrTokenNotExist.Wrap()
}
return &v, nil
}

@ -16,7 +16,7 @@ package msggateway
import (
"context"
"github.com/openimsdk/tools/utils/goassist"
"github.com/openimsdk/tools/utils/datautil"
"sync"
"github.com/openimsdk/tools/log"
@ -93,7 +93,7 @@ func (u *UserMap) delete(key string, connRemoteAddr string) (isDeleteUser bool)
}
func (u *UserMap) deleteClients(key string, clients []*Client) (isDeleteUser bool) {
m := goassist.SliceToMapAny(clients, func(c *Client) (string, struct{}) {
m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) {
return c.ctx.GetRemoteAddr(), struct{}{}
})
allClients, existed := u.m.Load(key)

@ -44,6 +44,13 @@ const (
CallbackError = 80000
// General error codes.
ServerInternalError = 500 // Server internal error
ArgsError = 1001 // Input parameter error
NoPermissionError = 1002 // Insufficient permission
DuplicateKeyError = 1003
RecordNotFoundError = 1004 // Record does not exist
// Account error codes.
UserIDNotFoundError = 1101 // UserID does not exist or is not registered
RegisteredAlreadyError = 1102 // User is already registered

@ -22,6 +22,12 @@ var (
ErrCallback = errs.NewCodeError(CallbackError, "CallbackError")
ErrCallbackContinue = errs.NewCodeError(CallbackError, "ErrCallbackContinue")
ErrInternalServer = errs.NewCodeError(ServerInternalError, "ServerInternalError")
ErrArgs = errs.NewCodeError(ArgsError, "ArgsError")
ErrNoPermission = errs.NewCodeError(NoPermissionError, "NoPermissionError")
ErrDuplicateKey = errs.NewCodeError(DuplicateKeyError, "DuplicateKeyError")
ErrRecordNotFound = errs.NewCodeError(RecordNotFoundError, "RecordNotFoundError")
ErrUserIDNotFound = errs.NewCodeError(UserIDNotFoundError, "UserIDNotFoundError")
ErrGroupIDNotFound = errs.NewCodeError(GroupIDNotFoundError, "GroupIDNotFoundError")
ErrGroupIDExisted = errs.NewCodeError(GroupIDExisted, "GroupIDExisted")

@ -16,6 +16,8 @@ package rpcclient
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/utils/datautil"
"strings"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -23,9 +25,7 @@ import (
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
)
@ -33,14 +33,14 @@ import (
type User struct {
conn grpc.ClientConnInterface
Client user.UserClient
Discov discoveryregistry.SvcDiscoveryRegistry
Discov discovery.SvcDiscoveryRegistry
MessageGateWayRpcName string
manager *config.Manager
imAdmin *config.IMAdmin
}
// NewUser initializes and returns a User instance based on the provided service discovery registry.
func NewUser(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName, messageGateWayRpcName string,
func NewUser(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, messageGateWayRpcName string,
manager *config.Manager, imAdmin *config.IMAdmin) *User {
conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil {
@ -64,7 +64,7 @@ func NewUserRpcClientByUser(user *User) *UserRpcClient {
}
// NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry.
func NewUserRpcClient(client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string,
func NewUserRpcClient(client discovery.SvcDiscoveryRegistry, rpcRegisterName string,
manager *config.Manager, imAdmin *config.IMAdmin) UserRpcClient {
return UserRpcClient(*NewUser(client, rpcRegisterName, "", manager, imAdmin))
}
@ -80,10 +80,10 @@ func (u *UserRpcClient) GetUsersInfo(ctx context.Context, userIDs []string) ([]*
if err != nil {
return nil, err
}
if ids := utils.Single(userIDs, utils.Slice(resp.UsersInfo, func(e *sdkws.UserInfo) string {
if ids := datautil.Single(userIDs, datautil.Slice(resp.UsersInfo, func(e *sdkws.UserInfo) string {
return e.UserID
})); len(ids) > 0 {
return nil, errs.ErrUserIDNotFound.WrapMsg(strings.Join(ids, ","))
return nil, servererrs.ErrUserIDNotFound.WrapMsg(strings.Join(ids, ","))
}
return resp.UsersInfo, nil
}
@ -103,7 +103,7 @@ func (u *UserRpcClient) GetUsersInfoMap(ctx context.Context, userIDs []string) (
if err != nil {
return nil, err
}
return utils.SliceToMap(users, func(e *sdkws.UserInfo) string {
return datautil.SliceToMap(users, func(e *sdkws.UserInfo) string {
return e.UserID
}), nil
}
@ -118,7 +118,7 @@ func (u *UserRpcClient) GetPublicUserInfos(
if err != nil {
return nil, err
}
return utils.Slice(users, func(e *sdkws.UserInfo) *sdkws.PublicUserInfo {
return datautil.Slice(users, func(e *sdkws.UserInfo) *sdkws.PublicUserInfo {
return &sdkws.PublicUserInfo{
UserID: e.UserID,
Nickname: e.Nickname,
@ -147,7 +147,7 @@ func (u *UserRpcClient) GetPublicUserInfoMap(
if err != nil {
return nil, err
}
return utils.SliceToMap(users, func(e *sdkws.PublicUserInfo) string {
return datautil.SliceToMap(users, func(e *sdkws.PublicUserInfo) string {
return e.UserID
}), nil
}

Loading…
Cancel
Save