From a18dcfce9e1d80d094b2047df6cd59ef81340890 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 2 Jun 2022 16:44:55 +0800 Subject: [PATCH] ws and push update --- internal/msg_gateway/gate/callback.go | 10 +- internal/msg_gateway/gate/rpc_server.go | 114 +++++++++++++++--- internal/msg_gateway/gate/ws_server.go | 47 ++++---- internal/rpc/auth/auth.go | 2 +- .../constant/platform_number_id_to_name.go | 8 +- pkg/common/db/redisModel.go | 6 +- pkg/common/token_verify/jwt_token.go | 4 +- pkg/proto/relay/relay.proto | 21 +++- pkg/utils/strings.go | 9 +- pkg/utils/utils.go | 2 +- 10 files changed, 163 insertions(+), 60 deletions(-) diff --git a/internal/msg_gateway/gate/callback.go b/internal/msg_gateway/gate/callback.go index 58d3a98b2..f005f26a2 100644 --- a/internal/msg_gateway/gate/callback.go +++ b/internal/msg_gateway/gate/callback.go @@ -8,7 +8,7 @@ import ( http2 "net/http" ) -func callbackUserOnline(operationID, userID string, platformID int32, token string) cbApi.CommonCallbackResp { +func callbackUserOnline(operationID, userID string, platformID int, token string) cbApi.CommonCallbackResp { callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} if !config.Config.Callback.CallbackUserOnline.Enable { return callbackResp @@ -19,7 +19,7 @@ func callbackUserOnline(operationID, userID string, platformID int32, token stri CallbackCommand: constant.CallbackUserOnlineCommand, OperationID: operationID, UserID: userID, - PlatformID: platformID, + PlatformID: int32(platformID), Platform: constant.PlatformIDToName(platformID), }} callbackUserOnlineResp := &cbApi.CallbackUserOnlineResp{CommonCallbackResp: callbackResp} @@ -30,7 +30,7 @@ func callbackUserOnline(operationID, userID string, platformID int32, token stri return callbackResp } -func callbackUserOffline(operationID, userID string, platform string) cbApi.CommonCallbackResp { +func callbackUserOffline(operationID, userID string, platformID int) cbApi.CommonCallbackResp { callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} if !config.Config.Callback.CallbackUserOffline.Enable { return callbackResp @@ -39,8 +39,8 @@ func callbackUserOffline(operationID, userID string, platform string) cbApi.Comm CallbackCommand: constant.CallbackUserOfflineCommand, OperationID: operationID, UserID: userID, - PlatformID: constant.PlatformNameToID(platform), - Platform: platform, + PlatformID: int32(platformID), + Platform: constant.PlatformIDToName(platformID), }} callbackUserOfflineResp := &cbApi.CallbackUserOfflineResp{CommonCallbackResp: callbackResp} if err := http.PostReturn(config.Config.Callback.CallbackUrl, callbackOfflineReq, callbackUserOfflineResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil { diff --git a/internal/msg_gateway/gate/rpc_server.go b/internal/msg_gateway/gate/rpc_server.go index 555015bb7..5249bdce8 100644 --- a/internal/msg_gateway/gate/rpc_server.go +++ b/internal/msg_gateway/gate/rpc_server.go @@ -25,6 +25,8 @@ type RPCServer struct { rpcRegisterName string etcdSchema string etcdAddr []string + platformList []int + pushTerminal []int } func (r *RPCServer) onInit(rpcPort int) { @@ -32,6 +34,8 @@ func (r *RPCServer) onInit(rpcPort int) { r.rpcRegisterName = config.Config.RpcRegisterName.OpenImOnlineMessageRelayName r.etcdSchema = config.Config.Etcd.EtcdSchema r.etcdAddr = config.Config.Etcd.EtcdAddr + r.platformList = genPlatformArray() + r.pushTerminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID} } func (r *RPCServer) run() { listenIP := "" @@ -69,7 +73,7 @@ func (r *RPCServer) run() { } func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) { log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String()) - var resp []*pbRelay.SingleMsgToUser + var resp []*pbRelay.SingleMsgToUserPlatform msgBytes, _ := proto.Marshal(in.MsgData) mReply := Resp{ ReqIdentifier: constant.WSPushMsg, @@ -84,22 +88,21 @@ func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgRe } var tag bool recvID := in.PushToUserID - platformList := genPlatformArray() - for _, v := range platformList { + for _, v := range r.platformList { if conn := ws.getUserConn(recvID, v); conn != nil { tag = true resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID) - temp := &pbRelay.SingleMsgToUser{ + temp := &pbRelay.SingleMsgToUserPlatform{ ResultCode: resultCode, RecvID: recvID, - RecvPlatFormID: constant.PlatformNameToID(v), + RecvPlatFormID: int32(v), } resp = append(resp, temp) } else { - temp := &pbRelay.SingleMsgToUser{ + temp := &pbRelay.SingleMsgToUserPlatform{ ResultCode: -1, RecvID: recvID, - RecvPlatFormID: constant.PlatformNameToID(v), + RecvPlatFormID: int32(v), } resp = append(resp, temp) } @@ -119,19 +122,19 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser } var resp pbRelay.GetUsersOnlineStatusResp for _, userID := range req.UserIDList { - platformList := genPlatformArray() temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult) temp.UserID = userID - for _, platform := range platformList { - if conn := ws.getUserConn(userID, platform); conn != nil { + userConnMap := ws.getUserAllCons(userID) + for platform, userConn := range userConnMap { + if userConn != nil { ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail) - ps.Platform = platform + ps.Platform = constant.PlatformIDToName(platform) ps.Status = constant.OnlineStatus temp.Status = constant.OnlineStatus temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps) - } } + if temp.Status == constant.OnlineStatus { resp.SuccessResult = append(resp.SuccessResult, temp) } @@ -139,11 +142,90 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser log.NewInfo(req.OperationID, "GetUsersOnlineStatus rpc return ", resp.String()) return &resp, nil } -func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm, RecvID string) (ResultCode int64) { +func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) { + log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String()) + var singleUserResult []*pbRelay.SingelMsgToUserResultList + msgBytes, _ := proto.Marshal(req.MsgData) + mReply := Resp{ + ReqIdentifier: constant.WSPushMsg, + OperationID: req.OperationID, + Data: msgBytes, + } + var replyBytes bytes.Buffer + enc := gob.NewEncoder(&replyBytes) + err := enc.Encode(mReply) + if err != nil { + log.NewError(req.OperationID, "data encode err", err.Error()) + } + for _, v := range req.PushToUserIDList { + var resp []*pbRelay.SingleMsgToUserPlatform + userConnMap := ws.getUserAllCons(v) + for platform, userConn := range userConnMap { + if userConn != nil { + resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) + if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { //仅仅记录推送成功的平台端 + temp := &pbRelay.SingleMsgToUserPlatform{ + ResultCode: resultCode, + RecvID: v, + RecvPlatFormID: int32(platform), + } + resp = append(resp, temp) + } + + } + } + //for _, x := range r.platformList { + // if conn := ws.getUserConn(v, x); conn != nil { + // resultCode := sendMsgBatchToUser(conn, replyBytes.Bytes(), req, x, v) + // temp := &pbRelay.SingleMsgToUserPlatform{ + // ResultCode: resultCode, + // RecvID: v, + // RecvPlatFormID: constant.PlatformNameToID(x), + // } + // resp = append(resp, temp) + // } else { + // if utils.IsContain(x,r.pushTerminal) { + // temp := &pbRelay.SingleMsgToUserPlatform{ + // ResultCode: -1, + // RecvID: v, + // RecvPlatFormID: constant.PlatformNameToID(x), + // } + // resp = append(resp, temp) + // } + // + // } + //} + tempT := &pbRelay.SingelMsgToUserResultList{ + UserID: v, + Resp: resp, + } + singleUserResult = append(singleUserResult, tempT) + + } + + return &pbRelay.OnlineBatchPushOneMsgResp{ + SinglePushResult: singleUserResult, + }, nil +} +func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) { + err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg) + if err != nil { + log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(), + "error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID) + ResultCode = -2 + return ResultCode + } else { + log.NewDebug(in.OperationID, "PushMsgToUser is success By Ws", "args", in.String(), "recvPlatForm", RecvPlatForm, "recvID", RecvID) + ResultCode = 0 + return ResultCode + } + +} +func sendMsgBatchToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlineBatchPushOneMsgReq, RecvPlatForm int, RecvID string) (ResultCode int64) { err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg) if err != nil { log.NewError(in.OperationID, "PushMsgToUser is failed By Ws", "Addr", conn.RemoteAddr().String(), - "error", err, "senderPlatform", constant.PlatformIDToName(in.MsgData.SenderPlatformID), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID) + "error", err, "senderPlatform", constant.PlatformIDToName(int(in.MsgData.SenderPlatformID)), "recvPlatform", RecvPlatForm, "args", in.String(), "recvID", RecvID) ResultCode = -2 return ResultCode } else { @@ -153,9 +235,9 @@ func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.OnlinePushMsgReq, Re } } -func genPlatformArray() (array []string) { +func genPlatformArray() (array []int) { for i := 1; i <= constant.LinuxPlatformID; i++ { - array = append(array, constant.PlatformIDToName(int32(i))) + array = append(array, i) } return array } diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 360b82926..e43a81f5a 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -25,15 +25,15 @@ type WServer struct { wsAddr string wsMaxConnNum int wsUpGrader *websocket.Upgrader - wsConnToUser map[*UserConn]map[string]string - wsUserToConn map[string]map[string]*UserConn + wsConnToUser map[*UserConn]map[int]string + wsUserToConn map[string]map[int]*UserConn } func (ws *WServer) onInit(wsPort int) { ws.wsAddr = ":" + utils.IntToString(wsPort) ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum - ws.wsConnToUser = make(map[*UserConn]map[string]string) - ws.wsUserToConn = make(map[string]map[string]*UserConn) + ws.wsConnToUser = make(map[*UserConn]map[int]string) + ws.wsUserToConn = make(map[string]map[int]*UserConn) ws.wsUpGrader = &websocket.Upgrader{ HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second, ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen, @@ -62,7 +62,7 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { //Initialize a lock for each user newConn := &UserConn{conn, new(sync.Mutex)} userCount++ - ws.addUserConn(query["sendID"][0], int32(utils.StringToInt64(query["platformID"][0])), newConn, query["token"][0]) + ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0]) go ws.readMsg(newConn) } } @@ -94,11 +94,11 @@ func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error { return conn.WriteMessage(a, msg) } -func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int32, newConn *UserConn, token string, operationID string) { +func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn *UserConn, token string, operationID string) { switch config.Config.MultiLoginPolicy { case constant.AllLoginButSameTermKick: if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn] - if oldConn, ok := oldConnMap[constant.PlatformIDToName(platformID)]; ok { + if oldConn, ok := oldConnMap[platformID]; ok { log.NewDebug(operationID, uid, platformID, "kick old conn") ws.sendKickMsg(oldConn, newConn) m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID)) @@ -122,7 +122,7 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int32, newCo return } err = oldConn.Close() - delete(oldConnMap, constant.PlatformIDToName(platformID)) + delete(oldConnMap, platformID) ws.wsUserToConn[uid] = oldConnMap if len(oldConnMap) == 0 { delete(ws.wsUserToConn, uid) @@ -133,7 +133,7 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int32, newCo } } else { - log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[constant.PlatformIDToName(platformID)]) + log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID]) } } else { @@ -162,7 +162,7 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) { log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error()) } } -func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, token string) { +func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string) { rwLock.Lock() defer rwLock.Unlock() operationID := utils.OperationIDGenerator() @@ -172,21 +172,21 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok } ws.MultiTerminalLoginChecker(uid, platformID, conn, token, operationID) if oldConnMap, ok := ws.wsUserToConn[uid]; ok { - oldConnMap[constant.PlatformIDToName(platformID)] = conn + oldConnMap[platformID] = conn ws.wsUserToConn[uid] = oldConnMap log.Debug(operationID, "user not first come in, add conn ", uid, platformID, conn, oldConnMap) } else { - i := make(map[string]*UserConn) - i[constant.PlatformIDToName(platformID)] = conn + i := make(map[int]*UserConn) + i[platformID] = conn ws.wsUserToConn[uid] = i log.Debug(operationID, "user first come in, new user, conn", uid, platformID, conn, ws.wsUserToConn[uid]) } if oldStringMap, ok := ws.wsConnToUser[conn]; ok { - oldStringMap[constant.PlatformIDToName(platformID)] = uid + oldStringMap[platformID] = uid ws.wsConnToUser[conn] = oldStringMap } else { - i := make(map[string]string) - i[constant.PlatformIDToName(platformID)] = uid + i := make(map[int]string) + i[platformID] = uid ws.wsConnToUser[conn] = i } count := 0 @@ -200,7 +200,8 @@ func (ws *WServer) delUserConn(conn *UserConn) { rwLock.Lock() defer rwLock.Unlock() operationID := utils.OperationIDGenerator() - var platform, uid string + var uid string + var platform int if oldStringMap, ok := ws.wsConnToUser[conn]; ok { for k, v := range oldStringMap { platform = k @@ -233,7 +234,7 @@ func (ws *WServer) delUserConn(conn *UserConn) { } } -func (ws *WServer) getUserConn(uid string, platform string) *UserConn { +func (ws *WServer) getUserConn(uid string, platform int) *UserConn { rwLock.RLock() defer rwLock.RUnlock() if connMap, ok := ws.wsUserToConn[uid]; ok { @@ -243,7 +244,7 @@ func (ws *WServer) getUserConn(uid string, platform string) *UserConn { } return nil } -func (ws *WServer) getSingleUserAllConn(uid string) map[string]*UserConn { +func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn { rwLock.RLock() defer rwLock.RUnlock() if connMap, ok := ws.wsUserToConn[uid]; ok { @@ -251,7 +252,8 @@ func (ws *WServer) getSingleUserAllConn(uid string) map[string]*UserConn { } return nil } -func (ws *WServer) getUserUid(conn *UserConn) (uid, platform string) { + +func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) { rwLock.RLock() defer rwLock.RUnlock() @@ -262,7 +264,7 @@ func (ws *WServer) getUserUid(conn *UserConn) (uid, platform string) { } return uid, platform } - return "", "" + return "", 0 } func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool { status := http.StatusUnauthorized @@ -291,6 +293,3 @@ func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool { return false } } -func genMapKey(uid string, platformID int32) string { - return uid + " " + constant.PlatformIDToName(platformID) -} diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 09b903c68..0066fc8d4 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -47,7 +47,7 @@ func (rpc *rpcAuth) UserToken(_ context.Context, req *pbAuth.UserTokenReq) (*pbA return &pbAuth.UserTokenResp{CommonResp: &pbAuth.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: errMsg}}, nil } - tokens, expTime, err := token_verify.CreateToken(req.FromUserID, req.Platform) + tokens, expTime, err := token_verify.CreateToken(req.FromUserID, int(req.Platform)) if err != nil { errMsg := req.OperationID + " token_verify.CreateToken failed " + err.Error() + req.FromUserID + utils.Int32ToString(req.Platform) log.NewError(req.OperationID, errMsg) diff --git a/pkg/common/constant/platform_number_id_to_name.go b/pkg/common/constant/platform_number_id_to_name.go index c3a970b69..cac93dea8 100644 --- a/pkg/common/constant/platform_number_id_to_name.go +++ b/pkg/common/constant/platform_number_id_to_name.go @@ -27,7 +27,7 @@ const ( TerminalMobile = "Mobile" ) -var PlatformID2Name = map[int32]string{ +var PlatformID2Name = map[int]string{ IOSPlatformID: IOSPlatformStr, AndroidPlatformID: AndroidPlatformStr, WindowsPlatformID: WindowsPlatformStr, @@ -36,7 +36,7 @@ var PlatformID2Name = map[int32]string{ MiniWebPlatformID: MiniWebPlatformStr, LinuxPlatformID: LinuxPlatformStr, } -var PlatformName2ID = map[string]int32{ +var PlatformName2ID = map[string]int{ IOSPlatformStr: IOSPlatformID, AndroidPlatformStr: AndroidPlatformID, WindowsPlatformStr: WindowsPlatformID, @@ -55,10 +55,10 @@ var Platform2class = map[string]string{ LinuxPlatformStr: TerminalPC, } -func PlatformIDToName(num int32) string { +func PlatformIDToName(num int) string { return PlatformID2Name[num] } -func PlatformNameToID(name string) int32 { +func PlatformNameToID(name string) int { return PlatformName2ID[name] } func PlatformNameToClass(name string) string { diff --git a/pkg/common/db/redisModel.go b/pkg/common/db/redisModel.go index da5a79492..8f978dc3e 100644 --- a/pkg/common/db/redisModel.go +++ b/pkg/common/db/redisModel.go @@ -111,7 +111,7 @@ func (d *DataBases) DelAppleDeviceToken(accountAddress string) (err error) { } //Store userid and platform class to redis -func (d *DataBases) AddTokenFlag(userID string, platformID int32, token string, flag int) error { +func (d *DataBases) AddTokenFlag(userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) log2.NewDebug("", "add token key is ", key) _, err1 := d.Exec("HSet", key, token, flag) @@ -123,12 +123,12 @@ func (d *DataBases) GetTokenMapByUidPid(userID, platformID string) (map[string]i log2.NewDebug("", "get token key is ", key) return redis.IntMap(d.Exec("HGETALL", key)) } -func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int32, m map[string]int) error { +func (d *DataBases) SetTokenMapByUidPid(userID string, platformID int, m map[string]int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) _, err := d.Exec("hmset", key, redis.Args{}.Add().AddFlat(m)...) return err } -func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int32, fields []string) error { +func (d *DataBases) DeleteTokenByUidPid(userID string, platformID int, fields []string) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) _, err := d.Exec("HDEL", key, redis.Args{}.Add().AddFlat(fields)...) return err diff --git a/pkg/common/token_verify/jwt_token.go b/pkg/common/token_verify/jwt_token.go index 363feeedc..08ff786aa 100644 --- a/pkg/common/token_verify/jwt_token.go +++ b/pkg/common/token_verify/jwt_token.go @@ -37,7 +37,7 @@ func BuildClaims(uid, platform string, ttl int64) Claims { }} } -func CreateToken(userID string, platformID int32) (string, int64, error) { +func CreateToken(userID string, platformID int) (string, int64, error) { claims := BuildClaims(userID, constant.PlatformIDToName(platformID), config.Config.TokenPolicy.AccessExpire) token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) tokenString, err := token.SignedString([]byte(config.Config.TokenPolicy.AccessSecret)) @@ -233,7 +233,7 @@ func WsVerifyToken(token, uid string, platformID string, operationID string) (bo if claims.UID != uid { return false, utils.Wrap(&constant.ErrTokenUnknown, "uid is not same to token uid"), "uid is not same to token uid" } - if claims.Platform != constant.PlatformIDToName(utils.StringToInt32(platformID)) { + if claims.Platform != constant.PlatformIDToName(utils.StringToInt(platformID)) { return false, utils.Wrap(&constant.ErrTokenUnknown, "platform is not same to token platform"), "platform is not same to token platform" } log.NewDebug(operationID, utils.GetSelfFuncName(), " check ok ", claims.UID, uid, claims.Platform) diff --git a/pkg/proto/relay/relay.proto b/pkg/proto/relay/relay.proto index d8c5f1b7e..b02edfa2c 100644 --- a/pkg/proto/relay/relay.proto +++ b/pkg/proto/relay/relay.proto @@ -9,8 +9,22 @@ message OnlinePushMsgReq { string pushToUserID = 3; } message OnlinePushMsgResp{ -repeated SingleMsgToUser resp = 1; -}//message SendMsgByWSReq{ +repeated SingleMsgToUserPlatform resp = 1; +} +message SingelMsgToUserResultList{ + string userID =1; + repeated SingleMsgToUserPlatform resp = 2; + +} +message OnlineBatchPushOneMsgReq{ + string OperationID = 1; + server_api_params.MsgData msgData = 2; + repeated string pushToUserIDList = 3; +} +message OnlineBatchPushOneMsgResp{ + repeated SingelMsgToUserResultList singlePushResult= 1; +} +//message SendMsgByWSReq{ // string SendID = 1; // string RecvID = 2; // string Content = 3; @@ -22,7 +36,7 @@ repeated SingleMsgToUser resp = 1; // int64 PlatformID = 9; //} -message SingleMsgToUser{ +message SingleMsgToUserPlatform{ int64 ResultCode = 1; string RecvID = 2; int32 RecvPlatFormID = 3; @@ -56,6 +70,7 @@ message GetUsersOnlineStatusResp{ service OnlineMessageRelayService { rpc OnlinePushMsg(OnlinePushMsgReq) returns(OnlinePushMsgResp); rpc GetUsersOnlineStatus(GetUsersOnlineStatusReq)returns(GetUsersOnlineStatusResp); + rpc OnlineBatchPushOneMsg(OnlineBatchPushOneMsgReq) returns(OnlineBatchPushOneMsgResp); // rpc SendMsgByWS(SendMsgByWSReq) returns(MsgToUserResp); } diff --git a/pkg/utils/strings.go b/pkg/utils/strings.go index a05ba927a..0e7c16106 100644 --- a/pkg/utils/strings.go +++ b/pkg/utils/strings.go @@ -51,7 +51,14 @@ func IsContainInt32(target int32, List []int32) bool { } return false } - +func IsContainInt(target int, List []int) bool { + for _, element := range List { + if target == element { + return true + } + } + return false +} func InterfaceArrayToStringArray(data []interface{}) (i []string) { for _, param := range data { i = append(i, param.(string)) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 8626f13cb..8d144c7e8 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -149,7 +149,7 @@ func String2Pb(s string, pb proto.Message) error { return proto.Unmarshal([]byte(s), pb) } -func Map2Pb(m map[string]interface{}) (pb proto.Message, err error) { +func Map2Pb(m map[string]string) (pb proto.Message, err error) { b, err := json.Marshal(m) if err != nil { return nil, err