diff --git a/src/api/Makefile b/src/api/Makefile new file mode 100644 index 000000000..5ea7e5742 --- /dev/null +++ b/src/api/Makefile @@ -0,0 +1,26 @@ +.PHONY: all build run gotool install clean help + +BINARY_NAME=open_im_api +BIN_DIR=../../bin/ +LAN_FILE=.go +GO_FILE:=${BINARY_NAME}${LAN_FILE} + +all: gotool build + +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ${BINARY_NAME} ${GO_FILE} + +run: + @go run ./ + +gotool: + go fmt ./ + go vet ./ + +install: + make build + mv ${BINARY_NAME} ${BIN_DIR} + +clean: + @if [ -f ${BINARY_NAME} ] ; then rm ${BINARY_NAME} ; fi + diff --git a/src/api/auth/user_register.go b/src/api/auth/user_register.go new file mode 100644 index 000000000..a66858160 --- /dev/null +++ b/src/api/auth/user_register.go @@ -0,0 +1,90 @@ +package apiAuth + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/log" + pbAuth "Open_IM/src/proto/auth" + "context" + "github.com/gin-gonic/gin" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "net/http" + "strings" +) + +type paramsUserRegister struct { + Secret string `json:"secret" binding:"required,max=32"` + Platform int32 `json:"platform" binding:"required,min=1,max=7"` + UID string `json:"uid" binding:"required,min=1,max=64"` + Name string `json:"name" binding:"required,min=1,max=64"` + Icon string `json:"icon" binding:"omitempty,max=1024"` + Gender int32 `json:"gender" binding:"omitempty,oneof=0 1 2"` + Mobile string `json:"mobile" binding:"omitempty,max=32"` + Birth string `json:"birth" binding:"omitempty,max=16"` + Email string `json:"email" binding:"omitempty,max=64"` + Ex string `json:"ex" binding:"omitempty,max=1024"` +} + +func newUserRegisterReq(params *paramsUserRegister) *pbAuth.UserRegisterReq { + pbData := pbAuth.UserRegisterReq{ + UID: params.UID, + Name: params.Name, + Icon: params.Icon, + Gender: params.Gender, + Mobile: params.Mobile, + Birth: params.Birth, + Email: params.Email, + Ex: params.Ex, + } + return &pbData +} + +func UserRegister(c *gin.Context) { + log.Info("", "", "api user_register init ....") + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.RpcGetTokenName) + client := pbAuth.NewAuthClient(etcdConn) + + params := paramsUserRegister{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + pbData := newUserRegisterReq(¶ms) + + log.Info("", "", "api user_register is server, [data: %s]", pbData.String()) + reply, err := client.UserRegister(context.Background(), pbData) + if err != nil || !reply.Success { + log.Error("", "", "api user_register call rpc fail, [data: %s] [err: %s]", pbData.String(), err.Error()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()}) + return + } + log.Info("", "", "api user_register call rpc success, [data: %s] [reply: %s]", pbData.String(), reply.String()) + + pbDataToken := &pbAuth.UserTokenReq{ + Platform: params.Platform, + UID: params.UID, + } + replyToken, err := client.UserToken(context.Background(), pbDataToken) + if err != nil { + log.Error("", "", "api user_register call rpc fail, [data: %s] [err: %s]", pbData.String(), err.Error()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()}) + return + } + log.Info("", "", "api user_register call success, [data: %s] [reply: %s]", pbData.String(), reply.String()) + + if replyToken.ErrCode == 0 { + c.JSON(http.StatusOK, gin.H{ + "errCode": replyToken.ErrCode, + "errMsg": replyToken.ErrMsg, + "data": gin.H{ + "uid": pbData.UID, + "token": replyToken.Token, + "expiredTime": replyToken.ExpiredTime, + }, + }) + } else { + c.JSON(http.StatusOK, gin.H{ + "errCode": replyToken.ErrCode, + "errMsg": replyToken.ErrMsg, + }) + } +} diff --git a/src/api/auth/user_token.go b/src/api/auth/user_token.go new file mode 100644 index 000000000..2e18145f7 --- /dev/null +++ b/src/api/auth/user_token.go @@ -0,0 +1,66 @@ +package apiAuth + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/log" + pbAuth "Open_IM/src/proto/auth" + "context" + "github.com/gin-gonic/gin" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "net/http" + "strings" +) + +type paramsUserToken struct { + Secret string `json:"secret" binding:"required,max=32"` + Platform int32 `json:"platform" binding:"required,min=1,max=7"` + UID string `json:"uid" binding:"required,min=1,max=64"` +} + +func newUserTokenReq(params *paramsUserToken) *pbAuth.UserTokenReq { + pbData := pbAuth.UserTokenReq{ + Platform: params.Platform, + UID: params.UID, + } + return &pbData +} + +func UserToken(c *gin.Context) { + log.Info("", "", "api user_token init ....") + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.RpcGetTokenName) + client := pbAuth.NewAuthClient(etcdConn) + + params := paramsUserToken{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + pbData := newUserTokenReq(¶ms) + + log.Info("", "", "api user_token is server, [data: %s]", pbData.String()) + reply, err := client.UserToken(context.Background(), pbData) + if err != nil { + log.Error("", "", "api user_token call rpc fail, [data: %s] [err: %s]", pbData.String(), err.Error()) + c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()}) + return + } + log.Info("", "", "api user_token call rpc success, [data: %s] [reply: %s]", pbData.String(), reply.String()) + + if reply.ErrCode == 0 { + c.JSON(http.StatusOK, gin.H{ + "errCode": reply.ErrCode, + "errMsg": reply.ErrMsg, + "data": gin.H{ + "uid": pbData.UID, + "token": reply.Token, + "expiredTime": reply.ExpiredTime, + }, + }) + } else { + c.JSON(http.StatusOK, gin.H{ + "errCode": reply.ErrCode, + "errMsg": reply.ErrMsg, + }) + } + +} diff --git a/src/api/chat/newest_seq.go b/src/api/chat/newest_seq.go new file mode 100644 index 000000000..be6298c36 --- /dev/null +++ b/src/api/chat/newest_seq.go @@ -0,0 +1,59 @@ +package apiChat + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/log" + pbMsg "Open_IM/src/proto/chat" + "Open_IM/src/utils" + "context" + "github.com/gin-gonic/gin" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "net/http" + "strings" +) + +type paramsUserNewestSeq struct { + ReqIdentifier int `json:"reqIdentifier" binding:"required"` + SendID string `json:"sendID" binding:"required"` + OperationID string `json:"operationID" binding:"required"` + MsgIncr int `json:"msgIncr" binding:"required"` +} + +func UserNewestSeq(c *gin.Context) { + params := paramsUserNewestSeq{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + + token := c.Request.Header.Get("token") + if !utils.VerifyToken(token, params.SendID) { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"}) + return + } + + pbData := pbMsg.GetNewSeqReq{} + pbData.UserID = params.SendID + pbData.OperationID = params.OperationID + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + if grpcConn == nil { + log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", params) + } + msgClient := pbMsg.NewChatClient(grpcConn) + reply, err := msgClient.GetNewSeq(context.Background(), &pbData) + if err != nil { + log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String()) + return + } + + c.JSON(http.StatusOK, gin.H{ + "errCode": reply.ErrCode, + "errMsg": reply.ErrMsg, + "msgIncr": params.MsgIncr, + "reqIdentifier": params.ReqIdentifier, + "data": gin.H{ + "seq": reply.Seq, + }, + }) + +} diff --git a/src/api/chat/pull_msg.go b/src/api/chat/pull_msg.go new file mode 100644 index 000000000..cbbc84b57 --- /dev/null +++ b/src/api/chat/pull_msg.go @@ -0,0 +1,75 @@ +package apiChat + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/log" + "Open_IM/src/proto/chat" + "Open_IM/src/utils" + "context" + "github.com/gin-gonic/gin" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "net/http" + "strings" +) + +type paramsUserPullMsg struct { + ReqIdentifier int `json:"reqIdentifier" binding:"required"` + SendID string `json:"sendID" binding:"required"` + OperationID string `json:"operationID" binding:"required"` + MsgIncr int `json:"msgIncr" binding:"required"` + Data struct { + SeqBegin int64 `json:"seqBegin" binding:"required"` + SeqEnd int64 `json:"seqEnd" binding:"required"` + } +} + +func UserPullMsg(c *gin.Context) { + params := paramsUserPullMsg{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + + token := c.Request.Header.Get("token") + if !utils.VerifyToken(token, params.SendID) { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"}) + return + } + + pbData := pbChat.PullMessageReq{} + pbData.UserID = params.SendID + pbData.OperationID = params.OperationID + pbData.SeqBegin = params.Data.SeqBegin + pbData.SeqEnd = params.Data.SeqEnd + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + msgClient := pbChat.NewChatClient(grpcConn) + reply, err := msgClient.PullMessage(context.Background(), &pbData) + if err != nil { + log.ErrorByKv("PullMessage error", pbData.OperationID, "err", err.Error()) + return + } + log.InfoByKv("rpc call success to pullMsgRep", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(), + "MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg())) + + msg := make(map[string]interface{}) + if v := reply.GetSingleUserMsg(); v != nil { + msg["single"] = v + } else { + msg["single"] = []pbChat.GatherFormat{} + } + if v := reply.GetGroupUserMsg(); v != nil { + msg["group"] = v + } else { + msg["group"] = []pbChat.GatherFormat{} + } + msg["maxSeq"] = reply.GetMaxSeq() + msg["minSeq"] = reply.GetMinSeq() + c.JSON(http.StatusOK, gin.H{ + "errCode": reply.ErrCode, + "errMsg": reply.ErrMsg, + "msgIncr": params.MsgIncr, + "reqIdentifier": params.ReqIdentifier, + "data": msg, + }) + +} diff --git a/src/api/chat/send_msg.go b/src/api/chat/send_msg.go new file mode 100644 index 000000000..30751eefd --- /dev/null +++ b/src/api/chat/send_msg.go @@ -0,0 +1,96 @@ +package apiChat + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/log" + pbChat "Open_IM/src/proto/chat" + "Open_IM/src/utils" + "context" + + "github.com/gin-gonic/gin" + "github.com/skiffer-git/grpc-etcdv3/getcdv3" + "net/http" + "strings" +) + +type paramsUserSendMsg struct { + ReqIdentifier int32 `json:"reqIdentifier" binding:"required"` + PlatformID int32 `json:"platformID" binding:"required"` + SendID string `json:"sendID" binding:"required"` + OperationID string `json:"operationID" binding:"required"` + MsgIncr int32 `json:"msgIncr" binding:"required"` + Data struct { + SessionType int32 `json:"sessionType" binding:"required"` + MsgFrom int32 `json:"msgFrom" binding:"required"` + ContentType int32 `json:"contentType" binding:"required"` + RecvID string `json:"recvID" binding:"required"` + ForceList []string `json:"forceList" binding:"required"` + Content string `json:"content" binding:"required"` + Options map[string]interface{} `json:"options" binding:"required"` + ClientMsgID string `json:"clientMsgID" binding:"required"` + OffLineInfo map[string]interface{} `json:"offlineInfo" binding:"required"` + Ex map[string]interface{} `json:"ext"` + } +} + +func newUserSendMsgReq(token string, params *paramsUserSendMsg) *pbChat.UserSendMsgReq { + pbData := pbChat.UserSendMsgReq{ + ReqIdentifier: params.ReqIdentifier, + Token: token, + SendID: params.SendID, + OperationID: params.OperationID, + MsgIncr: params.MsgIncr, + PlatformID: params.PlatformID, + SessionType: params.Data.SessionType, + MsgFrom: params.Data.MsgFrom, + ContentType: params.Data.ContentType, + RecvID: params.Data.RecvID, + ForceList: params.Data.ForceList, + Content: params.Data.Content, + Options: utils.MapToJsonString(params.Data.Options), + ClientMsgID: params.Data.ClientMsgID, + OffLineInfo: utils.MapToJsonString(params.Data.OffLineInfo), + Ex: utils.MapToJsonString(params.Data.Ex), + } + return &pbData +} + +func UserSendMsg(c *gin.Context) { + params := paramsUserSendMsg{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + log.ErrorByKv("json unmarshal err", "", "err", err.Error(), "data", c.PostForm("data")) + return + } + + token := c.Request.Header.Get("token") + if !utils.VerifyToken(token, params.SendID) { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"}) + return + } + + log.InfoByKv("Ws call success to sendMsgReq", params.OperationID, "Parameters", params) + + pbData := newUserSendMsgReq(token, ¶ms) + log.Info("", "", "api UserSendMsg call start..., [data: %s]", pbData.String()) + + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + client := pbChat.NewChatClient(etcdConn) + + log.Info("", "", "api UserSendMsg call, api call rpc...") + + reply, _ := client.UserSendMsg(context.Background(), pbData) + log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String()) + + c.JSON(http.StatusOK, gin.H{ + "errCode": 0, + "errMsg": "", + "msgIncr": reply.MsgIncr, + "reqIdentifier": reply.ReqIdentifier, + "data": gin.H{ + "clientMsgID": reply.ClientMsgID, + "serverMsgID": reply.ServerMsgID, + }, + }) + +} diff --git a/src/api/open_im_api.go b/src/api/open_im_api.go new file mode 100644 index 000000000..a2b6f45ae --- /dev/null +++ b/src/api/open_im_api.go @@ -0,0 +1,75 @@ +package main + +import ( + apiAuth "Open_IM/src/api/auth" + apiChat "Open_IM/src/api/chat" + "Open_IM/src/api/friend" + apiThird "Open_IM/src/api/third" + "Open_IM/src/api/user" + "Open_IM/src/common/log" + "Open_IM/src/utils" + "flag" + "github.com/gin-gonic/gin" + "strconv" +) + +func main() { + log.Info("", "", "api server running...") + r := gin.Default() + r.Use(utils.CorsHandler()) + // user routing group, which handles user registration and login services + userRouterGroup := r.Group("/user") + { + userRouterGroup.POST("/update_user_info", user.UpdateUserInfo) + userRouterGroup.POST("/get_user_info", user.GetUserInfo) + } + //friend routing group + friendRouterGroup := r.Group("/friend") + { + friendRouterGroup.POST("/search_friend", friend.SearchFriend) + friendRouterGroup.POST("/add_friend", friend.AddFriend) + friendRouterGroup.POST("/get_friend_apply_list", friend.GetFriendApplyList) + friendRouterGroup.POST("/get_friend_list", friend.GetFriendList) + friendRouterGroup.POST("/add_blacklist", friend.AddBlacklist) + friendRouterGroup.POST("/get_blacklist", friend.GetBlacklist) + friendRouterGroup.POST("/remove_blacklist", friend.RemoveBlacklist) + friendRouterGroup.POST("/delete_friend", friend.DeleteFriend) + friendRouterGroup.POST("/add_friend_response", friend.AddFriendResponse) + friendRouterGroup.POST("/set_friend_comment", friend.SetFriendComment) + } + //group related routing group + /*groupRouterGroup := r.Group("/group") + { + groupRouterGroup.POST("/create_group", group.CreateGroup) + groupRouterGroup.POST("/get_group_list", group.GetGroupList) + groupRouterGroup.POST("/get_group_info", group.GetGroupInfo) + groupRouterGroup.POST("/delete_group_member", group.DeleteGroupMember) + groupRouterGroup.POST("/set_group_name", group.SetGroupName) + groupRouterGroup.POST("/set_group_bulletin", group.SetGroupBulletin) + groupRouterGroup.POST("/set_owner_group_nickname", group.SetOwnerGroupNickname) + groupRouterGroup.POST("/set_group_head_image", group.SetGroupHeadImage) + groupRouterGroup.POST("/member_exit_group", group.MemberExitGroup) + }*/ + //certificate + authRouterGroup := r.Group("/auth") + { + authRouterGroup.POST("/user_register", apiAuth.UserRegister) + authRouterGroup.POST("/user_token", apiAuth.UserToken) + } + //Third service + thirdGroup := r.Group("/third") + { + thirdGroup.POST("/tencent_cloud_storage_credential", apiThird.TencentCloudStorageCredential) + } + //Message + chatGroup := r.Group("/chat") + { + chatGroup.POST("/newest_seq", apiChat.UserNewestSeq) + chatGroup.POST("/pull_msg", apiChat.UserPullMsg) + chatGroup.POST("/send_msg", apiChat.UserSendMsg) + } + + ginPort := flag.Int("port", 10000, "get ginServerPort from cmd,default 10000 as port") + flag.Parse() + r.Run(utils.ServerIP + ":" + strconv.Itoa(*ginPort)) +} diff --git a/src/api/third/tencent_cloud_storage_credential.go b/src/api/third/tencent_cloud_storage_credential.go new file mode 100644 index 000000000..e7d299842 --- /dev/null +++ b/src/api/third/tencent_cloud_storage_credential.go @@ -0,0 +1,70 @@ +package apiThird + +import ( + "Open_IM/src/common/config" + log2 "Open_IM/src/common/log" + "github.com/gin-gonic/gin" + sts "github.com/tencentyun/qcloud-cos-sts-sdk/go" + "net/http" + "time" +) + +type paramsTencentCloudStorageCredential struct { + Token string `json:"token"` + OperationID string `json:"operationID"` +} + +func TencentCloudStorageCredential(c *gin.Context) { + params := paramsTencentCloudStorageCredential{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "Parameter parsing error,please check the parameters and request service again"}) + return + } + + log2.Info(params.Token, params.OperationID, "api TencentUpLoadCredential call start...") + + cli := sts.NewClient( + config.Config.Credential.Tencent.SecretID, + config.Config.Credential.Tencent.SecretKey, + nil, + ) + log2.Info(c.Request.Header.Get("token"), c.PostForm("optionID"), "api TencentUpLoadCredential sts.NewClient cli = %v", cli) + + opt := &sts.CredentialOptions{ + DurationSeconds: int64(time.Hour.Seconds()), + Region: config.Config.Credential.Tencent.Region, + Policy: &sts.CredentialPolicy{ + Statement: []sts.CredentialPolicyStatement{ + { + Action: []string{ + "name/cos:PostObject", + "name/cos:PutObject", + }, + Effect: "allow", + Resource: []string{ + "qcs::cos:" + config.Config.Credential.Tencent.Region + ":uid/" + config.Config.Credential.Tencent.AppID + ":" + config.Config.Credential.Tencent.Bucket + "/*", + }, + }, + }, + }, + } + log2.Info(c.Request.Header.Get("token"), c.PostForm("optionID"), "api TencentUpLoadCredential sts.CredentialOptions opt = %v", opt) + + res, err := cli.GetCredential(opt) + if err != nil { + log2.Error(c.Request.Header.Get("token"), c.PostForm("optionID"), "api TencentUpLoadCredential cli.GetCredential err = %s", err.Error()) + c.JSON(http.StatusOK, gin.H{ + "errCode": config.ErrTencentCredential.ErrCode, + "errMsg": err.Error(), + "data": res, + }) + return + } + log2.Info(c.Request.Header.Get("token"), c.PostForm("optionID"), "api TencentUpLoadCredential cli.GetCredential success res = %v, res.Credentials = %v", res, res.Credentials) + + c.JSON(http.StatusOK, gin.H{ + "errCode": 0, + "errMsg": "", + "data": res, + }) +} diff --git a/src/common/config/config.go b/src/common/config/config.go new file mode 100644 index 000000000..362225916 --- /dev/null +++ b/src/common/config/config.go @@ -0,0 +1,150 @@ +package config + +import ( + "gopkg.in/yaml.v3" + "io/ioutil" +) + +var Config config + +type config struct { + ServerIP string `yaml:"serverip"` + + Api struct { + GinPort []int `yaml:"openImApiPort"` + } + + Credential struct { + Tencent struct { + AppID string `yaml:"appID"` + Region string `yaml:"region"` + Bucket string `yaml:"bucket"` + SecretID string `yaml:"secretID"` + SecretKey string `yaml:"secretKey"` + } + } + + Mysql struct { + DBAddress []string `yaml:"dbAddress"` + DBUserName string `yaml:"dbUserName"` + DBPassword string `yaml:"dbPassword"` + DBDatabaseName string `yaml:"dbDatabaseName"` + DBTableName string `yaml:"DBTableName"` + DBMsgTableNum int `yaml:"dbMsgTableNum"` + DBMaxOpenConns int `yaml:"dbMaxOpenConns"` + DBMaxIdleConns int `yaml:"dbMaxIdleConns"` + DBMaxLifeTime int `yaml:"dbMaxLifeTime"` + } + Mongo struct { + DBAddress []string `yaml:"dbAddress"` + DBDirect bool `yaml:"dbDirect"` + DBTimeout int `yaml:"dbTimeout"` + DBDatabase []string `yaml:"dbDatabase"` + DBSource string `yaml:"dbSource"` + DBUserName string `yaml:"dbUserName"` + DBPassword string `yaml:"dbPassword"` + DBMaxPoolSize int `yaml:"dbMaxPoolSize"` + DBRetainChatRecords int `yaml:"dbRetainChatRecords"` + } + Redis struct { + DBAddress []string `yaml:"dbAddress"` + DBMaxIdle int `yaml:"dbMaxIdle"` + DBMaxActive int `yaml:"dbMaxActive"` + DBIdleTimeout int `yaml:"dbIdleTimeout"` + DBPassWord string `yaml:"dbPassWord"` + } + RpcPort struct { + OpenImUserPort []int `yaml:"openImUserPort"` + openImFriendPort []int `yaml:"openImFriendPort"` + RpcMessagePort []int `yaml:"rpcMessagePort"` + RpcPushMessagePort []int `yaml:"rpcPushMessagePort"` + OpenImGroupPort []int `yaml:"openImGroupPort"` + RpcModifyUserInfoPort []int `yaml:"rpcModifyUserInfoPort"` + RpcGetTokenPort []int `yaml:"rpcGetTokenPort"` + } + RpcRegisterName struct { + OpenImUserName string `yaml:"openImUserName"` + OpenImFriendName string `yaml:"openImFriendName"` + OpenImOfflineMessageName string `yaml:"openImOfflineMessageName"` + OpenImPushName string `yaml:"openImPushName"` + OpenImOnlineMessageRelayName string `yaml:"openImOnlineMessageRelayName"` + OpenImGroupName string `yaml:"openImGroupName"` + RpcGetTokenName string `yaml:"rpcGetTokenName"` + } + Etcd struct { + EtcdSchema string `yaml:"etcdSchema"` + EtcdAddr []string `yaml:"etcdAddr"` + } + Log struct { + StorageLocation string `yaml:"storageLocation"` + ElasticSearchSwitch bool `yaml:"elasticSearchSwitch"` + ElasticSearchAddr []string `yaml:"elasticSearchAddr"` + ElasticSearchUser string `yaml:"elasticSearchUser"` + ElasticSearchPassword string `yaml:"elasticSearchPassword"` + } + ModuleName struct { + LongConnSvrName string `yaml:"longConnSvrName"` + MsgTransferName string `yaml:"msgTransferName"` + PushName string `yaml:"pushName"` + } + LongConnSvr struct { + WebsocketPort []int `yaml:"websocketPort"` + WebsocketMaxConnNum int `yaml:"websocketMaxConnNum"` + WebsocketMaxMsgLen int `yaml:"websocketMaxMsgLen"` + WebsocketTimeOut int `yaml:"websocketTimeOut"` + } + + Push struct { + Tpns struct { + Ios struct { + AccessID string `yaml:"accessID"` + SecretKey string `yaml:"secretKey"` + } + Android struct { + AccessID string `yaml:"accessID"` + SecretKey string `yaml:"secretKey"` + } + } + } + + Kafka struct { + Ws2mschat struct { + Addr []string `yaml:"addr"` + Topic string `yaml:"topic"` + } + Ms2pschat struct { + Addr []string `yaml:"addr"` + Topic string `yaml:"topic"` + } + ConsumerGroupID struct { + MsgToMongo string `yaml:"msgToMongo"` + MsgToMySql string `yaml:"msgToMySql"` + MsgToPush string `yaml:"msgToPush"` + } + } + + Secret string + + MultiLoginPolicy struct { + OnlyOneTerminalAccess bool `yaml:"onlyOneTerminalAccess"` + MobileAndPCTerminalAccessButOtherTerminalKickEachOther bool `yaml:"mobileAndPCTerminalAccessButOtherTerminalKickEachOther"` + AllTerminalAccess bool `yaml:"allTerminalAccess"` + } + TokenPolicy struct { + AccessSecret string `yaml:"accessSecret"` + AccessExpire int64 `yaml:"accessExpire"` + } +} + +func init() { + bytes, err := ioutil.ReadFile("../config/config.yaml") + if err != nil { + panic(err) + return + } + if err = yaml.Unmarshal(bytes, &Config); err != nil { + panic(err) + return + } + +} diff --git a/src/common/config/error.go b/src/common/config/error.go new file mode 100644 index 000000000..e629cf727 --- /dev/null +++ b/src/common/config/error.go @@ -0,0 +1,38 @@ +package config + +// key = errCode, string = errMsg +type ErrInfo struct { + ErrCode int32 + ErrMsg string +} + +var ( + OK = ErrInfo{0, ""} + + ErrMysql = ErrInfo{100, ""} + ErrMongo = ErrInfo{110, ""} + ErrRedis = ErrInfo{120, ""} + ErrParseToken = ErrInfo{200, "Parse token failed"} + ErrCreateToken = ErrInfo{201, "Create token failed"} + ErrAppServerKey = ErrInfo{300, "key error"} + ErrTencentCredential = ErrInfo{400, ""} + + ErrorUserRegister = ErrInfo{600, "User registration failed"} + ErrAccountExists = ErrInfo{601, "The account is already registered and cannot be registered again"} + ErrUserPassword = ErrInfo{602, "User password error"} + ErrTokenIncorrect = ErrInfo{603, "Invalid token"} + ErrTokenExpired = ErrInfo{604, "Expired token"} + ErrRefreshToken = ErrInfo{605, "Failed to refresh token"} + ErrAddFriend = ErrInfo{606, "Failed to add friends"} + ErrAgreeToAddFriend = ErrInfo{607, "Failed to agree to add friend"} + ErrAddFriendToBlack = ErrInfo{608, "Failed to add friends to the blacklist"} + ErrGetBlackList = ErrInfo{609, "Failed to get blacklist"} + ErrDeleteFriend = ErrInfo{610, "Failed to delete friend"} + ErrGetFriendApplyList = ErrInfo{611, "Failed to get friend application list"} + ErrGetFriendList = ErrInfo{612, "Failed to get friend list"} + ErrRemoveBlackList = ErrInfo{613, "Failed to remove blacklist"} + ErrSearchUserInfo = ErrInfo{614, "Failed to find user information"} + ErrDelAppleDeviceToken = ErrInfo{615, ""} + ErrModifyUserInfo = ErrInfo{616, "update user some attribute failed"} + ErrSetFriendComment = ErrInfo{617, "set friend comment failed"} +) diff --git a/src/common/db/model.go b/src/common/db/model.go new file mode 100644 index 000000000..f76600f8f --- /dev/null +++ b/src/common/db/model.go @@ -0,0 +1,72 @@ +package db + +import ( + "gopkg.in/mgo.v2" +) + +var DB DataBases + +type DataBases struct { + MgoDB mongoDB + RedisDB redisDB + MysqlDB mysqlDB +} + +func key(dbAddress, dbName string) string { + return dbAddress + "_" + dbName +} + +//type Config struct { +// Mongo struct { +// DBAddress []string `yaml:"dbAddress"` +// DBDirect bool `yaml:"dbDirect"` +// DBTimeout int `yaml:"dbTimeout"` +// DBDatabase []string `yaml:"dbDatabase"` +// DBSource string `yaml:"dbSource"` +// DBUserName string `yaml:"dbUserName"` +// DBPassword string `yaml:"dbPassword"` +// DBMaxPoolSize int `yaml:"dbMaxPoolSize"` +// } +// Mysql struct { +// DBAddress []string `yaml:"dbAddress"` +// DBPort int `yaml:"dbPort"` +// DBUserName string `yaml:"dbUserName"` +// DBPassword string `yaml:"dbPassword"` +// DBDatabaseName string `yaml:"dbChatName"` // 默认使用DBAddress[0] +// DBTableName string `yaml:"dbMsgName"` +// DBMsgTableNum int `yaml:"dbMsgTableNum"` +// DBCharset string `yaml:"dbCharset"` +// DBMaxOpenConns int `yaml:"dbMaxOpenConns"` +// DBMaxIdleConns int `yaml:"dbMaxIdleConns"` +// DBMaxLifeTime int `yaml:"dbMaxLifeTime"` +// } +// Redis struct { +// DBAddress string `yaml:"dbAddress"` +// DBPort int `yaml:"dbPort"` +// DBMaxIdle int `yaml:"dbMaxIdle"` +// DBMaxActive int `yaml:"dbMaxActive"` +// DBIdleTimeout int `yaml:"dbIdleTimeout"` +// } +//} + +//func init() { +// bytes, err := ioutil.ReadFile("config/db.yaml") +// if err != nil { +// log.Error("", "", "read db.yaml config fail! err = %s", err.Error()) +// return +// } +// +// if err = yaml.Unmarshal(bytes, &DB.Config); err != nil { +// log.Error("", "", "unmarshal db.yaml config fail! err = %s", err.Error()) +// return +// } +// +// DB.RedisDB.newPool(DB.Config) +// //DB.MysqlDB.sqlxDB(DB.Config.Mysql.DBName[0], DB.Config) +//} +func init() { + DB.RedisDB.newPool() +} +func (d *DataBases) session(dbName string) *mgo.Session { + return d.MgoDB.mgoSession(dbName) +} diff --git a/src/common/db/mongo.go b/src/common/db/mongo.go new file mode 100644 index 000000000..2da6ff3e0 --- /dev/null +++ b/src/common/db/mongo.go @@ -0,0 +1,51 @@ +package db + +import ( + "Open_IM/src/common/config" + "errors" + "fmt" + "gopkg.in/mgo.v2" + "sync" + "time" +) + +type mongoDB struct { + sync.RWMutex + dbMap map[string]*mgo.Session +} + +func (m *mongoDB) mgoSession(dbName string) *mgo.Session { + m.Lock() + defer m.Unlock() + if _, ok := m.dbMap[dbName]; !ok { + if err := m.newMgoSession(dbName); err != nil { + panic(err) + return nil + } + } + return m.dbMap[dbName] +} + +func (m *mongoDB) newMgoSession(dbName string) error { + dailInfo := &mgo.DialInfo{ + Addrs: config.Config.Mongo.DBAddress, + Direct: config.Config.Mongo.DBDirect, + Timeout: time.Second * time.Duration(config.Config.Mongo.DBTimeout), + Database: dbName, + Source: config.Config.Mongo.DBSource, + Username: config.Config.Mongo.DBUserName, + Password: config.Config.Mongo.DBPassword, + PoolLimit: config.Config.Mongo.DBMaxPoolSize, + } + session, err := mgo.DialWithInfo(dailInfo) + if err != nil { + return errors.New(fmt.Sprintf("mongo DialWithInfo fail, err= %s", err.Error())) + } + + if m.dbMap == nil { + m.dbMap = make(map[string]*mgo.Session) + } + + m.dbMap[dbName] = session + return nil +} diff --git a/src/common/db/mongoModel.go b/src/common/db/mongoModel.go new file mode 100644 index 000000000..fdd923216 --- /dev/null +++ b/src/common/db/mongoModel.go @@ -0,0 +1,189 @@ +package db + +import ( + "Open_IM/src/common/config" + "Open_IM/src/common/constant" + pbMsg "Open_IM/src/proto/chat" + "errors" + "github.com/golang/protobuf/proto" + "gopkg.in/mgo.v2" + "gopkg.in/mgo.v2/bson" + "time" +) + +type UserChat struct { + UID string + Msg [][]byte +} + +func (d *DataBases) GetUserChat(uid string, seqBegin, seqEnd int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return nil, nil, MaxSeq, MinSeq, errors.New("session == nil") + } + defer session.Close() + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + + sChat := UserChat{} + if err = c.Find(bson.M{"uid": uid}).One(&sChat); err != nil { + return nil, nil, MaxSeq, MinSeq, err + } + pChat := pbMsg.MsgSvrToPushSvrChatMsg{} + for i := 0; i < len(sChat.Msg); i++ { + //每次产生新的指针 + temp := new(pbMsg.MsgFormat) + if err = proto.Unmarshal(sChat.Msg[i], &pChat); err != nil { + return nil, nil, MaxSeq, MinSeq, err + } + if pChat.RecvSeq >= seqBegin && pChat.RecvSeq <= seqEnd { + temp.SendID = pChat.SendID + temp.RecvID = pChat.RecvID + temp.MsgFrom = pChat.MsgFrom + temp.Seq = pChat.RecvSeq + temp.ServerMsgID = pChat.MsgID + temp.SendTime = pChat.SendTime + temp.Content = pChat.Content + temp.ContentType = pChat.ContentType + temp.SenderPlatformID = pChat.PlatformID + if pChat.RecvSeq > MaxSeq { + MaxSeq = pChat.RecvSeq + } + if i == 0 { + MinSeq = pChat.RecvSeq + } + if pChat.RecvSeq < MinSeq { + MinSeq = pChat.RecvSeq + } + //单聊消息 + if pChat.SessionType == constant.SingleChatType { + SingleMsg = append(SingleMsg, temp) + } else { + GroupMsg = append(GroupMsg, temp) + } + } + } + + //d.DelUserChat(&sChat) + + return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil +} + +func (d *DataBases) SaveUserChat(uid string, m proto.Message) error { + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return errors.New("session == nil") + } + defer session.Close() + session.SetMode(mgo.Monotonic, true) + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + + n, err := c.Find(bson.M{"uid": uid}).Count() + if err != nil { + return err + } + + if n == 0 { + sChat := UserChat{} + sChat.UID = uid + bMsg, _ := proto.Marshal(m) + sChat.Msg = append(sChat.Msg, bMsg) + + err = c.Insert(&sChat) + if err != nil { + return err + } + } else { + bMsg, err := proto.Marshal(m) + err = c.Update(bson.M{"uid": uid}, bson.M{"$addToSet": bson.M{"msg": bMsg}}) + if err != nil { + return err + } + } + + return nil +} + +func (d *DataBases) DelUserChat(uc *UserChat) { + delMaxIndex := 0 + pbData := pbMsg.WSToMsgSvrChatMsg{} + for i := 0; i < len(uc.Msg); i++ { + if err := proto.Unmarshal(uc.Msg[i], &pbData); err != nil { + delMaxIndex = i + } else { + if time.Now().Unix()-pbData.SendTime > 7*24*3600 { + delMaxIndex = i + } else { + break + } + } + } + + if delMaxIndex > 0 { + uc.Msg = uc.Msg[delMaxIndex:] + + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return + } + defer session.Close() + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + if err := c.Update(bson.M{"uid": uc.UID}, bson.M{"msg": uc.Msg}); err != nil { + return + } + } +} + +func (d *DataBases) DelHistoryChat(days int64, ids []string) error { + session := d.session(config.Config.Mongo.DBDatabase[0]).Clone() + if session == nil { + return errors.New("mgo session == nil") + } + defer session.Close() + + c := session.DB(config.Config.Mongo.DBDatabase[0]).C("chat") + + for i := 0; i < len(ids); i++ { + d.delHistoryUserChat(c, days, ids[i]) + //time.Sleep(1 * time.Millisecond) + } + + return nil +} + +func (d *DataBases) delHistoryUserChat(c *mgo.Collection, days int64, id string) error { + sChat := UserChat{} + if err := c.Find(bson.M{"uid": id}).One(&sChat); err != nil { + return err + } + + delMaxIndex := 0 + pbData := pbMsg.WSToMsgSvrChatMsg{} + for i := 0; i < len(sChat.Msg); i++ { + if err := proto.Unmarshal(sChat.Msg[i], &pbData); err != nil { + delMaxIndex = i + } else { + if time.Now().Unix()-pbData.SendTime > int64(days)*24*3600 { + delMaxIndex = i + } else { + break + } + } + } + + if delMaxIndex > 0 { + if delMaxIndex < len(sChat.Msg) { + sChat.Msg = sChat.Msg[delMaxIndex:] + } else { + sChat.Msg = sChat.Msg[0:0] + } + + if err := c.Update(bson.M{"uid": sChat.UID}, bson.M{"msg": sChat.Msg}); err != nil { + return err + } + } + + return nil +} diff --git a/src/common/db/mysql.go b/src/common/db/mysql.go new file mode 100644 index 000000000..fbe7d3fa5 --- /dev/null +++ b/src/common/db/mysql.go @@ -0,0 +1,53 @@ +package db + +import ( + "Open_IM/src/common/config" + "fmt" + "github.com/jinzhu/gorm" + _ "github.com/jinzhu/gorm/dialects/mysql" + "sync" + "time" +) + +type mysqlDB struct { + sync.RWMutex + dbMap map[string]*gorm.DB +} + +func (m *mysqlDB) DefaultGormDB() (*gorm.DB, error) { + return m.GormDB(config.Config.Mysql.DBAddress[0], config.Config.Mysql.DBDatabaseName) +} + +func (m *mysqlDB) GormDB(dbAddress, dbName string) (*gorm.DB, error) { + m.Lock() + defer m.Unlock() + + k := key(dbAddress, dbName) + if _, ok := m.dbMap[k]; !ok { + if err := m.open(dbAddress, dbName); err != nil { + return nil, err + } + } + return m.dbMap[k], nil +} + +func (m *mysqlDB) open(dbAddress, dbName string) error { + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local", + config.Config.Mysql.DBUserName, config.Config.Mysql.DBPassword, dbAddress, dbName) + db, err := gorm.Open("mysql", dsn) + if err != nil { + return err + } + + db.SingularTable(true) + db.DB().SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns) + db.DB().SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns) + db.DB().SetConnMaxLifetime(time.Duration(config.Config.Mysql.DBMaxLifeTime) * time.Second) + + if m.dbMap == nil { + m.dbMap = make(map[string]*gorm.DB) + } + k := key(dbAddress, dbName) + m.dbMap[k] = db + return nil +} diff --git a/src/common/db/redis.go b/src/common/db/redis.go new file mode 100644 index 000000000..31c2e86c4 --- /dev/null +++ b/src/common/db/redis.go @@ -0,0 +1,52 @@ +package db + +import ( + "Open_IM/src/common/config" + log2 "Open_IM/src/common/log" + "github.com/garyburd/redigo/redis" + "time" +) + +type redisDB struct { + pool *redis.Pool +} + +func (r *redisDB) newPool() { + r.pool = &redis.Pool{ + MaxIdle: config.Config.Redis.DBMaxIdle, + MaxActive: config.Config.Redis.DBMaxActive, + + IdleTimeout: time.Duration(config.Config.Redis.DBIdleTimeout) * time.Second, + Dial: func() (redis.Conn, error) { + return redis.Dial( + "tcp", + config.Config.Redis.DBAddress[0], + redis.DialReadTimeout(time.Duration(1000)*time.Millisecond), + redis.DialWriteTimeout(time.Duration(1000)*time.Millisecond), + redis.DialConnectTimeout(time.Duration(1000)*time.Millisecond), + redis.DialDatabase(0), + redis.DialPassword(config.Config.Redis.DBPassWord), + ) + }, + } +} + +func (r *redisDB) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) { + con := r.pool.Get() + if err := con.Err(); err != nil { + log2.Error("", "", "redis cmd = %v, err = %v", cmd, err) + return nil, err + } + defer con.Close() + + params := make([]interface{}, 0) + params = append(params, key) + + if len(args) > 0 { + for _, v := range args { + params = append(params, v) + } + } + + return con.Do(cmd, params...) +} diff --git a/src/common/db/redisModel.go b/src/common/db/redisModel.go new file mode 100644 index 000000000..27a67b96b --- /dev/null +++ b/src/common/db/redisModel.go @@ -0,0 +1,71 @@ +package db + +import ( + "github.com/garyburd/redigo/redis" +) + +const ( + userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq + appleDeviceToken = "DEVICE_TOKEN" + lastGetSeq = "LAST_GET_SEQ" +) + +//执行用户消息的seq自增操作 +func (d *DataBases) IncrUserSeq(uid string) (int64, error) { + key := userIncrSeq + uid + return redis.Int64(d.RedisDB.Exec("INCR", key)) +} + +//获取最新的seq +func (d *DataBases) GetUserSeq(uid string) (int64, error) { + key := userIncrSeq + uid + return redis.Int64(d.RedisDB.Exec("GET", key)) +} + +//存储苹果的设备token到redis +func (d *DataBases) SetAppleDeviceToken(accountAddress, value string) (err error) { + key := appleDeviceToken + accountAddress + _, err = d.RedisDB.Exec("SET", key, value) + return err +} + +//删除苹果设备token +func (d *DataBases) DelAppleDeviceToken(accountAddress string) (err error) { + key := appleDeviceToken + accountAddress + _, err = d.RedisDB.Exec("DEL", key) + return err +} + +//记录用户上一次主动拉取Seq的值 +func (d *DataBases) SetLastGetSeq(uid string) (err error) { + key := lastGetSeq + uid + _, err = d.RedisDB.Exec("SET", key) + return err +} + +//获取用户上一次主动拉取Seq的值 +func (d *DataBases) GetLastGetSeq(uid string) (int64, error) { + key := userIncrSeq + uid + return redis.Int64(d.RedisDB.Exec("GET", key)) +} + +//Store userid and platform class to redis +func (d *DataBases) SetUserIDAndPlatform(userID, platformClass, value string, ttl int64) error { + key := userID + platformClass + _, err := d.RedisDB.Exec("SET", key, value, "EX", ttl) + return err +} + +//Check exists userid and platform class from redis +func (d *DataBases) ExistsUserIDAndPlatform(userID, platformClass string) (interface{}, error) { + key := userID + platformClass + exists, err := d.RedisDB.Exec("EXISTS", key) + return exists, err +} + +//Get platform class Token +func (d *DataBases) GetPlatformToken(userID, platformClass string) (interface{}, error) { + key := userID + platformClass + token, err := d.RedisDB.Exec("GET", key) + return token, err +}