From 36780c6b4b2c8212b70343eef6722e7f5f80d94a Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 17:59:02 +0800 Subject: [PATCH 1/8] fix: push update. --- go.mod | 4 +- go.sum | 23 +- internal/api/custom_validator.go | 2 +- .../msgtransfer/online_history_msg_handler.go | 2 +- internal/push/callback.go | 25 +- internal/push/consumer_init.go | 41 -- .../push/offlinepush/offlinepush_interface.go | 37 -- internal/push/offlinepush/offlinepusher.go | 52 ++ internal/push/offlinepush/options/options.go | 14 + internal/push/onlinepusher.go | 204 +++++++ internal/push/push.go | 71 +++ internal/push/push_handler.go | 302 +++++++++- internal/push/push_rpc_server.go | 121 ---- internal/push/push_to_client.go | 529 ------------------ internal/push/tools.go | 31 - internal/rpc/conversation/conversaion.go | 8 +- internal/rpc/group/group.go | 6 +- internal/rpc/msg/as_read.go | 2 +- internal/rpc/msg/revoke.go | 4 +- internal/rpc/msg/send.go | 2 +- internal/rpc/msg/server.go | 2 +- internal/rpc/msg/sync_msg.go | 4 +- internal/rpc/msg/verify.go | 2 +- pkg/common/db/cache/msg.go | 4 +- pkg/common/db/controller/conversation.go | 4 +- pkg/msgprocessor/conversation.go | 22 +- pkg/rpcclient/msg.go | 36 +- 27 files changed, 699 insertions(+), 855 deletions(-) delete mode 100644 internal/push/consumer_init.go delete mode 100644 internal/push/offlinepush/offlinepush_interface.go create mode 100644 internal/push/offlinepush/offlinepusher.go create mode 100644 internal/push/offlinepush/options/options.go create mode 100644 internal/push/onlinepusher.go create mode 100644 internal/push/push.go delete mode 100644 internal/push/push_rpc_server.go delete mode 100644 internal/push/push_to_client.go delete mode 100644 internal/push/tools.go diff --git a/go.mod b/go.mod index 68ddc656d..56517c233 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/openimsdk/open-im-server/v3 -go 1.21.2 +go 1.20 require ( firebase.google.com/go v3.13.0+incompatible @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 - github.com/openimsdk/protocol v0.0.63 + github.com/openimsdk/protocol v0.0.64-0.20240419092747-47b8c07ee719 github.com/openimsdk/tools v0.0.47-alpha.42 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 68e241cdc..3a9436a68 100644 --- a/go.sum +++ b/go.sum @@ -22,13 +22,10 @@ github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mo github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= @@ -46,7 +43,6 @@ github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5P github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= -github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -70,15 +66,12 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= -github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -97,7 +90,6 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= -github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -145,13 +137,11 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= -github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= @@ -204,7 +194,6 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= -github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kelindar/bitmap v1.5.2 h1:XwX7CTvJtetQZ64zrOkApoZZHBJRkjE23NfqUALA/HE= @@ -220,9 +209,7 @@ github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= @@ -281,8 +268,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/gomake v0.0.6 h1:bJmQWDHBj8PQ7oGJ2SL3Gsx0k5CdI/BPfGzlGcV105s= github.com/openimsdk/gomake v0.0.6/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.63 h1:9DnweZe9nEYDFa4fGTbC9Cqi0gLUdtBhRo1NRP2X3WQ= -github.com/openimsdk/protocol v0.0.63/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.64-0.20240419092747-47b8c07ee719 h1:tr4iHGfegx2ZWxkT6bvnFeuKuh2OsJE3GBHwjEGMZcg= +github.com/openimsdk/protocol v0.0.64-0.20240419092747-47b8c07ee719/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.47-alpha.42 h1:wM6t9otTLhXECq8aQcYaZGvBgo/ZAmbNTqVt3g3NHGg= github.com/openimsdk/tools v0.0.47-alpha.42/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= @@ -295,7 +282,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= -github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -312,7 +298,6 @@ github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0 github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -391,7 +376,6 @@ go.opentelemetry.io/otel v1.23.0/go.mod h1:YCycw9ZeKhcJFrb34iVSkyT0iczq/zYDtZYFu go.opentelemetry.io/otel/metric v1.23.0 h1:pazkx7ss4LFVVYSxYew7L5I6qvLXHA0Ap2pwV+9Cnpo= go.opentelemetry.io/otel/metric v1.23.0/go.mod h1:MqUW2X2a6Q8RN96E2/nqNoT+z9BSms20Jb7Bbp+HiTo= go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= -go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/trace v1.23.0 h1:37Ik5Ib7xfYVb4V1UtnT97T1jI+AoIYkJyPkuL4iJgI= go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= @@ -399,7 +383,6 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= @@ -506,7 +489,6 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/api v0.165.0 h1:zd5d4JIIIaYYsfVy1HzoXYZ9rWCSBxxAglbczzo7Bgc= google.golang.org/api v0.165.0/go.mod h1:2OatzO7ZDQsoS7IFf3rvsE17/TldiU3F/zxFHeqUB5o= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -544,7 +526,6 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/internal/api/custom_validator.go b/internal/api/custom_validator.go index e3873d16f..99c84f074 100644 --- a/internal/api/custom_validator.go +++ b/internal/api/custom_validator.go @@ -26,7 +26,7 @@ func RequiredIf(fl validator.FieldLevel) bool { switch sessionType { case constant.SingleChatType, constant.NotificationChatType: return fl.FieldName() != "RecvID" || fl.Field().String() != "" - case constant.GroupChatType, constant.SuperGroupChatType: + case constant.WriteGroupChatType, constant.ReadGroupChatType: return fl.FieldName() != "GroupID" || fl.Field().String() != "" default: return true diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 43f4622a8..9960aa518 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -257,7 +257,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key } if isNewConversation { switch storageList[0].SessionType { - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID) userIDs, err := och.groupRpcClient.GetGroupMemberIDs(ctx, storageList[0].GroupID) diff --git a/internal/push/callback.go b/internal/push/callback.go index bb5a89069..889729582 100644 --- a/internal/push/callback.go +++ b/internal/push/callback.go @@ -16,6 +16,7 @@ package push import ( "context" + "encoding/json" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" @@ -26,7 +27,7 @@ import ( "github.com/openimsdk/tools/utils/datautil" ) -func (p *Pusher) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { +func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { if msg.ContentType == constant.Typing { return nil @@ -53,7 +54,7 @@ func (p *Pusher) webhookBeforeOfflinePush(ctx context.Context, before *config.Be resp := &callbackstruct.CallbackBeforePushResp{} - if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { + if err := c.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { return err } @@ -67,7 +68,7 @@ func (p *Pusher) webhookBeforeOfflinePush(ctx context.Context, before *config.Be }) } -func (p *Pusher) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error { +func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error { return webhook.WithCondition(ctx, before, func(ctx context.Context) error { if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing { return nil @@ -91,14 +92,14 @@ func (p *Pusher) webhookBeforeOnlinePush(ctx context.Context, before *config.Bef Content: GetContent(msg), } resp := &callbackstruct.CallbackBeforePushResp{} - if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { + if err := c.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { return err } return nil }) } -func (p *Pusher) webhookBeforeGroupOnlinePush( +func (c *ConsumerHandler) webhookBeforeGroupOnlinePush( ctx context.Context, before *config.BeforeConfig, groupID string, @@ -126,7 +127,7 @@ func (p *Pusher) webhookBeforeGroupOnlinePush( Seq: msg.Seq, } resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} - if err := p.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { + if err := c.webhookClient.SyncPost(ctx, req.GetCallbackCommand(), req, resp, before); err != nil { return err } if len(resp.UserIDs) != 0 { @@ -135,3 +136,15 @@ func (p *Pusher) webhookBeforeGroupOnlinePush( return nil }) } + +func GetContent(msg *sdkws.MsgData) string { + if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { + var notification sdkws.NotificationElem + if err := json.Unmarshal(msg.Content, ¬ification); err != nil { + return "" + } + return notification.Detail + } else { + return string(msg.Content) + } +} diff --git a/internal/push/consumer_init.go b/internal/push/consumer_init.go deleted file mode 100644 index 3b401735e..000000000 --- a/internal/push/consumer_init.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package push - -import ( - "context" - - "github.com/openimsdk/open-im-server/v3/pkg/common/config" -) - -type Consumer struct { - pushCh ConsumerHandler - // successCount is unused - // successCount uint64 -} - -func NewConsumer(kafkaConf *config.Kafka, pusher *Pusher) (*Consumer, error) { - c, err := NewConsumerHandler(kafkaConf, pusher) - if err != nil { - return nil, err - } - return &Consumer{ - pushCh: *c, - }, nil -} - -func (c *Consumer) Start() { - go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh) -} diff --git a/internal/push/offlinepush/offlinepush_interface.go b/internal/push/offlinepush/offlinepush_interface.go deleted file mode 100644 index a5d4051f9..000000000 --- a/internal/push/offlinepush/offlinepush_interface.go +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package offlinepush - -import ( - "context" -) - -// OfflinePusher Offline Pusher. -type OfflinePusher interface { - Push(ctx context.Context, userIDs []string, title, content string, opts *Opts) error -} - -// Opts opts. -type Opts struct { - Signal *Signal - IOSPushSound string - IOSBadgeCount bool - Ex string -} - -// Signal message id. -type Signal struct { - ClientMsgID string -} diff --git a/internal/push/offlinepush/offlinepusher.go b/internal/push/offlinepush/offlinepusher.go new file mode 100644 index 000000000..0706be64a --- /dev/null +++ b/internal/push/offlinepush/offlinepusher.go @@ -0,0 +1,52 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package offlinepush + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" +) + +const ( + geTUI = "getui" + firebase = "fcm" + jPush = "jpush" +) + +// OfflinePusher Offline Pusher. +type OfflinePusher interface { + Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error +} + +func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache) (OfflinePusher, error) { + var offlinePusher OfflinePusher + switch pushConf.Enable { + case geTUI: + offlinePusher = getui.NewClient(pushConf, cache) + case firebase: + return fcm.NewClient(pushConf, cache) + case jPush: + offlinePusher = jpush.NewClient(pushConf) + default: + offlinePusher = dummy.NewClient() + } + return offlinePusher, nil +} diff --git a/internal/push/offlinepush/options/options.go b/internal/push/offlinepush/options/options.go new file mode 100644 index 000000000..056f6b711 --- /dev/null +++ b/internal/push/offlinepush/options/options.go @@ -0,0 +1,14 @@ +package options + +// Opts opts. +type Opts struct { + Signal *Signal + IOSPushSound string + IOSBadgeCount bool + Ex string +} + +// Signal message id. +type Signal struct { + ClientMsgID string +} diff --git a/internal/push/onlinepusher.go b/internal/push/onlinepusher.go new file mode 100644 index 000000000..30bdf3e2e --- /dev/null +++ b/internal/push/onlinepusher.go @@ -0,0 +1,204 @@ +package push + +import ( + "context" + "github.com/openimsdk/protocol/msggateway" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "sync" +) + +const ( + KUBERNETES = "k8s" + ZOOKEEPER = "zookeeper" +) + +type OnlinePusher interface { + GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, + pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) + GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults, + pushToUserIDs *[]string) []string +} + +type emptyOnlinePUsher struct{} + +func newEmptyOnlinePUsher() *emptyOnlinePUsher { + return &emptyOnlinePUsher{} +} + +func (emptyOnlinePUsher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, + pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + log.ZWarn(ctx, "emptyOnlinePUsher GetConnsAndOnlinePush", nil) + return nil, nil +} +func (u emptyOnlinePUsher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *sdkws.MsgData, + wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { + log.ZWarn(ctx, "emptyOnlinePUsher GetOnlinePushFailedUserIDs", nil) + return nil +} + +func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher { + switch config.Share.Env { + case KUBERNETES: + return NewK8sStaticConsistentHash(disCov, config) + case ZOOKEEPER: + return NewDefaultAllNode(disCov, config) + default: + return newEmptyOnlinePUsher() + } +} + +type DefaultAllNode struct { + disCov discovery.SvcDiscoveryRegistry + config *Config +} + +func NewDefaultAllNode(disCov discovery.SvcDiscoveryRegistry, config *Config) *DefaultAllNode { + return &DefaultAllNode{disCov: disCov, config: config} +} + +func (d *DefaultAllNode) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, + pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + conns, err := d.disCov.GetConns(ctx, d.config.Share.RpcRegisterName.MessageGateway) + log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) + if err != nil { + return nil, err + } + + var ( + mu sync.Mutex + wg = errgroup.Group{} + input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs} + maxWorkers = d.config.RpcConfig.MaxConcurrentWorkers + ) + + if maxWorkers < 3 { + maxWorkers = 3 + } + + wg.SetLimit(maxWorkers) + + // Online push message + for _, conn := range conns { + conn := conn // loop var safe + wg.Go(func() error { + msgClient := msggateway.NewMsgGatewayClient(conn) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + + return nil + }) + } + + _ = wg.Wait() + + // always return nil + return wsResults, nil +} + +func (d *DefaultAllNode) GetOnlinePushFailedUserIDs(_ context.Context, msg *sdkws.MsgData, + wsResults []*msggateway.SingleMsgToUserResults, pushToUserIDs *[]string) []string { + + onlineSuccessUserIDs := []string{msg.SendID} + for _, v := range wsResults { + //message sender do not need offline push + if msg.SendID == v.UserID { + continue + } + // mobile online push success + if v.OnlinePush { + onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID) + } + + } + + return datautil.SliceSub(*pushToUserIDs, onlineSuccessUserIDs) +} + +type K8sStaticConsistentHash struct { + disCov discovery.SvcDiscoveryRegistry + config *Config +} + +func NewK8sStaticConsistentHash(disCov discovery.SvcDiscoveryRegistry, config *Config) *K8sStaticConsistentHash { + return &K8sStaticConsistentHash{disCov: disCov, config: config} +} + +func (k *K8sStaticConsistentHash) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, + pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { + + var usersHost = make(map[string][]string) + for _, v := range pushToUserIDs { + tHost, err := k.disCov.GetUserIdHashGatewayHost(ctx, v) + if err != nil { + log.ZError(ctx, "get msg gateway hash error", err) + return nil, err + } + tUsers, tbl := usersHost[tHost] + if tbl { + tUsers = append(tUsers, v) + usersHost[tHost] = tUsers + } else { + usersHost[tHost] = []string{v} + } + } + log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) + var usersConns = make(map[*grpc.ClientConn][]string) + for host, userIds := range usersHost { + tconn, _ := k.disCov.GetConn(ctx, host) + usersConns[tconn] = userIds + } + var ( + mu sync.Mutex + wg = errgroup.Group{} + maxWorkers = k.config.RpcConfig.MaxConcurrentWorkers + ) + if maxWorkers < 3 { + maxWorkers = 3 + } + wg.SetLimit(maxWorkers) + for conn, userIds := range usersConns { + tcon := conn + tuserIds := userIds + wg.Go(func() error { + input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds} + msgClient := msggateway.NewMsgGatewayClient(tcon) + reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) + if err != nil { + return nil + } + log.ZDebug(ctx, "push result", "reply", reply) + if reply != nil && reply.SinglePushResult != nil { + mu.Lock() + wsResults = append(wsResults, reply.SinglePushResult...) + mu.Unlock() + } + return nil + }) + } + _ = wg.Wait() + return wsResults, nil +} +func (k *K8sStaticConsistentHash) GetOnlinePushFailedUserIDs(_ context.Context, _ *sdkws.MsgData, + wsResults []*msggateway.SingleMsgToUserResults, _ *[]string) []string { + var needOfflinePushUserIDs []string + for _, v := range wsResults { + if !v.OnlinePush { + needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID) + } + } + return needOfflinePushUserIDs +} diff --git a/internal/push/push.go b/internal/push/push.go new file mode 100644 index 000000000..18012a864 --- /dev/null +++ b/internal/push/push.go @@ -0,0 +1,71 @@ +package push + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" + pbpush "github.com/openimsdk/protocol/push" + "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/discovery" + "google.golang.org/grpc" +) + +type pushServer struct { + database controller.PushDatabase + disCov discovery.SvcDiscoveryRegistry + offlinePusher offlinepush.OfflinePusher + pushCh *ConsumerHandler +} + +type Config struct { + RpcConfig config.Push + RedisConfig config.Redis + MongodbConfig config.Mongo + KafkaConfig config.Kafka + ZookeeperConfig config.ZooKeeper + NotificationConfig config.Notification + Share config.Share + WebhooksConfig config.Webhooks + LocalCacheConfig config.LocalCache +} + +func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) { + //todo reserved Interface + return nil, nil +} + +func (p pushServer) DelUserPushToken(ctx context.Context, + req *pbpush.DelUserPushTokenReq) (resp *pbpush.DelUserPushTokenResp, err error) { + if err = p.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil { + return nil, err + } + return &pbpush.DelUserPushTokenResp{}, nil +} + +func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) + if err != nil { + return err + } + cacheModel := cache.NewThirdCache(rdb) + offlinePusher, err := offlinepush.NewOfflinePusher(&config.RpcConfig, cacheModel) + if err != nil { + return err + } + database := controller.NewPushDatabase(cacheModel) + + consumer, err := NewConsumerHandler(config, offlinePusher, rdb, client) + if err != nil { + return err + } + pbpush.RegisterPushMsgServiceServer(server, &pushServer{ + database: database, + disCov: client, + offlinePusher: offlinePusher, + pushCh: consumer, + }) + go consumer.pushConsumerGroup.RegisterHandleAndConsumer(ctx, consumer) + return nil +} diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 5de3c594c..2246fb3f6 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -16,9 +16,22 @@ package push import ( "context" + "encoding/json" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/utils/jsonutil" + "github.com/redis/go-redis/v9" "github.com/IBM/sarama" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/protocol/constant" pbchat "github.com/openimsdk/protocol/msg" pbpush "github.com/openimsdk/protocol/push" @@ -30,19 +43,38 @@ import ( ) type ConsumerHandler struct { - pushConsumerGroup *kafka.MConsumerGroup - pusher *Pusher + pushConsumerGroup *kafka.MConsumerGroup + offlinePusher offlinepush.OfflinePusher + onlinePusher OnlinePusher + groupLocalCache *rpccache.GroupLocalCache + conversationLocalCache *rpccache.ConversationLocalCache + msgRpcClient rpcclient.MessageRpcClient + conversationRpcClient rpcclient.ConversationRpcClient + groupRpcClient rpcclient.GroupRpcClient + webhookClient *webhook.Client + config *Config } -func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) { - pushConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToPushGroupID, []string{kafkaConf.ToPushTopic}) +func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, + client discovery.SvcDiscoveryRegistry) (*ConsumerHandler, error) { + var consumerHandler ConsumerHandler + var err error + consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, + []string{config.KafkaConfig.ToPushTopic}) if err != nil { return nil, err } - return &ConsumerHandler{ - pushConsumerGroup: pushConsumerGroup, - pusher: pusher, - }, nil + consumerHandler.offlinePusher = offlinePusher + consumerHandler.onlinePusher = NewOnlinePusher(client, config) + consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) + consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupRpcClient, &config.LocalCacheConfig, rdb) + consumerHandler.msgRpcClient = rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) + consumerHandler.conversationRpcClient = rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) + consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, + &config.LocalCacheConfig, rdb) + consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) + consumerHandler.config = config + return &consumerHandler, nil } func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { @@ -63,8 +95,8 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } var err error switch msgFromMQ.MsgData.SessionType { - case constant.SuperGroupChatType: - err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) + case constant.ReadGroupChatType: + err = c.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) default: var pushUserIDList []string isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) @@ -73,14 +105,10 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { } else { pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) } - err = c.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData) + err = c.Push2User(ctx, pushUserIDList, pbData.MsgData) } if err != nil { - if err == errNoOfflinePusher { - log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String()) - } else { - log.ZError(ctx, "push failed", err, "msg", pbData.String()) - } + log.ZError(ctx, "push failed", err, "msg", pbData.String()) } } @@ -96,3 +124,243 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s } return nil } + +// Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType. +func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { + log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) + if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil { + return err + } + wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, userIDs) + if err != nil { + return err + } + + log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs) + + if !c.shouldPushOffline(ctx, msg) { + return nil + } + + for _, v := range wsResults { + //message sender do not need offline push + if msg.SendID == v.UserID { + continue + } + //receiver online push success + if v.OnlinePush { + return nil + } + } + offlinePUshUserID := []string{msg.RecvID} + + //receiver offline push + if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, + offlinePUshUserID, msg, nil); err != nil { + return err + } + + err = c.offlinePushMsg(ctx, msg, offlinePUshUserID) + if err != nil { + return err + } + + return nil +} + +func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgData) bool { + isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) + if !isOfflinePush { + return false + } + if msg.ContentType == constant.SignalingNotification { + return false + } + return true +} + +func (c *ConsumerHandler) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { + log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) + var pushToUserIDs []string + if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, + &pushToUserIDs); err != nil { + return err + } + + err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg) + if err != nil { + return err + } + + wsResults, err := c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) + if err != nil { + return err + } + + log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg) + + if !c.shouldPushOffline(ctx, msg) { + return nil + } + needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs) + + //filter some user, like don not disturb or don't need offline push etc. + needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs) + if err != nil { + return err + } + // Use offline push messaging + if len(needOfflinePushUserIDs) > 0 { + var offlinePushUserIDs []string + err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs) + if err != nil { + return err + } + + if len(offlinePushUserIDs) > 0 { + needOfflinePushUserIDs = offlinePushUserIDs + } + + err = c.offlinePushMsg(ctx, msg, needOfflinePushUserIDs) + if err != nil { + log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) + return err + } + + } + + return nil +} +func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID string, pushToUserIDs *[]string, msg *sdkws.MsgData) (err error) { + if len(*pushToUserIDs) == 0 { + *pushToUserIDs, err = c.groupLocalCache.GetGroupMemberIDs(ctx, groupID) + if err != nil { + return err + } + switch msg.ContentType { + case constant.MemberQuitNotification: + var tips sdkws.MemberQuitTips + if unmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + if err = c.DeleteMemberAndSetConversationSeq(ctx, groupID, []string{tips.QuitUser.UserID}); err != nil { + log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userID", tips.QuitUser.UserID) + } + *pushToUserIDs = append(*pushToUserIDs, tips.QuitUser.UserID) + case constant.MemberKickedNotification: + var tips sdkws.MemberKickedTips + if unmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + kickedUsers := datautil.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) + if err = c.DeleteMemberAndSetConversationSeq(ctx, groupID, kickedUsers); err != nil { + log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", kickedUsers) + } + + *pushToUserIDs = append(*pushToUserIDs, kickedUsers...) + case constant.GroupDismissedNotification: + if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { // 消息先到,通知后到 + var tips sdkws.GroupDismissedTips + if unmarshalNotificationElem(msg.Content, &tips) != nil { + return err + } + log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs) + if len(c.config.Share.IMAdminUserID) > 0 { + ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0]) + } + defer func(groupID string) { + if err = c.groupRpcClient.DismissGroup(ctx, groupID); err != nil { + log.ZError(ctx, "DismissGroup Notification clear members", err, "groupID", groupID) + } + }(groupID) + } + } + } + return err +} + +func (c *ConsumerHandler) offlinePushMsg(ctx context.Context, msg *sdkws.MsgData, offlinePushUserIDs []string) error { + title, content, opts, err := c.getOfflinePushInfos(msg) + if err != nil { + return err + } + err = c.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) + if err != nil { + prommetrics.MsgOfflinePushFailedCounter.Inc() + return err + } + return nil +} + +func (c *ConsumerHandler) filterGroupMessageOfflinePush(ctx context.Context, groupID string, msg *sdkws.MsgData, + offlinePushUserIDs []string) (userIDs []string, err error) { + + //todo local cache Obtain the difference set through local comparison. + needOfflinePushUserIDs, err := c.conversationRpcClient.GetConversationOfflinePushUserIDs( + ctx, conversationutil.GenGroupConversationID(groupID), offlinePushUserIDs) + if err != nil { + return nil, err + } + return needOfflinePushUserIDs, nil +} + +func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, content string, opts *options.Opts, err error) { + type AtTextElem struct { + Text string `json:"text,omitempty"` + AtUserList []string `json:"atUserList,omitempty"` + IsAtSelf bool `json:"isAtSelf"` + } + + opts = &options.Opts{Signal: &options.Signal{}} + if msg.OfflinePushInfo != nil { + opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount + opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound + opts.Ex = msg.OfflinePushInfo.Ex + } + + if msg.OfflinePushInfo != nil { + title = msg.OfflinePushInfo.Title + content = msg.OfflinePushInfo.Desc + } + if title == "" { + switch msg.ContentType { + case constant.Text: + fallthrough + case constant.Picture: + fallthrough + case constant.Voice: + fallthrough + case constant.Video: + fallthrough + case constant.File: + title = constant.ContentType2PushContent[int64(msg.ContentType)] + case constant.AtText: + ac := AtTextElem{} + _ = jsonutil.JsonStringToStruct(string(msg.Content), &ac) + case constant.SignalingNotification: + title = constant.ContentType2PushContent[constant.SignalMsg] + default: + title = constant.ContentType2PushContent[constant.Common] + } + } + if content == "" { + content = title + } + return +} +func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { + conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) + maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID) + if err != nil { + return err + } + return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) +} +func unmarshalNotificationElem(bytes []byte, t any) error { + var notification sdkws.NotificationElem + if err := json.Unmarshal(bytes, ¬ification); err != nil { + return err + } + + return json.Unmarshal([]byte(notification.Detail), t) +} diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go deleted file mode 100644 index 68a8dd65b..000000000 --- a/internal/push/push_rpc_server.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package push - -import ( - "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/db/redisutil" - - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/protocol/constant" - pbpush "github.com/openimsdk/protocol/push" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc" -) - -type pushServer struct { - pusher *Pusher -} - -type Config struct { - RpcConfig config.Push - RedisConfig config.Redis - MongodbConfig config.Mongo - KafkaConfig config.Kafka - ZookeeperConfig config.ZooKeeper - NotificationConfig config.Notification - Share config.Share - WebhooksConfig config.Webhooks - LocalCacheConfig config.LocalCache -} - -func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build()) - if err != nil { - return err - } - cacheModel := cache.NewThirdCache(rdb) - offlinePusher, err := NewOfflinePusher(&config.RpcConfig, cacheModel) - if err != nil { - return err - } - database := controller.NewPushDatabase(cacheModel) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) - conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) - pusher := NewPusher( - config, - client, - offlinePusher, - database, - rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb), - rpccache.NewConversationLocalCache(conversationRpcClient, &config.LocalCacheConfig, rdb), - &conversationRpcClient, - &groupRpcClient, - &msgRpcClient, - ) - - pbpush.RegisterPushMsgServiceServer(server, &pushServer{ - pusher: pusher, - }) - - consumer, err := NewConsumer(&config.KafkaConfig, pusher) - if err != nil { - return err - } - - consumer.Start() - - return nil -} - -func (r *pushServer) PushMsg(ctx context.Context, pbData *pbpush.PushMsgReq) (resp *pbpush.PushMsgResp, err error) { - switch pbData.MsgData.SessionType { - case constant.SuperGroupChatType: - err = r.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) - default: - var pushUserIDList []string - isSenderSync := datautil.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) - if !isSenderSync { - pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID) - } else { - pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) - } - err = r.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData) - } - if err != nil { - if err != errNoOfflinePusher { - return nil, err - } - log.ZWarn(ctx, "offline push failed", err, "msg", pbData.String()) - } - return &pbpush.PushMsgResp{}, nil -} - -func (r *pushServer) DelUserPushToken( - ctx context.Context, - req *pbpush.DelUserPushTokenReq, -) (resp *pbpush.DelUserPushTokenResp, err error) { - if err = r.pusher.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil { - return nil, err - } - return &pbpush.DelUserPushTokenResp{}, nil -} diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go deleted file mode 100644 index 85b3e9056..000000000 --- a/internal/push/push_to_client.go +++ /dev/null @@ -1,529 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package push - -import ( - "context" - "encoding/json" - "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/tools/errs" - "sync" - - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/dummy" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/fcm" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/getui" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/msggateway" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/jsonutil" - "github.com/openimsdk/tools/utils/stringutil" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" -) - -type Pusher struct { - config *Config - database controller.PushDatabase - discov discovery.SvcDiscoveryRegistry - offlinePusher offlinepush.OfflinePusher - groupLocalCache *rpccache.GroupLocalCache - conversationLocalCache *rpccache.ConversationLocalCache - msgRpcClient *rpcclient.MessageRpcClient - conversationRpcClient *rpcclient.ConversationRpcClient - groupRpcClient *rpcclient.GroupRpcClient - webhookClient *webhook.Client -} - -var errNoOfflinePusher = errs.New("no offlinePusher is configured") - -func NewPusher(config *Config, discov discovery.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, - groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, - conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, -) *Pusher { - return &Pusher{ - config: config, - discov: discov, - database: database, - offlinePusher: offlinePusher, - groupLocalCache: groupLocalCache, - conversationLocalCache: conversationLocalCache, - msgRpcClient: msgRpcClient, - conversationRpcClient: conversationRpcClient, - groupRpcClient: groupRpcClient, - webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), - } -} - -func NewOfflinePusher(pushConf *config.Push, cache cache.ThirdCache) (offlinepush.OfflinePusher, error) { - var offlinePusher offlinepush.OfflinePusher - switch pushConf.Enable { - case "getui": - offlinePusher = getui.NewClient(pushConf, cache) - case "fcm": - return fcm.NewClient(pushConf, cache) - case "jpush": - offlinePusher = jpush.NewClient(pushConf) - default: - offlinePusher = dummy.NewClient() - } - return offlinePusher, nil -} - -func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { - conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) - maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) - if err != nil { - return err - } - return p.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq) -} - -func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) error { - log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String()) - - if err := p.webhookBeforeOnlinePush(ctx, &p.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil { - return err - } - - // push - wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, userIDs) - if err != nil { - return err - } - - isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs) - - if !isOfflinePush { - return nil - } - - if len(wsResults) == 0 { - return nil - } - onlinePushSuccUserIDSet := datautil.SliceSet(datautil.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { - return e.UserID, e.OnlinePush && e.UserID != "" - })) - offlinePushUserIDList := datautil.Filter(wsResults, func(e *msggateway.SingleMsgToUserResults) (string, bool) { - _, exist := onlinePushSuccUserIDSet[e.UserID] - return e.UserID, !exist && e.UserID != "" && e.UserID != msg.SendID - }) - - if len(offlinePushUserIDList) > 0 { - if err = p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, offlinePushUserIDList, msg, &[]string{}); err != nil { - return err - } - err = p.offlinePushMsg(ctx, msg.SendID, msg, offlinePushUserIDList) - if err != nil { - return err - } - } - return nil -} - -func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error { - var notification sdkws.NotificationElem - if err := json.Unmarshal(bytes, ¬ification); err != nil { - return err - } - - return json.Unmarshal([]byte(notification.Detail), t) -} - -/* -k8s deployment,offline push group messages function. -*/ -func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error { - - var needOfflinePushUserIDs []string - for _, v := range wsResults { - if !v.OnlinePush { - needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID) - } - } - if len(needOfflinePushUserIDs) > 0 { - var offlinePushUserIDs []string - err := p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs) - if err != nil { - return err - } - - if len(offlinePushUserIDs) > 0 { - needOfflinePushUserIDs = offlinePushUserIDs - } - if msg.ContentType != constant.SignalingNotification { - resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( - ctx, - &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationutil.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, - ) - if err != nil { - return err - } - if len(resp.UserIDs) > 0 { - err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) - if err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) - return err - } - } - } - - } - return nil -} -func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { - log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) - var pushToUserIDs []string - - if err = p.webhookBeforeGroupOnlinePush(ctx, &p.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg, &pushToUserIDs); err != nil { - return err - } - - if len(pushToUserIDs) == 0 { - pushToUserIDs, err = p.groupLocalCache.GetGroupMemberIDs(ctx, groupID) - if err != nil { - return err - } - - switch msg.ContentType { - case constant.MemberQuitNotification: - var tips sdkws.MemberQuitTips - if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { - return err - } - defer func(groupID string, userIDs []string) { - if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { - log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) - } - }(groupID, []string{tips.QuitUser.UserID}) - pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID) - case constant.MemberKickedNotification: - var tips sdkws.MemberKickedTips - if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { - return err - } - kickedUsers := datautil.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }) - defer func(groupID string, userIDs []string) { - if err = p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil { - log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs) - } - }(groupID, kickedUsers) - pushToUserIDs = append(pushToUserIDs, kickedUsers...) - case constant.GroupDismissedNotification: - // Messages arrive first, notifications arrive later - if msgprocessor.IsNotification(msgprocessor.GetConversationIDByMsg(msg)) { - var tips sdkws.GroupDismissedTips - if p.UnmarshalNotificationElem(msg.Content, &tips) != nil { - return err - } - log.ZDebug(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs) - if len(p.config.Share.IMAdminUserID) > 0 { - ctx = mcontext.WithOpUserIDContext(ctx, p.config.Share.IMAdminUserID[0]) - } - defer func(groupID string) { - if err = p.groupRpcClient.DismissGroup(ctx, groupID); err != nil { - log.ZError(ctx, "DismissGroup notification clear members", err, "groupID", groupID) - } - }(groupID) - } - } - } - - wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs) - if err != nil { - return err - } - - log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) - isOfflinePush := datautil.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) - if isOfflinePush && p.config.Share.Env == "k8s" { - return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults) - } - if isOfflinePush && p.config.Share.Env == "zookeeper" { - var ( - onlineSuccessUserIDs = []string{msg.SendID} - webAndPcBackgroundUserIDs []string - ) - - for _, v := range wsResults { - if v.OnlinePush && v.UserID != msg.SendID { - onlineSuccessUserIDs = append(onlineSuccessUserIDs, v.UserID) - } - - if v.OnlinePush { - continue - } - - if len(v.Resp) == 0 { - continue - } - - for _, singleResult := range v.Resp { - if singleResult.ResultCode != -2 { - continue - } - - isPC := constant.PlatformIDToName(int(singleResult.RecvPlatFormID)) == constant.TerminalPC - isWebID := singleResult.RecvPlatFormID == constant.WebPlatformID - - if isPC || isWebID { - webAndPcBackgroundUserIDs = append(webAndPcBackgroundUserIDs, v.UserID) - } - } - } - - needOfflinePushUserIDs := stringutil.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) - - // Use offline push messaging - if len(needOfflinePushUserIDs) > 0 { - var offlinePushUserIDs []string - - err = p.webhookBeforeOfflinePush(ctx, &p.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserIDs, msg, &offlinePushUserIDs) - if err != nil { - return err - } - - if len(offlinePushUserIDs) > 0 { - needOfflinePushUserIDs = offlinePushUserIDs - } - if msg.ContentType != constant.SignalingNotification { - resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( - ctx, - &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationutil.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, - ) - if err != nil { - return err - } - if len(resp.UserIDs) > 0 { - err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) - if err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg) - return err - } - if _, err := p.GetConnsAndOnlinePush(ctx, msg, stringutil.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { - log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", stringutil.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) - return err - } - } - } - - } - } - return nil -} - -func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - var usersHost = make(map[string][]string) - for _, v := range pushToUserIDs { - tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v) - if err != nil { - log.ZError(ctx, "get msggateway hash error", err) - return nil, err - } - tUsers, tbl := usersHost[tHost] - if tbl { - tUsers = append(tUsers, v) - usersHost[tHost] = tUsers - } else { - usersHost[tHost] = []string{v} - } - } - log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost) - var usersConns = make(map[*grpc.ClientConn][]string) - for host, userIds := range usersHost { - tconn, _ := p.discov.GetConn(ctx, host) - usersConns[tconn] = userIds - } - var ( - mu sync.Mutex - wg = errgroup.Group{} - maxWorkers = p.config.RpcConfig.MaxConcurrentWorkers - ) - if maxWorkers < 3 { - maxWorkers = 3 - } - wg.SetLimit(maxWorkers) - for conn, userIds := range usersConns { - tcon := conn - tuserIds := userIds - wg.Go(func() error { - input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds} - msgClient := msggateway.NewMsgGatewayClient(tcon) - reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) - if err != nil { - return nil - } - log.ZDebug(ctx, "push result", "reply", reply) - if reply != nil && reply.SinglePushResult != nil { - mu.Lock() - wsResults = append(wsResults, reply.SinglePushResult...) - mu.Unlock() - } - return nil - }) - } - _ = wg.Wait() - return wsResults, nil -} -func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { - if p.config.Share.Env == "k8s" { - return p.k8sOnlinePush(ctx, msg, pushToUserIDs) - } - conns, err := p.discov.GetConns(ctx, p.config.Share.RpcRegisterName.MessageGateway) - log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) - if err != nil { - return nil, err - } - - var ( - mu sync.Mutex - wg = errgroup.Group{} - input = &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs} - maxWorkers = p.config.RpcConfig.MaxConcurrentWorkers - ) - - if maxWorkers < 3 { - maxWorkers = 3 - } - - wg.SetLimit(maxWorkers) - - // Online push message - for _, conn := range conns { - conn := conn // loop var safe - wg.Go(func() error { - msgClient := msggateway.NewMsgGatewayClient(conn) - reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input) - if err != nil { - return nil - } - - log.ZDebug(ctx, "push result", "reply", reply) - if reply != nil && reply.SinglePushResult != nil { - mu.Lock() - wsResults = append(wsResults, reply.SinglePushResult...) - mu.Unlock() - } - - return nil - }) - } - - _ = wg.Wait() - - // always return nil - return wsResults, nil -} - -func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error { - title, content, opts, err := p.getOfflinePushInfos(conversationID, msg) - if err != nil { - return err - } - err = p.offlinePusher.Push(ctx, offlinePushUserIDs, title, content, opts) - if err != nil { - prommetrics.MsgOfflinePushFailedCounter.Inc() - return err - } - return nil -} - -func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts, err error) { - opts = &offlinepush.Opts{Signal: &offlinepush.Signal{}} - // if msg.ContentType > constant.SignalingNotificationBegin && msg.ContentType < constant.SignalingNotificationEnd { - // req := &sdkws.SignalReq{} - // if err := proto.Unmarshal(msg.Content, req); err != nil { - // return nil, utils.Wrap(err, "") - // } - // switch req.Payload.(type) { - // case *sdkws.SignalReq_Invite, *sdkws.SignalReq_InviteInGroup: - // opts.Signal = &offlinepush.Signal{ClientMsgID: msg.ClientMsgID} - // } - // } - if msg.OfflinePushInfo != nil { - opts.IOSBadgeCount = msg.OfflinePushInfo.IOSBadgeCount - opts.IOSPushSound = msg.OfflinePushInfo.IOSPushSound - opts.Ex = msg.OfflinePushInfo.Ex - } - return opts, nil -} - -func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) { - if p.offlinePusher == nil { - err = errNoOfflinePusher - return - } - - type atContent struct { - Text string `json:"text"` - AtUserList []string `json:"atUserList"` - IsAtSelf bool `json:"isAtSelf"` - } - - opts, err = p.GetOfflinePushOpts(msg) - if err != nil { - return - } - - if msg.OfflinePushInfo != nil { - title = msg.OfflinePushInfo.Title - content = msg.OfflinePushInfo.Desc - } - if title == "" { - switch msg.ContentType { - case constant.Text: - fallthrough - case constant.Picture: - fallthrough - case constant.Voice: - fallthrough - case constant.Video: - fallthrough - case constant.File: - title = constant.ContentType2PushContent[int64(msg.ContentType)] - case constant.AtText: - ac := atContent{} - _ = jsonutil.JsonStringToStruct(string(msg.Content), &ac) - if datautil.Contain(conversationID, ac.AtUserList...) { - title = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common] - } else { - title = constant.ContentType2PushContent[constant.GroupMsg] - } - case constant.SignalingNotification: - title = constant.ContentType2PushContent[constant.SignalMsg] - default: - title = constant.ContentType2PushContent[constant.Common] - } - } - if content == "" { - content = title - } - return -} diff --git a/internal/push/tools.go b/internal/push/tools.go deleted file mode 100644 index 076dc4e34..000000000 --- a/internal/push/tools.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package push - -import ( - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/sdkws" - "google.golang.org/protobuf/proto" -) - -func GetContent(msg *sdkws.MsgData) string { - if msg.ContentType >= constant.NotificationBegin && msg.ContentType <= constant.NotificationEnd { - var tips sdkws.TipsComm - _ = proto.Unmarshal(msg.Content, &tips) - content := tips.JsonDetail - return content - } - return string(msg.Content) -} diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index cc253e141..1bf612923 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -210,7 +210,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver if req.Conversation == nil { return nil, errs.ErrArgs.WrapMsg("conversation must not be nil") } - if req.Conversation.ConversationType == constant.GroupChatType { + if req.Conversation.ConversationType == constant.WriteGroupChatType { groupInfo, err := c.groupRpcClient.GetGroupInfo(ctx, req.Conversation.GroupID) if err != nil { return nil, err @@ -279,7 +279,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver unequal++ } } - if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.SuperGroupChatType { + if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType { var conversations []*tablerelation.ConversationModel for _, ownerUserID := range req.UserIDs { conversation2 := conversation @@ -467,7 +467,7 @@ func (c *conversationServer) getConversationInfo( sendIDs = append(sendIDs, chatLog.RecvID) } sendIDs = append(sendIDs, chatLog.SendID) - case constant.GroupChatType, constant.SuperGroupChatType: + case constant.WriteGroupChatType, constant.ReadGroupChatType: groupIDs = append(groupIDs, chatLog.GroupID) sendIDs = append(sendIDs, chatLog.SendID) } @@ -509,7 +509,7 @@ func (c *conversationServer) getConversationInfo( msgInfo.FaceURL = send.FaceURL msgInfo.SenderName = send.Nickname } - case constant.GroupChatType, constant.SuperGroupChatType: + case constant.WriteGroupChatType, constant.ReadGroupChatType: msgInfo.GroupID = chatLog.GroupID if group, ok := groupMap[chatLog.GroupID]; ok { msgInfo.GroupName = group.GroupName diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 5d7224a2d..13bd7f9be 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -931,7 +931,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq) } func (s *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { - conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) + conevrsationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) maxSeq, err := s.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) if err != nil { return err @@ -1002,8 +1002,8 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf num-- func() { conversation := &pbconversation.ConversationReq{ - ConversationID: msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, req.GroupInfoForSet.GroupID), - ConversationType: constant.SuperGroupChatType, + ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID), + ConversationType: constant.ReadGroupChatType, GroupID: req.GroupInfoForSet.GroupID, } resp, err := s.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupInfoForSet.GroupID}) diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 992374d57..bfba4824f 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -166,7 +166,7 @@ func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkCon } m.sendMarkAsReadNotification(ctx, req.ConversationID, conversation.ConversationType, req.UserID, m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq) - } else if conversation.ConversationType == constant.SuperGroupChatType || + } else if conversation.ConversationType == constant.ReadGroupChatType || conversation.ConversationType == constant.NotificationChatType { if req.HasReadSeq > hasReadSeq { err = m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq) diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index be5af0e49..7dbc307a1 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -69,7 +69,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. return nil, err } role = user.AppMangerLevel - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: members, err := m.GroupLocalCache.GetGroupMemberInfoMap(ctx, msgs[0].GroupID, datautil.Distinct([]string{req.UserID, msgs[0].SendID})) if err != nil { return nil, err @@ -118,7 +118,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. IsAdminRevoke: flag, } var recvID string - if msgs[0].SessionType == constant.SuperGroupChatType { + if msgs[0].SessionType == constant.ReadGroupChatType { recvID = msgs[0].GroupID } else { recvID = msgs[0].RecvID diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 5514788d8..4a2d21019 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -40,7 +40,7 @@ func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg. return m.sendMsgSingleChat(ctx, req) case constant.NotificationChatType: return m.sendMsgNotification(ctx, req) - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: return m.sendMsgSuperGroupChat(ctx, req) default: return nil, errs.ErrArgs.WrapMsg("unknown sessionType") diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 81720d674..3f4df8d4b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -121,7 +121,7 @@ func (m *msgServer) conversationAndGetRecvID(conversation *conversation.Conversa } else { return conversation.OwnerUserID } - } else if conversation.ConversationType == constant.SuperGroupChatType { + } else if conversation.ConversationType == constant.ReadGroupChatType { return conversation.GroupID } return "" diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index af04102e6..afb79506e 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -132,7 +132,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq switch chatLog.SessionType { case constant.SingleChatType, constant.NotificationChatType: recvIDs = append(recvIDs, chatLog.RecvID) - case constant.GroupChatType, constant.SuperGroupChatType: + case constant.WriteGroupChatType, constant.ReadGroupChatType: groupIDs = append(groupIDs, chatLog.GroupID) } } @@ -183,7 +183,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq switch chatLog.SessionType { case constant.SingleChatType, constant.NotificationChatType: pbchatLog.RecvNickname = recvMap[chatLog.RecvID] - case constant.GroupChatType, constant.SuperGroupChatType: + case constant.WriteGroupChatType, constant.ReadGroupChatType: groupInfo := groupMap[chatLog.GroupID] pbchatLog.SenderFaceURL = groupInfo.FaceURL pbchatLog.GroupMemberCount = groupInfo.MemberCount // Reflects actual member count diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index 30b6af85f..33879bfe7 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -77,7 +77,7 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe return nil } return nil - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: groupInfo, err := m.GroupLocalCache.GetGroupInfo(ctx, data.MsgData.GroupID) if err != nil { return err diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 94307343f..a2502ca80 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -408,9 +408,9 @@ func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType in switch sessionType { case constant.SingleChatType: return "EX_SINGLE_" + clientMsgID - case constant.GroupChatType: + case constant.WriteGroupChatType: return "EX_GROUP_" + clientMsgID - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: return "EX_SUPER_GROUP_" + clientMsgID case constant.NotificationChatType: return "EX_NOTIFICATION" + clientMsgID diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index f6647ad20..567bcb270 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -260,7 +260,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.NewCache() - conversationID := msgprocessor.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) + conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) if err != nil { return err @@ -268,7 +268,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, notExistUserIDs := stringutil.DifferenceString(userIDs, existConversationUserIDs) var conversations []*relationtb.ConversationModel for _, v := range notExistUserIDs { - conversation := relationtb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} + conversation := relationtb.ConversationModel{ConversationType: constant.ReadGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} conversations = append(conversations, &conversation) cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID) } diff --git a/pkg/msgprocessor/conversation.go b/pkg/msgprocessor/conversation.go index 710f4ccf9..b369269cc 100644 --- a/pkg/msgprocessor/conversation.go +++ b/pkg/msgprocessor/conversation.go @@ -30,9 +30,9 @@ func GetNotificationConversationIDByMsg(msg *sdkws.MsgData) string { l := []string{msg.SendID, msg.RecvID} sort.Strings(l) return "n_" + strings.Join(l, "_") - case constant.GroupChatType: + case constant.WriteGroupChatType: return "n_" + msg.GroupID - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: return "n_" + msg.GroupID case constant.NotificationChatType: return "n_" + msg.SendID + "_" + msg.RecvID @@ -46,9 +46,9 @@ func GetChatConversationIDByMsg(msg *sdkws.MsgData) string { l := []string{msg.SendID, msg.RecvID} sort.Strings(l) return "si_" + strings.Join(l, "_") - case constant.GroupChatType: + case constant.WriteGroupChatType: return "g_" + msg.GroupID - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: return "sg_" + msg.GroupID case constant.NotificationChatType: return "sn_" + msg.SendID + "_" + msg.RecvID @@ -63,7 +63,7 @@ func GenConversationUniqueKey(msg *sdkws.MsgData) string { l := []string{msg.SendID, msg.RecvID} sort.Strings(l) return strings.Join(l, "_") - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: return msg.GroupID } return "" @@ -79,12 +79,12 @@ func GetConversationIDByMsg(msg *sdkws.MsgData) string { return "n_" + strings.Join(l, "_") } return "si_" + strings.Join(l, "_") // single chat - case constant.GroupChatType: + case constant.WriteGroupChatType: if !options.IsNotNotification() { return "n_" + msg.GroupID // group chat } return "g_" + msg.GroupID // group chat - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: if !options.IsNotNotification() { return "n_" + msg.GroupID // super group chat } @@ -106,9 +106,9 @@ func GetConversationIDBySessionType(sessionType int, ids ...string) string { switch sessionType { case constant.SingleChatType: return "si_" + strings.Join(ids, "_") // single chat - case constant.GroupChatType: + case constant.WriteGroupChatType: return "g_" + ids[0] // group chat - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: return "sg_" + ids[0] // super group chat case constant.NotificationChatType: return "sn_" + ids[0] // server notification chat @@ -134,7 +134,7 @@ func GetNotificationConversationID(sessionType int, ids ...string) string { switch sessionType { case constant.SingleChatType: return "n_" + strings.Join(ids, "_") // single chat - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: return "n_" + ids[0] // super group chat } return "" @@ -158,7 +158,7 @@ func ParseConversationID(msg *sdkws.MsgData) (isNotification bool, conversationI return true, "n_" + strings.Join(l, "_") } return false, "si_" + strings.Join(l, "_") // single chat - case constant.SuperGroupChatType: + case constant.ReadGroupChatType: if !options.IsNotNotification() { return true, "n_" + msg.GroupID // super group chat } diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 51b51dc7c..36daf9f66 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -85,26 +85,26 @@ func newContentTypeConf(conf *config.Notification) map[int32]config.Notification func newSessionTypeConf() map[int32]int32 { return map[int32]int32{ // group - constant.GroupCreatedNotification: constant.SuperGroupChatType, - constant.GroupInfoSetNotification: constant.SuperGroupChatType, + constant.GroupCreatedNotification: constant.ReadGroupChatType, + constant.GroupInfoSetNotification: constant.ReadGroupChatType, constant.JoinGroupApplicationNotification: constant.SingleChatType, - constant.MemberQuitNotification: constant.SuperGroupChatType, + constant.MemberQuitNotification: constant.ReadGroupChatType, constant.GroupApplicationAcceptedNotification: constant.SingleChatType, constant.GroupApplicationRejectedNotification: constant.SingleChatType, - constant.GroupOwnerTransferredNotification: constant.SuperGroupChatType, - constant.MemberKickedNotification: constant.SuperGroupChatType, - constant.MemberInvitedNotification: constant.SuperGroupChatType, - constant.MemberEnterNotification: constant.SuperGroupChatType, - constant.GroupDismissedNotification: constant.SuperGroupChatType, - constant.GroupMutedNotification: constant.SuperGroupChatType, - constant.GroupCancelMutedNotification: constant.SuperGroupChatType, - constant.GroupMemberMutedNotification: constant.SuperGroupChatType, - constant.GroupMemberCancelMutedNotification: constant.SuperGroupChatType, - constant.GroupMemberInfoSetNotification: constant.SuperGroupChatType, - constant.GroupMemberSetToAdminNotification: constant.SuperGroupChatType, - constant.GroupMemberSetToOrdinaryUserNotification: constant.SuperGroupChatType, - constant.GroupInfoSetAnnouncementNotification: constant.SuperGroupChatType, - constant.GroupInfoSetNameNotification: constant.SuperGroupChatType, + constant.GroupOwnerTransferredNotification: constant.ReadGroupChatType, + constant.MemberKickedNotification: constant.ReadGroupChatType, + constant.MemberInvitedNotification: constant.ReadGroupChatType, + constant.MemberEnterNotification: constant.ReadGroupChatType, + constant.GroupDismissedNotification: constant.ReadGroupChatType, + constant.GroupMutedNotification: constant.ReadGroupChatType, + constant.GroupCancelMutedNotification: constant.ReadGroupChatType, + constant.GroupMemberMutedNotification: constant.ReadGroupChatType, + constant.GroupMemberCancelMutedNotification: constant.ReadGroupChatType, + constant.GroupMemberInfoSetNotification: constant.ReadGroupChatType, + constant.GroupMemberSetToAdminNotification: constant.ReadGroupChatType, + constant.GroupMemberSetToOrdinaryUserNotification: constant.ReadGroupChatType, + constant.GroupInfoSetAnnouncementNotification: constant.ReadGroupChatType, + constant.GroupInfoSetNameNotification: constant.ReadGroupChatType, // user constant.UserInfoUpdatedNotification: constant.SingleChatType, constant.UserStatusChangeNotification: constant.SingleChatType, @@ -307,7 +307,7 @@ func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, co msg.MsgFrom = constant.SysMsgType msg.ContentType = contentType msg.SessionType = sessionType - if msg.SessionType == constant.SuperGroupChatType { + if msg.SessionType == constant.ReadGroupChatType { msg.GroupID = recvID } msg.CreateTime = timeutil.GetCurrentTimestampByMill() From 977b67d6ee77b843cb1cbd903599440ba33a5717 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:02:11 +0800 Subject: [PATCH 2/8] fix: push update. --- internal/push/offlinepush/dummy/push.go | 5 ++--- internal/push/offlinepush/fcm/push.go | 4 ++-- internal/push/offlinepush/getui/push.go | 4 ++-- internal/push/offlinepush/jpush/push.go | 4 ++-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/internal/push/offlinepush/dummy/push.go b/internal/push/offlinepush/dummy/push.go index f147886d9..028e7edd3 100644 --- a/internal/push/offlinepush/dummy/push.go +++ b/internal/push/offlinepush/dummy/push.go @@ -16,8 +16,7 @@ package dummy import ( "context" - - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" ) func NewClient() *Dummy { @@ -27,6 +26,6 @@ func NewClient() *Dummy { type Dummy struct { } -func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { +func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { return nil } diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index 3b41959d2..34ad1c0d6 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -16,11 +16,11 @@ package fcm import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "path/filepath" firebase "firebase.google.com/go" "firebase.google.com/go/messaging" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/protocol/constant" @@ -60,7 +60,7 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) (*Fcm, error) { return &Fcm{fcmMsgCli: fcmMsgClient, cache: cache}, nil } -func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { +func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { // accounts->registrationToken allTokens := make(map[string][]string, 0) for _, account := range userIDs { diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index ae7b8bc58..8ecea3a62 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -18,11 +18,11 @@ import ( "context" "crypto/sha256" "encoding/hex" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "strconv" "sync" "time" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/tools/errs" @@ -67,7 +67,7 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) *Client { } } -func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { +func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { token, err := g.cache.GetGetuiToken(ctx) if err != nil { if errs.Unwrap(err) == redis.Nil { diff --git a/internal/push/offlinepush/jpush/push.go b/internal/push/offlinepush/jpush/push.go index 7d2b17bd2..dac52597f 100644 --- a/internal/push/offlinepush/jpush/push.go +++ b/internal/push/offlinepush/jpush/push.go @@ -18,8 +18,8 @@ import ( "context" "encoding/base64" "fmt" + "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" - "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/jpush/body" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/utils/httputil" @@ -51,7 +51,7 @@ func (j *JPush) getAuthorization(appKey string, masterSecret string) string { return Authorization } -func (j *JPush) Push(ctx context.Context, userIDs []string, title, content string, opts *offlinepush.Opts) error { +func (j *JPush) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error { var pf body.Platform pf.SetAll() var au body.Audience From a6ec6867a18cd76571f5af9720cc427f18f6dc39 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:05:59 +0800 Subject: [PATCH 3/8] fix: push update. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 56517c233..ebb3cae5f 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 - github.com/openimsdk/protocol v0.0.64-0.20240419092747-47b8c07ee719 + github.com/openimsdk/protocol v0.0.63 github.com/openimsdk/tools v0.0.47-alpha.42 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 3a9436a68..370bb62f8 100644 --- a/go.sum +++ b/go.sum @@ -268,8 +268,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/gomake v0.0.6 h1:bJmQWDHBj8PQ7oGJ2SL3Gsx0k5CdI/BPfGzlGcV105s= github.com/openimsdk/gomake v0.0.6/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.64-0.20240419092747-47b8c07ee719 h1:tr4iHGfegx2ZWxkT6bvnFeuKuh2OsJE3GBHwjEGMZcg= -github.com/openimsdk/protocol v0.0.64-0.20240419092747-47b8c07ee719/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.63 h1:9DnweZe9nEYDFa4fGTbC9Cqi0gLUdtBhRo1NRP2X3WQ= +github.com/openimsdk/protocol v0.0.63/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.47-alpha.42 h1:wM6t9otTLhXECq8aQcYaZGvBgo/ZAmbNTqVt3g3NHGg= github.com/openimsdk/tools v0.0.47-alpha.42/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= From 23c225284118783120aefe5676f47d80654a1039 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:06:56 +0800 Subject: [PATCH 4/8] fix: push update. --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index ebb3cae5f..3518ea8e6 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 - github.com/openimsdk/protocol v0.0.63 + github.com/openimsdk/protocol v0.0.57 github.com/openimsdk/tools v0.0.47-alpha.42 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 From 6c19531a00246f3bee6fe5b2f41fed30860de789 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:08:53 +0800 Subject: [PATCH 5/8] fix: push update. --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 3518ea8e6..8ada8f2e7 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/localcache v0.0.1 - github.com/openimsdk/protocol v0.0.57 + github.com/openimsdk/protocol v0.0.64 github.com/openimsdk/tools v0.0.47-alpha.42 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 From 20db896ced9045a6ddc16690da15dd49c23ba7dc Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:12:21 +0800 Subject: [PATCH 6/8] fix: push update. --- go.sum | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.sum b/go.sum index 370bb62f8..0ef6209fc 100644 --- a/go.sum +++ b/go.sum @@ -268,8 +268,8 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/openimsdk/gomake v0.0.6 h1:bJmQWDHBj8PQ7oGJ2SL3Gsx0k5CdI/BPfGzlGcV105s= github.com/openimsdk/gomake v0.0.6/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.63 h1:9DnweZe9nEYDFa4fGTbC9Cqi0gLUdtBhRo1NRP2X3WQ= -github.com/openimsdk/protocol v0.0.63/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/protocol v0.0.64 h1:OrjSs4CgKN9VLvJvrAsc37O7Ru0E0VllXZQSmG/ab7U= +github.com/openimsdk/protocol v0.0.64/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.47-alpha.42 h1:wM6t9otTLhXECq8aQcYaZGvBgo/ZAmbNTqVt3g3NHGg= github.com/openimsdk/tools v0.0.47-alpha.42/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= From 5fbe768e4be2b524f44977d3c2b4f3750e3899a9 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:28:18 +0800 Subject: [PATCH 7/8] fix: push update. --- pkg/common/db/cache/msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index a2502ca80..e8a86b71b 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -31,7 +31,7 @@ import ( "golang.org/x/sync/errgroup" ) -const msgCacheTimeout = 86400 +const msgCacheTimeout = 86400 * time.Second const ( maxSeq = "MAX_SEQ:" From 95b19f57223b6b3172478c87fb8e9f02cee29870 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 19 Apr 2024 19:08:09 +0800 Subject: [PATCH 8/8] fix: websocket handle error remove when upgrade error. --- internal/msggateway/n_ws_server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index cd9bc57f6..cf607d470 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -417,8 +417,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { // Create a WebSocket long connection object wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize) if err := wsLongConn.GenerateLongConn(w, r); err != nil { - // If creating the long connection fails, return an error via HTTP and stop processing - httpError(connContext, err) + //If the creation of the long connection fails, the error is handled internally during the handshake process. + log.ZWarn(connContext, "long connection fails", err) return } else { // Check if a normal response should be sent via WebSocket