diff --git a/cmd/Open-IM-SDK-Core b/cmd/Open-IM-SDK-Core index 992f76df0..1c6c7af53 160000 --- a/cmd/Open-IM-SDK-Core +++ b/cmd/Open-IM-SDK-Core @@ -1 +1 @@ -Subproject commit 992f76df0ee500a0377523b0780d3a85f2275755 +Subproject commit 1c6c7af5393b3e9eefbaf16b673519ca863a6c2c diff --git a/cmd/open_im_api/main.go b/cmd/open_im_api/main.go index d5c01986b..551a37a0e 100644 --- a/cmd/open_im_api/main.go +++ b/cmd/open_im_api/main.go @@ -102,7 +102,7 @@ func main() { conversationGroup.POST("/get_receive_message_opt", conversation.GetReceiveMessageOpt) //1 conversationGroup.POST("/get_all_conversation_message_opt", conversation.GetAllConversationMessageOpt) //1 } - + apiThird.MinioInit() log.NewPrivateLog("api") ginPort := flag.Int("port", 10000, "get ginServerPort from cmd,default 10000 as port") flag.Parse() diff --git a/config/config.yaml b/config/config.yaml index 6e3eaeea6..9a9e1ec6c 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -91,8 +91,8 @@ credential: #腾讯cos,发送图片、视频、文件时需要,请自行申 bucket: openim location: us-east-1 endpoint: http://127.0.0.1:9000 - accessKeyID: minioadmin - secretAccessKey: minioadmin + accessKeyID: user12345 + secretAccessKey: key12345 rpcport: #rpc服务端口 默认即可 @@ -131,8 +131,9 @@ rpcregistername: #rpc注册服务名,默认即可 log: storageLocation: ../logs/ rotationTime: 24 - remainRotationCount: 5 #日志数量 - remainLogLevel: 6 #日志级别 6表示全都打印,测试阶段建议设置为6 + remainRotationCount: 3 #日志数量 + #日志级别 6表示全都打印,测试阶段建议设置为6 + remainLogLevel: 6 elasticSearchSwitch: false elasticSearchAddr: [ 127.0.0.1:9201 ] elasticSearchUser: "" @@ -178,13 +179,6 @@ tokenpolicy: # Token effective time day as a unit accessExpire: 3650 #token过期时间(天) 默认即可 -messagecallback: - callbackSwitch: false - callbackUrl: "http://www.xxx.com/msg/judge" - #TimeOut use second as unit - callbackTimeOut: 10 -messagejudge: - isJudgeFriend: true # c2c: # callbackBeforeSendMsg: # switch: false @@ -199,6 +193,30 @@ iospush: pushSound: "xxx" badgeCount: true +callback: + # callback url 需要自行更换callback url + callbackUrl : "http://127.0.0.1:8080/callback" + # 开启关闭操作前后回调的配置 + callbackbeforeSendSingleMsg: + enable: false # 回调是否启用 + callbackTimeOut: 2 # 回调超时时间 + CallbackFailedContinue: true # 回调超时是否继续执行代码 + callbackAfterSendSingleMsg: + enable: false + callbackTimeOut: 2 + callbackBeforeSendGroupMsg: + enable: false + callbackTimeOut: 2 + CallbackFailedContinue: true + callbackAfterSendGroupMsg: + enable: false + callbackTimeOut: 2 + callbackWordFilter: + enable: false + callbackTimeOut: 2 + CallbackFailedContinue: true + + notification: groupCreated: conversation: diff --git a/docker-compose.yaml b/docker-compose.yaml index 5497faa43..154c61faa 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -24,6 +24,10 @@ services: - ./components/mongodb/data/db:/data/db - ./components/mongodb/data/logs:/data/logs - ./components/mongodb/data/conf:/etc/mongo + environment: + - TZ=Asia/Shanghai + # cache + - wiredTigerCacheSizeGB=1 # environment: # - MONGO_INITDB_ROOT_USERNAME=openIM # - MONGO_INITDB_ROOT_PASSWORD=openIM diff --git a/internal/api/third/minio_init.go b/internal/api/third/minio_init.go index 4f54996ba..29338ee77 100644 --- a/internal/api/third/minio_init.go +++ b/internal/api/third/minio_init.go @@ -10,7 +10,7 @@ import ( url2 "net/url" ) -func init() { +func MinioInit() { minioUrl, err := url2.Parse(config.Config.Credential.Minio.Endpoint) if err != nil { log.NewError("", utils.GetSelfFuncName(), "parse failed, please check config/config.yaml", err.Error()) @@ -30,18 +30,23 @@ func init() { } err = minioClient.MakeBucket(context.Background(), config.Config.Credential.Minio.Bucket, opt) if err != nil { + log.NewInfo("", utils.GetSelfFuncName(), err.Error()) exists, err := minioClient.BucketExists(context.Background(), config.Config.Credential.Minio.Bucket) if err == nil && exists { log.NewInfo("", utils.GetSelfFuncName(), "We already own %s\n", config.Config.Credential.Minio.Bucket) } else { - log.NewError("", utils.GetSelfFuncName(), "create bucket failed and bucket not exists", err.Error()) + if err != nil { + log.NewError("", utils.GetSelfFuncName(), err.Error()) + } + log.NewError("", utils.GetSelfFuncName(), "create bucket failed and bucket not exists") return } } + // 自动化桶public的代码 //err = minioClient.SetBucketPolicy(context.Background(), config.Config.Credential.Minio.Bucket, policy.BucketPolicyReadWrite) //if err != nil { // log.NewError("", utils.GetSelfFuncName(), "SetBucketPolicy failed please set in ", err.Error()) - // return + // return`z //} log.NewInfo("", utils.GetSelfFuncName(), "minio create and set policy success") } diff --git a/internal/api/third/minio_storage_credential.go b/internal/api/third/minio_storage_credential.go index 4d5800e06..f34f64c94 100644 --- a/internal/api/third/minio_storage_credential.go +++ b/internal/api/third/minio_storage_credential.go @@ -5,6 +5,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/log" + "Open_IM/pkg/common/token_verify" _ "Open_IM/pkg/common/token_verify" "Open_IM/pkg/utils" "github.com/gin-gonic/gin" @@ -23,12 +24,12 @@ func MinioStorageCredential(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } - //ok, _ := token_verify.GetUserIDFromToken(c.Request.Header.Get("token")) - //if !ok { - // log.NewError("", utils.GetSelfFuncName(), "GetUserIDFromToken false ", c.Request.Header.Get("token")) - // c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"}) - // return - //} + ok, _ := token_verify.GetUserIDFromToken(c.Request.Header.Get("token")) + if !ok { + log.NewError("", utils.GetSelfFuncName(), "GetUserIDFromToken false ", c.Request.Header.Get("token")) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "GetUserIDFromToken failed"}) + return + } var stsOpts cr.STSAssumeRoleOptions stsOpts.AccessKey = config.Config.Credential.Minio.AccessKeyID stsOpts.SecretKey = config.Config.Credential.Minio.SecretAccessKey @@ -45,11 +46,6 @@ func MinioStorageCredential(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) return } - if err != nil { - log.NewError("0", utils.GetSelfFuncName(), err.Error()) - c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) - return - } resp.SessionToken = v.SessionToken resp.SecretAccessKey = v.SecretAccessKey resp.AccessKeyID = v.AccessKeyID diff --git a/internal/demo/register/login.go b/internal/demo/register/login.go index 3a477e12e..e6de926dc 100644 --- a/internal/demo/register/login.go +++ b/internal/demo/register/login.go @@ -53,7 +53,7 @@ func Login(c *gin.Context) { openIMGetUserToken.Secret = config.Config.Secret openIMGetUserToken.UserID = account openIMGetUserTokenResp := api.UserTokenResp{} - bMsg, err := http2.Post(url, openIMGetUserToken, config.Config.MessageCallBack.CallBackTimeOut) + bMsg, err := http2.Post(url, openIMGetUserToken, 2) if err != nil { log.NewError(params.OperationID, "request openIM get user token error", account, "err", err.Error()) c.JSON(http.StatusOK, gin.H{"errCode": constant.GetIMTokenErr, "errMsg": err.Error()}) diff --git a/internal/demo/register/reset_password.go b/internal/demo/register/reset_password.go index 0ff7115a0..b668f4f62 100644 --- a/internal/demo/register/reset_password.go +++ b/internal/demo/register/reset_password.go @@ -12,10 +12,10 @@ import ( ) type resetPasswordRequest struct { - VerificationCode string `json:"verificationCode"` + VerificationCode string `json:"verificationCode" binding:"required"` Email string `json:"email"` PhoneNumber string `json:"phoneNumber"` - NewPassword string `json:"newPassword"` + NewPassword string `json:"newPassword" binding:"required"` OperationID string `json:"operationID"` } diff --git a/internal/demo/register/set_password.go b/internal/demo/register/set_password.go index 3fb996a56..7540d9830 100644 --- a/internal/demo/register/set_password.go +++ b/internal/demo/register/set_password.go @@ -57,7 +57,7 @@ func SetPassword(c *gin.Context) { openIMRegisterReq.Nickname = account openIMRegisterReq.Secret = config.Config.Secret openIMRegisterResp := api.UserRegisterResp{} - bMsg, err := http2.Post(url, openIMRegisterReq, config.Config.MessageCallBack.CallBackTimeOut) + bMsg, err := http2.Post(url, openIMRegisterReq, 2) if err != nil { log.NewError(params.OperationID, "request openIM register error", account, "err", err.Error()) c.JSON(http.StatusOK, gin.H{"errCode": constant.RegisterFailed, "errMsg": err.Error()}) diff --git a/internal/msg_gateway/gate/init.go b/internal/msg_gateway/gate/init.go index 03f82f3ad..a98027561 100644 --- a/internal/msg_gateway/gate/init.go +++ b/internal/msg_gateway/gate/init.go @@ -3,15 +3,19 @@ package gate import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/log" + "Open_IM/pkg/statistics" + "fmt" "github.com/go-playground/validator/v10" "sync" ) var ( - rwLock *sync.RWMutex - validate *validator.Validate - ws WServer - rpcSvr RPCServer + rwLock *sync.RWMutex + validate *validator.Validate + ws WServer + rpcSvr RPCServer + sendMsgCount uint64 + userCount uint64 ) func Init(rpcPort, wsPort int) { @@ -19,6 +23,8 @@ func Init(rpcPort, wsPort int) { log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName) rwLock = new(sync.RWMutex) validate = validator.New() + statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 10) + statistics.NewStatistics(&userCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second add user conn", userCount), 10) ws.onInit(wsPort) rpcSvr.onInit(rpcPort) } diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index d1756093c..df3b1a356 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -142,6 +142,7 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM } func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { + sendMsgCount++ log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID) nReply := new(pbChat.SendMsgResp) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 2197fbbc2..477d43420 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -183,7 +183,8 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok for _, v := range ws.wsUserToConn { count = count + len(v) } - log.WarnByKv("WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) + log.Debug("WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) + userCount = uint64(len(ws.wsUserToConn)) } @@ -210,6 +211,7 @@ func (ws *WServer) delUserConn(conn *UserConn) { } else { log.WarnByKv("WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn)) } + userCount = uint64(len(ws.wsUserToConn)) delete(ws.wsConnToUser, conn) } diff --git a/internal/msg_transfer/logic/history_msg_handler.go b/internal/msg_transfer/logic/history_msg_handler.go index b78695c11..294b5ed70 100644 --- a/internal/msg_transfer/logic/history_msg_handler.go +++ b/internal/msg_transfer/logic/history_msg_handler.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/grpc-etcdv3/getcdv3" pbMsg "Open_IM/pkg/proto/chat" pbPush "Open_IM/pkg/proto/push" + "Open_IM/pkg/statistics" "Open_IM/pkg/utils" "context" "github.com/Shopify/sarama" @@ -20,9 +21,14 @@ type fcb func(msg []byte, msgKey string) type HistoryConsumerHandler struct { msgHandle map[string]fcb historyConsumerGroup *kfk.MConsumerGroup + singleMsgCount uint64 + groupMsgCount uint64 } func (mc *HistoryConsumerHandler) Init() { + statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 10) + statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 10) + mc.msgHandle = make(map[string]fcb) mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0, @@ -55,6 +61,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) return } + mc.singleMsgCount++ log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time) } if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { @@ -70,6 +77,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) return } + mc.groupMsgCount++ } go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) default: diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index b8941cc6f..02d96fda4 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -11,6 +11,8 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/kafka" "Open_IM/pkg/common/log" + "Open_IM/pkg/statistics" + "fmt" ) var ( @@ -18,6 +20,7 @@ var ( pushCh PushConsumerHandler pushTerminal []int32 producer *kafka.Producer + count uint64 ) func Init(rpcPort int) { @@ -28,6 +31,7 @@ func Init(rpcPort int) { } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) + statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 10), 10) } func Run() { diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 41c943c4b..a5a6397f2 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -35,10 +35,10 @@ type AtContent struct { func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingleMsgToUser isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush) - log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) + log.Debug("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String()) grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName) //Online push message - log.InfoByKv("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) + log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String()) for _, v := range grpcCons { msgClient := pbRelay.NewOnlineMessageRelayServiceClient(v) reply, err := msgClient.OnlinePushMsg(context.Background(), &pbRelay.OnlinePushMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserID: pushMsg.PushToUserID}) @@ -51,6 +51,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { } } log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData) + count++ if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID { for _, v := range wsResult { if v.ResultCode == 0 { diff --git a/internal/rpc/auth/callback.go b/internal/rpc/auth/callback.go new file mode 100644 index 000000000..8832b06d1 --- /dev/null +++ b/internal/rpc/auth/callback.go @@ -0,0 +1 @@ +package auth diff --git a/internal/rpc/friend/callback.go b/internal/rpc/friend/callback.go new file mode 100644 index 000000000..cf8ea69c8 --- /dev/null +++ b/internal/rpc/friend/callback.go @@ -0,0 +1 @@ +package friend diff --git a/internal/rpc/group/callback.go b/internal/rpc/group/callback.go new file mode 100644 index 000000000..8e0fcfb7b --- /dev/null +++ b/internal/rpc/group/callback.go @@ -0,0 +1,9 @@ +package group + +import ( + pbGroup "Open_IM/pkg/proto/group" +) + +func callbackBeforeCreateGroup(req *pbGroup.CreateGroupReq) (bool, error) { + return true, nil +} diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 40255d071..7d4b85094 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -76,7 +76,12 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR log.NewError(req.OperationID, "CheckAccess false ", req.OpUserID, req.OwnerUserID) return &pbGroup.CreateGroupResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil } - + canCreate, err := callbackBeforeCreateGroup(req) + if err != nil || !canCreate { + if err != nil { + log.NewError(req.OperationID, utils.GetSelfFuncName(), "callbackBeforeCreateGroup failed", ) + } + } //Time stamp + MD5 to generate group chat id groupId := utils.Md5(strconv.FormatInt(time.Now().UnixNano(), 10)) //to group @@ -84,7 +89,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR utils.CopyStructFields(&groupInfo, req.GroupInfo) groupInfo.CreatorUserID = req.OpUserID groupInfo.GroupID = groupId - err := imdb.InsertIntoGroup(groupInfo) + err = imdb.InsertIntoGroup(groupInfo) if err != nil { log.NewError(req.OperationID, "InsertIntoGroup failed, ", err.Error(), groupInfo) return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, http.WrapError(constant.ErrDB) diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go new file mode 100644 index 000000000..bb8ccd700 --- /dev/null +++ b/internal/rpc/msg/callback.go @@ -0,0 +1,159 @@ +package msg + +import ( + cbApi "Open_IM/pkg/call_back_struct" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/http" + "Open_IM/pkg/common/log" + pbChat "Open_IM/pkg/proto/chat" + "Open_IM/pkg/utils" +) + +func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq { + return cbApi.CommonCallbackReq{ + SendID: msg.MsgData.SendID, + ServerMsgID: msg.MsgData.ServerMsgID, + ClientMsgID: msg.MsgData.ClientMsgID, + OperationID: msg.OperationID, + SenderPlatformID: msg.MsgData.SenderPlatformID, + SenderNickname: msg.MsgData.SenderNickname, + SessionType: msg.MsgData.SessionType, + MsgFrom: msg.MsgData.MsgFrom, + ContentType: msg.MsgData.ContentType, + Status: msg.MsgData.Status, + CreateTime: msg.MsgData.CreateTime, + Content: string(msg.MsgData.Content), + } +} + +func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { + if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable { + return true, nil + } + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + commonCallbackReq := copyCallbackCommonReqStruct(msg) + commonCallbackReq.CallbackCommand = constant.CallbackBeforeSendSingleMsgCommand + req := cbApi.CallbackBeforeSendSingleMsgReq{ + CommonCallbackReq: commonCallbackReq, + RecvID: msg.MsgData.RecvID, + } + resp := &cbApi.CallbackBeforeSendSingleMsgResp{ + CommonCallbackResp: cbApi.CommonCallbackResp{}, + } + //utils.CopyStructFields(req, msg.MsgData) + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackTimeOut); err != nil { + if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue { + return false, err + } else { + return true, err + } + } else { + if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess { + return false, nil + } + } + return true, err +} + +func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error { + if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { + return nil + } + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + commonCallbackReq := copyCallbackCommonReqStruct(msg) + commonCallbackReq.CallbackCommand = constant.CallbackAfterSendSingleMsgCommand + req := cbApi.CallbackAfterSendSingleMsgReq{ + CommonCallbackReq: commonCallbackReq, + RecvID: msg.MsgData.RecvID, + } + resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + //utils.CopyStructFields(req, msg.MsgData) + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil { + return err + } + return nil +} + +func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { + if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable { + return true, nil + } + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + commonCallbackReq := copyCallbackCommonReqStruct(msg) + commonCallbackReq.CallbackCommand = constant.CallbackBeforeSendGroupMsgCommand + req := cbApi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: commonCallbackReq, + GroupID: msg.MsgData.GroupID, + } + resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + //utils.CopyStructFields(req, msg.MsgData) + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil { + if !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue { + return false, err + } else { + return true, err + } + } else { + if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess { + return false, nil + } + } + return true, err +} + +func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error { + if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { + return nil + } + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + commonCallbackReq := copyCallbackCommonReqStruct(msg) + commonCallbackReq.CallbackCommand = constant.CallbackAfterSendGroupMsgCommand + req := cbApi.CallbackAfterSendGroupMsgReq{ + CommonCallbackReq: commonCallbackReq, + GroupID: msg.MsgData.GroupID, + } + resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + + //utils.CopyStructFields(req, msg.MsgData) + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { + return err + } + return nil +} + +func callbackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) { + if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text { + return true, nil + } + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) + commonCallbackReq := copyCallbackCommonReqStruct(msg) + commonCallbackReq.CallbackCommand = constant.CallbackWordFilterCommand + req := cbApi.CallbackWordFilterReq{ + CommonCallbackReq: commonCallbackReq, + } + resp := &cbApi.CallbackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} + //utils.CopyStructFields(&req., msg.MsgData) + defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) + if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil { + if !config.Config.Callback.CallbackWordFilter.CallbackFailedContinue { + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), "callback failed and config disable, stop this operation") + return false, err + } else { + return true, err + } + } else { + if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess { + return false, nil + } + if resp.ErrCode == constant.CallbackHandleSuccess { + msg.MsgData.Content = []byte(resp.Content) + } + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content)) + } + return true, err +} diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 114746da2..af8f40b4f 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -4,7 +4,6 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" - http2 "Open_IM/pkg/common/http" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbChat "Open_IM/pkg/proto/chat" @@ -12,11 +11,9 @@ import ( sdk_ws "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" "context" - "encoding/json" "github.com/garyburd/redigo/redis" "github.com/golang/protobuf/proto" "math/rand" - "net/http" "strconv" "strings" "time" @@ -127,25 +124,27 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if !isHistory { mReq.IsOnlineOnly = true } - mResp := MsgCallBackResp{} - if config.Config.MessageCallBack.CallbackSwitch { - bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, mReq, config.Config.MessageCallBack.CallBackTimeOut) - if err != nil { - log.ErrorByKv("callback to Business server err", pb.OperationID, "args", pb.String(), "err", err.Error()) - return returnMsg(&replay, pb, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "", 0) - } else if err = json.Unmarshal(bMsg, &mResp); err != nil { - log.ErrorByKv("ws json Unmarshal err", pb.OperationID, "args", pb.String(), "err", err.Error()) - return returnMsg(&replay, pb, 200, err.Error(), "", 0) - } else { - if mResp.ErrCode != 0 { - return returnMsg(&replay, pb, mResp.ResponseErrCode, mResp.ErrMsg, "", 0) - } else { - pb.MsgData.Content = []byte(mResp.ResponseResult.ModifiedMsg) - } - } + + // callback + canSend, err := callbackWordFilter(pb) + if err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter failed", err.Error(), pb.MsgData) + } + if !canSend { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter result", canSend, "end rpc and return", pb.MsgData) + return returnMsg(&replay, pb, 201, "callbackWordFilter result stop rpc and return", "", 0) } switch pb.MsgData.SessionType { case constant.SingleChatType: + // callback + canSend, err := callbackBeforeSendSingleMsg(pb) + if err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg failed", err.Error()) + } + if !canSend { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", canSend, "end rpc and return") + return returnMsg(&replay, pb, 201, "callbackBeforeSendSingleMsg result stop rpc and return", "", 0) + } isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) if isSend { msgToMQ.MsgData = pb.MsgData @@ -163,8 +162,21 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } + // callback + if err := callbackAfterSendSingleMsg(pb); err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg failed", err.Error()) + } return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) case constant.GroupChatType: + // callback + canSend, err := callbackBeforeSendGroupMsg(pb) + if err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg failed", err.Error()) + } + if !canSend { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg result", canSend, "end rpc and return") + return returnMsg(&replay, pb, 201, "callbackBeforeSendGroupMsg result stop rpc and return", "", 0) + } etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) client := pbGroup.NewGroupClient(etcdConn) req := &pbGroup.GetGroupAllMemberReq{ @@ -228,7 +240,10 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } - + } + // callback + if err := callbackAfterSendGroupMsg(pb); err != nil { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error()) } return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime) default: @@ -245,7 +260,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error { } func GetMsgID(sendID string) string { t := time.Now().Format("2006-01-02 15:04:05") - return t + "-" + sendID + "-" + strconv.Itoa(rand.Int()) + return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int())) } func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) { diff --git a/internal/rpc/statistics/statistics.go b/internal/rpc/statistics/statistics.go index 9554bf116..e8b1383f9 100644 --- a/internal/rpc/statistics/statistics.go +++ b/internal/rpc/statistics/statistics.go @@ -164,28 +164,41 @@ func GetRangeDate(from, to time.Time) [][2]time.Time { } // month case !isInOneMonth(from, to): - for i := 0; ; i++ { - if i == 0 { - fromTime := from - toTime := getFirstDateOfNextNMonth(fromTime, 1) + if to.Sub(from) < time.Hour * 24 * 30 { + for i := 0; ; i++ { + fromTime := from.Add(time.Hour * 24 * time.Duration(i)) + toTime := from.Add(time.Hour * 24 * time.Duration(i+1)) + if toTime.After(to.Add(time.Hour * 24)) { + break + } times = append(times, [2]time.Time{ fromTime, toTime, }) - } else { - fromTime := getFirstDateOfNextNMonth(from, i) - toTime := getFirstDateOfNextNMonth(fromTime, 1) - if toTime.After(to) { - toTime = to + } + } else { + for i := 0; ; i++ { + if i == 0 { + fromTime := from + toTime := getFirstDateOfNextNMonth(fromTime, 1) + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + } else { + fromTime := getFirstDateOfNextNMonth(from, i) + toTime := getFirstDateOfNextNMonth(fromTime, 1) + if toTime.After(to) { + toTime = to + times = append(times, [2]time.Time{ + fromTime, toTime, + }) + break + } times = append(times, [2]time.Time{ fromTime, toTime, }) - break } - times = append(times, [2]time.Time{ - fromTime, toTime, - }) - } + } } } return times diff --git a/internal/rpc/user/callback.go b/internal/rpc/user/callback.go new file mode 100644 index 000000000..a00006b65 --- /dev/null +++ b/internal/rpc/user/callback.go @@ -0,0 +1 @@ +package user diff --git a/internal/utils/callback.go b/internal/utils/callback.go new file mode 100644 index 000000000..d4b585bf7 --- /dev/null +++ b/internal/utils/callback.go @@ -0,0 +1 @@ +package utils diff --git a/pkg/call_back_struct/common.go b/pkg/call_back_struct/common.go index 90b77eda8..e2bf510f4 100644 --- a/pkg/call_back_struct/common.go +++ b/pkg/call_back_struct/common.go @@ -1 +1,26 @@ package call_back_struct + +type CommonCallbackReq struct { + SendID string `json:"sendID"` + CallbackCommand string `json:"callbackCommand"` + ServerMsgID string `json:"serverMsgID"` + ClientMsgID string `json:"clientMsgID"` + OperationID string `json:"operationID"` + SenderPlatformID int32 `json:"senderPlatformID"` + SenderNickname string `json:"senderNickname"` + SessionType int32 `json:"sessionType"` + MsgFrom int32 `json:"msgFrom"` + ContentType int32 `json:"contentType"` + Status int32 `json:"status"` + CreateTime int64 `json:"createTime"` + Content string `json:"content"` +} + +type CommonCallbackResp struct { + ActionCode int `json:"actionCode"` + ErrCode int `json:"errCode"` + ErrMsg string `json:"errMsg"` + OperationID string `json:"operationID"` +} + + diff --git a/pkg/call_back_struct/group.go b/pkg/call_back_struct/group.go new file mode 100644 index 000000000..3681dddcc --- /dev/null +++ b/pkg/call_back_struct/group.go @@ -0,0 +1,9 @@ +package call_back_struct + +type CallbackBeforeCreateGroupReq struct { + CommonCallbackReq +} + +type CallbackAfterCreateGroupResp struct { + CommonCallbackResp +} \ No newline at end of file diff --git a/pkg/call_back_struct/message.go b/pkg/call_back_struct/message.go index 90b77eda8..31c215ea2 100644 --- a/pkg/call_back_struct/message.go +++ b/pkg/call_back_struct/message.go @@ -1 +1,47 @@ package call_back_struct + + +type CallbackBeforeSendSingleMsgReq struct { + CommonCallbackReq + RecvID string `json:"recvID"` +} + +type CallbackBeforeSendSingleMsgResp struct { + CommonCallbackResp +} + +type CallbackAfterSendSingleMsgReq struct { + CommonCallbackReq + RecvID string `json:"recvID"` +} + +type CallbackAfterSendSingleMsgResp struct { + CommonCallbackResp +} + +type CallbackBeforeSendGroupMsgReq struct { + CommonCallbackReq + GroupID string `json:"groupID"` +} + +type CallbackBeforeSendGroupMsgResp struct { + CommonCallbackResp +} + +type CallbackAfterSendGroupMsgReq struct { + CommonCallbackReq + GroupID string `json:"groupID"` +} + +type CallbackAfterSendGroupMsgResp struct { + CommonCallbackResp +} + +type CallbackWordFilterReq struct { + CommonCallbackReq +} + +type CallbackWordFilterResp struct { + CommonCallbackResp + Content string `json:"content"` +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index ae0d3cadc..282440672 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -19,6 +19,12 @@ var ( var Config config +type callBackConfig struct { + Enable bool `yaml:"enable"` + CallbackTimeOut int `yaml:"callbackTimeOut"` + CallbackFailedContinue bool `CallbackFailedContinue` +} + type config struct { ServerIP string `yaml:"serverip"` ServerVersion string `yaml:"serverversion"` @@ -167,11 +173,6 @@ type config struct { AccessSecret string `yaml:"accessSecret"` AccessExpire int64 `yaml:"accessExpire"` } - MessageCallBack struct { - CallbackSwitch bool `yaml:"callbackSwitch"` - CallbackUrl string `yaml:"callbackUrl"` - CallBackTimeOut int `yaml:"callbackTimeOut"` - } MessageJudge struct { IsJudgeFriend bool `yaml:"isJudgeFriend"` } @@ -179,6 +180,15 @@ type config struct { PushSound string `yaml:"pushSound"` BadgeCount bool `yaml:"badgeCount"` } + + Callback struct { + CallbackUrl string `yaml:"callbackUrl"` + CallbackBeforeSendSingleMsg callBackConfig `yaml:"callbackbeforeSendSingleMsg"` + CallbackAfterSendSingleMsg callBackConfig `yaml:"callbackAfterSendSingleMsg"` + CallbackBeforeSendGroupMsg callBackConfig `yaml:"callbackBeforeSendGroupMsg"` + CallbackAfterSendGroupMsg callBackConfig `yaml:"callbackAfterSendGroupMsg"` + CallbackWordFilter callBackConfig `yaml:"callbackWordFilter"` + } `yaml:"callback"` Notification struct { ///////////////////////group///////////////////////////// GroupCreated struct { diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index c5e2d38b9..199a1917a 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -136,6 +136,19 @@ const ( VerificationCodeForReset = 2 VerificationCodeForRegisterSuffix = "_forRegister" VerificationCodeForResetSuffix = "_forReset" + + //callbackCommand + CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" + CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" + CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" + CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" + CallbackWordFilterCommand = "callbackWordFilterCommand" + //callback actionCode + ActionAllow = 0 + ActionForbidden = 1 + //callback callbackHandleCode + CallbackHandleSuccess = 0 + CallbackHandleFailed = 1 ) var ContentType2PushContent = map[int64]string{ diff --git a/pkg/common/constant/error.go b/pkg/common/constant/error.go index 339f2c953..548720a37 100644 --- a/pkg/common/constant/error.go +++ b/pkg/common/constant/error.go @@ -51,6 +51,7 @@ var ( ErrAccess = ErrInfo{ErrCode: 801, ErrMsg: AccessMsg.Error()} ErrDB = ErrInfo{ErrCode: 802, ErrMsg: DBMsg.Error()} ErrArgs = ErrInfo{ErrCode: 8003, ErrMsg: ArgsMsg.Error()} + ErrCallback = ErrInfo{ErrCode: 809, ErrMsg: CallBackMsg.Error()} ) var ( @@ -64,6 +65,7 @@ var ( AccessMsg = errors.New("no permission") DBMsg = errors.New("db failed") ArgsMsg = errors.New("args failed") + CallBackMsg = errors.New("callback failed") ThirdPartyMsg = errors.New("third party error") ) diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go b/pkg/common/db/mysql_model/im_mysql_model/friend_model.go index 45650357b..700082353 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/friend_model.go @@ -26,7 +26,7 @@ func GetFriendRelationshipFromFriend(OwnerUserID, FriendUserID string) (*db.Frie return nil, err } var friend db.Friend - err = dbConn.Table("friends").Where("owner_user_id=? and friend_user_id=?", OwnerUserID, FriendUserID).Find(&friend).Error + err = dbConn.Table("friends").Where("owner_user_id=? and friend_user_id=?", OwnerUserID, FriendUserID).Take(&friend).Error if err != nil { return nil, err } diff --git a/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go b/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go index 609be8687..171a6aedf 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/friend_request_model.go @@ -53,7 +53,7 @@ func GetFriendApplicationByBothUserID(FromUserID, ToUserID string) (*db.FriendRe return nil, err } var friendRequest db.FriendRequest - err = dbConn.Table("friend_requests").Where("from_user_id=? and to_user_id=?", FromUserID, ToUserID).Find(&friendRequest).Error + err = dbConn.Table("friend_requests").Where("from_user_id=? and to_user_id=?", FromUserID, ToUserID).Take(&friendRequest).Error if err != nil { return nil, err } diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go index 4dafa30a6..63f12c115 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_member_model.go @@ -4,6 +4,7 @@ import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" "Open_IM/pkg/utils" + "errors" "fmt" "time" ) @@ -43,6 +44,7 @@ func GetGroupMemberListByUserID(userID string) ([]db.GroupMember, error) { } var groupMemberList []db.GroupMember err = dbConn.Table("group_members").Where("user_id=?", userID).Find(&groupMemberList).Error + //err = dbConn.Table("group_members").Where("user_id=?", userID).Take(&groupMemberList).Error if err != nil { return nil, err } @@ -82,7 +84,7 @@ func GetGroupMemberInfoByGroupIDAndUserID(groupID, userID string) (*db.GroupMemb return nil, err } var groupMember db.GroupMember - err = dbConn.Table("group_members").Where("group_id=? and user_id=? ", groupID, userID).Limit(1).Find(&groupMember).Error + err = dbConn.Table("group_members").Where("group_id=? and user_id=? ", groupID, userID).Limit(1).Take(&groupMember).Error if err != nil { return nil, err } @@ -149,7 +151,8 @@ func GetGroupOwnerInfoByGroupID(groupID string) (*db.GroupMember, error) { return &v, nil } } - return nil, nil + + return nil, utils.Wrap(errors.New("no owner"), "") } func IsExistGroupMember(groupID, userID string) bool { diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_model.go index a9fe13236..587e3672e 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_model.go @@ -45,7 +45,7 @@ func GetGroupInfoByGroupID(groupId string) (*db.Group, error) { return nil, utils.Wrap(err, "") } var groupInfo db.Group - err = dbConn.Table("groups").Where("group_id=?", groupId).Find(&groupInfo).Error + err = dbConn.Table("groups").Where("group_id=?", groupId).Take(&groupInfo).Error if err != nil { return nil, err } @@ -86,11 +86,10 @@ func GetGroups(pageNumber, showNumber int) ([]db.Group, error) { return groups, nil } - func OperateGroupStatus(groupId string, groupStatus int32) error { group := db.Group{ GroupID: groupId, - Status: groupStatus, + Status: groupStatus, } if err := SetGroupInfo(group); err != nil { return err @@ -98,7 +97,6 @@ func OperateGroupStatus(groupId string, groupStatus int32) error { return nil } - func DeleteGroup(groupId string) error { dbConn, err := db.DB.MysqlDB.DefaultGormDB() if err != nil { @@ -129,15 +127,14 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, updateInfo := db.GroupMember{ RoleLevel: roleLevel, } - groupMaster := db.GroupMember{ - } + groupMaster := db.GroupMember{} switch roleLevel { case constant.GroupOwner: err = dbConn.Transaction(func(tx *gorm.DB) error { result := dbConn.Table("group_members").Where("group_id = ? and role_level = ?", groupId, constant.GroupOwner).First(&groupMaster).Update(&db.GroupMember{ RoleLevel: constant.GroupOrdinaryUsers, }) - if result.Error != nil { + if result.Error != nil { return result.Error } if result.RowsAffected == 0 { @@ -145,7 +142,7 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, } result = dbConn.Table("group_members").First(&groupMember).Update(updateInfo) - if result.Error != nil { + if result.Error != nil { return result.Error } if result.RowsAffected == 0 { @@ -161,7 +158,7 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, return result.Error } if result.RowsAffected == 0 { - return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) + return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) } if groupMaster.UserID == userId { return errors.New(fmt.Sprintf("user %s is master of %s, cant set to ordinary user", userId, groupId)) @@ -171,7 +168,7 @@ func OperateGroupRole(userId, groupId string, roleLevel int32) (string, string, return result.Error } if result.RowsAffected == 0 { - return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) + return errors.New(fmt.Sprintf("user %s not exist in group %s or already operate", userId, groupId)) } } return nil @@ -219,4 +216,4 @@ func GetGroupMaster(groupId string) (db.GroupMember, error) { return groupMember, err } return groupMember, nil -} \ No newline at end of file +} diff --git a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go index 745ad0034..836f3f0bf 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/group_request_model.go @@ -63,7 +63,7 @@ func GetGroupRequestByGroupIDAndUserID(groupID, userID string) (*db.GroupRequest return nil, err } var groupRequest db.GroupRequest - err = dbConn.Table("group_requests").Where("user_id=? and group_id=?", userID, groupID).Find(&groupRequest).Error + err = dbConn.Table("group_requests").Where("user_id=? and group_id=?", userID, groupID).Take(&groupRequest).Error if err != nil { return nil, err } @@ -128,7 +128,6 @@ func GetUserReqGroupByUserID(userID string) ([]db.GroupRequest, error) { return groupRequestList, err } - // //func GroupApplicationResponse(pb *group.GroupApplicationResponseReq) (*group.CommonResp, error) { // diff --git a/pkg/common/db/mysql_model/im_mysql_model/user_model.go b/pkg/common/db/mysql_model/im_mysql_model/user_model.go index de5045855..7492d6a38 100644 --- a/pkg/common/db/mysql_model/im_mysql_model/user_model.go +++ b/pkg/common/db/mysql_model/im_mysql_model/user_model.go @@ -51,18 +51,6 @@ func UserRegister(user db.User) error { return nil } -type User struct { - UserID string `gorm:"column:user_id;primaryKey;"` - Nickname string `gorm:"column:name"` - FaceUrl string `gorm:"column:icon"` - Gender int32 `gorm:"column:gender"` - PhoneNumber string `gorm:"column:phone_number"` - Birth string `gorm:"column:birth"` - Email string `gorm:"column:email"` - Ex string `gorm:"column:ex"` - CreateTime time.Time `gorm:"column:create_time"` -} - func DeleteUser(userID string) (i int64) { dbConn, err := db.DB.MysqlDB.DefaultGormDB() if err != nil { @@ -78,7 +66,7 @@ func GetUserByUserID(userID string) (*db.User, error) { return nil, err } var user db.User - err = dbConn.Table("users").Where("user_id=?", userID).First(&user).Error + err = dbConn.Table("users").Where("user_id=?", userID).Take(&user).Error if err != nil { return nil, err } diff --git a/pkg/common/db/mysql_model/im_mysql_msg_model/chat_log_model.go b/pkg/common/db/mysql_model/im_mysql_msg_model/chat_log_model.go index fef4c1fb2..99c099ac4 100644 --- a/pkg/common/db/mysql_model/im_mysql_msg_model/chat_log_model.go +++ b/pkg/common/db/mysql_model/im_mysql_msg_model/chat_log_model.go @@ -9,6 +9,7 @@ package im_mysql_msg_model import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" + "Open_IM/pkg/common/log" pbMsg "Open_IM/pkg/proto/chat" "Open_IM/pkg/proto/sdk_ws" "Open_IM/pkg/utils" @@ -45,5 +46,6 @@ func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error { } chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime) chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime) + log.NewDebug("test", "this is ", chatLog) return dbConn.Table("chat_logs").Create(chatLog).Error } diff --git a/pkg/common/http/http_client.go b/pkg/common/http/http_client.go index 4d480770a..e227da91e 100644 --- a/pkg/common/http/http_client.go +++ b/pkg/common/http/http_client.go @@ -53,3 +53,14 @@ func Post(url string, data interface{}, timeOutSecond int) (content []byte, err } return result, nil } + +func PostReturn(url string, input, output interface{}, timeOut int) error { + b, err := Post(url, input, timeOut) + if err != nil { + return err + } + if err = json.Unmarshal(b, output); err != nil { + return err + } + return nil +} diff --git a/pkg/common/log/file_line_hk.go b/pkg/common/log/file_line_hk.go index 9ffa7806e..bc908ee03 100644 --- a/pkg/common/log/file_line_hk.go +++ b/pkg/common/log/file_line_hk.go @@ -28,6 +28,17 @@ func (f *fileHook) Fire(entry *logrus.Entry) error { return nil } +//func (f *fileHook) Fire(entry *logrus.Entry) error { +// var s string +// _, b, c, _ := runtime.Caller(10) +// i := strings.SplitAfter(b, "/") +// if len(i) > 3 { +// s = i[len(i)-3] + i[len(i)-2] + i[len(i)-1] + ":" + utils.IntToString(c) +// } +// entry.Data["FilePath"] = s +// return nil +//} + func findCaller(skip int) string { file := "" line := 0 diff --git a/pkg/grpc-etcdv3/getcdv3/register.go b/pkg/grpc-etcdv3/getcdv3/register.go index 4fd2cd5a7..d1c7957bf 100644 --- a/pkg/grpc-etcdv3/getcdv3/register.go +++ b/pkg/grpc-etcdv3/getcdv3/register.go @@ -1,6 +1,7 @@ package getcdv3 import ( + "Open_IM/pkg/common/log" "context" "fmt" "go.etcd.io/etcd/clientv3" @@ -39,9 +40,9 @@ func RegisterEtcd4Unique(schema, etcdAddr, myHost string, myPort int, serviceNam func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName string, ttl int) error { cli, err := clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second}) - fmt.Println("RegisterEtcd") + + log.Info("", "RegisterEtcd, ", schema, etcdAddr, myHost, myPort, serviceName, ttl) if err != nil { - // return fmt.Errorf("grpclb: create clientv3 client failed: %v", err) return fmt.Errorf("create etcd clientv3 client failed, errmsg:%v, etcd addr:%s", err, etcdAddr) } @@ -66,15 +67,16 @@ func RegisterEtcd(schema, etcdAddr, myHost string, myPort int, serviceName strin if err != nil { return fmt.Errorf("keepalive failed, errmsg:%v, lease id:%d", err, resp.ID) } - fmt.Println("RegisterEtcd ok") + log.Info("", "RegisterEtcd ok ") + go func() { for { select { case v, ok := <-kresp: if ok == true { - // fmt.Println(" kresp ok ", v) + log.Debug("", "KeepAlive kresp ok", v, schema, etcdAddr, myHost, myPort, serviceName, ttl) } else { - fmt.Println(" kresp failed ", v) + log.Error("", "KeepAlive kresp failed", schema, etcdAddr, myHost, myPort, serviceName, ttl) } } } diff --git a/pkg/grpc-etcdv3/getcdv3/resolver.go b/pkg/grpc-etcdv3/getcdv3/resolver.go index aa41f86c4..45f47a717 100644 --- a/pkg/grpc-etcdv3/getcdv3/resolver.go +++ b/pkg/grpc-etcdv3/getcdv3/resolver.go @@ -1,6 +1,7 @@ package getcdv3 import ( + "Open_IM/pkg/common/log" "context" "fmt" "go.etcd.io/etcd/clientv3" @@ -102,7 +103,7 @@ func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts re if err == nil { var addrList []resolver.Address for i := range resp.Kvs { - fmt.Println("init addr: ", string(resp.Kvs[i].Value)) + log.Debug("", "init addr: ", string(resp.Kvs[i].Value)) addrList = append(addrList, resolver.Address{Addr: string(resp.Kvs[i].Value)}) } r.cc.UpdateState(resolver.State{Addresses: addrList}) @@ -148,27 +149,27 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) { if !exists(addrList, string(ev.Kv.Value)) { flag = 1 addrList = append(addrList, resolver.Address{Addr: string(ev.Kv.Value)}) - fmt.Println("after add, new list: ", addrList) + log.Debug("", "after add, new list: ", addrList) } case mvccpb.DELETE: - fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value)) + log.Debug("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value)) i := strings.LastIndexAny(string(ev.Kv.Key), "/") if i < 0 { return } t := string(ev.Kv.Key)[i+1:] - fmt.Println("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t) + log.Debug("remove addr key: ", string(ev.Kv.Key), "value:", string(ev.Kv.Value), "addr:", t) if s, ok := remove(addrList, t); ok { flag = 1 addrList = s - fmt.Println("after remove, new list: ", addrList) + log.Debug("after remove, new list: ", addrList) } } } if flag == 1 { r.cc.UpdateState(resolver.State{Addresses: addrList}) - fmt.Println("update: ", addrList) + log.Debug("update: ", addrList) } } } @@ -176,7 +177,7 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) { func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn { gEtcdCli, err := clientv3.New(clientv3.Config{Endpoints: strings.Split(etcdaddr, ",")}) if err != nil { - fmt.Println("eeeeeeeeeeeee", err.Error()) + log.Error("clientv3.New failed", err.Error()) return nil } @@ -200,7 +201,7 @@ func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn { } } else { gEtcdCli.Close() - fmt.Println("rrrrrrrrrrr", err.Error()) + log.Error("gEtcdCli.Get failed", err.Error()) return nil } gEtcdCli.Close() @@ -236,7 +237,7 @@ func GetConnPool(schema, etcdaddr, servicename string) (*ClientConn, error) { ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(1000*time.Millisecond)) c, err := p.Get(ctx) - fmt.Println(err) + log.Info("", "Get ", err) return c, err } diff --git a/pkg/proto/sdk_ws/ws.proto b/pkg/proto/sdk_ws/ws.proto index 5d200cce6..cb622b96c 100644 --- a/pkg/proto/sdk_ws/ws.proto +++ b/pkg/proto/sdk_ws/ws.proto @@ -338,6 +338,36 @@ message ConversationUpdateTips{ } +///callback +message CommonCallbackURLReq { + string CallbackCommand = 1 [json_name = "code"]; + string OpenIMServerID = 2; + string OperationID = 3; +} + +message CommonCallbackURLResp { + string Code = 1 [json_name = "code"]; + string Msg = 2 [json_name = "msg"]; + string OperationID = 3 [json_name = "operationID"]; +} + +message CallbackBeforeSendMsgReq { + commonReq CommonCallbackURLReq = 1; + +} + +message CallbackBeforeSendMsgResp { + commonResp CommonCallbackURLResp = 1; + string FromUserID = 2; +} + +message CallbackAfterAddFriendReq { + commonReq CommonCallbackURLReq = 1; +} + +message CallbackAfterAddFriendResp { + commonResp CommonCallbackURLResp = 1; +} ///cms diff --git a/pkg/statistics/statistics.go b/pkg/statistics/statistics.go new file mode 100644 index 000000000..2711c5d91 --- /dev/null +++ b/pkg/statistics/statistics.go @@ -0,0 +1,31 @@ +package statistics + +import ( + "time" +) + +type Statistics struct { + Count *uint64 + ModuleName string + PrintArgs string + SleepTime int +} + +func (s *Statistics) output() { + t := time.NewTicker(time.Duration(s.SleepTime) * time.Second) + defer t.Stop() + //var sum uint64 + for { + //sum = *s.Count + select { + case <-t.C: + } + //log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count) + } +} + +func NewStatistics(count *uint64, moduleName, printArgs string, sleepTime int) *Statistics { + p := &Statistics{Count: count, ModuleName: moduleName, SleepTime: sleepTime, PrintArgs: printArgs} + go p.output() + return p +} diff --git a/script/msg_gateway_start.sh b/script/msg_gateway_start.sh index 1df53cfdd..978cc11fb 100644 --- a/script/msg_gateway_start.sh +++ b/script/msg_gateway_start.sh @@ -3,6 +3,8 @@ source ./style_info.cfg source ./path_info.cfg source ./function.sh +ulimit -n 200000 + list1=$(cat $config_path | grep openImOnlineRelayPort | awk -F '[:]' '{print $NF}') list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}') list_to_string $list1 diff --git a/script/sdk_svr_start.sh b/script/sdk_svr_start.sh index e9604e5ea..5f47228dc 100644 --- a/script/sdk_svr_start.sh +++ b/script/sdk_svr_start.sh @@ -6,6 +6,7 @@ source ./function.sh list1=$(cat $config_path | grep openImApiPort | awk -F '[:]' '{print $NF}') list2=$(cat $config_path | grep openImWsPort | awk -F '[:]' '{print $NF}') list3=$(cat $config_path | grep openImSdkWsPort | awk -F '[:]' '{print $NF}') +logLevel=$(cat $config_path | grep remainLogLevel | awk -F '[:]' '{print $NF}') list_to_string $list1 api_ports=($ports_array) list_to_string $list2 @@ -26,7 +27,7 @@ fi #Waiting port recycling sleep 1 cd ${sdk_server_binary_root} - nohup ./${sdk_server_name} -openIM_api_port ${api_ports[0]} -openIM_ws_port ${ws_ports[0]} -sdk_ws_port ${sdk_ws_ports[0]} >>../logs/openIM.log 2>&1 & + nohup ./${sdk_server_name} -openIM_api_port ${api_ports[0]} -openIM_ws_port ${ws_ports[0]} -sdk_ws_port ${sdk_ws_ports[0]} -openIM_log_level ${logLevel} >>../logs/openIM.log 2>&1 & #Check launched service process sleep 3