diff --git a/config/config.dev.yaml b/config/config.dev.yaml index 2b9537bef..4da51d6fb 100644 --- a/config/config.dev.yaml +++ b/config/config.dev.yaml @@ -37,20 +37,23 @@ redis: dbIdleTimeout: 120 dbPassWord: openIM -kafka: + +mq: ws2mschat: + type: kafka addr: [ kafka:9092 ] topic: "ws2ms_chat" ms2pschat: - addr: [ kafka:9092 ] + type: nsq + addr: [ nsq:4161 ] topic: "ms2ps_chat" + channel: "one" consumergroupid: msgToMongo: mongo msgToMySql: mysql msgToPush: push - #---------------Internal service configuration---------------------# # The service ip default is empty, @@ -144,4 +147,4 @@ tokenpolicy: messagecallback: callbackSwitch: false - callbackUrl: "http://www.xxx.com/msg/judge" \ No newline at end of file + callbackUrl: "http://www.xxx.com/msg/judge" diff --git a/go.mod b/go.mod index dff96a634..77d6c1dc9 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/antonfisher/nested-logrus-formatter v1.3.0 github.com/bwmarrin/snowflake v0.3.0 github.com/coreos/go-semver v0.3.0 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect @@ -30,15 +29,16 @@ require ( github.com/mattn/go-sqlite3 v1.14.6 // indirect github.com/mitchellh/mapstructure v1.4.1 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 + github.com/nsqio/go-nsq v1.1.0 github.com/olivere/elastic/v7 v7.0.23 github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 - github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 // indirect - github.com/swaggo/gin-swagger v1.3.3 // indirect - github.com/swaggo/swag v1.7.4 // indirect + github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 + github.com/swaggo/gin-swagger v1.3.3 + github.com/swaggo/swag v1.7.4 github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect go.etcd.io/etcd v0.0.0-20200402134248-51bdeb39e698 diff --git a/go.sum b/go.sum index 5ae8a67e3..3ddb2947c 100644 --- a/go.sum +++ b/go.sum @@ -32,10 +32,7 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.0.0 h1:XJIw/+VlJ+87J+doOxznsAWIdmWuViOVhkQamW5YV28= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/cpuguy83/go-md2man/v2 v2.0.1 h1:r/myEWzV9lfsM1tFLgDyu0atFtJ1fXn261LKYj/3DxU= -github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -65,14 +62,12 @@ github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzP github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/garyburd/redigo v1.6.2 h1:yE/pwKCrbLpLpQICzYTeZ7JsTA/C53wFTJHaEtRqniM= github.com/garyburd/redigo v1.6.2/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= -github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gin-contrib/gzip v0.0.3 h1:etUaeesHhEORpZMp18zoOhepboiWnFtXrBZxszWUn4k= github.com/gin-contrib/gzip v0.0.3/go.mod h1:YxxswVZIqOvcHEQpsSn+QF5guQtO1dCfy0shBPy4jFc= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= -github.com/gin-gonic/gin v1.7.0 h1:jGB9xAJQ12AIGNB4HguylppmDK1Am9ppF7XnGXXJuoU= -github.com/gin-gonic/gin v1.7.0/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= github.com/gin-gonic/gin v1.7.4 h1:QmUZXrvJ9qZ3GfWvQ+2wnW/1ePrTEJqPKMYEU3lD/DM= github.com/gin-gonic/gin v1.7.4/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -80,16 +75,13 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= -github.com/go-openapi/jsonreference v0.19.5 h1:1WJP/wi4OjB4iV8KVbH73rQaoialJrqv8gitZLxGLtM= github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE69AqPYEJeo/TWfEeg= github.com/go-openapi/jsonreference v0.19.6 h1:UBIxjkht+AWIgYzCDSv2GN+E/togfwXUJFRTWhl2Jjs= github.com/go-openapi/jsonreference v0.19.6/go.mod h1:diGHMEHg2IqXZGKxqyvWdfWU/aim5Dprw5bqpKkTvns= -github.com/go-openapi/spec v0.20.3 h1:uH9RQ6vdyPSs2pSy9fL8QPspDF2AMIMPtmK5coSSjtQ= github.com/go-openapi/spec v0.20.3/go.mod h1:gG4F8wdEDN+YPBMVnzE85Rbhf+Th2DTvA9nFPQ5AYEg= github.com/go-openapi/spec v0.20.4 h1:O8hJrt0UMnhHcluhIdUgCLRWyM2x7QkBXRvOs7m+O1M= github.com/go-openapi/spec v0.20.4/go.mod h1:faYFR1CvsJZ0mNsmsphTMSoRrNV3TEDoAM7FOEWeq8I= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= -github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-openapi/swag v0.19.15 h1:D2NRCBzS9/pEY3gP9Nl8aDqGUcPFrwG2p+CNFrLyrCM= github.com/go-openapi/swag v0.19.15/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= @@ -132,6 +124,7 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -224,7 +217,10 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/olivere/elastic/v7 v7.0.23 h1:b7tjMogDMhf2CisGI+L02LXLVa0ZyE82Z15XfW1e8t8= github.com/olivere/elastic/v7 v7.0.23/go.mod h1:OuWmD2DiuYhddWegBKPWQuelVKBLrW0fa/VUYgxuGTY= @@ -257,12 +253,8 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= -github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -300,9 +292,7 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= @@ -337,8 +327,8 @@ golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvx golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -357,8 +347,6 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ= -golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211111160137-58aab5ef257a h1:c83jeVQW0KGKNaKBRfelNYNHaev+qawl9yaA825s8XE= golang.org/x/net v0.0.0-20211111160137-58aab5ef257a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -381,7 +369,6 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210420072515-93ed5bcd2bfe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211112143042-c6105e7cf70d h1:jp6PtFmjL+vGsuzd86xYqaJGv6eXdLvmVGzVVLI6EPI= @@ -391,7 +378,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -405,9 +391,6 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= -golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ= golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= @@ -433,8 +416,8 @@ google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/l google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= @@ -445,7 +428,6 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go index c73a7253a..d832ea773 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/history_msg_handler.go @@ -2,7 +2,9 @@ package logic import ( "Open_IM/pkg/common/mq" + "Open_IM/pkg/common/mq/nsq" "context" + "fmt" "strings" "Open_IM/pkg/common/config" @@ -18,16 +20,27 @@ import ( "github.com/golang/protobuf/proto" ) -type fcb func(msg []byte, msgKey string) - type HistoryConsumerHandler struct { historyConsumerGroup mq.Consumer } func (mc *HistoryConsumerHandler) Init() { - mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo) - mc.historyConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ws2mschat.Topic, mq.MessageHandleFunc(mc.handleChatWs2Mongo)) + cfg := config.Config.MQ.Ws2mschat + switch cfg.Type { + case "kafka": + mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, cfg.Addr, config.Config.MQ.ConsumerGroupID.MsgToMongo) + case "nsq": + nc, err := nsq.NewNsqConsumer(cfg.Addr, cfg.Topic, cfg.Channel) + if err != nil { + panic(err) + } + mc.historyConsumerGroup = nc + default: + panic(fmt.Sprintf("unsupported mq type: %s", cfg.Type)) + } + + mc.historyConsumerGroup.RegisterMessageHandler(cfg.Topic, mq.MessageHandleFunc(mc.handleChatWs2Mongo)) } func (mc *HistoryConsumerHandler) handleChatWs2Mongo(message *mq.Message) error { @@ -134,7 +147,7 @@ func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) { log.ErrorByKv("rpc dial failed", msg.OperationID, "push data", msg.String()) pid, offset, err := producer.SendMessage(message) if err != nil { - log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) + log.ErrorByKv("mq send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) } return } @@ -144,7 +157,7 @@ func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) { log.ErrorByKv("rpc send failed", msg.OperationID, "push data", msg.String(), "err", err.Error()) pid, offset, err := producer.SendMessage(message) if err != nil { - log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) + log.ErrorByKv("mq send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error()) } } else { log.InfoByKv("rpc send success", msg.OperationID, "push data", msg.String()) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 2d4c02ccb..025670008 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/common/mq" "Open_IM/pkg/common/mq/kafka" + "Open_IM/pkg/common/mq/nsq" ) var ( @@ -17,7 +18,18 @@ func Init() { log.NewPrivateLog(config.Config.ModuleName.MsgTransferName) persistentCH.Init() historyCH.Init() - producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic) + + cfg := config.Config.MQ.Ms2pschat + switch cfg.Type { + case "kafka": + producer = kafka.NewKafkaProducer(cfg.Addr, cfg.Topic) + case "nsq": + p, err := nsq.NewNsqProducer(cfg.Addr[0], cfg.Topic) + if err != nil { + panic(err) + } + producer = p + } } func Run() { diff --git a/internal/msg_transfer/logic/persistent_msg_handler.go b/internal/msg_transfer/logic/persistent_msg_handler.go index 3ad249f40..d9ad96642 100644 --- a/internal/msg_transfer/logic/persistent_msg_handler.go +++ b/internal/msg_transfer/logic/persistent_msg_handler.go @@ -8,6 +8,7 @@ package logic import ( "Open_IM/pkg/common/mq" + "Open_IM/pkg/common/mq/nsq" "strings" "Open_IM/pkg/common/config" @@ -27,11 +28,23 @@ type PersistentConsumerHandler struct { } func (pc *PersistentConsumerHandler) Init() { - pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, - config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMySql) + cfg := config.Config.MQ.Ws2mschat + switch cfg.Type { + case "kafka": + pc.persistentConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, + cfg.Addr, config.Config.MQ.ConsumerGroupID.MsgToMySql) + case "nsq": + nc, err := nsq.NewNsqConsumer(cfg.Addr, cfg.Topic, cfg.Channel) + if err != nil { + panic(err) + } + pc.persistentConsumerGroup = nc + default: + panic("unsupported mq type: " + cfg.Type) + } - pc.persistentConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ws2mschat.Topic, mq.MessageHandleFunc(pc.handleChatWs2Mysql)) + pc.persistentConsumerGroup.RegisterMessageHandler(cfg.Topic, mq.MessageHandleFunc(pc.handleChatWs2Mysql)) } func (pc *PersistentConsumerHandler) handleChatWs2Mysql(message *mq.Message) error { diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 3c9a66020..71c85dd71 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/common/mq" "Open_IM/pkg/common/mq/kafka" + "Open_IM/pkg/common/mq/nsq" "Open_IM/pkg/utils" ) @@ -28,7 +29,17 @@ func Init(rpcPort int) { pushTerminal = []int32{utils.IOSPlatformID, utils.AndroidPlatformID} } func init() { - producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + cfg := config.Config.MQ.Ws2mschat + switch cfg.Type { + case "kafka": + producer = kafka.NewKafkaProducer(cfg.Addr, cfg.Topic) + case "nsq": + p, err := nsq.NewNsqProducer(cfg.Addr[0], cfg.Topic) + if err != nil { + panic(err) + } + producer = p + } } func Run() { diff --git a/internal/push/logic/push_handler.go b/internal/push/logic/push_handler.go index 643b30082..e2ffb88d3 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/logic/push_handler.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/common/mq" kfk "Open_IM/pkg/common/mq/kafka" + "Open_IM/pkg/common/mq/nsq" pbChat "Open_IM/pkg/proto/chat" pbRelay "Open_IM/pkg/proto/relay" "github.com/Shopify/sarama" @@ -22,15 +23,25 @@ type PushConsumerHandler struct { } func (ms *PushConsumerHandler) Init() { - ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, - OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, config.Config.Kafka.Ms2pschat.Addr, - config.Config.Kafka.ConsumerGroupID.MsgToPush) + cfg:= config.Config.MQ.Ms2pschat + switch cfg.Type { + case "kafka": + ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, + OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, cfg.Addr, + config.Config.MQ.ConsumerGroupID.MsgToPush) + case "nsq": + nc, err := nsq.NewNsqConsumer(cfg.Addr,cfg.Topic,cfg.Channel) + if err != nil { + panic(err) + } + ms.pushConsumerGroup = nc + } - ms.pushConsumerGroup.RegisterMessageHandler(config.Config.Kafka.Ms2pschat.Topic, mq.MessageHandleFunc(ms.handleMs2PsChat)) + ms.pushConsumerGroup.RegisterMessageHandler(cfg.Topic, mq.MessageHandleFunc(ms.handleMs2PsChat)) } func (ms *PushConsumerHandler) handleMs2PsChat(message *mq.Message) error { msg := message.Value - log.InfoByKv("msg come from kafka And push!!!", "", "msg", string(msg)) + log.InfoByKv("msg come from mq And push!!!", "", "msg", string(msg)) pbData := pbChat.MsgSvrToPushSvrChatMsg{} if err := proto.Unmarshal(msg, &pbData); err != nil { log.ErrorByKv("push Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index e6754212c..694b182fd 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -116,8 +116,8 @@ func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) { m.ClientMsgID = m.MsgID switch m.SessionType { case constant.SingleChatType: - sendMsgToKafka(m, m.SendID, "msgKey--sendID") - sendMsgToKafka(m, m.RecvID, "msgKey--recvID") + sendMsgToMQ(m, m.SendID, "msgKey--sendID") + sendMsgToMQ(m, m.RecvID, "msgKey--recvID") case constant.GroupChatType: etcdConn := getcdv3.GetGroupConn() client := pbGroup.NewGroupClient(etcdConn) @@ -138,17 +138,17 @@ func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) { groupID := m.RecvID for i, v := range reply.MemberList { m.RecvID = v.UserId + " " + groupID - sendMsgToKafka(m, utils.IntToString(i), "msgKey--recvID+\" \"+groupID") + sendMsgToMQ(m, utils.IntToString(i), "msgKey--recvID+\" \"+groupID") } default: } } -func sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string, flag string) { +func sendMsgToMQ(m *pbChat.WSToMsgSvrChatMsg, key string, flag string) { pid, offset, err := producer.SendMessage(m, key) if err != nil { - log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), flag, key) + log.ErrorByKv("mq send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), flag, key) } } diff --git a/internal/rpc/chat/rpcChat.go b/internal/rpc/chat/rpcChat.go index e220a1860..d897b59f3 100644 --- a/internal/rpc/chat/rpcChat.go +++ b/internal/rpc/chat/rpcChat.go @@ -1,6 +1,7 @@ package chat import ( + "Open_IM/pkg/common/mq/nsq" "net" "strconv" "strings" @@ -32,7 +33,17 @@ func NewRpcChatServer(port int) *rpcChat { etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, } - rc.producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + cfg := config.Config.MQ.Ws2mschat + switch cfg.Type { + case "kafka": + rc.producer = kafka.NewKafkaProducer(cfg.Addr, cfg.Topic) + case "nsq": + p, err := nsq.NewNsqProducer(cfg.Addr[0], cfg.Topic) + if err != nil { + panic(err) + } + rc.producer = p + } return &rc } diff --git a/internal/rpc/chat/send_msg.go b/internal/rpc/chat/send_msg.go index 3c31ab2dc..985ef014d 100644 --- a/internal/rpc/chat/send_msg.go +++ b/internal/rpc/chat/send_msg.go @@ -95,10 +95,10 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* } switch pbData.SessionType { case constant.SingleChatType: - err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) - err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) + err1 := rpc.sendMsgToMQ(&pbData, pbData.RecvID) + err2 := rpc.sendMsgToMQ(&pbData, pbData.SendID) if err1 != nil || err2 != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + return returnMsg(&replay, pb, 201, "mq send msg err", "", 0) } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) case constant.GroupChatType: @@ -144,16 +144,16 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* groupID := pbData.RecvID for i, v := range reply.MemberList { pbData.RecvID = v.UserId + " " + groupID - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) + err := rpc.sendMsgToMQ(&pbData, utils.IntToString(i)) if err != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + return returnMsg(&replay, pb, 201, "mq send msg err", "", 0) } } for i, v := range addUidList { pbData.RecvID = v + " " + groupID - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) + err := rpc.sendMsgToMQ(&pbData, utils.IntToString(i+1)) if err != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + return returnMsg(&replay, pb, 201, "mq send msg err", "", 0) } } return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) @@ -163,10 +163,10 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* } } -func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error { +func (rpc *rpcChat) sendMsgToMQ(m *pbChat.WSToMsgSvrChatMsg, key string) error { pid, offset, err := rpc.producer.SendMessage(m, key) if err != nil { - log.ErrorByKv("kafka send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key) + log.ErrorByKv("mq send failed", m.OperationID, "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key) } return err } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index d26ec0204..a3b850e3f 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -2,8 +2,8 @@ package config import ( "Open_IM/pkg/base" - "io/ioutil" "gopkg.in/yaml.v3" + "io/ioutil" //"path/filepath" //"runtime" ) @@ -129,21 +129,25 @@ type config struct { AppManagerUid []string `yaml:"appManagerUid"` Secrets []string `yaml:"secrets"` } - Kafka struct { + MQ struct { Ws2mschat struct { + Type string `yaml:"type"` Addr []string `yaml:"addr"` Topic string `yaml:"topic"` + Channel string `yaml:"channel"` } Ms2pschat struct { + Type string `yaml:"type"` Addr []string `yaml:"addr"` Topic string `yaml:"topic"` + Channel string `yaml:"channel"` } ConsumerGroupID struct { MsgToMongo string `yaml:"msgToMongo"` MsgToMySql string `yaml:"msgToMySql"` MsgToPush string `yaml:"msgToPush"` } - } + } `yaml:"mq"` Secret string `yaml:"secret"` MultiLoginPolicy struct { OnlyOneTerminalAccess bool `yaml:"onlyOneTerminalAccess"` @@ -161,11 +165,11 @@ type config struct { } func init() { - confDir:= base.ConfigDir() - logDir:= base.LogDir() + confDir := base.ConfigDir() + logDir := base.LogDir() // fix log dir - Config.Log.StorageLocation= logDir + Config.Log.StorageLocation = logDir // if we cd Open-IM-Server/src/utils and run go test // it will panic cannot find config/config.yaml diff --git a/pkg/common/mq/consumer.go b/pkg/common/mq/consumer.go index 6576acc33..aea874a54 100644 --- a/pkg/common/mq/consumer.go +++ b/pkg/common/mq/consumer.go @@ -1,7 +1,5 @@ package mq -import "time" - type Consumer interface { // RegisterMessageHandler is used to register message handler // any received messages will be passed to handler to process @@ -23,18 +21,3 @@ type MessageHandleFunc func(msg *Message) error func (fn MessageHandleFunc) HandleMessage(msg *Message) error { return fn(msg) } - -type Message struct { - Key, Value []byte - Topic string - Partition int32 - Offset int64 - Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp - BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp - Headers []*RecordHeader // only set if kafka is version 0.11+ -} - -type RecordHeader struct { - Key []byte - Value []byte -} diff --git a/pkg/common/mq/message.go b/pkg/common/mq/message.go new file mode 100644 index 000000000..3be19a505 --- /dev/null +++ b/pkg/common/mq/message.go @@ -0,0 +1,18 @@ +package mq + +import "time" + +type Message struct { + Key, Value []byte + Topic string + Partition int32 + Offset int64 + Timestamp time.Time + BlockTimestamp time.Time + Headers []*RecordHeader +} + +type RecordHeader struct { + Key []byte + Value []byte +} diff --git a/pkg/common/mq/nsq/consumer.go b/pkg/common/mq/nsq/consumer.go new file mode 100644 index 000000000..86e096aac --- /dev/null +++ b/pkg/common/mq/nsq/consumer.go @@ -0,0 +1,66 @@ +package nsq + +import ( + "Open_IM/pkg/common/mq" + "github.com/nsqio/go-nsq" +) + +type nsqConsumer struct { + lookupAddrs []string + topic string + + handlers []mq.MessageHandler + consumer *nsq.Consumer +} + +var _ mq.Consumer = (*nsqConsumer)(nil) + +func NewNsqConsumer(lookupAddrs []string, topic, channel string) (*nsqConsumer, error) { + config := nsq.NewConfig() + consumer, err := nsq.NewConsumer(topic, channel, config) + if err != nil { + return nil, err + } + + nc := &nsqConsumer{ + lookupAddrs: lookupAddrs, + topic: topic, + handlers: make([]mq.MessageHandler, 0), + consumer: consumer, + } + + consumer.AddHandler(nsq.HandlerFunc(nc.consume)) + + return nc, nil +} + +func (c *nsqConsumer) RegisterMessageHandler(topic string, handler mq.MessageHandler) { + if topic != c.topic { + return + } + c.handlers = append(c.handlers, handler) +} + +func (c *nsqConsumer) consume(msg *nsq.Message) error { + for _, handler := range c.handlers { + if err := handler.HandleMessage(&mq.Message{ + Value: msg.Body, + }); err != nil { + return err + } + } + msg.Finish() + + return nil +} + +func (c *nsqConsumer) Start() error { + + if err := c.consumer.ConnectToNSQLookupds(c.lookupAddrs); err != nil { + return err + } + + <-c.consumer.StopChan + + return nil +} diff --git a/pkg/common/mq/nsq/producer.go b/pkg/common/mq/nsq/producer.go new file mode 100644 index 000000000..fba8c6d21 --- /dev/null +++ b/pkg/common/mq/nsq/producer.go @@ -0,0 +1,37 @@ +package nsq + +import ( + "Open_IM/pkg/common/mq" + "github.com/golang/protobuf/proto" + "github.com/nsqio/go-nsq" +) + +type nsqProducer struct { + topic string + + producer *nsq.Producer +} + +var _ mq.Producer = (*nsqProducer)(nil) + +func NewNsqProducer(addr string, topic string) (*nsqProducer, error) { + config := nsq.NewConfig() + producer, err := nsq.NewProducer(addr, config) + if err != nil { + return nil, err + } + + return &nsqProducer{ + topic: topic, + producer: producer, + }, nil + +} + +func (p *nsqProducer) SendMessage(message proto.Message, key ...string) (int32, int64, error) { + bytes, err := proto.Marshal(message) + if err != nil { + return 0, 0, err + } + return 0, 0, p.producer.Publish(p.topic, bytes) +}