From 8087f705c009d30311072c203371b3cba2869c5e Mon Sep 17 00:00:00 2001
From: icey-yu <119291641+icey-yu@users.noreply.github.com>
Date: Tue, 23 Jul 2024 10:26:04 +0800
Subject: [PATCH 1/2] Fix search log (#2425)
* feat: update alert email template
* fix: search log
* fix: log support ex
---
config/email.tmpl | 44 +++++++++++++++++++--------
internal/rpc/third/log.go | 5 +--
pkg/common/storage/controller/user.go | 1 +
3 files changed, 36 insertions(+), 14 deletions(-)
diff --git a/config/email.tmpl b/config/email.tmpl
index 0385601d0..824144e9d 100644
--- a/config/email.tmpl
+++ b/config/email.tmpl
@@ -1,16 +1,36 @@
{{ define "email.to.html" }}
-{{ range .Alerts }}
-
-
-
OpenIM Alert
-
Alert Program: Prometheus Alert
-
Severity Level: {{ .Labels.severity }}
-
Alert Type: {{ .Labels.alertname }}
-
Affected Host: {{ .Labels.instance }}
-
Affected Service: {{ .Labels.job }}
-
Alert Subject: {{ .Annotations.summary }}
-
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
-
+{{ if eq .Status "firing" }}
+ {{ range .Alerts }}
+
+
+
OpenIM Alert
+
Alert Status: firing
+
Alert Program: Prometheus Alert
+
Severity Level: {{ .Labels.severity }}
+
Alert Type: {{ .Labels.alertname }}
+
Affected Host: {{ .Labels.instance }}
+
Affected Service: {{ .Labels.job }}
+
Alert Subject: {{ .Annotations.summary }}
+
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
+
+ {{ end }}
+
+
+{{ else if eq .Status "resolved" }}
+ {{ range .Alerts }}
+
+
+
OpenIM Alert
+
Alert Status: resolved
+
Alert Program: Prometheus Alert
+
Severity Level: {{ .Labels.severity }}
+
Alert Type: {{ .Labels.alertname }}
+
Affected Host: {{ .Labels.instance }}
+
Affected Service: {{ .Labels.job }}
+
Alert Subject: {{ .Annotations.summary }}
+
Trigger Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
+
+ {{ end }}
{{ end }}
{{ end }}
diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go
index 5c0b1f2e6..cd52727cb 100644
--- a/internal/rpc/third/log.go
+++ b/internal/rpc/third/log.go
@@ -50,13 +50,14 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
platform := constant.PlatformID2Name[int(req.Platform)]
for _, fileURL := range req.FileURLs {
log := relationtb.Log{
- Version: req.Version,
- SystemType: req.SystemType,
Platform: platform,
UserID: userID,
CreateTime: time.Now(),
Url: fileURL.URL,
FileName: fileURL.Filename,
+ SystemType: req.SystemType,
+ Version: req.Version,
+ Ex: req.Ex,
}
for i := 0; i < 20; i++ {
id := genLogID()
diff --git a/pkg/common/storage/controller/user.go b/pkg/common/storage/controller/user.go
index 5ce8104e7..533eac78f 100644
--- a/pkg/common/storage/controller/user.go
+++ b/pkg/common/storage/controller/user.go
@@ -110,6 +110,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*model.User) error
// FindWithError Get the information of the specified user and return an error if the userID is not found.
func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*model.User, err error) {
+ userIDs = datautil.Distinct(userIDs)
users, err = u.cache.GetUsersInfo(ctx, userIDs)
if err != nil {
return
From 49ca5c998c96692bd61acdc9e9e69b24b6773994 Mon Sep 17 00:00:00 2001
From: chao <48119764+withchao@users.noreply.github.com>
Date: Tue, 23 Jul 2024 14:20:42 +0800
Subject: [PATCH 2/2] feat: msg queue push (#2434)
* fix: GroupApplicationAcceptedNotification
* fix: GroupApplicationAcceptedNotification
* fix: NotificationUserInfoUpdate
* cicd: robot automated Change
* fix: component
* fix: getConversationInfo
* feat: cron task
* feat: cron task
* feat: cron task
* feat: cron task
* feat: cron task
* fix: minio config url recognition error
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* new mongo
* friend incr sync
* friend incr sync
* friend incr sync
* friend incr sync
* friend incr sync
* mage
* optimization version log
* optimization version log
* sync
* sync
* sync
* group sync
* sync option
* sync option
* refactor: replace `friend` package with `realtion`.
* refactor: update lastest commit to relation.
* sync option
* sync option
* sync option
* sync
* sync
* go.mod
* seq
* update: go mod
* refactor: change incremental to full
* feat: get full friend user ids
* feat: api and config
* seq
* group version
* merge
* seq
* seq
* seq
* fix: sort by id avoid unstable sort friends.
* group
* group
* group
* fix: sort by id avoid unstable sort friends.
* fix: sort by id avoid unstable sort friends.
* fix: sort by id avoid unstable sort friends.
* user version
* seq
* seq
* seq user
* user online
* implement minio expire delete.
* user online
* config
* fix
* fix
* implement minio expire delete logic.
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* online cache
* feat: implement scheduled delete outdated object in minio.
* update gomake version
* update gomake version
* implement FindExpires pagination.
* remove unnesseary incr.
* fix uncorrect args call.
* online push
* online push
* online push
* resolving conflicts
* resolving conflicts
* test
* api prommetrics
* api prommetrics
* api prommetrics
* api prommetrics
* api prommetrics
* rpc prommetrics
* rpc prommetrics
* online status
* online status
* online status
* online status
* sub
* conversation version incremental
* merge seq
* merge online
* merge online
* merge online
* merge seq
* GetOwnerConversation
* fix: change incremental syncer router name.
* rockscache batch get
* rockscache seq batch get
* fix: GetMsgDocModelByIndex bug
* update go.mod
* update go.mod
* merge
* feat: prometheus
* feat: prometheus
* group member sort
* sub
* sub
* fix: seq conversion bug
* fix: redis pipe exec
* sort version
* sort version
* sort version
* remove old version online subscription
* remove old version online subscription
* version log index
* version log index
* batch push
* batch push
---------
Co-authored-by: withchao
Co-authored-by: Monet Lee
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
---
go.mod | 2 +-
go.sum | 4 +-
internal/msggateway/hub_server.go | 124 ++++++++++++++++++++----------
3 files changed, 87 insertions(+), 43 deletions(-)
diff --git a/go.mod b/go.mod
index 0637492eb..fa40effdd 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.38
- github.com/openimsdk/tools v0.0.49-alpha.51
+ github.com/openimsdk/tools v0.0.49-alpha.52
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
diff --git a/go.sum b/go.sum
index 92a783c06..cc0f5f766 100644
--- a/go.sum
+++ b/go.sum
@@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.38 h1:kVZCHIXg/el8YJFoIBWhZu1sbbTUqmzgF4l0W3sUH24=
github.com/openimsdk/protocol v0.0.69-alpha.38/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
-github.com/openimsdk/tools v0.0.49-alpha.51 h1:JTPEetVSNOczw1n+XjiPozaH2SBPQAc+9VlPE41wEeY=
-github.com/openimsdk/tools v0.0.49-alpha.51/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
+github.com/openimsdk/tools v0.0.49-alpha.52 h1:NwAAtBO4BV96qG6Z0P2btGEqn4AI2DFgaHvLMXNHal0=
+github.com/openimsdk/tools v0.0.49-alpha.52/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go
index 3891aa532..28c227162 100644
--- a/internal/msggateway/hub_server.go
+++ b/internal/msggateway/hub_server.go
@@ -22,11 +22,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway"
+ "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
+ "github.com/openimsdk/tools/mq/memamq"
+ "github.com/openimsdk/tools/utils/datautil"
"google.golang.org/grpc"
+ "sync/atomic"
)
func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -57,6 +61,7 @@ type Server struct {
pushTerminal map[int]struct{}
ready func(srv *Server) error
userRcp rpcclient.UserRpcClient
+ queue *memamq.MemoryQueue
}
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
@@ -70,6 +75,7 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
pushTerminal: make(map[int]struct{}),
config: conf,
ready: ready,
+ queue: memamq.NewMemoryQueue(512, 1024*16),
}
s.pushTerminal[constant.IOSPlatformID] = struct{}{}
s.pushTerminal[constant.AndroidPlatformID] = struct{}{}
@@ -125,55 +131,93 @@ func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.Onli
return nil, nil
}
-func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq,
-) (*msggateway.OnlineBatchPushOneMsgResp, error) {
- var singleUserResults []*msggateway.SingleMsgToUserResults
- for _, v := range req.PushToUserIDs {
- var resp []*msggateway.SingleMsgToUserPlatform
- results := &msggateway.SingleMsgToUserResults{
- UserID: v,
+func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.MsgData) *msggateway.SingleMsgToUserResults {
+ clients, ok := s.LongConnServer.GetUserAllCons(userID)
+ if !ok {
+ log.ZDebug(ctx, "push user not online", "userID", userID)
+ return &msggateway.SingleMsgToUserResults{
+ UserID: userID,
}
- clients, ok := s.LongConnServer.GetUserAllCons(v)
- if !ok {
- log.ZDebug(ctx, "push user not online", "userID", v)
- results.Resp = resp
- singleUserResults = append(singleUserResults, results)
+ }
+ log.ZDebug(ctx, "push user online", "clients", clients, "userID", userID)
+ result := &msggateway.SingleMsgToUserResults{
+ UserID: userID,
+ Resp: make([]*msggateway.SingleMsgToUserPlatform, 0, len(clients)),
+ }
+ for _, client := range clients {
+ if client == nil {
continue
}
-
- log.ZDebug(ctx, "push user online", "clients", clients, "userID", v)
- for _, client := range clients {
- if client == nil {
- continue
+ userPlatform := &msggateway.SingleMsgToUserPlatform{
+ RecvPlatFormID: int32(client.PlatformID),
+ }
+ if !client.IsBackground ||
+ (client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
+ err := client.PushMessage(ctx, msgData)
+ if err != nil {
+ userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
+ } else {
+ if _, ok := s.pushTerminal[client.PlatformID]; ok {
+ result.OnlinePush = true
+ }
}
+ } else {
+ userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
+ }
+ result.Resp = append(result.Resp, userPlatform)
+ }
+ return result
+}
- userPlatform := &msggateway.SingleMsgToUserPlatform{
- RecvPlatFormID: int32(client.PlatformID),
+func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
+ if len(req.PushToUserIDs) == 0 {
+ return &msggateway.OnlineBatchPushOneMsgResp{}, nil
+ }
+ ch := make(chan *msggateway.SingleMsgToUserResults, len(req.PushToUserIDs))
+ var count atomic.Int64
+ count.Add(int64(len(req.PushToUserIDs)))
+ for i := range req.PushToUserIDs {
+ userID := req.PushToUserIDs[i]
+ err := s.queue.PushCtx(ctx, func() {
+ ch <- s.pushToUser(ctx, userID, req.MsgData)
+ if count.Add(-1) == 0 {
+ close(ch)
}
- if !client.IsBackground ||
- (client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
- err := client.PushMessage(ctx, req.MsgData)
- if err != nil {
- userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
- resp = append(resp, userPlatform)
- } else {
- if _, ok := s.pushTerminal[client.PlatformID]; ok {
- results.OnlinePush = true
- resp = append(resp, userPlatform)
- }
- }
- } else {
- userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
- resp = append(resp, userPlatform)
+ })
+ if err != nil {
+ if count.Add(-1) == 0 {
+ close(ch)
+ }
+ log.ZError(ctx, "pushToUser MemoryQueue failed", err, "userID", userID)
+ ch <- &msggateway.SingleMsgToUserResults{
+ UserID: userID,
}
}
- results.Resp = resp
- singleUserResults = append(singleUserResults, results)
}
-
- return &msggateway.OnlineBatchPushOneMsgResp{
- SinglePushResult: singleUserResults,
- }, nil
+ resp := &msggateway.OnlineBatchPushOneMsgResp{
+ SinglePushResult: make([]*msggateway.SingleMsgToUserResults, 0, len(req.PushToUserIDs)),
+ }
+ for {
+ select {
+ case <-ctx.Done():
+ log.ZError(ctx, "SuperGroupOnlineBatchPushOneMsg ctx done", context.Cause(ctx))
+ userIDSet := datautil.SliceSet(req.PushToUserIDs)
+ for _, results := range resp.SinglePushResult {
+ delete(userIDSet, results.UserID)
+ }
+ for userID := range userIDSet {
+ resp.SinglePushResult = append(resp.SinglePushResult, &msggateway.SingleMsgToUserResults{
+ UserID: userID,
+ })
+ }
+ return resp, nil
+ case res, ok := <-ch:
+ if !ok {
+ return resp, nil
+ }
+ resp.SinglePushResult = append(resp.SinglePushResult, res)
+ }
+ }
}
func (s *Server) KickUserOffline(