From 33ee58904ba7f21675efa263d75a79eaf5265b7c Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Mon, 27 Feb 2023 17:08:34 +0800 Subject: [PATCH] callback add --- config/config.yaml | 3 + internal/msg_transfer/logic/callback.go | 62 +++++++++++++++++++ .../logic/online_history_msg_handler.go | 4 ++ pkg/call_back_struct/message.go | 8 +++ pkg/common/config/config.go | 1 + pkg/common/constant/constant.go | 1 + 6 files changed, 79 insertions(+) create mode 100644 internal/msg_transfer/logic/callback.go diff --git a/config/config.yaml b/config/config.yaml index 050811b31..0dfb24242 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -311,6 +311,9 @@ callback: callbackAfterSendGroupMsg: enable: false callbackTimeOut: 2 + callbackAfterConsumeGroupMsg: + enable: false + callbackTimeOut: 2 callbackMsgModify: enable: false callbackTimeOut: 2 diff --git a/internal/msg_transfer/logic/callback.go b/internal/msg_transfer/logic/callback.go new file mode 100644 index 000000000..048b5c288 --- /dev/null +++ b/internal/msg_transfer/logic/callback.go @@ -0,0 +1,62 @@ +package logic + +import ( + cbApi "Open_IM/pkg/call_back_struct" + "Open_IM/pkg/common/callback" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/http" + "Open_IM/pkg/common/log" + pbChat "Open_IM/pkg/proto/msg" + "Open_IM/pkg/utils" + http2 "net/http" +) + +func callbackAfterConsumeGroupMsg(msg []*pbChat.MsgDataToMQ, triggerID string) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: triggerID} + if !config.Config.Callback.CallbackAfterConsumeGroupMsg.Enable { + return callbackResp + } + for _, v := range msg { + if v.MsgData.SessionType == constant.SuperGroupChatType || v.MsgData.SessionType == constant.GroupChatType { + commonCallbackReq := copyCallbackCommonReqStruct(v) + commonCallbackReq.CallbackCommand = constant.CallbackAfterConsumeGroupMsgCommand + req := cbApi.CallbackAfterConsumeGroupMsgReq{ + CommonCallbackReq: commonCallbackReq, + GroupID: v.MsgData.GroupID, + } + resp := &cbApi.CallbackAfterConsumeGroupMsgResp{CommonCallbackResp: &callbackResp} + defer log.NewDebug(triggerID, utils.GetSelfFuncName(), req, *resp) + if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAfterConsumeGroupMsgCommand, req, resp, config.Config.Callback.CallbackAfterConsumeGroupMsg.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + return callbackResp + } + } + } + + log.NewDebug(triggerID, utils.GetSelfFuncName(), msg) + + return callbackResp +} +func copyCallbackCommonReqStruct(msg *pbChat.MsgDataToMQ) cbApi.CommonCallbackReq { + req := cbApi.CommonCallbackReq{ + SendID: msg.MsgData.SendID, + ServerMsgID: msg.MsgData.ServerMsgID, + ClientMsgID: msg.MsgData.ClientMsgID, + OperationID: msg.OperationID, + SenderPlatformID: msg.MsgData.SenderPlatformID, + SenderNickname: msg.MsgData.SenderNickname, + SessionType: msg.MsgData.SessionType, + MsgFrom: msg.MsgData.MsgFrom, + ContentType: msg.MsgData.ContentType, + Status: msg.MsgData.Status, + CreateTime: msg.MsgData.CreateTime, + AtUserIDList: msg.MsgData.AtUserIDList, + SenderFaceURL: msg.MsgData.SenderFaceURL, + Content: callback.GetContent(msg.MsgData), + Seq: msg.MsgData.Seq, + Ex: msg.MsgData.Ex, + } + return req +} diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index af22e9259..26c0da5f4 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -111,6 +111,10 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() + callbackResp := callbackAfterConsumeGroupMsg(storageMsgList, triggerID) + if callbackResp.ErrCode != 0 { + log.NewError(triggerID, utils.GetSelfFuncName(), "callbackAfterConsumeGroupMsg resp: ", callbackResp) + } och.SendMessageToMongoCH(msgChannelValue.aggregationID, triggerID, storageMsgList, lastSeq) for _, v := range storageMsgList { diff --git a/pkg/call_back_struct/message.go b/pkg/call_back_struct/message.go index 2fcb2b3aa..2eb8034bd 100644 --- a/pkg/call_back_struct/message.go +++ b/pkg/call_back_struct/message.go @@ -41,6 +41,14 @@ type CallbackAfterSendGroupMsgResp struct { *CommonCallbackResp } +type CallbackAfterConsumeGroupMsgReq struct { + CommonCallbackReq + GroupID string `json:"groupID"` +} + +type CallbackAfterConsumeGroupMsgResp struct { + *CommonCallbackResp +} type CallbackMsgModifyCommandReq struct { CommonCallbackReq } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 1913bac79..90be70fc7 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -287,6 +287,7 @@ type config struct { CallbackAfterSendSingleMsg callBackConfig `yaml:"callbackAfterSendSingleMsg"` CallbackBeforeSendGroupMsg callBackConfig `yaml:"callbackBeforeSendGroupMsg"` CallbackAfterSendGroupMsg callBackConfig `yaml:"callbackAfterSendGroupMsg"` + CallbackAfterConsumeGroupMsg callBackConfig `yaml:"callbackAfterConsumeGroupMsg"` CallbackMsgModify callBackConfig `yaml:"callbackMsgModify"` CallbackUserOnline callBackConfig `yaml:"callbackUserOnline"` CallbackUserOffline callBackConfig `yaml:"callbackUserOffline"` diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index c189cf709..9a24b694e 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -210,6 +210,7 @@ const ( CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" + CallbackAfterConsumeGroupMsgCommand = "callbackAfterConsumeGroupMsgCommand" CallbackMsgModifyCommand = "callbackMsgModifyCommand" CallbackUserOnlineCommand = "callbackUserOnlineCommand" CallbackUserOfflineCommand = "callbackUserOfflineCommand"