From ebbb8b86bbc767ce94fd005bd5c3f5df2f26ab44 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Mon, 15 Apr 2024 15:22:34 +0800 Subject: [PATCH] refactor webhook --- go.mod | 2 +- go.sum | 19 +++++++ internal/rpc/conversation/conversaion.go | 5 +- .../rpc/conversation/notification.go | 2 +- internal/rpc/friend/friend.go | 7 ++- .../rpc/friend/notification.go | 9 ++-- internal/rpc/msg/as_read.go | 2 +- internal/rpc/msg/delete.go | 8 +-- internal/rpc/msg/notification.go | 2 +- internal/rpc/msg/revoke.go | 2 +- internal/rpc/user/user.go | 6 +-- pkg/common/notification/notification.go | 51 +++++++++++++++++++ pkg/rpcclient/msg.go | 4 +- 13 files changed, 94 insertions(+), 25 deletions(-) rename pkg/rpcclient/notification/conversation.go => internal/rpc/conversation/notification.go (99%) rename pkg/rpcclient/notification/friend.go => internal/rpc/friend/notification.go (95%) create mode 100644 pkg/common/notification/notification.go diff --git a/go.mod b/go.mod index b81b06d6c..686e3e12a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/openimsdk/open-im-server/v3 -go 1.20 +go 1.21.2 require ( firebase.google.com/go v3.13.0+incompatible diff --git a/go.sum b/go.sum index 69243c03d..c4fe3ad0c 100644 --- a/go.sum +++ b/go.sum @@ -22,10 +22,13 @@ github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mo github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= @@ -43,6 +46,7 @@ github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5P github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -66,12 +70,15 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -90,6 +97,7 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -137,11 +145,13 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= +github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= @@ -194,6 +204,7 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kelindar/bitmap v1.5.2 h1:XwX7CTvJtetQZ64zrOkApoZZHBJRkjE23NfqUALA/HE= @@ -209,7 +220,9 @@ github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= @@ -282,6 +295,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -298,6 +312,7 @@ github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0 github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -376,6 +391,7 @@ go.opentelemetry.io/otel v1.23.0/go.mod h1:YCycw9ZeKhcJFrb34iVSkyT0iczq/zYDtZYFu go.opentelemetry.io/otel/metric v1.23.0 h1:pazkx7ss4LFVVYSxYew7L5I6qvLXHA0Ap2pwV+9Cnpo= go.opentelemetry.io/otel/metric v1.23.0/go.mod h1:MqUW2X2a6Q8RN96E2/nqNoT+z9BSms20Jb7Bbp+HiTo= go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= +go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/trace v1.23.0 h1:37Ik5Ib7xfYVb4V1UtnT97T1jI+AoIYkJyPkuL4iJgI= go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= @@ -383,6 +399,7 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= @@ -489,6 +506,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/api v0.165.0 h1:zd5d4JIIIaYYsfVy1HzoXYZ9rWCSBxxAglbczzo7Bgc= google.golang.org/api v0.165.0/go.mod h1:2OatzO7ZDQsoS7IFf3rvsE17/TldiU3F/zxFHeqUB5o= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -526,6 +544,7 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index d30053398..20eb46691 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -27,7 +27,6 @@ import ( tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/sdkws" @@ -44,7 +43,7 @@ type conversationServer struct { user *rpcclient.UserRpcClient groupRpcClient *rpcclient.GroupRpcClient conversationDatabase controller.ConversationDatabase - conversationNotificationSender *notification.ConversationNotificationSender + conversationNotificationSender *ConversationNotificationSender config *Config } @@ -78,7 +77,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg pbconversation.RegisterConversationServer(server, &conversationServer{ msgRpcClient: &msgRpcClient, user: &userRpcClient, - conversationNotificationSender: notification.NewConversationNotificationSender(&config.NotificationConfig, &msgRpcClient), + conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, &msgRpcClient), groupRpcClient: &groupRpcClient, conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, &config.LocalCacheConfig, cache.GetDefaultOpt(), conversationDB), mgocli.GetTx()), }) diff --git a/pkg/rpcclient/notification/conversation.go b/internal/rpc/conversation/notification.go similarity index 99% rename from pkg/rpcclient/notification/conversation.go rename to internal/rpc/conversation/notification.go index dee74c3d8..2eabc4e46 100644 --- a/pkg/rpcclient/notification/conversation.go +++ b/internal/rpc/conversation/notification.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package notification +package conversation import ( "context" diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index e3bec71eb..43dfc8900 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -29,7 +29,6 @@ import ( tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" pbfriend "github.com/openimsdk/protocol/friend" "github.com/openimsdk/protocol/sdkws" @@ -49,7 +48,7 @@ type friendServer struct { friendDatabase controller.FriendDatabase blackDatabase controller.BlackDatabase userRpcClient *rpcclient.UserRpcClient - notificationSender *notification.FriendNotificationSender + notificationSender *FriendNotificationSender conversationRpcClient rpcclient.ConversationRpcClient RegisterCenter discovery.SvcDiscoveryRegistry config *Config @@ -97,10 +96,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) // Initialize notification sender - notificationSender := notification.NewFriendNotificationSender( + notificationSender := NewFriendNotificationSender( &config.NotificationConfig, &msgRpcClient, - notification.WithRpcFunc(userRpcClient.GetUsersInfo), + WithRpcFunc(userRpcClient.GetUsersInfo), ) cache.InitLocalCache(&config.LocalCacheConfig) diff --git a/pkg/rpcclient/notification/friend.go b/internal/rpc/friend/notification.go similarity index 95% rename from pkg/rpcclient/notification/friend.go rename to internal/rpc/friend/notification.go index 76438023b..ec628fe29 100644 --- a/pkg/rpcclient/notification/friend.go +++ b/internal/rpc/friend/notification.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package notification +package friend import ( "context" @@ -22,6 +22,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" pbfriend "github.com/openimsdk/protocol/friend" "github.com/openimsdk/protocol/sdkws" @@ -31,7 +32,7 @@ import ( type FriendNotificationSender struct { *rpcclient.NotificationSender // Target not found err - getUsersInfo func(ctx context.Context, userIDs []string) ([]CommonUser, error) + getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) // db controller db controller.FriendDatabase } @@ -48,7 +49,7 @@ func WithDBFunc( fn func(ctx context.Context, userIDs []string) (users []*relationtb.UserModel, err error), ) friendNotificationSenderOptions { return func(s *FriendNotificationSender) { - f := func(ctx context.Context, userIDs []string) (result []CommonUser, err error) { + f := func(ctx context.Context, userIDs []string) (result []notification.CommonUser, err error) { users, err := fn(ctx, userIDs) if err != nil { return nil, err @@ -66,7 +67,7 @@ func WithRpcFunc( fn func(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error), ) friendNotificationSenderOptions { return func(s *FriendNotificationSender) { - f := func(ctx context.Context, userIDs []string) (result []CommonUser, err error) { + f := func(ctx context.Context, userIDs []string) (result []notification.CommonUser, err error) { users, err := fn(ctx, userIDs) if err != nil { return nil, err diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 8d001ff56..cf3d5f546 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -208,7 +208,7 @@ func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversation Seqs: seqs, HasReadSeq: hasReadSeq, } - err := m.notificationSender.NotificationWithSesstionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) + err := m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) if err != nil { log.ZWarn(ctx, "send has read Receipt err", err) } diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index 3f09d4225..e19bba867 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -79,7 +79,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms return nil, err } tips := &sdkws.DeleteMsgsTips{UserID: req.UserID, ConversationID: req.ConversationID, Seqs: req.Seqs} - m.notificationSender.NotificationWithSesstionType( + m.notificationSender.NotificationWithSessionType( ctx, req.UserID, m.conversationAndGetRecvID(conversations[0], req.UserID), @@ -93,7 +93,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms } if isSyncSelf { tips := &sdkws.DeleteMsgsTips{UserID: req.UserID, ConversationID: req.ConversationID, Seqs: req.Seqs} - m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, req.UserID, constant.DeleteMsgsNotification, constant.SingleChatType, tips) + m.notificationSender.NotificationWithSessionType(ctx, req.UserID, req.UserID, constant.DeleteMsgsNotification, constant.SingleChatType, tips) } } return &msg.DeleteMsgsResp{}, nil @@ -144,7 +144,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str // notification 2 self if isSyncSelf { tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: existConversationIDs} - m.notificationSender.NotificationWithSesstionType(ctx, userID, userID, constant.ClearConversationNotification, constant.SingleChatType, tips) + m.notificationSender.NotificationWithSessionType(ctx, userID, userID, constant.ClearConversationNotification, constant.SingleChatType, tips) } } else { if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil { @@ -152,7 +152,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str } for _, conversation := range existConversations { tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}} - m.notificationSender.NotificationWithSesstionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips) + m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips) } } if err := m.MsgDatabase.UserSetHasReadSeqs(ctx, userID, maxSeqs); err != nil { diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go index e06c43943..80f034f45 100644 --- a/internal/rpc/msg/notification.go +++ b/internal/rpc/msg/notification.go @@ -46,5 +46,5 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv Seqs: seqs, HasReadSeq: hasReadSeq, } - return m.NotificationWithSesstionType(ctx, sendID, recvID, constant.HasReadReceipt, sesstionType, tips) + return m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sesstionType, tips) } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index 167e2d0aa..4c9b5dcf0 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -123,7 +123,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. } else { recvID = msgs[0].RecvID } - if err := m.notificationSender.NotificationWithSesstionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil { + if err := m.notificationSender.NotificationWithSessionType(ctx, req.UserID, recvID, constant.MsgRevokeNotification, msgs[0].SessionType, &tips); err != nil { return nil, err } m.webhookAfterRevokeMsg(ctx, &m.config.WebhooksConfig.AfterRevokeMsg, req) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 273a6ca42..1fe7c5076 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -16,6 +16,7 @@ package user import ( "context" + "github.com/openimsdk/open-im-server/v3/internal/rpc/friend" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" @@ -33,7 +34,6 @@ import ( tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" pbuser "github.com/openimsdk/protocol/user" @@ -53,7 +53,7 @@ const ( type userServer struct { db controller.UserDatabase - friendNotificationSender *notification.FriendNotificationSender + friendNotificationSender *friend.FriendNotificationSender userNotificationSender *UserNotificationSender friendRpcClient *rpcclient.FriendRpcClient groupRpcClient *rpcclient.GroupRpcClient @@ -104,7 +104,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi RegisterCenter: client, friendRpcClient: &friendRpcClient, groupRpcClient: &groupRpcClient, - friendNotificationSender: notification.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, notification.WithDBFunc(database.FindWithError)), + friendNotificationSender: friend.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, friend.WithDBFunc(database.FindWithError)), userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)), diff --git a/pkg/common/notification/notification.go b/pkg/common/notification/notification.go new file mode 100644 index 000000000..d29b97ef7 --- /dev/null +++ b/pkg/common/notification/notification.go @@ -0,0 +1,51 @@ +package notification + +import ( + "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" + "github.com/openimsdk/tools/utils/httputil" + "net/http" +) + +package webhook + +import ( +"context" +"encoding/json" +"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" +"github.com/openimsdk/open-im-server/v3/pkg/common/config" +"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" +"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue" +"github.com/openimsdk/protocol/constant" +"github.com/openimsdk/tools/log" +"github.com/openimsdk/tools/utils/httputil" +"net/http" +) + +type Client struct { + url string + queue *memAsyncQueue.MemoryQueue +} + +func NewWebhookClient(url string, queue *memAsyncQueue.MemoryQueue) *Client { + http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 100 // Enhance the default number of max connections per host + return &Client{ + + url: url, + queue: queue, + } +} + +func (c *Client) SyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, before *config.BeforeConfig) error { + if before.Enable { + return c.post(ctx, command, req, resp, before.Timeout) + } + return nil +} + +func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig) { + if after.Enable { + c.queue.Push(func() { c.post(ctx, command, req, resp, after.Timeout) }) + } +} \ No newline at end of file diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 950b119f9..15c538449 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -259,7 +259,7 @@ func WithRpcGetUserName() NotificationOptions { } } -func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) { +func (s *NotificationSender) NotificationWithSessionType(ctx context.Context, sendID, recvID string, contentType, sesstionType int32, m proto.Message, opts ...NotificationOptions) (err error) { n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)} content, err := json.Marshal(&n) if err != nil { @@ -309,7 +309,7 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s } func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) error { - return s.NotificationWithSesstionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...) + return s.NotificationWithSessionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...) } func (s *NotificationSender) SetOptionsByContentType(_ context.Context, options map[string]bool, contentType int32) {