From 7a6568a4fee88d37941e5b687173a9d0b67e04e7 Mon Sep 17 00:00:00 2001 From: ltf Date: Mon, 28 Feb 2022 14:21:05 +0800 Subject: [PATCH] feat add after filter support on sendMsg --- internal/rpc/msg/context.go | 85 ++++++++++++++++++++---- internal/rpc/msg/filters.go | 3 + internal/rpc/msg/rpcChat.go | 8 +++ internal/rpc/msg/widget/before_sender.go | 10 --- 4 files changed, 82 insertions(+), 24 deletions(-) delete mode 100644 internal/rpc/msg/widget/before_sender.go diff --git a/internal/rpc/msg/context.go b/internal/rpc/msg/context.go index 3116cf725..0701a9bd8 100644 --- a/internal/rpc/msg/context.go +++ b/internal/rpc/msg/context.go @@ -4,14 +4,19 @@ import ( pbChat "Open_IM/pkg/proto/chat" "context" "net/http" + "time" ) +var _ context.Context = (*SendContext)(nil) + // SendContext is the most important part of RPC SendMsg. It allows us to pass variables between middleware type SendContext struct { ctx context.Context rpc *rpcChat // beforeFilters are filters which will be triggered before send msg beforeFilters []BeforeSendFilter + // afterSenders are filters which will be triggered after send msg + afterSenders []AfterSendFilter } func NewSendContext(ctx context.Context, rpc *rpcChat) *SendContext { @@ -19,22 +24,10 @@ func NewSendContext(ctx context.Context, rpc *rpcChat) *SendContext { ctx: ctx, rpc: rpc, beforeFilters: rpc.beforeSenders, + afterSenders: rpc.afterSenders, } } -func (c *SendContext) SetCtx(ctx context.Context) { - c.ctx = ctx -} - -func (c *SendContext) Value(key interface{}) interface{} { - return c.ctx.Value(key) -} - -func (c *SendContext) WithValue(key, val interface{}) { - ctx := context.WithValue(c.ctx, key, val) - c.SetCtx(ctx) -} - // doBeforeFilters executes the pending filters in the chain inside the calling handler. func (c *SendContext) doBeforeFilters(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) { for _, handler := range c.beforeFilters { @@ -50,6 +43,21 @@ func (c *SendContext) doBeforeFilters(pb *pbChat.SendMsgReq) (*pbChat.SendMsgRes return nil, true, nil } +// doAfterFilters executes the pending filters in the chain inside the calling handler. +func (c *SendContext) doAfterFilters(req *pbChat.SendMsgReq, res *pbChat.SendMsgResp) (*pbChat.SendMsgResp, bool, error) { + for _, handler := range c.afterSenders { + res, ok, err := handler(c, req, res) + if err != nil { + return nil, false, err + } + if !ok { + return res, ok, nil + } + } + + return nil, true, nil +} + func (c *SendContext) SendMsg(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} res, ok, err := c.doBeforeFilters(pb) @@ -67,5 +75,54 @@ func (c *SendContext) SendMsg(pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error return res, err } - return nil, nil + res, ok, err = c.doAfterFilters(pb, res) + if err != nil { + return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), err.Error(), 0) + } + if !ok { + return res, nil + } + + return res, nil +} + +func (c *SendContext) SetCtx(ctx context.Context) { + c.ctx = ctx +} + +func (c *SendContext) WithValue(key, val interface{}) { + ctx := context.WithValue(c.ctx, key, val) + c.SetCtx(ctx) +} + +/************************************/ +/***** context *****/ +/************************************/ + +// Deadline returns that there is no deadline (ok==false) when c has no Context. +func (c *SendContext) Deadline() (deadline time.Time, ok bool) { + if c.ctx == nil { + return + } + return c.ctx.Deadline() +} + +// Done returns nil (chan which will wait forever) when c has no Context. +func (c *SendContext) Done() <-chan struct{} { + if c.ctx == nil { + return nil + } + return c.ctx.Done() +} + +// Err returns nil when c has no Context. +func (c *SendContext) Err() error { + if c.ctx == nil { + return nil + } + return c.ctx.Err() +} + +func (c *SendContext) Value(key interface{}) interface{} { + return c.ctx.Value(key) } diff --git a/internal/rpc/msg/filters.go b/internal/rpc/msg/filters.go index 9b6bc98ed..57fc18316 100644 --- a/internal/rpc/msg/filters.go +++ b/internal/rpc/msg/filters.go @@ -4,3 +4,6 @@ import pbChat "Open_IM/pkg/proto/chat" // BeforeSendFilter handles custom logic before send msg. type BeforeSendFilter func(ctx *SendContext, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error) + +// AfterSendFilter handles custom logic after send msg. +type AfterSendFilter func(ctx *SendContext, req *pbChat.SendMsgReq, res *pbChat.SendMsgResp) (*pbChat.SendMsgResp, bool, error) diff --git a/internal/rpc/msg/rpcChat.go b/internal/rpc/msg/rpcChat.go index ea9cfb115..88bb25f09 100644 --- a/internal/rpc/msg/rpcChat.go +++ b/internal/rpc/msg/rpcChat.go @@ -23,6 +23,9 @@ type rpcChat struct { producer *kafka.Producer // beforeSenders are filters which will be triggered before send msg beforeSenders []BeforeSendFilter + + // afterSenders are filters which will be triggered after send msg + afterSenders []AfterSendFilter } func NewRpcChatServer(port int) *rpcChat { @@ -42,6 +45,11 @@ func (rpc *rpcChat) UseBeforSendFilters(hs ...BeforeSendFilter) { rpc.beforeSenders = append(rpc.beforeSenders, hs...) } +// UseAfterSendFilters attaches a global filter to the AfterSendFilter logic +func (rpc *rpcChat) UseAfterSendFilters(hs ...AfterSendFilter) { + rpc.afterSenders = append(rpc.afterSenders, hs...) +} + func (rpc *rpcChat) Run() { log.Info("", "", "rpc get_token init...") diff --git a/internal/rpc/msg/widget/before_sender.go b/internal/rpc/msg/widget/before_sender.go deleted file mode 100644 index 465585acf..000000000 --- a/internal/rpc/msg/widget/before_sender.go +++ /dev/null @@ -1,10 +0,0 @@ -package widget - -import ( - "context" - - pbChat "Open_IM/pkg/proto/chat" -) - -// BeforeSendHandler handles custom logic before send msg. -type BeforeSendHandler func(ctx context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, bool, error)