From 0bed7ee669da994060c7219b765138d191656205 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 28 Feb 2022 17:57:03 +0800 Subject: [PATCH 1/5] merge code --- config/config.yaml | 38 ++++++-- .../api/third/minio_storage_credential.go | 18 ++-- internal/demo/register/reset_password.go | 4 +- internal/rpc/auth/callback.go | 1 + internal/rpc/friend/callback.go | 1 + internal/rpc/group/callback.go | 9 ++ internal/rpc/group/group.go | 9 +- internal/rpc/msg/callback.go | 94 +++++++++++++++++++ internal/rpc/msg/send_msg.go | 50 ++++++---- internal/rpc/user/callback.go | 1 + internal/utils/callback.go | 1 + pkg/call_back_struct/common.go | 16 ++++ pkg/call_back_struct/group.go | 9 ++ pkg/call_back_struct/message.go | 60 ++++++++++++ pkg/common/config/config.go | 20 +++- pkg/common/constant/constant.go | 10 ++ pkg/common/constant/error.go | 2 + pkg/common/http/http_client.go | 10 ++ pkg/common/log/logrus.go | 15 ++- pkg/proto/sdk_ws/ws.proto | 30 ++++++ 20 files changed, 341 insertions(+), 57 deletions(-) create mode 100644 internal/rpc/auth/callback.go create mode 100644 internal/rpc/friend/callback.go create mode 100644 internal/rpc/group/callback.go create mode 100644 internal/rpc/msg/callback.go create mode 100644 internal/rpc/user/callback.go create mode 100644 internal/utils/callback.go create mode 100644 pkg/call_back_struct/group.go diff --git a/config/config.yaml b/config/config.yaml index 7d025c863..da3264de7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -8,7 +8,7 @@ etcd: etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可 mysql: - dbMysqlAddress: [ 127.0.0.1:13306 ] #mysql地址 目前仅支持单机,默认即可 + dbMysqlAddress: [ 43.128.5.63:13306 ] #mysql地址 目前仅支持单机,默认即可 dbMysqlUserName: root #mysql用户名,建议修改 dbMysqlPassword: openIM # mysql密码,建议修改 dbMysqlDatabaseName: openIM_v2 #默认即可 @@ -19,7 +19,7 @@ mysql: dbMaxLifeTime: 120 mongo: - dbAddress: [ 127.0.0.1:37017 ] #redis地址 目前仅支持单机,默认即可 + dbAddress: [ 43.128.5.63:37017 ] #redis地址 目前仅支持单机,默认即可 dbDirect: false dbTimeout: 10 dbDatabase: openIM #mongo db 默认即可 @@ -30,7 +30,7 @@ mongo: dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 redis: - dbAddress: 127.0.0.1:16379 #redis地址 目前仅支持单机,默认即可 + dbAddress: 43.128.5.63:16379 #redis地址 目前仅支持单机,默认即可 dbMaxIdle: 128 dbMaxActive: 0 dbIdleTimeout: 120 @@ -178,13 +178,6 @@ tokenpolicy: # Token effective time day as a unit accessExpire: 3650 #token过期时间(天) 默认即可 -messagecallback: - callbackSwitch: false - callbackUrl: "http://www.xxx.com/msg/judge" - #TimeOut use second as unit - callbackTimeOut: 10 -messagejudge: - isJudgeFriend: true # c2c: # callbackBeforeSendMsg: # switch: false @@ -199,6 +192,31 @@ iospush: pushSound: "xxx" badgeCount: true +callback: + callbackUrl : "http://xxx.com" + # 开启关闭操作前后回调的配置 + callbackbeforeSendSingleMsg: + enable: true # 回调是否启用 + callbackTimeOut: 5 # 回调超时时间 + CallbackFailedContinue: true # 回调超时是否继续执行代码 + callbackAfterSendSingleMsg: + enable: true + callbackTimeOut: 5 + CallbackFailedContinue: true + callbackBeforeSendGroupMsg: + enable: true + callbackTimeOut: 5 + CallbackFailedContinue: true + callbackAfterSendGroupMsg: + enable: true + callbackTimeOut: 5 + CallbackFailedContinue: true + callbackWordFilter: + enable: true + callbackTimeOut: 5 + CallbackFailedContinue: true + + notification: groupCreated: conversation: diff --git a/internal/api/third/minio_storage_credential.go b/internal/api/third/minio_storage_credential.go index 4d5800e06..f34f64c94 100644 --- a/internal/api/third/minio_storage_credential.go +++ b/internal/api/third/minio_storage_credential.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/token_verify" _ "Open_IM/pkg/common/token_verify" "Open_IM/pkg/utils" "github.com/gin-gonic/gin" @@ -23,12 +24,12 @@ func MinioStorageCredential(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } - //ok, _ := token_verify.GetUserIDFromToken(c.Request.Header.Get("token")) - //if !ok { - // log.NewError("", utils.GetSelfFuncName(), "GetUserIDFromToken false ", c.Request.Header.Get("token")) - // c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"}) - // return - //} + ok, _ := token_verify.GetUserIDFromToken(c.Request.Header.Get("token")) + if !ok { + log.NewError("", utils.GetSelfFuncName(), "GetUserIDFromToken false ", c.Request.Header.Get("token")) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"}) + return + } var stsOpts cr.STSAssumeRoleOptions stsOpts.AccessKey = config.Config.Credential.Minio.AccessKeyID stsOpts.SecretKey = config.Config.Credential.Minio.SecretAccessKey @@ -45,11 +46,6 @@ func MinioStorageCredential(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } - if err != nil { - log.NewError("0", utils.GetSelfFuncName(), err.Error()) - c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) - return - } resp.SessionToken = v.SessionToken resp.SecretAccessKey = v.SecretAccessKey resp.AccessKeyID = v.AccessKeyID diff --git a/internal/demo/register/reset_password.go b/internal/demo/register/reset_password.go index 0ff7115a0..b668f4f62 100644 --- a/internal/demo/register/reset_password.go +++ b/internal/demo/register/reset_password.go @@ -12,10 +12,10 @@ import ( ) type resetPasswordRequest struct { - VerificationCode string `json:"verificationCode"` + VerificationCode string `json:"verificationCode" binding:"required"` Email string `json:"email"` PhoneNumber string `json:"phoneNumber"` - NewPassword string `json:"newPassword"` + NewPassword string `json:"newPassword" binding:"required"` OperationID string `json:"operationID"` } diff --git a/internal/rpc/auth/callback.go b/internal/rpc/auth/callback.go new file mode 100644 index 000000000..8832b06d1 --- /dev/null +++ b/internal/rpc/auth/callback.go @@ -0,0 +1 @@ +package auth diff --git a/internal/rpc/friend/callback.go b/internal/rpc/friend/callback.go new file mode 100644 index 000000000..cf8ea69c8 --- /dev/null +++ b/internal/rpc/friend/callback.go @@ -0,0 +1 @@ +package friend diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go new file mode 100644 index 000000000..8e0fcfb7b --- /dev/null +++ b/internal/rpc/group/callback.go @@ -0,0 +1,9 @@ +package group + +import ( + pbGroup "Open_IM/pkg/proto/group" +) + +func callbackBeforeCreateGroup(req *pbGroup.CreateGroupReq) (bool, error) { + return true, nil +} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 40255d071..7d4b85094 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -76,7 +76,12 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR log.NewError(req.OperationID, "CheckAccess false ", req.OpUserID, req.OwnerUserID) return &pbGroup.CreateGroupResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil } - + canCreate, err := callbackBeforeCreateGroup(req) + if err != nil || !canCreate { + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeCreateGroup failed", ) + } + } //Time stamp + MD5 to generate group chat id groupId := utils.Md5(strconv.FormatInt(time.Now().UnixNano(), 10)) //to group @@ -84,7 +89,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR utils.CopyStructFields(&groupInfo, req.GroupInfo) groupInfo.CreatorUserID = req.OpUserID groupInfo.GroupID = groupId - err := imdb.InsertIntoGroup(groupInfo) + err = imdb.InsertIntoGroup(groupInfo) if err != nil { log.NewError(req.OperationID, "InsertIntoGroup failed, ", err.Error(), groupInfo) return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, http.WrapError(constant.ErrDB) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go new file mode 100644 index 000000000..d55d9046e --- /dev/null +++ b/internal/rpc/msg/callback.go @@ -0,0 +1,94 @@ +package msg + +import ( + cbApi "Open_IM/pkg/call_back_struct" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/http" + "Open_IM/pkg/common/log" + pbChat "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" +) + +func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + if !config.Config.Callback.CallbackbeforeSendSingleMsg.Enable || msg.MsgData.ContentType != constant.Text { + return true, nil + } + req := cbApi.CallbackBeforeSendSingleMsgReq{CommonCallbackReq:cbApi.CommonCallbackReq{ + }} + resp := &cbApi.CallbackBeforeSendSingleMsgResp{CommonCallbackResp:cbApi.CommonCallbackResp{ + }} + utils.CopyStructFields(req, msg.MsgData) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackTimeOut); err != nil && !config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackFailedContinue{ + return false, err + } + if resp.ActionCode == constant.ActionForbidden { + return false, nil + } + return true, nil +} + + +func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error { + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable || msg.MsgData.ContentType != constant.Text { + return nil + } + req := cbApi.CallbackAfterSendSingleMsgReq{CommonCallbackReq: cbApi.CommonCallbackReq{}} + resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + utils.CopyStructFields(req, msg.MsgData) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil && config.Config.Callback.CallbackAfterSendSingleMsg.CallbackFailedContinue{ + return err + } + return nil +} + + +func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable || msg.MsgData.ContentType != constant.Text { + return true, nil + } + req := cbApi.CallbackBeforeSendSingleMsgReq{CommonCallbackReq: cbApi.CommonCallbackReq{}} + resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + utils.CopyStructFields(req, msg.MsgData) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil && !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue{ + return false, nil + } + if resp.ActionCode == constant.ActionForbidden { + return false, nil + } + return true, nil +} + +func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error { + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable || msg.MsgData.ContentType != constant.Text { + return nil + } + return nil +} + + +func callBackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) { + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text { + return true, nil + } + req := cbApi.CallBackWordFilterReq{ + CommonCallbackReq: cbApi.CommonCallbackReq{}, + } + resp := cbApi.CallBackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + utils.CopyStructFields(&req, msg.MsgData) + if err := http.PostReturn(msg.OperationID, req, &resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil { + return false, err + } + msg.MsgData.Content = resp.Content + return true, nil +} + + + + + diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index a57a6e892..390f151f9 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -4,7 +4,6 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" - http2 "Open_IM/pkg/common/http" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" @@ -12,11 +11,9 @@ import ( sdk_ws "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" - "encoding/json" "github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" "math/rand" - "net/http" "strconv" "strings" "time" @@ -127,25 +124,26 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if !isHistory { mReq.IsOnlineOnly = true } - mResp := MsgCallBackResp{} - if config.Config.MessageCallBack.CallbackSwitch { - bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, mReq, config.Config.MessageCallBack.CallBackTimeOut) - if err != nil { - log.ErrorByKv("callback to Business server err", pb.OperationID, "args", pb.String(), "err", err.Error()) - return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "", 0) - } else if err = json.Unmarshal(bMsg, &mResp); err != nil { - log.ErrorByKv("ws json Unmarshal err", pb.OperationID, "args", pb.String(), "err", err.Error()) - return returnMsg(&replay, pb, 200, err.Error(), "", 0) - } else { - if mResp.ErrCode != 0 { - return returnMsg(&replay, pb, mResp.ResponseErrCode, mResp.ErrMsg, "", 0) - } else { - pb.MsgData.Content = []byte(mResp.ResponseResult.ModifiedMsg) - } - } + + // callback + canSend, err := callBackWordFilter(pb) + if err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallbackBeforeSendMsg failed", err.Error(), pb.MsgData) + } + if !canSend { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return") + return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0) } switch pb.MsgData.SessionType { case constant.SingleChatType: + canSend, err := callbackBeforeSendSingleMsg(pb) + if err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) + } + if !canSend { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return") + return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0) + } isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) if isSend { msgToMQ.MsgData = pb.MsgData @@ -163,8 +161,19 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } + if err := callbackAfterSendSingleMsg(pb); err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) + } return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) case constant.GroupChatType: + canSend, err := callbackBeforeSendGroupMsg(pb) + if err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) + } + if !canSend { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return") + return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0) + } etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) client := pbGroup.NewGroupClient(etcdConn) req := &pbGroup.GetGroupAllMemberReq{ @@ -230,6 +239,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } } + if err := callbackAfterSendGroupMsg(pb); err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallBackAfterSendMsg failed", err.Error()) + } return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) default: return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) diff --git a/internal/rpc/user/callback.go b/internal/rpc/user/callback.go new file mode 100644 index 000000000..a00006b65 --- /dev/null +++ b/internal/rpc/user/callback.go @@ -0,0 +1 @@ +package user diff --git a/internal/utils/callback.go b/internal/utils/callback.go new file mode 100644 index 000000000..d4b585bf7 --- /dev/null +++ b/internal/utils/callback.go @@ -0,0 +1 @@ +package utils diff --git a/pkg/call_back_struct/common.go b/pkg/call_back_struct/common.go index 90b77eda8..fa7e4d4d3 100644 --- a/pkg/call_back_struct/common.go +++ b/pkg/call_back_struct/common.go @@ -1 +1,17 @@ package call_back_struct + +type CommonCallbackReq struct { + CallbackCommand string `json:"callbackCommand"` + ServerMsgID string `json:"serverID"` + ClientID string `json:"clientID"` + OperationID string `json:"operationID"` +} + +type CommonCallbackResp struct { + ActionCode int `json:"actionCode"` + ErrCode int `json:"errCode"` + ErrMsg string `json:"errMsg"` + OperationID string `json:"operationID"` +} + + diff --git a/pkg/call_back_struct/group.go b/pkg/call_back_struct/group.go new file mode 100644 index 000000000..3681dddcc --- /dev/null +++ b/pkg/call_back_struct/group.go @@ -0,0 +1,9 @@ +package call_back_struct + +type CallbackBeforeCreateGroupReq struct { + CommonCallbackReq +} + +type CallbackAfterCreateGroupResp struct { + CommonCallbackResp +} \ No newline at end of file diff --git a/pkg/call_back_struct/message.go b/pkg/call_back_struct/message.go index 90b77eda8..26473182e 100644 --- a/pkg/call_back_struct/message.go +++ b/pkg/call_back_struct/message.go @@ -1 +1,61 @@ package call_back_struct + +type singleMsg struct { + SendID string `json:"sendID"` + RecvID string `json:"recvID"` + ClientMsgID string `json:"clientMsgID"` + ServerMsgID string `json:"serverMsgId"` + SendTime int64 `json:"sendTime"` + CreateTime int64 `json:"createTime"` +} + +type CallbackBeforeSendSingleMsgReq struct { + CommonCallbackReq + singleMsg +} + +type CallbackBeforeSendSingleMsgResp struct { + CommonCallbackResp +} + +type CallbackAfterSendSingleMsgReq struct { + CommonCallbackReq + singleMsg +} + +type CallbackAfterSendSingleMsgResp struct { + CommonCallbackResp +} + +type groupMsg struct { + +} + +type CallbackBeforeSendGroupMsgReq struct { + CommonCallbackReq +} + +type CallbackBeforeSendGroupMsgResp struct { + CommonCallbackResp +} + +type CallbackAfterSendGroupMsgReq struct { + CommonCallbackReq +} + +type CallbackAfterSendGroupMsgResp struct { + CommonCallbackResp +} + +type CallBackWordFilterReq struct { + CommonCallbackReq + Content []byte `json:"content"` + SendID string `json:"SendID"` + RecvID string `json:"RecvID"` + GroupID string `json:"GroupID"` +} + +type CallBackWordFilterResp struct { + CommonCallbackResp + Content []byte `json:"content"` +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index ae0d3cadc..4782276e7 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -19,6 +19,12 @@ var ( var Config config +type callBackConfig struct { + Enable bool `yaml:"enable"` + CallbackTimeOut int `yaml:"callbackTimeOut"` + CallbackFailedContinue bool `CallbackFailedContinue` +} + type config struct { ServerIP string `yaml:"serverip"` ServerVersion string `yaml:"serverversion"` @@ -167,11 +173,6 @@ type config struct { AccessSecret string `yaml:"accessSecret"` AccessExpire int64 `yaml:"accessExpire"` } - MessageCallBack struct { - CallbackSwitch bool `yaml:"callbackSwitch"` - CallbackUrl string `yaml:"callbackUrl"` - CallBackTimeOut int `yaml:"callbackTimeOut"` - } MessageJudge struct { IsJudgeFriend bool `yaml:"isJudgeFriend"` } @@ -179,6 +180,15 @@ type config struct { PushSound string `yaml:"pushSound"` BadgeCount bool `yaml:"badgeCount"` } + + Callback struct { + CallbackUrl string `yaml:"callbackUrl"` + CallbackbeforeSendSingleMsg callBackConfig `yaml:"callbackSendGroupMsg"` + CallbackAfterSendSingleMsg callBackConfig `yaml:"callbackSendSingleMsg"` + CallbackBeforeSendGroupMsg callBackConfig `yaml:"CallbackBeforeSendGroupMsg"` + CallbackAfterSendGroupMsg callBackConfig `yaml:"callbackAfterSendGroupMsg"` + CallbackWordFilter callBackConfig `yaml:"callbackWordFilter"` + } `yaml:"callback"` Notification struct { ///////////////////////group///////////////////////////// GroupCreated struct { diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index c5e2d38b9..9d0c9724b 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -136,6 +136,16 @@ const ( VerificationCodeForReset = 2 VerificationCodeForRegisterSuffix = "_forRegister" VerificationCodeForResetSuffix = "_forReset" + + //callbackCommand + CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsg" + CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsg" + CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsg" + CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" + CallbackWordFilterCommand = "callbackWordFilterCommand" + //actionCode + ActionAllow = 0 + ActionForbidden = 1 ) var ContentType2PushContent = map[int64]string{ diff --git a/pkg/common/constant/error.go b/pkg/common/constant/error.go index 339f2c953..548720a37 100644 --- a/pkg/common/constant/error.go +++ b/pkg/common/constant/error.go @@ -51,6 +51,7 @@ var ( ErrAccess = ErrInfo{ErrCode: 801, ErrMsg: AccessMsg.Error()} ErrDB = ErrInfo{ErrCode: 802, ErrMsg: DBMsg.Error()} ErrArgs = ErrInfo{ErrCode: 8003, ErrMsg: ArgsMsg.Error()} + ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()} ) var ( @@ -64,6 +65,7 @@ var ( AccessMsg = errors.New("no permission") DBMsg = errors.New("db failed") ArgsMsg = errors.New("args failed") + CallBackMsg = errors.New("callback failed") ThirdPartyMsg = errors.New("third party error") ) diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 4d480770a..38d18c119 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -53,3 +53,13 @@ func Post(url string, data interface{}, timeOutSecond int) (content []byte, err } return result, nil } + +func PostReturn(url string, input, output interface{}, timeOut int) error { + b, err := Post(url, input, timeOut) + if err != nil { + return err + } + if err = json.Unmarshal(b, output); err != nil { + return err + } +} diff --git a/pkg/common/log/logrus.go b/pkg/common/log/logrus.go index a5e0ae7dd..63485e8d9 100644 --- a/pkg/common/log/logrus.go +++ b/pkg/common/log/logrus.go @@ -2,7 +2,6 @@ package log import ( "Open_IM/pkg/common/config" - "bufio" "fmt" "os" "time" @@ -33,13 +32,13 @@ func loggerInit(moduleName string) *Logger { //All logs will be printed logger.SetLevel(logrus.Level(config.Config.Log.RemainLogLevel)) //Close std console output - src, err := os.OpenFile(os.DevNull, os.O_APPEND|os.O_WRONLY, os.ModeAppend) - if err != nil { - panic(err.Error()) - } - writer := bufio.NewWriter(src) - logger.SetOutput(writer) - //logger.SetOutput(os.Stdout) + //src, err := os.OpenFile(os.DevNull, os.O_APPEND|os.O_WRONLY, os.ModeAppend) + //if err != nil { + // panic(err.Error()) + //} + //writer := bufio.NewWriter(src) + //logger.SetOutput(writer) + logger.SetOutput(os.Stdout) //Log Console Print Style Setting logger.SetFormatter(&nested.Formatter{ TimestampFormat: "2006-01-02 15:04:05.000", diff --git a/pkg/proto/sdk_ws/ws.proto b/pkg/proto/sdk_ws/ws.proto index 5d200cce6..cb622b96c 100644 --- a/pkg/proto/sdk_ws/ws.proto +++ b/pkg/proto/sdk_ws/ws.proto @@ -338,6 +338,36 @@ message ConversationUpdateTips{ } +///callback +message CommonCallbackURLReq { + string CallbackCommand = 1 [json_name = "code"]; + string OpenIMServerID = 2; + string OperationID = 3; +} + +message CommonCallbackURLResp { + string Code = 1 [json_name = "code"]; + string Msg = 2 [json_name = "msg"]; + string OperationID = 3 [json_name = "operationID"]; +} + +message CallbackBeforeSendMsgReq { + commonReq CommonCallbackURLReq = 1; + +} + +message CallbackBeforeSendMsgResp { + commonResp CommonCallbackURLResp = 1; + string FromUserID = 2; +} + +message CallbackAfterAddFriendReq { + commonReq CommonCallbackURLReq = 1; +} + +message CallbackAfterAddFriendResp { + commonResp CommonCallbackURLResp = 1; +} ///cms From dcfd802757f92c3ab3655dc2102d1e1a84c196e5 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 2 Mar 2022 19:07:17 +0800 Subject: [PATCH 2/5] callback add --- config/config.yaml | 6 ++-- internal/api/third/minio_init.go | 1 + internal/rpc/msg/callback.go | 48 ++++++++++++++++++--------- internal/rpc/msg/send_msg.go | 2 +- internal/rpc/statistics/statistics.go | 41 +++++++++++++++-------- pkg/call_back_struct/message.go | 24 +++++++++----- 6 files changed, 80 insertions(+), 42 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index da3264de7..d8ff4060d 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -8,7 +8,7 @@ etcd: etcdAddr: [ 127.0.0.1:2379 ] #单机部署时,默认即可 mysql: - dbMysqlAddress: [ 43.128.5.63:13306 ] #mysql地址 目前仅支持单机,默认即可 + dbMysqlAddress: [ 127.0.0.1:13306 ] #mysql地址 目前仅支持单机,默认即可 dbMysqlUserName: root #mysql用户名,建议修改 dbMysqlPassword: openIM # mysql密码,建议修改 dbMysqlDatabaseName: openIM_v2 #默认即可 @@ -19,7 +19,7 @@ mysql: dbMaxLifeTime: 120 mongo: - dbAddress: [ 43.128.5.63:37017 ] #redis地址 目前仅支持单机,默认即可 + dbAddress: [ 127.0.0.1:37017 ] #redis地址 目前仅支持单机,默认即可 dbDirect: false dbTimeout: 10 dbDatabase: openIM #mongo db 默认即可 @@ -30,7 +30,7 @@ mongo: dbRetainChatRecords: 3650 #mongo保存离线消息时间(天),根据需求修改 redis: - dbAddress: 43.128.5.63:16379 #redis地址 目前仅支持单机,默认即可 + dbAddress: 127.0.0.1:16379 #redis地址 目前仅支持单机,默认即可 dbMaxIdle: 128 dbMaxActive: 0 dbIdleTimeout: 120 diff --git a/internal/api/third/minio_init.go b/internal/api/third/minio_init.go index d073524f3..bcec45402 100644 --- a/internal/api/third/minio_init.go +++ b/internal/api/third/minio_init.go @@ -42,6 +42,7 @@ func init() { return } } + // 自动化桶public的代码 //err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite) //if err != nil { // log.NewError("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in ", err.Error()) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index d55d9046e..19d8ca4f4 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -12,16 +12,19 @@ import ( func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) - if !config.Config.Callback.CallbackbeforeSendSingleMsg.Enable || msg.MsgData.ContentType != constant.Text { + if !config.Config.Callback.CallbackbeforeSendSingleMsg.Enable { return true, nil } req := cbApi.CallbackBeforeSendSingleMsgReq{CommonCallbackReq:cbApi.CommonCallbackReq{ }} resp := &cbApi.CallbackBeforeSendSingleMsgResp{CommonCallbackResp:cbApi.CommonCallbackResp{ }} + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, resp) utils.CopyStructFields(req, msg.MsgData) - if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackTimeOut); err != nil && !config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackFailedContinue{ - return false, err + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackTimeOut); err != nil{ + if !config.Config.Callback.CallbackbeforeSendSingleMsg.CallbackFailedContinue { + return false, err + } } if resp.ActionCode == constant.ActionForbidden { return false, nil @@ -32,13 +35,14 @@ func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err erro func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error { log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) - if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable || msg.MsgData.ContentType != constant.Text { + if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { return nil } req := cbApi.CallbackAfterSendSingleMsgReq{CommonCallbackReq: cbApi.CommonCallbackReq{}} resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, resp) utils.CopyStructFields(req, msg.MsgData) - if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil && config.Config.Callback.CallbackAfterSendSingleMsg.CallbackFailedContinue{ + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil{ return err } return nil @@ -47,14 +51,17 @@ func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error { func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) - if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable || msg.MsgData.ContentType != constant.Text { + if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable { return true, nil } req := cbApi.CallbackBeforeSendSingleMsgReq{CommonCallbackReq: cbApi.CommonCallbackReq{}} resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, resp) utils.CopyStructFields(req, msg.MsgData) - if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil && !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue{ - return false, nil + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil { + if !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue { + return false, nil + } } if resp.ActionCode == constant.ActionForbidden { return false, nil @@ -64,9 +71,16 @@ func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error { log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) - if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable || msg.MsgData.ContentType != constant.Text { + if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { return nil } + req := cbApi.CallbackAfterSendGroupMsgReq{CommonCallbackReq: cbApi.CommonCallbackReq{}} + resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, resp) + utils.CopyStructFields(req, msg.MsgData) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { + return err + } return nil } @@ -76,13 +90,17 @@ func callBackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) { if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text { return true, nil } - req := cbApi.CallBackWordFilterReq{ - CommonCallbackReq: cbApi.CommonCallbackReq{}, - } - resp := cbApi.CallBackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + req := cbApi.CallbackWordFilterReq{CommonCallbackReq: cbApi.CommonCallbackReq{}} + resp := &cbApi.CallbackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, resp) utils.CopyStructFields(&req, msg.MsgData) - if err := http.PostReturn(msg.OperationID, req, &resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil { - return false, err + if err := http.PostReturn(msg.OperationID, req, resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil { + if !config.Config.Callback.CallbackWordFilter.CallbackFailedContinue { + return false, err + } + } + if resp.ActionCode == constant.ActionForbidden { + return false, nil } msg.MsgData.Content = resp.Content return true, nil diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 390f151f9..86a28d480 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -131,7 +131,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewError(pb.OperationID, utils.GetSelfFuncName(), "CallbackBeforeSendMsg failed", err.Error(), pb.MsgData) } if !canSend { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return") + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callback result", canSend, "end rpc and return", pb.MsgData) return returnMsg(&replay, pb, 201, "callback result stop rpc and return", "", 0) } switch pb.MsgData.SessionType { diff --git a/internal/rpc/statistics/statistics.go b/internal/rpc/statistics/statistics.go index 9554bf116..e8b1383f9 100644 --- a/internal/rpc/statistics/statistics.go +++ b/internal/rpc/statistics/statistics.go @@ -164,28 +164,41 @@ func GetRangeDate(from, to time.Time) [][2]time.Time { } // month case !isInOneMonth(from, to): - for i := 0; ; i++ { - if i == 0 { - fromTime := from - toTime := getFirstDateOfNextNMonth(fromTime, 1) + if to.Sub(from) < time.Hour * 24 * 30 { + for i := 0; ; i++ { + fromTime := from.Add(time.Hour * 24 * time.Duration(i)) + toTime := from.Add(time.Hour * 24 * time.Duration(i+1)) + if toTime.After(to.Add(time.Hour * 24)) { + break + } times = append(times, [2]time.Time{ fromTime, toTime, }) - } else { - fromTime := getFirstDateOfNextNMonth(from, i) - toTime := getFirstDateOfNextNMonth(fromTime, 1) - if toTime.After(to) { - toTime = to + } + } else { + for i := 0; ; i++ { + if i == 0 { + fromTime := from + toTime := getFirstDateOfNextNMonth(fromTime, 1) + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + } else { + fromTime := getFirstDateOfNextNMonth(from, i) + toTime := getFirstDateOfNextNMonth(fromTime, 1) + if toTime.After(to) { + toTime = to + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + break + } times = append(times, [2]time.Time{ fromTime, toTime, }) - break } - times = append(times, [2]time.Time{ - fromTime, toTime, - }) - } + } } } return times diff --git a/pkg/call_back_struct/message.go b/pkg/call_back_struct/message.go index 26473182e..253c2561e 100644 --- a/pkg/call_back_struct/message.go +++ b/pkg/call_back_struct/message.go @@ -1,12 +1,16 @@ package call_back_struct -type singleMsg struct { +type msg struct { SendID string `json:"sendID"` - RecvID string `json:"recvID"` ClientMsgID string `json:"clientMsgID"` ServerMsgID string `json:"serverMsgId"` - SendTime int64 `json:"sendTime"` CreateTime int64 `json:"createTime"` + Content []byte `json:"content"` +} + +type singleMsg struct { + msg + RecvID string `json:"recvID"` } type CallbackBeforeSendSingleMsgReq struct { @@ -16,6 +20,7 @@ type CallbackBeforeSendSingleMsgReq struct { type CallbackBeforeSendSingleMsgResp struct { CommonCallbackResp + singleMsg } type CallbackAfterSendSingleMsgReq struct { @@ -28,18 +33,22 @@ type CallbackAfterSendSingleMsgResp struct { } type groupMsg struct { - + msg + GroupID string `json:"groupID"` } type CallbackBeforeSendGroupMsgReq struct { CommonCallbackReq + groupMsg } type CallbackBeforeSendGroupMsgResp struct { CommonCallbackResp + groupMsg } type CallbackAfterSendGroupMsgReq struct { + groupMsg CommonCallbackReq } @@ -47,15 +56,12 @@ type CallbackAfterSendGroupMsgResp struct { CommonCallbackResp } -type CallBackWordFilterReq struct { +type CallbackWordFilterReq struct { CommonCallbackReq Content []byte `json:"content"` - SendID string `json:"SendID"` - RecvID string `json:"RecvID"` - GroupID string `json:"GroupID"` } -type CallBackWordFilterResp struct { +type CallbackWordFilterResp struct { CommonCallbackResp Content []byte `json:"content"` } From c0c08fe373344542aaf6f95e28b58cc6b7f713e2 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 2 Mar 2022 20:59:54 +0800 Subject: [PATCH 3/5] adjust log --- internal/msg_gateway/gate/ws_server.go | 2 +- internal/push/logic/push_to_client.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 0f751b58d..ff7e739c8 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -182,7 +182,7 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok for _, v := range ws.wsUserToConn { count = count + len(v) } - log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) + log.Debug("WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) userCount = uint64(len(ws.wsUserToConn)) } diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index b6a624f91..11d090ac5 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -31,10 +31,10 @@ type AtContent struct { func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingleMsgToUser //isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) - log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) + log.Debug("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) //Online push message - log.InfoByKv("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) + log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) for _, v := range grpcCons { msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID}) From e6b302abf9c51bccd121b563f09339a1a7f6f4f9 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 2 Mar 2022 21:02:21 +0800 Subject: [PATCH 4/5] log --- pkg/common/http/http_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 38d18c119..e227da91e 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -62,4 +62,5 @@ func PostReturn(url string, input, output interface{}, timeOut int) error { if err = json.Unmarshal(b, output); err != nil { return err } + return nil } From 25f24801d6acb46c81364dd2c0a342af7f01dc92 Mon Sep 17 00:00:00 2001 From: FG <1432970085@qq.com> Date: Wed, 2 Mar 2022 22:52:39 +0800 Subject: [PATCH 5/5] log --- pkg/common/log/logrus.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/common/log/logrus.go b/pkg/common/log/logrus.go index 63485e8d9..a5e0ae7dd 100644 --- a/pkg/common/log/logrus.go +++ b/pkg/common/log/logrus.go @@ -2,6 +2,7 @@ package log import ( "Open_IM/pkg/common/config" + "bufio" "fmt" "os" "time" @@ -32,13 +33,13 @@ func loggerInit(moduleName string) *Logger { //All logs will be printed logger.SetLevel(logrus.Level(config.Config.Log.RemainLogLevel)) //Close std console output - //src, err := os.OpenFile(os.DevNull, os.O_APPEND|os.O_WRONLY, os.ModeAppend) - //if err != nil { - // panic(err.Error()) - //} - //writer := bufio.NewWriter(src) - //logger.SetOutput(writer) - logger.SetOutput(os.Stdout) + src, err := os.OpenFile(os.DevNull, os.O_APPEND|os.O_WRONLY, os.ModeAppend) + if err != nil { + panic(err.Error()) + } + writer := bufio.NewWriter(src) + logger.SetOutput(writer) + //logger.SetOutput(os.Stdout) //Log Console Print Style Setting logger.SetFormatter(&nested.Formatter{ TimestampFormat: "2006-01-02 15:04:05.000",