online cache

pull/2393/head
withchao 1 year ago
parent 45b07bc3cc
commit 32c5f65d2f

@ -17,13 +17,9 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
"os"
) )
func main() { func main() {
if len(os.Args) == 1 {
os.Args = []string{os.Args[0], "-i", "0", "-c", "/Users/chao/Desktop/project/open-im-server/config"}
}
if err := cmd.NewConversationRpcCmd().Exec(); err != nil { if err := cmd.NewConversationRpcCmd().Exec(); err != nil {
program.ExitWithError(err) program.ExitWithError(err)
} }

@ -17,13 +17,9 @@ package main
import ( import (
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
"os"
) )
func main() { func main() {
if len(os.Args) == 1 {
os.Args = []string{os.Args[0], "-i", "0", "-c", "/Users/chao/Desktop/project/open-im-server/config"}
}
if err := cmd.NewGroupRpcCmd().Exec(); err != nil { if err := cmd.NewGroupRpcCmd().Exec(); err != nil {
program.ExitWithError(err) program.ExitWithError(err)
} }

@ -140,7 +140,7 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
unregisterChan: make(chan *Client, 1000), unregisterChan: make(chan *Client, 1000),
kickHandlerChan: make(chan *kickHandler, 1000), kickHandlerChan: make(chan *kickHandler, 1000),
validate: v, validate: v,
clients: newUserMap(), clients: newUserMap1(),
Compressor: NewGzipCompressor(), Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(), Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
@ -345,7 +345,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
func (ws *WsServer) unregisterClient(client *Client) { func (ws *WsServer) unregisterClient(client *Client) {
defer ws.clientPool.Put(client) defer ws.clientPool.Put(client)
isDeleteUser := ws.clients.Delete(client.UserID, client.ctx.GetRemoteAddr()) isDeleteUser := ws.clients.DeleteClients(client.UserID, []*Client{client})
if isDeleteUser { if isDeleteUser {
ws.onlineUserNum.Add(-1) ws.onlineUserNum.Add(-1)
prommetrics.OnlineUserGauge.Dec() prommetrics.OnlineUserGauge.Dec()

@ -98,7 +98,7 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
case <-mergeTicker.C: case <-mergeTicker.C:
pushAllUserState() pushAllUserState()
case now := <-scanTicker.C: case now := <-scanTicker.C:
pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire/3), now)...) pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire / 3))...)
case state := <-ws.clients.UserState(): case state := <-ws.clients.UserState():
log.ZDebug(context.Background(), "user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline) log.ZDebug(context.Background(), "user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline)
pushUserState(state) pushUserState(state)

@ -14,123 +14,153 @@
package msggateway package msggateway
// import (
//import ( "context"
// "context" "sync"
// "sync" "time"
//
// "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
// "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
//) )
//
//type UserMap struct { func newUserMap1() UMap {
// m sync.Map return &UserMap{
//} ch: make(chan UserState, 1024),
// }
//func newUserMap() UMap { }
// return &UserMap{}
//} type UserPlatform1 struct {
// Time time.Time
//func (u *UserMap) GetAll(key string) ([]*Client, bool) { Clients []*Client
// allClients, ok := u.m.Load(key) }
// if ok {
// return allClients.([]*Client), ok func (u *UserPlatform1) PlatformIDs() []int32 {
// } if len(u.Clients) == 0 {
// return nil, ok return nil
//} }
// platformIDs := make([]int32, 0, len(u.Clients))
//func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) { for _, client := range u.Clients {
// allClients, userExisted := u.m.Load(key) platformIDs = append(platformIDs, int32(client.PlatformID))
// if userExisted { }
// var clients []*Client return platformIDs
// for _, client := range allClients.([]*Client) { }
// if client.PlatformID == platformID {
// clients = append(clients, client) type UserMap struct {
// } m sync.Map
// } ch chan UserState
// if len(clients) > 0 { }
// return clients, userExisted, true
// } func (u *UserMap) UserState() <-chan UserState {
// return clients, userExisted, false return u.ch
// } }
// return nil, userExisted, false
//} func (u *UserMap) GetAllUserStatus(deadline time.Time) []UserState {
// var result []UserState
//// Set adds a client to the map. u.m.Range(func(key, value any) bool {
//func (u *UserMap) Set(key string, v *Client) { client := value.(*UserPlatform1)
// allClients, existed := u.m.Load(key) if client.Time.Before(deadline) {
// if existed { return true
// log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID) }
// oldClients := allClients.([]*Client) client.Time = time.Now()
// oldClients = append(oldClients, v) us := UserState{
// u.m.Store(key, oldClients) UserID: key.(string),
// } else { Online: make([]int32, 0, len(client.Clients)),
// log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID) }
// for _, c := range client.Clients {
// var clients []*Client us.Online = append(us.Online, int32(c.PlatformID))
// clients = append(clients, v) }
// u.m.Store(key, clients) return true
// } })
//} return result
// }
//func (u *UserMap) Delete(key string, connRemoteAddr string) (isDeleteUser bool) {
// // Attempt to load the clients associated with the key. func (u *UserMap) push(userID string, userPlatform *UserPlatform1, offline []int32) bool {
// allClients, existed := u.m.Load(key) select {
// if !existed { case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}:
// // Return false immediately if the key does not exist. userPlatform.Time = time.Now()
// return false return true
// } default:
// return false
// // Convert allClients to a slice of *Client. }
// oldClients := allClients.([]*Client) }
// var remainingClients []*Client
// for _, client := range oldClients { func (u *UserMap) GetAll(key string) ([]*Client, bool) {
// // Keep clients that do not match the connRemoteAddr. allClients, ok := u.m.Load(key)
// if client.ctx.GetRemoteAddr() != connRemoteAddr { if ok {
// remainingClients = append(remainingClients, client) return allClients.(*UserPlatform1).Clients, ok
// } }
// } return nil, ok
// }
// // If no clients remain after filtering, delete the key from the map.
// if len(remainingClients) == 0 { func (u *UserMap) Get(key string, platformID int) ([]*Client, bool, bool) {
// u.m.Delete(key) allClients, userExisted := u.m.Load(key)
// return true if userExisted {
// } var clients []*Client
// for _, client := range allClients.(*UserPlatform1).Clients {
// // Otherwise, update the key with the remaining clients. if client.PlatformID == platformID {
// u.m.Store(key, remainingClients) clients = append(clients, client)
// return false }
//} }
// if len(clients) > 0 {
//func (u *UserMap) DeleteClients(key string, clients []*Client) (isDeleteUser bool) { return clients, true, true
// m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) { }
// return c.ctx.GetRemoteAddr(), struct{}{} return clients, true, false
// }) }
// allClients, existed := u.m.Load(key) return nil, false, false
// if !existed { }
// // If the key doesn't exist, return false.
// return false // Set adds a client to the map.
// } func (u *UserMap) Set(key string, v *Client) {
// allClients, existed := u.m.Load(key)
// // Filter out clients that are in the deleteMap. if existed {
// oldClients := allClients.([]*Client) log.ZDebug(context.Background(), "Set existed", "user_id", key, "client_user_id", v.UserID)
// var remainingClients []*Client oldClients := allClients.(*UserPlatform1)
// for _, client := range oldClients { oldClients.Time = time.Now()
// if _, shouldBeDeleted := m[client.ctx.GetRemoteAddr()]; !shouldBeDeleted { oldClients.Clients = append(oldClients.Clients, v)
// remainingClients = append(remainingClients, client) u.push(key, oldClients, nil)
// } } else {
// } log.ZDebug(context.Background(), "Set not existed", "user_id", key, "client_user_id", v.UserID)
// cli := &UserPlatform1{
// // Update or delete the key based on the remaining clients. Time: time.Now(),
// if len(remainingClients) == 0 { Clients: []*Client{v},
// u.m.Delete(key) }
// return true u.m.Store(key, cli)
// } u.push(key, cli, nil)
// }
// u.m.Store(key, remainingClients)
// return false }
//}
// func (u *UserMap) DeleteClients(key string, clients []*Client) (isDeleteUser bool) {
//func (u *UserMap) DeleteAll(key string) { m := datautil.SliceToMapAny(clients, func(c *Client) (string, struct{}) {
// u.m.Delete(key) return c.ctx.GetRemoteAddr(), struct{}{}
//} })
allClients, existed := u.m.Load(key)
if !existed {
// If the key doesn't exist, return false.
return false
}
// Filter out clients that are in the deleteMap.
oldClients := allClients.(*UserPlatform1)
var (
remainingClients []*Client
offline []int32
)
for _, client := range oldClients.Clients {
if _, shouldBeDeleted := m[client.ctx.GetRemoteAddr()]; !shouldBeDeleted {
remainingClients = append(remainingClients, client)
} else {
offline = append(offline, int32(client.PlatformID))
}
}
oldClients.Clients = remainingClients
defer u.push(key, oldClients, offline)
// Update or delete the key based on the remaining clients.
if len(remainingClients) == 0 {
u.m.Delete(key)
return true
}
return false
}

@ -9,10 +9,9 @@ type UMap interface {
GetAll(userID string) ([]*Client, bool) GetAll(userID string) ([]*Client, bool)
Get(userID string, platformID int) ([]*Client, bool, bool) Get(userID string, platformID int) ([]*Client, bool, bool)
Set(userID string, v *Client) Set(userID string, v *Client)
Delete(userID string, connRemoteAddr string) (isDeleteUser bool)
DeleteClients(userID string, clients []*Client) (isDeleteUser bool) DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
UserState() <-chan UserState UserState() <-chan UserState
GetAllUserStatus(deadline, nowtime time.Time) []UserState GetAllUserStatus(deadline time.Time) []UserState
} }
var _ UMap = (*UserMap2)(nil) var _ UMap = (*UserMap2)(nil)
@ -102,26 +101,6 @@ func (u *UserMap2) Set(userID string, client *Client) {
u.push(client.UserID, result, nil) u.push(client.UserID, result, nil)
} }
func (u *UserMap2) Delete(userID string, connRemoteAddr string) (isDeleteUser bool) {
u.lock.Lock()
defer u.lock.Unlock()
result, ok := u.data[userID]
if !ok {
return false
}
client, ok := result.Clients[connRemoteAddr]
if !ok {
return false
}
delete(result.Clients, connRemoteAddr)
defer u.push(userID, result, []int32{int32(client.PlatformID)})
if len(result.Clients) > 0 {
return false
}
delete(u.data, userID)
return true
}
func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) { func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) {
if len(clients) == 0 { if len(clients) == 0 {
return false return false
@ -145,7 +124,7 @@ func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser
return true return true
} }
func (u *UserMap2) GetAllUserStatus(deadline, nowtime time.Time) []UserState { func (u *UserMap2) GetAllUserStatus(deadline time.Time) []UserState {
u.lock.RLock() u.lock.RLock()
defer u.lock.RUnlock() defer u.lock.RUnlock()
if len(u.data) == 0 { if len(u.data) == 0 {
@ -159,7 +138,7 @@ func (u *UserMap2) GetAllUserStatus(deadline, nowtime time.Time) []UserState {
if p.Time.Before(deadline) { if p.Time.Before(deadline) {
continue continue
} }
p.Time = nowtime p.Time = time.Now()
online := make([]int32, 0, len(p.Clients)) online := make([]int32, 0, len(p.Clients))
for _, client := range p.Clients { for _, client := range p.Clients {
online = append(online, int32(client.PlatformID)) online = append(online, int32(client.PlatformID))

Loading…
Cancel
Save