From 1664579cf7eb91786e839656271eb4e27c9425a4 Mon Sep 17 00:00:00 2001 From: "fengyun.rui" Date: Thu, 2 Nov 2023 12:11:19 +0800 Subject: [PATCH 1/5] perf: reduce register client latency (#1312) Signed-off-by: rfyiamcool --- internal/msggateway/n_ws_server.go | 45 +++++++++++++++++++----------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 07a83fb5c..83e297502 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -74,8 +74,8 @@ type WsServer struct { kickHandlerChan chan *kickHandler clients *UserMap clientPool sync.Pool - onlineUserNum int64 - onlineUserConnNum int64 + onlineUserNum atomic.Int64 + onlineUserConnNum atomic.Int64 handshakeTimeout time.Duration hubServer *Server validate *validator.Validate @@ -220,8 +220,8 @@ func (ws *WsServer) registerClient(client *Client) { if !userOK { ws.clients.Set(client.UserID, client) log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID) - atomic.AddInt64(&ws.onlineUserNum, 1) - atomic.AddInt64(&ws.onlineUserConnNum, 1) + ws.onlineUserNum.Add(1) + ws.onlineUserConnNum.Add(1) } else { i := &kickHandler{ clientOK: clientOK, @@ -234,22 +234,35 @@ func (ws *WsServer) registerClient(client *Client) { ws.clients.Set(client.UserID, client) // 已经有同平台的连接存在 log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients)) - atomic.AddInt64(&ws.onlineUserConnNum, 1) + ws.onlineUserConnNum.Add(1) } else { ws.clients.Set(client.UserID, client) - - atomic.AddInt64(&ws.onlineUserConnNum, 1) + ws.onlineUserConnNum.Add(1) } } - ws.sendUserOnlineInfoToOtherNode(client.ctx, client) - ws.SetUserOnlineStatus(client.ctx, client, constant.Online) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) + }() + + wg.Add(1) + go func() { + defer wg.Done() + ws.SetUserOnlineStatus(client.ctx, client, constant.Online) + }() + + wg.Wait() + log.ZInfo( client.ctx, "user online", "online user Num", - ws.onlineUserNum, + ws.onlineUserNum.Load(), "online user conn Num", - ws.onlineUserConnNum, + ws.onlineUserConnNum.Load(), ) } @@ -282,7 +295,7 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien if clientOK { isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients) if isDeleteUser { - atomic.AddInt64(&ws.onlineUserNum, -1) + ws.onlineUserNum.Add(-1) } for _, c := range oldClients { err := c.KickOnlineMessage() @@ -350,18 +363,18 @@ func (ws *WsServer) unregisterClient(client *Client) { defer ws.clientPool.Put(client) isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr()) if isDeleteUser { - atomic.AddInt64(&ws.onlineUserNum, -1) + ws.onlineUserNum.Add(-1) } - atomic.AddInt64(&ws.onlineUserConnNum, -1) + ws.onlineUserConnNum.Add(-1) ws.SetUserOnlineStatus(client.ctx, client, constant.Offline) log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum, "online user conn Num", - ws.onlineUserConnNum, + ws.onlineUserConnNum.Load(), ) } func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { connContext := newContext(w, r) - if ws.onlineUserConnNum >= ws.wsMaxConnNum { + if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum { httpError(connContext, errs.ErrConnOverMaxNumLimit) return } From 3ce1e6ed71c52f744b38feaf7f050379fcddb8e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=86=89=E5=A2=A8=E5=B1=85=E5=A3=AB?= <94230530+go75@users.noreply.github.com> Date: Thu, 2 Nov 2023 12:27:59 +0800 Subject: [PATCH 2/5] Optimize the control structure (#1294) * cicd: robot automated Change * cicd: robot automated Change --------- Co-authored-by: go75 --- cmd/openim-api/main.go | 1 + internal/msgtransfer/init.go | 6 ++++-- internal/rpc/conversation/conversaion.go | 2 +- internal/tools/msg.go | 6 ++++-- pkg/apistruct/manage.go | 2 +- pkg/common/cmd/api.go | 4 +++- pkg/common/cmd/rpc.go | 4 +++- pkg/common/config/parse.go | 3 ++- pkg/common/db/relation/mysql_init.go | 3 ++- .../k8s_discovery_register.go | 6 ++++-- pkg/common/version/base.go | 6 +++--- pkg/common/version/types.go | 18 +++++++++--------- pkg/common/version/version.go | 16 ++++++++-------- 13 files changed, 45 insertions(+), 32 deletions(-) diff --git a/cmd/openim-api/main.go b/cmd/openim-api/main.go index d1f5cb3f8..174300dc7 100644 --- a/cmd/openim-api/main.go +++ b/cmd/openim-api/main.go @@ -26,6 +26,7 @@ import ( "github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/open-im-server/v3/internal/api" "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 4487826ee..7efc35794 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -16,10 +16,12 @@ package msgtransfer import ( "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "sync" + "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "sync" + + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "github.com/OpenIMSDK/tools/mw" diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 0ea7d54be..d39881b35 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -112,7 +112,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers return resp, nil } -//nolint +// nolint func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq, ) (*pbconversation.SetConversationsResp, error) { diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 5397689b2..ca095051c 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,11 +17,13 @@ package tools import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "math" + "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "math" + + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index 1238b4757..411bd5662 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -41,7 +41,7 @@ type SendMsgReq struct { type BatchSendMsgReq struct { SendMsg IsSendAll bool `json:"isSendAll"` - RecvIDs []string `json:"recvIDs" binding:"required"` + RecvIDs []string `json:"recvIDs" binding:"required"` } type BatchSendMsgResp struct { diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 7ce872fac..6c74ddad9 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -16,9 +16,11 @@ package cmd import ( "fmt" + "github.com/OpenIMSDK/protocol/constant" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/spf13/cobra" + + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) type ApiCmd struct { diff --git a/pkg/common/cmd/rpc.go b/pkg/common/cmd/rpc.go index 224edc0a0..0ccc37fcb 100644 --- a/pkg/common/cmd/rpc.go +++ b/pkg/common/cmd/rpc.go @@ -16,11 +16,13 @@ package cmd import ( "errors" + "github.com/OpenIMSDK/protocol/constant" - config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/spf13/cobra" "google.golang.org/grpc" + config2 "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/OpenIMSDK/tools/discoveryregistry" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index 2c20f1a98..fee5efbe9 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -21,8 +21,9 @@ import ( "path/filepath" "github.com/OpenIMSDK/protocol/constant" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "gopkg.in/yaml.v3" + + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" ) //go:embed version diff --git a/pkg/common/db/relation/mysql_init.go b/pkg/common/db/relation/mysql_init.go index 16b8c99fa..41399d5ca 100644 --- a/pkg/common/db/relation/mysql_init.go +++ b/pkg/common/db/relation/mysql_init.go @@ -22,10 +22,11 @@ import ( "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mw/specialerror" mysqldriver "github.com/go-sql-driver/mysql" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/logger" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) const ( diff --git a/pkg/common/discovery_register/k8s_discovery_register.go b/pkg/common/discovery_register/k8s_discovery_register.go index 811d35b96..81543a447 100644 --- a/pkg/common/discovery_register/k8s_discovery_register.go +++ b/pkg/common/discovery_register/k8s_discovery_register.go @@ -4,12 +4,14 @@ import ( "context" "errors" "fmt" + "time" + "github.com/OpenIMSDK/tools/discoveryregistry" openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper" "github.com/OpenIMSDK/tools/log" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "google.golang.org/grpc" - "time" + + "github.com/openimsdk/open-im-server/v3/pkg/common/config" ) func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) { diff --git a/pkg/common/version/base.go b/pkg/common/version/base.go index 4161107c1..ac214269f 100644 --- a/pkg/common/version/base.go +++ b/pkg/common/version/base.go @@ -40,8 +40,8 @@ var ( // companion .gitattributes file containing 'export-subst' in this same // directory. See also https://git-scm.com/docs/gitattributes gitVersion string = "latest" - gitCommit string = "" // sha1 from git, output of $(git rev-parse HEAD) - gitTreeState string = "" // state of git tree, either "clean" or "dirty" + gitCommit string = "" // sha1 from git, output of $(git rev-parse HEAD) + gitTreeState string = "" // state of git tree, either "clean" or "dirty" buildDate string = "1970-01-01T00:00:00Z" // build date in ISO8601 format, output of $(date -u +'%Y-%m-%dT%H:%M:%SZ') -) \ No newline at end of file +) diff --git a/pkg/common/version/types.go b/pkg/common/version/types.go index 3512640cc..ee4664149 100644 --- a/pkg/common/version/types.go +++ b/pkg/common/version/types.go @@ -4,15 +4,15 @@ package version // TODO: Add []string of api versions supported? It's still unclear // how we'll want to distribute that information. type Info struct { - Major string `json:"major,omitempty"` - Minor string `json:"minor,omitempty"` - GitVersion string `json:"gitVersion"` + Major string `json:"major,omitempty"` + Minor string `json:"minor,omitempty"` + GitVersion string `json:"gitVersion"` GitTreeState string `json:"gitTreeState,omitempty"` - GitCommit string `json:"gitCommit,omitempty"` - BuildDate string `json:"buildDate"` - GoVersion string `json:"goVersion"` - Compiler string `json:"compiler"` - Platform string `json:"platform"` + GitCommit string `json:"gitCommit,omitempty"` + BuildDate string `json:"buildDate"` + GoVersion string `json:"goVersion"` + Compiler string `json:"compiler"` + Platform string `json:"platform"` } type Output struct { @@ -21,7 +21,7 @@ type Output struct { } type OpenIMClientVersion struct { - ClientVersion string `json:"clientVersion,omitempty" yaml:"clientVersion,omitempty"` //sdk core version + ClientVersion string `json:"clientVersion,omitempty" yaml:"clientVersion,omitempty"` //sdk core version } // String returns info as a human-friendly version string. diff --git a/pkg/common/version/version.go b/pkg/common/version/version.go index b841c3f65..b8ccfaf81 100644 --- a/pkg/common/version/version.go +++ b/pkg/common/version/version.go @@ -13,15 +13,15 @@ func Get() Info { // These variables typically come from -ldflags settings and in // their absence fallback to the settings in ./base.go return Info{ - Major: gitMajor, - Minor: gitMinor, - GitVersion: gitVersion, + Major: gitMajor, + Minor: gitMinor, + GitVersion: gitVersion, GitTreeState: gitTreeState, - GitCommit: gitCommit, - BuildDate: buildDate, - GoVersion: runtime.Version(), - Compiler: runtime.Compiler, - Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH), + GitCommit: gitCommit, + BuildDate: buildDate, + GoVersion: runtime.Version(), + Compiler: runtime.Compiler, + Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH), } } From d2f0af1b8b0fad57e0518bba230fce90e193b8ed Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Thu, 2 Nov 2023 12:28:11 +0800 Subject: [PATCH 3/5] fix: go mod update. (#1306) * fix: to start im or chat, ZooKeeper must be started first. * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: go mod update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: get all userID Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: msggateway add online status call Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * chore: network mode change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * feat: add api of get server time Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * feat: remove go work sum Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: pull message add isRead field Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: check msg-transfer script Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: start don't kill old process Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: check component Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: pull message set isRead only message come from single. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: add ex field to update group info. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * cicd: robot automated Change * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: change project module name. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: for pressure test. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: message log. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * fxi: component check output valid info. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fxi: component check output valid info. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * test: send message test log. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change * cicd: robot automated Change * test: remove info log. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * feat: api of send message add sendTime field. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: add callback for update user's info. * cicd: robot automated Change * fix: change callback command name. * cicd: robot automated Change * fix: single chat unread status change. * fix: single chat unread status change. * fix: single chat unread status change. * fix: user status change. * cicd: robot automated Change * fix: user status change. * fix: user status change. * fix: user status change. * cicd: robot automated Change * fix: ws close when user logout. * fix: remove repeat platform on online status. * cicd: robot automated Change * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation . * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * re: remove router of unsubscribeStatus. * re: remove router of unsubscribeStatus. * re: remove router of unsubscribeStatus. * re: remove router of unsubscribeStatus. * fix: reset branch * fix: not support redis cluster. CROSSSLOT Keys in request don't hash to the same slot * fix: update user.FaceURL do not trigger GroupMemberInfoSetNotification * cicd: robot automated Change * fix: api send messages for notification conversation. * fix: api send messages for notification conversation. * fix: zk add close to avoid zk block. * fix: go mod update. --------- Signed-off-by: Gordon <1432970085@qq.com> Signed-off-by: withchao <993506633@qq.com> Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: withchao <993506633@qq.com> Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com> Co-authored-by: FGadvancer Co-authored-by: withchao --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 82e38e8e3..64a4db405 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require github.com/google/uuid v1.3.1 require ( github.com/IBM/sarama v1.41.3 github.com/OpenIMSDK/protocol v0.0.30 - github.com/OpenIMSDK/tools v0.0.15 + github.com/OpenIMSDK/tools v0.0.16 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.7.1 diff --git a/go.sum b/go.sum index c64fc8d68..4116d5428 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/OpenIMSDK/protocol v0.0.30 h1:MiHO6PyQMR9ojBHNnSFxCHLmsoE2xZqaiYj975JiZnM= github.com/OpenIMSDK/protocol v0.0.30/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.15 h1:FF3m0TQUG56pJC15a11jmBG6Y1EjXarEW4JV3CBF/Jc= -github.com/OpenIMSDK/tools v0.0.15/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/OpenIMSDK/tools v0.0.16 h1:te/GIq2imCMsrRPgU9OObYKbzZ3rT08Lih/o+3QFIz0= +github.com/OpenIMSDK/tools v0.0.16/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= From 62e9980f3c52d2d17e7db98859496734b2049ba7 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Thu, 2 Nov 2023 03:11:45 -0500 Subject: [PATCH 4/5] refactor: scheduled task splitting (#1299) * optimize scheduled deletion * optimize scheduled deletion * optimize scheduled deletion * fix: conflicts --- internal/tools/conversation.go | 131 +++++++++++++++---- internal/tools/cron_task.go | 4 +- internal/tools/msg.go | 54 ++++++-- pkg/common/db/controller/conversation.go | 10 ++ pkg/common/db/relation/conversation_model.go | 14 +- pkg/common/db/table/relation/conversation.go | 2 + 6 files changed, 175 insertions(+), 40 deletions(-) diff --git a/internal/tools/conversation.go b/internal/tools/conversation.go index 4adec99ee..5d568cac5 100644 --- a/internal/tools/conversation.go +++ b/internal/tools/conversation.go @@ -16,49 +16,126 @@ package tools import ( "context" - "time" - "github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/mcontext" "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + "math/rand" + "time" ) +//func (c *MsgTool) ConversationsDestructMsgs() { +// log.ZInfo(context.Background(), "start msg destruct cron task") +// ctx := mcontext.NewCtx(utils.GetSelfFuncName()) +// conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx) +// if err != nil { +// log.ZError(ctx, "get conversation id need destruct failed", err) +// return +// } +// log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations)) +// for _, conversation := range conversations { +// ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) +// log.ZDebug( +// ctx, +// "UserMsgsDestruct", +// "conversationID", +// conversation.ConversationID, +// "ownerUserID", +// conversation.OwnerUserID, +// "msgDestructTime", +// conversation.MsgDestructTime, +// "lastMsgDestructTime", +// conversation.LatestMsgDestructTime, +// ) +// now := time.Now() +// seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) +// if err != nil { +// log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) +// continue +// } +// if len(seqs) > 0 { +// if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil { +// log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) +// continue +// } +// if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { +// log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) +// } +// } +// } +//} + func (c *MsgTool) ConversationsDestructMsgs() { log.ZInfo(context.Background(), "start msg destruct cron task") ctx := mcontext.NewCtx(utils.GetSelfFuncName()) - conversations, err := c.conversationDatabase.GetConversationIDsNeedDestruct(ctx) + num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { - log.ZError(ctx, "get conversation id need destruct failed", err) + log.ZError(ctx, "GetAllConversationIDsNumber failed", err) return } - log.ZDebug(context.Background(), "nums conversations need destruct", "nums", len(conversations)) - for _, conversation := range conversations { - ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) - log.ZDebug( - ctx, - "UserMsgsDestruct", - "conversationID", - conversation.ConversationID, - "ownerUserID", - conversation.OwnerUserID, - "msgDestructTime", - conversation.MsgDestructTime, - "lastMsgDestructTime", - conversation.LatestMsgDestructTime, - ) - now := time.Now() - seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + const batchNum = 50 + log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num) + if num == 0 { + return + } + count := int(num/batchNum + num/batchNum/2) + if count < 1 { + count = 1 + } + maxPage := 1 + num/batchNum + if num%batchNum != 0 { + maxPage++ + } + for i := 0; i < count; i++ { + pageNumber := rand.Int63() % maxPage + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) if err != nil { - log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue } - if len(seqs) > 0 { - if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil { - log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) + if len(conversationIDs) == 0 { + continue + } + conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs) + if err != nil { + log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs) + continue + } + temp := make([]*relation.ConversationModel, 0, len(conversations)) + for i, conversation := range conversations { + if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && (time.Now().Unix() > (conversation.MsgDestructTime+conversation.LatestMsgDestructTime.Unix()+8*60*60)) || conversation.LatestMsgDestructTime.IsZero() { + temp = append(temp, conversations[i]) + } + } + for _, conversation := range temp { + ctx = mcontext.NewCtx(utils.GetSelfFuncName() + "-" + utils.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID) + log.ZDebug( + ctx, + "UserMsgsDestruct", + "conversationID", + conversation.ConversationID, + "ownerUserID", + conversation.OwnerUserID, + "msgDestructTime", + conversation.MsgDestructTime, + "lastMsgDestructTime", + conversation.LatestMsgDestructTime, + ) + now := time.Now() + seqs, err := c.msgDatabase.UserMsgsDestruct(ctx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + if err != nil { + log.ZError(ctx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) continue } - if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { - log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + if len(seqs) > 0 { + if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, []string{conversation.OwnerUserID}, conversation.ConversationID, map[string]interface{}{"latest_msg_destruct_time": now}); err != nil { + log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + continue + } + if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil { + log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) + } } } } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 6702bc6c7..6f4803628 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -39,13 +39,13 @@ func StartTask() error { log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime) _, err = c.AddFunc(config.Config.ChatRecordsClearTime, msgTool.AllConversationClearMsgAndFixSeq) if err != nil { - fmt.Println("start allConversationClearMsgAndFixSeq cron failed", err.Error(), config.Config.ChatRecordsClearTime) + log.ZError(context.Background(), "start allConversationClearMsgAndFixSeq cron failed", err) panic(err) } log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime) _, err = c.AddFunc(config.Config.MsgDestructTime, msgTool.ConversationsDestructMsgs) if err != nil { - fmt.Println("start conversationsDestructMsgs cron failed", err.Error(), config.Config.ChatRecordsClearTime) + log.ZError(context.Background(), "start conversationsDestructMsgs cron failed", err) panic(err) } c.Start() diff --git a/internal/tools/msg.go b/internal/tools/msg.go index ca095051c..7e06fda4a 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -17,13 +17,11 @@ package tools import ( "context" "fmt" - "math" - + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" "github.com/redis/go-redis/v9" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - - "github.com/openimsdk/open-im-server/v3/pkg/common/discovery_register" + "math" "github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/log" @@ -31,6 +29,7 @@ import ( "github.com/OpenIMSDK/tools/mw" "github.com/OpenIMSDK/tools/tx" "github.com/OpenIMSDK/tools/utils" + "math/rand" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" @@ -104,18 +103,55 @@ func InitMsgTool() (*MsgTool, error) { return msgTool, nil } +//func (c *MsgTool) AllConversationClearMsgAndFixSeq() { +// ctx := mcontext.NewCtx(utils.GetSelfFuncName()) +// log.ZInfo(ctx, "============================ start del cron task ============================") +// conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) +// if err != nil { +// log.ZError(ctx, "GetAllConversationIDs failed", err) +// return +// } +// for _, conversationID := range conversationIDs { +// conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) +// } +// c.ClearConversationsMsg(ctx, conversationIDs) +// log.ZInfo(ctx, "============================ start del cron finished ============================") +//} + func (c *MsgTool) AllConversationClearMsgAndFixSeq() { ctx := mcontext.NewCtx(utils.GetSelfFuncName()) log.ZInfo(ctx, "============================ start del cron task ============================") - conversationIDs, err := c.conversationDatabase.GetAllConversationIDs(ctx) + num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { - log.ZError(ctx, "GetAllConversationIDs failed", err) + log.ZError(ctx, "GetAllConversationIDsNumber failed", err) return } - for _, conversationID := range conversationIDs { - conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) + const batchNum = 50 + log.ZDebug(ctx, "GetAllConversationIDsNumber", "num", num) + if num == 0 { + return + } + count := int(num/batchNum + num/batchNum/2) + if count < 1 { + count = 1 + } + maxPage := 1 + num/batchNum + if num%batchNum != 0 { + maxPage++ + } + for i := 0; i < count; i++ { + pageNumber := rand.Int63() % maxPage + conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, int32(pageNumber), batchNum) + if err != nil { + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) + continue + } + log.ZDebug(ctx, "PageConversationIDs failed", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs) + if len(conversationIDs) == 0 { + continue + } + c.ClearConversationsMsg(ctx, conversationIDs) } - c.ClearConversationsMsg(ctx, conversationIDs) log.ZInfo(ctx, "============================ start del cron finished ============================") } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index b93f0bf06..4f7de5ee0 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -50,6 +50,8 @@ type ConversationDatabase interface { GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetAllConversationIDsNumber(ctx context.Context) (int64, error) + PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) //GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.ConversationModel, error) @@ -295,6 +297,14 @@ func (c *conversationDatabase) GetAllConversationIDs(ctx context.Context) ([]str return c.conversationDB.GetAllConversationIDs(ctx) } +func (c *conversationDatabase) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { + return c.conversationDB.GetAllConversationIDsNumber(ctx) +} + +func (c *conversationDatabase) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { + return c.conversationDB.PageConversationIDs(ctx, pageNumber, showNumber) +} + //func (c *conversationDatabase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { // return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) //} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index d5ca92ec2..b6c554864 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -16,9 +16,7 @@ package relation import ( "context" - "github.com/OpenIMSDK/tools/errs" - "gorm.io/gorm" "github.com/OpenIMSDK/protocol/constant" @@ -188,6 +186,18 @@ func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversat ) } +func (c *ConversationGorm) GetAllConversationIDsNumber(ctx context.Context) (int64, error) { + var num int64 + err := c.db(ctx).Select("COUNT(DISTINCT conversation_id)").Model(&relation.ConversationModel{}).Count(&num).Error + return num, errs.Wrap(err) +} + +func (c *ConversationGorm) PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) { + err = c.db(ctx).Distinct("conversation_id").Limit(int(showNumber)).Offset(int((pageNumber-1)*showNumber)).Pluck("conversation_id", &conversationIDs).Error + err = errs.Wrap(err) + return +} + func (c *ConversationGorm) GetUserAllHasReadSeqs( ctx context.Context, ownerUserID string, diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 7e6c6bdf8..e9680873f 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -63,6 +63,8 @@ type ConversationModelInterface interface { GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetAllConversationIDsNumber(ctx context.Context) (int64, error) + PageConversationIDs(ctx context.Context, pageNumber, showNumber int32) (conversationIDs []string, err error) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*ConversationModel, error) GetConversationIDsNeedDestruct(ctx context.Context) ([]*ConversationModel, error) From cb0bf6443545773e7185dd68bf83e1b6ab43694a Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Thu, 2 Nov 2023 20:40:45 -0500 Subject: [PATCH 5/5] feat: S3 server cache (#1329) * optimize scheduled deletion * optimize scheduled deletion * optimize scheduled deletion * optimize scheduled deletion * minio cache * fix: conflicts * feat: minio cache * feat: cache optimize * feat: cache optimize * feat: cache optimize * feat: cache optimize * feat: cache optimize --- internal/rpc/third/third.go | 10 +- pkg/common/db/cache/meta_cache.go | 4 + pkg/common/db/cache/s3.go | 190 ++++++++++++++++++++++++++++ pkg/common/db/controller/s3.go | 26 ++-- pkg/common/db/s3/cont/controller.go | 30 +++-- pkg/common/db/s3/minio/minio.go | 129 ++----------------- pkg/common/db/s3/minio/struct.go | 22 ---- pkg/common/db/s3/minio/thumbnail.go | 134 ++++++++++++++++++++ 8 files changed, 386 insertions(+), 159 deletions(-) create mode 100644 pkg/common/db/cache/s3.go delete mode 100644 pkg/common/db/s3/minio/struct.go create mode 100644 pkg/common/db/s3/minio/thumbnail.go diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index ae32a1f40..b48eddea9 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -67,7 +67,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e var o s3.Interface switch config.Config.Object.Enable { case "minio": - o, err = minio.NewMinio() + o, err = minio.NewMinio(cache.NewMinioCache(rdb)) case "cos": o, err = cos.NewCos() case "oss": @@ -78,11 +78,17 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } + //specialerror.AddErrHandler(func(err error) errs.CodeError { + // if o.IsNotFound(err) { + // return errs.ErrRecordNotFound + // } + // return nil + //}) third.RegisterThirdServer(server, &thirdServer{ apiURL: apiURL, thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb), db), userRpcClient: rpcclient.NewUserRpcClient(client), - s3dataBase: controller.NewS3Database(o, relation.NewObjectInfo(db)), + s3dataBase: controller.NewS3Database(rdb, o, relation.NewObjectInfo(db)), defaultExpire: time.Hour * 24 * 7, }) return nil diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index 549a0ea69..45888ed8f 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/OpenIMSDK/tools/mw/specialerror" "time" "github.com/dtm-labs/rockscache" @@ -209,6 +210,9 @@ func batchGetCache2[T any, K comparable]( return fns(ctx, key) }) if err != nil { + if errs.ErrRecordNotFound.Is(specialerror.ErrCode(errs.Unwrap(err))) { + continue + } return nil, err } res = append(res, val) diff --git a/pkg/common/db/cache/s3.go b/pkg/common/db/cache/s3.go new file mode 100644 index 000000000..a63496d05 --- /dev/null +++ b/pkg/common/db/cache/s3.go @@ -0,0 +1,190 @@ +package cache + +import ( + "context" + "github.com/dtm-labs/rockscache" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3" + relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + "github.com/redis/go-redis/v9" + "strconv" + "time" +) + +type ObjectCache interface { + metaCache + GetName(ctx context.Context, name string) (*relationtb.ObjectModel, error) + DelObjectName(names ...string) ObjectCache +} + +func NewObjectCacheRedis(rdb redis.UniversalClient, objDB relationtb.ObjectInfoModelInterface) ObjectCache { + rcClient := rockscache.NewClient(rdb, rockscache.NewDefaultOptions()) + return &objectCacheRedis{ + rcClient: rcClient, + expireTime: time.Hour * 12, + objDB: objDB, + metaCache: NewMetaCacheRedis(rcClient), + } +} + +type objectCacheRedis struct { + metaCache + objDB relationtb.ObjectInfoModelInterface + rcClient *rockscache.Client + expireTime time.Duration +} + +func (g *objectCacheRedis) NewCache() ObjectCache { + return &objectCacheRedis{ + rcClient: g.rcClient, + expireTime: g.expireTime, + objDB: g.objDB, + metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...), + } +} + +func (g *objectCacheRedis) DelObjectName(names ...string) ObjectCache { + objectCache := g.NewCache() + keys := make([]string, 0, len(names)) + for _, name := range names { + keys = append(keys, g.getObjectKey(name)) + } + objectCache.AddKeys(keys...) + return objectCache +} + +func (g *objectCacheRedis) getObjectKey(name string) string { + return "OBJECT:" + name +} + +func (g *objectCacheRedis) GetName(ctx context.Context, name string) (*relationtb.ObjectModel, error) { + return getCache(ctx, g.rcClient, g.getObjectKey(name), g.expireTime, func(ctx context.Context) (*relationtb.ObjectModel, error) { + return g.objDB.Take(ctx, name) + }) +} + +type S3Cache interface { + metaCache + GetKey(ctx context.Context, engine string, key string) (*s3.ObjectInfo, error) + DelS3Key(engine string, keys ...string) S3Cache +} + +func NewS3Cache(rdb redis.UniversalClient, s3 s3.Interface) S3Cache { + rcClient := rockscache.NewClient(rdb, rockscache.NewDefaultOptions()) + return &s3CacheRedis{ + rcClient: rcClient, + expireTime: time.Hour * 12, + s3: s3, + metaCache: NewMetaCacheRedis(rcClient), + } +} + +type s3CacheRedis struct { + metaCache + s3 s3.Interface + rcClient *rockscache.Client + expireTime time.Duration +} + +func (g *s3CacheRedis) NewCache() S3Cache { + return &s3CacheRedis{ + rcClient: g.rcClient, + expireTime: g.expireTime, + s3: g.s3, + metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...), + } +} + +func (g *s3CacheRedis) DelS3Key(engine string, keys ...string) S3Cache { + s3cache := g.NewCache() + ks := make([]string, 0, len(keys)) + for _, key := range keys { + ks = append(ks, g.getS3Key(engine, key)) + } + s3cache.AddKeys(ks...) + return s3cache +} + +func (g *s3CacheRedis) getS3Key(engine string, name string) string { + return "S3:" + engine + ":" + name +} + +func (g *s3CacheRedis) GetKey(ctx context.Context, engine string, name string) (*s3.ObjectInfo, error) { + return getCache(ctx, g.rcClient, g.getS3Key(engine, name), g.expireTime, func(ctx context.Context) (*s3.ObjectInfo, error) { + return g.s3.StatObject(ctx, name) + }) +} + +type MinioCache interface { + metaCache + GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*MinioImageInfo, error)) (*MinioImageInfo, error) + GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) + DelObjectImageInfoKey(keys ...string) MinioCache + DelImageThumbnailKey(key string, format string, width int, height int) MinioCache +} + +func NewMinioCache(rdb redis.UniversalClient) MinioCache { + rcClient := rockscache.NewClient(rdb, rockscache.NewDefaultOptions()) + return &minioCacheRedis{ + rcClient: rcClient, + expireTime: time.Hour * 24 * 7, + metaCache: NewMetaCacheRedis(rcClient), + } +} + +type minioCacheRedis struct { + metaCache + rcClient *rockscache.Client + expireTime time.Duration +} + +func (g *minioCacheRedis) NewCache() MinioCache { + return &minioCacheRedis{ + rcClient: g.rcClient, + expireTime: g.expireTime, + metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...), + } +} + +func (g *minioCacheRedis) DelObjectImageInfoKey(keys ...string) MinioCache { + s3cache := g.NewCache() + ks := make([]string, 0, len(keys)) + for _, key := range keys { + ks = append(ks, g.getObjectImageInfoKey(key)) + } + s3cache.AddKeys(ks...) + return s3cache +} + +func (g *minioCacheRedis) DelImageThumbnailKey(key string, format string, width int, height int) MinioCache { + s3cache := g.NewCache() + s3cache.AddKeys(g.getMinioImageThumbnailKey(key, format, width, height)) + return s3cache +} + +func (g *minioCacheRedis) getObjectImageInfoKey(key string) string { + return "MINIO:IMAGE:" + key +} + +func (g *minioCacheRedis) getMinioImageThumbnailKey(key string, format string, width int, height int) string { + return "MINIO:THUMBNAIL:" + format + ":w" + strconv.Itoa(width) + ":h" + strconv.Itoa(height) + ":" + key +} + +func (g *minioCacheRedis) GetImageObjectKeyInfo(ctx context.Context, key string, fn func(ctx context.Context) (*MinioImageInfo, error)) (*MinioImageInfo, error) { + info, err := getCache(ctx, g.rcClient, g.getObjectImageInfoKey(key), g.expireTime, fn) + if err != nil { + return nil, err + } + return info, nil +} + +func (g *minioCacheRedis) GetThumbnailKey(ctx context.Context, key string, format string, width int, height int, minioCache func(ctx context.Context) (string, error)) (string, error) { + return getCache(ctx, g.rcClient, g.getMinioImageThumbnailKey(key, format, width, height), g.expireTime, minioCache) +} + +type MinioImageInfo struct { + IsImg bool `json:"isImg"` + Width int `json:"width"` + Height int `json:"height"` + Format string `json:"format"` + Etag string `json:"etag"` +} diff --git a/pkg/common/db/controller/s3.go b/pkg/common/db/controller/s3.go index 6ef3e73b3..ffe9ff508 100644 --- a/pkg/common/db/controller/s3.go +++ b/pkg/common/db/controller/s3.go @@ -16,12 +16,13 @@ package controller import ( "context" - "path/filepath" - "time" - + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3" "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cont" "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" + "github.com/redis/go-redis/v9" + "path/filepath" + "time" ) type S3Database interface { @@ -34,16 +35,18 @@ type S3Database interface { SetObject(ctx context.Context, info *relation.ObjectModel) error } -func NewS3Database(s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database { +func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database { return &s3Database{ - s3: cont.New(s3), - obj: obj, + s3: cont.New(cache.NewS3Cache(rdb, s3), s3), + cache: cache.NewObjectCacheRedis(rdb, obj), + db: obj, } } type s3Database struct { - s3 *cont.Controller - obj relation.ObjectInfoModelInterface + s3 *cont.Controller + cache cache.ObjectCache + db relation.ObjectInfoModelInterface } func (s *s3Database) PartSize(ctx context.Context, size int64) (int64, error) { @@ -67,11 +70,14 @@ func (s *s3Database) CompleteMultipartUpload(ctx context.Context, uploadID strin } func (s *s3Database) SetObject(ctx context.Context, info *relation.ObjectModel) error { - return s.obj.SetObject(ctx, info) + if err := s.db.SetObject(ctx, info); err != nil { + return err + } + return s.cache.DelObjectName(info.Name).ExecDel(ctx) } func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error) { - obj, err := s.obj.Take(ctx, name) + obj, err := s.cache.GetName(ctx, name) if err != nil { return time.Time{}, "", err } diff --git a/pkg/common/db/s3/cont/controller.go b/pkg/common/db/s3/cont/controller.go index 6faa997a9..2d9d7d32d 100644 --- a/pkg/common/db/s3/cont/controller.go +++ b/pkg/common/db/s3/cont/controller.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "path" "strings" "time" @@ -32,12 +33,16 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3" ) -func New(impl s3.Interface) *Controller { - return &Controller{impl: impl} +func New(cache cache.S3Cache, impl s3.Interface) *Controller { + return &Controller{ + cache: cache, + impl: impl, + } } type Controller struct { - impl s3.Interface + cache cache.S3Cache + impl s3.Interface } func (c *Controller) HashPath(md5 string) string { @@ -69,8 +74,12 @@ func (c *Controller) PartLimit() *s3.PartLimit { return c.impl.PartLimit() } +func (c *Controller) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) { + return c.cache.GetKey(ctx, c.impl.Engine(), name) +} + func (c *Controller) GetHashObject(ctx context.Context, hash string) (*s3.ObjectInfo, error) { - return c.impl.StatObject(ctx, c.HashPath(hash)) + return c.StatObject(ctx, c.HashPath(hash)) } func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64, expire time.Duration, maxParts int) (*InitiateUploadResult, error) { @@ -94,7 +103,7 @@ func (c *Controller) InitiateUpload(ctx context.Context, hash string, size int64 if maxParts > 0 && partNumber > 0 && partNumber < maxParts { return nil, errors.New(fmt.Sprintf("too many parts: %d", partNumber)) } - if info, err := c.impl.StatObject(ctx, c.HashPath(hash)); err == nil { + if info, err := c.StatObject(ctx, c.HashPath(hash)); err == nil { return nil, &HashAlreadyExistsError{Object: info} } else if !c.impl.IsNotFound(err) { return nil, err @@ -168,13 +177,13 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa fmt.Println("CompleteUpload sum:", hex.EncodeToString(md5Sum[:]), "upload hash:", upload.Hash) return nil, errors.New("md5 mismatching") } - if info, err := c.impl.StatObject(ctx, c.HashPath(upload.Hash)); err == nil { + if info, err := c.StatObject(ctx, c.HashPath(upload.Hash)); err == nil { return &UploadResult{ Key: info.Key, Size: info.Size, Hash: info.ETag, }, nil - } else if !c.impl.IsNotFound(err) { + } else if !c.IsNotFound(err) { return nil, err } cleanObject := make(map[string]struct{}) @@ -200,7 +209,7 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa } targetKey = result.Key case UploadTypePresigned: - uploadInfo, err := c.impl.StatObject(ctx, upload.Key) + uploadInfo, err := c.StatObject(ctx, upload.Key) if err != nil { return nil, err } @@ -230,6 +239,9 @@ func (c *Controller) CompleteUpload(ctx context.Context, uploadID string, partHa default: return nil, errors.New("invalid upload id type") } + if err := c.cache.DelS3Key(c.impl.Engine(), targetKey).ExecDel(ctx); err != nil { + return nil, err + } return &UploadResult{ Key: targetKey, Size: upload.Size, @@ -253,7 +265,7 @@ func (c *Controller) AuthSign(ctx context.Context, uploadID string, partNumbers } func (c *Controller) IsNotFound(err error) bool { - return c.impl.IsNotFound(err) + return c.impl.IsNotFound(err) || errs.ErrRecordNotFound.Is(err) } func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (string, error) { diff --git a/pkg/common/db/s3/minio/minio.go b/pkg/common/db/s3/minio/minio.go index 7984df5a0..01101e893 100644 --- a/pkg/common/db/s3/minio/minio.go +++ b/pkg/common/db/s3/minio/minio.go @@ -15,20 +15,14 @@ package minio import ( - "bytes" "context" - "encoding/json" "errors" "fmt" - "image" - "image/gif" - "image/jpeg" - "image/png" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "io" "net/http" "net/url" "path" - "path/filepath" "reflect" "strconv" "strings" @@ -56,13 +50,13 @@ const ( ) const ( - maxImageWidth = 1024 - maxImageHeight = 1024 - maxImageSize = 1024 * 1024 * 50 - pathInfo = "openim/thumbnail" + maxImageWidth = 1024 + maxImageHeight = 1024 + maxImageSize = 1024 * 1024 * 50 + imageThumbnailPath = "openim/thumbnail" ) -func NewMinio() (s3.Interface, error) { +func NewMinio(cache cache.MinioCache) (s3.Interface, error) { u, err := url.Parse(config.Config.Object.Minio.Endpoint) if err != nil { return nil, err @@ -80,6 +74,7 @@ func NewMinio() (s3.Interface, error) { core: &minio.Core{Client: client}, lock: &sync.Mutex{}, init: false, + cache: cache, } if config.Config.Object.Minio.SignEndpoint == "" || config.Config.Object.Minio.SignEndpoint == config.Config.Object.Minio.Endpoint { m.opts = opts @@ -124,6 +119,7 @@ type Minio struct { lock sync.Locker init bool prefix string + cache cache.MinioCache } func (m *Minio) initMinio(ctx context.Context) error { @@ -227,6 +223,7 @@ func (m *Minio) CompleteMultipartUpload(ctx context.Context, uploadID string, na if err != nil { return nil, err } + m.delObjectImageInfoKey(ctx, name, upload.Size) return &s3.CompleteMultipartUploadResult{ Location: upload.Location, Bucket: upload.Bucket, @@ -389,7 +386,7 @@ func (m *Minio) ListUploadedParts(ctx context.Context, uploadID string, name str return res, nil } -func (m *Minio) presignedGetObject(ctx context.Context, name string, expire time.Duration, query url.Values) (string, error) { +func (m *Minio) PresignedGetObject(ctx context.Context, name string, expire time.Duration, query url.Values) (string, error) { if expire <= 0 { expire = time.Hour * 24 * 365 * 99 // 99 years } else if expire < time.Second { @@ -427,109 +424,9 @@ func (m *Minio) AccessURL(ctx context.Context, name string, expire time.Duration } } if opt.Image == nil || (opt.Image.Width < 0 && opt.Image.Height < 0 && opt.Image.Format == "") || (opt.Image.Width > maxImageWidth || opt.Image.Height > maxImageHeight) { - return m.presignedGetObject(ctx, name, expire, reqParams) + return m.PresignedGetObject(ctx, name, expire, reqParams) } - fileInfo, err := m.StatObject(ctx, name) - if err != nil { - return "", err - } - if fileInfo.Size > maxImageSize { - return "", errors.New("file size too large") - } - objectInfoPath := path.Join(pathInfo, fileInfo.ETag, "image.json") - var ( - img image.Image - info minioImageInfo - ) - data, err := m.getObjectData(ctx, objectInfoPath, 1024) - if err == nil { - if err := json.Unmarshal(data, &info); err != nil { - return "", fmt.Errorf("unmarshal minio image info.json error: %w", err) - } - if info.NotImage { - return "", errors.New("not image") - } - } else if m.IsNotFound(err) { - reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) - if err != nil { - return "", err - } - defer reader.Close() - imageInfo, format, err := ImageStat(reader) - if err == nil { - info.NotImage = false - info.Format = format - info.Width, info.Height = ImageWidthHeight(imageInfo) - img = imageInfo - } else { - info.NotImage = true - } - data, err := json.Marshal(&info) - if err != nil { - return "", err - } - if _, err := m.core.Client.PutObject(ctx, m.bucket, objectInfoPath, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{}); err != nil { - return "", err - } - } else { - return "", err - } - if opt.Image.Width > info.Width || opt.Image.Width <= 0 { - opt.Image.Width = info.Width - } - if opt.Image.Height > info.Height || opt.Image.Height <= 0 { - opt.Image.Height = info.Height - } - opt.Image.Format = strings.ToLower(opt.Image.Format) - if opt.Image.Format == formatJpg { - opt.Image.Format = formatJpeg - } - switch opt.Image.Format { - case formatPng: - case formatJpeg: - case formatGif: - default: - if info.Format == formatGif { - opt.Image.Format = formatGif - } else { - opt.Image.Format = formatJpeg - } - } - reqParams.Set("response-content-type", "image/"+opt.Image.Format) - if opt.Image.Width == info.Width && opt.Image.Height == info.Height && opt.Image.Format == info.Format { - return m.presignedGetObject(ctx, name, expire, reqParams) - } - cacheKey := filepath.Join(pathInfo, fileInfo.ETag, fmt.Sprintf("image_w%d_h%d.%s", opt.Image.Width, opt.Image.Height, opt.Image.Format)) - if _, err := m.core.Client.StatObject(ctx, m.bucket, cacheKey, minio.StatObjectOptions{}); err == nil { - return m.presignedGetObject(ctx, cacheKey, expire, reqParams) - } else if !m.IsNotFound(err) { - return "", err - } - if img == nil { - reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) - if err != nil { - return "", err - } - defer reader.Close() - img, _, err = ImageStat(reader) - if err != nil { - return "", err - } - } - thumbnail := resizeImage(img, opt.Image.Width, opt.Image.Height) - buf := bytes.NewBuffer(nil) - switch opt.Image.Format { - case formatPng: - err = png.Encode(buf, thumbnail) - case formatJpeg: - err = jpeg.Encode(buf, thumbnail, nil) - case formatGif: - err = gif.Encode(buf, thumbnail, nil) - } - if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil { - return "", err - } - return m.presignedGetObject(ctx, cacheKey, expire, reqParams) + return m.getImageThumbnailURL(ctx, name, expire, opt.Image) } func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([]byte, error) { @@ -541,5 +438,5 @@ func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([] if limit < 0 { return io.ReadAll(object) } - return io.ReadAll(io.LimitReader(object, 1024)) + return io.ReadAll(io.LimitReader(object, limit)) } diff --git a/pkg/common/db/s3/minio/struct.go b/pkg/common/db/s3/minio/struct.go deleted file mode 100644 index 28b8bfdc3..000000000 --- a/pkg/common/db/s3/minio/struct.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package minio - -type minioImageInfo struct { - NotImage bool `json:"notImage,omitempty"` - Width int `json:"width,omitempty"` - Height int `json:"height,omitempty"` - Format string `json:"format,omitempty"` -} diff --git a/pkg/common/db/s3/minio/thumbnail.go b/pkg/common/db/s3/minio/thumbnail.go new file mode 100644 index 000000000..fa3581572 --- /dev/null +++ b/pkg/common/db/s3/minio/thumbnail.go @@ -0,0 +1,134 @@ +package minio + +import ( + "bytes" + "context" + "errors" + "fmt" + "github.com/OpenIMSDK/tools/errs" + "github.com/OpenIMSDK/tools/log" + "github.com/minio/minio-go/v7" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3" + "image" + "image/gif" + "image/jpeg" + "image/png" + "net/url" + "path/filepath" + "strings" + "time" +) + +func (m *Minio) getImageThumbnailURL(ctx context.Context, name string, expire time.Duration, opt *s3.Image) (string, error) { + var img image.Image + info, err := m.cache.GetImageObjectKeyInfo(ctx, name, func(ctx context.Context) (info *cache.MinioImageInfo, err error) { + info, img, err = m.getObjectImageInfo(ctx, name) + return + }) + if err != nil { + return "", err + } + if !info.IsImg { + return "", errs.ErrData.Wrap("object not image") + } + if opt.Width > info.Width || opt.Width <= 0 { + opt.Width = info.Width + } + if opt.Height > info.Height || opt.Height <= 0 { + opt.Height = info.Height + } + opt.Format = strings.ToLower(opt.Format) + if opt.Format == formatJpg { + opt.Format = formatJpeg + } + switch opt.Format { + case formatPng, formatJpeg, formatGif: + default: + opt.Format = "" + } + reqParams := make(url.Values) + if opt.Width == info.Width && opt.Height == info.Height && (opt.Format == info.Format || opt.Format == "") { + reqParams.Set("response-content-type", "image/"+info.Format) + return m.PresignedGetObject(ctx, name, expire, reqParams) + } + if opt.Format == "" { + switch opt.Format { + case formatGif: + opt.Format = formatGif + case formatJpeg: + opt.Format = formatJpeg + case formatPng: + opt.Format = formatPng + default: + opt.Format = formatPng + } + } + key, err := m.cache.GetThumbnailKey(ctx, name, opt.Format, opt.Width, opt.Height, func(ctx context.Context) (string, error) { + if img == nil { + reader, err := m.core.Client.GetObject(ctx, m.bucket, name, minio.GetObjectOptions{}) + if err != nil { + return "", err + } + defer reader.Close() + img, _, err = ImageStat(reader) + if err != nil { + return "", err + } + } + thumbnail := resizeImage(img, opt.Width, opt.Height) + buf := bytes.NewBuffer(nil) + switch opt.Format { + case formatPng: + err = png.Encode(buf, thumbnail) + case formatJpeg: + err = jpeg.Encode(buf, thumbnail, nil) + case formatGif: + err = gif.Encode(buf, thumbnail, nil) + } + cacheKey := filepath.Join(imageThumbnailPath, info.Etag, fmt.Sprintf("image_w%d_h%d.%s", opt.Width, opt.Height, opt.Format)) + if _, err := m.core.Client.PutObject(ctx, m.bucket, cacheKey, buf, int64(buf.Len()), minio.PutObjectOptions{}); err != nil { + return "", err + } + return cacheKey, nil + }) + if err != nil { + return "", err + } + reqParams.Set("response-content-type", "image/"+opt.Format) + return m.PresignedGetObject(ctx, key, expire, reqParams) +} + +func (m *Minio) getObjectImageInfo(ctx context.Context, name string) (*cache.MinioImageInfo, image.Image, error) { + fileInfo, err := m.StatObject(ctx, name) + if err != nil { + return nil, nil, err + } + if fileInfo.Size > maxImageSize { + return nil, nil, errors.New("file size too large") + } + imageData, err := m.getObjectData(ctx, name, fileInfo.Size) + if err != nil { + return nil, nil, err + } + var info cache.MinioImageInfo + imageInfo, format, err := ImageStat(bytes.NewReader(imageData)) + if err == nil { + info.IsImg = true + info.Format = format + info.Width, info.Height = ImageWidthHeight(imageInfo) + } else { + info.IsImg = false + } + info.Etag = fileInfo.ETag + return &info, imageInfo, nil +} + +func (m *Minio) delObjectImageInfoKey(ctx context.Context, key string, size int64) { + if size > 0 && size > maxImageSize { + return + } + if err := m.cache.DelObjectImageInfoKey(key).ExecDel(ctx); err != nil { + log.ZError(ctx, "DelObjectImageInfoKey failed", err, "key", key) + } +}