diff --git a/cmd/rpc/open_im_msg/filters/mock.go b/cmd/rpc/open_im_msg/filters/mock.go new file mode 100644 index 000000000..17eccd5c4 --- /dev/null +++ b/cmd/rpc/open_im_msg/filters/mock.go @@ -0,0 +1,39 @@ +package filters + +import ( + rpcChat "Open_IM/internal/rpc/msg" + "Open_IM/pkg/common/constant" + pbChat "Open_IM/pkg/proto/chat" + "errors" + "fmt" +) + +func MockBeforeSendFilter1(ctx *rpcChat.SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { + ctxKey := "test_key" + v := true + fmt.Printf("MockBeforeSendFilter1:%s set value to ctx,value is :%v\n", ctxKey, v) + ctx.WithValue(ctxKey, v) + + return nil, true, nil +} + +// MockBeforeSendFilter is a mock handle that handles custom logic before send msg. +func MockBeforeSendFilter2(ctx *rpcChat.SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { + ctxKey := "test_key" + v, ok := ctx.Value(ctxKey).(bool) + if ok { + fmt.Printf("MockBeforeSendFilter2:%s selected from ctx,value is :%v\n", ctxKey, v) + } + + fmt.Printf("MockBeforeSendHandler trigger,contentType:%d\n", pb.MsgData.GetContentType()) + if pb.MsgData.ContentType == constant.Text { + msg := string(pb.MsgData.Content) + fmt.Printf("text msg:%s", msg) + if msg == "this is a m..m..mock msg" { + fmt.Println(".==>msg had banned") + return nil, false, errors.New("BANG! This msg has been banned by MockBeforeSendHandler") + } + } + + return nil, true, nil +} diff --git a/cmd/rpc/open_im_msg/main.go b/cmd/rpc/open_im_msg/main.go index 6c689033a..382ed3b6e 100644 --- a/cmd/rpc/open_im_msg/main.go +++ b/cmd/rpc/open_im_msg/main.go @@ -1,7 +1,7 @@ package main import ( - "Open_IM/cmd/rpc/open_im_msg/widget" + "Open_IM/cmd/rpc/open_im_msg/filters" rpcChat "Open_IM/internal/rpc/msg" "flag" ) @@ -10,10 +10,10 @@ func main() { rpcPort := flag.Int("port", 10300, "rpc listening port") flag.Parse() rpcServer := rpcChat.NewRpcChatServer(*rpcPort) - // register widgets + // register filters // mock 注册发送前的拦截器 - rpcServer.UseWidgetBeforSend(widget.MockBeforeSendHandler) + rpcServer.UseBeforSendFilters(filters.MockBeforeSendFilter1, filters.MockBeforeSendFilter2) // rpcServer.Run() diff --git a/cmd/rpc/open_im_msg/widget/widget.go b/cmd/rpc/open_im_msg/widget/widget.go deleted file mode 100644 index 92d3c5e50..000000000 --- a/cmd/rpc/open_im_msg/widget/widget.go +++ /dev/null @@ -1,24 +0,0 @@ -package widget - -import ( - "Open_IM/pkg/common/constant" - pbChat "Open_IM/pkg/proto/chat" - "context" - "errors" - "fmt" -) - -// MockBeforeSendHandler is a mock handle that handles custom logic before send msg. -func MockBeforeSendHandler(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { - fmt.Printf("MockBeforeSendHandler trigger,contentType:%d\n", pb.MsgData.GetContentType()) - if pb.MsgData.ContentType == constant.Text { - msg := string(pb.MsgData.Content) - fmt.Printf("text msg:%s", msg) - if msg == "this is a m..m..mock msg" { - fmt.Println(".==>msg had banned") - return nil, false, errors.New("BANG! This msg has been banned by MockBeforeSendHandler") - } - } - - return nil, true, nil -} diff --git a/internal/rpc/msg/context.go b/internal/rpc/msg/context.go new file mode 100644 index 000000000..00f41c8f5 --- /dev/null +++ b/internal/rpc/msg/context.go @@ -0,0 +1,69 @@ +package msg + +import ( + pbChat "Open_IM/pkg/proto/chat" + "context" + "net/http" +) + +// SendContext is the most important part of RPC SendMsg. It allows us to pass variables between middleware +type SendContext struct { + ctx context.Context + rpc *rpcChat + // beforeFilters are filters which will be triggered before send msg + beforeFilters []BeforeSendFilter +} + +func NewSendContext(ctx context.Context, rpc *rpcChat) *SendContext { + return &SendContext{ + ctx: ctx, + rpc: rpc, + beforeFilters: rpc.beforeSenders, + } +} + +func (c *SendContext) SetCtx(ctx context.Context) { + c.ctx = ctx +} + +func (c *SendContext) Value(key interface{}) interface{} { + return c.ctx.Value(key) +} + +func (c *SendContext) WithValue(key, val interface{}) { + ctx := context.WithValue(c.ctx, key, val) + c.SetCtx(ctx) +} + +// doBeforeFilters executes the pending filters in the chain inside the calling handler. +func (c *SendContext) doBeforeFilters(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { + for _, handler := range c.beforeFilters { + res, ok, err := handler(c, pb) + if err != nil { + return nil, false, err + } + if !ok { + return res, ok, nil + } + } + + return nil, true, nil +} + +func (c *SendContext) SendMsg(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { + replay := pbChat.SendMsgResp{} + res, ok, err := c.doBeforeFilters(pb) + if err != nil { + return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err.Error(), 0) + } + if !ok { + return res, nil + } + + res, err = c.rpc.doSendMsg(c.ctx, pb) + if err != nil { + return res, err + } + + return nil, nil +} diff --git a/internal/rpc/msg/filters.go b/internal/rpc/msg/filters.go new file mode 100644 index 000000000..9b6bc98ed --- /dev/null +++ b/internal/rpc/msg/filters.go @@ -0,0 +1,6 @@ +package msg + +import pbChat "Open_IM/pkg/proto/chat" + +// BeforeSendFilter handles custom logic before send msg. +type BeforeSendFilter func(ctx *SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index e2a96d2b8..ea9cfb115 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -1,14 +1,12 @@ package msg import ( - "Open_IM/internal/rpc/msg/widget" "Open_IM/pkg/common/config" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" "Open_IM/pkg/utils" - "context" "net" "strconv" "strings" @@ -24,7 +22,7 @@ type rpcChat struct { etcdAddr []string producer *kafka.Producer // beforeSenders are filters which will be triggered before send msg - beforeSenders []widget.BeforeSendHandler + beforeSenders []BeforeSendFilter } func NewRpcChatServer(port int) *rpcChat { @@ -39,24 +37,11 @@ func NewRpcChatServer(port int) *rpcChat { return &rc } -func (rpc *rpcChat) UseWidgetBeforSend(hs ...widget.BeforeSendHandler) { +// UseBeforSendFilters attaches a global filter to the BeforSendFilter logic +func (rpc *rpcChat) UseBeforSendFilters(hs ...BeforeSendFilter) { rpc.beforeSenders = append(rpc.beforeSenders, hs...) } -func (rpc *rpcChat) callWidgetBeforeSend(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { - for _, handler := range rpc.beforeSenders { - res, ok, err := handler(ctx, pb) - if err != nil { - return nil, false, err - } - if !ok { - return res, ok, nil - } - } - - return nil, true, nil -} - func (rpc *rpcChat) Run() { log.Info("", "", "rpc get_token init...") diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index d21f4736d..59556535b 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -104,17 +104,14 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { } } func (rpc *rpcChat) SendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { + c := NewSendContext(ctx, rpc) + return c.SendMsg(pb) +} + +func (rpc *rpcChat) doSendMsg(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String()) - res, ok, err := rpc.callWidgetBeforeSend(ctx, pb) - if err != nil { - return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err.Error(), 0) - } - if !ok { - return res, nil - } - userRelationshipVerification(pb) //if !utils.VerifyToken(pb.Token, pb.SendID) { // return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)