From 5be441f4fd50d268f093988bdef1267b811c4b5f Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 20 Feb 2023 10:13:29 +0800 Subject: [PATCH] errcode --- cmd/api/main.go | 7 +- cmd/push/main.go | 6 +- config/config.yaml | 6 - docker-compose.yaml | 17 -- go.mod | 4 +- go.sum | 1 - internal/crontask/clear_msg.go | 64 ++--- .../msgtransfer/online_history_msg_handler.go | 2 +- internal/push/{logic => }/callback.go | 48 ++-- internal/push/fcm/push.go | 61 ++--- internal/push/getui/body.go | 142 ++++++++++ internal/push/getui/push.go | 248 ++++-------------- internal/push/{logic => }/init.go | 19 +- .../jpush/{requestbody => body}/audience.go | 3 +- .../jpush/{requestbody => body}/message.go | 2 +- .../{requestbody => body}/notification.go | 2 +- .../jpush/{requestbody => body}/options.go | 2 +- .../jpush/{requestbody => body}/platform.go | 2 +- .../jpush/{requestbody => body}/pushobj.go | 2 +- internal/push/jpush/common/JGPlatform.go | 13 - internal/push/jpush/push.go | 30 ++- internal/push/mobpush/common/getSign.go | 18 -- internal/push/mobpush/push.go | 73 ------ .../push/mobpush/requestparams/pushForward.go | 14 - .../push/mobpush/requestparams/pushNotify.go | 26 -- .../push/mobpush/requestparams/pushObj.go | 28 -- .../push/mobpush/requestparams/pushTarget.go | 13 - internal/push/{logic => }/push_handler.go | 25 +- internal/push/push_interface.go | 12 +- internal/push/{logic => }/push_rpc_server.go | 3 +- internal/push/{logic => }/push_to_client.go | 32 +-- .../sdk/tpns-server-sdk-go/go/auth/auth.go | 62 ----- internal/push/{logic => }/tpns.go | 2 +- internal/rpc/conversation/conversaion.go | 4 +- pkg/callbackstruct/push.go | 17 +- pkg/common/http/http_client.go | 23 +- pkg/discoveryregistry/discovery_register.go | 1 + 37 files changed, 364 insertions(+), 670 deletions(-) rename internal/push/{logic => }/callback.go (88%) create mode 100644 internal/push/getui/body.go rename internal/push/{logic => }/init.go (69%) rename internal/push/jpush/{requestbody => body}/audience.go (97%) rename internal/push/jpush/{requestbody => body}/message.go (96%) rename internal/push/jpush/{requestbody => body}/notification.go (98%) rename internal/push/jpush/{requestbody => body}/options.go (87%) rename internal/push/jpush/{requestbody => body}/platform.go (98%) rename internal/push/jpush/{requestbody => body}/pushobj.go (96%) delete mode 100644 internal/push/jpush/common/JGPlatform.go delete mode 100644 internal/push/mobpush/common/getSign.go delete mode 100644 internal/push/mobpush/push.go delete mode 100644 internal/push/mobpush/requestparams/pushForward.go delete mode 100644 internal/push/mobpush/requestparams/pushNotify.go delete mode 100644 internal/push/mobpush/requestparams/pushObj.go delete mode 100644 internal/push/mobpush/requestparams/pushTarget.go rename internal/push/{logic => }/push_handler.go (66%) rename internal/push/{logic => }/push_rpc_server.go (98%) rename internal/push/{logic => }/push_to_client.go (93%) delete mode 100644 internal/push/sdk/tpns-server-sdk-go/go/auth/auth.go rename internal/push/{logic => }/tpns.go (98%) diff --git a/cmd/api/main.go b/cmd/api/main.go index a9430f3c0..1e3f6d918 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -1,7 +1,6 @@ package main import ( - _ "Open_IM/cmd/open_im_api/docs" apiAuth "Open_IM/internal/api/auth" "Open_IM/internal/api/conversation" "Open_IM/internal/api/friend" @@ -14,7 +13,6 @@ import ( "Open_IM/pkg/common/log" "Open_IM/pkg/common/middleware" "Open_IM/pkg/common/tokenverify" - "Open_IM/pkg/utils" "flag" "fmt" @@ -25,10 +23,9 @@ import ( swaggerFiles "github.com/swaggo/files" ginSwagger "github.com/swaggo/gin-swagger" - "github.com/gin-gonic/gin" - //"syscall" "Open_IM/pkg/common/constant" - prome "Open_IM/pkg/common/prometheus" + "Open_IM/pkg/common/prome" + "github.com/gin-gonic/gin" ) // @title open-IM-Server API diff --git a/cmd/push/main.go b/cmd/push/main.go index 8bcb23081..9658d4249 100644 --- a/cmd/push/main.go +++ b/cmd/push/main.go @@ -1,7 +1,7 @@ package main import ( - "Open_IM/internal/push/logic" + "Open_IM/internal/push" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" @@ -19,7 +19,7 @@ func main() { wg.Add(1) log.NewPrivateLog(constant.LogFileName) fmt.Println("start push rpc server, port: ", *rpcPort, ", OpenIM version: ", constant.CurrentVersion, "\n") - logic.Init(*rpcPort) - logic.Run(*prometheusPort) + push.Init(*rpcPort) + push.Run(*prometheusPort) wg.Wait() } diff --git a/config/config.yaml b/config/config.yaml index a6ddea70b..771ea9666 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -213,12 +213,6 @@ push: fcm: #firebase cloud message 消息推送 serviceAccount: "openim-5c6c0-firebase-adminsdk-ppwol-8765884a78.json" #帐号文件,此处需要改修配置,并且这个文件放在 config目录下 enable: false - mob: #袤博推送 - appKey: #帐号文件,此处需要改修配置,并且这个文件放在 config目录下 - pushUrl: - scheme: - appSecret: - enable: false manager: #app管理员userID和对应的secret 建议修改。 用于管理后台登录,也可以用户管理后台对应的api diff --git a/docker-compose.yaml b/docker-compose.yaml index 4188bbdf8..0abdab9ad 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -34,7 +34,6 @@ services: - MONGO_INITDB_DATABASE=openIM - MONGO_USERNAME=${USER} - MONGO_PASSWORD=${PASSWORD} - # restart: always redis: @@ -84,20 +83,6 @@ services: depends_on: - zookeeper - etcd: - image: quay.io/coreos/etcd - ports: - - 2379:2379 - - 2380:2380 - container_name: etcd - volumes: - - /etc/timezone:/etc/timezone - - /etc/localtime:/etc/localtime - environment: - ETCDCTL_API: 3 - restart: always - command: /usr/local/bin/etcd --name etcd0 --data-dir /etcd-data --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379 --listen-peer-urls http://0.0.0.0:2380 --initial-advertise-peer-urls http://0.0.0.0:2380 --initial-cluster etcd0=http://0.0.0.0:2380 --initial-cluster-token tkn --initial-cluster-state new - minio: image: minio/minio ports: @@ -129,7 +114,6 @@ services: - mysql - mongodb - redis - - etcd - minio network_mode: "host" logging: @@ -149,7 +133,6 @@ services: - mysql - mongodb - redis - - etcd - minio - open_im_server network_mode: "host" diff --git a/go.mod b/go.mod index 1f50db214..ccc587ea4 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module Open_IM -go 1.18 +go 1.16 require ( firebase.google.com/go v3.13.0+incompatible @@ -49,14 +49,12 @@ require ( google.golang.org/api v0.103.0 google.golang.org/grpc v1.52.3 google.golang.org/protobuf v1.28.1 - gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.3.5 gorm.io/gorm v1.23.8 ) require ( - github.com/envoyproxy/protoc-gen-validate v0.1.0 github.com/go-openapi/spec v0.20.6 // indirect github.com/go-openapi/swag v0.21.1 // indirect github.com/go-playground/locales v0.14.1 // indirect diff --git a/go.sum b/go.sum index 4ca367233..805ab32e1 100644 --- a/go.sum +++ b/go.sum @@ -510,7 +510,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= -github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= diff --git a/internal/crontask/clear_msg.go b/internal/crontask/clear_msg.go index 296d125e7..c9eaf129e 100644 --- a/internal/crontask/clear_msg.go +++ b/internal/crontask/clear_msg.go @@ -11,7 +11,6 @@ import ( "math" ) - type ClearMsgTool struct { msgInterface controller.MsgInterface userInterface controller.UserInterface @@ -46,15 +45,16 @@ func (c *ClearMsgTool) ClearAll() { func (c *ClearMsgTool) ClearUsersMsg(ctx context.Context, userIDList []string) { for _, userID := range userIDList { - if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil { + if err := c.msgInterface.DeleteUserMsgsAndSetMinSeq(ctx, userID, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), userID) } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetUserMinMaxSeqInMongoAndCache(ctx, userID) + _, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetUserMinMaxSeqInMongoAndCache(ctx, userID) if err != nil { log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", userID) continue } - if + c.FixUserSeq(ctx, userID, minSeqCache, maxSeqCache) + c.CheckMaxSeqWithMongo(ctx, userID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion) } } @@ -65,39 +65,41 @@ func (c *ClearMsgTool) ClearSuperGroupMsg(ctx context.Context, workingGroupIDLis log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), "FindGroupMemberUserID", err.Error(), groupID) continue } - if err := c.msgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords * 24 *60 *60)); err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userIDList) + if err := c.msgInterface.DeleteUserSuperGroupMsgsAndSetMinSeq(ctx, groupID, userIDs, int64(config.Config.Mongo.DBRetainChatRecords*24*60*60)); err != nil { + log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "DeleteUserSuperGroupMsgsAndSetMinSeq failed", groupID, userIDs, config.Config.Mongo.DBRetainChatRecords) } - minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := c.msgInterface.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) - + _, maxSeqMongo, maxSeqCache, err := c.msgInterface.GetSuperGroupMinMaxSeqInMongoAndCache(ctx, groupID) + if err != nil { + log.NewError(tracelog.GetOperationID(ctx), utils.GetSelfFuncName(), err.Error(), "GetUserMinMaxSeqInMongoAndCache failed", groupID) + continue + } + c.FixGroupUserSeq(ctx, userIDs, groupID) + c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion) } } -func (c *ClearMsgTool) checkMaxSeqWithMongo(ctx context.Context, sourceID string, diffusionType int) error { - var seqRedis uint64 - var err error - if diffusionType == constant.WriteDiffusion { - seqRedis, err = db.DB.GetUserMaxSeq(sourceID) - } else { - seqRedis, err = db.DB.GetGroupMaxSeq(sourceID) - } - if err != nil { - if err == goRedis.Nil { - return nil +func (c *ClearMsgTool) FixUserSeq(ctx context.Context, userID string, minSeqCache, maxSeqCache int64) { + if minSeqCache > maxSeqCache { + if err := c.msgInterface.SetUserMinSeq(ctx, userID, maxSeqCache); err != nil { + log.NewError(tracelog.GetOperationID(ctx), "SetUserMinSeq failed", userID, minSeqCache, maxSeqCache) + } else { + log.NewWarn(tracelog.GetOperationID(ctx), "SetUserMinSeq success", userID, minSeqCache, maxSeqCache) } - return utils.Wrap(err, "GetUserMaxSeq failed") } - msg, err := db.DB.GetNewestMsg(sourceID) - if err != nil { - return utils.Wrap(err, "GetNewestMsg failed") - } - if msg == nil { - return nil +} + +func (c *ClearMsgTool) FixGroupUserSeq(ctx context.Context, userID string, groupID string, minSeqCache, maxSeqCache int64) { + if minSeqCache > maxSeqCache { + if err := c.msgInterface.SetGroupUserMinSeq(ctx, groupID, userID, maxSeqCache); err != nil { + log.NewError(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq failed", userID, minSeqCache, maxSeqCache) + } else { + log.NewWarn(tracelog.GetOperationID(ctx), "SetGroupUserMinSeq success", userID, minSeqCache, maxSeqCache) + } } - if math.Abs(float64(msg.Seq-uint32(seqRedis))) > 10 { - log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, sourceID, "redis maxSeq is different with msg.Seq > 10", "status: ", msg.Status, msg.SendTime) - } else { - log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", msg.Seq, seqRedis, sourceID, "seq and msg OK", "status:", msg.Status, msg.SendTime) +} + +func (c *ClearMsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) { + if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 { + log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType) } - return nil } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index f4c4fd09d..b533afd46 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -261,7 +261,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context. } } -func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { +func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) { if len(messages) > 0 { pid, offset, err := och.producerToMongo.SendMessage(&pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, MessageList: messages, TriggerID: triggerID}, aggregationID, triggerID) if err != nil { diff --git a/internal/push/logic/callback.go b/internal/push/callback.go similarity index 88% rename from internal/push/logic/callback.go rename to internal/push/callback.go index 68f99ea5b..d0777b2fa 100644 --- a/internal/push/logic/callback.go +++ b/internal/push/callback.go @@ -1,7 +1,7 @@ -package logic +package push import ( - cbapi "Open_IM/pkg/callbackstruct" + "Open_IM/pkg/callbackstruct" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/http" @@ -15,13 +15,13 @@ func url() string { return config.Config.Callback.CallbackUrl } -func CallbackOfflinePush(ctx context.Context, userIDList []string, msg *common.MsgData, offlinePushUserIDList *[]string) error { +func callbackOfflinePush(ctx context.Context, userIDList []string, msg *common.MsgData, offlinePushUserIDList *[]string) error { if !config.Config.Callback.CallbackOfflinePush.Enable { return nil } - req := &cbapi.CallbackBeforePushReq{ - UserStatusBatchCallbackReq: cbapi.UserStatusBatchCallbackReq{ - UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ + req := &callbackstruct.CallbackBeforePushReq{ + UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ + UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ CallbackCommand: constant.CallbackOfflinePushCommand, OperationID: tracelog.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), @@ -35,10 +35,10 @@ func CallbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M GroupID: msg.GroupID, ContentType: msg.ContentType, SessionType: msg.SessionType, - AtUserIDList: msg.AtUserIDList, + AtUserIDs: msg.AtUserIDList, Content: utils.GetContent(msg), } - resp := &cbapi.CallbackBeforePushResp{} + resp := &callbackstruct.CallbackBeforePushResp{} err := http.CallBackPostReturn(url(), req, resp, config.Config.Callback.CallbackOfflinePush) if err != nil { return err @@ -52,13 +52,13 @@ func CallbackOfflinePush(ctx context.Context, userIDList []string, msg *common.M return nil } -func CallbackOnlinePush(operationID string, userIDList []string, msg *common.MsgData) error { +func callbackOnlinePush(operationID string, userIDList []string, msg *common.MsgData) error { if !config.Config.Callback.CallbackOnlinePush.Enable || utils.Contain(msg.SendID, userIDList...) { return nil } - req := cbapi.CallbackBeforePushReq{ - UserStatusBatchCallbackReq: cbapi.UserStatusBatchCallbackReq{ - UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ + req := callbackstruct.CallbackBeforePushReq{ + UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{ + UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ CallbackCommand: constant.CallbackOnlinePushCommand, OperationID: operationID, PlatformID: int(msg.SenderPlatformID), @@ -66,24 +66,24 @@ func CallbackOnlinePush(operationID string, userIDList []string, msg *common.Msg }, UserIDList: userIDList, }, - ClientMsgID: msg.ClientMsgID, - SendID: msg.SendID, - GroupID: msg.GroupID, - ContentType: msg.ContentType, - SessionType: msg.SessionType, - AtUserIDList: msg.AtUserIDList, - Content: utils.GetContent(msg), + ClientMsgID: msg.ClientMsgID, + SendID: msg.SendID, + GroupID: msg.GroupID, + ContentType: msg.ContentType, + SessionType: msg.SessionType, + AtUserIDs: msg.AtUserIDList, + Content: utils.GetContent(msg), } - resp := &cbapi.CallbackBeforePushResp{} + resp := &callbackstruct.CallbackBeforePushResp{} return http.CallBackPostReturn(url(), req, resp, config.Config.Callback.CallbackOnlinePush) } -func CallbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg *common.MsgData, pushToUserList *[]string) error { +func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg *common.MsgData, pushToUserList *[]string) error { if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable { return nil } - req := cbapi.CallbackBeforeSuperGroupOnlinePushReq{ - UserStatusBaseCallback: cbapi.UserStatusBaseCallback{ + req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{ + UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{ CallbackCommand: constant.CallbackSuperGroupOnlinePushCommand, OperationID: tracelog.GetOperationID(ctx), PlatformID: int(msg.SenderPlatformID), @@ -98,7 +98,7 @@ func CallbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg Content: utils.GetContent(msg), Seq: msg.Seq, } - resp := &cbapi.CallbackBeforeSuperGroupOnlinePushResp{} + resp := &callbackstruct.CallbackBeforeSuperGroupOnlinePushResp{} if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush); err != nil { return err } diff --git a/internal/push/fcm/push.go b/internal/push/fcm/push.go index 8a739335a..6042a4bf8 100644 --- a/internal/push/fcm/push.go +++ b/internal/push/fcm/push.go @@ -3,57 +3,52 @@ package fcm import ( "Open_IM/internal/push" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/db" - "Open_IM/pkg/common/log" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/db/cache" "context" - go_redis "github.com/go-redis/redis/v8" - "path/filepath" - "strconv" - firebase "firebase.google.com/go" "firebase.google.com/go/messaging" + "github.com/go-redis/redis/v8" "google.golang.org/api/option" + "path/filepath" ) const SinglePushCountLimit = 400 +var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constant.WebPlatformID} + type Fcm struct { - FcmMsgCli *messaging.Client + fcmMsgCli *messaging.Client + cache cache.Cache } -func NewFcm() *Fcm { - return newFcmClient() -} -func newFcmClient() *Fcm { +func newFcmClient(cache cache.Cache) *Fcm { opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { - log.Debug("", "error initializing app: ", err.Error()) return nil } - //授权 + // auth // fcmClient, err := fcmApp.Auth(context.Background()) // if err != nil { - // log.Println("error getting Auth client: %v\n", err) // return // } - // log.Printf("%#v\r\n", fcmClient) ctx := context.Background() fcmMsgClient, err := fcmApp.Messaging(ctx) if err != nil { panic(err.Error()) return nil } - return &Fcm{FcmMsgCli: fcmMsgClient} + return &Fcm{fcmMsgCli: fcmMsgClient} } -func (f *Fcm) Push(accounts []string, title, detailContent, operationID string, opts push.PushOpts) (string, error) { +func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string, opts push.Opts) error { // accounts->registrationToken allTokens := make(map[string][]string, 0) - for _, account := range accounts { + for _, account := range userIDs { var personTokens []string - for _, v := range push.PushTerminal { - Token, err := db.DB.GetFcmToken(account, v) + for _, v := range Terminal { + Token, err := f.cache.GetFcmToken(ctx, account, v) if err == nil { personTokens = append(personTokens, Token) } @@ -63,18 +58,16 @@ func (f *Fcm) Push(accounts []string, title, detailContent, operationID string, Success := 0 Fail := 0 notification := &messaging.Notification{} - notification.Body = detailContent + notification.Body = content notification.Title = title var messages []*messaging.Message - ctx := context.Background() - for uid, personTokens := range allTokens { + for userID, personTokens := range allTokens { apns := &messaging.APNSConfig{Payload: &messaging.APNSPayload{Aps: &messaging.Aps{Sound: opts.IOSPushSound}}} messageCount := len(messages) if messageCount >= SinglePushCountLimit { - response, err := f.FcmMsgCli.SendAll(ctx, messages) + response, err := f.fcmMsgCli.SendAll(ctx, messages) if err != nil { Fail = Fail + messageCount - log.Info(operationID, "some token push err", err.Error(), messageCount) } else { Success = Success + response.SuccessCount Fail = Fail + response.FailureCount @@ -82,30 +75,30 @@ func (f *Fcm) Push(accounts []string, title, detailContent, operationID string, messages = messages[0:0] } if opts.IOSBadgeCount { - unreadCountSum, err := db.DB.IncrUserBadgeUnreadCountSum(uid) + unreadCountSum, err := f.cache.IncrUserBadgeUnreadCountSum(ctx, userID) if err == nil { apns.Payload.Aps.Badge = &unreadCountSum } else { - log.Error(operationID, "IncrUserBadgeUnreadCountSum redis err", err.Error(), uid) + //log.Error(operationID, "IncrUserBadgeUnreadCountSum redis err", err.Error(), uid) Fail++ continue } } else { - unreadCountSum, err := db.DB.GetUserBadgeUnreadCountSum(uid) + unreadCountSum, err := f.cache.GetUserBadgeUnreadCountSum(ctx, userID) if err == nil && unreadCountSum != 0 { apns.Payload.Aps.Badge = &unreadCountSum - } else if err == go_redis.Nil || unreadCountSum == 0 { + } else if err == redis.Nil || unreadCountSum == 0 { zero := 1 apns.Payload.Aps.Badge = &zero } else { - log.Error(operationID, "GetUserBadgeUnreadCountSum redis err", err.Error(), uid) + //log.Error(operationID, "GetUserBadgeUnreadCountSum redis err", err.Error(), uid) Fail++ continue } } for _, token := range personTokens { temp := &messaging.Message{ - Data: map[string]string{"ex": opts.Data}, + Data: map[string]string{"ex": opts.Ex}, Token: token, Notification: notification, APNS: apns, @@ -116,14 +109,14 @@ func (f *Fcm) Push(accounts []string, title, detailContent, operationID string, } messageCount := len(messages) if messageCount > 0 { - response, err := f.FcmMsgCli.SendAll(ctx, messages) + response, err := f.fcmMsgCli.SendAll(ctx, messages) if err != nil { Fail = Fail + messageCount - log.Info(operationID, "some token push err", err.Error(), messageCount) + //log.Info(operationID, "some token push err", err.Error(), messageCount) } else { Success = Success + response.SuccessCount Fail = Fail + response.FailureCount } } - return strconv.Itoa(Success) + " Success," + strconv.Itoa(Fail) + " Fail", nil + return nil } diff --git a/internal/push/getui/body.go b/internal/push/getui/body.go new file mode 100644 index 000000000..6c3628d1d --- /dev/null +++ b/internal/push/getui/body.go @@ -0,0 +1,142 @@ +package getui + +import "Open_IM/pkg/common/config" + +type CommonResp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data interface{} `json:"data"` +} + +type AuthReq struct { + Sign string `json:"sign"` + Timestamp string `json:"timestamp"` + AppKey string `json:"appkey"` +} + +type AuthResp struct { + ExpireTime string `json:"expire_time"` + Token string `json:"token"` +} + +type TaskResp struct { + TaskID string `json:"taskID"` +} + +type Settings struct { + TTL *int64 `json:"ttl"` +} + +type Audience struct { + Alias []string `json:"alias"` +} + +type PushMessage struct { + Notification *Notification `json:"notification,omitempty"` + Transmission *string `json:"transmission,omitempty"` +} + +type PushChannel struct { + Ios *Ios `json:"ios"` + Android *Android `json:"android"` +} + +type PushReq struct { + RequestID *string `json:"request_id"` + Settings *Settings `json:"settings"` + Audience *Audience `json:"audience"` + PushMessage *PushMessage `json:"push_message"` + PushChannel *PushChannel `json:"push_channel"` + IsAsync *bool `json:"is_async"` + TaskID *string `json:"taskid"` +} + +type Ios struct { + NotificationType *string `json:"type"` + AutoBadge *string `json:"auto_badge"` + Aps struct { + Sound string `json:"sound"` + Alert Alert `json:"alert"` + } `json:"aps"` +} + +type Alert struct { + Title string `json:"title"` + Body string `json:"body"` +} + +type Android struct { + Ups struct { + Notification Notification `json:"notification"` + Options Options `json:"options"` + } `json:"ups"` +} + +type Notification struct { + Title string `json:"title"` + Body string `json:"body"` + ChannelID string `json:"channelID"` + ChannelName string `json:"ChannelName"` + ClickType string `json:"click_type"` +} + +type Options struct { + HW struct { + DefaultSound bool `json:"/message/android/notification/default_sound"` + ChannelID string `json:"/message/android/notification/channel_id"` + Sound string `json:"/message/android/notification/sound"` + Importance string `json:"/message/android/notification/importance"` + } `json:"HW"` + XM struct { + ChannelID string `json:"/extra.channel_id"` + } `json:"XM"` + VV struct { + Classification int `json:"/classification"` + } `json:"VV"` +} + +func newPushReq(title, content string) PushReq { + pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{ + Title: title, + Body: content, + ClickType: "startapp", + ChannelID: config.Config.Push.Getui.ChannelID, + ChannelName: config.Config.Push.Getui.ChannelName, + }}} + return pushReq +} + +func (pushReq *PushReq) setPushChannel(title string, body string) { + pushReq.PushChannel = &PushChannel{} + // autoBadge := "+1" + pushReq.PushChannel.Ios = &Ios{} + notify := "notify" + pushReq.PushChannel.Ios.NotificationType = ¬ify + pushReq.PushChannel.Ios.Aps.Sound = "default" + pushReq.PushChannel.Ios.Aps.Alert = Alert{ + Title: title, + Body: body, + } + pushReq.PushChannel.Android = &Android{} + pushReq.PushChannel.Android.Ups.Notification = Notification{ + Title: title, + Body: body, + ClickType: "startapp", + } + pushReq.PushChannel.Android.Ups.Options = Options{ + HW: struct { + DefaultSound bool `json:"/message/android/notification/default_sound"` + ChannelID string `json:"/message/android/notification/channel_id"` + Sound string `json:"/message/android/notification/sound"` + Importance string `json:"/message/android/notification/importance"` + }{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL"}, + XM: struct { + ChannelID string `json:"/extra.channel_id"` + }{ChannelID: "high_system"}, + VV: struct { + Classification int "json:\"/classification\"" + }{ + Classification: 1, + }, + } +} diff --git a/internal/push/getui/push.go b/internal/push/getui/push.go index 783f62f54..525e335dd 100644 --- a/internal/push/getui/push.go +++ b/internal/push/getui/push.go @@ -3,16 +3,16 @@ package getui import ( "Open_IM/internal/push" "Open_IM/pkg/common/config" - "Open_IM/pkg/common/db" + "Open_IM/pkg/common/db/cache" + //http2 "Open_IM/pkg/common/http" "Open_IM/pkg/common/log" "Open_IM/pkg/utils" "bytes" + "context" "crypto/sha256" - "errors" - - //"crypto/sha512" "encoding/hex" "encoding/json" + "errors" "io/ioutil" "net/http" "strconv" @@ -20,166 +20,61 @@ import ( ) var ( - GetuiClient *Getui - TokenExpireError = errors.New("token expire") ) const ( - PushURL = "/push/single/alias" - AuthURL = "/auth" - TaskURL = "/push/list/message" - BatchPushURL = "/push/list/alias" -) - -func init() { - GetuiClient = newGetuiClient() -} - -type Getui struct{} - -type GetuiCommonResp struct { - Code int `json:"code"` - Msg string `json:"msg"` - Data interface{} `json:"data"` -} - -type AuthReq struct { - Sign string `json:"sign"` - Timestamp string `json:"timestamp"` - Appkey string `json:"appkey"` -} - -type AuthResp struct { - ExpireTime string `json:"expire_time"` - Token string `json:"token"` -} - -type TaskResp struct { - TaskID string `json:"taskID"` -} - -type Settings struct { - TTL *int64 `json:"ttl"` -} - -type Audience struct { - Alias []string `json:"alias"` -} - -type PushMessage struct { - Notification *Notification `json:"notification,omitempty"` - Transmission *string `json:"transmission,omitempty"` -} - -type PushChannel struct { - Ios *Ios `json:"ios"` - Android *Android `json:"android"` -} - -type PushReq struct { - RequestID *string `json:"request_id"` - Settings *Settings `json:"settings"` - Audience *Audience `json:"audience"` - PushMessage *PushMessage `json:"push_message"` - PushChannel *PushChannel `json:"push_channel"` - IsAsync *bool `json:"is_async"` - Taskid *string `json:"taskid"` -} - -type Ios struct { - NotiType *string `json:"type"` - AutoBadge *string `json:"auto_badge"` - Aps struct { - Sound string `json:"sound"` - Alert Alert `json:"alert"` - } `json:"aps"` -} - -type Alert struct { - Title string `json:"title"` - Body string `json:"body"` -} - -type Android struct { - Ups struct { - Notification Notification `json:"notification"` - Options Options `json:"options"` - } `json:"ups"` -} + pushURL = "/push/single/alias" + authURL = "/auth" + taskURL = "/push/list/message" + batchPushURL = "/push/list/alias" -type Notification struct { - Title string `json:"title"` - Body string `json:"body"` - ChannelID string `json:"channelID"` - ChannelName string `json:"ChannelName"` - ClickType string `json:"click_type"` -} - -type Options struct { - HW struct { - DefaultSound bool `json:"/message/android/notification/default_sound"` - ChannelID string `json:"/message/android/notification/channel_id"` - Sound string `json:"/message/android/notification/sound"` - Importance string `json:"/message/android/notification/importance"` - } `json:"HW"` - XM struct { - ChannelID string `json:"/extra.channel_id"` - } `json:"XM"` - VV struct { - Classification int `json:"/classification"` - } `json:"VV"` -} + tokenExpire = 10001 + ttl = 0 +) -type PushResp struct { +type Client struct { + cache cache.Cache } -func newGetuiClient() *Getui { - return &Getui{} +func newClient(cache cache.Cache) *Client { + return &Client{cache: cache} } -func (g *Getui) Push(userIDList []string, title, detailContent, operationID string, opts push.PushOpts) (resp string, err error) { - token, err := db.DB.GetGetuiToken() - log.NewDebug(operationID, utils.GetSelfFuncName(), "token:", token, userIDList) +func (g *Client) Push(ctx context.Context, userIDs []string, title, content, operationID string, opts *push.Opts) error { + token, err := g.cache.GetGetuiToken(ctx) if err != nil { log.NewError(operationID, utils.GetSelfFuncName(), "GetGetuiToken failed", err.Error()) } if token == "" || err != nil { - token, err = g.getTokenAndSave2Redis(operationID) + token, err = g.getTokenAndSave2Redis(ctx) if err != nil { log.NewError(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis failed", err.Error()) - return "", utils.Wrap(err, "") + return utils.Wrap(err, "") } } - - pushReq := PushReq{PushMessage: &PushMessage{Notification: &Notification{ - Title: title, - Body: detailContent, - ClickType: "startapp", - ChannelID: config.Config.Push.Getui.ChannelID, - ChannelName: config.Config.Push.Getui.ChannelName, - }}} - pushReq.setPushChannel(title, detailContent) - pushResp := PushResp{} - if len(userIDList) > 1 { - taskID, err := g.GetTaskID(operationID, token, pushReq) + pushReq := newPushReq(title, content) + pushReq.setPushChannel(title, content) + pushResp := struct{}{} + if len(userIDs) > 1 { + taskID, err := g.GetTaskID(ctx, token, pushReq) if err != nil { - return "", utils.Wrap(err, "GetTaskIDAndSave2Redis failed") + return utils.Wrap(err, "GetTaskIDAndSave2Redis failed") } - pushReq = PushReq{Audience: &Audience{Alias: userIDList}} + pushReq = PushReq{Audience: &Audience{Alias: userIDs}} var IsAsync = true pushReq.IsAsync = &IsAsync - pushReq.Taskid = &taskID - err = g.request(BatchPushURL, pushReq, token, &pushResp, operationID) + pushReq.TaskID = &taskID + err = g.request(ctx, batchPushURL, pushReq, token, &pushResp) } else { reqID := utils.OperationIDGenerator() pushReq.RequestID = &reqID - pushReq.Audience = &Audience{Alias: []string{userIDList[0]}} - err = g.request(PushURL, pushReq, token, &pushResp, operationID) + pushReq.Audience = &Audience{Alias: []string{userIDs[0]}} + err = g.request(ctx, pushURL, pushReq, token, &pushResp) } switch err { case TokenExpireError: - token, err = g.getTokenAndSave2Redis(operationID) + token, err = g.getTokenAndSave2Redis(ctx) if err != nil { log.NewError(operationID, utils.GetSelfFuncName(), "getTokenAndSave2Redis failed, ", err.Error()) } else { @@ -187,52 +82,47 @@ func (g *Getui) Push(userIDList []string, title, detailContent, operationID stri } } if err != nil { - return "", utils.Wrap(err, "push failed") + return utils.Wrap(err, "push failed") } - respBytes, err := json.Marshal(pushResp) - return string(respBytes), utils.Wrap(err, "") + return utils.Wrap(err, "") } -func (g *Getui) Auth(operationID string, timeStamp int64) (token string, expireTime int64, err error) { - log.NewInfo(operationID, utils.GetSelfFuncName(), config.Config.Push.Getui.AppKey, timeStamp, config.Config.Push.Getui.MasterSecret) +func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expireTime int64, err error) { h := sha256.New() h.Write([]byte(config.Config.Push.Getui.AppKey + strconv.Itoa(int(timeStamp)) + config.Config.Push.Getui.MasterSecret)) - sum := h.Sum(nil) - sign := hex.EncodeToString(sum) - log.NewInfo(operationID, utils.GetSelfFuncName(), "sha256 result", sign) + sign := hex.EncodeToString(h.Sum(nil)) reqAuth := AuthReq{ Sign: sign, Timestamp: strconv.Itoa(int(timeStamp)), - Appkey: config.Config.Push.Getui.AppKey, + AppKey: config.Config.Push.Getui.AppKey, } respAuth := AuthResp{} - err = g.request(AuthURL, reqAuth, "", &respAuth, operationID) + err = g.request(ctx, authURL, reqAuth, "", &respAuth) if err != nil { return "", 0, err } - log.NewInfo(operationID, utils.GetSelfFuncName(), "result: ", respAuth) + //log.NewInfo(operationID, utils.GetSelfFuncName(), "result: ", respAuth) expire, err := strconv.Atoi(respAuth.ExpireTime) return respAuth.Token, int64(expire), err } -func (g *Getui) GetTaskID(operationID, token string, pushReq PushReq) (string, error) { +func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (string, error) { respTask := TaskResp{} ttl := int64(1000 * 60 * 5) pushReq.Settings = &Settings{TTL: &ttl} - err := g.request(TaskURL, pushReq, token, &respTask, operationID) + err := g.request(ctx, taskURL, pushReq, token, &respTask) if err != nil { return "", utils.Wrap(err, "") } return respTask.TaskID, nil } -func (g *Getui) request(url string, content interface{}, token string, returnStruct interface{}, operationID string) error { +func (g *Client) request(ctx context.Context, url string, content interface{}, token string, output interface{}) error { con, err := json.Marshal(content) if err != nil { return err } client := &http.Client{} - log.Debug(operationID, utils.GetSelfFuncName(), "json:", string(con), "token:", token) req, err := http.NewRequest("POST", config.Config.Push.Getui.PushUrl+url, bytes.NewBuffer(con)) if err != nil { return err @@ -250,74 +140,38 @@ func (g *Getui) request(url string, content interface{}, token string, returnStr if err != nil { return err } - log.NewDebug(operationID, "getui", utils.GetSelfFuncName(), "resp, ", string(result)) - commonResp := GetuiCommonResp{} - commonResp.Data = returnStruct + //log.NewDebug(operationID, "getui", utils.GetSelfFuncName(), "resp, ", string(result)) + commonResp := CommonResp{} + commonResp.Data = output if err := json.Unmarshal(result, &commonResp); err != nil { return err } - if commonResp.Code == 10001 { + if commonResp.Code == tokenExpire { return TokenExpireError } return nil } -func (pushReq *PushReq) setPushChannel(title string, body string) { - pushReq.PushChannel = &PushChannel{} - // autoBadge := "+1" - pushReq.PushChannel.Ios = &Ios{} - notify := "notify" - pushReq.PushChannel.Ios.NotiType = ¬ify - pushReq.PushChannel.Ios.Aps.Sound = "default" - pushReq.PushChannel.Ios.Aps.Alert = Alert{ - Title: title, - Body: body, - } - pushReq.PushChannel.Android = &Android{} - pushReq.PushChannel.Android.Ups.Notification = Notification{ - Title: title, - Body: body, - ClickType: "startapp", - } - pushReq.PushChannel.Android.Ups.Options = Options{ - HW: struct { - DefaultSound bool `json:"/message/android/notification/default_sound"` - ChannelID string `json:"/message/android/notification/channel_id"` - Sound string `json:"/message/android/notification/sound"` - Importance string `json:"/message/android/notification/importance"` - }{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL"}, - XM: struct { - ChannelID string `json:"/extra.channel_id"` - }{ChannelID: "high_system"}, - VV: struct { - Classification int "json:\"/classification\"" - }{ - Classification: 1, - }, - } -} - -func (g *Getui) getTokenAndSave2Redis(operationID string) (token string, err error) { - token, expireTime, err := g.Auth(operationID, time.Now().UnixNano()/1e6) +func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err error) { + token, _, err = g.Auth(ctx, time.Now().UnixNano()/1e6) if err != nil { return "", utils.Wrap(err, "Auth failed") } - log.NewDebug(operationID, "getui", utils.GetSelfFuncName(), token, expireTime, err) - err = db.DB.SetGetuiToken(token, 60*60*23) + err = g.cache.SetGetuiTaskID(ctx, token, 60*60*23) if err != nil { return "", utils.Wrap(err, "Auth failed") } return token, nil } -func (g *Getui) GetTaskIDAndSave2Redis(operationID, token string, pushReq PushReq) (taskID string, err error) { +func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) { ttl := int64(1000 * 60 * 60 * 24) pushReq.Settings = &Settings{TTL: &ttl} - taskID, err = g.GetTaskID(operationID, token, pushReq) + taskID, err = g.GetTaskID(ctx, token, pushReq) if err != nil { return "", utils.Wrap(err, "GetTaskIDAndSave2Redis failed") } - err = db.DB.SetGetuiTaskID(taskID, 60*60*23) + err = g.cache.SetGetuiTaskID(ctx, taskID, 60*60*23) if err != nil { return "", utils.Wrap(err, "Auth failed") } diff --git a/internal/push/logic/init.go b/internal/push/init.go similarity index 69% rename from internal/push/logic/init.go rename to internal/push/init.go index 04334a1fd..5482c65d4 100644 --- a/internal/push/logic/init.go +++ b/internal/push/init.go @@ -4,27 +4,23 @@ ** author("fg,Gordon@open-im.io"). ** time(2021/3/22 15:33). */ -package logic +package push import ( - pusher "Open_IM/internal/push" fcm "Open_IM/internal/push/fcm" "Open_IM/internal/push/getui" jpush "Open_IM/internal/push/jpush" - "Open_IM/internal/push/mobpush" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/kafka" - prome "Open_IM/pkg/common/prometheus" + "Open_IM/pkg/common/prome" "Open_IM/pkg/statistics" "fmt" ) var ( rpcServer RPCServer - pushCh PushConsumerHandler - producer *kafka.Producer - offlinePusher pusher.OfflinePusher + pushCh ConsumerHandler + offlinePusher OfflinePusher successCount uint64 ) @@ -34,7 +30,6 @@ func Init(rpcPort int) { } func init() { - producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) statistics.NewStatistics(&successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval) if *config.Config.Push.Getui.Enable { offlinePusher = getui.GetuiClient @@ -46,10 +41,6 @@ func init() { if config.Config.Push.Fcm.Enable { offlinePusher = fcm.NewFcm() } - - if config.Config.Push.Mob.Enable { - offlinePusher = mobpush.MobPushClient - } } func initPrometheus() { @@ -59,7 +50,7 @@ func initPrometheus() { func Run(promethuesPort int) { go rpcServer.run() - go pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&pushCh) + go pushCh.ConsumerGroup.RegisterHandleAndConsumer(&pushCh) go func() { err := prome.StartPromeSrv(promethuesPort) if err != nil { diff --git a/internal/push/jpush/requestbody/audience.go b/internal/push/jpush/body/audience.go similarity index 97% rename from internal/push/jpush/requestbody/audience.go rename to internal/push/jpush/body/audience.go index 9ccdd2cec..b523693f9 100644 --- a/internal/push/jpush/requestbody/audience.go +++ b/internal/push/jpush/body/audience.go @@ -1,4 +1,4 @@ -package requestbody +package body const ( TAG = "tag" @@ -20,7 +20,6 @@ func (a *Audience) set(key string, v []string) { a.audience = make(map[string][]string) a.Object = a.audience } - //v, ok = this.audience[key] //if ok { // return diff --git a/internal/push/jpush/requestbody/message.go b/internal/push/jpush/body/message.go similarity index 96% rename from internal/push/jpush/requestbody/message.go rename to internal/push/jpush/body/message.go index 5ed293f64..955a0fffb 100644 --- a/internal/push/jpush/requestbody/message.go +++ b/internal/push/jpush/body/message.go @@ -1,4 +1,4 @@ -package requestbody +package body type Message struct { MsgContent string `json:"msg_content"` diff --git a/internal/push/jpush/requestbody/notification.go b/internal/push/jpush/body/notification.go similarity index 98% rename from internal/push/jpush/requestbody/notification.go rename to internal/push/jpush/body/notification.go index 67c37b1c7..72794793d 100644 --- a/internal/push/jpush/requestbody/notification.go +++ b/internal/push/jpush/body/notification.go @@ -1,4 +1,4 @@ -package requestbody +package body import ( "Open_IM/pkg/common/config" diff --git a/internal/push/jpush/requestbody/options.go b/internal/push/jpush/body/options.go similarity index 87% rename from internal/push/jpush/requestbody/options.go rename to internal/push/jpush/body/options.go index dd0733c1b..323637414 100644 --- a/internal/push/jpush/requestbody/options.go +++ b/internal/push/jpush/body/options.go @@ -1,4 +1,4 @@ -package requestbody +package body type Options struct { ApnsProduction bool `json:"apns_production"` diff --git a/internal/push/jpush/requestbody/platform.go b/internal/push/jpush/body/platform.go similarity index 98% rename from internal/push/jpush/requestbody/platform.go rename to internal/push/jpush/body/platform.go index 363e132d9..7b8ac255f 100644 --- a/internal/push/jpush/requestbody/platform.go +++ b/internal/push/jpush/body/platform.go @@ -1,4 +1,4 @@ -package requestbody +package body import ( "Open_IM/pkg/common/constant" diff --git a/internal/push/jpush/requestbody/pushobj.go b/internal/push/jpush/body/pushobj.go similarity index 96% rename from internal/push/jpush/requestbody/pushobj.go rename to internal/push/jpush/body/pushobj.go index e29403109..e91faf25a 100644 --- a/internal/push/jpush/requestbody/pushobj.go +++ b/internal/push/jpush/body/pushobj.go @@ -1,4 +1,4 @@ -package requestbody +package body type PushObj struct { Platform interface{} `json:"platform"` diff --git a/internal/push/jpush/common/JGPlatform.go b/internal/push/jpush/common/JGPlatform.go deleted file mode 100644 index e2e858cf4..000000000 --- a/internal/push/jpush/common/JGPlatform.go +++ /dev/null @@ -1,13 +0,0 @@ -package common - -import ( - "encoding/base64" - "fmt" -) - -func GetAuthorization(Appkey string, MasterSecret string) string { - str := fmt.Sprintf("%s:%s", Appkey, MasterSecret) - buf := []byte(str) - Authorization := fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(buf)) - return Authorization -} diff --git a/internal/push/jpush/push.go b/internal/push/jpush/push.go index 6a5026301..4369b90af 100644 --- a/internal/push/jpush/push.go +++ b/internal/push/jpush/push.go @@ -2,11 +2,13 @@ package push import ( "Open_IM/internal/push" + "Open_IM/internal/push/jpush/body" "Open_IM/internal/push/jpush/common" - "Open_IM/internal/push/jpush/requestBody" "Open_IM/pkg/common/config" "bytes" + "encoding/base64" "encoding/json" + "fmt" "io/ioutil" "net/http" ) @@ -33,26 +35,31 @@ func (j *JPush) SetAlias(cid, alias string) (resp string, err error) { return resp, nil } -func (j *JPush) Push(accounts []string, title, detailContent, operationID string, opts push.PushOpts) (string, error) { +func (j *JPush) getAuthorization(Appkey string, MasterSecret string) string { + str := fmt.Sprintf("%s:%s", Appkey, MasterSecret) + buf := []byte(str) + Authorization := fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString(buf)) + return Authorization +} - var pf requestbody.Platform +func (j *JPush) Push(accounts []string, title, detailContent, operationID string, opts push.PushOpts) (string, error) { + var pf body.Platform pf.SetAll() - var au requestbody.Audience + var au body.Audience au.SetAlias(accounts) - var no requestbody.Notification - - var extras requestbody.Extras + var no body.Notification + var extras body.Extras if opts.Signal.ClientMsgID != "" { extras.ClientMsgID = opts.Signal.ClientMsgID } no.IOSEnableMutableContent() no.SetExtras(extras) no.SetAlert(title) - var me requestbody.Message + var me body.Message me.SetMsgContent(detailContent) - var o requestbody.Options + var o body.Options o.SetApnsProduction(config.Config.IOSPush.Production) - var po requestbody.PushObj + var po body.PushObj po.SetPlatform(&pf) po.SetAudience(&au) po.SetNotification(&no) @@ -68,8 +75,7 @@ func (j *JPush) Push(accounts []string, title, detailContent, operationID string if err != nil { return "", err } - req.Header.Set("Authorization", common.GetAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret)) - + req.Header.Set("Authorization", j.getAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret)) resp, err := client.Do(req) if err != nil { return "", err diff --git a/internal/push/mobpush/common/getSign.go b/internal/push/mobpush/common/getSign.go deleted file mode 100644 index 0ee7b4a60..000000000 --- a/internal/push/mobpush/common/getSign.go +++ /dev/null @@ -1,18 +0,0 @@ -package common - -import ( - "Open_IM/pkg/common/config" - "crypto/md5" - "encoding/hex" - "fmt" - "io" -) - -func GetSign(paramsStr string) string { - h := md5.New() - io.WriteString(h, paramsStr) - io.WriteString(h, config.Config.Push.Mob.AppSecret) - fmt.Printf("%x", h.Sum(nil)) - - return hex.EncodeToString(h.Sum(nil)) -} diff --git a/internal/push/mobpush/push.go b/internal/push/mobpush/push.go deleted file mode 100644 index be28f1adb..000000000 --- a/internal/push/mobpush/push.go +++ /dev/null @@ -1,73 +0,0 @@ -package mobpush - -import ( - "Open_IM/internal/push" - "Open_IM/internal/push/mobpush/common" - "Open_IM/internal/push/mobpush/requestParams" - "Open_IM/pkg/common/config" - "encoding/json" - "io/ioutil" - "net/http" - "strings" -) - -var ( - MobPushClient *MobPush -) - -func init() { - MobPushClient = newGetuiClient() -} - -type MobPush struct{} - -func newGetuiClient() *MobPush { - return &MobPush{} -} - -func (j *MobPush) Push(accounts []string, alert, detailContent, operationID string, opts push.PushOpts) (string, error) { - var target requestparams.PushTarget - target.SetAlias(accounts) - target.SetTarget(2) - - var no requestparams.PushNotify - no.SetType(1) - no.SetIosProduction(1) - no.SetPlats([]int{1, 2}) - no.SetContent(alert) - - var forward requestparams.PushForward - forward.SetNextType(2) - forward.SetScheme(config.Config.Push.Mob.Scheme) - - var po requestparams.PushObj - po.SetSource("webapi") - po.SetAppkey(config.Config.Push.Mob.AppKey) - po.SetPushTarget(&target) - po.SetPushNotify(&no) - po.SetPushForward(&forward) - - con, err := json.Marshal(po) - if err != nil { - return "", err - } - client := &http.Client{} - req, err := http.NewRequest("POST", config.Config.Push.Mob.PushUrl, strings.NewReader(string(con))) - if err != nil { - return "", err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("key", config.Config.Push.Mob.AppKey) - req.Header.Set("sign", common.GetSign(string(con))) - - resp, err := client.Do(req) - if err != nil { - return "", err - } - defer resp.Body.Close() - result, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "", err - } - return string(result), nil -} diff --git a/internal/push/mobpush/requestparams/pushForward.go b/internal/push/mobpush/requestparams/pushForward.go deleted file mode 100644 index be448af09..000000000 --- a/internal/push/mobpush/requestparams/pushForward.go +++ /dev/null @@ -1,14 +0,0 @@ -package requestparams - -type PushForward struct { - NextType int `json:"nextType"` - Scheme string `json:"scheme,omitempty"` -} - -func (m *PushForward) SetNextType(c int) { - m.NextType = c -} - -func (m *PushForward) SetScheme(t string) { - m.Scheme = t -} diff --git a/internal/push/mobpush/requestparams/pushNotify.go b/internal/push/mobpush/requestparams/pushNotify.go deleted file mode 100644 index 2c259b0d8..000000000 --- a/internal/push/mobpush/requestparams/pushNotify.go +++ /dev/null @@ -1,26 +0,0 @@ -package requestparams - -type PushNotify struct { - Plats []int `json:"plats,omitempty"` - IosProduction int `json:"iosProduction,omitempty"` - Content string `json:"content,omitempty"` - Type int `json:"type,omitempty"` -} - -func (n *PushNotify) SetPlats(plats []int) { - n.Plats = plats - -} - -func (n *PushNotify) SetIosProduction(iosProduction int) { - n.IosProduction = iosProduction - -} - -func (n *PushNotify) SetContent(content string) { - n.Content = content -} - -func (n *PushNotify) SetType(Type int) { - n.Type = Type -} diff --git a/internal/push/mobpush/requestparams/pushObj.go b/internal/push/mobpush/requestparams/pushObj.go deleted file mode 100644 index b04d40d5a..000000000 --- a/internal/push/mobpush/requestparams/pushObj.go +++ /dev/null @@ -1,28 +0,0 @@ -package requestparams - -type PushObj struct { - Source interface{} `json:"source"` - Appkey interface{} `json:"appkey"` - PushTarget interface{} `json:"pushTarget,omitempty"` - PushNotify interface{} `json:"pushNotify,omitempty"` - PushForward interface{} `json:"pushForward,omitempty"` -} - -func (p *PushObj) SetSource(source string) { - p.Source = source -} - -func (p *PushObj) SetAppkey(appkey string) { - p.Appkey = appkey -} - -func (p *PushObj) SetPushTarget(no *PushTarget) { - p.PushTarget = no -} - -func (p *PushObj) SetPushNotify(m *PushNotify) { - p.PushNotify = m -} -func (p *PushObj) SetPushForward(o *PushForward) { - p.PushForward = o -} diff --git a/internal/push/mobpush/requestparams/pushTarget.go b/internal/push/mobpush/requestparams/pushTarget.go deleted file mode 100644 index bd6a310e1..000000000 --- a/internal/push/mobpush/requestparams/pushTarget.go +++ /dev/null @@ -1,13 +0,0 @@ -package requestparams - -type PushTarget struct { - Target interface{} `json:"target,omitempty"` - Alias []string `json:"alias,omitempty"` -} - -func (p *PushTarget) SetTarget(target int) { - p.Target = target -} -func (p *PushTarget) SetAlias(alias []string) { - p.Alias = alias -} diff --git a/internal/push/logic/push_handler.go b/internal/push/push_handler.go similarity index 66% rename from internal/push/logic/push_handler.go rename to internal/push/push_handler.go index f137c09d6..b9bca2888 100644 --- a/internal/push/logic/push_handler.go +++ b/internal/push/push_handler.go @@ -4,7 +4,7 @@ ** author("fg,Gordon@tuoyun.net"). ** time(2021/5/13 10:33). */ -package logic +package push import ( "Open_IM/pkg/common/config" @@ -18,21 +18,16 @@ import ( "github.com/golang/protobuf/proto" ) -type fcb func(msg []byte) - -type PushConsumerHandler struct { - msgHandle map[string]fcb +type ConsumerHandler struct { pushConsumerGroup *kfk.MConsumerGroup } -func (ms *PushConsumerHandler) Init() { - ms.msgHandle = make(map[string]fcb) - ms.msgHandle[config.Config.Kafka.Ms2pschat.Topic] = ms.handleMs2PsChat - ms.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, +func (c *ConsumerHandler) Init() { + c.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ms2pschat.Topic}, config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToPush) } -func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) { +func (c *ConsumerHandler) handleMs2PsChat(msg []byte) { log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg)) msgFromMQ := pbChat.PushMsgDataToMQ{} if err := proto.Unmarshal(msg, &msgFromMQ); err != nil { @@ -55,16 +50,14 @@ func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) { default: MsgToUser(pbData) } - //Call push module to send message to the user - //MsgToUser((*pbPush.PushMsgReq)(&msgFromMQ)) } -func (PushConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } -func (PushConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } +func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value)) - ms.msgHandle[msg.Topic](msg.Value) + c.handleMs2PsChat(msg.Value) sess.MarkMessage(msg, "") } return nil diff --git a/internal/push/push_interface.go b/internal/push/push_interface.go index 4cd8bae9a..7ed3c4b85 100644 --- a/internal/push/push_interface.go +++ b/internal/push/push_interface.go @@ -1,18 +1,16 @@ package push -import "Open_IM/pkg/common/constant" - -var PushTerminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constant.WebPlatformID} +import "context" type OfflinePusher interface { - Push(userIDList []string, title, detailContent, operationID string, opts PushOpts) (resp string, err error) + Push(ctx context.Context, userIDs []string, title, content, opts *Opts) error } -type PushOpts struct { - Signal Signal +type Opts struct { + Signal *Signal IOSPushSound string IOSBadgeCount bool - Data string + Ex string } type Signal struct { diff --git a/internal/push/logic/push_rpc_server.go b/internal/push/push_rpc_server.go similarity index 98% rename from internal/push/logic/push_rpc_server.go rename to internal/push/push_rpc_server.go index 888dfb255..6a9487c9e 100644 --- a/internal/push/logic/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -1,4 +1,4 @@ -package logic +package push import ( "Open_IM/pkg/common/config" @@ -6,7 +6,6 @@ import ( "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" prome "Open_IM/pkg/common/prometheus" - "Open_IM/pkg/getcdv3" pbPush "Open_IM/pkg/proto/push" "Open_IM/pkg/utils" "context" diff --git a/internal/push/logic/push_to_client.go b/internal/push/push_to_client.go similarity index 93% rename from internal/push/logic/push_to_client.go rename to internal/push/push_to_client.go index f1e42dc02..31052cf27 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/push_to_client.go @@ -4,45 +4,33 @@ ** author("fg,Gordon@open-im.io"). ** time(2021/3/5 14:31). */ -package logic +package push import ( - "Open_IM/internal/push" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" "Open_IM/pkg/common/log" - "Open_IM/pkg/getcdv3" + "Open_IM/pkg/common/prome" pbPush "Open_IM/pkg/proto/push" pbRelay "Open_IM/pkg/proto/relay" pbRtc "Open_IM/pkg/proto/rtc" "Open_IM/pkg/utils" "context" - "strings" - - prome "Open_IM/pkg/common/prometheus" - "github.com/golang/protobuf/proto" + "strings" ) -type OpenIMContent struct { - SessionType int `json:"sessionType"` - From string `json:"from"` - To string `json:"to"` - Seq uint32 `json:"seq"` -} type AtContent struct { Text string `json:"text"` AtUserList []string `json:"atUserList"` IsAtSelf bool `json:"isAtSelf"` } -//var grpcCons []*grpc.ClientConn - func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingelMsgToUserResultList isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) log.Debug(pushMsg.OperationID, "Get msg from msg_transfer And push msg", pushMsg.String()) + grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), pushMsg.OperationID) var UIDList = []string{pushMsg.PushToUserID} @@ -277,13 +265,13 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) { } } -func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err error) { - if pushMsg.MsgData.ContentType < constant.SignalingNotificationEnd && pushMsg.MsgData.ContentType > constant.SignalingNotificationBegin { +func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts *Opts, err error) { + if pushMsg.MsgData.ContentType > constant.SignalingNotificationBegin && pushMsg.MsgData.ContentType < constant.SignalingNotificationEnd { req := &pbRtc.SignalReq{} if err := proto.Unmarshal(pushMsg.MsgData.Content, req); err != nil { - return opts, utils.Wrap(err, "") + return nil, utils.Wrap(err, "") } - log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "SignalReq: ", req.String()) + opts = &Opts{} switch req.Payload.(type) { case *pbRtc.SignalReq_Invite, *pbRtc.SignalReq_InviteInGroup: opts.Signal.ClientMsgID = pushMsg.MsgData.ClientMsgID @@ -291,10 +279,10 @@ func GetOfflinePushOpts(pushMsg *pbPush.PushMsgReq) (opts push.PushOpts, err err } } if pushMsg.MsgData.OfflinePushInfo != nil { + opts = &Opts{} opts.IOSBadgeCount = pushMsg.MsgData.OfflinePushInfo.IOSBadgeCount opts.IOSPushSound = pushMsg.MsgData.OfflinePushInfo.IOSPushSound - opts.Data = pushMsg.MsgData.OfflinePushInfo.Ex + opts.Ex = pushMsg.MsgData.OfflinePushInfo.Ex } - return opts, nil } diff --git a/internal/push/sdk/tpns-server-sdk-go/go/auth/auth.go b/internal/push/sdk/tpns-server-sdk-go/go/auth/auth.go deleted file mode 100644 index 0a3c07531..000000000 --- a/internal/push/sdk/tpns-server-sdk-go/go/auth/auth.go +++ /dev/null @@ -1,62 +0,0 @@ -package auth - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/base64" - b64 "encoding/base64" - "encoding/hex" - "fmt" - "net/http" - "strconv" - "time" -) - -type Auther struct { - AccessID string - SecretKey string -} - -var UseSignAuthored = true - -func (a *Auther) Auth(req *http.Request, useSignAuthored bool, auth Auther, reqBody string) { - - if useSignAuthored { - now := time.Now() - timeStamp := now.Unix() - req.Header.Add("AccessId", auth.AccessID) - req.Header.Add("TimeStamp", strconv.Itoa(int(timeStamp))) - sign := GenSign(uint64(timeStamp), auth.AccessID, auth.SecretKey, reqBody) - req.Header.Add("Sign", sign) - } else { - author := makeAuthHeader(a.AccessID, a.SecretKey) - //log.Printf("author string:%v", author) - req.Header.Add("Authorization", author) - } - //req.Header.Add("Content-Type", "application/json") -} - -func makeAuthHeader(appID, secretKey string) string { - base64Str := base64.StdEncoding.EncodeToString( - []byte( - fmt.Sprintf("%s:%s", appID, secretKey), - ), - ) - return fmt.Sprintf("Basic %s", base64Str) -} - -func GenSign(timeStamp uint64, accessId string, secretKey, requestBody string) string { - signBody := strconv.Itoa(int(timeStamp)) + accessId + requestBody - // Create a new HMAC by defining the hash type and the key (as byte array) - h := hmac.New(sha256.New, []byte(secretKey)) - // Write Map to it - h.Write([]byte(signBody)) - - // Get result and encode as hexadecimal string - sha := hex.EncodeToString(h.Sum(nil)) - //fmt.Println() - //fmt.Println("timeStamp: " + strconv.Itoa(int(timeStamp)) + " accessID:" + accessId + " body:" + requestBody) - sEnc := b64.StdEncoding.EncodeToString([]byte(sha)) - //fmt.Println("final Result " + sEnc) - return sEnc -} diff --git a/internal/push/logic/tpns.go b/internal/push/tpns.go similarity index 98% rename from internal/push/logic/tpns.go rename to internal/push/tpns.go index 3d737f4a7..0c0b5c422 100644 --- a/internal/push/logic/tpns.go +++ b/internal/push/tpns.go @@ -1,4 +1,4 @@ -package logic +package push import ( tpns "Open_IM/internal/push/sdk/tpns-server-sdk-go/go" diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 4cfb99cbe..5478df36f 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -8,10 +8,8 @@ import ( "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" tableRelation "Open_IM/pkg/common/db/table/relation" - "Open_IM/pkg/common/db/unrelation" "Open_IM/pkg/common/log" - promePkg "Open_IM/pkg/common/prometheus" - "Open_IM/pkg/getcdv3" + "Open_IM/pkg/common/prome" pbConversation "Open_IM/pkg/proto/conversation" pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" diff --git a/pkg/callbackstruct/push.go b/pkg/callbackstruct/push.go index 9a686b011..8e3dab706 100644 --- a/pkg/callbackstruct/push.go +++ b/pkg/callbackstruct/push.go @@ -5,13 +5,13 @@ import common "Open_IM/pkg/proto/sdkws" type CallbackBeforePushReq struct { UserStatusBatchCallbackReq *common.OfflinePushInfo - ClientMsgID string `json:"clientMsgID"` - SendID string `json:"sendID"` - GroupID string `json:"groupID"` - ContentType int32 `json:"contentType"` - SessionType int32 `json:"sessionType"` - AtUserIDList []string `json:"atUserIDList"` - Content string `json:"content"` + ClientMsgID string `json:"clientMsgID"` + SendID string `json:"sendID"` + GroupID string `json:"groupID"` + ContentType int32 `json:"contentType"` + SessionType int32 `json:"sessionType"` + AtUserIDs []string `json:"atUserIDList"` + Content string `json:"content"` } type CallbackBeforePushResp struct { @@ -21,7 +21,6 @@ type CallbackBeforePushResp struct { } type CallbackBeforeSuperGroupOnlinePushReq struct { - //*common.OfflinePushInfo UserStatusBaseCallback ClientMsgID string `json:"clientMsgID"` SendID string `json:"sendID"` @@ -30,7 +29,7 @@ type CallbackBeforeSuperGroupOnlinePushReq struct { SessionType int32 `json:"sessionType"` AtUserIDList []string `json:"atUserIDList"` Content string `json:"content"` - Seq uint32 `json:"seq"` + Seq int64 `json:"seq"` } type CallbackBeforeSuperGroupOnlinePushResp struct { diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 16e93a328..e798beb0d 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -7,7 +7,7 @@ package http import ( - cbapi "Open_IM/pkg/callbackstruct" + "Open_IM/pkg/callbackstruct" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "bytes" @@ -33,19 +33,17 @@ func Get(url string) (response []byte, err error) { } // application/json; charset=utf-8 -func Post(url string, data interface{}, timeOutSecond int) (content []byte, err error) { +func Post(url string, header map[string]string, data interface{}, timeOutSecond int) (content []byte, err error) { jsonStr, err := json.Marshal(data) if err != nil { return nil, err } - req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr)) if err != nil { return nil, err } req.Close = true req.Header.Add("content-type", "application/json; charset=utf-8") - client := &http.Client{Timeout: time.Duration(timeOutSecond) * time.Second} resp, err := client.Do(req) if err != nil { @@ -59,11 +57,20 @@ func Post(url string, data interface{}, timeOutSecond int) (content []byte, err return result, nil } -func callBackPostReturn(url, callbackCommand string, input interface{}, output cbapi.CallbackResp, callbackConfig config.CallBackConfig) error { +func PostReturn(url string, header map[string]string, input, output interface{}, timeOutSecond int) error { + b, err := Post(url, header, input, timeOutSecond) + if err != nil { + return err + } + err = json.Unmarshal(b, output) + return err +} + +func callBackPostReturn(url, command string, input interface{}, output callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { v := urlLib.Values{} - v.Set("callbackCommand", callbackCommand) + v.Set("callbackCommand", command) url = url + "?" + v.Encode() - b, err := Post(url, input, callbackConfig.CallbackTimeOut) + b, err := Post(url, nil, input, callbackConfig.CallbackTimeOut) if err != nil { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { return constant.ErrCallbackContinue @@ -79,6 +86,6 @@ func callBackPostReturn(url, callbackCommand string, input interface{}, output c return output.Parse() } -func CallBackPostReturn(url string, req cbapi.CallbackReq, resp cbapi.CallbackResp, callbackConfig config.CallBackConfig) error { +func CallBackPostReturn(url string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, callbackConfig config.CallBackConfig) error { return callBackPostReturn(url, req.GetCallbackCommand(), req, resp, callbackConfig) } diff --git a/pkg/discoveryregistry/discovery_register.go b/pkg/discoveryregistry/discovery_register.go index 95284aacf..72eca2a67 100644 --- a/pkg/discoveryregistry/discovery_register.go +++ b/pkg/discoveryregistry/discovery_register.go @@ -9,6 +9,7 @@ type SvcDiscoveryRegistry interface { UnRegister() error GetConns(serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) GetConn(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) + //RegisterConf(conf []byte) error //LoadConf() ([]byte, error) }