From bcae7720e104335e6c11ad8f4f4ab6c99bdf460a Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 4 Mar 2022 14:27:54 +0800 Subject: [PATCH] add callback --- cmd/open_im_api/main.go | 2 +- config/config.yaml | 14 +++++++------- internal/api/third/minio_init.go | 2 +- internal/rpc/msg/callback.go | 10 ++++++---- internal/rpc/msg/send_msg.go | 25 ++++++++++++++----------- pkg/call_back_struct/common.go | 6 +++--- pkg/common/constant/constant.go | 10 +++++----- 7 files changed, 37 insertions(+), 32 deletions(-) diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index d5c01986b..551a37a0e 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -102,7 +102,7 @@ func main() { conversationGroup.POST("/get_receive_message_opt", conversation.GetReceiveMessageOpt) //1 conversationGroup.POST("/get_all_conversation_message_opt", conversation.GetAllConversationMessageOpt) //1 } - + apiThird.MinioInit() log.NewPrivateLog("api") ginPort := flag.Int("port", 10000, "get ginServerPort from cmd,default 10000 as port") flag.Parse() diff --git a/config/config.yaml b/config/config.yaml index 4d3990194..7d326758d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -91,8 +91,8 @@ credential: #腾讯cos,发送图片、视频、文件时需要,请自行申 bucket: openim location: us-east-1 endpoint: http://127.0.0.1:9000 - accessKeyID: minioadmin - secretAccessKey: minioadmin + accessKeyID: user12345 + secretAccessKey: key12345 rpcport: #rpc服务端口 默认即可 @@ -196,21 +196,21 @@ callback: callbackUrl : "http://127.0.0.1:8080/callback" # 开启关闭操作前后回调的配置 callbackbeforeSendSingleMsg: - enable: false # 回调是否启用 + enable: true # 回调是否启用 callbackTimeOut: 2 # 回调超时时间 CallbackFailedContinue: true # 回调超时是否继续执行代码 callbackAfterSendSingleMsg: - enable: false + enable: true callbackTimeOut: 2 callbackBeforeSendGroupMsg: - enable: false + enable: true callbackTimeOut: 2 CallbackFailedContinue: true callbackAfterSendGroupMsg: - enable: false + enable: true callbackTimeOut: 2 callbackWordFilter: - enable: false + enable: true callbackTimeOut: 2 CallbackFailedContinue: true diff --git a/internal/api/third/minio_init.go b/internal/api/third/minio_init.go index 8dae26d9f..29338ee77 100644 --- a/internal/api/third/minio_init.go +++ b/internal/api/third/minio_init.go @@ -10,7 +10,7 @@ import ( url2 "net/url" ) -func init() { +func MinioInit() { minioUrl, err := url2.Parse(config.Config.Credential.Minio.Endpoint) if err != nil { log.NewError("", utils.GetSelfFuncName(), "parse failed, please check config/config.yaml", err.Error()) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 54ff93786..c560fff95 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -113,10 +113,10 @@ func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error { } log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) commonCallbackReq := copyCallbackCommonReqStruct(msg) - commonCallbackReq.CallbackCommand = constant.CallbackBeforeSendGroupMsgCommand - req := cbApi.CallbackBeforeSendSingleMsgReq{ + commonCallbackReq.CallbackCommand = constant.CallbackAfterSendGroupMsgCommand + req := cbApi.CallbackAfterSendGroupMsgReq{ CommonCallbackReq: commonCallbackReq, - RecvID: msg.MsgData.RecvID, + GroupID: msg.MsgData.GroupID, } resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} @@ -152,7 +152,9 @@ func callbackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) { if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess { return false, nil } - msg.MsgData.Content = []byte(resp.Content) + if resp.ErrCode == constant.CallbackHandleSuccess { + msg.MsgData.Content = []byte(resp.Content) + } log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content)) } return true, nil diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 16720575a..739a28171 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -128,21 +128,22 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S // callback canSend, err := callbackWordFilter(pb) if err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallbackBeforeSendMsg failed", err.Error(), pb.MsgData) + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter failed", err.Error(), pb.MsgData) } if !canSend { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return", pb.MsgData) - return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0) + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter result", canSend, "end rpc and return", pb.MsgData) + return returnMsg(&replay, pb, 201, "callbackWordFilter result stop rpc and return", "", 0) } switch pb.MsgData.SessionType { case constant.SingleChatType: + // callback canSend, err := callbackBeforeSendSingleMsg(pb) if err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg failed", err.Error()) } if !canSend { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return") - return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0) + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", canSend, "end rpc and return") + return returnMsg(&replay, pb, 201, "callbackBeforeSendSingleMsg result stop rpc and return", "", 0) } isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) if isSend { @@ -161,18 +162,20 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } + // callback if err := callbackAfterSendSingleMsg(pb); err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg failed", err.Error()) } return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) case constant.GroupChatType: + // callback canSend, err := callbackBeforeSendGroupMsg(pb) if err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg failed", err.Error()) } if !canSend { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return") - return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0) + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg result", canSend, "end rpc and return") + return returnMsg(&replay, pb, 201, "callbackBeforeSendGroupMsg result stop rpc and return", "", 0) } etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) client := pbGroup.NewGroupClient(etcdConn) @@ -237,8 +240,8 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } - } + // callback if err := callbackAfterSendGroupMsg(pb); err != nil { log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) } diff --git a/pkg/call_back_struct/common.go b/pkg/call_back_struct/common.go index 5e33889fe..e2bf510f4 100644 --- a/pkg/call_back_struct/common.go +++ b/pkg/call_back_struct/common.go @@ -3,13 +3,13 @@ package call_back_struct type CommonCallbackReq struct { SendID string `json:"sendID"` CallbackCommand string `json:"callbackCommand"` - ServerMsgID string `json:"serverID"` - ClientMsgID string `json:"clientID"` + ServerMsgID string `json:"serverMsgID"` + ClientMsgID string `json:"clientMsgID"` OperationID string `json:"operationID"` SenderPlatformID int32 `json:"senderPlatformID"` SenderNickname string `json:"senderNickname"` SessionType int32 `json:"sessionType"` - MsgFrom int32 `json:"MsgFrom"` + MsgFrom int32 `json:"msgFrom"` ContentType int32 `json:"contentType"` Status int32 `json:"status"` CreateTime int64 `json:"createTime"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index fa8646d5e..199a1917a 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -138,15 +138,15 @@ const ( VerificationCodeForResetSuffix = "_forReset" //callbackCommand - CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsg" - CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsg" - CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsg" + CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" + CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" + CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" CallbackWordFilterCommand = "callbackWordFilterCommand" - //actionCode + //callback actionCode ActionAllow = 0 ActionForbidden = 1 - //callbackHandleCode + //callback callbackHandleCode CallbackHandleSuccess = 0 CallbackHandleFailed = 1 )