diff --git a/cmd/rpc/open_im_msg/main.go b/cmd/rpc/open_im_msg/main.go index 7344facf4..2c6d785d7 100644 --- a/cmd/rpc/open_im_msg/main.go +++ b/cmd/rpc/open_im_msg/main.go @@ -1,6 +1,7 @@ package main import ( + "Open_IM/cmd/rpc/open_im_msg/widget" rpcChat "Open_IM/internal/rpc/msg" "flag" ) @@ -9,5 +10,11 @@ func main() { rpcPort := flag.Int("port", 10300, "rpc listening port") flag.Parse() rpcServer := rpcChat.NewRpcChatServer(*rpcPort) + // register widgets + + // mock + rpcServer.UseWidgetBeforSend(widget.MockBeforeSendHandler) + + // rpcServer.Run() } diff --git a/cmd/rpc/open_im_msg/widget/widget.go b/cmd/rpc/open_im_msg/widget/widget.go new file mode 100644 index 000000000..9f9224a64 --- /dev/null +++ b/cmd/rpc/open_im_msg/widget/widget.go @@ -0,0 +1,20 @@ +package widget + +import ( + "Open_IM/pkg/common/constant" + pbChat "Open_IM/pkg/proto/chat" + "context" + "errors" +) + +// MockBeforeSendHandler is a mock handle that handles custom logic before send msg. +func MockBeforeSendHandler(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { + if pb.MsgData.ContentType == constant.Text { + msg := string(pb.MsgData.Content) + if msg == "this is a m..m..mock msg" { + return nil, false, errors.New("BANG! This msg has been banned by MockBeforeSendHandler") + } + } + + return nil, true, nil +} diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index ef3bb6083..e2a96d2b8 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -1,12 +1,14 @@ package msg import ( + "Open_IM/internal/rpc/msg/widget" "Open_IM/pkg/common/config" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" "Open_IM/pkg/utils" + "context" "net" "strconv" "strings" @@ -21,6 +23,8 @@ type rpcChat struct { etcdSchema string etcdAddr []string producer *kafka.Producer + // beforeSenders are filters which will be triggered before send msg + beforeSenders []widget.BeforeSendHandler } func NewRpcChatServer(port int) *rpcChat { @@ -35,6 +39,24 @@ func NewRpcChatServer(port int) *rpcChat { return &rc } +func (rpc *rpcChat) UseWidgetBeforSend(hs ...widget.BeforeSendHandler) { + rpc.beforeSenders = append(rpc.beforeSenders, hs...) +} + +func (rpc *rpcChat) callWidgetBeforeSend(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { + for _, handler := range rpc.beforeSenders { + res, ok, err := handler(ctx, pb) + if err != nil { + return nil, false, err + } + if !ok { + return res, ok, nil + } + } + + return nil, true, nil +} + func (rpc *rpcChat) Run() { log.Info("", "", "rpc get_token init...") diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 114746da2..d21f4736d 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -13,13 +13,14 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "github.com/garyburd/redigo/redis" - "github.com/golang/protobuf/proto" "math/rand" "net/http" "strconv" "strings" "time" + + "github.com/garyburd/redigo/redis" + "github.com/golang/protobuf/proto" ) type MsgCallBackReq struct { @@ -102,9 +103,18 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { } } -func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { +func (rpc *rpcChat) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String()) + + res, ok, err := rpc.callWidgetBeforeSend(ctx, pb) + if err != nil { + return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err.Error(), 0) + } + if !ok { + return res, nil + } + userRelationshipVerification(pb) //if !utils.VerifyToken(pb.Token, pb.SendID) { // return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) diff --git a/internal/rpc/msg/widget/before_sender.go b/internal/rpc/msg/widget/before_sender.go new file mode 100644 index 000000000..465585acf --- /dev/null +++ b/internal/rpc/msg/widget/before_sender.go @@ -0,0 +1,10 @@ +package widget + +import ( + "context" + + pbChat "Open_IM/pkg/proto/chat" +) + +// BeforeSendHandler handles custom logic before send msg. +type BeforeSendHandler func(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error)