From 389b05dc449f68080f06ad3253aabcf4f7fa911e Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 1 Aug 2024 11:25:19 +0800 Subject: [PATCH] test --- internal/msggateway/online.go | 2 +- internal/msggateway/subscription.go | 2 +- internal/msggateway/user_map.go | 20 +++++++++++++++----- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/msggateway/online.go b/internal/msggateway/online.go index 9604d563b..27b4544aa 100644 --- a/internal/msggateway/online.go +++ b/internal/msggateway/online.go @@ -107,7 +107,7 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) { case now := <-renewalTicker.C: deadline := now.Add(-cachekey.OnlineExpire / 3) users := ws.clients.GetAllUserStatus(deadline, now) - log.ZDebug(context.Background(), "renewal ticker", "deadline", deadline, "nowtime", now, "users", users) + log.ZDebug(context.Background(), "renewal ticker", "deadline", deadline, "nowtime", now, "num", len(users), "users", users) pushUserState(users...) case state := <-ws.clients.UserState(): log.ZDebug(context.Background(), "OnlineCache user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline) diff --git a/internal/msggateway/subscription.go b/internal/msggateway/subscription.go index 9bb41e0df..e5f9a4e1a 100644 --- a/internal/msggateway/subscription.go +++ b/internal/msggateway/subscription.go @@ -15,7 +15,7 @@ func (ws *WsServer) subscriberUserOnlineStatusChanges(ctx context.Context, userI } else { log.ZDebug(ctx, "gateway ignore user online status changes", "userID", userID, "platformIDs", platformIDs) } - ws.pushUserIDOnlineStatus(ctx, userID, platformIDs) + go ws.pushUserIDOnlineStatus(ctx, userID, platformIDs) } func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error) { diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 14e7c66ac..d0addbd54 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -6,9 +6,11 @@ import ( "fmt" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" "strconv" "sync" + "sync/atomic" "time" ) @@ -36,7 +38,7 @@ type UserPlatform struct { func (u *UserPlatform) String() string { buf := bytes.NewBuffer(nil) buf.WriteString("UserPlatform{Time: ") - buf.WriteString(u.Time.Format(time.DateTime)) + buf.WriteString(u.Time.String()) buf.WriteString(", Clients<") buf.WriteString(strconv.Itoa(len(u.Clients))) buf.WriteString(">: [") @@ -189,13 +191,21 @@ func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser return true } -func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState { +var opIDIncr atomic.Int64 + +func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) (result []UserState) { + ctx := mcontext.SetOperationID(context.Background(), fmt.Sprintf("op_%d", opIDIncr.Add(1))) + log.ZDebug(ctx, "userMap GetAllUserStatus", "deadline", deadline, "nowtime", nowtime) + defer func() { + log.ZDebug(ctx, "userMap GetAllUserStatus", "num", len(result), "result", result) + }() u.lock.RLock() defer u.lock.RUnlock() - result := make([]UserState, 0, len(u.data)) + result = make([]UserState, 0, len(u.data)) for userID, userPlatform := range u.data { - log.ZDebug(context.Background(), "userMap GetAllUserStatus", "userID", userID, "platforms", userPlatform.String()) - if userPlatform.Time.Before(deadline) { + add := userPlatform.Time.Before(deadline) + log.ZDebug(ctx, "userMap GetAllUserStatus", "userID", userID, "add", add, "platforms", userPlatform.String()) + if add { continue } userPlatform.Time = nowtime