fix bug: multiple gateway kick user (#568)

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* new feature: add batch send msg

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user

* fix bug: multiple gateway kick user
pull/571/head
WangchuXiao 12 months ago committed by GitHub
parent 0c23b3a443
commit 2f59be98d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -214,6 +214,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
if err != nil {
log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err)
return
}
sendMsgReq.MsgData.RecvID = req.RecvID
var status int
@ -260,6 +261,7 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
if err != nil {
log.ZError(c, "GetAllUserIDs failed", err)
apiresp.GinError(c, err)
return
}
if len(recvIDsPart) < showNumber {
recvIDs = append(recvIDs, recvIDsPart...)
@ -275,6 +277,7 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
if err != nil {
log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err)
return
}
for _, recvID := range recvIDs {
sendMsgReq.MsgData.RecvID = recvID

@ -181,13 +181,12 @@ func (s *Server) KickUserOffline(
if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok {
for _, client := range clients {
log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
err := client.KickOnlineMessage()
if err != nil {
return nil, err
if err := client.longConnServer.KickUserConn(client); err != nil {
log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
}
}
} else {
log.ZWarn(ctx, "conn not exist", nil, "userID", v, "platformID", req.PlatformID)
log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID)
}
}
return &msggateway.KickUserOfflineResp{}, nil

@ -47,6 +47,7 @@ type LongConnServer interface {
Validate(s interface{}) error
SetCacheHandler(cache cache.MsgModel)
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
KickUserConn(client *Client) error
UnRegister(c *Client)
Compressor
Encoder
@ -145,7 +146,7 @@ func (ws *WsServer) Run() error {
case client = <-ws.unregisterChan:
ws.unregisterClient(client)
case onlineInfo := <-ws.kickHandlerChan:
ws.multiTerminalLoginChecker(onlineInfo)
ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient)
}
}
}()
@ -207,80 +208,77 @@ func getRemoteAdders(client []*Client) string {
return ret
}
func (ws *WsServer) multiTerminalLoginChecker(info *kickHandler) {
func (ws *WsServer) KickUserConn(client *Client) error {
ws.clients.deleteClients(client.UserID, []*Client{client})
return client.KickOnlineMessage()
}
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
switch config.Config.MultiLoginPolicy {
case constant.DefalutNotKick:
case constant.PCAndOther:
if constant.PlatformIDToClass(info.newClient.PlatformID) == constant.TerminalPC {
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
return
}
fallthrough
case constant.AllLoginButSameTermKick:
if info.clientOK {
ws.clients.deleteClients(info.newClient.UserID, info.oldClients)
for _, c := range info.oldClients {
if clientOK {
ws.clients.deleteClients(newClient.UserID, oldClients)
for _, c := range oldClients {
err := c.KickOnlineMessage()
if err != nil {
log.ZWarn(c.ctx, "KickOnlineMessage", err)
}
}
m, err := ws.cache.GetTokensWithoutError(
info.newClient.ctx,
info.newClient.UserID,
info.newClient.PlatformID,
newClient.ctx,
newClient.UserID,
newClient.PlatformID,
)
if err != nil && err != redis.Nil {
log.ZWarn(
info.newClient.ctx,
newClient.ctx,
"get token from redis err",
err,
"userID",
info.newClient.UserID,
newClient.UserID,
"platformID",
info.newClient.PlatformID,
newClient.PlatformID,
)
return
}
if m == nil {
log.ZWarn(
info.newClient.ctx,
newClient.ctx,
"m is nil",
errors.New("m is nil"),
"userID",
info.newClient.UserID,
newClient.UserID,
"platformID",
info.newClient.PlatformID,
newClient.PlatformID,
)
return
}
log.ZDebug(
info.newClient.ctx,
newClient.ctx,
"get token from redis",
"userID",
info.newClient.UserID,
newClient.UserID,
"platformID",
info.newClient.PlatformID,
newClient.PlatformID,
"tokenMap",
m,
)
for k := range m {
if k != info.newClient.ctx.GetToken() {
if k != newClient.ctx.GetToken() {
m[k] = constant.KickedToken
}
}
log.ZDebug(info.newClient.ctx, "set token map is ", "token map", m, "userID", info.newClient.UserID)
err = ws.cache.SetTokenMapByUidPid(info.newClient.ctx, info.newClient.UserID, info.newClient.PlatformID, m)
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID)
err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
if err != nil {
log.ZWarn(
info.newClient.ctx,
"SetTokenMapByUidPid err",
err,
"userID",
info.newClient.UserID,
"platformID",
info.newClient.PlatformID,
)
log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)
return
}
}

@ -23,6 +23,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
@ -129,11 +130,16 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
if err != nil {
return err
}
for _, v := range conns {
log.ZDebug(ctx, "forceKickOff", "conn", v.(*grpc.ClientConn).Target())
}
for _, v := range conns {
client := msggateway.NewMsgGatewayClient(v)
kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
_, err := client.KickUserOffline(ctx, kickReq)
return utils.Wrap(err, "")
if err != nil {
log.ZError(ctx, "forceKickOff", err, "kickReq", kickReq)
}
}
return nil
}

@ -41,11 +41,11 @@ func StartCronTask() error {
panic(err)
}
log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
_, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
if err != nil {
fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
panic(err)
}
// _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
// if err != nil {
// fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
// panic(err)
// }
c.Start()
wg.Wait()
return nil

@ -126,6 +126,11 @@ func (x *MarkMsgsAsReadReq) Check() error {
if x.UserID == "" {
return errs.ErrArgs.Wrap("userID is empty")
}
for _, seq := range x.Seqs {
if seq == 0 {
return errs.ErrArgs.Wrap("seqs has 0 value is invalid")
}
}
return nil
}
@ -139,6 +144,11 @@ func (x *MarkConversationAsReadReq) Check() error {
if x.HasReadSeq < 1 {
return errs.ErrArgs.Wrap("hasReadSeq is invalid")
}
for _, seq := range x.Seqs {
if seq == 0 {
return errs.ErrArgs.Wrap("seqs has 0 value is invalid")
}
}
return nil
}

Loading…
Cancel
Save