diff --git a/.github/workflows/go-build-test.yml b/.github/workflows/go-build-test.yml index 5341c919d..1ed8f0397 100644 --- a/.github/workflows/go-build-test.yml +++ b/.github/workflows/go-build-test.yml @@ -89,6 +89,65 @@ jobs: mage start mage check + go-test: + name: Benchmark Test with go ${{ matrix.go_version }} on ${{ matrix.os }} + runs-on: ${{ matrix.os }} + permissions: + contents: write + env: + SDK_DIR: openim-sdk-core + CONFIG_PATH: config/notification.yml + # pull-requests: write + strategy: + matrix: + os: [ ubuntu-latest ] + go_version: [ "1.22.x" ] + + steps: + - name: Checkout Server repository + uses: actions/checkout@v4 + + - name: Checkout SDK repository + uses: actions/checkout@v4 + with: + repository: 'openimsdk/openim-sdk-core' + path: ${{ env.SDK_DIR }} + + - name: Set up Go ${{ matrix.go_version }} + uses: actions/setup-go@v5 + with: + go-version: ${{ matrix.go_version }} + + - name: Get Server dependencies + run: | + go install github.com/magefile/mage@latest + go mod download + + - name: Install yq + run: | + sudo wget https://github.com/mikefarah/yq/releases/download/v4.34.1/yq_linux_amd64 -O /usr/bin/yq + sudo chmod +x /usr/bin/yq + + - name: Modify Server Configuration + run: | + yq e '.groupCreated.unreadCount = true' -i ${{ env.CONFIG_PATH }} + yq e '.friendApplicationApproved.unreadCount = true' -i ${{ env.CONFIG_PATH }} + + - name: Start Server Services + run: | + docker compose up -d + mage build + mage start + mage check + + - name: Build test SDK core + run: | + cd ${{ env.SDK_DIR }} + go mod tidy + cd integration_test + mkdir data + go run main.go -lgr 0.8 -imf -crg -ckgn -ckcon -sem -ckmsn -u 20 -su 5 -lg 2 -cg 2 -cgm 3 -sm 10 -gm 10 -reg + dockerfile-test: name: Build and Test Dockerfile runs-on: ubuntu-latest diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 753ac10bc..94ed073d8 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -3,4 +3,4 @@ prometheus: enable: true # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly # Because four instances have been launched, four ports need to be specified - ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027 ] + ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027, 12028, 12029, 12030, 12031, 12032, 12033, 12034, 12035 ] diff --git a/config/share.yml b/config/share.yml index 4c5892615..5f8521eaa 100644 --- a/config/share.yml +++ b/config/share.yml @@ -12,3 +12,5 @@ rpcRegisterName: imAdminUserID: [ imAdmin ] +# 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time +multiLoginPolicy: 1 diff --git a/go.mod b/go.mod index 5fab50991..5cc1f9ad3 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/openimsdk/open-im-server/v3 go 1.21.2 require ( - firebase.google.com/go v3.13.0+incompatible + firebase.google.com/go/v4 v4.14.1 github.com/dtm-labs/rockscache v0.1.1 github.com/gin-gonic/gin v1.9.1 github.com/go-playground/validator/v10 v10.20.0 @@ -12,13 +12,13 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.25 + github.com/openimsdk/protocol v0.0.72-alpha.30 github.com/openimsdk/tools v0.0.50-alpha.12 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 go.mongodb.org/mongo-driver v1.14.0 - google.golang.org/api v0.165.0 + google.golang.org/api v0.170.0 google.golang.org/grpc v1.66.2 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 @@ -47,12 +47,13 @@ require ( ) require ( - cloud.google.com/go v0.112.0 // indirect + cloud.google.com/go v0.112.1 // indirect cloud.google.com/go/compute/metadata v0.3.0 // indirect - cloud.google.com/go/firestore v1.14.0 // indirect - cloud.google.com/go/iam v1.1.5 // indirect - cloud.google.com/go/longrunning v0.5.4 // indirect - cloud.google.com/go/storage v1.36.0 // indirect + cloud.google.com/go/firestore v1.15.0 // indirect + cloud.google.com/go/iam v1.1.7 // indirect + cloud.google.com/go/longrunning v0.5.5 // indirect + cloud.google.com/go/storage v1.40.0 // indirect + github.com/MicahParks/keyfunc v1.9.0 // indirect github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect github.com/aws/aws-sdk-go-v2 v1.23.1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.1 // indirect @@ -102,7 +103,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect - github.com/googleapis/gax-go/v2 v2.12.0 // indirect + github.com/googleapis/gax-go/v2 v2.12.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect @@ -164,11 +165,11 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect - go.opentelemetry.io/otel v1.23.0 // indirect - go.opentelemetry.io/otel/metric v1.23.0 // indirect - go.opentelemetry.io/otel/trace v1.23.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect + go.opentelemetry.io/otel v1.24.0 // indirect + go.opentelemetry.io/otel/metric v1.24.0 // indirect + go.opentelemetry.io/otel/trace v1.24.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.7.0 // indirect @@ -178,8 +179,8 @@ require ( golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect + google.golang.org/appengine/v2 v2.0.2 // indirect + google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect gorm.io/gorm v1.25.8 // indirect diff --git a/go.sum b/go.sum index 4c3b4aa21..62cb5b7b1 100644 --- a/go.sum +++ b/go.sum @@ -1,21 +1,23 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM= -cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4= +cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM= +cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4= cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= -cloud.google.com/go/firestore v1.14.0 h1:8aLcKnMPoldYU3YHgu4t2exrKhLQkqaXAGqT0ljrFVw= -cloud.google.com/go/firestore v1.14.0/go.mod h1:96MVaHLsEhbvkBEdZgfN+AS/GIkco1LRpH9Xp9YZfzQ= -cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= -cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= -cloud.google.com/go/longrunning v0.5.4 h1:w8xEcbZodnA2BbW6sVirkkoC+1gP8wS57EUUgGS0GVg= -cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI= -cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8= -cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= -firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= -firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= +cloud.google.com/go/firestore v1.15.0 h1:/k8ppuWOtNuDHt2tsRV42yI21uaGnKDEQnRFeBpbFF8= +cloud.google.com/go/firestore v1.15.0/go.mod h1:GWOxFXcv8GZUtYpWHw/w6IuYNux/BtmeVTMmjrm4yhk= +cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM= +cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA= +cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg= +cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s= +cloud.google.com/go/storage v1.40.0 h1:VEpDQV5CJxFmJ6ueWNsKxcr1QAYOXEgxDa+sBbJahPw= +cloud.google.com/go/storage v1.40.0/go.mod h1:Rrj7/hKlG87BLqDJYtwR0fbPld8uJPbQ2ucUMY7Ir0g= +firebase.google.com/go/v4 v4.14.1 h1:4qiUETaFRWoFGE1XP5VbcEdtPX93Qs+8B/7KvP2825g= +firebase.google.com/go/v4 v4.14.1/go.mod h1:fgk2XshgNDEKaioKco+AouiegSI9oTWVqRaBdTTGBoM= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.43.0 h1:YFFDn8mMI2QL0wOrG0J2sFoVIAFl7hS9JQi2YZsXtJc= github.com/IBM/sarama v1.43.0/go.mod h1:zlE6HEbC/SMQ9mhEYaF7nNLYOUyrs0obySKCckWP9BM= +github.com/MicahParks/keyfunc v1.9.0 h1:lhKd5xrFHLNOWrDc4Tyb/Q1AJ4LCzQ48GVJyVIID3+o= +github.com/MicahParks/keyfunc v1.9.0/go.mod h1:IdnCilugA0O/99dW+/MkvlyrsX8+L8+x95xuVNtM5jw= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= @@ -107,8 +109,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= @@ -159,6 +159,7 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -167,6 +168,7 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -175,8 +177,6 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -188,7 +188,6 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= @@ -205,8 +204,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= -github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= -github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/googleapis/gax-go/v2 v2.12.3 h1:5/zPPDvw8Q1SuXjrqrZslrqT7dL/uJT2CQii/cLCKqA= +github.com/googleapis/gax-go/v2 v2.12.3/go.mod h1:AKloxT6GtNbaLm8QTNSidHUVsHYcBHwWRvkNFJUQcS4= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= @@ -322,8 +321,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.25 h1:W8E6gnwt5V6anr/8lYOf5v/Lcsggf7gIAzJbw7YU6So= -github.com/openimsdk/protocol v0.0.72-alpha.25/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.72-alpha.30 h1:LBIqDzD55cSQy3wX8fgSa3blz8+Cv54ae96/qUMINwM= +github.com/openimsdk/protocol v0.0.72-alpha.30/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo= github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= @@ -436,18 +435,18 @@ go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 h1:UNQQKPfTDe1J81ViolILjTKPr9WetKW6uei2hFgJmFs= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw= -go.opentelemetry.io/otel v1.23.0 h1:Df0pqjqExIywbMCMTxkAwzjLZtRf+bBKLbUcpxO2C9E= -go.opentelemetry.io/otel v1.23.0/go.mod h1:YCycw9ZeKhcJFrb34iVSkyT0iczq/zYDtZYFufObyB0= -go.opentelemetry.io/otel/metric v1.23.0 h1:pazkx7ss4LFVVYSxYew7L5I6qvLXHA0Ap2pwV+9Cnpo= -go.opentelemetry.io/otel/metric v1.23.0/go.mod h1:MqUW2X2a6Q8RN96E2/nqNoT+z9BSms20Jb7Bbp+HiTo= -go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= -go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= -go.opentelemetry.io/otel/trace v1.23.0 h1:37Ik5Ib7xfYVb4V1UtnT97T1jI+AoIYkJyPkuL4iJgI= -go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= +go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= +go.opentelemetry.io/otel/sdk v1.22.0 h1:6coWHw9xw7EfClIC/+O31R8IY3/+EiRFHevmHafB2Gw= +go.opentelemetry.io/otel/sdk v1.22.0/go.mod h1:iu7luyVGYovrRpe2fmj3CVKouQNdTOkxtLzPvPz1DOc= +go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= @@ -492,6 +491,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= @@ -554,19 +554,19 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -google.golang.org/api v0.165.0 h1:zd5d4JIIIaYYsfVy1HzoXYZ9rWCSBxxAglbczzo7Bgc= -google.golang.org/api v0.165.0/go.mod h1:2OatzO7ZDQsoS7IFf3rvsE17/TldiU3F/zxFHeqUB5o= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= +golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +google.golang.org/api v0.170.0 h1:zMaruDePM88zxZBG+NG8+reALO2rfLhe/JShitLyT48= +google.golang.org/api v0.170.0/go.mod h1:/xql9M2btF85xac/VAm4PsLMTLVGUOpq4BE9R8jyNy8= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= -google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= -google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/appengine/v2 v2.0.2 h1:MSqyWy2shDLwG7chbwBJ5uMyw6SNqJzhJHNDwYB0Akk= +google.golang.org/appengine/v2 v2.0.2/go.mod h1:PkgRUWz4o1XOvbqtWTkBtCitEJ5Tp4HoVEdMMYQR/8E= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe h1:USL2DhxfgRchafRvt/wYyyQNzwgL7ZiURcozOE/Pkvo= -google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= +google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= +google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9/go.mod h1:mqHbVIp48Muh7Ywss/AD6I5kNVKZMmAa/QEW58Gxp2s= google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117 h1:+rdxYoE3E5htTEWIe15GlN6IfvbURM//Jt0mmkmm6ZU= google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= @@ -587,8 +587,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 360313ea8..8e3a3ca82 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -62,3 +62,11 @@ func (o *ConversationApi) GetIncrementalConversation(c *gin.Context) { func (o *ConversationApi) GetOwnerConversation(c *gin.Context) { a2r.Call(conversation.ConversationClient.GetOwnerConversation, o.Client, c) } + +func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetNotNotifyConversationIDs, o.Client, c) +} + +func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { + a2r.Call(conversation.ConversationClient.GetPinnedConversationIDs, o.Client, c) +} diff --git a/internal/api/friend.go b/internal/api/friend.go index f9f15fb24..d000cccdd 100644 --- a/internal/api/friend.go +++ b/internal/api/friend.go @@ -72,6 +72,10 @@ func (o *FriendApi) GetPaginationBlacks(c *gin.Context) { a2r.Call(relation.FriendClient.GetPaginationBlacks, o.Client, c) } +func (o *FriendApi) GetSpecifiedBlacks(c *gin.Context) { + a2r.Call(relation.FriendClient.GetSpecifiedBlacks, o.Client, c) +} + func (o *FriendApi) RemoveBlack(c *gin.Context) { a2r.Call(relation.FriendClient.RemoveBlack, o.Client, c) } diff --git a/internal/api/group.go b/internal/api/group.go index 3af39ae45..9c35da708 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -67,6 +67,10 @@ func (o *GroupApi) GetGroupUsersReqApplicationList(c *gin.Context) { a2r.Call(group.GroupClient.GetGroupUsersReqApplicationList, o.Client, c) } +func (o *GroupApi) GetSpecifiedUserGroupRequestInfo(c *gin.Context) { + a2r.Call(group.GroupClient.GetSpecifiedUserGroupRequestInfo, o.Client, c) +} + func (o *GroupApi) GetGroupsInfo(c *gin.Context) { a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c) //a2r.Call(group.GroupClient.GetGroupsInfo, o.Client, c, a2r.NewNilReplaceOption(group.GroupClient.GetGroupsInfo)) diff --git a/internal/api/jssdk/jssdk.go b/internal/api/jssdk/jssdk.go new file mode 100644 index 000000000..7f136c74c --- /dev/null +++ b/internal/api/jssdk/jssdk.go @@ -0,0 +1,204 @@ +package jssdk + +import ( + "github.com/gin-gonic/gin" + "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/a2r" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" + "sort" +) + +const ( + maxGetActiveConversation = 500 + defaultGetActiveConversation = 100 +) + +func NewJSSdkApi(msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk { + return &JSSdk{ + msg: msg, + conv: conv, + } +} + +type JSSdk struct { + msg msg.MsgClient + conv conversation.ConversationClient +} + +func (x *JSSdk) GetActiveConversations(c *gin.Context) { + call(c, x.getActiveConversations) +} + +func (x *JSSdk) GetConversations(c *gin.Context) { + call(c, x.getConversations) +} + +func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, error) { + req, err := a2r.ParseRequest[ActiveConversationsReq](ctx) + if err != nil { + return nil, err + } + if req.Count <= 0 || req.Count > maxGetActiveConversation { + req.Count = defaultGetActiveConversation + } + opUserID := mcontext.GetOpUserID(ctx) + conversationIDs, err := field(ctx, x.conv.GetConversationIDs, + &conversation.GetConversationIDsReq{UserID: opUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs) + if err != nil { + return nil, err + } + if len(conversationIDs) == 0 { + return &ConversationsResp{}, nil + } + readSeq, err := field(ctx, x.msg.GetHasReadSeqs, + &msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + if err != nil { + return nil, err + } + activeConversation, err := field(ctx, x.msg.GetActiveConversation, + &msg.GetActiveConversationReq{ConversationIDs: conversationIDs}, (*msg.GetActiveConversationResp).GetConversations) + if err != nil { + return nil, err + } + if len(activeConversation) == 0 { + return &ConversationsResp{}, nil + } + sortConversations := sortActiveConversations{ + Conversation: activeConversation, + } + if len(activeConversation) > 1 { + pinnedConversationIDs, err := field(ctx, x.conv.GetPinnedConversationIDs, + &conversation.GetPinnedConversationIDsReq{UserID: opUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs) + if err != nil { + return nil, err + } + sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs) + } + sort.Sort(&sortConversations) + sortList := sortConversations.Top(req.Count) + conversations, err := field(ctx, x.conv.GetConversations, + &conversation.GetConversationsReq{ + OwnerUserID: opUserID, + ConversationIDs: datautil.Slice(sortList, func(c *msg.ActiveConversation) string { + return c.ConversationID + })}, (*conversation.GetConversationsResp).GetConversations) + if err != nil { + return nil, err + } + msgs, err := field(ctx, x.msg.GetSeqMessage, + &msg.GetSeqMessageReq{ + UserID: opUserID, + Conversations: datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs { + return &msg.ConversationSeqs{ + ConversationID: c.ConversationID, + Seqs: []int64{c.MaxSeq}, + } + }), + }, (*msg.GetSeqMessageResp).GetMsgs) + if err != nil { + return nil, err + } + conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { + return c.ConversationID + }) + resp := make([]ConversationMsg, 0, len(sortList)) + for _, c := range sortList { + conv, ok := conversationMap[c.ConversationID] + if !ok { + continue + } + var lastMsg *sdkws.MsgData + if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { + lastMsg = msgList.Msgs[0] + } + resp = append(resp, ConversationMsg{ + Conversation: conv, + LastMsg: lastMsg, + MaxSeq: c.MaxSeq, + ReadSeq: readSeq[c.ConversationID], + }) + } + var unreadCount int64 + for _, c := range activeConversation { + count := c.MaxSeq - readSeq[c.ConversationID] + if count > 0 { + unreadCount += count + } + } + return &ConversationsResp{ + Conversations: resp, + UnreadCount: unreadCount, + }, nil +} + +func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) { + req, err := a2r.ParseRequest[conversation.GetConversationsReq](ctx) + if err != nil { + return nil, err + } + req.OwnerUserID = mcontext.GetOpUserID(ctx) + conversations, err := field(ctx, x.conv.GetConversations, req, (*conversation.GetConversationsResp).GetConversations) + if err != nil { + return nil, err + } + if len(conversations) == 0 { + return &ConversationsResp{}, nil + } + req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string { + return c.ConversationID + }) + maxSeqs, err := field(ctx, x.msg.GetMaxSeqs, + &msg.GetMaxSeqsReq{ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + if err != nil { + return nil, err + } + readSeqs, err := field(ctx, x.msg.GetHasReadSeqs, + &msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs) + if err != nil { + return nil, err + } + conversationSeqs := make([]*msg.ConversationSeqs, 0, len(conversations)) + for _, c := range conversations { + if seq := maxSeqs[c.ConversationID]; seq > 0 { + conversationSeqs = append(conversationSeqs, &msg.ConversationSeqs{ + ConversationID: c.ConversationID, + Seqs: []int64{seq}, + }) + } + } + var msgs map[string]*sdkws.PullMsgs + if len(conversationSeqs) > 0 { + msgs, err = field(ctx, x.msg.GetSeqMessage, + &msg.GetSeqMessageReq{UserID: req.OwnerUserID, Conversations: conversationSeqs}, (*msg.GetSeqMessageResp).GetMsgs) + if err != nil { + return nil, err + } + } + resp := make([]ConversationMsg, 0, len(conversations)) + for _, c := range conversations { + var lastMsg *sdkws.MsgData + if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { + lastMsg = msgList.Msgs[0] + } + resp = append(resp, ConversationMsg{ + Conversation: c, + LastMsg: lastMsg, + MaxSeq: maxSeqs[c.ConversationID], + ReadSeq: readSeqs[c.ConversationID], + }) + } + var unreadCount int64 + for conversationID, maxSeq := range maxSeqs { + count := maxSeq - readSeqs[conversationID] + if count > 0 { + unreadCount += count + } + } + return &ConversationsResp{ + Conversations: resp, + UnreadCount: unreadCount, + }, nil +} diff --git a/internal/api/jssdk/sort.go b/internal/api/jssdk/sort.go new file mode 100644 index 000000000..f5fd04148 --- /dev/null +++ b/internal/api/jssdk/sort.go @@ -0,0 +1,33 @@ +package jssdk + +import "github.com/openimsdk/protocol/msg" + +type sortActiveConversations struct { + Conversation []*msg.ActiveConversation + PinnedConversationIDs map[string]struct{} +} + +func (s sortActiveConversations) Top(limit int) []*msg.ActiveConversation { + if limit > 0 && len(s.Conversation) > limit { + return s.Conversation[:limit] + } + return s.Conversation +} + +func (s sortActiveConversations) Len() int { + return len(s.Conversation) +} + +func (s sortActiveConversations) Less(i, j int) bool { + iv, jv := s.Conversation[i], s.Conversation[j] + _, ip := s.PinnedConversationIDs[iv.ConversationID] + _, jp := s.PinnedConversationIDs[jv.ConversationID] + if ip != jp { + return ip + } + return iv.LastTime > jv.LastTime +} + +func (s sortActiveConversations) Swap(i, j int) { + s.Conversation[i], s.Conversation[j] = s.Conversation[j], s.Conversation[i] +} diff --git a/internal/api/jssdk/stu.go b/internal/api/jssdk/stu.go new file mode 100644 index 000000000..2f63975b3 --- /dev/null +++ b/internal/api/jssdk/stu.go @@ -0,0 +1,22 @@ +package jssdk + +import ( + "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/sdkws" +) + +type ActiveConversationsReq struct { + Count int `json:"count"` +} + +type ConversationMsg struct { + Conversation *conversation.Conversation `json:"conversation"` + LastMsg *sdkws.MsgData `json:"lastMsg"` + MaxSeq int64 `json:"maxSeq"` + ReadSeq int64 `json:"readSeq"` +} + +type ConversationsResp struct { + UnreadCount int64 `json:"unreadCount"` + Conversations []ConversationMsg `json:"conversations"` +} diff --git a/internal/api/jssdk/tools.go b/internal/api/jssdk/tools.go new file mode 100644 index 000000000..c57457d9f --- /dev/null +++ b/internal/api/jssdk/tools.go @@ -0,0 +1,26 @@ +package jssdk + +import ( + "context" + "github.com/gin-gonic/gin" + "github.com/openimsdk/tools/apiresp" + "google.golang.org/grpc" +) + +func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) { + resp, err := fn(ctx, req) + if err != nil { + var c C + return c, err + } + return get(resp), nil +} + +func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) { + resp, err := fn(c) + if err != nil { + apiresp.GinError(c, err) + return + } + apiresp.GinSuccess(c, resp) +} diff --git a/internal/api/jssdk_test.go b/internal/api/jssdk_test.go new file mode 100644 index 000000000..472ca56b5 --- /dev/null +++ b/internal/api/jssdk_test.go @@ -0,0 +1,37 @@ +package api + +import ( + "github.com/openimsdk/protocol/msg" + "sort" + "testing" +) + +func TestName(t *testing.T) { + val := sortActiveConversations{ + Conversation: []*msg.ActiveConversation{ + { + ConversationID: "100", + LastTime: 100, + }, + { + ConversationID: "200", + LastTime: 200, + }, + { + ConversationID: "300", + LastTime: 300, + }, + { + ConversationID: "400", + LastTime: 400, + }, + }, + //PinnedConversationIDs: map[string]struct{}{ + // "100": {}, + // "300": {}, + //}, + } + sort.Sort(&val) + t.Log(val) + +} diff --git a/internal/api/router.go b/internal/api/router.go index 37d0347ad..8d2a688f4 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -2,6 +2,8 @@ package api import ( "fmt" + "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" + "github.com/gin-contrib/gzip" "github.com/gin-gonic/gin" @@ -74,6 +76,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc)) u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) + j := jssdk.NewJSSdkApi(messageRpc.Client, conversationRpc.Client) userRouterGroup := r.Group("/user") { userRouterGroup.POST("/user_register", u.UserRegister) @@ -115,6 +118,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En friendRouterGroup.POST("/set_friend_remark", f.SetFriendRemark) friendRouterGroup.POST("/add_black", f.AddBlack) friendRouterGroup.POST("/get_black_list", f.GetPaginationBlacks) + friendRouterGroup.POST("/get_specified_blacks", f.GetSpecifiedBlacks) friendRouterGroup.POST("/remove_black", f.RemoveBlack) friendRouterGroup.POST("/get_incremental_blacks", f.GetIncrementalBlacks) friendRouterGroup.POST("/import_friend", f.ImportFriends) @@ -138,6 +142,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En groupRouterGroup.POST("/get_recv_group_applicationList", g.GetRecvGroupApplicationList) groupRouterGroup.POST("/get_user_req_group_applicationList", g.GetUserReqGroupApplicationList) groupRouterGroup.POST("/get_group_users_req_application_list", g.GetGroupUsersReqApplicationList) + groupRouterGroup.POST("/get_specified_user_group_request_info", g.GetSpecifiedUserGroupRequestInfo) groupRouterGroup.POST("/get_groups_info", g.GetGroupsInfo) groupRouterGroup.POST("/kick_group", g.KickGroupMember) groupRouterGroup.POST("/get_group_members_info", g.GetGroupMembersInfo) @@ -230,6 +235,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs) conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation) conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) + conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) + conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) } statisticsGroup := r.Group("/statistics") @@ -239,6 +246,11 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En statisticsGroup.POST("/group/create", g.GroupCreateCount) statisticsGroup.POST("/group/active", m.GetActiveGroup) } + + jssdk := r.Group("/jssdk") + jssdk.POST("/get_conversations", j.GetConversations) + jssdk.POST("/get_active_conversations", j.GetActiveConversations) + return r } @@ -275,7 +287,6 @@ func GinParseToken(authRPC *rpcclient.Auth) gin.HandlerFunc { // Whitelist api not parse token var Whitelist = []string{ - "/user/user_register", "/auth/user_token", "/auth/parse_token", } diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 60e2b8d53..7df297488 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -321,7 +321,7 @@ func (ws *WsServer) KickUserConn(client *Client) error { } func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) { - switch ws.msgGatewayConfig.MsgGateway.MultiLoginPolicy { + switch ws.msgGatewayConfig.Share.MultiLoginPolicy { case constant.DefalutNotKick: case constant.PCAndOther: if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC { diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index f015ca4e5..6e8355af3 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -22,8 +22,8 @@ import ( "path/filepath" "strings" - firebase "firebase.google.com/go" - "firebase.google.com/go/messaging" + firebase "firebase.google.com/go/v4" + "firebase.google.com/go/v4/messaging" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/protocol/constant" @@ -99,7 +99,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, apns := &messaging.APNSConfig{Payload: &messaging.APNSPayload{Aps: &messaging.Aps{Sound: opts.IOSPushSound}}} messageCount := len(messages) if messageCount >= SinglePushCountLimit { - response, err := f.fcmMsgCli.SendAll(ctx, messages) + response, err := f.fcmMsgCli.SendEach(ctx, messages) if err != nil { Fail = Fail + messageCount // Record push error @@ -154,7 +154,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, } messageCount := len(messages) if messageCount > 0 { - response, err := f.fcmMsgCli.SendAll(ctx, messages) + response, err := f.fcmMsgCli.SendEach(ctx, messages) if err != nil { Fail = Fail + messageCount } else { diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 804375e4f..d870a6c58 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -64,6 +64,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire), config.Share.Secret, config.RpcConfig.TokenPolicy.Expire, + config.Share.MultiLoginPolicy, ), config: config, }) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversation.go similarity index 96% rename from internal/rpc/conversation/conversaion.go rename to internal/rpc/conversation/conversation.go index 66e85cb8f..6f6ca1f67 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversation.go @@ -710,3 +710,19 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil } + +func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { + conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil +} + +func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) { + conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil +} diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go index ef391bd70..e748c66dc 100644 --- a/internal/rpc/group/callback.go +++ b/internal/rpc/group/callback.go @@ -218,6 +218,7 @@ func (s *groupServer) webhookAfterKickGroupMember(ctx context.Context, after *co CallbackCommand: callbackstruct.CallbackAfterKickGroupCommand, GroupID: req.GroupID, KickedUserIDs: req.KickedUserIDs, + Reason: req.Reason, } s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackKillGroupMemberResp{}, after) } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index bb0783460..8cb098dde 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -167,11 +167,11 @@ func (g *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error return nil } -func (g *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string, complete bool) (map[string]*sdkws.PublicUserInfo, error) { +func (g *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.PublicUserInfo, error) { if len(userIDs) == 0 { return map[string]*sdkws.PublicUserInfo{}, nil } - users, err := g.user.GetPublicUserInfos(ctx, userIDs, complete) + users, err := g.user.GetPublicUserInfos(ctx, userIDs) if err != nil { return nil, err } @@ -696,7 +696,7 @@ func (g *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup. userIDs = append(userIDs, gr.UserID) } userIDs = datautil.Distinct(userIDs) - userMap, err := g.user.GetPublicUserInfoMap(ctx, userIDs, true) + userMap, err := g.user.GetPublicUserInfoMap(ctx, userIDs) if err != nil { return nil, err } @@ -1685,36 +1685,51 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req * if err != nil { return nil, err } + if len(requests) == 0 { return &pbgroup.GetGroupUsersReqApplicationListResp{}, nil } + groupIDs := datautil.Distinct(datautil.Slice(requests, func(e *model.GroupRequest) string { return e.GroupID })) + groups, err := g.db.FindGroup(ctx, groupIDs) if err != nil { return nil, err } + groupMap := datautil.SliceToMap(groups, func(e *model.Group) string { return e.GroupID }) + if ids := datautil.Single(groupIDs, datautil.Keys(groupMap)); len(ids) > 0 { return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) } + + userMap, err := g.user.GetPublicUserInfoMap(ctx, req.UserIDs) + if err != nil { + return nil, err + } + owners, err := g.db.FindGroupsOwner(ctx, groupIDs) if err != nil { return nil, err } + if err := g.PopulateGroupMember(ctx, owners...); err != nil { return nil, err } + ownerMap := datautil.SliceToMap(owners, func(e *model.GroupMember) string { return e.GroupID }) + groupMemberNum, err := g.db.MapGroupMemberNum(ctx, groupIDs) if err != nil { return nil, err } + return &pbgroup.GetGroupUsersReqApplicationListResp{ Total: int64(len(requests)), GroupRequests: datautil.Slice(requests, func(e *model.GroupRequest) *sdkws.GroupRequest { @@ -1722,7 +1737,71 @@ func (g *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req * if owner, ok := ownerMap[e.GroupID]; ok { ownerUserID = owner.UserID } - return convert.Db2PbGroupRequest(e, nil, convert.Db2PbGroupInfo(groupMap[e.GroupID], ownerUserID, groupMemberNum[e.GroupID])) + + var userInfo *sdkws.PublicUserInfo + if user, ok := userMap[e.UserID]; !ok { + userInfo = user + } + + return convert.Db2PbGroupRequest(e, userInfo, convert.Db2PbGroupInfo(groupMap[e.GroupID], ownerUserID, groupMemberNum[e.GroupID])) }), }, nil } + +func (g *groupServer) GetSpecifiedUserGroupRequestInfo(ctx context.Context, req *pbgroup.GetSpecifiedUserGroupRequestInfoReq) (*pbgroup.GetSpecifiedUserGroupRequestInfoResp, error) { + opUserID := mcontext.GetOpUserID(ctx) + + owners, err := g.db.FindGroupsOwner(ctx, []string{req.GroupID}) + if err != nil { + return nil, err + } + + if req.UserID != opUserID { + req.UserID = mcontext.GetOpUserID(ctx) + adminIDs, err := g.db.GetGroupRoleLevelMemberIDs(ctx, req.GroupID, constant.GroupAdmin) + if err != nil { + return nil, err + } + + adminIDs = append(adminIDs, owners[0].UserID) + + if !datautil.Contain(req.UserID, adminIDs...) { + return nil, errs.ErrNoPermission.WrapMsg("opUser no permission") + } + } + requests, err := g.db.FindGroupRequests(ctx, req.GroupID, []string{req.UserID}) + if err != nil { + return nil, err + } + + if len(requests) == 0 { + return &pbgroup.GetSpecifiedUserGroupRequestInfoResp{}, nil + } + + groups, err := g.db.FindGroup(ctx, []string{req.GroupID}) + if err != nil { + return nil, err + } + + userInfos, err := g.user.GetPublicUserInfos(ctx, []string{req.UserID}) + if err != nil { + return nil, err + } + + groupMemberNum, err := g.db.MapGroupMemberNum(ctx, []string{req.GroupID}) + if err != nil { + return nil, err + } + + resp := &pbgroup.GetSpecifiedUserGroupRequestInfoResp{ + GroupRequests: make([]*sdkws.GroupRequest, 0, len(requests)), + } + + for _, request := range requests { + resp.GroupRequests = append(resp.GroupRequests, convert.Db2PbGroupRequest(request, userInfos[0], convert.Db2PbGroupInfo(groups[0], owners[0].UserID, groupMemberNum[groups[0].GroupID]))) + } + + resp.Total = uint32(len(requests)) + + return resp, nil +} diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index bfba4824f..03f35b42d 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -55,7 +55,7 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m conversationMaxSeqMap[conversation.ConversationID] = conversation.MaxSeq } } - maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) + maxSeqs, err := m.MsgDatabase.GetMaxSeqsWithTime(ctx, conversationIDs) if err != nil { return nil, err } @@ -63,7 +63,8 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m for conversationID, maxSeq := range maxSeqs { resp.Seqs[conversationID] = &msg.Seqs{ HasReadSeq: hasReadSeqs[conversationID], - MaxSeq: maxSeq, + MaxSeq: maxSeq.Seq, + MaxSeqTime: maxSeq.Time, } if v, ok := conversationMaxSeqMap[conversationID]; ok { resp.Seqs[conversationID].MaxSeq = v diff --git a/internal/rpc/msg/seq.go b/internal/rpc/msg/seq.go index 4d9eb6db9..5d40160de 100644 --- a/internal/rpc/msg/seq.go +++ b/internal/rpc/msg/seq.go @@ -16,10 +16,10 @@ package msg import ( "context" + pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/errs" "github.com/redis/go-redis/v9" - - pbmsg "github.com/openimsdk/protocol/msg" + "sort" ) func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) { @@ -62,3 +62,25 @@ func (m *msgServer) SetUserConversationsMinSeq(ctx context.Context, req *pbmsg.S } return &pbmsg.SetUserConversationsMinSeqResp{}, nil } + +func (m *msgServer) GetActiveConversation(ctx context.Context, req *pbmsg.GetActiveConversationReq) (*pbmsg.GetActiveConversationResp, error) { + res, err := m.MsgDatabase.GetCacheMaxSeqWithTime(ctx, req.ConversationIDs) + if err != nil { + return nil, err + } + conversations := make([]*pbmsg.ActiveConversation, 0, len(res)) + for conversationID, val := range res { + conversations = append(conversations, &pbmsg.ActiveConversation{ + MaxSeq: val.Seq, + LastTime: val.Time, + ConversationID: conversationID, + }) + } + if req.Limit > 0 { + sort.Sort(activeConversations(conversations)) + if len(conversations) > int(req.Limit) { + conversations = conversations[:req.Limit] + } + } + return &pbmsg.GetActiveConversationResp{Conversations: conversations}, nil +} diff --git a/internal/rpc/msg/utils.go b/internal/rpc/msg/utils.go index 69b4d0bf6..e3490848c 100644 --- a/internal/rpc/msg/utils.go +++ b/internal/rpc/msg/utils.go @@ -15,6 +15,7 @@ package msg import ( + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/errs" "github.com/redis/go-redis/v9" "go.mongodb.org/mongo-driver/mongo" @@ -28,3 +29,63 @@ func IsNotFound(err error) bool { return false } } + +type activeConversations []*msg.ActiveConversation + +func (s activeConversations) Len() int { + return len(s) +} + +func (s activeConversations) Less(i, j int) bool { + return s[i].LastTime > s[j].LastTime +} + +func (s activeConversations) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +//type seqTime struct { +// ConversationID string +// Seq int64 +// Time int64 +// Unread int64 +// Pinned bool +//} +// +//func (s seqTime) String() string { +// return fmt.Sprintf("", s.Time, s.Unread, s.Pinned) +//} +// +//type seqTimes []seqTime +// +//func (s seqTimes) Len() int { +// return len(s) +//} +// +//// Less sticky priority, unread priority, time descending +//func (s seqTimes) Less(i, j int) bool { +// iv, jv := s[i], s[j] +// if iv.Pinned && (!jv.Pinned) { +// return true +// } +// if jv.Pinned && (!iv.Pinned) { +// return false +// } +// if iv.Unread > 0 && jv.Unread == 0 { +// return true +// } +// if jv.Unread > 0 && iv.Unread == 0 { +// return false +// } +// return iv.Time > jv.Time +//} +// +//func (s seqTimes) Swap(i, j int) { +// s[i], s[j] = s[j], s[i] +//} +// +//type conversationStatus struct { +// ConversationID string +// Pinned bool +// Recv bool +//} diff --git a/internal/rpc/relation/black.go b/internal/rpc/relation/black.go index e149e3165..d8d457dac 100644 --- a/internal/rpc/relation/black.go +++ b/internal/rpc/relation/black.go @@ -23,13 +23,17 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/protocol/relation" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/datautil" ) func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *relation.GetPaginationBlacksReq) (resp *relation.GetPaginationBlacksResp, err error) { - if err := s.userRpcClient.Access(ctx, req.UserID); err != nil { + if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + total, blacks, err := s.blackDatabase.FindOwnerBlacks(ctx, req.UserID, req.Pagination) if err != nil { return nil, err @@ -55,7 +59,7 @@ func (s *friendServer) IsBlack(ctx context.Context, req *relation.IsBlackReq) (* } func (s *friendServer) RemoveBlack(ctx context.Context, req *relation.RemoveBlackReq) (*relation.RemoveBlackResp, error) { - if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil { + if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } @@ -64,6 +68,7 @@ func (s *friendServer) RemoveBlack(ctx context.Context, req *relation.RemoveBlac } s.notificationSender.BlackDeletedNotification(ctx, req) + s.webhookAfterRemoveBlack(ctx, &s.config.WebhooksConfig.AfterRemoveBlack, req) return &relation.RemoveBlackResp{}, nil } @@ -72,6 +77,11 @@ func (s *friendServer) AddBlack(ctx context.Context, req *relation.AddBlackReq) if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + + if err := s.webhookBeforeAddBlack(ctx, &s.config.WebhooksConfig.BeforeAddBlack, req); err != nil { + return nil, err + } + _, err := s.userRpcClient.GetUsersInfo(ctx, []string{req.OwnerUserID, req.BlackUserID}) if err != nil { return nil, err @@ -90,3 +100,53 @@ func (s *friendServer) AddBlack(ctx context.Context, req *relation.AddBlackReq) s.notificationSender.BlackAddedNotification(ctx, req) return &relation.AddBlackResp{}, nil } + +func (s *friendServer) GetSpecifiedBlacks(ctx context.Context, req *relation.GetSpecifiedBlacksReq) (*relation.GetSpecifiedBlacksResp, error) { + if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { + return nil, err + } + + if len(req.UserIDList) == 0 { + return nil, errs.ErrArgs.WrapMsg("userIDList is empty") + } + + if datautil.Duplicate(req.UserIDList) { + return nil, errs.ErrArgs.WrapMsg("userIDList repeated") + } + + userMap, err := s.userRpcClient.GetPublicUserInfoMap(ctx, req.UserIDList) + if err != nil { + return nil, err + } + + blacks, err := s.blackDatabase.FindBlackInfos(ctx, req.OwnerUserID, req.UserIDList) + if err != nil { + return nil, err + } + + blackMap := datautil.SliceToMap(blacks, func(e *model.Black) string { + return e.BlockUserID + }) + + resp := &relation.GetSpecifiedBlacksResp{ + Blacks: make([]*sdkws.BlackInfo, 0, len(req.UserIDList)), + } + + for _, userID := range req.UserIDList { + if black := blackMap[userID]; black != nil { + resp.Blacks = append(resp.Blacks, + &sdkws.BlackInfo{ + OwnerUserID: black.OwnerUserID, + CreateTime: black.CreateTime.UnixMilli(), + BlackUserInfo: userMap[userID], + AddSource: black.AddSource, + OperatorUserID: black.OperatorUserID, + Ex: black.Ex, + }) + } + } + + resp.Total = int32(len(resp.Blacks)) + + return resp, nil +} diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 913058932..f049420d9 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -228,20 +228,23 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res // ok. func (s *friendServer) DeleteFriend(ctx context.Context, req *relation.DeleteFriendReq) (resp *relation.DeleteFriendResp, err error) { - resp = &relation.DeleteFriendResp{} - if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil { + if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + _, err = s.db.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID}) if err != nil { return nil, err } + if err := s.db.Delete(ctx, req.OwnerUserID, []string{req.FriendUserID}); err != nil { return nil, err } + s.notificationSender.FriendDeletedNotification(ctx, req) s.webhookAfterDeleteFriend(ctx, &s.config.WebhooksConfig.AfterDeleteFriend, req) - return resp, nil + + return &relation.DeleteFriendResp{}, nil } // ok. @@ -249,20 +252,24 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFri if err = s.webhookBeforeSetFriendRemark(ctx, &s.config.WebhooksConfig.BeforeSetFriendRemark, req); err != nil && err != servererrs.ErrCallbackContinue { return nil, err } - resp = &relation.SetFriendRemarkResp{} - if err := s.userRpcClient.Access(ctx, req.OwnerUserID); err != nil { + + if err := authverify.CheckAccessV3(ctx, req.OwnerUserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + _, err = s.db.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID}) if err != nil { return nil, err } + if err := s.db.UpdateRemark(ctx, req.OwnerUserID, req.FriendUserID, req.Remark); err != nil { return nil, err } + s.webhookAfterSetFriendRemark(ctx, &s.config.WebhooksConfig.AfterSetFriendRemark, req) s.notificationSender.FriendRemarkSetNotification(ctx, req.OwnerUserID, req.FriendUserID) - return resp, nil + + return &relation.SetFriendRemarkResp{}, nil } // ok. @@ -309,7 +316,7 @@ func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, // Get received friend requests (i.e., those initiated by others). func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *relation.GetPaginationFriendsApplyToReq) (resp *relation.GetPaginationFriendsApplyToResp, err error) { - if err := s.userRpcClient.Access(ctx, req.UserID); err != nil { + if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } @@ -331,18 +338,23 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *rel func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *relation.GetPaginationFriendsApplyFromReq) (resp *relation.GetPaginationFriendsApplyFromResp, err error) { resp = &relation.GetPaginationFriendsApplyFromResp{} - if err := s.userRpcClient.Access(ctx, req.UserID); err != nil { + + if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + total, friendRequests, err := s.db.PageFriendRequestFromMe(ctx, req.UserID, req.Pagination) if err != nil { return nil, err } + resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap) if err != nil { return nil, err } + resp.Total = int32(total) + return resp, nil } @@ -357,31 +369,37 @@ func (s *friendServer) IsFriend(ctx context.Context, req *relation.IsFriendReq) } func (s *friendServer) GetPaginationFriends(ctx context.Context, req *relation.GetPaginationFriendsReq) (resp *relation.GetPaginationFriendsResp, err error) { - if err := s.userRpcClient.Access(ctx, req.UserID); err != nil { + if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + total, friends, err := s.db.PageOwnerFriends(ctx, req.UserID, req.Pagination) if err != nil { return nil, err } + resp = &relation.GetPaginationFriendsResp{} resp.FriendsInfo, err = convert.FriendsDB2Pb(ctx, friends, s.userRpcClient.GetUsersInfoMap) if err != nil { return nil, err } + resp.Total = int32(total) + return resp, nil } func (s *friendServer) GetFriendIDs(ctx context.Context, req *relation.GetFriendIDsReq) (resp *relation.GetFriendIDsResp, err error) { - if err := s.userRpcClient.Access(ctx, req.UserID); err != nil { + if err := authverify.CheckAccessV3(ctx, req.UserID, s.config.Share.IMAdminUserID); err != nil { return nil, err } + resp = &relation.GetFriendIDsResp{} resp.FriendIDs, err = s.db.FindFriendUserIDs(ctx, req.UserID) if err != nil { return nil, err } + return resp, nil } @@ -389,35 +407,45 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio if len(req.UserIDList) == 0 { return nil, errs.ErrArgs.WrapMsg("userIDList is empty") } + if datautil.Duplicate(req.UserIDList) { return nil, errs.ErrArgs.WrapMsg("userIDList repeated") } + userMap, err := s.userRpcClient.GetUsersInfoMap(ctx, req.UserIDList) if err != nil { return nil, err } + friends, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.UserIDList) if err != nil { return nil, err } + blacks, err := s.blackDatabase.FindBlackInfos(ctx, req.OwnerUserID, req.UserIDList) if err != nil { return nil, err } + friendMap := datautil.SliceToMap(friends, func(e *model.Friend) string { return e.FriendUserID }) + blackMap := datautil.SliceToMap(blacks, func(e *model.Black) string { return e.BlockUserID }) + resp := &relation.GetSpecifiedFriendsInfoResp{ Infos: make([]*relation.GetSpecifiedFriendsInfoInfo, 0, len(req.UserIDList)), } + for _, userID := range req.UserIDList { user := userMap[userID] + if user == nil { continue } + var friendInfo *sdkws.FriendInfo if friend := friendMap[userID]; friend != nil { friendInfo = &sdkws.FriendInfo{ @@ -430,6 +458,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio IsPinned: friend.IsPinned, } } + var blackInfo *sdkws.BlackInfo if black := blackMap[userID]; black != nil { blackInfo = &sdkws.BlackInfo{ @@ -440,12 +469,14 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio Ex: black.Ex, } } + resp.Infos = append(resp.Infos, &relation.GetSpecifiedFriendsInfoInfo{ UserInfo: user, FriendInfo: friendInfo, BlackInfo: blackInfo, }) } + return resp, nil } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index da0d6f1a1..830b1ef9d 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -185,7 +185,6 @@ type MsgGateway struct { WebsocketMaxMsgLen int `mapstructure:"websocketMaxMsgLen"` WebsocketTimeout int `mapstructure:"websocketTimeout"` } `mapstructure:"longConnSvr"` - MultiLoginPolicy int `mapstructure:"multiLoginPolicy"` } type MsgTransfer struct { @@ -358,9 +357,10 @@ type AfterConfig struct { } type Share struct { - Secret string `mapstructure:"secret"` - RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` - IMAdminUserID []string `mapstructure:"imAdminUserID"` + Secret string `mapstructure:"secret"` + RpcRegisterName RpcRegisterName `mapstructure:"rpcRegisterName"` + IMAdminUserID []string `mapstructure:"imAdminUserID"` + MultiLoginPolicy int `mapstructure:"multiLoginPolicy"` } type RpcRegisterName struct { User string `mapstructure:"user"` diff --git a/pkg/common/storage/cache/cachekey/conversation.go b/pkg/common/storage/cache/cachekey/conversation.go index d19fcc576..909774288 100644 --- a/pkg/common/storage/cache/cachekey/conversation.go +++ b/pkg/common/storage/cache/cachekey/conversation.go @@ -17,6 +17,8 @@ package cachekey const ( ConversationKey = "CONVERSATION:" ConversationIDsKey = "CONVERSATION_IDS:" + NotNotifyConversationIDsKey = "NOT_NOTIFY_CONVERSATION_IDS:" + PinnedConversationIDsKey = "PINNED_CONVERSATION_IDS:" ConversationIDsHashKey = "CONVERSATION_IDS_HASH:" ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" RecvMsgOptKey = "RECV_MSG_OPT:" @@ -34,6 +36,14 @@ func GetConversationIDsKey(ownerUserID string) string { return ConversationIDsKey + ownerUserID } +func GetNotNotifyConversationIDsKey(ownerUserID string) string { + return NotNotifyConversationIDsKey + ownerUserID +} + +func GetPinnedConversationIDs(ownerUserID string) string { + return PinnedConversationIDsKey + ownerUserID +} + func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID } diff --git a/pkg/common/storage/cache/cachekey/group.go b/pkg/common/storage/cache/cachekey/group.go index 2ef42c0ff..72eb7c295 100644 --- a/pkg/common/storage/cache/cachekey/group.go +++ b/pkg/common/storage/cache/cachekey/group.go @@ -20,16 +20,17 @@ import ( ) const ( - groupExpireTime = time.Second * 60 * 60 * 12 - GroupInfoKey = "GROUP_INFO:" - GroupMemberIDsKey = "GROUP_MEMBER_IDS:" - GroupMembersHashKey = "GROUP_MEMBERS_HASH2:" - GroupMemberInfoKey = "GROUP_MEMBER_INFO:" - JoinedGroupsKey = "JOIN_GROUPS_KEY:" - GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" - GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" - GroupMemberMaxVersionKey = "GROUP_MEMBER_MAX_VERSION:" - GroupJoinMaxVersionKey = "GROUP_JOIN_MAX_VERSION:" + groupExpireTime = time.Second * 60 * 60 * 12 + GroupInfoKey = "GROUP_INFO:" + GroupMemberIDsKey = "GROUP_MEMBER_IDS:" + GroupMembersHashKey = "GROUP_MEMBERS_HASH2:" + GroupMemberInfoKey = "GROUP_MEMBER_INFO:" + JoinedGroupsKey = "JOIN_GROUPS_KEY:" + GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" + GroupAdminLevelMemberIDsKey = "GROUP_ADMIN_LEVEL_MEMBER_IDS:" + GroupMemberMaxVersionKey = "GROUP_MEMBER_MAX_VERSION:" + GroupJoinMaxVersionKey = "GROUP_JOIN_MAX_VERSION:" ) func GetGroupInfoKey(groupID string) string { diff --git a/pkg/common/storage/cache/conversation.go b/pkg/common/storage/cache/conversation.go index bc1761483..ac3011107 100644 --- a/pkg/common/storage/cache/conversation.go +++ b/pkg/common/storage/cache/conversation.go @@ -25,6 +25,8 @@ type ConversationCache interface { CloneConversationCache() ConversationCache // get user's conversationIDs from msgCache GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) + GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) + GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) DelConversationIDs(userIDs ...string) ConversationCache GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) @@ -54,7 +56,8 @@ type ConversationCache interface { GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache - + DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache + DelConversationPinnedMessageUserIDs(userIDs ...string) ConversationCache DelConversationVersionUserIDs(userIDs ...string) ConversationCache FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) diff --git a/pkg/common/storage/cache/group.go b/pkg/common/storage/cache/group.go index 1ec046295..05b75745a 100644 --- a/pkg/common/storage/cache/group.go +++ b/pkg/common/storage/cache/group.go @@ -16,6 +16,7 @@ package cache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/common" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) diff --git a/pkg/common/storage/cache/redis/conversation.go b/pkg/common/storage/cache/redis/conversation.go index 95e680afb..326f60b96 100644 --- a/pkg/common/storage/cache/redis/conversation.go +++ b/pkg/common/storage/cache/redis/conversation.go @@ -71,6 +71,14 @@ func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) strin return cachekey.GetConversationIDsKey(ownerUserID) } +func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID string) string { + return cachekey.GetNotNotifyConversationIDsKey(ownerUserID) +} + +func (c *ConversationRedisCache) getPinnedConversationIDsKey(ownerUserID string) string { + return cachekey.GetPinnedConversationIDs(ownerUserID) +} + func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID) } @@ -105,6 +113,18 @@ func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, own }) } +func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { + return getCache(ctx, c.rcClient, c.getNotNotifyConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) { + return c.conversationDB.FindUserIDAllNotNotifyConversationID(ctx, userID) + }) +} + +func (c *ConversationRedisCache) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) { + return getCache(ctx, c.rcClient, c.getPinnedConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) { + return c.conversationDB.FindUserIDAllPinnedConversationID(ctx, userID) + }) +} + func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache { keys := make([]string, 0, len(userIDs)) for _, userID := range userIDs { @@ -242,6 +262,22 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers return cache } +func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs ...string) cache.ConversationCache { + cache := c.CloneConversationCache() + for _, userID := range userIDs { + cache.AddKeys(c.getNotNotifyConversationIDsKey(userID)) + } + return cache +} + +func (c *ConversationRedisCache) DelConversationPinnedMessageUserIDs(userIDs ...string) cache.ConversationCache { + cache := c.CloneConversationCache() + for _, userID := range userIDs { + cache.AddKeys(c.getPinnedConversationIDsKey(userID)) + } + return cache +} + func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache { cache := c.CloneConversationCache() for _, userID := range userIDs { diff --git a/pkg/common/storage/cache/redis/seq_conversation.go b/pkg/common/storage/cache/redis/seq_conversation.go index 7fe849193..71705cef7 100644 --- a/pkg/common/storage/cache/redis/seq_conversation.go +++ b/pkg/common/storage/cache/redis/seq_conversation.go @@ -12,6 +12,7 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" + "strconv" "time" ) @@ -57,6 +58,14 @@ func (s *seqConversationCacheRedis) getSingleMaxSeq(ctx context.Context, convers return map[string]int64{conversationID: seq}, nil } +func (s *seqConversationCacheRedis) getSingleMaxSeqWithTime(ctx context.Context, conversationID string) (map[string]database.SeqTime, error) { + seq, err := s.GetMaxSeqWithTime(ctx, conversationID) + if err != nil { + return nil, err + } + return map[string]database.SeqTime{conversationID: seq}, nil +} + func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]int64) error { result := make([]*redis.StringCmd, len(keys)) pipe := s.rdb.Pipeline() @@ -88,6 +97,46 @@ func (s *seqConversationCacheRedis) batchGetMaxSeq(ctx context.Context, keys []s return nil } +func (s *seqConversationCacheRedis) batchGetMaxSeqWithTime(ctx context.Context, keys []string, keyConversationID map[string]string, seqs map[string]database.SeqTime) error { + result := make([]*redis.SliceCmd, len(keys)) + pipe := s.rdb.Pipeline() + for i, key := range keys { + result[i] = pipe.HMGet(ctx, key, "CURR", "TIME") + } + if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) { + return errs.Wrap(err) + } + var notFoundKey []string + for i, r := range result { + val, err := r.Result() + if len(val) != 2 { + return errs.WrapMsg(err, "batchGetMaxSeqWithTime invalid result", "key", keys[i], "res", val) + } + if val[0] == nil { + notFoundKey = append(notFoundKey, keys[i]) + continue + } + seq, err := s.parseInt64(val[0]) + if err != nil { + return err + } + mill, err := s.parseInt64(val[1]) + if err != nil { + return err + } + seqs[keyConversationID[keys[i]]] = database.SeqTime{Seq: seq, Time: mill} + } + for _, key := range notFoundKey { + conversationID := keyConversationID[key] + seq, err := s.GetMaxSeqWithTime(ctx, conversationID) + if err != nil { + return err + } + seqs[conversationID] = seq + } + return nil +} + func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { switch len(conversationIDs) { case 0: @@ -121,11 +170,44 @@ func (s *seqConversationCacheRedis) GetMaxSeqs(ctx context.Context, conversation return seqs, nil } +func (s *seqConversationCacheRedis) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { + switch len(conversationIDs) { + case 0: + return map[string]database.SeqTime{}, nil + case 1: + return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0]) + } + keys := make([]string, 0, len(conversationIDs)) + keyConversationID := make(map[string]string, len(conversationIDs)) + for _, conversationID := range conversationIDs { + key := s.getSeqMallocKey(conversationID) + if _, ok := keyConversationID[key]; ok { + continue + } + keys = append(keys, key) + keyConversationID[key] = conversationID + } + if len(keys) == 1 { + return s.getSingleMaxSeqWithTime(ctx, conversationIDs[0]) + } + slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) + if err != nil { + return nil, err + } + seqs := make(map[string]database.SeqTime, len(conversationIDs)) + for _, keys := range slotKeys { + if err := s.batchGetMaxSeqWithTime(ctx, keys, keyConversationID, seqs); err != nil { + return nil, err + } + } + return seqs, nil +} + func (s *seqConversationCacheRedis) getSeqMallocKey(conversationID string) string { return cachekey.GetMallocSeqKey(conversationID) } -func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) (int64, error) { +func (s *seqConversationCacheRedis) setSeq(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) (int64, error) { if lastSeq < currSeq { return 0, errs.New("lastSeq must be greater than currSeq") } @@ -138,8 +220,9 @@ local lockValue = ARGV[1] local dataSecond = ARGV[2] local curr_seq = tonumber(ARGV[3]) local last_seq = tonumber(ARGV[4]) +local mallocTime = ARGV[5] if redis.call("EXISTS", key) == 0 then - redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq) + redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime) redis.call("EXPIRE", key, dataSecond) return 1 end @@ -147,11 +230,11 @@ if redis.call("HGET", key, "LOCK") ~= lockValue then return 2 end redis.call("HDEL", key, "LOCK") -redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq) +redis.call("HSET", key, "CURR", curr_seq, "LAST", last_seq, "TIME", mallocTime) redis.call("EXPIRE", key, dataSecond) return 0 ` - result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq).Int64() + result, err := s.rdb.Eval(ctx, script, []string{key}, owner, int64(s.dataTime/time.Second), currSeq, lastSeq, mill).Int64() if err != nil { return 0, errs.Wrap(err) } @@ -169,6 +252,7 @@ local key = KEYS[1] local size = tonumber(ARGV[1]) local lockSecond = ARGV[2] local dataSecond = ARGV[3] +local mallocTime = ARGV[4] local result = {} if redis.call("EXISTS", key) == 0 then local lockValue = math.random(0, 999999999) @@ -176,6 +260,7 @@ if redis.call("EXISTS", key) == 0 then redis.call("EXPIRE", key, lockSecond) table.insert(result, 1) table.insert(result, lockValue) + table.insert(result, mallocTime) return result end if redis.call("HEXISTS", key, "LOCK") == 1 then @@ -189,6 +274,12 @@ if size == 0 then table.insert(result, 0) table.insert(result, curr_seq) table.insert(result, last_seq) + local setTime = redis.call("HGET", key, "TIME") + if setTime then + table.insert(result, setTime) + else + table.insert(result, 0) + end return result end local max_seq = curr_seq + size @@ -196,21 +287,25 @@ if max_seq > last_seq then local lockValue = math.random(0, 999999999) redis.call("HSET", key, "LOCK", lockValue) redis.call("HSET", key, "CURR", last_seq) + redis.call("HSET", key, "TIME", mallocTime) redis.call("EXPIRE", key, lockSecond) table.insert(result, 3) table.insert(result, curr_seq) table.insert(result, last_seq) table.insert(result, lockValue) + table.insert(result, mallocTime) return result end redis.call("HSET", key, "CURR", max_seq) +redis.call("HSET", key, "TIME", ARGV[4]) redis.call("EXPIRE", key, dataSecond) table.insert(result, 0) table.insert(result, curr_seq) table.insert(result, last_seq) +table.insert(result, mallocTime) return result ` - result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second)).Int64Slice() + result, err := s.rdb.Eval(ctx, script, []string{key}, size, int64(s.lockTime/time.Second), int64(s.dataTime/time.Second), time.Now().UnixMilli()).Int64Slice() if err != nil { return nil, errs.Wrap(err) } @@ -228,9 +323,9 @@ func (s *seqConversationCacheRedis) wait(ctx context.Context) error { } } -func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64) { +func (s *seqConversationCacheRedis) setSeqRetry(ctx context.Context, key string, owner int64, currSeq int64, lastSeq int64, mill int64) { for i := 0; i < 10; i++ { - state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq) + state, err := s.setSeq(ctx, key, owner, currSeq, lastSeq, mill) if err != nil { log.ZError(ctx, "set seq cache failed", err, "key", key, "owner", owner, "currSeq", currSeq, "lastSeq", lastSeq, "count", i+1) if err := s.wait(ctx); err != nil { @@ -267,60 +362,74 @@ func (s *seqConversationCacheRedis) getMallocSize(conversationID string, size in } func (s *seqConversationCacheRedis) Malloc(ctx context.Context, conversationID string, size int64) (int64, error) { + seq, _, err := s.mallocTime(ctx, conversationID, size) + return seq, err +} + +func (s *seqConversationCacheRedis) mallocTime(ctx context.Context, conversationID string, size int64) (int64, int64, error) { if size < 0 { - return 0, errs.New("size must be greater than 0") + return 0, 0, errs.New("size must be greater than 0") } key := s.getSeqMallocKey(conversationID) for i := 0; i < 10; i++ { states, err := s.malloc(ctx, key, size) if err != nil { - return 0, err + return 0, 0, err } switch states[0] { case 0: // success - return states[1], nil + return states[1], states[3], nil case 1: // not found mallocSize := s.getMallocSize(conversationID, size) seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) if err != nil { - return 0, err + return 0, 0, err } - s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize) - return seq, nil + s.setSeqRetry(ctx, key, states[1], seq+size, seq+mallocSize, states[2]) + return seq, 0, nil case 2: // locked if err := s.wait(ctx); err != nil { - return 0, err + return 0, 0, err } continue case 3: // exceeded cache max value currSeq := states[1] lastSeq := states[2] + mill := states[4] mallocSize := s.getMallocSize(conversationID, size) seq, err := s.mgo.Malloc(ctx, conversationID, mallocSize) if err != nil { - return 0, err + return 0, 0, err } if lastSeq == seq { - s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize) - return currSeq, nil + s.setSeqRetry(ctx, key, states[3], currSeq+size, seq+mallocSize, mill) + return currSeq, states[4], nil } else { log.ZWarn(ctx, "malloc seq not equal cache last seq", nil, "conversationID", conversationID, "currSeq", currSeq, "lastSeq", lastSeq, "mallocSeq", seq) - s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize) - return seq, nil + s.setSeqRetry(ctx, key, states[3], seq+size, seq+mallocSize, mill) + return seq, mill, nil } default: log.ZError(ctx, "malloc seq unknown state", nil, "state", states[0], "conversationID", conversationID, "size", size) - return 0, errs.New(fmt.Sprintf("unknown state: %d", states[0])) + return 0, 0, errs.New(fmt.Sprintf("unknown state: %d", states[0])) } } log.ZError(ctx, "malloc seq retrying still failed", nil, "conversationID", conversationID, "size", size) - return 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) + return 0, 0, errs.New("malloc seq waiting for lock timeout", "conversationID", conversationID, "size", size) } func (s *seqConversationCacheRedis) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) { return s.Malloc(ctx, conversationID, 0) } +func (s *seqConversationCacheRedis) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) { + seq, mill, err := s.mallocTime(ctx, conversationID, 0) + if err != nil { + return database.SeqTime{}, err + } + return database.SeqTime{Seq: seq, Time: mill}, nil +} + func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { keys := make([]string, 0, len(seqs)) for conversationID, seq := range seqs { @@ -331,3 +440,80 @@ func (s *seqConversationCacheRedis) SetMinSeqs(ctx context.Context, seqs map[str } return DeleteCacheBySlot(ctx, s.rocks, keys) } + +// GetCacheMaxSeqWithTime only get the existing cache, if there is no cache, no cache will be generated +func (s *seqConversationCacheRedis) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { + if len(conversationIDs) == 0 { + return map[string]database.SeqTime{}, nil + } + key2conversationID := make(map[string]string) + keys := make([]string, 0, len(conversationIDs)) + for _, conversationID := range conversationIDs { + key := s.getSeqMallocKey(conversationID) + if _, ok := key2conversationID[key]; ok { + continue + } + key2conversationID[key] = conversationID + keys = append(keys, key) + } + slotKeys, err := groupKeysBySlot(ctx, s.rdb, keys) + if err != nil { + return nil, err + } + res := make(map[string]database.SeqTime) + for _, keys := range slotKeys { + if len(keys) == 0 { + continue + } + pipe := s.rdb.Pipeline() + cmds := make([]*redis.SliceCmd, 0, len(keys)) + for _, key := range keys { + cmds = append(cmds, pipe.HMGet(ctx, key, "CURR", "TIME")) + } + if _, err := pipe.Exec(ctx); err != nil { + return nil, errs.Wrap(err) + } + for i, cmd := range cmds { + val, err := cmd.Result() + if err != nil { + return nil, err + } + if len(val) != 2 { + return nil, errs.WrapMsg(err, "GetCacheMaxSeqWithTime invalid result", "key", keys[i], "res", val) + } + if val[0] == nil { + continue + } + seq, err := s.parseInt64(val[0]) + if err != nil { + return nil, err + } + mill, err := s.parseInt64(val[1]) + if err != nil { + return nil, err + } + conversationID := key2conversationID[keys[i]] + res[conversationID] = database.SeqTime{Seq: seq, Time: mill} + } + } + return res, nil +} + +func (s *seqConversationCacheRedis) parseInt64(val any) (int64, error) { + switch v := val.(type) { + case nil: + return 0, nil + case int: + return int64(v), nil + case int64: + return v, nil + case string: + res, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return 0, errs.WrapMsg(err, "invalid string not int64", "value", v) + } + return res, nil + default: + return 0, errs.New("invalid result not int64", "resType", fmt.Sprintf("%T", v), "value", v) + } +} diff --git a/pkg/common/storage/cache/redis/seq_conversation_test.go b/pkg/common/storage/cache/redis/seq_conversation_test.go index 1a40624b8..d8bfdfbfb 100644 --- a/pkg/common/storage/cache/redis/seq_conversation_test.go +++ b/pkg/common/storage/cache/redis/seq_conversation_test.go @@ -14,7 +14,7 @@ import ( ) func newTestSeq() *seqConversationCacheRedis { - mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) + mgocli, err := mongo.Connect(context.Background(), options.Client().ApplyURI("mongodb://openIM:openIM123@127.0.0.1:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)) if err != nil { panic(err) } @@ -23,7 +23,7 @@ func newTestSeq() *seqConversationCacheRedis { panic(err) } opt := &redis.Options{ - Addr: "172.16.8.48:16379", + Addr: "127.0.0.1:16379", Password: "openIM123", DB: 1, } @@ -107,3 +107,37 @@ func TestMinSeq(t *testing.T) { ts := newTestSeq() t.Log(ts.GetMinSeq(context.Background(), "10000000")) } + +func TestMalloc(t *testing.T) { + ts := newTestSeq() + t.Log(ts.mallocTime(context.Background(), "10000000", 100)) +} + +func TestHMGET(t *testing.T) { + ts := newTestSeq() + res, err := ts.GetCacheMaxSeqWithTime(context.Background(), []string{"10000000", "123456"}) + if err != nil { + panic(err) + } + t.Log(res) +} + +func TestGetMaxSeqWithTime(t *testing.T) { + ts := newTestSeq() + t.Log(ts.GetMaxSeqWithTime(context.Background(), "10000000")) +} + +func TestGetMaxSeqWithTime1(t *testing.T) { + ts := newTestSeq() + t.Log(ts.GetMaxSeqsWithTime(context.Background(), []string{"10000000", "12345", "111"})) +} + +// +//func TestHMGET(t *testing.T) { +// ts := newTestSeq() +// res, err := ts.rdb.HMGet(context.Background(), "MALLOC_SEQ:1", "CURR", "TIME1").Result() +// if err != nil { +// panic(err) +// } +// t.Log(res) +//} diff --git a/pkg/common/storage/cache/seq_conversation.go b/pkg/common/storage/cache/seq_conversation.go index 2c893a5e8..f35d7bf52 100644 --- a/pkg/common/storage/cache/seq_conversation.go +++ b/pkg/common/storage/cache/seq_conversation.go @@ -1,6 +1,9 @@ package cache -import "context" +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" +) type SeqConversationCache interface { Malloc(ctx context.Context, conversationID string, size int64) (int64, error) @@ -9,4 +12,7 @@ type SeqConversationCache interface { GetMinSeq(ctx context.Context, conversationID string) (int64, error) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) SetMinSeqs(ctx context.Context, seqs map[string]int64) error + GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) + GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) + GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index b725513d9..cb06a197d 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -35,13 +35,14 @@ type AuthDatabase interface { } type authDatabase struct { - cache cache.TokenModel - accessSecret string - accessExpire int64 + cache cache.TokenModel + accessSecret string + accessExpire int64 + multiLoginPolicy int } -func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire int64) AuthDatabase { - return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire} +func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire int64, policy int) AuthDatabase { + return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, multiLoginPolicy: policy} } // If the result is empty. @@ -55,15 +56,19 @@ func (a *authDatabase) SetTokenMapByUidPid(ctx context.Context, userID string, p // Create Token. func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformID int) (string, error) { + // todo: get all platform token tokens, err := a.cache.GetTokensWithoutError(ctx, userID, platformID) if err != nil { return "", err } var deleteTokenKey []string + var kickedTokenKey []string for k, v := range tokens { - _, err = tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret)) + t, err := tokenverify.GetClaimFromToken(k, authverify.Secret(a.accessSecret)) if err != nil || v != constant.NormalToken { deleteTokenKey = append(deleteTokenKey, k) + } else if a.checkKickToken(ctx, platformID, t) { + kickedTokenKey = append(kickedTokenKey, k) } } if len(deleteTokenKey) != 0 { @@ -72,6 +77,14 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return "", err } } + if len(kickedTokenKey) != 0 { + for _, k := range kickedTokenKey { + err := a.cache.SetTokenFlagEx(ctx, userID, platformID, k, constant.KickedToken) + if err != nil { + return "", err + } + } + } claims := tokenverify.BuildClaims(userID, platformID, a.accessExpire) token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) @@ -85,3 +98,23 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI } return tokenString, nil } + +func (a *authDatabase) checkKickToken(ctx context.Context, platformID int, token *tokenverify.Claims) bool { + switch a.multiLoginPolicy { + case constant.DefalutNotKick: + return false + case constant.PCAndOther: + if constant.PlatformIDToClass(platformID) == constant.TerminalPC || + constant.PlatformIDToClass(token.PlatformID) == constant.TerminalPC { + return false + } + return true + case constant.AllLoginButSameTermKick: + if platformID == token.PlatformID { + return true + } + return false + default: + return false + } +} diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index c804d1cc5..06a073365 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -69,6 +69,10 @@ type ConversationDatabase interface { FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error) + // GetNotNotifyConversationIDs gets not notify conversationIDs by userID + GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) + // GetPinnedConversationIDs gets pinned conversationIDs by userID + GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -108,6 +112,10 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, } if _, ok := fieldMap["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) + cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) + } + if _, ok := fieldMap["is_pinned"]; ok { + cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) } cache = cache.DelConversationVersionUserIDs(haveUserIDs...) } @@ -144,6 +152,10 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...) if _, ok := args["recv_msg_opt"]; ok { cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) + cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) + } + if _, ok := args["is_pinned"]; ok { + cache = cache.DelConversationPinnedMessageUserIDs(userIDs...) } return cache.ChainExecDel(ctx) } @@ -152,14 +164,30 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat if err := c.conversationDB.Create(ctx, conversations); err != nil { return err } - var userIDs []string + var ( + userIDs []string + notNotifyUserIDs []string + pinnedUserIDs []string + ) + cache := c.cache.CloneConversationCache() for _, conversation := range conversations { cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) userIDs = append(userIDs, conversation.OwnerUserID) + if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { + notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) + } + if conversation.IsPinned == true { + pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID) + } } - return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx) + return cache.DelConversationIDs(userIDs...). + DelUserConversationIDsHash(userIDs...). + DelConversationVersionUserIDs(userIDs...). + DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...). + DelConversationPinnedMessageUserIDs(pinnedUserIDs...). + ChainExecDel(ctx) } func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error { @@ -212,7 +240,10 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() - cache = cache.DelConversationVersionUserIDs(ownerUserID) + cache = cache.DelConversationVersionUserIDs(ownerUserID). + DelConversationNotNotifyMessageUserIDs(ownerUserID). + DelConversationPinnedMessageUserIDs(ownerUserID) + groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { return e.GroupID, e.GroupID != "" })) @@ -353,3 +384,19 @@ func (c *conversationDatabase) GetOwnerConversation(ctx context.Context, ownerUs } return int64(len(conversationIDs)), conversations, nil } + +func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { + conversationIDs, err := c.cache.GetUserNotNotifyConversationIDs(ctx, userID) + if err != nil { + return nil, err + } + return conversationIDs, nil +} + +func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) { + conversationIDs, err := c.cache.GetPinnedConversationIDs(ctx, userID) + if err != nil { + return nil, err + } + return conversationIDs, nil +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 16a7b1c9b..d579069b6 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -74,6 +74,10 @@ type CommonMsgDatabase interface { GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error) UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error + GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) + GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) + GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) + //GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) //GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error @@ -866,3 +870,16 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) { return db.msgDocDatabase.GetDocIDs(ctx) } + +func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { + return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs) +} + +func (db *commonMsgDatabase) GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) { + return db.seqConversation.GetMaxSeqWithTime(ctx, conversationID) +} + +func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { + // todo: only the time in the redis cache will be taken, not the message time + return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 85f3dd668..5a9b19035 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -27,6 +27,8 @@ type Conversation interface { Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) + FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) + FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 3d505f1d3..f7ced1c2c 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -124,6 +124,20 @@ func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userI return mongoutil.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) } +func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) { + return mongoutil.Find[string](ctx, c.coll, bson.M{ + "owner_user_id": userID, + "recv_msg_opt": constant.ReceiveNotNotifyMessage, + }, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) +} + +func (c *ConversationMgo) FindUserIDAllPinnedConversationID(ctx context.Context, userID string) ([]string, error) { + return mongoutil.Find[string](ctx, c.coll, bson.M{ + "owner_user_id": userID, + "is_pinned": true, + }, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) +} + func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) { return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID}) } diff --git a/pkg/common/storage/database/seq.go b/pkg/common/storage/database/seq.go index cf93b795f..a97ca2d1f 100644 --- a/pkg/common/storage/database/seq.go +++ b/pkg/common/storage/database/seq.go @@ -2,6 +2,11 @@ package database import "context" +type SeqTime struct { + Seq int64 + Time int64 +} + type SeqConversation interface { Malloc(ctx context.Context, conversationID string, size int64) (int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 375cc993c..bdc1a2e01 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -109,12 +109,12 @@ func (u *UserRpcClient) GetUsersInfoMap(ctx context.Context, userIDs []string) ( func (u *UserRpcClient) GetPublicUserInfos( ctx context.Context, userIDs []string, - complete bool, ) ([]*sdkws.PublicUserInfo, error) { users, err := u.GetUsersInfo(ctx, userIDs) if err != nil { return nil, err } + return datautil.Slice(users, func(e *sdkws.UserInfo) *sdkws.PublicUserInfo { return &sdkws.PublicUserInfo{ UserID: e.UserID, @@ -127,10 +127,11 @@ func (u *UserRpcClient) GetPublicUserInfos( // GetPublicUserInfo retrieves public information for a single user based on the provided user ID. func (u *UserRpcClient) GetPublicUserInfo(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) { - users, err := u.GetPublicUserInfos(ctx, []string{userID}, true) + users, err := u.GetPublicUserInfos(ctx, []string{userID}) if err != nil { return nil, err } + return users[0], nil } @@ -138,12 +139,12 @@ func (u *UserRpcClient) GetPublicUserInfo(ctx context.Context, userID string) (* func (u *UserRpcClient) GetPublicUserInfoMap( ctx context.Context, userIDs []string, - complete bool, ) (map[string]*sdkws.PublicUserInfo, error) { - users, err := u.GetPublicUserInfos(ctx, userIDs, complete) + users, err := u.GetPublicUserInfos(ctx, userIDs) if err != nil { return nil, err } + return datautil.SliceToMap(users, func(e *sdkws.PublicUserInfo) string { return e.UserID }), nil