diff --git a/internal/api/msg.go b/internal/api/msg.go index b21e792db..927247cca 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -15,12 +15,16 @@ package api import ( + "encoding/base64" + "encoding/json" + "github.com/gin-gonic/gin" "github.com/go-playground/validator/v10" "github.com/mitchellh/mapstructure" "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" @@ -368,6 +372,83 @@ func (m *MessageApi) BatchSendMsg(c *gin.Context) { apiresp.GinSuccess(c, resp) } +func (m *MessageApi) SendSimpleMessage(c *gin.Context) { + encodedKey, ok := c.GetQuery(webhook.Key) + if !ok { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing key in query").Wrap()) + return + } + + decodedData, err := base64.StdEncoding.DecodeString(encodedKey) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var ( + req apistruct.SendSingleMsgReq + keyMsgData apistruct.KeyMsgData + + sendID string + sessionType int32 + recvID string + ) + err = json.Unmarshal(decodedData, &keyMsgData) + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + if keyMsgData.GroupID != "" { + sessionType = constant.ReadGroupChatType + sendID = req.SendID + } else { + sessionType = constant.SingleChatType + sendID = keyMsgData.RecvID + recvID = keyMsgData.SendID + } + // check param + if keyMsgData.SendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing recvID or GroupID").Wrap()) + return + } + if sendID == "" { + apiresp.GinError(c, errs.ErrArgs.WithDetail("missing sendID").Wrap()) + return + } + + msgData := &sdkws.MsgData{ + SendID: sendID, + RecvID: recvID, + GroupID: keyMsgData.GroupID, + ClientMsgID: idutil.GetMsgIDByMD5(sendID), + SenderPlatformID: constant.AdminPlatformID, + SessionType: sessionType, + MsgFrom: constant.UserMsgType, + ContentType: constant.Text, + Content: []byte(req.Content), + OfflinePushInfo: req.OfflinePushInfo, + Ex: req.Ex, + } + + respPb, err := m.Client.SendMsg(c, &msg.SendMsgReq{MsgData: msgData}) + if err != nil { + apiresp.GinError(c, err) + return + } + + var status = constant.MsgSendSuccessed + + _, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{ + Status: int32(status), + }) + + if err != nil { + apiresp.GinError(c, err) + return + } + + apiresp.GinSuccess(c, respPb) +} + func (m *MessageApi) CheckMsgIsSendSuccess(c *gin.Context) { a2r.Call(c, msg.MsgClient.GetSendMsgStatus, m.Client) } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index c66dd6ca9..adf1ff735 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -16,7 +16,11 @@ package msg import ( "context" + "encoding/base64" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/tools/utils/stringutil" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -94,7 +98,7 @@ func (m *msgServer) webhookAfterSendSingleMsg(ctx context.Context, after *config CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand), RecvID: msg.MsgData.RecvID, } - m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after) + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) } func (m *msgServer) webhookBeforeSendGroupMsg(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { @@ -128,7 +132,8 @@ func (m *msgServer) webhookAfterSendGroupMsg(ctx context.Context, after *config. CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand), GroupID: msg.MsgData.GroupID, } - m.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after) + + m.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg.MsgData)) } func (m *msgServer) webhookBeforeMsgModify(ctx context.Context, before *config.BeforeConfig, msg *pbchat.SendMsgReq) error { @@ -192,3 +197,15 @@ func (m *msgServer) webhookAfterRevokeMsg(ctx context.Context, after *config.Aft } m.webhookClient.AsyncPost(ctx, callbackReq.GetCallbackCommand(), callbackReq, &cbapi.CallbackAfterRevokeMsgResp{}, after) } + +func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string { + keyMsgData := apistruct.KeyMsgData{ + SendID: msg.SendID, + RecvID: msg.RecvID, + GroupID: msg.GroupID, + } + + return map[string]string{ + webhook.Key: base64.StdEncoding.EncodeToString(stringutil.StructToJsonBytes(keyMsgData)), + } +} diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 72654ef3b..19f4e9ffd 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -195,34 +195,3 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq }, nil } } - -func (m *msgServer) SendSimpleMsg(ctx context.Context, req *pbmsg.SendSimpleMsgReq) (*pbmsg.SendSimpleMsgResp, error) { - user, err := m.UserLocalCache.GetUserInfo(ctx, req.SendID) - if err != nil { - return nil, err - } - msgData := &sdkws.MsgData{ - SendID: req.SendID, - RecvID: req.RecvID, - GroupID: req.GroupID, - ClientMsgID: GetMsgID(req.SendID), - ServerMsgID: GetMsgID(req.SendID), - SenderPlatformID: constant.AdminPlatformID, - SenderNickname: user.Nickname, - SenderFaceURL: user.FaceURL, - SessionType: datautil.If[int32](req.RecvID == "", constant.ReadGroupChatType, constant.SingleChatType), - MsgFrom: constant.UserMsgType, - ContentType: constant.Text, - Content: req.Content, - Seq: 0, - SendTime: 0, - CreateTime: 0, - Status: 0, - IsRead: false, - Options: nil, - OfflinePushInfo: nil, - AtUserIDList: nil, - AttachedInfo: "", - Ex: "", - } -} diff --git a/pkg/apistruct/manage.go b/pkg/apistruct/manage.go index f4deb9fb1..4f1d5863f 100644 --- a/pkg/apistruct/manage.go +++ b/pkg/apistruct/manage.go @@ -111,6 +111,21 @@ type BatchSendMsgResp struct { FailedIDs []string `json:"failedUserIDs"` } +// SendSingleMsgReq defines the structure for sending a message to multiple recipients. +type SendSingleMsgReq struct { + // groupMsg should appoint sendID + SendID string `json:"sendID"` + Content string `json:"content" binding:"required"` + OfflinePushInfo *sdkws.OfflinePushInfo `json:"offlinePushInfo"` + Ex string `json:"ex"` +} + +type KeyMsgData struct { + SendID string `json:"sendID"` + RecvID string `json:"recvID"` + GroupID string `json:"groupID"` +} + // SingleReturnResult encapsulates the result of a single message send attempt. type SingleReturnResult struct { // ServerMsgID is the message identifier on the server-side. diff --git a/pkg/common/webhook/http_client.go b/pkg/common/webhook/http_client.go index b90e387d3..0cd13f6e2 100644 --- a/pkg/common/webhook/http_client.go +++ b/pkg/common/webhook/http_client.go @@ -39,6 +39,8 @@ type Client struct { const ( webhookWorkerCount = 2 webhookBufferSize = 100 + + Key = "key" ) func NewWebhookClient(url string, options ...*memamq.MemoryQueue) *Client { @@ -68,9 +70,9 @@ func (c *Client) AsyncPost(ctx context.Context, command string, req callbackstru } } -func (c *Client) AsyncPostWithKey(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig) { +func (c *Client) AsyncPostWithQuery(ctx context.Context, command string, req callbackstruct.CallbackReq, resp callbackstruct.CallbackResp, after *config.AfterConfig, queryParams map[string]string) { if after.Enable { - c.queue.Push(func() { c.post(ctx, command, req, resp, after.Timeout) }) + c.queue.Push(func() { c.postWithQuery(ctx, command, req, resp, after.Timeout, queryParams) }) } }