Merge branch 'main' of github.com:openimsdk/open-im-server into log/rpc

pull/2436/head
Monet Lee 1 year ago
commit 79b70e3605

@ -1,16 +1,36 @@
{{ define "email.to.html" }} {{ define "email.to.html" }}
{{ range .Alerts }} {{ if eq .Status "firing" }}
<!-- Begin of OpenIM Alert --> {{ range .Alerts }}
<div style="border:1px solid #ccc; padding:10px; margin-bottom:10px;"> <!-- Begin of OpenIM Alert -->
<h3>OpenIM Alert</h3> <div style="border:1px solid #ccc; padding:10px; margin-bottom:10px;">
<p><strong>Alert Program:</strong> Prometheus Alert</p> <h3>OpenIM Alert</h3>
<p><strong>Severity Level:</strong> {{ .Labels.severity }}</p> <p><strong>Alert Status:</strong> firing</p>
<p><strong>Alert Type:</strong> {{ .Labels.alertname }}</p> <p><strong>Alert Program:</strong> Prometheus Alert</p>
<p><strong>Affected Host:</strong> {{ .Labels.instance }}</p> <p><strong>Severity Level:</strong> {{ .Labels.severity }}</p>
<p><strong>Affected Service:</strong> {{ .Labels.job }}</p> <p><strong>Alert Type:</strong> {{ .Labels.alertname }}</p>
<p><strong>Alert Subject:</strong> {{ .Annotations.summary }}</p> <p><strong>Affected Host:</strong> {{ .Labels.instance }}</p>
<p><strong>Trigger Time:</strong> {{ .StartsAt.Format "2006-01-02 15:04:05" }}</p> <p><strong>Affected Service:</strong> {{ .Labels.job }}</p>
</div> <p><strong>Alert Subject:</strong> {{ .Annotations.summary }}</p>
<p><strong>Trigger Time:</strong> {{ .StartsAt.Format "2006-01-02 15:04:05" }}</p>
</div>
{{ end }}
{{ else if eq .Status "resolved" }}
{{ range .Alerts }}
<!-- Begin of OpenIM Alert -->
<div style="border:1px solid #ccc; padding:10px; margin-bottom:10px;">
<h3>OpenIM Alert</h3>
<p><strong>Alert Status:</strong> resolved</p>
<p><strong>Alert Program:</strong> Prometheus Alert</p>
<p><strong>Severity Level:</strong> {{ .Labels.severity }}</p>
<p><strong>Alert Type:</strong> {{ .Labels.alertname }}</p>
<p><strong>Affected Host:</strong> {{ .Labels.instance }}</p>
<p><strong>Affected Service:</strong> {{ .Labels.job }}</p>
<p><strong>Alert Subject:</strong> {{ .Annotations.summary }}</p>
<p><strong>Trigger Time:</strong> {{ .StartsAt.Format "2006-01-02 15:04:05" }}</p>
</div>
{{ end }}
<!-- End of OpenIM Alert --> <!-- End of OpenIM Alert -->
{{ end }} {{ end }}
{{ end }} {{ end }}

@ -22,11 +22,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc" "google.golang.org/grpc"
"sync/atomic"
) )
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@ -57,6 +61,7 @@ type Server struct {
pushTerminal map[int]struct{} pushTerminal map[int]struct{}
ready func(srv *Server) error ready func(srv *Server) error
userRcp rpcclient.UserRpcClient userRcp rpcclient.UserRpcClient
queue *memamq.MemoryQueue
} }
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
pushTerminal: make(map[int]struct{}), pushTerminal: make(map[int]struct{}),
config: conf, config: conf,
ready: ready, ready: ready,
queue: memamq.NewMemoryQueue(512, 1024*16),
} }
s.pushTerminal[constant.IOSPlatformID] = struct{}{} s.pushTerminal[constant.IOSPlatformID] = struct{}{}
s.pushTerminal[constant.AndroidPlatformID] = struct{}{} s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
@ -125,55 +131,93 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli
return nil, nil return nil, nil
} }
func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq, func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
) (*msggateway.OnlineBatchPushOneMsgResp, error) { clients, ok := s.LongConnServer.GetUserAllCons(userID)
var singleUserResults []*msggateway.SingleMsgToUserResults if !ok {
for _, v := range req.PushToUserIDs { log.ZDebug(ctx, "push user not online", "userID", userID)
var resp []*msggateway.SingleMsgToUserPlatform return &msggateway.SingleMsgToUserResults{
results := &msggateway.SingleMsgToUserResults{ UserID: userID,
UserID: v,
} }
clients, ok := s.LongConnServer.GetUserAllCons(v) }
if !ok { log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
log.ZDebug(ctx, "push user not online", "userID", v) result := &msggateway.SingleMsgToUserResults{
results.Resp = resp UserID: userID,
singleUserResults = append(singleUserResults, results) Resp: make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)),
}
for _, client := range clients {
if client == nil {
continue continue
} }
userPlatform := &msggateway.SingleMsgToUserPlatform{
log.ZDebug(ctx, "push user online", "clients", clients, "userID", v) RecvPlatFormID: int32(client.PlatformID),
for _, client := range clients { }
if client == nil { if !client.IsBackground ||
continue (client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, msgData)
if err != nil {
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
} else {
if _, ok := s.pushTerminal[client.PlatformID]; ok {
result.OnlinePush = true
}
} }
} else {
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
}
result.Resp = append(result.Resp, userPlatform)
}
return result
}
userPlatform := &msggateway.SingleMsgToUserPlatform{ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
RecvPlatFormID: int32(client.PlatformID), if len(req.PushToUserIDs) == 0 {
return &msggateway.OnlineBatchPushOneMsgResp{}, nil
}
ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs))
var count atomic.Int64
count.Add(int64(len(req.PushToUserIDs)))
for i := range req.PushToUserIDs {
userID := req.PushToUserIDs[i]
err := s.queue.PushCtx(ctx, func() {
ch <- s.pushToUser(ctx, userID, req.MsgData)
if count.Add(-1) == 0 {
close(ch)
} }
if !client.IsBackground || })
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) { if err != nil {
err := client.PushMessage(ctx, req.MsgData) if count.Add(-1) == 0 {
if err != nil { close(ch)
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code()) }
resp = append(resp, userPlatform) log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID)
} else { ch <- &msggateway.SingleMsgToUserResults{
if _, ok := s.pushTerminal[client.PlatformID]; ok { UserID: userID,
results.OnlinePush = true
resp = append(resp, userPlatform)
}
}
} else {
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
resp = append(resp, userPlatform)
} }
} }
results.Resp = resp
singleUserResults = append(singleUserResults, results)
} }
resp := &msggateway.OnlineBatchPushOneMsgResp{
return &msggateway.OnlineBatchPushOneMsgResp{ SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)),
SinglePushResult: singleUserResults, }
}, nil for {
select {
case <-ctx.Done():
log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx))
userIDSet := datautil.SliceSet(req.PushToUserIDs)
for _, results := range resp.SinglePushResult {
delete(userIDSet, results.UserID)
}
for userID := range userIDSet {
resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{
UserID: userID,
})
}
return resp, nil
case res, ok := <-ch:
if !ok {
return resp, nil
}
resp.SinglePushResult = append(resp.SinglePushResult, res)
}
}
} }
func (s *Server) KickUserOffline( func (s *Server) KickUserOffline(

@ -50,13 +50,14 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
platform := constant.PlatformID2Name[int(req.Platform)] platform := constant.PlatformID2Name[int(req.Platform)]
for _, fileURL := range req.FileURLs { for _, fileURL := range req.FileURLs {
log := relationtb.Log{ log := relationtb.Log{
Version: req.Version,
SystemType: req.SystemType,
Platform: platform, Platform: platform,
UserID: userID, UserID: userID,
CreateTime: time.Now(), CreateTime: time.Now(),
Url: fileURL.URL, Url: fileURL.URL,
FileName: fileURL.Filename, FileName: fileURL.Filename,
SystemType: req.SystemType,
Version: req.Version,
Ex: req.Ex,
} }
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
id := genLogID() id := genLogID()

@ -110,6 +110,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error
// FindWithError Get the information of the specified user and return an error if the userID is not found. // FindWithError Get the information of the specified user and return an error if the userID is not found.
func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*model.User, err error) { func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*model.User, err error) {
userIDs = datautil.Distinct(userIDs)
users, err = u.cache.GetUsersInfo(ctx, userIDs) users, err = u.cache.GetUsersInfo(ctx, userIDs)
if err != nil { if err != nil {
return return

Loading…
Cancel
Save