diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index e9b16387b..bacba5202 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -1,17 +1,22 @@ package conversation import ( + "Open_IM/internal/common/check" chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/constant" - "Open_IM/pkg/common/db" - imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model" - rocksCache "Open_IM/pkg/common/db/rocks_cache" + "Open_IM/pkg/common/db/cache" + "Open_IM/pkg/common/db/controller" + "Open_IM/pkg/common/db/relation" + "Open_IM/pkg/common/db/table" + "Open_IM/pkg/common/db/unrelation" "Open_IM/pkg/common/log" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/getcdv3" pbConversation "Open_IM/pkg/proto/conversation" + pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" + "github.com/dtm-labs/rockscache" "net" "strconv" "strings" @@ -23,156 +28,55 @@ import ( "google.golang.org/grpc" ) -type rpcConversation struct { +type conversationServer struct { rpcPort int rpcRegisterName string etcdSchema string etcdAddr []string + groupChecker *check.GroupChecker + controller.ConversationInterface } -func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) { - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String()) - resp := &pbConversation.ModifyConversationFieldResp{} - var err error - isSyncConversation := true - if req.Conversation.ConversationType == constant.GroupChatType { - groupInfo, err := imdb.GetGroupInfoByGroupID(req.Conversation.GroupID) - if err != nil { - log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.Conversation.GroupID, err.Error()) - resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} - return resp, nil - } - if groupInfo.Status == constant.GroupStatusDismissed && !req.Conversation.IsNotInGroup && req.FieldType != constant.FieldUnread { - errMsg := "group status is dismissed" - resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: errMsg} - return resp, nil - } - } - var conversation imdb.Conversation - if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil { - log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CopyStructFields failed", *req.Conversation, err.Error()) - } - haveUserID, _ := imdb.GetExistConversationUserIDList(req.UserIDList, req.Conversation.ConversationID) - switch req.FieldType { - case constant.FieldRecvMsgOpt: - for _, v := range req.UserIDList { - if err = db.DB.SetSingleConversationRecvMsgOpt(v, req.Conversation.ConversationID, req.Conversation.RecvMsgOpt); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error()) - resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} - return resp, nil - } - } - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt}) - case constant.FieldGroupAtType: - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"group_at_type": conversation.GroupAtType}) - case constant.FieldIsNotInGroup: - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_not_in_group": conversation.IsNotInGroup}) - case constant.FieldIsPinned: - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_pinned": conversation.IsPinned}) - case constant.FieldIsPrivateChat: - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_private_chat": conversation.IsPrivateChat}) - case constant.FieldEx: - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"ex": conversation.Ex}) - case constant.FieldAttachedInfo: - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"attached_info": conversation.AttachedInfo}) - case constant.FieldUnread: - isSyncConversation = false - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"update_unread_count_time": conversation.UpdateUnreadCountTime}) - case constant.FieldBurnDuration: - err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"burn_duration": conversation.BurnDuration}) - } - if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "UpdateColumnsConversations error", err.Error()) - resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} - return resp, nil - } - for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) { - conversation.OwnerUserID = v - err = rocksCache.DelUserConversationIDListFromCache(v) - if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error()) - } - err := imdb.SetOneConversation(conversation) - if err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) - resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} - return resp, nil - } - } - - // notification - if req.Conversation.ConversationType == constant.SingleChatType && req.FieldType == constant.FieldIsPrivateChat { - //sync peer user conversation if conversation is singleChatType - if err := syncPeerUserConversation(req.Conversation, req.OperationID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), "syncPeerUserConversation", err.Error()) - resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg} - return resp, nil - } - - } else { - if isSyncConversation { - for _, v := range req.UserIDList { - if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error()) - } - chat.ConversationChangeNotification(req.OperationID, v) - } - } else { - for _, v := range req.UserIDList { - if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil { - log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error()) - } - chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID, conversation.UpdateUnreadCountTime) - } - } - - } - log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return", resp.String()) - resp.CommonResp = &pbConversation.CommonResp{} - return resp, nil -} -func syncPeerUserConversation(conversation *pbConversation.Conversation, operationID string) error { - peerUserConversation := imdb.Conversation{ - OwnerUserID: conversation.UserID, - ConversationID: utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType), - ConversationType: constant.SingleChatType, - UserID: conversation.OwnerUserID, - GroupID: "", - RecvMsgOpt: 0, - UnreadCount: 0, - DraftTextTime: 0, - IsPinned: false, - IsPrivateChat: conversation.IsPrivateChat, - AttachedInfo: "", - Ex: "", - } - err := imdb.PeerUserSetConversation(peerUserConversation) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "SetConversation error", err.Error()) - return err - } - err = rocksCache.DelConversationFromCache(conversation.UserID, utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType)) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error(), conversation.OwnerUserID, conversation.ConversationID) - } - err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID) - if err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error(), conversation.OwnerUserID, conversation.ConversationID) - } - chat.ConversationSetPrivateNotification(operationID, conversation.OwnerUserID, conversation.UserID, conversation.IsPrivateChat) - return nil -} -func NewRpcConversationServer(port int) *rpcConversation { +func NewConversationServer(port int) *conversationServer { log.NewPrivateLog(constant.LogFileName) - return &rpcConversation{ + c := conversationServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImConversationName, etcdSchema: config.Config.Etcd.EtcdSchema, etcdAddr: config.Config.Etcd.EtcdAddr, + groupChecker: check.NewGroupChecker(), + } + var cDB relation.Conversation + var cCache cache.ConversationCache + //mysql init + var mysql relation.Mysql + err := mysql.InitConn().AutoMigrateModel(&table.ConversationModel{}) + if err != nil { + panic("db init err:" + err.Error()) } + if mysql.GormConn() != nil { + //get gorm model + cDB = relation.NewConversationGorm(mysql.GormConn()) + } else { + panic("db init err:" + "conn is nil") + } + //redis init + var redis cache.RedisClient + redis.InitRedis() + rcClient := rockscache.NewClient(redis.GetClient(), rockscache.Options{ + RandomExpireAdjustment: 0.2, + DisableCacheRead: false, + DisableCacheDelete: false, + StrongConsistency: true, + }) + cCache = cache.NewConversationRedis(rcClient) + + database := controller.NewConversationDataBase(cDB, cCache) + c.ConversationInterface = controller.NewConversationController(database) + return &c } -func (rpc *rpcConversation) Run() { +func (c *conversationServer) Run() { log.NewInfo("0", "rpc conversation start...") listenIP := "" @@ -181,11 +85,11 @@ func (rpc *rpcConversation) Run() { } else { listenIP = config.Config.ListenIP } - address := listenIP + ":" + strconv.Itoa(rpc.rpcPort) + address := listenIP + ":" + strconv.Itoa(c.rpcPort) listener, err := net.Listen("tcp", address) if err != nil { - panic("listening err:" + err.Error() + rpc.rpcRegisterName) + panic("listening err:" + err.Error() + c.rpcRegisterName) } log.NewInfo("0", "listen network success, ", address, listener) //grpc server @@ -204,7 +108,7 @@ func (rpc *rpcConversation) Run() { defer srv.GracefulStop() //service registers with etcd - pbConversation.RegisterConversationServer(srv, rpc) + pbConversation.RegisterConversationServer(srv, c) rpcRegisterIP := config.Config.RpcRegisterIP if config.Config.RpcRegisterIP == "" { rpcRegisterIP, err = utils.GetLocalIP() @@ -213,13 +117,13 @@ func (rpc *rpcConversation) Run() { } } log.NewInfo("", "rpcRegisterIP", rpcRegisterIP) - err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName, 10) + err = getcdv3.RegisterEtcd(c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName, 10, "") if err != nil { log.NewError("0", "RegisterEtcd failed ", err.Error(), - rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) + c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName) panic(utils.Wrap(err, "register conversation module rpc to etcd err")) } - log.NewInfo("0", "RegisterConversationServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName) + log.NewInfo("0", "RegisterConversationServer ok ", c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName) err = srv.Serve(listener) if err != nil { log.NewError("0", "Serve failed ", err.Error()) @@ -227,3 +131,141 @@ func (rpc *rpcConversation) Run() { } log.NewInfo("0", "rpc conversation ok") } + +func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) { + resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} + conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) + if err != nil { + return nil, err + } + if len(conversations) > 0 { + if err := utils.CopyStructFields(resp.Conversation, &conversations[0]); err != nil { + return nil, err + } + return resp, nil + } + return nil, nil +} + +func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) { + resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} + conversations, err := c.ConversationInterface.GetUserAllConversation(ctx, req.OwnerUserID) + if err != nil { + return nil, err + } + if err := utils.CopyStructFields(&resp.Conversations, conversations); err != nil { + return nil, err + } + return resp, nil +} + +func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) { + resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}} + conversations, err := c.ConversationInterface.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) + if err != nil { + return nil, err + } + if err := utils.CopyStructFields(&resp.Conversations, conversations); err != nil { + return nil, err + } + return resp, nil +} + +func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) { + resp := &pbConversation.BatchSetConversationsResp{} + var conversations []*table.ConversationModel + if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil { + return nil, err + } + err := c.ConversationInterface.SetUserConversations(ctx, req.OwnerUserID, conversations) + if err != nil { + return nil, err + } + chat.ConversationChangeNotification(ctx, req.OwnerUserID) + return resp, nil +} + +func (c *conversationServer) SetConversation(ctx context.Context, req *pbConversation.SetConversationReq) (*pbConversation.SetConversationResp, error) { + panic("implement me") +} + +func (c *conversationServer) SetRecvMsgOpt(ctx context.Context, req *pbConversation.SetRecvMsgOptReq) (*pbConversation.SetRecvMsgOptResp, error) { + panic("implement me") +} + +func (c *conversationServer) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) { + resp := &pbConversation.ModifyConversationFieldResp{} + var err error + isSyncConversation := true + if req.Conversation.ConversationType == constant.GroupChatType { + groupInfo, err := c.groupChecker.GetGroupInfo(req.Conversation.GroupID) + if err != nil { + return nil, err + } + if groupInfo.Status == constant.GroupStatusDismissed && req.FieldType != constant.FieldUnread { + return nil, err + } + } + var conversation table.ConversationModel + if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil { + return nil, err + } + if req.FieldType == constant.FieldIsPrivateChat { + err := c.ConversationInterface.SyncPeerUserPrivateConversationTx(ctx, req.Conversation) + if err != nil { + return nil, err + } + chat.ConversationSetPrivateNotification(req.OperationID, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat) + return resp, nil + } + //haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID) + //if err != nil { + // return nil, err + //} + filedMap := make(map[string]interface{}) + switch req.FieldType { + case constant.FieldRecvMsgOpt: + filedMap["recv_msg_opt"] = req.Conversation.RecvMsgOpt + case constant.FieldGroupAtType: + filedMap["group_at_type"] = req.Conversation.GroupAtType + case constant.FieldIsNotInGroup: + filedMap["is_not_in_group"] = req.Conversation.IsNotInGroup + case constant.FieldIsPinned: + filedMap["is_pinned"] = req.Conversation.IsPinned + case constant.FieldEx: + filedMap["ex"] = req.Conversation.Ex + case constant.FieldAttachedInfo: + filedMap["attached_info"] = req.Conversation.AttachedInfo + case constant.FieldUnread: + isSyncConversation = false + filedMap["update_unread_count_time"] = req.Conversation.UpdateUnreadCountTime + case constant.FieldBurnDuration: + filedMap["burn_duration"] = req.Conversation.BurnDuration + } + c.ConversationInterface.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap) + err = c.ConversationInterface.UpdateUsersConversationFiled(ctx, haveUserID, req.Conversation.ConversationID, filedMap) + if err != nil { + return nil, err + } + var conversations []*pbConversation.Conversation + for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) { + temp := new(pbConversation.Conversation) + _ = utils.CopyStructFields(temp, req.Conversation) + temp.OwnerUserID = v + conversations = append(conversations, temp) + } + err = c.ConversationInterface.CreateConversation(ctx, conversations) + if err != nil { + return nil, err + } + if isSyncConversation { + for _, v := range req.UserIDList { + chat.ConversationChangeNotification(req.OperationID, v) + } + } else { + for _, v := range req.UserIDList { + chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime) + } + } + return resp, nil +}