diff --git a/config/email.tmpl b/config/email.tmpl
index 0385601d0..824144e9d 100644
--- a/config/email.tmpl
+++ b/config/email.tmpl
@@ -1,16 +1,36 @@
{{ define "email.to.html" }}
-{{ range .Alerts }}
-
-
-
OpenIM Alert
-
Alert Program: Prometheus Alert
-
Severity Level: {{ .Labels.severity }}
-
Alert Type: {{ .Labels.alertname }}
-
Affected Host: {{ .Labels.instance }}
-
Affected Service: {{ .Labels.job }}
-
Alert Subject: {{ .Annotations.summary }}
-
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
-
+{{ if eq .Status "firing" }}
+ {{ range .Alerts }}
+
+
+
OpenIM Alert
+
Alert Status: firing
+
Alert Program: Prometheus Alert
+
Severity Level: {{ .Labels.severity }}
+
Alert Type: {{ .Labels.alertname }}
+
Affected Host: {{ .Labels.instance }}
+
Affected Service: {{ .Labels.job }}
+
Alert Subject: {{ .Annotations.summary }}
+
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
+
+ {{ end }}
+
+
+{{ else if eq .Status "resolved" }}
+ {{ range .Alerts }}
+
+
+
OpenIM Alert
+
Alert Status: resolved
+
Alert Program: Prometheus Alert
+
Severity Level: {{ .Labels.severity }}
+
Alert Type: {{ .Labels.alertname }}
+
Affected Host: {{ .Labels.instance }}
+
Affected Service: {{ .Labels.job }}
+
Alert Subject: {{ .Annotations.summary }}
+
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
+
+ {{ end }}
{{ end }}
{{ end }}
diff --git a/go.mod b/go.mod
index 0637492eb..fa40effdd 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.38
- github.com/openimsdk/tools v0.0.49-alpha.51
+ github.com/openimsdk/tools v0.0.49-alpha.52
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
diff --git a/go.sum b/go.sum
index 92a783c06..cc0f5f766 100644
--- a/go.sum
+++ b/go.sum
@@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
-github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
-github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
+github.com/openimsdk/tools v0.0.49-alpha.52 h1:NwAAtBO4BV96qG6Z0P2btGEqn4AI2DFgaHvLMXNHal0=
+github.com/openimsdk/tools v0.0.49-alpha.52/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go
index 3891aa532..28c227162 100644
--- a/internal/msggateway/hub_server.go
+++ b/internal/msggateway/hub_server.go
@@ -22,11 +22,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway"
+ "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
+ "github.com/openimsdk/tools/mq/memamq"
+ "github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
+ "sync/atomic"
)
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -57,6 +61,7 @@ type Server struct {
pushTerminal map[int]struct{}
ready func(srv *Server) error
userRcp rpcclient.UserRpcClient
+ queue *memamq.MemoryQueue
}
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
@@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
pushTerminal: make(map[int]struct{}),
config: conf,
ready: ready,
+ queue: memamq.NewMemoryQueue(512, 1024*16),
}
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
@@ -125,55 +131,93 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli
return nil, nil
}
-func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
-) (*msggateway.OnlineBatchPushOneMsgResp, error) {
- var singleUserResults []*msggateway.SingleMsgToUserResults
- for _, v := range req.PushToUserIDs {
- var resp []*msggateway.SingleMsgToUserPlatform
- results := &msggateway.SingleMsgToUserResults{
- UserID: v,
+func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
+ clients, ok := s.LongConnServer.GetUserAllCons(userID)
+ if !ok {
+ log.ZDebug(ctx, "push user not online", "userID", userID)
+ return &msggateway.SingleMsgToUserResults{
+ UserID: userID,
}
- clients, ok := s.LongConnServer.GetUserAllCons(v)
- if !ok {
- log.ZDebug(ctx, "push user not online", "userID", v)
- results.Resp = resp
- singleUserResults = append(singleUserResults, results)
+ }
+ log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
+ result := &msggateway.SingleMsgToUserResults{
+ UserID: userID,
+ Resp: make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)),
+ }
+ for _, client := range clients {
+ if client == nil {
continue
}
-
- log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
- for _, client := range clients {
- if client == nil {
- continue
+ userPlatform := &msggateway.SingleMsgToUserPlatform{
+ RecvPlatFormID: int32(client.PlatformID),
+ }
+ if !client.IsBackground ||
+ (client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
+ err := client.PushMessage(ctx, msgData)
+ if err != nil {
+ userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
+ } else {
+ if _, ok := s.pushTerminal[client.PlatformID]; ok {
+ result.OnlinePush = true
+ }
}
+ } else {
+ userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
+ }
+ result.Resp = append(result.Resp, userPlatform)
+ }
+ return result
+}
- userPlatform := &msggateway.SingleMsgToUserPlatform{
- RecvPlatFormID: int32(client.PlatformID),
+func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
+ if len(req.PushToUserIDs) == 0 {
+ return &msggateway.OnlineBatchPushOneMsgResp{}, nil
+ }
+ ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs))
+ var count atomic.Int64
+ count.Add(int64(len(req.PushToUserIDs)))
+ for i := range req.PushToUserIDs {
+ userID := req.PushToUserIDs[i]
+ err := s.queue.PushCtx(ctx, func() {
+ ch <- s.pushToUser(ctx, userID, req.MsgData)
+ if count.Add(-1) == 0 {
+ close(ch)
}
- if !client.IsBackground ||
- (client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
- err := client.PushMessage(ctx, req.MsgData)
- if err != nil {
- userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
- resp = append(resp, userPlatform)
- } else {
- if _, ok := s.pushTerminal[client.PlatformID]; ok {
- results.OnlinePush = true
- resp = append(resp, userPlatform)
- }
- }
- } else {
- userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
- resp = append(resp, userPlatform)
+ })
+ if err != nil {
+ if count.Add(-1) == 0 {
+ close(ch)
+ }
+ log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID)
+ ch <- &msggateway.SingleMsgToUserResults{
+ UserID: userID,
}
}
- results.Resp = resp
- singleUserResults = append(singleUserResults, results)
}
-
- return &msggateway.OnlineBatchPushOneMsgResp{
- SinglePushResult: singleUserResults,
- }, nil
+ resp := &msggateway.OnlineBatchPushOneMsgResp{
+ SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)),
+ }
+ for {
+ select {
+ case <-ctx.Done():
+ log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx))
+ userIDSet := datautil.SliceSet(req.PushToUserIDs)
+ for _, results := range resp.SinglePushResult {
+ delete(userIDSet, results.UserID)
+ }
+ for userID := range userIDSet {
+ resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{
+ UserID: userID,
+ })
+ }
+ return resp, nil
+ case res, ok := <-ch:
+ if !ok {
+ return resp, nil
+ }
+ resp.SinglePushResult = append(resp.SinglePushResult, res)
+ }
+ }
}
func (s *Server) KickUserOffline(
diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go
index 5c0b1f2e6..cd52727cb 100644
--- a/internal/rpc/third/log.go
+++ b/internal/rpc/third/log.go
@@ -50,13 +50,14 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
platform := constant.PlatformID2Name[int(req.Platform)]
for _, fileURL := range req.FileURLs {
log := relationtb.Log{
- Version: req.Version,
- SystemType: req.SystemType,
Platform: platform,
UserID: userID,
CreateTime: time.Now(),
Url: fileURL.URL,
FileName: fileURL.Filename,
+ SystemType: req.SystemType,
+ Version: req.Version,
+ Ex: req.Ex,
}
for i := 0; i < 20; i++ {
id := genLogID()
diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go
index 5ce8104e7..533eac78f 100644
--- a/pkg/common/storage/controller/user.go
+++ b/pkg/common/storage/controller/user.go
@@ -110,6 +110,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error
// FindWithError Get the information of the specified user and return an error if the userID is not found.
func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*model.User, err error) {
+ userIDs = datautil.Distinct(userIDs)
users, err = u.cache.GetUsersInfo(ctx, userIDs)
if err != nil {
return