online status

pull/2393/head
withchao 1 year ago
parent 97636c4c7a
commit c1967a63ca

@ -17,6 +17,7 @@ package msggateway
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"time" "time"
@ -26,6 +27,7 @@ import (
type Config struct { type Config struct {
MsgGateway config.MsgGateway MsgGateway config.MsgGateway
Share config.Share Share config.Share
RedisConfig config.Redis
WebhooksConfig config.Webhooks WebhooksConfig config.Webhooks
Discovery config.Discovery Discovery config.Discovery
} }
@ -42,6 +44,10 @@ func Start(ctx context.Context, index int, conf *Config) error {
if err != nil { if err != nil {
return err return err
} }
rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build())
if err != nil {
return err
}
longServer := NewWsServer( longServer := NewWsServer(
conf, conf,
WithPort(wsPort), WithPort(wsPort),
@ -52,6 +58,8 @@ func Start(ctx context.Context, index int, conf *Config) error {
go longServer.ChangeOnlineStatus(4) go longServer.ChangeOnlineStatus(4)
go longServer.SubscriberUserOnlineStatusChanges(rdb)
hubServer := NewServer(rpcPort, longServer, conf) hubServer := NewServer(rpcPort, longServer, conf)
netDone := make(chan error) netDone := make(chan error)
go func() { go func() {

@ -1,5 +1,30 @@
package msggateway package msggateway
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/util/useronline"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/redis/go-redis/v9"
"math/rand"
"strconv"
)
func (ws *WsServer) SubscriberUserOnlineStatusChanges(rdb redis.UniversalClient) {
ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
if err != nil {
log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
continue
}
if ws.clients.RecvSubChange(userID, platformIDs) {
log.ZDebug(ctx, "receive subscription message and go back online", "userID", userID)
}
}
}
//import ( //import (
// "context" // "context"
// "encoding/json" // "encoding/json"

@ -13,6 +13,7 @@ type UserMap interface {
DeleteClients(userID string, clients []*Client) (isDeleteUser bool) DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
UserState() <-chan UserState UserState() <-chan UserState
GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState
RecvSubChange(userID string, platformIDs []int32) bool
} }
type UserState struct { type UserState struct {
@ -37,6 +38,17 @@ func (u *UserPlatform) PlatformIDs() []int32 {
return platformIDs return platformIDs
} }
func (u *UserPlatform) PlatformIDSet() map[int32]struct{} {
if len(u.Clients) == 0 {
return nil
}
platformIDs := make(map[int32]struct{})
for _, client := range u.Clients {
platformIDs[int32(client.PlatformID)] = struct{}{}
}
return platformIDs
}
func newUserMap() UserMap { func newUserMap() UserMap {
return &userMap{ return &userMap{
data: make(map[string]*UserPlatform), data: make(map[string]*UserPlatform),
@ -50,6 +62,24 @@ type userMap struct {
ch chan UserState ch chan UserState
} }
func (u *userMap) RecvSubChange(userID string, platformIDs []int32) bool {
u.lock.RLock()
defer u.lock.RUnlock()
result, ok := u.data[userID]
if !ok {
return false
}
localPlatformIDs := result.PlatformIDSet()
for _, platformID := range platformIDs {
delete(localPlatformIDs, platformID)
}
if len(localPlatformIDs) == 0 {
return false
}
u.push(userID, result, nil)
return true
}
func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool { func (u *userMap) push(userID string, userPlatform *UserPlatform, offline []int32) bool {
select { select {
case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}: case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}:

@ -37,6 +37,7 @@ func NewMsgGatewayCmd() *MsgGatewayCmd {
ret.configMap = map[string]any{ ret.configMap = map[string]any{
OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway, OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway,
ShareFileName: &msgGatewayConfig.Share, ShareFileName: &msgGatewayConfig.Share,
RedisConfigFileName: &msgGatewayConfig.RedisConfig,
WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig, WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig,
DiscoveryConfigFilename: &msgGatewayConfig.Discovery, DiscoveryConfigFilename: &msgGatewayConfig.Discovery,
} }

@ -2,17 +2,16 @@ package rpccache
import ( import (
"context" "context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/util/useronline"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"math/rand" "math/rand"
"strconv" "strconv"
"strings"
"time" "time"
) )
@ -25,28 +24,9 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
}), }),
} }
go func() { go func() {
parseUserOnlineStatus := func(payload string) (string, []int32, error) {
arr := strings.Split(payload, ":")
if len(arr) == 0 {
return "", nil, errors.New("invalid data")
}
userID := arr[len(arr)-1]
if userID == "" {
return "", nil, errors.New("userID is empty")
}
platformIDs := make([]int32, len(arr)-1)
for i := range platformIDs {
platformID, err := strconv.Atoi(arr[i])
if err != nil {
return "", nil, err
}
platformIDs[i] = int32(platformID)
}
return userID, platformIDs, nil
}
ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10)) ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() { for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
userID, platformIDs, err := parseUserOnlineStatus(message.Payload) userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
if err != nil { if err != nil {
log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel) log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
continue continue

@ -0,0 +1,27 @@
package useronline
import (
"errors"
"strconv"
"strings"
)
func ParseUserOnlineStatus(payload string) (string, []int32, error) {
arr := strings.Split(payload, ":")
if len(arr) == 0 {
return "", nil, errors.New("invalid data")
}
userID := arr[len(arr)-1]
if userID == "" {
return "", nil, errors.New("userID is empty")
}
platformIDs := make([]int32, len(arr)-1)
for i := range platformIDs {
platformID, err := strconv.Atoi(arr[i])
if err != nil {
return "", nil, err
}
platformIDs[i] = int32(platformID)
}
return userID, platformIDs, nil
}
Loading…
Cancel
Save