Merge: main

pull/2914/head
icey-yu 10 months ago
commit 913d4c5f39

@ -8,7 +8,7 @@ ENV SERVER_DIR=/openim-server
WORKDIR $SERVER_DIR WORKDIR $SERVER_DIR
# Set the Go proxy to improve dependency resolution speed # Set the Go proxy to improve dependency resolution speed
ENV GOPROXY=https://goproxy.io,direct # ENV GOPROXY=https://goproxy.io,direct
# Copy all files from the current directory into the container # Copy all files from the current directory into the container
COPY . . COPY . .

@ -1,3 +1,4 @@
cronExecuteTime: 0 2 * * * cronExecuteTime: 0 2 * * *
retainChatRecords: 365 retainChatRecords: 365
fileExpireTime: 90 fileExpireTime: 180
deleteObjectType: ["msg-picture","msg-file", "msg-voice","msg-video","msg-video-snapshot","sdklog"]

@ -1,6 +1,6 @@
module github.com/openimsdk/open-im-server/v3 module github.com/openimsdk/open-im-server/v3
go 1.22.0 go 1.22.7
toolchain go1.23.2 toolchain go1.23.2
@ -14,15 +14,15 @@ require (
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.55 github.com/openimsdk/protocol v0.0.72-alpha.59
github.com/openimsdk/tools v0.0.50-alpha.39 github.com/openimsdk/tools v0.0.50-alpha.39
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0 github.com/stretchr/testify v1.9.0
go.mongodb.org/mongo-driver v1.14.0 go.mongodb.org/mongo-driver v1.14.0
google.golang.org/api v0.170.0 google.golang.org/api v0.170.0
google.golang.org/grpc v1.66.2 google.golang.org/grpc v1.68.0
google.golang.org/protobuf v1.34.2 google.golang.org/protobuf v1.35.1
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
@ -50,7 +50,7 @@ require (
require ( require (
cloud.google.com/go v0.112.1 // indirect cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/firestore v1.15.0 // indirect cloud.google.com/go/firestore v1.15.0 // indirect
cloud.google.com/go/iam v1.1.7 // indirect cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go/longrunning v0.5.5 // indirect cloud.google.com/go/longrunning v0.5.5 // indirect
@ -177,13 +177,13 @@ require (
golang.org/x/arch v0.7.0 // indirect golang.org/x/arch v0.7.0 // indirect
golang.org/x/image v0.15.0 // indirect golang.org/x/image v0.15.0 // indirect
golang.org/x/net v0.29.0 // indirect golang.org/x/net v0.29.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sys v0.25.0 // indirect golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect golang.org/x/text v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect golang.org/x/time v0.5.0 // indirect
google.golang.org/appengine/v2 v2.0.2 // indirect google.golang.org/appengine/v2 v2.0.2 // indirect
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
gorm.io/gorm v1.25.8 // indirect gorm.io/gorm v1.25.8 // indirect
stathat.com/c/consistent v1.0.0 // indirect stathat.com/c/consistent v1.0.0 // indirect

@ -1,8 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM= cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM=
cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4= cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4=
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= cloud.google.com/go/compute/metadata v0.5.0 h1:Zr0eK8JbFv6+Wi4ilXAR8FJ3wyNdpxHKJNPos6LTZOY=
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY=
cloud.google.com/go/firestore v1.15.0 h1:/k8ppuWOtNuDHt2tsRV42yI21uaGnKDEQnRFeBpbFF8= cloud.google.com/go/firestore v1.15.0 h1:/k8ppuWOtNuDHt2tsRV42yI21uaGnKDEQnRFeBpbFF8=
cloud.google.com/go/firestore v1.15.0/go.mod h1:GWOxFXcv8GZUtYpWHw/w6IuYNux/BtmeVTMmjrm4yhk= cloud.google.com/go/firestore v1.15.0/go.mod h1:GWOxFXcv8GZUtYpWHw/w6IuYNux/BtmeVTMmjrm4yhk=
cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM= cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM=
@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.55 h1:9PPWPHvkFk3neBSbNr+IoOdKIFjxTvEqUfMK/TEq1+8= github.com/openimsdk/protocol v0.0.72-alpha.59 h1:+ycb2+68mLKPIo7VrxF0id/GXP6OqZ2/nBM1YZQr7qY=
github.com/openimsdk/protocol v0.0.72-alpha.55/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/protocol v0.0.72-alpha.59/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.39 h1:CebneQY+4D12qMl5XnYRAUOsEeNzHFLWpqtM4c3CiaA= github.com/openimsdk/tools v0.0.50-alpha.39 h1:CebneQY+4D12qMl5XnYRAUOsEeNzHFLWpqtM4c3CiaA=
github.com/openimsdk/tools v0.0.50-alpha.39/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk= github.com/openimsdk/tools v0.0.50-alpha.39/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
@ -497,8 +497,8 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -565,8 +565,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y=
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s= google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s=
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU= google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 h1:hjSy6tcFQZ171igDaN5QHOw2n6vx40juYbC/x67CEhc=
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo= google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:qpvKtACPCQhAdu3PyQgV4l3LMXZEtft7y8QcarRsp9I=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@ -574,8 +574,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@ -585,8 +585,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=

@ -56,6 +56,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
} }
type Server struct { type Server struct {
msggateway.UnimplementedMsgGatewayServer
rpcPort int rpcPort int
LongConnServer LongConnServer LongConnServer LongConnServer
config *Config config *Config

@ -14,6 +14,7 @@ import (
) )
type pushServer struct { type pushServer struct {
pbpush.UnimplementedPushMsgServiceServer
database controller.PushDatabase database controller.PushDatabase
disCov discovery.SvcDiscoveryRegistry disCov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher offlinePusher offlinepush.OfflinePusher

@ -414,6 +414,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context,
if err != nil { if err != nil {
return err return err
} }
return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq)
} }

@ -40,6 +40,7 @@ import (
) )
type authServer struct { type authServer struct {
pbauth.UnimplementedAuthServer
authDatabase controller.AuthDatabase authDatabase controller.AuthDatabase
userRpcClient *rpcclient.UserRpcClient userRpcClient *rpcclient.UserRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry RegisterCenter discovery.SvcDiscoveryRegistry
@ -196,7 +197,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
} }
m, err := s.authDatabase.GetTokensWithoutError(ctx, userID, int(platformID)) m, err := s.authDatabase.GetTokensWithoutError(ctx, userID, int(platformID))
if err != nil && errors.Is(err, redis.Nil) { if err != nil && !errors.Is(err, redis.Nil) {
return err return err
} }
for k := range m { for k := range m {
@ -214,7 +215,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
func (s *authServer) InvalidateToken(ctx context.Context, req *pbauth.InvalidateTokenReq) (*pbauth.InvalidateTokenResp, error) { func (s *authServer) InvalidateToken(ctx context.Context, req *pbauth.InvalidateTokenReq) (*pbauth.InvalidateTokenResp, error) {
m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID)) m, err := s.authDatabase.GetTokensWithoutError(ctx, req.UserID, int(req.PlatformID))
if err != nil && errors.Is(err, redis.Nil) { if err != nil && !errors.Is(err, redis.Nil) {
return nil, err return nil, err
} }
if m == nil { if m == nil {

@ -43,6 +43,7 @@ import (
) )
type conversationServer struct { type conversationServer struct {
pbconversation.UnimplementedConversationServer
msgRpcClient *rpcclient.MessageRpcClient msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
@ -440,6 +441,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
map[string]any{"max_seq": req.MaxSeq}); err != nil { map[string]any{"max_seq": req.MaxSeq}); err != nil {
return nil, err return nil, err
} }
return &pbconversation.SetConversationMaxSeqResp{}, nil return &pbconversation.SetConversationMaxSeqResp{}, nil
} }
@ -669,7 +671,7 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
}, nil }, nil
} }
func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) { func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) {
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx) num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil { if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err) log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
@ -693,7 +695,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination) conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil { if err != nil {
// log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber) log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue continue
} }
@ -716,7 +718,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex
} }
} }
return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
} }
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {

@ -110,7 +110,7 @@ func UpdateGroupMemberMap(req *pbgroup.SetGroupMemberInfo) map[string]any {
m["nickname"] = req.Nickname.Value m["nickname"] = req.Nickname.Value
} }
if req.FaceURL != nil { if req.FaceURL != nil {
m["user_group_face_url"] = req.FaceURL.Value m["face_url"] = req.FaceURL.Value
} }
if req.RoleLevel != nil { if req.RoleLevel != nil {
m["role_level"] = req.RoleLevel.Value m["role_level"] = req.RoleLevel.Value

@ -57,6 +57,7 @@ import (
) )
type groupServer struct { type groupServer struct {
pbgroup.UnimplementedGroupServer
db controller.GroupDatabase db controller.GroupDatabase
user rpcclient.UserRpcClient user rpcclient.UserRpcClient
notification *GroupNotificationSender notification *GroupNotificationSender
@ -963,6 +964,7 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro
if err != nil { if err != nil {
return err return err
} }
return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq)
} }

@ -137,7 +137,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon
return nil, err return nil, err
} }
hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID) hasReadSeq, err := m.MsgDatabase.GetHasReadSeq(ctx, req.UserID, req.ConversationID)
if err != nil && errors.Is(err, redis.Nil) { if err != nil && !errors.Is(err, redis.Nil) {
return nil, err return nil, err
} }
var seqs []int64 var seqs []int64

@ -12,13 +12,14 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
// hard delete in Database. // hard delete in Database.
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err return nil, err
} }
@ -29,15 +30,16 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
docNum int docNum int
msgNum int msgNum int
start = time.Now() start = time.Now()
getLimit = 5000
) )
clearMsg := func(ctx context.Context) (bool, error) { destructMsg := func(ctx context.Context) (bool, error) {
docIDs, err := m.MsgDatabase.GetDocIDs(ctx) docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
if err != nil { if err != nil {
return false, err return false, err
} }
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000) msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -61,7 +63,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
return true, nil return true, nil
} }
_, err = clearMsg(ctx) _, err = destructMsg(ctx)
if err != nil { if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err return nil, err
@ -69,11 +71,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return &msg.ClearMsgResp{}, nil return &msg.DestructMsgsResp{}, nil
} }
// soft delete for self // soft delete for user self
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
temp := convert.ConversationsPb2DB(req.Conversations) temp := convert.ConversationsPb2DB(req.Conversations)
batchNum := 100 batchNum := 100
@ -93,22 +95,31 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
"msgDestructTime", conversation.MsgDestructTime, "msgDestructTime", conversation.MsgDestructTime,
"lastMsgDestructTime", conversation.LatestMsgDestructTime) "lastMsgDestructTime", conversation.LatestMsgDestructTime)
seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime) seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil { if err != nil {
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue continue
} }
if len(seqs) > 0 { if len(seqs) > 0 {
minseq := datautil.Max(seqs...)
// update
if err := m.Conversation.UpdateConversation(handleCtx, if err := m.Conversation.UpdateConversation(handleCtx,
&pbconversation.UpdateConversationReq{ &pbconversation.UpdateConversationReq{
UserIDs: []string{conversation.OwnerUserID}, UserIDs: []string{conversation.OwnerUserID},
ConversationID: conversation.ConversationID, ConversationID: conversation.ConversationID,
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil { LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
MinSeq: wrapperspb.Int64(minseq),
}); err != nil {
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID) log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue continue
} }
if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil {
return err
}
// if you need Notify SDK client userseq is update. // if you need Notify SDK client userseq is update.
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) // m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
} }

@ -55,6 +55,7 @@ type (
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings. config *Config // Global configuration settings.
webhookClient *webhook.Client webhookClient *webhook.Client
msg.UnimplementedMsgServer
} }
Config struct { Config struct {

@ -43,6 +43,7 @@ import (
) )
type friendServer struct { type friendServer struct {
relation.UnimplementedFriendServer
db controller.FriendDatabase db controller.FriendDatabase
blackDatabase controller.BlackDatabase blackDatabase controller.BlackDatabase
userRpcClient *rpcclient.UserRpcClient userRpcClient *rpcclient.UserRpcClient

@ -290,48 +290,85 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
var conf config.Third var conf config.Third
expireTime := time.UnixMilli(req.ExpireTime) expireTime := time.UnixMilli(req.ExpireTime)
var deltotal int
findPagination := &sdkws.RequestPagination{ findPagination := &sdkws.RequestPagination{
PageNumber: 1, PageNumber: 1,
ShowNumber: 1000, ShowNumber: 500,
} }
for {
total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) // Find all expired data in S3 database
total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
needDelObjectKeys := make([]string, 0)
if total == 0 {
log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total)
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
}
needDelObjectKeys := make([]string, len(models))
for _, model := range models { for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key) needDelObjectKeys = append(needDelObjectKeys, model.Key)
} }
// Remove duplicate keys, have the same key use in different models
needDelObjectKeys = datautil.Distinct(needDelObjectKeys) needDelObjectKeys = datautil.Distinct(needDelObjectKeys)
for _, key := range needDelObjectKeys { for _, key := range needDelObjectKeys {
count, err := t.s3dataBase.FindNotDelByS3(ctx, key, expireTime) // Find all models by key
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
if int(count) < 1 && t.minio != nil {
// check keyModels, if all keyModels.
needDelKey := true // Default can delete
for _, keymodel := range keyModels {
// If group is empty or CreateTime is after expireTime, can't delete this key
if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) {
needDelKey = false
break
}
}
// If this object is not referenced by not expire data, delete it
if needDelKey && t.minio != nil {
// If have a thumbnail, delete it
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
if thumbnailKey != "" {
err := t.s3dataBase.DeleteObject(ctx, thumbnailKey)
if err != nil {
log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey)
}
}
// Delete object
err = t.s3dataBase.DeleteObject(ctx, key)
if err != nil {
log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key)
}
t.s3dataBase.DeleteObject(ctx, thumbnailKey) // Delete cache key
t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, needDelObjectKeys...) err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
t.s3dataBase.DeleteObject(ctx, key) if err != nil {
log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key)
}
} }
} }
// handle delete data in S3 database
for _, model := range models { for _, model := range models {
// Delete all expired data row in S3 database
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
} }
if total < int64(findPagination.ShowNumber) {
break log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total)
}
deltotal += int(total) return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
}
log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal)
return &third.DeleteOutdatedDataResp{}, nil
} }
type FormDataMate struct { type FormDataMate struct {

@ -38,6 +38,7 @@ import (
) )
type thirdServer struct { type thirdServer struct {
third.UnimplementedThirdServer
thirdDatabase controller.ThirdDatabase thirdDatabase controller.ThirdDatabase
s3dataBase controller.S3Database s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient userRpcClient rpcclient.UserRpcClient

@ -52,6 +52,7 @@ import (
) )
type userServer struct { type userServer struct {
pbuser.UnimplementedUserServer
online cache.OnlineCache online cache.OnlineCache
db controller.UserDatabase db controller.UserDatabase
friendNotificationSender *relation.FriendNotificationSender friendNotificationSender *relation.FriendNotificationSender

@ -24,6 +24,7 @@ import (
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
pbconversation "github.com/openimsdk/protocol/conversation" pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw" "github.com/openimsdk/tools/mw"
@ -58,10 +59,10 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
return err return err
} }
// thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
// if err != nil { if err != nil {
// return err return err
// } }
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
if err != nil { if err != nil {
@ -70,65 +71,82 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
msgClient := msg.NewMsgClient(msgConn) msgClient := msg.NewMsgClient(msgConn)
conversationClient := pbconversation.NewConversationClient(conversationConn) conversationClient := pbconversation.NewConversationClient(conversationConn)
// thirdClient := third.NewThirdClient(thirdConn) thirdClient := third.NewThirdClient(thirdConn)
crontab := cron.New() crontab := cron.New()
// scheduled hard delete outdated Msgs in specific time. // scheduled hard delete outdated Msgs in specific time.
clearMsgFunc := func() { destructMsgsFunc := func() {
now := time.Now() now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil { if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now)) log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return return
} }
log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now)) log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now))
} }
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. // scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature.
msgDestructFunc := func() { clearMsgFunc := func() {
now := time.Now() now := time.Now()
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZDebug(ctx, "msg destruct cron start", "now", now) log.ZDebug(ctx, "clear msg cron start", "now", now)
conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil { if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
return return
} else { }
_, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations})
_, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations})
if err != nil { if err != nil {
log.ZError(ctx, "Destruct Msgs failed.", err) log.ZError(ctx, "Clear Msg failed.", err)
return return
} }
log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now))
} }
log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now)) if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil {
return errs.Wrap(err) return errs.Wrap(err)
} }
// // scheduled delete outdated file Objects and their datas in specific time. // scheduled delete outdated file Objects and their datas in specific time.
// deleteObjectFunc := func() { deleteObjectFunc := func() {
// now := time.Now() now := time.Now()
// deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) executeNum := 5
// ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) // number of pagination. if need modify, need update value in third.DeleteOutdatedData
// log.ZDebug(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) pageShowNumber := 500
// if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
// log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
// return log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
// }
// log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) if len(config.CronTask.DeleteObjectType) == 0 {
// } log.ZDebug(ctx, "cron deleteoutDatedData not type need delete", "deletetime", deleteTime, "DeleteObjectType", config.CronTask.DeleteObjectType, "cont", time.Since(now))
// if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { return
// return errs.Wrap(err) }
// }
for i := 0; i < executeNum; i++ {
resp, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: config.CronTask.DeleteObjectType})
if err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
return
}
if resp.Count == 0 || resp.Count < int32(pageShowNumber) {
break
}
}
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
return errs.Wrap(err)
}
log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
crontab.Start() crontab.Start()

@ -116,6 +116,7 @@ type CronTask struct {
CronExecuteTime string `mapstructure:"cronExecuteTime"` CronExecuteTime string `mapstructure:"cronExecuteTime"`
RetainChatRecords int `mapstructure:"retainChatRecords"` RetainChatRecords int `mapstructure:"retainChatRecords"`
FileExpireTime int `mapstructure:"fileExpireTime"` FileExpireTime int `mapstructure:"fileExpireTime"`
DeleteObjectType []string `mapstructure:"deleteObjectType"`
} }
type OfflinePushConfig struct { type OfflinePushConfig struct {

@ -179,7 +179,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID)
} }
if conversation.IsPinned == true { if conversation.IsPinned {
pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID)
} }
} }

@ -57,8 +57,8 @@ type CommonMsgDatabase interface {
// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis // DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis
// cache). // cache).
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
// UserMsgsDestruct marks messages for deletion based on destruct time and returns a list of sequence numbers for marked messages. // ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages.
UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error)
// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers.
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers.
@ -92,7 +92,7 @@ type CommonMsgDatabase interface {
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)
// clear msg // get Msg when destruct msg before
GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error)
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
@ -490,8 +490,8 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
} }
successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs)
if err != nil { if err != nil {
if errors.Is(err, redis.Nil) { if !errors.Is(err, redis.Nil) {
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
} }
} }
log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs",
@ -528,10 +528,10 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
} }
func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) { func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) {
var index int64 var index int64
for { for {
// from oldest 2 newest // from oldest 2 newest, ASC
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" { if err != nil || msgDocModel.DocID == "" {
if err != nil { if err != nil {
@ -544,15 +544,19 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion
break break
} }
index++ index++
// && msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()
// && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli()
if len(msgDocModel.Msg) > 0 { if len(msgDocModel.Msg) > 0 {
i := 0 i := 0
var over bool var over bool
for _, msg := range msgDocModel.Msg { for _, msg := range msgDocModel.Msg {
i++ i++
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() { // over clear time, need to clear
if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() {
// if msg is not in del list, add to del list
if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) {
seqs = append(seqs, msg.Msg.Seq) seqs = append(seqs, msg.Msg.Seq)
} }
} else { } else {
@ -567,13 +571,18 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
} }
} }
log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs) log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs)
// have msg need to destruct
if len(seqs) > 0 { if len(seqs) > 0 {
userMinSeq := seqs[len(seqs)-1] + 1 // update min seq to clear after
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if before < after, update min seq
if currentUserMinSeq < userMinSeq { if currentUserMinSeq < userMinSeq {
if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err return nil, err

@ -40,10 +40,10 @@ type S3Database interface {
SetObject(ctx context.Context, info *model.Object) error SetObject(ctx context.Context, info *model.Object) error
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
DeleteObject(ctx context.Context, name string) error DeleteObject(ctx context.Context, name string) error
DeleteSpecifiedData(ctx context.Context, engine string, name string) error DeleteSpecifiedData(ctx context.Context, engine string, name string) error
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error)
DelS3Key(ctx context.Context, engine string, keys ...string) error DelS3Key(ctx context.Context, engine string, keys ...string) error
} }
@ -120,9 +120,8 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return s.s3.FormData(ctx, name, size, contentType, duration) return s.s3.FormData(ctx, name, size, contentType, duration)
} }
func (s *s3Database) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { func (s *s3Database) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
return s.db.FindNeedDeleteObjectByDB(ctx, duration, needDelType, pagination)
return s.db.FindByExpires(ctx, duration, pagination)
} }
func (s *s3Database) DeleteObject(ctx context.Context, name string) error { func (s *s3Database) DeleteObject(ctx context.Context, name string) error {
@ -132,8 +131,8 @@ func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, nam
return s.db.Delete(ctx, engine, name) return s.db.Delete(ctx, engine, name)
} }
func (s *s3Database) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) { func (s *s3Database) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) {
return s.db.FindNotDelByS3(ctx, key, duration) return s.db.FindModelsByKey(ctx, key)
} }
func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error {

@ -16,6 +16,7 @@ package database
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/pagination"
) )

@ -16,9 +16,10 @@ package mgo
import ( import (
"context" "context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/mongoutil"

@ -60,7 +60,7 @@ type GroupMemberMgo struct {
} }
func (g *GroupMemberMgo) memberSort() any { func (g *GroupMemberMgo) memberSort() any {
return bson.D{{"role_level", -1}, {"create_time", 1}} return bson.D{{Key: "role_level", Value: -1}, {Key: "create_time", Value: 1}}
} }
func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) { func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) {

@ -31,6 +31,8 @@ import (
func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) { func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) {
coll := db.Collection(database.ObjectName) coll := db.Collection(database.ObjectName)
// Create index for name
_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{ Keys: bson.D{
{Key: "name", Value: 1}, {Key: "name", Value: 1},
@ -40,6 +42,27 @@ func NewS3Mongo(db *mongo.Database) (database.ObjectInfo, error) {
if err != nil { if err != nil {
return nil, errs.Wrap(err) return nil, errs.Wrap(err)
} }
// Create index for create_time
_, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "create_time", Value: 1},
},
})
if err != nil {
return nil, errs.Wrap(err)
}
// Create index for key
_, err = coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{
{Key: "key", Value: 1},
},
})
if err != nil {
return nil, errs.Wrap(err)
}
return &S3Mongo{coll: coll}, nil return &S3Mongo{coll: coll}, nil
} }
@ -71,14 +94,18 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model.
func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error {
return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine})
} }
func (o *S3Mongo) FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
// Find Expires object
func (o *S3Mongo) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) {
return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{
"create_time": bson.M{"$lt": duration}, "create_time": bson.M{"$lt": duration},
"group": bson.M{"$in": needDelType},
}, pagination) }, pagination)
} }
func (o *S3Mongo) FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) {
return mongoutil.Count(ctx, o.coll, bson.M{ // Find object by key
func (o *S3Mongo) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) {
return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{
"key": key, "key": key,
"create_time": bson.M{"$gt": duration},
}) })
} }

@ -26,6 +26,6 @@ type ObjectInfo interface {
SetObject(ctx context.Context, obj *model.Object) error SetObject(ctx context.Context, obj *model.Object) error
Take(ctx context.Context, engine string, name string) (*model.Object, error) Take(ctx context.Context, engine string, name string) (*model.Object, error)
Delete(ctx context.Context, engine string, name string) error Delete(ctx context.Context, engine string, name string) error
FindByExpires(ctx context.Context, duration time.Time, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error)
FindNotDelByS3(ctx context.Context, key string, duration time.Time) (int64, error) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error)
} }

@ -152,8 +152,8 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont
return resp.UserIDs, nil return resp.UserIDs, nil
} }
func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) { func (c *ConversationRpcClient) GetConversationsNeedClearMsg(ctx context.Context) ([]*pbconversation.Conversation, error) {
resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{}) resp, err := c.Client.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -244,8 +244,8 @@ func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversati
return resp.MaxSeq, nil return resp.MaxSeq, nil
} }
func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error { func (m *MessageRpcClient) DestructMsgs(ctx context.Context, ts int64) error {
_, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts}) _, err := m.Client.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: ts})
return err return err
} }

Loading…
Cancel
Save