From a7518b32ee2b78a975ca190e49aa4cc989edfa7c Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Sun, 28 Apr 2024 20:48:33 +0800 Subject: [PATCH] refactor: msg transfer refactor. --- go.mod | 4 +- go.sum | 23 +-- internal/msgtransfer/init.go | 64 ++----- .../msgtransfer/online_history_msg_handler.go | 159 +++--------------- .../online_msg_to_mongo_handler.go | 2 +- internal/push/push_handler.go | 3 +- internal/rpc/group/group.go | 14 -- pkg/{util => tools}/batcher/batcher.go | 27 +-- pkg/{util => tools}/batcher/batcher_test.go | 0 9 files changed, 58 insertions(+), 238 deletions(-) rename pkg/{util => tools}/batcher/batcher.go (88%) rename pkg/{util => tools}/batcher/batcher_test.go (100%) diff --git a/go.mod b/go.mod index cc0de7614..a1154d9f6 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/openimsdk/open-im-server/v3 -go 1.21.2 +go 1.20 require ( firebase.google.com/go v3.13.0+incompatible @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.65 - github.com/openimsdk/tools v0.0.49-alpha.2 + github.com/openimsdk/tools v0.0.49-alpha. github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 7c256c58d..e19858a62 100644 --- a/go.sum +++ b/go.sum @@ -22,13 +22,10 @@ github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mo github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= @@ -46,7 +43,6 @@ github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5P github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= -github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -70,15 +66,12 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= -github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= -github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= @@ -97,7 +90,6 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= -github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= @@ -145,13 +137,11 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw= -github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= @@ -204,7 +194,6 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= -github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kelindar/bitmap v1.5.2 h1:XwX7CTvJtetQZ64zrOkApoZZHBJRkjE23NfqUALA/HE= @@ -220,9 +209,7 @@ github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= @@ -283,8 +270,8 @@ github.com/openimsdk/gomake v0.0.11 h1:jJ9286zKFfBeARkmfqMEcUYg9lJ+Cj9lylxP8W9uC github.com/openimsdk/gomake v0.0.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.2 h1:8IfV6o2ySU7C54sh/MG7ctEp1h3lSNe03OCUDWSk5Ws= -github.com/openimsdk/tools v0.0.49-alpha.2/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= +github.com/openimsdk/tools v0.0.49-alpha.5 h1:kJacyByLDjObO5w5EMPoo/JjD9D2fhSAQ5qeK2XgawI= +github.com/openimsdk/tools v0.0.49-alpha.5/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= @@ -295,7 +282,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= -github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -312,7 +298,6 @@ github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0 github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -391,7 +376,6 @@ go.opentelemetry.io/otel v1.23.0/go.mod h1:YCycw9ZeKhcJFrb34iVSkyT0iczq/zYDtZYFu go.opentelemetry.io/otel/metric v1.23.0 h1:pazkx7ss4LFVVYSxYew7L5I6qvLXHA0Ap2pwV+9Cnpo= go.opentelemetry.io/otel/metric v1.23.0/go.mod h1:MqUW2X2a6Q8RN96E2/nqNoT+z9BSms20Jb7Bbp+HiTo= go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= -go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= go.opentelemetry.io/otel/trace v1.23.0 h1:37Ik5Ib7xfYVb4V1UtnT97T1jI+AoIYkJyPkuL4iJgI= go.opentelemetry.io/otel/trace v1.23.0/go.mod h1:GSGTbIClEsuZrGIzoEHqsVfxgn5UkggkflQwDScNUsk= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= @@ -399,7 +383,6 @@ go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= @@ -506,7 +489,6 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/api v0.165.0 h1:zd5d4JIIIaYYsfVy1HzoXYZ9rWCSBxxAglbczzo7Bgc= google.golang.org/api v0.165.0/go.mod h1:2OatzO7ZDQsoS7IFf3rvsE17/TldiU3F/zxFHeqUB5o= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -544,7 +526,6 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 0cd0239eb..a0d274d5e 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -20,7 +20,6 @@ import ( "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/utils/datautil" - "net" "net/http" "os" "os/signal" @@ -96,28 +95,19 @@ func Start(ctx context.Context, index int, config *Config) error { } conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) - msgTransfer, err := NewMsgTransfer(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient) + historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgDatabase, &conversationRpcClient, &groupRpcClient) if err != nil { return err } - return msgTransfer.Start(index, config) -} - -func NewMsgTransfer(kafkaConf *config.Kafka, msgDatabase controller.CommonMsgDatabase, - conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) { - historyCH, err := NewOnlineHistoryRedisConsumerHandler(kafkaConf, msgDatabase, conversationRpcClient, groupRpcClient) - if err != nil { - return nil, err - } - historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(kafkaConf, msgDatabase) + historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(&config.KafkaConfig, msgDatabase) if err != nil { - return nil, err + return err } - - return &MsgTransfer{ + msgTransfer := &MsgTransfer{ historyCH: historyCH, historyMongoCH: historyMongoCH, - }, nil + } + return msgTransfer.Start(index, config) } func (m *MsgTransfer) Start(index int, config *Config) error { @@ -134,6 +124,10 @@ func (m *MsgTransfer) Start(index int, config *Config) error { go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH) go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH) + err = m.historyCH.redisMessageBatches.Start() + if err != nil { + return err + } if config.MsgTransfer.Prometheus.Enable { go func() { @@ -158,50 +152,16 @@ func (m *MsgTransfer) Start(index int, config *Config) error { program.SIGTERMExit() // graceful close kafka client. m.cancel() + m.historyCH.redisMessageBatches.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() return nil case <-netDone: m.cancel() + m.historyCH.redisMessageBatches.Close() m.historyCH.historyConsumerGroup.Close() m.historyMongoCH.historyConsumerGroup.Close() close(netDone) return netErr } - - if config.MsgTransfer.Prometheus.Enable { - go func() { - proreg := prometheus.NewRegistry() - proreg.MustRegister( - collectors.NewGoCollector(), - ) - proreg.MustRegister(prommetrics.GetGrpcCusMetrics("Transfer", &config.Share)...) - - http.Handle("/metrics", promhttp.HandlerFor(proreg, promhttp.HandlerOpts{Registry: proreg})) - - lc := net.ListenConfig{ - Control: func(network, address string, c syscall.RawConn) error { - return c.Control(func(fd uintptr) { - err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) - if err != nil { - return fmt.Errorf("setsockopt failed: %w", err) - } - }) - }, - } - - listener, err := lc.Listen(context.Background(), "tcp", fmt.Sprintf(":%d", prometheusPort)) - if err != nil { - netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort) - netDone <- struct{}{} - return - } - - err = http.Serve(listener, nil) - if err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, "HTTP server start error", "prometheusPort", prometheusPort) - netDone <- struct{}{} - } - }() - } } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 0b24a6607..96747bde8 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -16,52 +16,34 @@ package msgtransfer import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/util/batcher" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - "github.com/IBM/sarama" "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mq/kafka" - "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/stringutil" "google.golang.org/protobuf/proto" + "strconv" + "strings" + "time" ) const ( - ConsumerMsgs = 3 - SourceMessages = 4 - MongoMessages = 5 - ChannelNum = 100 + size = 500 + mainDataBuffer = 500 + subChanBuffer = 50 + worker = 50 + interval = 100 * time.Millisecond ) -type MsgChannelValue struct { - uniqueKey string - ctx context.Context - ctxMsgList []*ContextMsg -} - -type TriggerChannelValue struct { - ctx context.Context - cMsgList []*sarama.ConsumerMessage -} - -type Cmd2Value struct { - Cmd int - Value any -} type ContextMsg struct { message *sdkws.MsgData ctx context.Context @@ -69,8 +51,6 @@ type ContextMsg struct { type OnlineHistoryRedisConsumerHandler struct { historyConsumerGroup *kafka.MConsumerGroup - chArrays [ChannelNum]chan Cmd2Value - msgDistributionCh chan Cmd2Value redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] @@ -81,14 +61,21 @@ type OnlineHistoryRedisConsumerHandler struct { func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false) if err != nil { return nil, err } var och OnlineHistoryRedisConsumerHandler och.msgDatabase = database - b := batcher.New[sarama.ConsumerMessage]() + b := batcher.New[sarama.ConsumerMessage]( + batcher.WithSize(size), + batcher.WithWorker(worker), + batcher.WithInterval(interval), + batcher.WithDataBuffer(mainDataBuffer), + batcher.WithSyncWait(true), + batcher.WithBuffer(subChanBuffer), + ) b.Sharding = func(key string) int { hashCode := stringutil.GetHashCode(key) return int(hashCode) % och.redisMessageBatches.Worker() @@ -96,18 +83,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont b.Key = func(consumerMessage *sarama.ConsumerMessage) string { return string(consumerMessage.Key) } + b.Do = och.do och.redisMessageBatches = b - - err = b.Start() - if err != nil { - return nil, err - } - //och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel - //go och.MessagesDistributionHandle() - //for i := 0; i < ChannelNum; i++ { - // och.chArrays[i] = make(chan Cmd2Value, 50) - // go och.Run(i) - //} och.conversationRpcClient = conversationRpcClient och.groupRpcClient = groupRpcClient och.historyConsumerGroup = historyConsumerGroup @@ -250,7 +227,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key log.ZDebug(ctx, "success incr to next topic") err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) if err != nil { - log.ZError(ctx, "MsgToMongoMQ error", err) + log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID", + conversationID, "storageList", storageMessageList, "lastSeq", lastSeq) } och.toPushTopic(ctx, key, conversationID, storageList) } @@ -273,7 +251,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con log.ZDebug(ctx, "success to next topic", "conversationID", conversationID) err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageMessageList, lastSeq) if err != nil { - log.ZError(ctx, "MsgToMongoMQ error", err) + log.ZError(ctx, "Msg To MongoDB MQ error", err, "conversationID", + conversationID, "storageList", storageMessageList, "lastSeq", lastSeq) } och.toPushTopic(ctx, key, conversationID, storageList) } @@ -310,6 +289,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.Consum claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) och.redisMessageBatches.OnComplete = func(lastMessage *sarama.ConsumerMessage, totalCount int) { session.MarkMessage(lastMessage, "") + session.Commit() } for { select { @@ -325,99 +305,8 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(session sarama.Consum if err != nil { log.ZWarn(context.Background(), "put msg to error", err, "msg", msg) } - - session.MarkMessage(msg, "") - case <-session.Context().Done(): return nil } } - - var ( - split = 1000 - rwLock = new(sync.RWMutex) - messages = make([]*sarama.ConsumerMessage, 0, 1000) - ticker = time.NewTicker(time.Millisecond * 100) - - wg = sync.WaitGroup{} - running = new(atomic.Bool) - ) - running.Store(true) - - wg.Add(1) - go func() { - defer wg.Done() - - for { - select { - case <-ticker.C: - // if the buffer is empty and running is false, return loop. - if len(messages) == 0 { - if !running.Load() { - return - } - - continue - } - - rwLock.Lock() - buffer := make([]*sarama.ConsumerMessage, 0, len(messages)) - buffer = append(buffer, messages...) - - // reuse slice, set cap to 0 - messages = messages[:0] - rwLock.Unlock() - - start := time.Now() - ctx := mcontext.WithTriggerIDContext(context.Background(), idutil.OperationIDGenerator()) - log.ZDebug(ctx, "timer trigger msg consumer start", "length", len(buffer)) - for i := 0; i < len(buffer)/split; i++ { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: buffer[i*split : (i+1)*split], - }} - } - if (len(buffer) % split) > 0 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - ctx: ctx, cMsgList: buffer[split*(len(buffer)/split):], - }} - } - - log.ZDebug(ctx, "timer trigger msg consumer end", - "length", len(buffer), "time_cost", time.Since(start), - ) - } - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - - for running.Load() { - select { - case msg, ok := <-claim.Messages(): - if !ok { - running.Store(false) - return - } - - if len(msg.Value) == 0 { - continue - } - - rwLock.Lock() - messages = append(messages, msg) - rwLock.Unlock() - - session.MarkMessage(msg, "") - - case <-session.Context().Done(): - running.Store(false) - return - } - } - }() - - wg.Wait() - return nil } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 978302e76..c9c035893 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true) if err != nil { return nil, err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 2e236195b..03c299b7a 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -37,6 +37,7 @@ import ( "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/timeutil" + "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" ) @@ -58,7 +59,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, - []string{config.KafkaConfig.ToPushTopic}) + []string{config.KafkaConfig.ToPushTopic}, true) if err != nil { return nil, err } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 2f1df656f..033e3619d 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -278,20 +278,6 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR resp.GroupInfo = convert.Db2PbGroupInfo(group, req.OwnerUserID, uint32(len(userIDs))) resp.GroupInfo.MemberCount = uint32(len(userIDs)) - tips := &sdkws.GroupCreatedTips{ - Group: resp.GroupInfo, - OperationTime: group.CreateTime.UnixMilli(), - GroupOwnerUser: s.groupMemberDB2PB(groupMembers[0], userMap[groupMembers[0].UserID].AppMangerLevel), - } - for _, member := range groupMembers { - member.Nickname = userMap[member.UserID].Nickname - tips.MemberList = append(tips.MemberList, s.groupMemberDB2PB(member, userMap[member.UserID].AppMangerLevel)) - if member.UserID == opUserID { - tips.OpUser = s.groupMemberDB2PB(member, userMap[member.UserID].AppMangerLevel) - break - } - } - tips := &sdkws.GroupCreatedTips{ Group: resp.GroupInfo, OperationTime: group.CreateTime.UnixMilli(), diff --git a/pkg/util/batcher/batcher.go b/pkg/tools/batcher/batcher.go similarity index 88% rename from pkg/util/batcher/batcher.go rename to pkg/tools/batcher/batcher.go index 87d3128d5..632ce512c 100644 --- a/pkg/util/batcher/batcher.go +++ b/pkg/tools/batcher/batcher.go @@ -19,11 +19,12 @@ var ( ) type Config struct { - size int // Number of message aggregations - buffer int // The number of caches running in a single coroutine - worker int // Number of coroutines processed in parallel - interval time.Duration // Time of message aggregations - syncWait bool // Whether to wait synchronously after distributing messages + size int // Number of message aggregations + buffer int // The number of caches running in a single coroutine + dataBuffer int // The size of the main data channel + worker int // Number of coroutines processed in parallel + interval time.Duration // Time of message aggregations + syncWait bool // Whether to wait synchronously after distributing messages } type Option func(c *Config) @@ -58,6 +59,12 @@ func WithSyncWait(wait bool) Option { } } +func WithDataBuffer(size int) Option { + return func(c *Config) { + c.dataBuffer = size + } +} + type Batcher[T any] struct { config *Config @@ -145,7 +152,7 @@ func (b *Batcher[T]) scheduler() { defer func() { ticker.Stop() for _, ch := range b.chArrays { - close(ch) // 发送关闭信号到每个worker + close(ch) } close(b.data) b.wait.Done() @@ -163,15 +170,12 @@ func (b *Batcher[T]) scheduler() { return } if data == nil { - // 接收到nil作为结束信号 - fmt.Println("Batcher Closing1", count) if count > 0 { - fmt.Println("Batcher Closing2", count) b.distributeMessage(vals, count, lastAny) } return } - // 正常数据处理 + key := b.Key(data) vals[key] = append(vals[key], data) lastAny = data @@ -179,7 +183,6 @@ func (b *Batcher[T]) scheduler() { count++ if count >= b.config.size { - fmt.Printf("counter to %d, %v\n", count, lastAny) b.distributeMessage(vals, count, lastAny) vals = make(map[string][]*T) count = 0 @@ -187,7 +190,7 @@ func (b *Batcher[T]) scheduler() { case <-ticker.C: if count > 0 { - fmt.Printf("ticker to %v , %d, %v\n", b.config.interval, count, lastAny) + b.distributeMessage(vals, count, lastAny) vals = make(map[string][]*T) count = 0 diff --git a/pkg/util/batcher/batcher_test.go b/pkg/tools/batcher/batcher_test.go similarity index 100% rename from pkg/util/batcher/batcher_test.go rename to pkg/tools/batcher/batcher_test.go