parent
8f0403e67c
commit
8f86049599
@ -0,0 +1,106 @@
|
|||||||
|
package msggateway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/md5"
|
||||||
|
"encoding/binary"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||||
|
pbuser "github.com/openimsdk/protocol/user"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
|
||||||
|
if concurrent < 1 {
|
||||||
|
concurrent = 1
|
||||||
|
}
|
||||||
|
scanTicker := time.NewTicker(time.Minute * 3)
|
||||||
|
|
||||||
|
requestChs := make([]chan *pbuser.SetUserOnlineStatusReq, concurrent)
|
||||||
|
changeStatus := make([][]UserState, concurrent)
|
||||||
|
|
||||||
|
for i := 0; i < concurrent; i++ {
|
||||||
|
requestChs[i] = make(chan *pbuser.SetUserOnlineStatusReq, 64)
|
||||||
|
changeStatus[i] = make([]UserState, 100)
|
||||||
|
}
|
||||||
|
|
||||||
|
mergeTicker := time.NewTicker(time.Second)
|
||||||
|
|
||||||
|
local2pb := func(u UserState) *pbuser.UserOnlineStatus {
|
||||||
|
return &pbuser.UserOnlineStatus{
|
||||||
|
UserID: u.UserID,
|
||||||
|
Online: u.Online,
|
||||||
|
Offline: u.Offline,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rNum := rand.Uint64()
|
||||||
|
pushUserState := func(us ...UserState) {
|
||||||
|
for _, u := range us {
|
||||||
|
sum := md5.Sum([]byte(u.UserID))
|
||||||
|
i := (binary.BigEndian.Uint64(sum[:]) + rNum) % uint64(concurrent)
|
||||||
|
changeStatus[i] = append(changeStatus[i], u)
|
||||||
|
status := changeStatus[i]
|
||||||
|
if len(status) == cap(status) {
|
||||||
|
req := &pbuser.SetUserOnlineStatusReq{
|
||||||
|
Status: datautil.Slice(status, local2pb),
|
||||||
|
}
|
||||||
|
changeStatus[i] = status[0:]
|
||||||
|
select {
|
||||||
|
case requestChs[i] <- req:
|
||||||
|
default:
|
||||||
|
log.ZError(context.Background(), "user online processing is too slow", nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pushAllUserState := func() {
|
||||||
|
for i, status := range changeStatus {
|
||||||
|
if len(status) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
req := &pbuser.SetUserOnlineStatusReq{
|
||||||
|
Status: datautil.Slice(status, local2pb),
|
||||||
|
}
|
||||||
|
changeStatus[i] = status[0:]
|
||||||
|
select {
|
||||||
|
case requestChs[i] <- req:
|
||||||
|
default:
|
||||||
|
log.ZError(context.Background(), "user online processing is too slow", nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
opIdCtx := mcontext.SetOperationID(context.Background(), "r"+strconv.FormatUint(rNum, 10))
|
||||||
|
doRequest := func(req *pbuser.SetUserOnlineStatusReq) {
|
||||||
|
ctx, cancel := context.WithTimeout(opIdCtx, time.Second*5)
|
||||||
|
defer cancel()
|
||||||
|
if _, err := ws.userClient.Client.SetUserOnlineStatus(ctx, req); err != nil {
|
||||||
|
log.ZError(ctx, "update user online status", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < concurrent; i++ {
|
||||||
|
go func(ch <-chan *pbuser.SetUserOnlineStatusReq) {
|
||||||
|
for req := range ch {
|
||||||
|
doRequest(req)
|
||||||
|
}
|
||||||
|
}(requestChs[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-mergeTicker.C:
|
||||||
|
pushAllUserState()
|
||||||
|
case now := <-scanTicker.C:
|
||||||
|
pushUserState(ws.clients.GetAllUserStatus(now.Add(-cachekey.OnlineExpire/3), now)...)
|
||||||
|
case state := <-ws.clients.UserState():
|
||||||
|
pushUserState(state)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,32 +1,180 @@
|
|||||||
package msggateway
|
package msggateway
|
||||||
|
|
||||||
/*
|
import (
|
||||||
|
"sync"
|
||||||
sorted set
|
"time"
|
||||||
|
)
|
||||||
userID: 10000
|
|
||||||
|
type UMap interface {
|
||||||
USER_ONLINE:10000
|
GetAll(userID string) ([]*Client, bool)
|
||||||
|
Get(userID string, platformID int) ([]*Client, bool, bool)
|
||||||
|
Set(userID string, v *Client)
|
||||||
|
Delete(userID string, connRemoteAddr string) (isDeleteUser bool)
|
||||||
|
DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
|
||||||
|
UserState() <-chan UserState
|
||||||
platformID: 1
|
GetAllUserStatus(deadline, nowtime time.Time) []UserState
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ UMap = (*UserMap2)(nil)
|
||||||
|
|
||||||
|
type UserPlatform struct {
|
||||||
key score
|
Time time.Time
|
||||||
|
Clients map[string]*Client
|
||||||
E1 123456789
|
}
|
||||||
O1 234567895
|
|
||||||
|
func (u *UserPlatform) PlatformIDs() []int32 {
|
||||||
|
if len(u.Clients) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
platformIDs := make([]int32, 0, len(u.Clients))
|
||||||
|
for _, client := range u.Clients {
|
||||||
|
platformIDs = append(platformIDs, int32(client.PlatformID))
|
||||||
*/
|
}
|
||||||
|
return platformIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
func newUserMap() UMap {
|
||||||
|
return &UserMap2{
|
||||||
|
data: make(map[string]*UserPlatform),
|
||||||
|
ch: make(chan UserState, 10000),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type UserMap2 struct {
|
||||||
|
lock sync.RWMutex
|
||||||
|
data map[string]*UserPlatform
|
||||||
|
ch chan UserState
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) push(userID string, userPlatform *UserPlatform, offline []int32) bool {
|
||||||
|
select {
|
||||||
|
case u.ch <- UserState{UserID: userID, Online: userPlatform.PlatformIDs(), Offline: offline}:
|
||||||
|
userPlatform.Time = time.Now()
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) GetAll(userID string) ([]*Client, bool) {
|
||||||
|
u.lock.RLock()
|
||||||
|
defer u.lock.RUnlock()
|
||||||
|
result, ok := u.data[userID]
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
clients := make([]*Client, 0, len(result.Clients))
|
||||||
|
for _, client := range result.Clients {
|
||||||
|
clients = append(clients, client)
|
||||||
|
}
|
||||||
|
return clients, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) Get(userID string, platformID int) ([]*Client, bool, bool) {
|
||||||
|
u.lock.RLock()
|
||||||
|
defer u.lock.RUnlock()
|
||||||
|
result, ok := u.data[userID]
|
||||||
|
if !ok {
|
||||||
|
return nil, false, false
|
||||||
|
}
|
||||||
|
var clients []*Client
|
||||||
|
for _, client := range result.Clients {
|
||||||
|
if client.PlatformID == platformID {
|
||||||
|
clients = append(clients, client)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return clients, true, len(clients) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) Set(userID string, client *Client) {
|
||||||
|
u.lock.Lock()
|
||||||
|
defer u.lock.Unlock()
|
||||||
|
result, ok := u.data[userID]
|
||||||
|
if ok {
|
||||||
|
result.Clients[client.ctx.GetRemoteAddr()] = client
|
||||||
|
} else {
|
||||||
|
result = &UserPlatform{
|
||||||
|
Clients: map[string]*Client{
|
||||||
|
client.ctx.GetRemoteAddr(): client,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
u.push(client.UserID, result, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) Delete(userID string, connRemoteAddr string) (isDeleteUser bool) {
|
||||||
|
u.lock.Lock()
|
||||||
|
defer u.lock.Unlock()
|
||||||
|
result, ok := u.data[userID]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
client, ok := result.Clients[connRemoteAddr]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
delete(result.Clients, connRemoteAddr)
|
||||||
|
defer u.push(userID, result, []int32{int32(client.PlatformID)})
|
||||||
|
if len(result.Clients) > 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
delete(u.data, userID)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) DeleteClients(userID string, clients []*Client) (isDeleteUser bool) {
|
||||||
|
if len(clients) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
u.lock.Lock()
|
||||||
|
defer u.lock.Unlock()
|
||||||
|
result, ok := u.data[userID]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
offline := make([]int32, 0, len(clients))
|
||||||
|
for _, client := range clients {
|
||||||
|
offline = append(offline, int32(client.PlatformID))
|
||||||
|
delete(result.Clients, client.ctx.GetRemoteAddr())
|
||||||
|
}
|
||||||
|
defer u.push(userID, result, offline)
|
||||||
|
if len(result.Clients) > 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
delete(u.data, userID)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) GetAllUserStatus(deadline, nowtime time.Time) []UserState {
|
||||||
|
u.lock.RLock()
|
||||||
|
defer u.lock.RUnlock()
|
||||||
|
if len(u.data) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
result := make([]UserState, 0, len(u.data))
|
||||||
|
for userID, p := range u.data {
|
||||||
|
if len(result) == cap(result) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if p.Time.Before(deadline) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
p.Time = nowtime
|
||||||
|
online := make([]int32, 0, len(p.Clients))
|
||||||
|
for _, client := range p.Clients {
|
||||||
|
online = append(online, int32(client.PlatformID))
|
||||||
|
}
|
||||||
|
result = append(result, UserState{UserID: userID, Online: online})
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *UserMap2) UserState() <-chan UserState {
|
||||||
|
return u.ch
|
||||||
|
}
|
||||||
|
|
||||||
|
type UserState struct {
|
||||||
|
UserID string
|
||||||
|
Online []int32
|
||||||
|
Offline []int32
|
||||||
|
}
|
||||||
|
@ -0,0 +1,122 @@
|
|||||||
|
package user
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
|
pbuser "github.com/openimsdk/protocol/user"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *userServer) getUserOnlineStatus(ctx context.Context, userID string) (*pbuser.OnlineStatus, error) {
|
||||||
|
platformIDs, err := s.online.GetOnline(ctx, userID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
status := pbuser.OnlineStatus{
|
||||||
|
UserID: userID,
|
||||||
|
PlatformIDs: platformIDs,
|
||||||
|
}
|
||||||
|
if len(platformIDs) > 0 {
|
||||||
|
status.Status = constant.Online
|
||||||
|
} else {
|
||||||
|
status.Status = constant.Offline
|
||||||
|
}
|
||||||
|
return &status, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *userServer) getUsersOnlineStatus(ctx context.Context, userIDs []string) ([]*pbuser.OnlineStatus, error) {
|
||||||
|
res := make([]*pbuser.OnlineStatus, 0, len(userIDs))
|
||||||
|
for _, userID := range userIDs {
|
||||||
|
status, err := s.getUserOnlineStatus(ctx, userID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res = append(res, status)
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeOrCancelUsersStatus Subscribe online or cancel online users.
|
||||||
|
func (s *userServer) SubscribeOrCancelUsersStatus(ctx context.Context, req *pbuser.SubscribeOrCancelUsersStatusReq) (*pbuser.SubscribeOrCancelUsersStatusResp, error) {
|
||||||
|
if req.Genre == constant.SubscriberUser {
|
||||||
|
err := s.db.SubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var status []*pbuser.OnlineStatus
|
||||||
|
status, err = s.getUsersOnlineStatus(ctx, req.UserIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &pbuser.SubscribeOrCancelUsersStatusResp{StatusList: status}, nil
|
||||||
|
} else if req.Genre == constant.Unsubscribe {
|
||||||
|
err := s.db.UnsubscribeUsersStatus(ctx, req.UserID, req.UserIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &pbuser.SubscribeOrCancelUsersStatusResp{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetUserStatus Get the online status of the user.
|
||||||
|
func (s *userServer) GetUserStatus(ctx context.Context, req *pbuser.GetUserStatusReq) (*pbuser.GetUserStatusResp, error) {
|
||||||
|
res, err := s.getUsersOnlineStatus(ctx, req.UserIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &pbuser.GetUserStatusResp{StatusList: res}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetUserStatus Synchronize user's online status.
|
||||||
|
func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatusReq) (*pbuser.SetUserStatusResp, error) {
|
||||||
|
var (
|
||||||
|
online []int32
|
||||||
|
offline []int32
|
||||||
|
)
|
||||||
|
switch req.Status {
|
||||||
|
case constant.Online:
|
||||||
|
online = []int32{req.PlatformID}
|
||||||
|
case constant.Offline:
|
||||||
|
online = []int32{req.PlatformID}
|
||||||
|
}
|
||||||
|
if err := s.online.SetUserOnline(ctx, req.UserID, online, offline); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
list, err := s.db.GetSubscribedList(ctx, req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, userID := range list {
|
||||||
|
tips := &sdkws.UserStatusChangeTips{
|
||||||
|
FromUserID: req.UserID,
|
||||||
|
ToUserID: userID,
|
||||||
|
Status: req.Status,
|
||||||
|
PlatformID: req.PlatformID,
|
||||||
|
}
|
||||||
|
s.userNotificationSender.UserStatusChangeNotification(ctx, tips)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pbuser.SetUserStatusResp{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSubscribeUsersStatus Get the online status of subscribers.
|
||||||
|
func (s *userServer) GetSubscribeUsersStatus(ctx context.Context, req *pbuser.GetSubscribeUsersStatusReq) (*pbuser.GetSubscribeUsersStatusResp, error) {
|
||||||
|
userList, err := s.db.GetAllSubscribeList(ctx, req.UserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
onlineStatusList, err := s.getUsersOnlineStatus(ctx, userList)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *userServer) SetUserOnlineStatus(ctx context.Context, req *pbuser.SetUserOnlineStatusReq) (*pbuser.SetUserOnlineStatusResp, error) {
|
||||||
|
for _, status := range req.Status {
|
||||||
|
if err := s.online.SetUserOnline(ctx, status.UserID, status.Online, status.Offline); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &pbuser.SetUserOnlineStatusResp{}, nil
|
||||||
|
}
|
@ -0,0 +1,13 @@
|
|||||||
|
package cachekey
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
const (
|
||||||
|
OnlineKey = "ONLINE:"
|
||||||
|
OnlineChannel = "online_change"
|
||||||
|
OnlineExpire = time.Hour / 2
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetOnlineKey(userID string) string {
|
||||||
|
return OnlineKey + userID
|
||||||
|
}
|
@ -0,0 +1,8 @@
|
|||||||
|
package cache
|
||||||
|
|
||||||
|
import "context"
|
||||||
|
|
||||||
|
type OnlineCache interface {
|
||||||
|
GetOnline(ctx context.Context, userID string) ([]int32, error)
|
||||||
|
SetUserOnline(ctx context.Context, userID string, online, offline []int32) error
|
||||||
|
}
|
Loading…
Reference in new issue