online cache

pull/2393/head
withchao 1 year ago
parent 28c8b78b31
commit 6e2659c64a

@ -60,7 +60,7 @@ type WsServer struct {
registerChan chan *Client
unregisterChan chan *Client
kickHandlerChan chan *kickHandler
clients UMap
clients UserMap
clientPool sync.Pool
onlineUserNum atomic.Int64
onlineUserConnNum atomic.Int64

@ -1,40 +1,32 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msggateway
import (
"context"
"github.com/openimsdk/tools/utils/datautil"
"sync"
"time"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
)
func newUserMap1() UMap {
return &UserMap{
ch: make(chan UserState, 1024),
}
type UserMap interface {
GetAll(userID string) ([]*Client, bool)
Get(userID string, platformID int) ([]*Client, bool, bool)
Set(userID string, v *Client)
DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
UserState() <-chan UserState
GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState
}
type UserPlatform1 struct {
type UserState struct {
UserID string
Online []int32
Offline []int32
}
type UserPlatform struct {
Time time.Time
Clients []*Client
}
func (u *UserPlatform1) PlatformIDs() []int32 {
func (u *UserPlatform) PlatformIDs() []int32 {
if len(u.Clients) == 0 {
return nil
}
@ -45,36 +37,20 @@ func (u *UserPlatform1) PlatformIDs() []int32 {
return platformIDs
}
type UserMap struct {
m sync.Map
ch chan UserState
}
func (u *UserMap) UserState() <-chan UserState {
return u.ch
func newUserMap() UserMap {
return &userMap{
data: make(map[string]*UserPlatform),
ch: make(chan UserState, 10000),
}
}
func (u *UserMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState {
var result []UserState
u.m.Range(func(key, value any) bool {
client := value.(*UserPlatform1)
if client.Time.Before(deadline) {
return true
}
client.Time = nowtime
us := UserState{
UserID: key.(string),
Online: make([]int32, 0, len(client.Clients)),
}
for _, c := range client.Clients {
us.Online = append(us.Online, int32(c.PlatformID))
}
return true
})
return result
type userMap struct {
lock sync.RWMutex
data map[string]*UserPlatform
ch chan UserState
}
func (u *UserMap) push(userID string, userPlatform *UserPlatform1, offline []int32) bool {
func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool {
select {
case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}:
userPlatform.Time = time.Now()
@ -84,83 +60,95 @@ func (u *UserMap) push(userID string, userPlatform *UserPlatform1, offline []int
}
}
func (u *UserMap) GetAll(key string) ([]*Client, bool) {
allClients, ok := u.m.Load(key)
if ok {
return allClients.(*UserPlatform1).Clients, ok
func (u *userMap) GetAll(userID string) ([]*Client, bool) {
u.lock.RLock()
defer u.lock.RUnlock()
result, ok := u.data[userID]
if !ok {
return nil, false
}
return nil, ok
return result.Clients, true
}
func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) {
allClients, userExisted := u.m.Load(key)
if userExisted {
func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) {
u.lock.RLock()
defer u.lock.RUnlock()
result, ok := u.data[userID]
if !ok {
return nil, false, false
}
var clients []*Client
for _, client := range allClients.(*UserPlatform1).Clients {
for _, client := range result.Clients {
if client.PlatformID == platformID {
clients = append(clients, client)
}
}
if len(clients) > 0 {
return clients, true, true
}
return clients, true, false
}
return nil, false, false
return clients, true, len(clients) > 0
}
// Set adds a client to the map.
func (u *UserMap) Set(key string, v *Client) {
allClients, existed := u.m.Load(key)
if existed {
log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID)
oldClients := allClients.(*UserPlatform1)
oldClients.Time = time.Now()
oldClients.Clients = append(oldClients.Clients, v)
u.push(key, oldClients, nil)
func (u *userMap) Set(userID string, client *Client) {
u.lock.Lock()
defer u.lock.Unlock()
result, ok := u.data[userID]
if ok {
result.Clients = append(result.Clients, client)
} else {
log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID)
cli := &UserPlatform1{
Time: time.Now(),
Clients: []*Client{v},
result = &UserPlatform{
Clients: []*Client{client},
}
u.m.Store(key, cli)
u.push(key, cli, nil)
u.data[userID] = result
}
u.push(client.UserID, result, nil)
}
func (u *UserMap) DeleteClients(key string, clients []*Client) (isDeleteUser bool) {
m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) {
return c.ctx.GetRemoteAddr(), struct{}{}
func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) {
if len(clients) == 0 {
return false
}
u.lock.Lock()
defer u.lock.Unlock()
result, ok := u.data[userID]
if !ok {
return false
}
offline := make([]int32, 0, len(clients))
deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string {
return client.ctx.GetRemoteAddr()
})
allClients, existed := u.m.Load(key)
if !existed {
// If the key doesn't exist, return false.
tmp := result.Clients
result.Clients = result.Clients[:0]
for _, client := range tmp {
if _, ok := deleteAddr[client.ctx.GetRemoteAddr()]; ok {
continue
}
result.Clients = append(result.Clients, client)
}
defer u.push(userID, result, offline)
if len(result.Clients) > 0 {
return false
}
delete(u.data, userID)
return true
}
// Filter out clients that are in the deleteMap.
oldClients := allClients.(*UserPlatform1)
var (
remainingClients []*Client
offline []int32
)
for _, client := range oldClients.Clients {
if _, shouldBeDeleted := m[client.ctx.GetRemoteAddr()]; !shouldBeDeleted {
remainingClients = append(remainingClients, client)
} else {
offline = append(offline, int32(client.PlatformID))
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState {
u.lock.RLock()
defer u.lock.RUnlock()
result := make([]UserState, 0, len(u.data))
for userID, userPlatform := range u.data {
if userPlatform.Time.Before(deadline) {
continue
}
userPlatform.Time = nowtime
online := make([]int32, 0, len(userPlatform.Clients))
for _, client := range userPlatform.Clients {
online = append(online, int32(client.PlatformID))
}
oldClients.Clients = remainingClients
defer u.push(key, oldClients, offline)
// Update or delete the key based on the remaining clients.
if len(remainingClients) == 0 {
u.m.Delete(key)
return true
result = append(result, UserState{UserID: userID, Online: online})
}
return result
}
return false
func (u *userMap) UserState() <-chan UserState {
return u.ch
}

@ -1,161 +0,0 @@
package msggateway
import (
"context"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"sync"
"time"
)
type UMap interface {
GetAll(userID string) ([]*Client, bool)
Get(userID string, platformID int) ([]*Client, bool, bool)
Set(userID string, v *Client)
DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
UserState() <-chan UserState
GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState
}
var _ UMap = (*userMap)(nil)
type UserPlatform struct {
Time time.Time
Clients []*Client
}
func (u *UserPlatform) PlatformIDs() []int32 {
if len(u.Clients) == 0 {
return nil
}
platformIDs := make([]int32, 0, len(u.Clients))
for _, client := range u.Clients {
platformIDs = append(platformIDs, int32(client.PlatformID))
}
return platformIDs
}
func newUserMap() UMap {
return &userMap{
data: make(map[string]*UserPlatform),
ch: make(chan UserState, 10000),
}
}
type userMap struct {
lock sync.RWMutex
data map[string]*UserPlatform
ch chan UserState
}
func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool {
select {
case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}:
userPlatform.Time = time.Now()
return true
default:
return false
}
}
func (u *userMap) GetAll(userID string) ([]*Client, bool) {
log.ZInfo(context.Background(), "UserMap GetAll", "userID", userID)
u.lock.RLock()
defer u.lock.RUnlock()
result, ok := u.data[userID]
if !ok {
return nil, false
}
return result.Clients, true
}
func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) {
log.ZInfo(context.Background(), "UserMap Get", "userID", userID, "platformID", platformID)
u.lock.RLock()
defer u.lock.RUnlock()
result, ok := u.data[userID]
if !ok {
return nil, false, false
}
var clients []*Client
for _, client := range result.Clients {
if client.PlatformID == platformID {
clients = append(clients, client)
}
}
return clients, true, len(clients) > 0
}
func (u *userMap) Set(userID string, client *Client) {
log.ZInfo(context.Background(), "UserMap Set", "userID", userID, "client", client.ctx.GetRemoteAddr())
u.lock.Lock()
defer u.lock.Unlock()
result, ok := u.data[userID]
if ok {
result.Clients = append(result.Clients, client)
} else {
result = &UserPlatform{
Clients: []*Client{client},
}
}
u.push(client.UserID, result, nil)
}
func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) {
log.ZInfo(context.Background(), "UserMap DeleteClients", "userID", userID, "client", len(clients))
if len(clients) == 0 {
return false
}
u.lock.Lock()
defer u.lock.Unlock()
result, ok := u.data[userID]
if !ok {
return false
}
offline := make([]int32, 0, len(clients))
deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string {
return client.ctx.GetRemoteAddr()
})
tmp := result.Clients
result.Clients = result.Clients[:0]
for _, client := range tmp {
if _, ok := deleteAddr[client.ctx.GetRemoteAddr()]; ok {
continue
}
result.Clients = append(result.Clients, client)
}
defer u.push(userID, result, offline)
if len(result.Clients) > 0 {
return false
}
delete(u.data, userID)
return true
}
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState {
u.lock.RLock()
defer u.lock.RUnlock()
result := make([]UserState, 0, len(u.data))
for userID, userPlatform := range u.data {
if userPlatform.Time.Before(deadline) {
continue
}
userPlatform.Time = nowtime
online := make([]int32, 0, len(userPlatform.Clients))
for _, client := range userPlatform.Clients {
online = append(online, int32(client.PlatformID))
}
result = append(result, UserState{UserID: userID, Online: online})
}
return result
}
func (u *userMap) UserState() <-chan UserState {
return u.ch
}
type UserState struct {
UserID string
Online []int32
Offline []int32
}
Loading…
Cancel
Save