diff --git a/Dockerfile b/Dockerfile index 4b38d711b..f6a2ee9fe 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,7 +8,7 @@ ENV SERVER_DIR=/openim-server WORKDIR $SERVER_DIR # Set the Go proxy to improve dependency resolution speed -ENV GOPROXY=https://goproxy.io,direct +# ENV GOPROXY=https://goproxy.io,direct # Copy all files from the current directory into the container COPY . . diff --git a/config/openim-crontask.yml b/config/openim-crontask.yml index c05bd2485..ff69c7dc8 100644 --- a/config/openim-crontask.yml +++ b/config/openim-crontask.yml @@ -1,3 +1,4 @@ cronExecuteTime: 0 2 * * * retainChatRecords: 365 -fileExpireTime: 90 +fileExpireTime: 180 +deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"] \ No newline at end of file diff --git a/go.mod b/go.mod index 581910cc5..bdb95e6cf 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/openimsdk/open-im-server/v3 -go 1.22.0 +go 1.22.7 toolchain go1.23.2 @@ -14,15 +14,15 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.55 + github.com/openimsdk/protocol v0.0.72-alpha.59 github.com/openimsdk/tools v0.0.50-alpha.39 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 go.mongodb.org/mongo-driver v1.14.0 google.golang.org/api v0.170.0 - google.golang.org/grpc v1.66.2 - google.golang.org/protobuf v1.34.2 + google.golang.org/grpc v1.68.0 + google.golang.org/protobuf v1.35.1 gopkg.in/yaml.v3 v3.0.1 ) @@ -50,7 +50,7 @@ require ( require ( cloud.google.com/go v0.112.1 // indirect - cloud.google.com/go/compute/metadata v0.3.0 // indirect + cloud.google.com/go/compute/metadata v0.5.0 // indirect cloud.google.com/go/firestore v1.15.0 // indirect cloud.google.com/go/iam v1.1.7 // indirect cloud.google.com/go/longrunning v0.5.5 // indirect @@ -177,13 +177,13 @@ require ( golang.org/x/arch v0.7.0 // indirect golang.org/x/image v0.15.0 // indirect golang.org/x/net v0.29.0 // indirect - golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine/v2 v2.0.2 // indirect google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect gorm.io/gorm v1.25.8 // indirect stathat.com/c/consistent v1.0.0 // indirect diff --git a/go.sum b/go.sum index b6f93316a..20c72edd1 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM= cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4= -cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= -cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= +cloud.google.com/go/compute/metadata v0.5.0 h1:Zr0eK8JbFv6+Wi4ilXAR8FJ3wyNdpxHKJNPos6LTZOY= +cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY= cloud.google.com/go/firestore v1.15.0 h1:/k8ppuWOtNuDHt2tsRV42yI21uaGnKDEQnRFeBpbFF8= cloud.google.com/go/firestore v1.15.0/go.mod h1:GWOxFXcv8GZUtYpWHw/w6IuYNux/BtmeVTMmjrm4yhk= cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM= @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.55 h1:9PPWPHvkFk3neBSbNr+IoOdKIFjxTvEqUfMK/TEq1+8= -github.com/openimsdk/protocol v0.0.72-alpha.55/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.59 h1:+ycb2+68mLKPIo7VrxF0id/GXP6OqZ2/nBM1YZQr7qY= +github.com/openimsdk/protocol v0.0.72-alpha.59/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.39 h1:CebneQY+4D12qMl5XnYRAUOsEeNzHFLWpqtM4c3CiaA= github.com/openimsdk/tools v0.0.50-alpha.39/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= @@ -497,8 +497,8 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= -golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -565,8 +565,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s= -google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU= -google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo= +google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 h1:hjSy6tcFQZ171igDaN5QHOw2n6vx40juYbC/x67CEhc= +google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:qpvKtACPCQhAdu3PyQgV4l3LMXZEtft7y8QcarRsp9I= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -574,8 +574,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= -google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= +google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -585,8 +585,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= +google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 23d915013..8eff32899 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -56,6 +56,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { } type Server struct { + msggateway.UnimplementedMsgGatewayServer rpcPort int LongConnServer LongConnServer config *Config diff --git a/internal/push/push.go b/internal/push/push.go index 850f91d22..7f14bced7 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -14,6 +14,7 @@ import ( ) type pushServer struct { + pbpush.UnimplementedPushMsgServiceServer database controller.PushDatabase disCov discovery.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index b5bbed3ed..5bda21c77 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -414,6 +414,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, if err != nil { return err } + return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) } diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 66ceec739..949fbf5f9 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -40,6 +40,7 @@ import ( ) type authServer struct { + pbauth.UnimplementedAuthServer authDatabase controller.AuthDatabase userRpcClient *rpcclient.UserRpcClient RegisterCenter discovery.SvcDiscoveryRegistry @@ -196,7 +197,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID } m, err := s.authDatabase.GetTokensWithoutError(ctx, userID, int(platformID)) - if err != nil && errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, redis.Nil) { return err } for k := range m { @@ -214,7 +215,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID func (s *authServer) InvalidateToken(ctx context.Context, req *pbauth.InvalidateTokenReq) (*pbauth.InvalidateTokenResp, error) { m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID)) - if err != nil && errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, redis.Nil) { return nil, err } if m == nil { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 0c8a6fd85..0b6b656a4 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -43,6 +43,7 @@ import ( ) type conversationServer struct { + pbconversation.UnimplementedConversationServer msgRpcClient *rpcclient.MessageRpcClient user *rpcclient.UserRpcClient groupRpcClient *rpcclient.GroupRpcClient @@ -440,6 +441,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc map[string]any{"max_seq": req.MaxSeq}); err != nil { return nil, err } + return &pbconversation.SetConversationMaxSeqResp{}, nil } @@ -669,7 +671,7 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco }, nil } -func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) { +func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) { num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) if err != nil { log.ZError(ctx, "GetAllConversationIDsNumber failed", err) @@ -693,7 +695,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) if err != nil { - // log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) + log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) continue } @@ -716,7 +718,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex } } - return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil + return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil } func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { diff --git a/internal/rpc/group/db_map.go b/internal/rpc/group/db_map.go index 26c9a4625..e99f9e772 100644 --- a/internal/rpc/group/db_map.go +++ b/internal/rpc/group/db_map.go @@ -110,7 +110,7 @@ func UpdateGroupMemberMap(req *pbgroup.SetGroupMemberInfo) map[string]any { m["nickname"] = req.Nickname.Value } if req.FaceURL != nil { - m["user_group_face_url"] = req.FaceURL.Value + m["face_url"] = req.FaceURL.Value } if req.RoleLevel != nil { m["role_level"] = req.RoleLevel.Value diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index b5ab1b209..ba0c8a42d 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -57,6 +57,7 @@ import ( ) type groupServer struct { + pbgroup.UnimplementedGroupServer db controller.GroupDatabase user rpcclient.UserRpcClient notification *GroupNotificationSender @@ -963,6 +964,7 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro if err != nil { return err } + return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) } diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index c9610969d..de1879438 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -137,7 +137,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon return nil, err } hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID) - if err != nil && errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, redis.Nil) { return nil, err } var seqs []int64 diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index c5bd36b44..ff732136a 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -12,13 +12,14 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/stringutil" "golang.org/x/sync/errgroup" ) // hard delete in Database. -func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { +func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { return nil, err } @@ -26,18 +27,19 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error") } var ( - docNum int - msgNum int - start = time.Now() + docNum int + msgNum int + start = time.Now() + getLimit = 5000 ) - clearMsg := func(ctx context.Context) (bool, error) { + destructMsg := func(ctx context.Context) (bool, error) { docIDs, err := m.MsgDatabase.GetDocIDs(ctx) if err != nil { return false, err } - msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000) + msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit) if err != nil { return false, err } @@ -61,7 +63,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return true, nil } - _, err = clearMsg(ctx) + _, err = destructMsg(ctx) if err != nil { log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) return nil, err @@ -69,11 +71,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) - return &msg.ClearMsgResp{}, nil + return &msg.DestructMsgsResp{}, nil } -// soft delete for self -func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { +// soft delete for user self +func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { temp := convert.ConversationsPb2DB(req.Conversations) batchNum := 100 @@ -93,22 +95,31 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) "msgDestructTime", conversation.MsgDestructTime, "lastMsgDestructTime", conversation.LatestMsgDestructTime) - seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) + seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) if err != nil { log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) continue } if len(seqs) > 0 { + minseq := datautil.Max(seqs...) + + // update if err := m.Conversation.UpdateConversation(handleCtx, &pbconversation.UpdateConversationReq{ UserIDs: []string{conversation.OwnerUserID}, ConversationID: conversation.ConversationID, - LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil { + LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()), + MinSeq: wrapperspb.Int64(minseq), + }); err != nil { log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) continue } + if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil { + return err + } + // if you need Notify SDK client userseq is update. // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index bf8781747..6d5922ce3 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -55,6 +55,7 @@ type ( msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. config *Config // Global configuration settings. webhookClient *webhook.Client + msg.UnimplementedMsgServer } Config struct { diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 2f4843a8e..036c7aff5 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -43,6 +43,7 @@ import ( ) type friendServer struct { + relation.UnimplementedFriendServer db controller.FriendDatabase blackDatabase controller.BlackDatabase userRpcClient *rpcclient.UserRpcClient diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index fb6a1157e..b4b064d7f 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -290,48 +290,85 @@ func (t *thirdServer) apiAddress(prefix, name string) string { func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { var conf config.Third expireTime := time.UnixMilli(req.ExpireTime) - var deltotal int + findPagination := &sdkws.RequestPagination{ PageNumber: 1, - ShowNumber: 1000, + ShowNumber: 500, + } + + // Find all expired data in S3 database + total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + + if total == 0 { + log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total) + return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil } - for { - total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) + + needDelObjectKeys := make([]string, len(models)) + for _, model := range models { + needDelObjectKeys = append(needDelObjectKeys, model.Key) + } + + // Remove duplicate keys, have the same key use in different models + needDelObjectKeys = datautil.Distinct(needDelObjectKeys) + + for _, key := range needDelObjectKeys { + // Find all models by key + keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key) if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { return nil, errs.Wrap(err) } - needDelObjectKeys := make([]string, 0) - for _, model := range models { - needDelObjectKeys = append(needDelObjectKeys, model.Key) + + // check keyModels, if all keyModels. + needDelKey := true // Default can delete + for _, keymodel := range keyModels { + // If group is empty or CreateTime is after expireTime, can't delete this key + if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) { + needDelKey = false + break + } } - needDelObjectKeys = datautil.Distinct(needDelObjectKeys) - for _, key := range needDelObjectKeys { - count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) - if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { - return nil, errs.Wrap(err) + // If this object is not referenced by not expire data, delete it + if needDelKey && t.minio != nil { + // If have a thumbnail, delete it + thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) + if thumbnailKey != "" { + err := t.s3dataBase.DeleteObject(ctx, thumbnailKey) + if err != nil { + log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey) + } } - if int(count) < 1 && t.minio != nil { - thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) - t.s3dataBase.DeleteObject(ctx, thumbnailKey) - t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...) - t.s3dataBase.DeleteObject(ctx, key) + // Delete object + err = t.s3dataBase.DeleteObject(ctx, key) + if err != nil { + log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key) } - } - for _, model := range models { - err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + + // Delete cache key + err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key) if err != nil { - return nil, errs.Wrap(err) + log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key) } } - if total < int64(findPagination.ShowNumber) { - break + } + + // handle delete data in S3 database + for _, model := range models { + // Delete all expired data row in S3 database + err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + if err != nil { + return nil, errs.Wrap(err) } - deltotal += int(total) } - log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal) - return &third.DeleteOutdatedDataResp{}, nil + + log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total) + + return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil } type FormDataMate struct { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index d37689b31..4206a2d6f 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -38,6 +38,7 @@ import ( ) type thirdServer struct { + third.UnimplementedThirdServer thirdDatabase controller.ThirdDatabase s3dataBase controller.S3Database userRpcClient rpcclient.UserRpcClient diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index b47c516d9..2dfbb01df 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -52,6 +52,7 @@ import ( ) type userServer struct { + pbuser.UnimplementedUserServer online cache.OnlineCache db controller.UserDatabase friendNotificationSender *relation.FriendNotificationSender diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index dbb4e34f6..4f87036c7 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -24,6 +24,7 @@ import ( kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" @@ -58,10 +59,10 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return err } - // thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) - // if err != nil { - // return err - // } + thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + if err != nil { + return err + } conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { @@ -70,66 +71,83 @@ func Start(ctx context.Context, config *CronTaskConfig) error { msgClient := msg.NewMsgClient(msgConn) conversationClient := pbconversation.NewConversationClient(conversationConn) - // thirdClient := third.NewThirdClient(thirdConn) + thirdClient := third.NewThirdClient(thirdConn) crontab := cron.New() // scheduled hard delete outdated Msgs in specific time. - clearMsgFunc := func() { + destructMsgsFunc := func() { now := time.Now() deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) - log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) + log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) - if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { - log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) + if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil { + log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now)) return } - log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) + log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil { return errs.Wrap(err) } // scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. - msgDestructFunc := func() { + clearMsgFunc := func() { now := time.Now() ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) - log.ZDebug(ctx, "msg destruct cron start", "now", now) + log.ZDebug(ctx, "clear msg cron start", "now", now) - conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) + conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) if err != nil { log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) return - } else { - _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations}) + } + + _, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations}) + if err != nil { + log.ZError(ctx, "Clear Msg failed.", err) + return + } + + log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now)) + } + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { + return errs.Wrap(err) + } + + // scheduled delete outdated file Objects and their datas in specific time. + deleteObjectFunc := func() { + now := time.Now() + executeNum := 5 + // number of pagination. if need modify, need update value in third.DeleteOutdatedData + pageShowNumber := 500 + deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) + ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) + log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + + if len(config.CronTask.DeleteObjectType) == 0 { + log.ZDebug(ctx, "cron deleteoutDatedData not type need delete", "deletetime", deleteTime, "DeleteObjectType", config.CronTask.DeleteObjectType, "cont", time.Since(now)) + return + } + + for i := 0; i < executeNum; i++ { + resp, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: config.CronTask.DeleteObjectType}) if err != nil { - log.ZError(ctx, "Destruct Msgs failed.", err) + log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) return } + if resp.Count == 0 || resp.Count < int32(pageShowNumber) { + break + } } - log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now)) + + log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil { + if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { return errs.Wrap(err) } - // // scheduled delete outdated file Objects and their datas in specific time. - // deleteObjectFunc := func() { - // now := time.Now() - // deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) - // ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) - // log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) - // if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { - // log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) - // return - // } - // log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) - // } - // if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { - // return errs.Wrap(err) - // } - log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() <-ctx.Done() diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 468a150e8..fa571e9f2 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -113,9 +113,10 @@ type API struct { } type CronTask struct { - CronExecuteTime string `mapstructure:"cronExecuteTime"` - RetainChatRecords int `mapstructure:"retainChatRecords"` - FileExpireTime int `mapstructure:"fileExpireTime"` + CronExecuteTime string `mapstructure:"cronExecuteTime"` + RetainChatRecords int `mapstructure:"retainChatRecords"` + FileExpireTime int `mapstructure:"fileExpireTime"` + DeleteObjectType []string `mapstructure:"deleteObjectType"` } type OfflinePushConfig struct { diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index f0b7d70db..bf41cce95 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -179,7 +179,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) } - if conversation.IsPinned == true { + if conversation.IsPinned { pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) } } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 59718e7b9..85b797c40 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -57,8 +57,8 @@ type CommonMsgDatabase interface { // DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis // cache). DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error - // UserMsgsDestruct marks messages for deletion based on destruct time and returns a list of sequence numbers for marked messages. - UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) + // ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. + ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. @@ -92,7 +92,7 @@ type CommonMsgDatabase interface { RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) - // clear msg + // get Msg when destruct msg before GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) @@ -490,8 +490,8 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co } successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) if err != nil { - if errors.Is(err, redis.Nil) { - log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) + if !errors.Is(err, redis.Nil) { + log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) } } log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", @@ -528,10 +528,10 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) } -func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { +func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) { var index int64 for { - // from oldest 2 newest + // from oldest 2 newest, ASC msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) if err != nil || msgDocModel.DocID == "" { if err != nil { @@ -544,15 +544,19 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion break } + index++ - // && msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli() + + // && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli() if len(msgDocModel.Msg) > 0 { i := 0 var over bool for _, msg := range msgDocModel.Msg { i++ - if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { - if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { + // over clear time, need to clear + if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() { + // if msg is not in del list, add to del list + if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { seqs = append(seqs, msg.Msg.Seq) } } else { @@ -567,13 +571,18 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string } } - log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) + log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs) + + // have msg need to destruct if len(seqs) > 0 { - userMinSeq := seqs[len(seqs)-1] + 1 - currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) + // update min seq to clear after + userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after + currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before if err != nil { return nil, err } + + // if before < after, update min seq if currentUserMinSeq < userMinSeq { if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { return nil, err diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 9b56661a5..4e5ad18b6 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -40,10 +40,10 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) - FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) + FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) DeleteObject(ctx context.Context, name string) error DeleteSpecifiedData(ctx context.Context, engine string, name string) error - FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) + FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) DelS3Key(ctx context.Context, engine string, keys ...string) error } @@ -120,9 +120,8 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } -func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { - - return s.db.FindByExpires(ctx, duration, pagination) +func (s *s3Database) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + return s.db.FindNeedDeleteObjectByDB(ctx, duration, needDelType, pagination) } func (s *s3Database) DeleteObject(ctx context.Context, name string) error { @@ -132,8 +131,8 @@ func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, nam return s.db.Delete(ctx, engine, name) } -func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { - return s.db.FindNotDelByS3(ctx, key, duration) +func (s *s3Database) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) { + return s.db.FindModelsByKey(ctx, key) } func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 5a9b19035..30ca01ee7 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -16,6 +16,7 @@ package database import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/pagination" ) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index f7ced1c2c..10e223c89 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -16,9 +16,10 @@ package mgo import ( "context" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "time" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/mongoutil" diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index 2fdf2003b..510a049d0 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -60,7 +60,7 @@ type GroupMemberMgo struct { } func (g *GroupMemberMgo) memberSort() any { - return bson.D{{"role_level", -1}, {"create_time", 1}} + return bson.D{{Key: "role_level", Value: -1}, {Key: "create_time", Value: 1}} } func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) { diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 4242fbb53..5bc329d33 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -31,6 +31,8 @@ import ( func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) { coll := db.Collection(database.ObjectName) + + // Create index for name _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ Keys: bson.D{ {Key: "name", Value: 1}, @@ -40,6 +42,27 @@ func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) { if err != nil { return nil, errs.Wrap(err) } + + // Create index for create_time + _, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "create_time", Value: 1}, + }, + }) + if err != nil { + return nil, errs.Wrap(err) + } + + // Create index for key + _, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.D{ + {Key: "key", Value: 1}, + }, + }) + if err != nil { + return nil, errs.Wrap(err) + } + return &S3Mongo{coll: coll}, nil } @@ -71,14 +94,18 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) } -func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { + +// Find Expires object +func (o *S3Mongo) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ "create_time": bson.M{"$lt": duration}, + "group": bson.M{"$in": needDelType}, }, pagination) } -func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { - return mongoutil.Count(ctx, o.coll, bson.M{ - "key": key, - "create_time": bson.M{"$gt": duration}, + +// Find object by key +func (o *S3Mongo) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) { + return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{ + "key": key, }) } diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index 8292006a0..c741e39a6 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -26,6 +26,6 @@ type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) Delete(ctx context.Context, engine string, name string) error - FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) - FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) + FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) + FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) } diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index ccca85619..c69d355d6 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -152,8 +152,8 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont return resp.UserIDs, nil } -func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) { - resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) +func (c *ConversationRpcClient) GetConversationsNeedClearMsg(ctx context.Context) ([]*pbconversation.Conversation, error) { + resp, err := c.Client.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) if err != nil { return nil, err } diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 9b26a7abd..8313937cd 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -244,8 +244,8 @@ func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversati return resp.MaxSeq, nil } -func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error { - _, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts}) +func (m *MessageRpcClient) DestructMsgs(ctx context.Context, ts int64) error { + _, err := m.Client.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: ts}) return err }