get all node online user status

pull/103/head
Gordon 3 years ago
parent 0f20e58a46
commit c2a45cdbeb

@ -8,9 +8,13 @@ package manage
import ( import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
pbRelay "Open_IM/pkg/proto/relay"
pbUser "Open_IM/pkg/proto/user" pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"context" "context"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http" "net/http"
@ -24,6 +28,10 @@ type paramsDeleteUsers struct {
type paramsGetAllUsersUid struct { type paramsGetAllUsersUid struct {
OperationID string `json:"operationID" binding:"required"` OperationID string `json:"operationID" binding:"required"`
} }
type paramsGetUsersOnlineStatus struct {
OperationID string `json:"operationID" binding:"required"`
UserIDList []string `json:"userIDList" binding:"required,lte=200"`
}
func DeleteUser(c *gin.Context) { func DeleteUser(c *gin.Context) {
params := paramsDeleteUsers{} params := paramsDeleteUsers{}
@ -80,3 +88,66 @@ func GetAllUsersUid(c *gin.Context) {
c.JSON(http.StatusOK, resp) c.JSON(http.StatusOK, resp)
} }
func GetUsersOnlineStatus(c *gin.Context) {
params := paramsGetUsersOnlineStatus{}
if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
claims, err := token_verify.ParseToken(c.Request.Header.Get("token"))
if err != nil {
log.ErrorByKv("parse token failed", params.OperationID, "err", err.Error())
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": err.Error()})
return
}
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
log.ErrorByKv(" Authentication failed", params.OperationID, "args", c)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 402, "errMsg": "not authorized"})
return
}
req := &pbRelay.GetUsersOnlineStatusReq{
OperationID: params.OperationID,
UserIDList: params.UserIDList,
}
var wsResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult
var respResult []*pbRelay.GetUsersOnlineStatusResp_SuccessResult
flag := false
log.NewDebug(params.OperationID, "GetUsersOnlineStatus req come here", params.UserIDList)
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
for _, v := range grpcCons {
client := pbRelay.NewOnlineMessageRelayServiceClient(v)
reply, err := client.GetUsersOnlineStatus(context.Background(), req)
if err != nil {
log.NewError(params.OperationID, "GetUsersOnlineStatus rpc err", req.String(), err.Error())
continue
} else {
if reply.ErrCode == 0 {
wsResult = append(wsResult, reply.SuccessResult...)
}
}
}
log.NewDebug(params.OperationID, "call GetUsersOnlineStatus rpc server is success", wsResult)
//Online data merge of each node
for _, v1 := range params.UserIDList {
flag = false
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
for _, v2 := range wsResult {
if v2.UserID == v1 {
flag = true
temp.UserID = v1
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, v2.DetailPlatformStatus...)
}
}
if !flag {
temp.UserID = v1
temp.Status = constant.OfflineStatus
}
respResult = append(respResult, temp)
}
log.NewDebug(params.OperationID, "Finished merged data", respResult)
resp := gin.H{"errCode": 0, "errMsg": "", "successResult": respResult}
c.JSON(http.StatusOK, resp)
}

@ -122,7 +122,31 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
Resp: resp, Resp: resp,
}, nil }, nil
} }
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
log.NewDebug(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
var UIDAndPID []string
var resp pbRelay.GetUsersOnlineStatusResp
for _, v1 := range req.UserIDList {
userIDList := genUidPlatformArray(v1)
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
temp.UserID = v1
for _, v2 := range userIDList {
UIDAndPID = strings.Split(v2, " ")
if conn := ws.getUserConn(v2); conn != nil {
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = UIDAndPID[1]
ps.Status = constant.OnlineStatus
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
}
}
if temp.Status == constant.OnlineStatus {
resp.SuccessResult = append(resp.SuccessResult, temp)
}
}
return &resp, nil
}
func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) { func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg) err := ws.writeMsg(conn, websocket.BinaryMessage, bMsg)
if err != nil { if err != nil {

@ -48,8 +48,9 @@ func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
reply, err := msgClient.MsgToUser(context.Background(), sendPbData) reply, err := msgClient.MsgToUser(context.Background(), sendPbData)
if err != nil { if err != nil {
log.InfoByKv("push data to client rpc err", sendPbData.OperationID, "err", err) log.InfoByKv("push data to client rpc err", sendPbData.OperationID, "err", err)
continue
} }
if reply != nil && reply.Resp != nil && err == nil { if reply != nil && reply.Resp != nil {
wsResult = append(wsResult, reply.Resp...) wsResult = append(wsResult, reply.Resp...)
} }
} }

@ -86,6 +86,9 @@ const (
WebAndOther = 3 WebAndOther = 3
//Pc端互斥移动端互斥但是web端可以同时在线 //Pc端互斥移动端互斥但是web端可以同时在线
PcMobileAndWeb = 4 PcMobileAndWeb = 4
OnlineStatus = "Online"
OfflineStatus = "Offline"
) )
var ContentType2PushContent = map[int64]string{ var ContentType2PushContent = map[int64]string{

Loading…
Cancel
Save