version log index

pull/2434/head
withchao 1 year ago
parent daf263962e
commit 276c0ab188

@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.38
github.com/openimsdk/tools v0.0.49-alpha.51
github.com/openimsdk/tools v0.0.49-alpha.52
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0

@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/openimsdk/tools v0.0.49-alpha.52 h1:NwAAtBO4BV96qG6Z0P2btGEqn4AI2DFgaHvLMXNHal0=
github.com/openimsdk/tools v0.0.49-alpha.52/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=

@ -22,11 +22,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
"sync/atomic"
)
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -57,6 +61,7 @@ type Server struct {
pushTerminal map[int]struct{}
ready func(srv *Server) error
userRcp rpcclient.UserRpcClient
queue *memamq.MemoryQueue
}
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
pushTerminal: make(map[int]struct{}),
config: conf,
ready: ready,
queue: memamq.NewMemoryQueue(512, 1024*16),
}
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
@ -125,57 +131,150 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli
return nil, nil
}
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
var singleUserResults []*msggateway.SingleMsgToUserResults
for _, v := range req.PushToUserIDs {
var resp []*msggateway.SingleMsgToUserPlatform
results := &msggateway.SingleMsgToUserResults{
UserID: v,
func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
clients, ok := s.LongConnServer.GetUserAllCons(userID)
if !ok {
log.ZDebug(ctx, "push user not online", "userID", userID)
return &msggateway.SingleMsgToUserResults{
UserID: userID,
}
clients, ok := s.LongConnServer.GetUserAllCons(v)
if !ok {
log.ZDebug(ctx, "push user not online", "userID", v)
results.Resp = resp
singleUserResults = append(singleUserResults, results)
}
log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
result := &msggateway.SingleMsgToUserResults{
UserID: userID,
Resp: make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)),
}
for _, client := range clients {
if client == nil {
continue
}
log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
for _, client := range clients {
if client == nil {
continue
userPlatform := &msggateway.SingleMsgToUserPlatform{
RecvPlatFormID: int32(client.PlatformID),
}
if !client.IsBackground ||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, msgData)
if err != nil {
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
//result.Resp = append(result.Resp, userPlatform)
//resp = append(resp, userPlatform)
} else {
if _, ok := s.pushTerminal[client.PlatformID]; ok {
result.OnlinePush = true
//result.Resp = append(result.Resp, userPlatform)
//results.OnlinePush = true
//resp = append(resp, userPlatform)
}
}
} else {
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
//resp = append(resp, userPlatform)
}
result.Resp = append(result.Resp, userPlatform)
}
return result
}
userPlatform := &msggateway.SingleMsgToUserPlatform{
RecvPlatFormID: int32(client.PlatformID),
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
if len(req.PushToUserIDs) == 0 {
return &msggateway.OnlineBatchPushOneMsgResp{}, nil
}
ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs))
var count atomic.Int64
count.Add(int64(len(req.PushToUserIDs)))
for _, userID := range req.PushToUserIDs {
err := s.queue.PushCtx(ctx, func() {
ch <- s.pushToUser(ctx, userID, req.MsgData)
if count.Add(-1) == 0 {
close(ch)
}
if !client.IsBackground ||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, req.MsgData)
if err != nil {
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
resp = append(resp, userPlatform)
} else {
if _, ok := s.pushTerminal[client.PlatformID]; ok {
results.OnlinePush = true
resp = append(resp, userPlatform)
}
}
} else {
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
resp = append(resp, userPlatform)
})
if err != nil {
if count.Add(-1) == 0 {
close(ch)
}
log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID)
ch <- &msggateway.SingleMsgToUserResults{
UserID: userID,
}
}
results.Resp = resp
singleUserResults = append(singleUserResults, results)
}
return &msggateway.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResults,
}, nil
resp := &msggateway.OnlineBatchPushOneMsgResp{
SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)),
}
for {
select {
case <-ctx.Done():
log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx))
userIDSet := datautil.SliceSet(req.PushToUserIDs)
for _, results := range resp.SinglePushResult {
delete(userIDSet, results.UserID)
}
for userID := range userIDSet {
resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{
UserID: userID,
})
}
return resp, nil
case res, ok := <-ch:
if !ok {
return resp, nil
}
resp.SinglePushResult = append(resp.SinglePushResult, res)
}
}
}
//func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
// var singleUserResults []*msggateway.SingleMsgToUserResults
// for _, userID := range req.PushToUserIDs {
// var resp []*msggateway.SingleMsgToUserPlatform
// results := &msggateway.SingleMsgToUserResults{
// UserID: userID,
// }
// clients, ok := s.LongConnServer.GetUserAllCons(userID)
// if !ok {
// log.ZDebug(ctx, "push user not online", "userID", userID)
// results.Resp = resp
// singleUserResults = append(singleUserResults, results)
// continue
// }
//
// log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
// for _, client := range clients {
// if client == nil {
// continue
// }
//
// userPlatform := &msggateway.SingleMsgToUserPlatform{
// RecvPlatFormID: int32(client.PlatformID),
// }
// if !client.IsBackground ||
// (client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
// err := client.PushMessage(ctx, req.MsgData)
// if err != nil {
// userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
// resp = append(resp, userPlatform)
// } else {
// if _, ok := s.pushTerminal[client.PlatformID]; ok {
// results.OnlinePush = true
// resp = append(resp, userPlatform)
// }
// }
// } else {
// userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
// resp = append(resp, userPlatform)
// }
// }
// results.Resp = resp
// singleUserResults = append(singleUserResults, results)
// }
//
// return &msggateway.OnlineBatchPushOneMsgResp{
// SinglePushResult: singleUserResults,
// }, nil
//}
func (s *Server) KickUserOffline(
ctx context.Context,
req *msggateway.KickUserOfflineReq,

Loading…
Cancel
Save