From 6e2659c64a7f91faefb804af8c1084913647c2e5 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 2 Jul 2024 15:46:57 +0800 Subject: [PATCH] online cache --- internal/msggateway/n_ws_server.go | 2 +- internal/msggateway/user_map.go | 208 ++++++++++++++--------------- internal/msggateway/user_map2.go | 161 ---------------------- 3 files changed, 99 insertions(+), 272 deletions(-) delete mode 100644 internal/msggateway/user_map2.go diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 58408e1ef..1af9f014f 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -60,7 +60,7 @@ type WsServer struct { registerChan chan *Client unregisterChan chan *Client kickHandlerChan chan *kickHandler - clients UMap + clients UserMap clientPool sync.Pool onlineUserNum atomic.Int64 onlineUserConnNum atomic.Int64 diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index e1388d588..eb03123a5 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -1,40 +1,32 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package msggateway import ( - "context" + "github.com/openimsdk/tools/utils/datautil" "sync" "time" - - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/datautil" ) -func newUserMap1() UMap { - return &UserMap{ - ch: make(chan UserState, 1024), - } +type UserMap interface { + GetAll(userID string) ([]*Client, bool) + Get(userID string, platformID int) ([]*Client, bool, bool) + Set(userID string, v *Client) + DeleteClients(userID string, clients []*Client) (isDeleteUser bool) + UserState() <-chan UserState + GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState } -type UserPlatform1 struct { +type UserState struct { + UserID string + Online []int32 + Offline []int32 +} + +type UserPlatform struct { Time time.Time Clients []*Client } -func (u *UserPlatform1) PlatformIDs() []int32 { +func (u *UserPlatform) PlatformIDs() []int32 { if len(u.Clients) == 0 { return nil } @@ -45,36 +37,20 @@ func (u *UserPlatform1) PlatformIDs() []int32 { return platformIDs } -type UserMap struct { - m sync.Map - ch chan UserState -} - -func (u *UserMap) UserState() <-chan UserState { - return u.ch +func newUserMap() UserMap { + return &userMap{ + data: make(map[string]*UserPlatform), + ch: make(chan UserState, 10000), + } } -func (u *UserMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState { - var result []UserState - u.m.Range(func(key, value any) bool { - client := value.(*UserPlatform1) - if client.Time.Before(deadline) { - return true - } - client.Time = nowtime - us := UserState{ - UserID: key.(string), - Online: make([]int32, 0, len(client.Clients)), - } - for _, c := range client.Clients { - us.Online = append(us.Online, int32(c.PlatformID)) - } - return true - }) - return result +type userMap struct { + lock sync.RWMutex + data map[string]*UserPlatform + ch chan UserState } -func (u *UserMap) push(userID string, userPlatform *UserPlatform1, offline []int32) bool { +func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool { select { case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}: userPlatform.Time = time.Now() @@ -84,83 +60,95 @@ func (u *UserMap) push(userID string, userPlatform *UserPlatform1, offline []int } } -func (u *UserMap) GetAll(key string) ([]*Client, bool) { - allClients, ok := u.m.Load(key) - if ok { - return allClients.(*UserPlatform1).Clients, ok +func (u *userMap) GetAll(userID string) ([]*Client, bool) { + u.lock.RLock() + defer u.lock.RUnlock() + result, ok := u.data[userID] + if !ok { + return nil, false } - return nil, ok + return result.Clients, true } -func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) { - allClients, userExisted := u.m.Load(key) - if userExisted { - var clients []*Client - for _, client := range allClients.(*UserPlatform1).Clients { - if client.PlatformID == platformID { - clients = append(clients, client) - } - } - if len(clients) > 0 { - return clients, true, true +func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) { + u.lock.RLock() + defer u.lock.RUnlock() + result, ok := u.data[userID] + if !ok { + return nil, false, false + } + var clients []*Client + for _, client := range result.Clients { + if client.PlatformID == platformID { + clients = append(clients, client) } - return clients, true, false } - return nil, false, false + return clients, true, len(clients) > 0 } -// Set adds a client to the map. -func (u *UserMap) Set(key string, v *Client) { - allClients, existed := u.m.Load(key) - if existed { - log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID) - oldClients := allClients.(*UserPlatform1) - oldClients.Time = time.Now() - oldClients.Clients = append(oldClients.Clients, v) - u.push(key, oldClients, nil) +func (u *userMap) Set(userID string, client *Client) { + u.lock.Lock() + defer u.lock.Unlock() + result, ok := u.data[userID] + if ok { + result.Clients = append(result.Clients, client) } else { - log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID) - cli := &UserPlatform1{ - Time: time.Now(), - Clients: []*Client{v}, + result = &UserPlatform{ + Clients: []*Client{client}, } - u.m.Store(key, cli) - u.push(key, cli, nil) + u.data[userID] = result } - + u.push(client.UserID, result, nil) } -func (u *UserMap) DeleteClients(key string, clients []*Client) (isDeleteUser bool) { - m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) { - return c.ctx.GetRemoteAddr(), struct{}{} - }) - allClients, existed := u.m.Load(key) - if !existed { - // If the key doesn't exist, return false. +func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) { + if len(clients) == 0 { return false } - - // Filter out clients that are in the deleteMap. - oldClients := allClients.(*UserPlatform1) - var ( - remainingClients []*Client - offline []int32 - ) - for _, client := range oldClients.Clients { - if _, shouldBeDeleted := m[client.ctx.GetRemoteAddr()]; !shouldBeDeleted { - remainingClients = append(remainingClients, client) - } else { - offline = append(offline, int32(client.PlatformID)) + u.lock.Lock() + defer u.lock.Unlock() + result, ok := u.data[userID] + if !ok { + return false + } + offline := make([]int32, 0, len(clients)) + deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string { + return client.ctx.GetRemoteAddr() + }) + tmp := result.Clients + result.Clients = result.Clients[:0] + for _, client := range tmp { + if _, ok := deleteAddr[client.ctx.GetRemoteAddr()]; ok { + continue } + result.Clients = append(result.Clients, client) } + defer u.push(userID, result, offline) + if len(result.Clients) > 0 { + return false + } + delete(u.data, userID) + return true +} - oldClients.Clients = remainingClients - defer u.push(key, oldClients, offline) - // Update or delete the key based on the remaining clients. - if len(remainingClients) == 0 { - u.m.Delete(key) - return true +func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState { + u.lock.RLock() + defer u.lock.RUnlock() + result := make([]UserState, 0, len(u.data)) + for userID, userPlatform := range u.data { + if userPlatform.Time.Before(deadline) { + continue + } + userPlatform.Time = nowtime + online := make([]int32, 0, len(userPlatform.Clients)) + for _, client := range userPlatform.Clients { + online = append(online, int32(client.PlatformID)) + } + result = append(result, UserState{UserID: userID, Online: online}) } + return result +} - return false +func (u *userMap) UserState() <-chan UserState { + return u.ch } diff --git a/internal/msggateway/user_map2.go b/internal/msggateway/user_map2.go deleted file mode 100644 index c913f49cc..000000000 --- a/internal/msggateway/user_map2.go +++ /dev/null @@ -1,161 +0,0 @@ -package msggateway - -import ( - "context" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/datautil" - "sync" - "time" -) - -type UMap interface { - GetAll(userID string) ([]*Client, bool) - Get(userID string, platformID int) ([]*Client, bool, bool) - Set(userID string, v *Client) - DeleteClients(userID string, clients []*Client) (isDeleteUser bool) - UserState() <-chan UserState - GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState -} - -var _ UMap = (*userMap)(nil) - -type UserPlatform struct { - Time time.Time - Clients []*Client -} - -func (u *UserPlatform) PlatformIDs() []int32 { - if len(u.Clients) == 0 { - return nil - } - platformIDs := make([]int32, 0, len(u.Clients)) - for _, client := range u.Clients { - platformIDs = append(platformIDs, int32(client.PlatformID)) - } - return platformIDs -} - -func newUserMap() UMap { - return &userMap{ - data: make(map[string]*UserPlatform), - ch: make(chan UserState, 10000), - } -} - -type userMap struct { - lock sync.RWMutex - data map[string]*UserPlatform - ch chan UserState -} - -func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool { - select { - case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}: - userPlatform.Time = time.Now() - return true - default: - return false - } -} - -func (u *userMap) GetAll(userID string) ([]*Client, bool) { - log.ZInfo(context.Background(), "UserMap GetAll", "userID", userID) - u.lock.RLock() - defer u.lock.RUnlock() - result, ok := u.data[userID] - if !ok { - return nil, false - } - return result.Clients, true -} - -func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) { - log.ZInfo(context.Background(), "UserMap Get", "userID", userID, "platformID", platformID) - u.lock.RLock() - defer u.lock.RUnlock() - result, ok := u.data[userID] - if !ok { - return nil, false, false - } - var clients []*Client - for _, client := range result.Clients { - if client.PlatformID == platformID { - clients = append(clients, client) - } - } - return clients, true, len(clients) > 0 -} - -func (u *userMap) Set(userID string, client *Client) { - log.ZInfo(context.Background(), "UserMap Set", "userID", userID, "client", client.ctx.GetRemoteAddr()) - u.lock.Lock() - defer u.lock.Unlock() - result, ok := u.data[userID] - if ok { - result.Clients = append(result.Clients, client) - } else { - result = &UserPlatform{ - Clients: []*Client{client}, - } - } - u.push(client.UserID, result, nil) -} - -func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) { - log.ZInfo(context.Background(), "UserMap DeleteClients", "userID", userID, "client", len(clients)) - if len(clients) == 0 { - return false - } - u.lock.Lock() - defer u.lock.Unlock() - result, ok := u.data[userID] - if !ok { - return false - } - offline := make([]int32, 0, len(clients)) - deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string { - return client.ctx.GetRemoteAddr() - }) - tmp := result.Clients - result.Clients = result.Clients[:0] - for _, client := range tmp { - if _, ok := deleteAddr[client.ctx.GetRemoteAddr()]; ok { - continue - } - result.Clients = append(result.Clients, client) - } - defer u.push(userID, result, offline) - if len(result.Clients) > 0 { - return false - } - delete(u.data, userID) - return true -} - -func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState { - u.lock.RLock() - defer u.lock.RUnlock() - result := make([]UserState, 0, len(u.data)) - for userID, userPlatform := range u.data { - if userPlatform.Time.Before(deadline) { - continue - } - userPlatform.Time = nowtime - online := make([]int32, 0, len(userPlatform.Clients)) - for _, client := range userPlatform.Clients { - online = append(online, int32(client.PlatformID)) - } - result = append(result, UserState{UserID: userID, Online: online}) - } - return result -} - -func (u *userMap) UserState() <-chan UserState { - return u.ch -} - -type UserState struct { - UserID string - Online []int32 - Offline []int32 -}