From fcda73f4bc0eb3f48b74d5567ff25364d853fb4e Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 4 Jul 2024 10:16:31 +0800 Subject: [PATCH] online push --- cmd/openim-api/main.go | 1 - go.mod | 2 +- go.sum | 4 +- internal/msggateway/client.go | 2 + internal/msggateway/n_ws_server.go | 22 +++-- internal/msggateway/subscription.go | 148 ++++++++++++++++++++++++++++ 6 files changed, 165 insertions(+), 14 deletions(-) create mode 100644 internal/msggateway/subscription.go diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index 58e540c05..e29ed2a59 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -25,5 +25,4 @@ func main() { if err := cmd.NewApiCmd().Exec(); err != nil { program.ExitWithError(err) } - } diff --git a/go.mod b/go.mod index e7ed446f4..e4f45e963 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.20 + github.com/openimsdk/protocol v0.0.69-alpha.24 github.com/openimsdk/tools v0.0.49-alpha.25 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index adde3aa82..b0f495e82 100644 --- a/go.sum +++ b/go.sum @@ -270,8 +270,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJp0= github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.20 h1:skZu82sqoMhiQVEZgrRsjcfI3Grp1IpThx1LJPqETWs= -github.com/openimsdk/protocol v0.0.69-alpha.20/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.69-alpha.24 h1:TYcNJeWOTuE40UQ54eNPdDdy0KTOh9rAOgax8lCyhDc= +github.com/openimsdk/protocol v0.0.69-alpha.24/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.49-alpha.25 h1:OpRPwDZ2xWX7Zj5kyfZhryu/NfZTrsRVr2GFwu1HQHI= github.com/openimsdk/tools v0.0.49-alpha.25/go.mod h1:rwsFI1G/nBHNfiNapbven41akRDPBbH4df0Cgy6xueU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 0581a025b..62c71d5be 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -72,6 +72,8 @@ type Client struct { closed atomic.Bool closedErr error token string + //subLock sync.Mutex + //subUserIDs map[string]struct{} } // ResetClient updates the client's state with new connection and context information. diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 1af9f014f..8a79151fe 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -54,13 +54,14 @@ type LongConnServer interface { } type WsServer struct { - msgGatewayConfig *Config - port int - wsMaxConnNum int64 - registerChan chan *Client - unregisterChan chan *Client - kickHandlerChan chan *kickHandler - clients UserMap + msgGatewayConfig *Config + port int + wsMaxConnNum int64 + registerChan chan *Client + unregisterChan chan *Client + kickHandlerChan chan *kickHandler + clients UserMap + //subscription *Subscription clientPool sync.Pool onlineUserNum atomic.Int64 onlineUserConnNum atomic.Int64 @@ -141,9 +142,10 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer { kickHandlerChan: make(chan *kickHandler, 1000), validate: v, clients: newUserMap(), - Compressor: NewGzipCompressor(), - Encoder: NewGobEncoder(), - webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), + //subscription: newSubscription(), + Compressor: NewGzipCompressor(), + Encoder: NewGobEncoder(), + webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL), } } diff --git a/internal/msggateway/subscription.go b/internal/msggateway/subscription.go new file mode 100644 index 000000000..d7e606037 --- /dev/null +++ b/internal/msggateway/subscription.go @@ -0,0 +1,148 @@ +package msggateway + +//import ( +// "context" +// "encoding/json" +// "github.com/openimsdk/protocol/constant" +// "github.com/openimsdk/protocol/sdkws" +// "github.com/openimsdk/tools/log" +// "github.com/openimsdk/tools/utils/datautil" +// "github.com/openimsdk/tools/utils/idutil" +// "sync" +// "time" +//) +// +//type subClient struct { +// clients map[string]*Client +//} +// +//func newSubscription() *Subscription { +// return &Subscription{ +// userIDs: make(map[string]*subClient), +// } +//} +// +//type Subscription struct { +// lock sync.RWMutex +// userIDs map[string]*subClient +//} +// +//func (s *Subscription) GetClient(userID string) []*Client { +// s.lock.RLock() +// defer s.lock.RUnlock() +// cs, ok := s.userIDs[userID] +// if !ok { +// return nil +// } +// clients := make([]*Client, 0, len(cs.clients)) +// for _, client := range cs.clients { +// clients = append(clients, client) +// } +// return clients +//} +// +//func (s *Subscription) DelClient(client *Client) { +// client.subLock.Lock() +// userIDs := datautil.Keys(client.subUserIDs) +// for _, userID := range userIDs { +// delete(client.subUserIDs, userID) +// } +// client.subLock.Unlock() +// if len(userIDs) == 0 { +// return +// } +// addr := client.ctx.GetRemoteAddr() +// s.lock.Lock() +// defer s.lock.Unlock() +// for _, userID := range userIDs { +// sub, ok := s.userIDs[userID] +// if !ok { +// continue +// } +// delete(sub.clients, addr) +// if len(sub.clients) == 0 { +// delete(s.userIDs, userID) +// } +// } +//} +// +//func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string) { +// if len(addUserIDs)+len(delUserIDs) == 0 { +// return +// } +// var ( +// del = make(map[string]struct{}) +// add = make(map[string]struct{}) +// ) +// client.subLock.Lock() +// for _, userID := range delUserIDs { +// if _, ok := client.subUserIDs[userID]; !ok { +// continue +// } +// del[userID] = struct{}{} +// delete(client.subUserIDs, userID) +// } +// for _, userID := range addUserIDs { +// delete(del, userID) +// if _, ok := client.subUserIDs[userID]; ok { +// continue +// } +// client.subUserIDs[userID] = struct{}{} +// } +// client.subLock.Unlock() +// if len(del)+len(add) == 0 { +// return +// } +// addr := client.ctx.GetRemoteAddr() +// s.lock.Lock() +// defer s.lock.Unlock() +// for userID := range del { +// sub, ok := s.userIDs[userID] +// if !ok { +// continue +// } +// delete(sub.clients, addr) +// if len(sub.clients) == 0 { +// delete(s.userIDs, userID) +// } +// } +// for userID := range add { +// sub, ok := s.userIDs[userID] +// if !ok { +// sub = &subClient{clients: make(map[string]*Client)} +// s.userIDs[userID] = sub +// } +// sub.clients[addr] = client +// } +//} +// +//func (ws *WsServer) pushUserIDOnlineStatus(ctx context.Context, userID string, platformIDs []int32) { +// clients := ws.subscription.GetClient(userID) +// if len(clients) == 0 { +// return +// } +// msgContent, err := json.Marshal(platformIDs) +// if err != nil { +// log.ZError(ctx, "pushUserIDOnlineStatus json.Marshal", err) +// return +// } +// now := time.Now().UnixMilli() +// msgID := idutil.GetMsgIDByMD5(userID) +// msg := &sdkws.MsgData{ +// SendID: userID, +// ClientMsgID: msgID, +// ServerMsgID: msgID, +// SenderPlatformID: constant.AdminPlatformID, +// SessionType: constant.NotificationChatType, +// ContentType: constant.UserSubscribeOnlineStatusNotification, +// Content: msgContent, +// SendTime: now, +// CreateTime: now, +// } +// for _, client := range clients { +// msg.RecvID = client.UserID +// if err := client.PushMessage(ctx, msg); err != nil { +// log.ZError(ctx, "UserSubscribeOnlineStatusNotification push failed", err, "userID", client.UserID, "platformID", client.PlatformID, "changeUserID", userID, "content", msgContent) +// } +// } +//}