diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index b87f93ab6..58408e1ef 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -140,7 +140,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { unregisterChan: make(chan *Client, 1000), kickHandlerChan: make(chan *kickHandler, 1000), validate: v, - clients: newUserMap1(), + clients: newUserMap(), Compressor: NewGzipCompressor(), Encoder: NewGobEncoder(), webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), diff --git a/internal/msggateway/online.go b/internal/msggateway/online.go index b640cdd80..61549083a 100644 --- a/internal/msggateway/online.go +++ b/internal/msggateway/online.go @@ -98,7 +98,7 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) { case <-mergeTicker.C: pushAllUserState() case now := <-scanTicker.C: - pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire / 3))...) + pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire/3), now)...) case state := <-ws.clients.UserState(): log.ZDebug(context.Background(), "user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline) pushUserState(state) diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 119b07ae5..e1388d588 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -54,14 +54,14 @@ func (u *UserMap) UserState() <-chan UserState { return u.ch } -func (u *UserMap) GetAllUserStatus(deadline time.Time) []UserState { +func (u *UserMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState { var result []UserState u.m.Range(func(key, value any) bool { client := value.(*UserPlatform1) if client.Time.Before(deadline) { return true } - client.Time = time.Now() + client.Time = nowtime us := UserState{ UserID: key.(string), Online: make([]int32, 0, len(client.Clients)), diff --git a/internal/msggateway/user_map2.go b/internal/msggateway/user_map2.go index 8a064ab83..74b49d9d4 100644 --- a/internal/msggateway/user_map2.go +++ b/internal/msggateway/user_map2.go @@ -1,6 +1,7 @@ package msggateway import ( + "github.com/openimsdk/tools/utils/datautil" "sync" "time" ) @@ -11,14 +12,14 @@ type UMap interface { Set(userID string, v *Client) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) UserState() <-chan UserState - GetAllUserStatus(deadline time.Time) []UserState + GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState } -var _ UMap = (*UserMap2)(nil) +var _ UMap = (*userMap)(nil) type UserPlatform struct { Time time.Time - Clients map[string]*Client + Clients []*Client } func (u *UserPlatform) PlatformIDs() []int32 { @@ -33,19 +34,19 @@ func (u *UserPlatform) PlatformIDs() []int32 { } func newUserMap() UMap { - return &UserMap2{ + return &userMap{ data: make(map[string]*UserPlatform), ch: make(chan UserState, 10000), } } -type UserMap2 struct { +type userMap struct { lock sync.RWMutex data map[string]*UserPlatform ch chan UserState } -func (u *UserMap2) push(userID string, userPlatform *UserPlatform, offline []int32) bool { +func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool { select { case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}: userPlatform.Time = time.Now() @@ -55,21 +56,17 @@ func (u *UserMap2) push(userID string, userPlatform *UserPlatform, offline []int } } -func (u *UserMap2) GetAll(userID string) ([]*Client, bool) { +func (u *userMap) GetAll(userID string) ([]*Client, bool) { u.lock.RLock() defer u.lock.RUnlock() result, ok := u.data[userID] if !ok { return nil, false } - clients := make([]*Client, 0, len(result.Clients)) - for _, client := range result.Clients { - clients = append(clients, client) - } - return clients, true + return result.Clients, true } -func (u *UserMap2) Get(userID string, platformID int) ([]*Client, bool, bool) { +func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) { u.lock.RLock() defer u.lock.RUnlock() result, ok := u.data[userID] @@ -85,23 +82,21 @@ func (u *UserMap2) Get(userID string, platformID int) ([]*Client, bool, bool) { return clients, true, len(clients) > 0 } -func (u *UserMap2) Set(userID string, client *Client) { +func (u *userMap) Set(userID string, client *Client) { u.lock.Lock() defer u.lock.Unlock() result, ok := u.data[userID] if ok { - result.Clients[client.ctx.GetRemoteAddr()] = client + result.Clients = append(result.Clients, client) } else { result = &UserPlatform{ - Clients: map[string]*Client{ - client.ctx.GetRemoteAddr(): client, - }, + Clients: []*Client{client}, } } u.push(client.UserID, result, nil) } -func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) { +func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) { if len(clients) == 0 { return false } @@ -112,9 +107,16 @@ func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser return false } offline := make([]int32, 0, len(clients)) - for _, client := range clients { - offline = append(offline, int32(client.PlatformID)) - delete(result.Clients, client.ctx.GetRemoteAddr()) + deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string { + return client.ctx.GetRemoteAddr() + }) + tmp := result.Clients + result.Clients = result.Clients[:0] + for _, client := range tmp { + if _, ok := deleteAddr[client.ctx.GetRemoteAddr()]; ok { + continue + } + result.Clients = append(result.Clients, client) } defer u.push(userID, result, offline) if len(result.Clients) > 0 { @@ -124,23 +126,17 @@ func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser return true } -func (u *UserMap2) GetAllUserStatus(deadline time.Time) []UserState { +func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState { u.lock.RLock() defer u.lock.RUnlock() - if len(u.data) == 0 { - return nil - } result := make([]UserState, 0, len(u.data)) - for userID, p := range u.data { - if len(result) == cap(result) { - break - } - if p.Time.Before(deadline) { + for userID, userPlatform := range u.data { + if userPlatform.Time.Before(deadline) { continue } - p.Time = time.Now() - online := make([]int32, 0, len(p.Clients)) - for _, client := range p.Clients { + userPlatform.Time = time.Now() + online := make([]int32, 0, len(userPlatform.Clients)) + for _, client := range userPlatform.Clients { online = append(online, int32(client.PlatformID)) } result = append(result, UserState{UserID: userID, Online: online}) @@ -148,7 +144,7 @@ func (u *UserMap2) GetAllUserStatus(deadline time.Time) []UserState { return result } -func (u *UserMap2) UserState() <-chan UserState { +func (u *userMap) UserState() <-chan UserState { return u.ch }