From 32c5f65d2fd35f501a0db7a9a322e61fa7ad2800 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 2 Jul 2024 14:32:58 +0800 Subject: [PATCH] online cache --- .../openim-rpc-conversation/main.go | 4 - cmd/openim-rpc/openim-rpc-group/main.go | 4 - internal/msggateway/n_ws_server.go | 4 +- internal/msggateway/online.go | 2 +- internal/msggateway/user_map.go | 270 ++++++++++-------- internal/msggateway/user_map2.go | 27 +- 6 files changed, 156 insertions(+), 155 deletions(-) diff --git a/cmd/openim-rpc/openim-rpc-conversation/main.go b/cmd/openim-rpc/openim-rpc-conversation/main.go index 5b3077ccb..5b2e66c95 100644 --- a/cmd/openim-rpc/openim-rpc-conversation/main.go +++ b/cmd/openim-rpc/openim-rpc-conversation/main.go @@ -17,13 +17,9 @@ package main import ( "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" - "os" ) func main() { - if len(os.Args) == 1 { - os.Args = []string{os.Args[0], "-i", "0", "-c", "/Users/chao/Desktop/project/open-im-server/config"} - } if err := cmd.NewConversationRpcCmd().Exec(); err != nil { program.ExitWithError(err) } diff --git a/cmd/openim-rpc/openim-rpc-group/main.go b/cmd/openim-rpc/openim-rpc-group/main.go index 44e5509df..5badf934e 100644 --- a/cmd/openim-rpc/openim-rpc-group/main.go +++ b/cmd/openim-rpc/openim-rpc-group/main.go @@ -17,13 +17,9 @@ package main import ( "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" - "os" ) func main() { - if len(os.Args) == 1 { - os.Args = []string{os.Args[0], "-i", "0", "-c", "/Users/chao/Desktop/project/open-im-server/config"} - } if err := cmd.NewGroupRpcCmd().Exec(); err != nil { program.ExitWithError(err) } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index ff93ce6bc..b87f93ab6 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -140,7 +140,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { unregisterChan: make(chan *Client, 1000), kickHandlerChan: make(chan *kickHandler, 1000), validate: v, - clients: newUserMap(), + clients: newUserMap1(), Compressor: NewGzipCompressor(), Encoder: NewGobEncoder(), webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), @@ -345,7 +345,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien func (ws *WsServer) unregisterClient(client *Client) { defer ws.clientPool.Put(client) - isDeleteUser := ws.clients.Delete(client.UserID, client.ctx.GetRemoteAddr()) + isDeleteUser := ws.clients.DeleteClients(client.UserID, []*Client{client}) if isDeleteUser { ws.onlineUserNum.Add(-1) prommetrics.OnlineUserGauge.Dec() diff --git a/internal/msggateway/online.go b/internal/msggateway/online.go index 61549083a..b640cdd80 100644 --- a/internal/msggateway/online.go +++ b/internal/msggateway/online.go @@ -98,7 +98,7 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) { case <-mergeTicker.C: pushAllUserState() case now := <-scanTicker.C: - pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire/3), now)...) + pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire / 3))...) case state := <-ws.clients.UserState(): log.ZDebug(context.Background(), "user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline) pushUserState(state) diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index dcab066f8..119b07ae5 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -14,123 +14,153 @@ package msggateway -// -//import ( -// "context" -// "sync" -// -// "github.com/openimsdk/tools/log" -// "github.com/openimsdk/tools/utils/datautil" -//) -// -//type UserMap struct { -// m sync.Map -//} -// -//func newUserMap() UMap { -// return &UserMap{} -//} -// -//func (u *UserMap) GetAll(key string) ([]*Client, bool) { -// allClients, ok := u.m.Load(key) -// if ok { -// return allClients.([]*Client), ok -// } -// return nil, ok -//} -// -//func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) { -// allClients, userExisted := u.m.Load(key) -// if userExisted { -// var clients []*Client -// for _, client := range allClients.([]*Client) { -// if client.PlatformID == platformID { -// clients = append(clients, client) -// } -// } -// if len(clients) > 0 { -// return clients, userExisted, true -// } -// return clients, userExisted, false -// } -// return nil, userExisted, false -//} -// -//// Set adds a client to the map. -//func (u *UserMap) Set(key string, v *Client) { -// allClients, existed := u.m.Load(key) -// if existed { -// log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID) -// oldClients := allClients.([]*Client) -// oldClients = append(oldClients, v) -// u.m.Store(key, oldClients) -// } else { -// log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID) -// -// var clients []*Client -// clients = append(clients, v) -// u.m.Store(key, clients) -// } -//} -// -//func (u *UserMap) Delete(key string, connRemoteAddr string) (isDeleteUser bool) { -// // Attempt to load the clients associated with the key. -// allClients, existed := u.m.Load(key) -// if !existed { -// // Return false immediately if the key does not exist. -// return false -// } -// -// // Convert allClients to a slice of *Client. -// oldClients := allClients.([]*Client) -// var remainingClients []*Client -// for _, client := range oldClients { -// // Keep clients that do not match the connRemoteAddr. -// if client.ctx.GetRemoteAddr() != connRemoteAddr { -// remainingClients = append(remainingClients, client) -// } -// } -// -// // If no clients remain after filtering, delete the key from the map. -// if len(remainingClients) == 0 { -// u.m.Delete(key) -// return true -// } -// -// // Otherwise, update the key with the remaining clients. -// u.m.Store(key, remainingClients) -// return false -//} -// -//func (u *UserMap) DeleteClients(key string, clients []*Client) (isDeleteUser bool) { -// m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) { -// return c.ctx.GetRemoteAddr(), struct{}{} -// }) -// allClients, existed := u.m.Load(key) -// if !existed { -// // If the key doesn't exist, return false. -// return false -// } -// -// // Filter out clients that are in the deleteMap. -// oldClients := allClients.([]*Client) -// var remainingClients []*Client -// for _, client := range oldClients { -// if _, shouldBeDeleted := m[client.ctx.GetRemoteAddr()]; !shouldBeDeleted { -// remainingClients = append(remainingClients, client) -// } -// } -// -// // Update or delete the key based on the remaining clients. -// if len(remainingClients) == 0 { -// u.m.Delete(key) -// return true -// } -// -// u.m.Store(key, remainingClients) -// return false -//} -// -//func (u *UserMap) DeleteAll(key string) { -// u.m.Delete(key) -//} +import ( + "context" + "sync" + "time" + + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" +) + +func newUserMap1() UMap { + return &UserMap{ + ch: make(chan UserState, 1024), + } +} + +type UserPlatform1 struct { + Time time.Time + Clients []*Client +} + +func (u *UserPlatform1) PlatformIDs() []int32 { + if len(u.Clients) == 0 { + return nil + } + platformIDs := make([]int32, 0, len(u.Clients)) + for _, client := range u.Clients { + platformIDs = append(platformIDs, int32(client.PlatformID)) + } + return platformIDs +} + +type UserMap struct { + m sync.Map + ch chan UserState +} + +func (u *UserMap) UserState() <-chan UserState { + return u.ch +} + +func (u *UserMap) GetAllUserStatus(deadline time.Time) []UserState { + var result []UserState + u.m.Range(func(key, value any) bool { + client := value.(*UserPlatform1) + if client.Time.Before(deadline) { + return true + } + client.Time = time.Now() + us := UserState{ + UserID: key.(string), + Online: make([]int32, 0, len(client.Clients)), + } + for _, c := range client.Clients { + us.Online = append(us.Online, int32(c.PlatformID)) + } + return true + }) + return result +} + +func (u *UserMap) push(userID string, userPlatform *UserPlatform1, offline []int32) bool { + select { + case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}: + userPlatform.Time = time.Now() + return true + default: + return false + } +} + +func (u *UserMap) GetAll(key string) ([]*Client, bool) { + allClients, ok := u.m.Load(key) + if ok { + return allClients.(*UserPlatform1).Clients, ok + } + return nil, ok +} + +func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) { + allClients, userExisted := u.m.Load(key) + if userExisted { + var clients []*Client + for _, client := range allClients.(*UserPlatform1).Clients { + if client.PlatformID == platformID { + clients = append(clients, client) + } + } + if len(clients) > 0 { + return clients, true, true + } + return clients, true, false + } + return nil, false, false +} + +// Set adds a client to the map. +func (u *UserMap) Set(key string, v *Client) { + allClients, existed := u.m.Load(key) + if existed { + log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID) + oldClients := allClients.(*UserPlatform1) + oldClients.Time = time.Now() + oldClients.Clients = append(oldClients.Clients, v) + u.push(key, oldClients, nil) + } else { + log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID) + cli := &UserPlatform1{ + Time: time.Now(), + Clients: []*Client{v}, + } + u.m.Store(key, cli) + u.push(key, cli, nil) + } + +} + +func (u *UserMap) DeleteClients(key string, clients []*Client) (isDeleteUser bool) { + m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) { + return c.ctx.GetRemoteAddr(), struct{}{} + }) + allClients, existed := u.m.Load(key) + if !existed { + // If the key doesn't exist, return false. + return false + } + + // Filter out clients that are in the deleteMap. + oldClients := allClients.(*UserPlatform1) + var ( + remainingClients []*Client + offline []int32 + ) + for _, client := range oldClients.Clients { + if _, shouldBeDeleted := m[client.ctx.GetRemoteAddr()]; !shouldBeDeleted { + remainingClients = append(remainingClients, client) + } else { + offline = append(offline, int32(client.PlatformID)) + } + } + + oldClients.Clients = remainingClients + defer u.push(key, oldClients, offline) + // Update or delete the key based on the remaining clients. + if len(remainingClients) == 0 { + u.m.Delete(key) + return true + } + + return false +} diff --git a/internal/msggateway/user_map2.go b/internal/msggateway/user_map2.go index b6ed40373..8a064ab83 100644 --- a/internal/msggateway/user_map2.go +++ b/internal/msggateway/user_map2.go @@ -9,10 +9,9 @@ type UMap interface { GetAll(userID string) ([]*Client, bool) Get(userID string, platformID int) ([]*Client, bool, bool) Set(userID string, v *Client) - Delete(userID string, connRemoteAddr string) (isDeleteUser bool) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) UserState() <-chan UserState - GetAllUserStatus(deadline, nowtime time.Time) []UserState + GetAllUserStatus(deadline time.Time) []UserState } var _ UMap = (*UserMap2)(nil) @@ -102,26 +101,6 @@ func (u *UserMap2) Set(userID string, client *Client) { u.push(client.UserID, result, nil) } -func (u *UserMap2) Delete(userID string, connRemoteAddr string) (isDeleteUser bool) { - u.lock.Lock() - defer u.lock.Unlock() - result, ok := u.data[userID] - if !ok { - return false - } - client, ok := result.Clients[connRemoteAddr] - if !ok { - return false - } - delete(result.Clients, connRemoteAddr) - defer u.push(userID, result, []int32{int32(client.PlatformID)}) - if len(result.Clients) > 0 { - return false - } - delete(u.data, userID) - return true -} - func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) { if len(clients) == 0 { return false @@ -145,7 +124,7 @@ func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser return true } -func (u *UserMap2) GetAllUserStatus(deadline, nowtime time.Time) []UserState { +func (u *UserMap2) GetAllUserStatus(deadline time.Time) []UserState { u.lock.RLock() defer u.lock.RUnlock() if len(u.data) == 0 { @@ -159,7 +138,7 @@ func (u *UserMap2) GetAllUserStatus(deadline, nowtime time.Time) []UserState { if p.Time.Before(deadline) { continue } - p.Time = nowtime + p.Time = time.Now() online := make([]int32, 0, len(p.Clients)) for _, client := range p.Clients { online = append(online, int32(client.PlatformID))