Signed-off-by: ‘hanzhixiao’ <‘709674996@qq.com’>
pull/606/head
‘hanzhixiao’ 2 years ago
commit da90d4c5bf

@ -182,7 +182,7 @@ We take notes of each [biweekly meeting](https://github.com/OpenIMSDK/Open-IM-Se
## Who are using Open-IM-Server ## Who are using Open-IM-Server
The [user case studies](https://github.com/OpenIMSDK/community/blob/main/ADOPTERS.md) page includes the user list of the project. You can leave a [📝comment](https://github.com/OpenIMSDK/Open-IM-Server/issues/379) to let us know your use case. The [user case studies](https://github.com/OpenIMSDK/community/blob/main/ADOPTERS.md) page includes the user list of the project. You can leave a [📝comment](https://github.com/OpenIMSDK/Open-IM-Server/issues/379) to let us know your use case.
![avatar](https://github.com/OpenIMSDK/OpenIM-Docs/blob/main/docs/images/WechatIMG20.jpeg) ![avatar](https://openim-1253691595.cos.ap-nanjing.myqcloud.com/WechatIMG20.jpeg)
## License ## License

@ -100,7 +100,7 @@ services:
openim_server: openim_server:
image: ghcr.io/openimsdk/openim-server:v3.0.1 image: ghcr.io/openimsdk/openim-server:v3.0
container_name: openim-server container_name: openim-server
volumes: volumes:
- ./logs:/Open-IM-Server/logs - ./logs:/Open-IM-Server/logs
@ -123,7 +123,7 @@ services:
max-file: "2" max-file: "2"
openim_chat: openim_chat:
image: openim/openim_chat:v1.1.0 image: ghcr.io/openimsdk/openim-chat:v1.0.0
container_name: openim_chat container_name: openim_chat
restart: always restart: always
depends_on: depends_on:

@ -21,7 +21,12 @@ if ! command -v docker >/dev/null 2>&1; then
fi fi
# Start Docker services using docker-compose # Start Docker services using docker-compose
if command -v docker-compose &> /dev/null
then
docker-compose up -d docker-compose up -d
else
docker compose up -d
fi
# Move back to the 'scripts' folder # Move back to the 'scripts' folder
cd scripts cd scripts

@ -214,6 +214,7 @@ func (m *MessageApi) SendMessage(c *gin.Context) {
if err != nil { if err != nil {
log.ZError(c, "decodeData failed", err) log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err) apiresp.GinError(c, err)
return
} }
sendMsgReq.MsgData.RecvID = req.RecvID sendMsgReq.MsgData.RecvID = req.RecvID
var status int var status int
@ -260,6 +261,7 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
if err != nil { if err != nil {
log.ZError(c, "GetAllUserIDs failed", err) log.ZError(c, "GetAllUserIDs failed", err)
apiresp.GinError(c, err) apiresp.GinError(c, err)
return
} }
if len(recvIDsPart) < showNumber { if len(recvIDsPart) < showNumber {
recvIDs = append(recvIDs, recvIDsPart...) recvIDs = append(recvIDs, recvIDsPart...)
@ -275,6 +277,7 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) {
if err != nil { if err != nil {
log.ZError(c, "decodeData failed", err) log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err) apiresp.GinError(c, err)
return
} }
for _, recvID := range recvIDs { for _, recvID := range recvIDs {
sendMsgReq.MsgData.RecvID = recvID sendMsgReq.MsgData.RecvID = recvID

@ -25,9 +25,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
"github.com/OpenIMSDK/Open-IM-Server/pkg/startrpc" "github.com/OpenIMSDK/Open-IM-Server/pkg/startrpc"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@ -84,9 +82,6 @@ func (s *Server) GetUsersOnlineStatus(
ctx context.Context, ctx context.Context,
req *msggateway.GetUsersOnlineStatusReq, req *msggateway.GetUsersOnlineStatusReq,
) (*msggateway.GetUsersOnlineStatusResp, error) { ) (*msggateway.GetUsersOnlineStatusResp, error) {
if !tokenverify.IsAppManagerUid(ctx) {
return nil, errs.ErrNoPermission.Wrap("only app manager")
}
var resp msggateway.GetUsersOnlineStatusResp var resp msggateway.GetUsersOnlineStatusResp
for _, userID := range req.UserIDs { for _, userID := range req.UserIDs {
clients, ok := s.LongConnServer.GetUserAllCons(userID) clients, ok := s.LongConnServer.GetUserAllCons(userID)
@ -181,13 +176,12 @@ func (s *Server) KickUserOffline(
if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok { if clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID)); ok {
for _, client := range clients { for _, client := range clients {
log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client) log.ZDebug(ctx, "kick user offline", "userID", v, "platformID", req.PlatformID, "client", client)
err := client.KickOnlineMessage() if err := client.longConnServer.KickUserConn(client); err != nil {
if err != nil { log.ZWarn(ctx, "kick user offline failed", err, "userID", v, "platformID", req.PlatformID)
return nil, err
} }
} }
} else { } else {
log.ZWarn(ctx, "conn not exist", nil, "userID", v, "platformID", req.PlatformID) log.ZInfo(ctx, "conn not exist", "userID", v, "platformID", req.PlatformID)
} }
} }
return &msggateway.KickUserOfflineResp{}, nil return &msggateway.KickUserOfflineResp{}, nil

@ -47,6 +47,7 @@ type LongConnServer interface {
Validate(s interface{}) error Validate(s interface{}) error
SetCacheHandler(cache cache.MsgModel) SetCacheHandler(cache cache.MsgModel)
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
KickUserConn(client *Client) error
UnRegister(c *Client) UnRegister(c *Client)
Compressor Compressor
Encoder Encoder
@ -145,7 +146,7 @@ func (ws *WsServer) Run() error {
case client = <-ws.unregisterChan: case client = <-ws.unregisterChan:
ws.unregisterClient(client) ws.unregisterClient(client)
case onlineInfo := <-ws.kickHandlerChan: case onlineInfo := <-ws.kickHandlerChan:
ws.multiTerminalLoginChecker(onlineInfo) ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient)
} }
} }
}() }()
@ -207,80 +208,77 @@ func getRemoteAdders(client []*Client) string {
return ret return ret
} }
func (ws *WsServer) multiTerminalLoginChecker(info *kickHandler) { func (ws *WsServer) KickUserConn(client *Client) error {
ws.clients.deleteClients(client.UserID, []*Client{client})
return client.KickOnlineMessage()
}
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
switch config.Config.MultiLoginPolicy { switch config.Config.MultiLoginPolicy {
case constant.DefalutNotKick: case constant.DefalutNotKick:
case constant.PCAndOther: case constant.PCAndOther:
if constant.PlatformIDToClass(info.newClient.PlatformID) == constant.TerminalPC { if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
return return
} }
fallthrough fallthrough
case constant.AllLoginButSameTermKick: case constant.AllLoginButSameTermKick:
if info.clientOK { if clientOK {
ws.clients.deleteClients(info.newClient.UserID, info.oldClients) ws.clients.deleteClients(newClient.UserID, oldClients)
for _, c := range info.oldClients { for _, c := range oldClients {
err := c.KickOnlineMessage() err := c.KickOnlineMessage()
if err != nil { if err != nil {
log.ZWarn(c.ctx, "KickOnlineMessage", err) log.ZWarn(c.ctx, "KickOnlineMessage", err)
} }
} }
m, err := ws.cache.GetTokensWithoutError( m, err := ws.cache.GetTokensWithoutError(
info.newClient.ctx, newClient.ctx,
info.newClient.UserID, newClient.UserID,
info.newClient.PlatformID, newClient.PlatformID,
) )
if err != nil && err != redis.Nil { if err != nil && err != redis.Nil {
log.ZWarn( log.ZWarn(
info.newClient.ctx, newClient.ctx,
"get token from redis err", "get token from redis err",
err, err,
"userID", "userID",
info.newClient.UserID, newClient.UserID,
"platformID", "platformID",
info.newClient.PlatformID, newClient.PlatformID,
) )
return return
} }
if m == nil { if m == nil {
log.ZWarn( log.ZWarn(
info.newClient.ctx, newClient.ctx,
"m is nil", "m is nil",
errors.New("m is nil"), errors.New("m is nil"),
"userID", "userID",
info.newClient.UserID, newClient.UserID,
"platformID", "platformID",
info.newClient.PlatformID, newClient.PlatformID,
) )
return return
} }
log.ZDebug( log.ZDebug(
info.newClient.ctx, newClient.ctx,
"get token from redis", "get token from redis",
"userID", "userID",
info.newClient.UserID, newClient.UserID,
"platformID", "platformID",
info.newClient.PlatformID, newClient.PlatformID,
"tokenMap", "tokenMap",
m, m,
) )
for k := range m { for k := range m {
if k != info.newClient.ctx.GetToken() { if k != newClient.ctx.GetToken() {
m[k] = constant.KickedToken m[k] = constant.KickedToken
} }
} }
log.ZDebug(info.newClient.ctx, "set token map is ", "token map", m, "userID", info.newClient.UserID) log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID)
err = ws.cache.SetTokenMapByUidPid(info.newClient.ctx, info.newClient.UserID, info.newClient.PlatformID, m) err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
if err != nil { if err != nil {
log.ZWarn( log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)
info.newClient.ctx,
"SetTokenMapByUidPid err",
err,
"userID",
info.newClient.UserID,
"platformID",
info.newClient.PlatformID,
)
return return
} }
} }

@ -23,6 +23,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
@ -129,11 +130,16 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
if err != nil { if err != nil {
return err return err
} }
for _, v := range conns {
log.ZDebug(ctx, "forceKickOff", "conn", v.(*grpc.ClientConn).Target())
}
for _, v := range conns { for _, v := range conns {
client := msggateway.NewMsgGatewayClient(v) client := msggateway.NewMsgGatewayClient(v)
kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID} kickReq := &msggateway.KickUserOfflineReq{KickUserIDList: []string{userID}, PlatformID: platformID}
_, err := client.KickUserOffline(ctx, kickReq) _, err := client.KickUserOffline(ctx, kickReq)
return utils.Wrap(err, "") if err != nil {
log.ZError(ctx, "forceKickOff", err, "kickReq", kickReq)
}
} }
return nil return nil
} }

@ -41,11 +41,11 @@ func StartCronTask() error {
panic(err) panic(err)
} }
log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
_, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) // _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs)
if err != nil { // if err != nil {
fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime) // fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime)
panic(err) // panic(err)
} // }
c.Start() c.Start()
wg.Wait() wg.Wait()
return nil return nil

@ -54,7 +54,7 @@ type ConversationCache interface {
// get one conversation from msgCache // get one conversation from msgCache
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
DelConvsersations(ownerUserID string, conversationIDs ...string) ConversationCache DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache
DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache
// get one conversation from msgCache // get one conversation from msgCache
GetConversations( GetConversations(
@ -225,9 +225,9 @@ func (c *ConversationRedisCache) GetConversation(
) )
} }
func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersationIDs ...string) ConversationCache { func (c *ConversationRedisCache) DelConversations(ownerUserID string, conversationIDs ...string) ConversationCache {
var keys []string var keys []string
for _, conversationID := range convsersationIDs { for _, conversationID := range conversationIDs {
keys = append(keys, c.getConversationKey(ownerUserID, conversationID)) keys = append(keys, c.getConversationKey(ownerUserID, conversationID))
} }
cache := c.NewCache() cache := c.NewCache()

@ -104,7 +104,7 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
if err != nil { if err != nil {
return err return err
} }
cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConvsersations(conversation.ConversationID, NotUserIDs...) cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConversations(conversation.ConversationID, NotUserIDs...)
} }
return nil return nil
}); err != nil { }); err != nil {
@ -128,7 +128,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
var userIDs []string var userIDs []string
cache := c.cache.NewCache() cache := c.cache.NewCache()
for _, conversation := range conversations { for _, conversation := range conversations {
cache = cache.DelConvsersations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID)
userIDs = append(userIDs, conversation.OwnerUserID) userIDs = append(userIDs, conversation.OwnerUserID)
} }
return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx) return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx)
@ -190,7 +190,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
var conversationIDs []string var conversationIDs []string
for _, conversation := range conversations { for _, conversation := range conversations {
conversationIDs = append(conversationIDs, conversation.ConversationID) conversationIDs = append(conversationIDs, conversation.ConversationID)
cache = cache.DelConvsersations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID)
} }
conversationTx := c.conversationDB.NewTx(tx) conversationTx := c.conversationDB.NewTx(tx)
existConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDs) existConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDs)
@ -247,7 +247,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
for _, v := range notExistUserIDs { for _, v := range notExistUserIDs {
conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
conversations = append(conversations, &conversation) conversations = append(conversations, &conversation)
cache = cache.DelConvsersations(v, conversationID) cache = cache.DelConversations(v, conversationID)
} }
cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...) cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...)
if len(conversations) > 0 { if len(conversations) > 0 {
@ -261,7 +261,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
return err return err
} }
for _, v := range existConversationUserIDs { for _, v := range existConversationUserIDs {
cache = cache.DelConvsersations(v, conversationID) cache = cache.DelConversations(v, conversationID)
} }
return nil return nil
}); err != nil { }); err != nil {

@ -126,6 +126,11 @@ func (x *MarkMsgsAsReadReq) Check() error {
if x.UserID == "" { if x.UserID == "" {
return errs.ErrArgs.Wrap("userID is empty") return errs.ErrArgs.Wrap("userID is empty")
} }
for _, seq := range x.Seqs {
if seq == 0 {
return errs.ErrArgs.Wrap("seqs has 0 value is invalid")
}
}
return nil return nil
} }
@ -139,6 +144,11 @@ func (x *MarkConversationAsReadReq) Check() error {
if x.HasReadSeq < 1 { if x.HasReadSeq < 1 {
return errs.ErrArgs.Wrap("hasReadSeq is invalid") return errs.ErrArgs.Wrap("hasReadSeq is invalid")
} }
for _, seq := range x.Seqs {
if seq == 0 {
return errs.ErrArgs.Wrap("seqs has 0 value is invalid")
}
}
return nil return nil
} }

@ -8,5 +8,5 @@ start msg.exe -p 10030
start msggateway.exe -p 10040 -w 10001 start msggateway.exe -p 10040 -w 10001
start push.exe -p 10070 start push.exe -p 10070
start msgtransfer.exe start msgtransfer.exe
start third.exe -p 10090
start user.exe -p 10010 start user.exe -p 10010
start third.exe -p 10090

Loading…
Cancel
Save