online cache

pull/2393/head
withchao 1 year ago
parent 32c5f65d2f
commit 9388cb61e8

@ -140,7 +140,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
unregisterChan: make(chan *Client, 1000), unregisterChan: make(chan *Client, 1000),
kickHandlerChan: make(chan *kickHandler, 1000), kickHandlerChan: make(chan *kickHandler, 1000),
validate: v, validate: v,
clients: newUserMap1(), clients: newUserMap(),
Compressor: NewGzipCompressor(), Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(), Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),

@ -98,7 +98,7 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
case <-mergeTicker.C: case <-mergeTicker.C:
pushAllUserState() pushAllUserState()
case now := <-scanTicker.C: case now := <-scanTicker.C:
pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire / 3))...) pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire/3), now)...)
case state := <-ws.clients.UserState(): case state := <-ws.clients.UserState():
log.ZDebug(context.Background(), "user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline) log.ZDebug(context.Background(), "user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline)
pushUserState(state) pushUserState(state)

@ -54,14 +54,14 @@ func (u *UserMap) UserState() <-chan UserState {
return u.ch return u.ch
} }
func (u *UserMap) GetAllUserStatus(deadline time.Time) []UserState { func (u *UserMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState {
var result []UserState var result []UserState
u.m.Range(func(key, value any) bool { u.m.Range(func(key, value any) bool {
client := value.(*UserPlatform1) client := value.(*UserPlatform1)
if client.Time.Before(deadline) { if client.Time.Before(deadline) {
return true return true
} }
client.Time = time.Now() client.Time = nowtime
us := UserState{ us := UserState{
UserID: key.(string), UserID: key.(string),
Online: make([]int32, 0, len(client.Clients)), Online: make([]int32, 0, len(client.Clients)),

@ -1,6 +1,7 @@
package msggateway package msggateway
import ( import (
"github.com/openimsdk/tools/utils/datautil"
"sync" "sync"
"time" "time"
) )
@ -11,14 +12,14 @@ type UMap interface {
Set(userID string, v *Client) Set(userID string, v *Client)
DeleteClients(userID string, clients []*Client) (isDeleteUser bool) DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
UserState() <-chan UserState UserState() <-chan UserState
GetAllUserStatus(deadline time.Time) []UserState GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState
} }
var _ UMap = (*UserMap2)(nil) var _ UMap = (*userMap)(nil)
type UserPlatform struct { type UserPlatform struct {
Time time.Time Time time.Time
Clients map[string]*Client Clients []*Client
} }
func (u *UserPlatform) PlatformIDs() []int32 { func (u *UserPlatform) PlatformIDs() []int32 {
@ -33,19 +34,19 @@ func (u *UserPlatform) PlatformIDs() []int32 {
} }
func newUserMap() UMap { func newUserMap() UMap {
return &UserMap2{ return &userMap{
data: make(map[string]*UserPlatform), data: make(map[string]*UserPlatform),
ch: make(chan UserState, 10000), ch: make(chan UserState, 10000),
} }
} }
type UserMap2 struct { type userMap struct {
lock sync.RWMutex lock sync.RWMutex
data map[string]*UserPlatform data map[string]*UserPlatform
ch chan UserState ch chan UserState
} }
func (u *UserMap2) push(userID string, userPlatform *UserPlatform, offline []int32) bool { func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool {
select { select {
case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}: case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}:
userPlatform.Time = time.Now() userPlatform.Time = time.Now()
@ -55,21 +56,17 @@ func (u *UserMap2) push(userID string, userPlatform *UserPlatform, offline []int
} }
} }
func (u *UserMap2) GetAll(userID string) ([]*Client, bool) { func (u *userMap) GetAll(userID string) ([]*Client, bool) {
u.lock.RLock() u.lock.RLock()
defer u.lock.RUnlock() defer u.lock.RUnlock()
result, ok := u.data[userID] result, ok := u.data[userID]
if !ok { if !ok {
return nil, false return nil, false
} }
clients := make([]*Client, 0, len(result.Clients)) return result.Clients, true
for _, client := range result.Clients {
clients = append(clients, client)
}
return clients, true
} }
func (u *UserMap2) Get(userID string, platformID int) ([]*Client, bool, bool) { func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) {
u.lock.RLock() u.lock.RLock()
defer u.lock.RUnlock() defer u.lock.RUnlock()
result, ok := u.data[userID] result, ok := u.data[userID]
@ -85,23 +82,21 @@ func (u *UserMap2) Get(userID string, platformID int) ([]*Client, bool, bool) {
return clients, true, len(clients) > 0 return clients, true, len(clients) > 0
} }
func (u *UserMap2) Set(userID string, client *Client) { func (u *userMap) Set(userID string, client *Client) {
u.lock.Lock() u.lock.Lock()
defer u.lock.Unlock() defer u.lock.Unlock()
result, ok := u.data[userID] result, ok := u.data[userID]
if ok { if ok {
result.Clients[client.ctx.GetRemoteAddr()] = client result.Clients = append(result.Clients, client)
} else { } else {
result = &UserPlatform{ result = &UserPlatform{
Clients: map[string]*Client{ Clients: []*Client{client},
client.ctx.GetRemoteAddr(): client,
},
} }
} }
u.push(client.UserID, result, nil) u.push(client.UserID, result, nil)
} }
func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) { func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) {
if len(clients) == 0 { if len(clients) == 0 {
return false return false
} }
@ -112,9 +107,16 @@ func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser
return false return false
} }
offline := make([]int32, 0, len(clients)) offline := make([]int32, 0, len(clients))
for _, client := range clients { deleteAddr := datautil.SliceSetAny(clients, func(client *Client) string {
offline = append(offline, int32(client.PlatformID)) return client.ctx.GetRemoteAddr()
delete(result.Clients, client.ctx.GetRemoteAddr()) })
tmp := result.Clients
result.Clients = result.Clients[:0]
for _, client := range tmp {
if _, ok := deleteAddr[client.ctx.GetRemoteAddr()]; ok {
continue
}
result.Clients = append(result.Clients, client)
} }
defer u.push(userID, result, offline) defer u.push(userID, result, offline)
if len(result.Clients) > 0 { if len(result.Clients) > 0 {
@ -124,23 +126,17 @@ func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser
return true return true
} }
func (u *UserMap2) GetAllUserStatus(deadline time.Time) []UserState { func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState {
u.lock.RLock() u.lock.RLock()
defer u.lock.RUnlock() defer u.lock.RUnlock()
if len(u.data) == 0 {
return nil
}
result := make([]UserState, 0, len(u.data)) result := make([]UserState, 0, len(u.data))
for userID, p := range u.data { for userID, userPlatform := range u.data {
if len(result) == cap(result) { if userPlatform.Time.Before(deadline) {
break
}
if p.Time.Before(deadline) {
continue continue
} }
p.Time = time.Now() userPlatform.Time = time.Now()
online := make([]int32, 0, len(p.Clients)) online := make([]int32, 0, len(userPlatform.Clients))
for _, client := range p.Clients { for _, client := range userPlatform.Clients {
online = append(online, int32(client.PlatformID)) online = append(online, int32(client.PlatformID))
} }
result = append(result, UserState{UserID: userID, Online: online}) result = append(result, UserState{UserID: userID, Online: online})
@ -148,7 +144,7 @@ func (u *UserMap2) GetAllUserStatus(deadline time.Time) []UserState {
return result return result
} }
func (u *UserMap2) UserState() <-chan UserState { func (u *userMap) UserState() <-chan UserState {
return u.ch return u.ch
} }

Loading…
Cancel
Save