diff --git a/go.mod b/go.mod index d02d29993..7b68502f8 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.18 require ( firebase.google.com/go v3.13.0+incompatible github.com/OpenIMSDK/getcdv3 v1.0.4 - github.com/OpenIMSDK/openKeeper v0.9.8 + github.com/OpenIMSDK/openKeeper v0.9.7 github.com/OpenIMSDK/open_utils v1.0.8 github.com/Shopify/sarama v1.32.0 github.com/alibabacloud-go/darabonba-openapi v0.1.11 @@ -46,7 +46,7 @@ require ( github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.428 github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/sms v1.0.428 github.com/tencentyun/qcloud-cos-sts-sdk v0.0.0-20210325043845-84a0811633ca - go.etcd.io/etcd/client/v3 v3.5.6 + go.etcd.io/etcd/client/v3 v3.5.6 // indirect go.mongodb.org/mongo-driver v1.8.3 golang.org/x/image v0.3.0 golang.org/x/net v0.5.0 diff --git a/go.sum b/go.sum index 7c85168c7..91c586d20 100644 --- a/go.sum +++ b/go.sum @@ -379,8 +379,8 @@ github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6Xge github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OpenIMSDK/getcdv3 v1.0.4 h1:wKpLcp1gbLbh+fa7b5iCL4fTBLm87hB0+p0ZQMg9tK8= github.com/OpenIMSDK/getcdv3 v1.0.4/go.mod h1:ZvsBwAjOZZr7HBF3SytJaHCltuOfBKbM1vLSCjut7kw= -github.com/OpenIMSDK/openKeeper v0.9.8 h1:7IrOftwpGSyHkM0x10QDM7up1E9iB59cO9YwiT4mRXU= -github.com/OpenIMSDK/openKeeper v0.9.8/go.mod h1:RvyRXEcvWbonkmHLtT8KxGSCNlXY7OfDohhu53E6INU= +github.com/OpenIMSDK/openKeeper v0.9.7 h1:oWPAsXAHpMpZh8NPB9B8FktVfxk1Ec8tQPdxhzWgh7w= +github.com/OpenIMSDK/openKeeper v0.9.7/go.mod h1:RvyRXEcvWbonkmHLtT8KxGSCNlXY7OfDohhu53E6INU= github.com/OpenIMSDK/open_log v1.0.0 h1:ZQ908aWgPqfHOfkQ/oFSV20AZdRwPw+sZjC/sAPd5cA= github.com/OpenIMSDK/open_log v1.0.0/go.mod h1:qWvqF4iT2qBAP1eGGbinc0aAng1Y25X8A9Si1WS3oB4= github.com/OpenIMSDK/open_utils v1.0.8 h1:IopxWgJwEF5ZAPsRuiZZOfcxNOQOCt/p8VDENcHN9r4= diff --git a/internal/common/convert/convert.go b/internal/common/convert/convert.go index 7a4718a84..a829f053e 100644 --- a/internal/common/convert/convert.go +++ b/internal/common/convert/convert.go @@ -35,11 +35,25 @@ func NewPBFriend(friendInfo *sdk.FriendInfo) *PBFriend { } func (*PBFriend) PB2DB(friends []*sdk.FriendInfo) (DBFriends []*relation.FriendModel, err error) { - + for _, v := range friends { + u, err := NewPBFriend(v).Convert() + if err != nil { + return nil, err + } + DBFriends = append(DBFriends, u) + } + return } func (*DBFriend) DB2PB(friends []*relation.FriendModel) (PBFriends []*sdk.FriendInfo, err error) { - + for _, v := range friends { + u, err := NewDBFriend(v).Convert() + if err != nil { + return nil, err + } + PBFriends = append(PBFriends, u) + } + return } func (db *DBFriend) Convert() (*sdk.FriendInfo, error) { @@ -81,11 +95,25 @@ func NewPBFriendRequest(friendRequest *sdk.FriendRequest) *PBFriendRequest { } func (*PBFriendRequest) PB2DB(friendRequests []*sdk.FriendRequest) (DBFriendRequests []*relation.FriendRequestModel, err error) { - + for _, v := range friendRequests { + u, err := NewPBFriendRequest(v).Convert() + if err != nil { + return nil, err + } + DBFriendRequests = append(DBFriendRequests, u) + } + return } func (*DBFriendRequest) DB2PB(friendRequests []*relation.FriendRequestModel) (PBFriendRequests []*sdk.FriendRequest, err error) { - + for _, v := range friendRequests { + u, err := NewDBFriendRequest(v).Convert() + if err != nil { + return nil, err + } + PBFriendRequests = append(PBFriendRequests, u) + } + return } func (pb *PBFriendRequest) Convert() (*relation.FriendRequestModel, error) { @@ -122,11 +150,25 @@ type DBBlack struct { } func (*PBBlack) PB2DB(blacks []*sdk.BlackInfo) (DBBlacks []*relation.BlackModel, err error) { - + for _, v := range blacks { + u, err := NewPBBlack(v).Convert() + if err != nil { + return nil, err + } + DBBlacks = append(DBBlacks, u) + } + return } func (*DBBlack) DB2PB(blacks []*relation.BlackModel) (PBBlacks []*sdk.BlackInfo, err error) { - + for _, v := range blacks { + u, err := NewDBBlack(v).Convert() + if err != nil { + return nil, err + } + PBBlacks = append(PBBlacks, u) + } + return } func NewDBBlack(black *relation.BlackModel) *DBBlack { @@ -164,11 +206,25 @@ type DBGroup struct { } func (*PBGroup) PB2DB(groups []*sdk.GroupInfo) (DBGroups []*relation.GroupModel, err error) { - + for _, v := range groups { + u, err := NewPBGroup(v).Convert() + if err != nil { + return nil, err + } + DBGroups = append(DBGroups, u) + } + return } func (*DBGroup) DB2PB(groups []*relation.GroupModel) (PBGroups []*sdk.GroupInfo, err error) { - + for _, v := range groups { + u, err := NewDBGroup(v).Convert() + if err != nil { + return nil, err + } + PBGroups = append(PBGroups, u) + } + return } func NewDBGroup(group *relation.GroupModel) *DBGroup { @@ -183,10 +239,10 @@ func NewPBGroup(groupInfo *sdk.GroupInfo) *PBGroup { return &PBGroup{GroupInfo: groupInfo} } -func (pb *PBGroup) Convert() *relation.GroupModel { +func (pb *PBGroup) Convert() (*relation.GroupModel, error) { dst := &relation.GroupModel{} - _ = utils.CopyStructFields(dst, pb) - return dst + err := utils.CopyStructFields(dst, pb) + return dst, err } func (db *DBGroup) Convert() (*sdk.GroupInfo, error) { dst := &sdk.GroupInfo{} @@ -215,11 +271,25 @@ type DBGroupMember struct { } func (*PBGroupMember) PB2DB(groupMembers []*sdk.GroupMemberFullInfo) (DBGroupMembers []*relation.GroupMemberModel, err error) { - + for _, v := range groupMembers { + u, err := NewPBGroupMember(v).Convert() + if err != nil { + return nil, err + } + DBGroupMembers = append(DBGroupMembers, u) + } + return } func (*DBGroupMember) DB2PB(groupMembers []*relation.GroupMemberModel) (PBGroupMembers []*sdk.GroupMemberFullInfo, err error) { - + for _, v := range groupMembers { + u, err := NewDBGroupMember(v).Convert() + if err != nil { + return nil, err + } + PBGroupMembers = append(PBGroupMembers, u) + } + return } func NewDBGroupMember(groupMember *relation.GroupMemberModel) *DBGroupMember { @@ -267,11 +337,25 @@ type DBGroupRequest struct { } func (*PBGroupRequest) PB2DB(groupRequests []*sdk.GroupRequest) (DBGroupRequests []*relation.GroupRequestModel, err error) { - + for _, v := range groupRequests { + u, err := NewPBGroupRequest(v).Convert() + if err != nil { + return nil, err + } + DBGroupRequests = append(DBGroupRequests, u) + } + return } func (*DBGroupRequest) DB2PB(groupRequests []*relation.GroupRequestModel) (PBGroupRequests []*sdk.GroupRequest, err error) { - + for _, v := range groupRequests { + u, err := NewDBGroupRequest(v).Convert() + if err != nil { + return nil, err + } + PBGroupRequests = append(PBGroupRequests, u) + } + return } func NewDBGroupRequest(groupRequest *relation.GroupRequestModel) *DBGroupRequest { diff --git a/internal/common/network/ip.go b/internal/common/network/ip.go new file mode 100644 index 000000000..5c8ed4f59 --- /dev/null +++ b/internal/common/network/ip.go @@ -0,0 +1,23 @@ +package network + +import utils "github.com/OpenIMSDK/open_utils" + +func GetRpcRegisterIP(configIP string) (string, error) { + registerIP := configIP + if registerIP == "" { + ip, err := utils.GetLocalIP() + if err != nil { + return "", err + } + registerIP = ip + } + return registerIP, nil +} + +func GetListenIP(configIP string) string { + if configIP == "" { + return "0.0.0.0" + } else { + return configIP + } +} diff --git a/internal/common/rpc_server/a.go b/internal/common/rpc_server/a.go new file mode 100644 index 000000000..19852a1f5 --- /dev/null +++ b/internal/common/rpc_server/a.go @@ -0,0 +1,46 @@ +package rpc_server + +import ( + "Open_IM/internal/common/network" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/log" + discoveryRegistry "Open_IM/pkg/discovery_registry" + "github.com/OpenIMSDK/openKeeper" + "net" + "strconv" +) + +type RpcServer struct { + Port int + RegisterName string + RegisterCenter discoveryRegistry.SvcDiscoveryRegistry +} + +func NewRpcServer(registerIPInConfig string, port int, registerName string, zkServers []string, zkRoot string) (*RpcServer, error) { + log.NewPrivateLog(constant.LogFileName) + s := &RpcServer{ + Port: port, + RegisterName: registerName, + } + + zkClient, err := openKeeper.NewClient(zkServers, zkRoot, 10, "", "") + if err != nil { + return nil, err + } + registerIP, err := network.GetRpcRegisterIP(registerIPInConfig) + if err != nil { + return nil, err + } + err = zkClient.Register(s.RegisterName, registerIP, s.Port) + if err != nil { + return nil, err + } + s.RegisterCenter = zkClient + return s, nil +} + +func GetTcpListen(listenIPInConfig string, port int) (net.Listener, string, error) { + address := network.GetListenIP(listenIPInConfig) + ":" + strconv.Itoa(port) + listener, err := net.Listen("tcp", address) + return listener, address, err +} diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 303bc112f..b196915c5 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -2,28 +2,63 @@ package auth import ( "Open_IM/internal/common/check" + "Open_IM/internal/common/rpc_server" + "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/log" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/tracelog" - "Open_IM/pkg/getcdv3" pbAuth "Open_IM/pkg/proto/auth" pbRelay "Open_IM/pkg/proto/relay" "Open_IM/pkg/utils" "context" - "net" - "strconv" - "strings" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - - "Open_IM/pkg/common/config" - "google.golang.org/grpc" ) +func NewRpcAuthServer(port int) *rpcAuth { + r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImAuthName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) + if err != nil { + panic(err) + } + return &rpcAuth{ + RpcServer: r, + } +} + +func (s *rpcAuth) Run() { + operationID := utils.OperationIDGenerator() + log.NewInfo(operationID, "rpc auth start...") + listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) + if err != nil { + panic(err) + } + log.NewInfo(operationID, "listen network success ", listener, address) + var grpcOpts []grpc.ServerOption + if config.Config.Prometheus.Enable { + promePkg.NewGrpcRequestCounter() + promePkg.NewGrpcRequestFailedCounter() + promePkg.NewGrpcRequestSuccessCounter() + promePkg.NewUserRegisterCounter() + promePkg.NewUserLoginCounter() + grpcOpts = append(grpcOpts, []grpc.ServerOption{ + // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), + grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), + }...) + } + srv := grpc.NewServer(grpcOpts...) + defer srv.GracefulStop() + pbAuth.RegisterAuthServer(srv, s) + err = srv.Serve(listener) + if err != nil { + panic(err) + } + log.NewInfo(operationID, "rpc auth ok") +} + func (s *rpcAuth) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) { resp := pbAuth.UserTokenResp{} if _, err := check.GetUsersInfo(ctx, req.UserID); err != nil { @@ -85,7 +120,10 @@ func (s *rpcAuth) ForceLogout(ctx context.Context, req *pbAuth.ForceLogoutReq) ( } func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID int32, operationID string) error { - grpcCons := rpc.GetDefaultGatewayConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), operationID) + grpcCons, err := s.RegisterCenter.GetConns(config.Config.RpcRegisterName.OpenImRelayName) + if err != nil { + return err + } for _, v := range grpcCons { client := pbRelay.NewRelayClient(v) kickReq := &pbRelay.KickUserOfflineReq{OperationID: operationID, KickUserIDList: []string{userID}, PlatformID: platformID} @@ -97,78 +135,6 @@ func (s *rpcAuth) forceKickOff(ctx context.Context, userID string, platformID in } type rpcAuth struct { - rpcPort int - rpcRegisterName string - etcdSchema string - etcdAddr []string + *rpc_server.RpcServer controller.AuthInterface } - -func NewRpcAuthServer(port int) *rpcAuth { - log.NewPrivateLog(constant.LogFileName) - return &rpcAuth{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImAuthName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, - } -} - -func (s *rpcAuth) Run() { - operationID := utils.OperationIDGenerator() - log.NewInfo(operationID, "rpc auth start...") - - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(s.rpcPort) - listener, err := net.Listen("tcp", address) - if err != nil { - panic("listening err:" + err.Error() + s.rpcRegisterName) - } - log.NewInfo(operationID, "listen network success, ", address, listener) - var grpcOpts []grpc.ServerOption - if config.Config.Prometheus.Enable { - promePkg.NewGrpcRequestCounter() - promePkg.NewGrpcRequestFailedCounter() - promePkg.NewGrpcRequestSuccessCounter() - promePkg.NewUserRegisterCounter() - promePkg.NewUserLoginCounter() - grpcOpts = append(grpcOpts, []grpc.ServerOption{ - // grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme), - grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), - grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor), - }...) - } - srv := grpc.NewServer(grpcOpts...) - defer srv.GracefulStop() - - //service registers with etcd - pbAuth.RegisterAuthServer(srv, s) - rpcRegisterIP := config.Config.RpcRegisterIP - if config.Config.RpcRegisterIP == "" { - rpcRegisterIP, err = utils.GetLocalIP() - if err != nil { - log.Error("", "GetLocalIP failed ", err.Error()) - } - } - log.NewInfo("", "rpcRegisterIP", rpcRegisterIP) - - err = rpc.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10, "") - if err != nil { - log.NewError(operationID, "RegisterEtcd failed ", err.Error(), - s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName) - panic(utils.Wrap(err, "register auth module rpc to etcd err")) - - } - log.NewInfo(operationID, "RegisterAuthServer ok ", s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName) - err = srv.Serve(listener) - if err != nil { - log.NewError(operationID, "Serve failed ", err.Error()) - return - } - log.NewInfo(operationID, "rpc auth ok") -} diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 11acc55e7..7b42061d1 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -1,78 +1,51 @@ package friend import ( + "Open_IM/internal/common/check" "Open_IM/internal/common/convert" + "Open_IM/internal/common/rpc_server" chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" - relation2 "Open_IM/pkg/common/db/table/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/log" "Open_IM/pkg/common/middleware" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/tracelog" pbFriend "Open_IM/pkg/proto/friend" - pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "net" - "strconv" - "strings" - - "Open_IM/internal/common/check" - "github.com/OpenIMSDK/getcdv3" "google.golang.org/grpc" ) type friendServer struct { - rpcPort int - rpcRegisterName string - etcdSchema string - etcdAddr []string + *rpc_server.RpcServer + controller.FriendInterface controller.BlackInterface - - userRpc pbUser.UserClient } func NewFriendServer(port int) *friendServer { - log.NewPrivateLog(constant.LogFileName) - f := friendServer{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImFriendName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, - } - ttl := 10 - etcdClient, err := getcdv3.NewEtcdConn(config.Config.Etcd.EtcdSchema, strings.Join(f.etcdAddr, ","), config.Config.RpcRegisterIP, config.Config.Etcd.UserName, config.Config.Etcd.Password, port, ttl) - if err != nil { - panic("NewEtcdConn failed" + err.Error()) - } - err = etcdClient.RegisterEtcd("", f.rpcRegisterName) + r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImFriendName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) if err != nil { - panic("NewEtcdConn failed" + err.Error()) + panic(err) } - - etcdClient.SetDefaultEtcdConfig(config.Config.RpcRegisterName.OpenImUserName, config.Config.RpcPort.OpenImUserPort) - conn := etcdClient.GetConn("", config.Config.RpcRegisterName.OpenImUserName) - f.userRpc = pbUser.NewUserClient(conn) - //mysql init var mysql relation.Mysql var model relation.FriendGorm - err = mysql.InitConn().AutoMigrateModel(&relation2.FriendModel{}) + err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendModel{}) if err != nil { panic("db init err:" + err.Error()) } - err = mysql.InitConn().AutoMigrateModel(&relation2.FriendRequestModel{}) + err = mysql.InitConn().AutoMigrateModel(&relationTb.FriendRequestModel{}) if err != nil { panic("db init err:" + err.Error()) } - - err = mysql.InitConn().AutoMigrateModel(&relation2.BlackModel{}) + err = mysql.InitConn().AutoMigrateModel(&relationTb.BlackModel{}) if err != nil { panic("db init err:" + err.Error()) } @@ -81,28 +54,22 @@ func NewFriendServer(port int) *friendServer { } else { panic("db init err:" + "conn is nil") } - f.FriendInterface = controller.NewFriendController(model.DB) - f.BlackInterface = controller.NewBlackController(model.DB) - return &f + return &friendServer{ + RpcServer: r, + FriendInterface: controller.NewFriendController(model.DB), + BlackInterface: controller.NewBlackController(model.DB), + } } func (s *friendServer) Run() { - log.NewInfo("0", "friendServer run...") - - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(s.rpcPort) - - //listener network - listener, err := net.Listen("tcp", address) + operationID := utils.OperationIDGenerator() + log.NewInfo(operationID, "friendServer run...") + listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) if err != nil { - panic("listening err:" + err.Error() + s.rpcRegisterName) + panic(err) } - log.NewInfo("0", "listen ok ", address) + + log.NewInfo(operationID, "listen ok ", address) defer listener.Close() //grpc server var grpcOpts []grpc.ServerOption @@ -122,7 +89,7 @@ func (s *friendServer) Run() { pbFriend.RegisterFriendServer(srv, s) err = srv.Serve(listener) if err != nil { - log.NewError("0", "Serve failed ", err.Error(), listener) + log.NewError(operationID, "Serve failed ", err.Error(), listener) return } } @@ -166,7 +133,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *pbFriend.ImportFr return nil, err } - if utils.Contain(req.FriendUserIDs, req.OwnerUserID) { + if utils.Contain(req.OwnerUserID, req.FriendUserIDs...) { return nil, constant.ErrCanNotAddYourself.Wrap() } if utils.Duplicate(req.FriendUserIDs) { @@ -185,7 +152,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbFriend.Res if err := check.Access(ctx, req.ToUserID); err != nil { return nil, err } - friendRequest := relation2.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult} + friendRequest := relationTb.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult} if req.HandleResult == constant.FriendResponseAgree { err := s.AgreeFriendRequest(ctx, &friendRequest) if err != nil { @@ -211,7 +178,7 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri if err := check.Access(ctx, req.OwnerUserID); err != nil { return nil, err } - _, err = s.FindFriends(ctx, req.OwnerUserID, []string{req.FriendUserID}) + _, err = s.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID}) if err != nil { return nil, err } @@ -228,7 +195,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri if err := check.Access(ctx, req.OwnerUserID); err != nil { return nil, err } - _, err = s.FindFriends(ctx, req.OwnerUserID, []string{req.FriendUserID}) + _, err = s.FindFriendsWithError(ctx, req.OwnerUserID, []string{req.FriendUserID}) if err != nil { return nil, err } @@ -240,12 +207,12 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri } // ok -func (s *friendServer) GetDesignatedFriendsReq(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) { +func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbFriend.GetDesignatedFriendsReq) (resp *pbFriend.GetDesignatedFriendsResp, err error) { resp = &pbFriend.GetDesignatedFriendsResp{} if err := check.Access(ctx, req.UserID); err != nil { return nil, err } - friends, total, err := s.FriendInterface.FindOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber) + friends, total, err := s.FriendInterface.PageOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { return nil, err } @@ -263,7 +230,7 @@ func (s *friendServer) GetPaginationFriendsApplyTo(ctx context.Context, req *pbF if err := check.Access(ctx, req.UserID); err != nil { return nil, err } - friendRequests, total, err := s.FriendInterface.FindFriendRequestToMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber) + friendRequests, total, err := s.FriendInterface.PageFriendRequestToMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { return nil, err } @@ -281,7 +248,7 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p if err := check.Access(ctx, req.UserID); err != nil { return nil, err } - friendRequests, total, err := s.FriendInterface.FindFriendRequestFromMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber) + friendRequests, total, err := s.FriendInterface.PageFriendRequestFromMe(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { return nil, err } @@ -309,7 +276,7 @@ func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbFriend.G if utils.Duplicate(req.FriendUserIDs) { return nil, constant.ErrArgs.Wrap("friend userID repeated") } - friends, err := s.FriendInterface.FindFriends(ctx, req.OwnerUserID, req.FriendUserIDs) + friends, err := s.FriendInterface.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs) if err != nil { return nil, err } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 6dcb94456..b97096ec3 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -1,13 +1,14 @@ package group import ( + "Open_IM/internal/common/network" chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/controller" - relation_conn "Open_IM/pkg/common/db/relation" - "Open_IM/pkg/common/db/table/relation" + "Open_IM/pkg/common/db/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/db/unrelation" "Open_IM/pkg/common/log" "Open_IM/pkg/common/middleware" @@ -20,7 +21,6 @@ import ( "Open_IM/pkg/utils" "context" "fmt" - "github.com/OpenIMSDK/getcdv3" grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "math/big" "net" @@ -36,13 +36,10 @@ import ( type groupServer struct { rpcPort int rpcRegisterName string - etcdSchema string - etcdAddr []string + schema string + zkAddr []string GroupInterface controller.GroupInterface - - etcdConn *getcdv3.EtcdConn - //userRpc pbUser.UserClient - //conversationRpc pbConversation.ConversationClient + registerCenter discoveryRegistry.SvcDiscoveryRegistry } func NewGroupServer(port int) *groupServer { @@ -50,73 +47,48 @@ func NewGroupServer(port int) *groupServer { g := groupServer{ rpcPort: port, rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, - } - ttl := 10 - etcdClient, err := getcdv3.NewEtcdConn(config.Config.Etcd.EtcdSchema, strings.Join(g.etcdAddr, ","), config.Config.RpcRegisterIP, config.Config.Etcd.UserName, config.Config.Etcd.Password, port, ttl) - if err != nil { - panic("NewEtcdConn failed" + err.Error()) - } - err = etcdClient.RegisterEtcd("", g.rpcRegisterName) - if err != nil { - panic("NewEtcdConn failed" + err.Error()) + schema: config.Config.Zookeeper.Schema, + zkAddr: config.Config.Zookeeper.ZkAddr, } - etcdClient.SetDefaultEtcdConfig(config.Config.RpcRegisterName.OpenImUserName, config.Config.RpcPort.OpenImUserPort) - //conn := etcdClient.GetConn("", config.Config.RpcRegisterName.OpenImUserName) - //g.userRpc = pbUser.NewUserClient(conn) - - etcdClient.SetDefaultEtcdConfig(config.Config.RpcRegisterName.OpenImConversationName, config.Config.RpcPort.OpenImConversationPort) - //conn = etcdClient.GetConn("", config.Config.RpcRegisterName.OpenImConversationName) - //g.conversationRpc = pbConversation.NewConversationClient(conn) - //mysql init - var mysql relation_conn.Mysql + var mysql relation.Mysql var mongo unrelation.Mongo - var groupModel relation.GroupModel + var groupModel relationTb.GroupModel var redis cache.RedisClient - err = mysql.InitConn().AutoMigrateModel(&groupModel) + err := mysql.InitConn().AutoMigrateModel(&groupModel) if err != nil { panic("db init err:" + err.Error()) } - if mysql.GormConn() != nil { - groupModel.DB = mysql.GormConn() - } else { - panic("db init err:" + "conn is nil") - } mongo.InitMongo() redis.InitRedis() mongo.CreateSuperGroupIndex() - zkClient, err := openKeeper.NewClient([]string{"43.154.157.177:2181"}, config.Config.Etcd.EtcdSchema, 10, "", "") + zkClient, err := openKeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema, 10, "", "") if err != nil { panic(err.Error()) } - registerIP, err := utils.GetRpcIP() - err = zkClient.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, port) + registerIP, err := network.GetRpcRegisterIP(config.Config.RpcRegisterIP) + g.registerCenter = zkClient + err = g.registerCenter.Register(config.Config.RpcRegisterName.OpenImGroupName, registerIP, port) if err != nil { panic(err.Error()) } - var registerCenter discoveryRegistry.SvcDiscoveryRegistry = zkClient - conns, err := registerCenter.GetConns(config.Config.RpcRegisterName.OpenImConversationName) + + //conns, err := g.registerCenter.GetConns(config.Config.RpcRegisterName.OpenImConversationName) + g.GroupInterface = controller.NewGroupInterface(controller.NewGroupDatabase(mysql.GormConn(), redis.GetClient(), mongo.GetClient())) return &g } func (s *groupServer) Run() { - log.NewInfo("", "group rpc start ") - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(s.rpcPort) + operationID := utils.OperationIDGenerator() + log.NewInfo(operationID, "group rpc start ") + address := network.GetListenIP(config.Config.ListenIP) + ":" + strconv.Itoa(s.rpcPort) //listener network listener, err := net.Listen("tcp", address) if err != nil { panic("listening err:" + err.Error() + s.rpcRegisterName) } - log.NewInfo("", "listen network success, ", address, listener) + log.NewInfo(operationID, "listen network success, ", address, listener) defer listener.Close() //grpc server @@ -141,28 +113,12 @@ func (s *groupServer) Run() { defer srv.GracefulStop() //Service registers with etcd pbGroup.RegisterGroupServer(srv, s) - - rpcRegisterIP := config.Config.RpcRegisterIP - if config.Config.RpcRegisterIP == "" { - rpcRegisterIP, err = utils.GetLocalIP() - if err != nil { - log.Error("", "GetLocalIP failed ", err.Error()) - } - } - log.NewInfo("", "rpcRegisterIP", rpcRegisterIP) - err = getcdv3.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10, "") - if err != nil { - log.NewError("", "RegisterEtcd failed ", err.Error()) - panic(utils.Wrap(err, "register group module rpc to etcd err")) - - } - log.Info("", "RegisterEtcd ", s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName) err = srv.Serve(listener) if err != nil { - log.NewError("", "Serve failed ", err.Error()) + log.NewError(operationID, "Serve failed ", err.Error()) return } - log.NewInfo("", "group rpc success") + log.NewInfo(operationID, "group rpc success") } func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error { @@ -200,7 +156,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR if err := callbackBeforeCreateGroup(ctx, req); err != nil { return nil, err } - var groupMembers []*relation.GroupMemberModel + var groupMembers []*relationTb.GroupMemberModel group := PbToDBGroupInfo(req.GroupInfo) group.GroupID = genGroupID(ctx, req.GroupInfo.GroupID) joinGroup := func(userID string, roleLevel int32) error { @@ -265,7 +221,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo if len(members) == 0 { return resp, nil } - groupIDs := utils.Slice(members, func(e *relation.GroupMemberModel) string { + groupIDs := utils.Slice(members, func(e *relationTb.GroupMemberModel) string { return e.GroupID }) groups, err := s.GroupInterface.FindGroup(ctx, groupIDs) @@ -280,12 +236,12 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo if err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relation.GroupMemberModel) string { + ownerMap := utils.SliceToMap(owners, func(e *relationTb.GroupMemberModel) string { return e.GroupID }) - resp.Groups = utils.Slice(utils.Order(groupIDs, groups, func(group *relation.GroupModel) string { + resp.Groups = utils.Slice(utils.Order(groupIDs, groups, func(group *relationTb.GroupModel) string { return group.GroupID - }), func(group *relation.GroupModel) *open_im_sdk.GroupInfo { + }), func(group *relationTb.GroupModel) *open_im_sdk.GroupInfo { return DbToPbGroupInfo(group, ownerMap[group.GroupID].UserID, uint32(groupMemberNum[group.GroupID])) }) return resp, nil @@ -310,7 +266,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite if err != nil { return nil, err } - memberMap := utils.SliceToMap(members, func(e *relation.GroupMemberModel) string { + memberMap := utils.SliceToMap(members, func(e *relationTb.GroupMemberModel) string { return e.UserID }) if ids := utils.Single(req.InvitedUserIDs, utils.Keys(memberMap)); len(ids) > 0 { @@ -364,7 +320,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite } } else { opUserID := tracelog.GetOpUserID(ctx) - var groupMembers []*relation.GroupMemberModel + var groupMembers []*relationTb.GroupMemberModel for _, userID := range req.InvitedUserIDs { member := PbToDbGroupMember(userMap[userID]) member.GroupID = req.GroupID @@ -398,7 +354,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro if err != nil { return nil, err } - resp.Members = utils.Slice(members, func(e *relation.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -411,7 +367,7 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGr return nil, err } resp.Total = total - resp.Members = utils.Slice(members, func(e *relation.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -447,7 +403,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou if err != nil { return nil, err } - memberMap := make(map[string]*relation.GroupMemberModel) + memberMap := make(map[string]*relationTb.GroupMemberModel) for i, member := range members { memberMap[member.UserID] = members[i] } @@ -496,7 +452,7 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG if err != nil { return nil, err } - resp.Members = utils.Slice(members, func(e *relation.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -533,7 +489,7 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup. if err != nil { return nil, err } - groupMap := utils.SliceToMap(groups, func(e *relation.GroupModel) string { + groupMap := utils.SliceToMap(groups, func(e *relationTb.GroupModel) string { return e.GroupID }) if ids := utils.Single(utils.Keys(groupMap), groupIDs); len(ids) > 0 { @@ -547,10 +503,10 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbGroup. if err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relation.GroupMemberModel) string { + ownerMap := utils.SliceToMap(owners, func(e *relationTb.GroupMemberModel) string { return e.GroupID }) - resp.GroupRequests = utils.Slice(groupRequests, func(e *relation.GroupRequestModel) *open_im_sdk.GroupRequest { + resp.GroupRequests = utils.Slice(groupRequests, func(e *relationTb.GroupRequestModel) *open_im_sdk.GroupRequest { return DbToPbGroupRequest(e, userMap[e.UserID], DbToPbGroupInfo(groupMap[e.GroupID], ownerMap[e.GroupID].UserID, uint32(groupMemberNumMap[e.GroupID]))) }) return resp, nil @@ -573,10 +529,10 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsI if err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relation.GroupMemberModel) string { + ownerMap := utils.SliceToMap(owners, func(e *relationTb.GroupMemberModel) string { return e.GroupID }) - resp.GroupInfos = utils.Slice(groups, func(e *relation.GroupModel) *open_im_sdk.GroupInfo { + resp.GroupInfos = utils.Slice(groups, func(e *relationTb.GroupModel) *open_im_sdk.GroupInfo { return DbToPbGroupInfo(e, ownerMap[e.GroupID].UserID, uint32(groupMemberNumMap[e.GroupID])) }) return resp, nil @@ -618,9 +574,9 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbGroup if err != nil { return nil, err } - var member *relation.GroupMemberModel + var member *relationTb.GroupMemberModel if req.HandleResult == constant.GroupResponseAgree { - member = &relation.GroupMemberModel{ + member = &relationTb.GroupMemberModel{ GroupID: req.GroupID, UserID: user.UserID, Nickname: user.Nickname, @@ -664,7 +620,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) if group.GroupType == constant.SuperGroup { return nil, constant.ErrGroupTypeNotSupport.Wrap() } - user, err := relation_conn.GetUserByUserID(tracelog.GetOpUserID(ctx)) + user, err := relation.GetUserByUserID(tracelog.GetOpUserID(ctx)) if err != nil { return nil, err } @@ -677,20 +633,20 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) if err := CallbackBeforeMemberJoinGroup(ctx, tracelog.GetOperationID(ctx), groupMember, group.Ex); err != nil { return nil, err } - if err := s.GroupInterface.CreateGroup(ctx, nil, []*relation.GroupMemberModel{groupMember}); err != nil { + if err := s.GroupInterface.CreateGroup(ctx, nil, []*relationTb.GroupMemberModel{groupMember}); err != nil { return nil, err } chat.MemberEnterDirectlyNotification(req.GroupID, tracelog.GetOpUserID(ctx), tracelog.GetOperationID(ctx)) return resp, nil } - groupRequest := relation.GroupRequestModel{ + groupRequest := relationTb.GroupRequestModel{ UserID: tracelog.GetOpUserID(ctx), ReqMsg: req.ReqMessage, GroupID: req.GroupID, JoinSource: req.JoinSource, ReqTime: time.Now(), } - if err := s.GroupInterface.CreateGroupRequest(ctx, []*relation.GroupRequestModel{&groupRequest}); err != nil { + if err := s.GroupInterface.CreateGroupRequest(ctx, []*relationTb.GroupRequestModel{&groupRequest}); err != nil { return nil, err } chat.JoinGroupApplicationNotification(ctx, req) @@ -729,7 +685,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf return nil, constant.ErrNoPermission.Wrap("no group owner or admin") } } - group, err := s.TakeGroup(ctx, req.GroupInfoForSet.GroupID) + group, err := s.GroupInterface.TakeGroup(ctx, req.GroupInfoForSet.GroupID) if err != nil { return nil, err } @@ -743,7 +699,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf if err := s.GroupInterface.UpdateGroup(ctx, group.GroupID, data); err != nil { return nil, err } - group, err = s.TakeGroup(ctx, req.GroupInfoForSet.GroupID) + group, err = s.GroupInterface.TakeGroup(ctx, req.GroupInfoForSet.GroupID) if err != nil { return nil, err } @@ -770,7 +726,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.Trans if err != nil { return nil, err } - memberMap := utils.SliceToMap(members, func(e *relation.GroupMemberModel) string { return e.UserID }) + memberMap := utils.SliceToMap(members, func(e *relationTb.GroupMemberModel) string { return e.UserID }) if ids := utils.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, utils.Keys(memberMap)); len(ids) > 0 { return nil, constant.ErrArgs.Wrap("user not in group " + strings.Join(ids, ",")) } @@ -804,7 +760,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbGroup.Trans func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq) (*pbGroup.GetGroupsResp, error) { resp := &pbGroup.GetGroupsResp{} var ( - groups []*relation.GroupModel + groups []*relationTb.GroupModel err error ) if req.GroupID != "" { @@ -816,14 +772,14 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq) if err != nil { return nil, err } - groupIDs := utils.Slice(groups, func(e *relation.GroupModel) string { + groupIDs := utils.Slice(groups, func(e *relationTb.GroupModel) string { return e.GroupID }) ownerMembers, err := s.GroupInterface.FindGroupMember(ctx, groupIDs, nil, []int32{constant.GroupOwner}) if err != nil { return nil, err } - ownerMemberMap := utils.SliceToMap(ownerMembers, func(e *relation.GroupMemberModel) string { + ownerMemberMap := utils.SliceToMap(ownerMembers, func(e *relationTb.GroupMemberModel) string { return e.GroupID }) if ids := utils.Single(groupIDs, utils.Keys(ownerMemberMap)); len(ids) > 0 { @@ -833,7 +789,7 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbGroup.GetGroupsReq) if err != nil { return nil, err } - resp.Groups = utils.Slice(groups, func(group *relation.GroupModel) *pbGroup.CMSGroup { + resp.Groups = utils.Slice(groups, func(group *relationTb.GroupModel) *pbGroup.CMSGroup { member := ownerMemberMap[group.GroupID] return DbToPbCMSGroup(group, member.UserID, member.Nickname, uint32(groupMemberNumMap[group.GroupID])) }) @@ -847,7 +803,7 @@ func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbGroup.GetGr return nil, err } resp.Total = total - resp.Members = utils.Slice(members, func(e *relation.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { return DbToPbGroupMembersCMSResp(e) }) return resp, nil @@ -867,14 +823,14 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou if len(requests) == 0 { return resp, nil } - groupIDs := utils.Distinct(utils.Slice(requests, func(e *relation.GroupRequestModel) string { + groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationTb.GroupRequestModel) string { return e.GroupID })) groups, err := s.GroupInterface.FindGroup(ctx, groupIDs) if err != nil { return nil, err } - groupMap := utils.SliceToMap(groups, func(e *relation.GroupModel) string { + groupMap := utils.SliceToMap(groups, func(e *relationTb.GroupModel) string { return e.GroupID }) if ids := utils.Single(groupIDs, utils.Keys(groupMap)); len(ids) > 0 { @@ -884,7 +840,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou if err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relation.GroupMemberModel) string { + ownerMap := utils.SliceToMap(owners, func(e *relationTb.GroupMemberModel) string { return e.GroupID }) if ids := utils.Single(groupIDs, utils.Keys(ownerMap)); len(ids) > 0 { @@ -894,7 +850,7 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbGrou if err != nil { return nil, err } - resp.GroupRequests = utils.Slice(requests, func(e *relation.GroupRequestModel) *open_im_sdk.GroupRequest { + resp.GroupRequests = utils.Slice(requests, func(e *relationTb.GroupRequestModel) *open_im_sdk.GroupRequest { return DbToPbGroupRequest(e, user, DbToPbGroupInfo(groupMap[e.GroupID], ownerMap[e.GroupID].UserID, uint32(groupMemberNum[e.GroupID]))) }) return resp, nil @@ -1058,7 +1014,7 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbGroup.Get if err != nil { return nil, err } - resp.GroupAbstractInfos = utils.Slice(groups, func(e *relation.GroupModel) *pbGroup.GroupAbstractInfo { + resp.GroupAbstractInfos = utils.Slice(groups, func(e *relationTb.GroupModel) *pbGroup.GroupAbstractInfo { userIDs := groupUserMap[e.GroupID] utils.Sort(userIDs, true) bi := big.NewInt(0) @@ -1077,7 +1033,7 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbGroup.Ge if err != nil { return nil, err } - resp.Members = utils.Slice(members, func(e *relation.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { + resp.Members = utils.Slice(members, func(e *relationTb.GroupMemberModel) *open_im_sdk.GroupMemberFullInfo { return DbToPbGroupMembersCMSResp(e) }) return resp, nil diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index bec86a761..1139b1c1d 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -2,27 +2,21 @@ package user import ( "Open_IM/internal/common/convert" + "Open_IM/internal/common/rpc_server" chat "Open_IM/internal/rpc/msg" "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/controller" "Open_IM/pkg/common/db/relation" - relation2 "Open_IM/pkg/common/db/table/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/log" promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/tracelog" - pbFriend "Open_IM/pkg/proto/friend" - pbGroup "Open_IM/pkg/proto/group" server_api_params "Open_IM/pkg/proto/sdk_ws" pbUser "Open_IM/pkg/proto/user" "Open_IM/pkg/utils" "context" - "github.com/golang/protobuf/ptypes/wrappers" - "net" - "strconv" - "strings" - grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" @@ -31,23 +25,19 @@ import ( type userServer struct { rpcPort int rpcRegisterName string - etcdSchema string - etcdAddr []string + *rpc_server.RpcServer controller.UserInterface } func NewUserServer(port int) *userServer { - log.NewPrivateLog(constant.LogFileName) - u := userServer{ - rpcPort: port, - rpcRegisterName: config.Config.RpcRegisterName.OpenImUserName, - etcdSchema: config.Config.Etcd.EtcdSchema, - etcdAddr: config.Config.Etcd.EtcdAddr, + r, err := rpc_server.NewRpcServer(config.Config.RpcRegisterIP, port, config.Config.RpcRegisterName.OpenImUserName, config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema) + if err != nil { + panic(err) } //mysql init var mysql relation.Mysql var model relation.UserGorm - err := mysql.InitConn().AutoMigrateModel(&model) + err = mysql.InitConn().AutoMigrateModel(&model) if err != nil { panic("db init err:" + err.Error()) } @@ -56,27 +46,17 @@ func NewUserServer(port int) *userServer { } else { panic("db init err:" + "conn is nil") } - u.UserInterface = controller.NewUserController(model.DB) - return &u + return &userServer{RpcServer: r, UserInterface: controller.NewUserController(model.DB)} } func (s *userServer) Run() { - log.NewInfo("", "rpc user start...") - - listenIP := "" - if config.Config.ListenIP == "" { - listenIP = "0.0.0.0" - } else { - listenIP = config.Config.ListenIP - } - address := listenIP + ":" + strconv.Itoa(s.rpcPort) - - //listener network - listener, err := net.Listen("tcp", address) + operationID := utils.OperationIDGenerator() + log.NewInfo(operationID, "rpc user start...") + listener, address, err := rpc_server.GetTcpListen(config.Config.ListenIP, s.Port) if err != nil { - panic("listening err:" + err.Error() + s.rpcRegisterName) + panic(err) } - log.NewInfo("", "listen network success, address ", address, listener) + log.NewInfo(operationID, "listen ok ", address) defer listener.Close() //grpc server var grpcOpts []grpc.ServerOption @@ -94,41 +74,22 @@ func (s *userServer) Run() { defer srv.GracefulStop() //Service registers with etcd pbUser.RegisterUserServer(srv, s) - rpcRegisterIP := config.Config.RpcRegisterIP - if config.Config.RpcRegisterIP == "" { - rpcRegisterIP, err = utils.GetLocalIP() - if err != nil { - log.Error("", "GetLocalIP failed ", err.Error()) - } - } - err = rpc.RegisterEtcd(s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName, 10, "") - if err != nil { - log.NewError("", "RegisterEtcd failed ", err.Error(), s.etcdSchema, strings.Join(s.etcdAddr, ","), rpcRegisterIP, s.rpcPort, s.rpcRegisterName) - panic(utils.Wrap(err, "register user module rpc to etcd err")) - } + err = srv.Serve(listener) if err != nil { - log.NewError("", "Serve failed ", err.Error()) - return + panic(err) } - log.NewInfo("", "rpc user success") + log.NewInfo(operationID, "rpc user success") } +// ok func (s *userServer) SyncJoinedGroupMemberFaceURL(ctx context.Context, userID string, faceURL string, operationID string, opUserID string) { - etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImFriendName) - if err != nil { - return - } - client := pbGroup.NewGroupClient(etcdConn) - newReq := &pbGroup.GetJoinedGroupListReq{FromUserID: userID} - rpcResp, err := client.GetJoinedGroupList(ctx, newReq) + members, err := s.GetJoinedGroupMembers(ctx, userID) if err != nil { return } - - for _, group := range rpcResp.Groups { - req := &pbGroup.SetGroupMemberInfoReq{GroupID: group.GroupID, UserID: userID, FaceURL: &wrappers.StringValue{Value: faceURL}} - _, err := client.SetGroupMemberInfo(ctx, req) + for _, group := range members { + err := s.SetGroupMemberFaceURL(ctx, faceURL, group.GroupID, userID) if err != nil { return } @@ -136,29 +97,15 @@ func (s *userServer) SyncJoinedGroupMemberFaceURL(ctx context.Context, userID st } } +// ok func (s *userServer) SyncJoinedGroupMemberNickname(ctx context.Context, userID string, newNickname, oldNickname string, operationID string, opUserID string) { - etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImFriendName) - if err != nil { - return - } - client := pbGroup.NewGroupClient(etcdConn) - newReq := &pbGroup.GetJoinedGroupListReq{FromUserID: userID} - rpcResp, err := client.GetJoinedGroupList(ctx, newReq) - if err != nil { - return - } - req := pbGroup.GetUserInGroupMembersReq{UserID: userID} - for _, group := range rpcResp.Groups { - req.GroupIDs = append(req.GroupIDs, group.GroupID) - } - resp, err := client.GetUserInGroupMembers(ctx, &req) + members, err := s.GetJoinedGroupMembers(ctx, userID) if err != nil { return } - for _, v := range resp.Members { + for _, v := range members { if v.Nickname == oldNickname { - req := pbGroup.SetGroupMemberNicknameReq{Nickname: newNickname, GroupID: v.GroupID, UserID: v.UserID} - _, err := client.SetGroupMemberNickname(ctx, &req) + err := s.SetGroupMemberNickname(ctx, newNickname, v.GroupID, v.UserID) if err != nil { return } @@ -167,10 +114,25 @@ func (s *userServer) SyncJoinedGroupMemberNickname(ctx context.Context, userID s } } +// 设置群昵称 +func (s *userServer) SetGroupMemberNickname(ctx context.Context, nickname string, groupID string, userID string) (err error) { + return +} + +// 设置群头像 +func (s *userServer) SetGroupMemberFaceURL(ctx context.Context, faceURL string, groupID string, userID string) (err error) { + return +} + +// 获取加入的群成员信息 +func (s *userServer) GetJoinedGroupMembers(ctx context.Context, userID string) (members []*server_api_params.GroupMemberFullInfo, err error) { + return +} + // ok func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbUser.GetDesignateUsersReq) (resp *pbUser.GetDesignateUsersResp, err error) { resp = &pbUser.GetDesignateUsersResp{} - users, err := s.Find(ctx, req.UserIDs) + users, err := s.FindWithError(ctx, req.UserIDs) if err != nil { return nil, err } @@ -181,15 +143,20 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbUser.GetDesig return resp, nil } -func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserInfoReq) (*pbUser.UpdateUserInfoResp, error) { - resp := pbUser.UpdateUserInfoResp{} - err := token_verify.CheckAccessV3(ctx, req.UserInfo.UserID) +func (s *userServer) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*server_api_params.FriendInfo, err error) { + return +} + +// ok +func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserInfoReq) (resp *pbUser.UpdateUserInfoResp, err error) { + resp = &pbUser.UpdateUserInfoResp{} + err = token_verify.CheckAccessV3(ctx, req.UserInfo.UserID) if err != nil { return nil, err } oldNickname := "" if req.UserInfo.Nickname != "" { - u, err := s.Find(ctx, []string{req.UserInfo.UserID}) + u, err := s.FindWithError(ctx, []string{req.UserInfo.UserID}) if err != nil { return nil, err } @@ -199,40 +166,34 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserI if err != nil { return nil, err } - err = s.Update(ctx, []*relation2.UserModel{user}) - if err != nil { - return nil, err - } - etcdConn, err := rpc.GetConn(ctx, config.Config.RpcRegisterName.OpenImFriendName) + err = s.Update(ctx, []*relationTb.UserModel{user}) if err != nil { return nil, err } - client := pbFriend.NewFriendClient(etcdConn) - newReq := &pbFriend.GetFriendsReq{UserID: req.UserInfo.UserID} - rpcResp, err := client.GetFriends(context.Background(), newReq) + friends, err := s.GetAllPageFriends(ctx, req.UserInfo.UserID) if err != nil { return nil, err } go func() { - for _, v := range rpcResp.FriendsInfo { + for _, v := range friends { chat.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v.FriendUser.UserID, tracelog.GetOpUserID(ctx)) } }() chat.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID) if req.UserInfo.FaceURL != "" { - s.SyncJoinedGroupMemberFaceURL(ctx, req.UserInfo.UserID, req.UserInfo.FaceURL, tracelog.GetOperationID(ctx), tracelog.GetOpUserID(ctx)) + go s.SyncJoinedGroupMemberFaceURL(ctx, req.UserInfo.UserID, req.UserInfo.FaceURL, tracelog.GetOperationID(ctx), tracelog.GetOpUserID(ctx)) } if req.UserInfo.Nickname != "" { - s.SyncJoinedGroupMemberNickname(ctx, req.UserInfo.UserID, req.UserInfo.Nickname, oldNickname, tracelog.GetOperationID(ctx), tracelog.GetOpUserID(ctx)) + go s.SyncJoinedGroupMemberNickname(ctx, req.UserInfo.UserID, req.UserInfo.Nickname, oldNickname, tracelog.GetOperationID(ctx), tracelog.GetOpUserID(ctx)) } - return &resp, nil + return resp, nil } +// ok func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbUser.SetGlobalRecvMessageOptReq) (resp *pbUser.SetGlobalRecvMessageOptResp, err error) { resp = &pbUser.SetGlobalRecvMessageOptResp{} - - if _, err := s.Find(ctx, []string{req.UserID}); err != nil { + if _, err := s.FindWithError(ctx, []string{req.UserID}); err != nil { return nil, err } m := make(map[string]interface{}, 1) @@ -244,18 +205,22 @@ func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbUser.Se return resp, nil } +// ok func (s *userServer) AccountCheck(ctx context.Context, req *pbUser.AccountCheckReq) (resp *pbUser.AccountCheckResp, err error) { resp = &pbUser.AccountCheckResp{} + if utils.Duplicate(req.CheckUserIDs) { + return nil, constant.ErrArgs.Wrap("userID repeated") + } err = token_verify.CheckAdmin(ctx) if err != nil { return nil, err } - user, err := s.Find(ctx, req.CheckUserIDs) + users, err := s.Find(ctx, req.CheckUserIDs) if err != nil { return nil, err } userIDs := make(map[string]interface{}, 0) - for _, v := range user { + for _, v := range users { userIDs[v.UserID] = nil } for _, v := range req.CheckUserIDs { @@ -270,25 +235,19 @@ func (s *userServer) AccountCheck(ctx context.Context, req *pbUser.AccountCheckR return resp, nil } +// ok func (s *userServer) GetPaginationUsers(ctx context.Context, req *pbUser.GetPaginationUsersReq) (resp *pbUser.GetPaginationUsersResp, err error) { resp = &pbUser.GetPaginationUsersResp{} - usersDB, total, err := s.Get(ctx, req.Pagination.PageNumber, req.Pagination.ShowNumber) + usersDB, total, err := s.Page(ctx, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { return nil, err } - resp.Total = int32(total) - - for _, userDB := range usersDB { - u, err := convert.NewDBUser(userDB).Convert() - if err != nil { - return nil, err - } - resp.Users = append(resp.Users, u) - } + resp.Users, err = (*convert.DBUser)(nil).DB2PB(usersDB) return resp, nil } +// ok func (s *userServer) UserRegister(ctx context.Context, req *pbUser.UserRegisterReq) (resp *pbUser.UserRegisterResp, err error) { resp = &pbUser.UserRegisterResp{} if utils.DuplicateAny(req.Users, func(e *server_api_params.UserInfo) string { return e.UserID }) { @@ -304,7 +263,7 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbUser.UserRegisterR return nil, err } if exist { - return nil, constant.ErrRegisteredAlready.Wrap("userID exist in db") + return nil, constant.ErrRegisteredAlready.Wrap("userID registered already") } users, err := (*convert.PBUser)(nil).PB2DB(req.Users) if err != nil { diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index 145fc1365..9ccfdbf13 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -1,8 +1,6 @@ package cache import ( - "Open_IM/pkg/common/db/relation" - relation2 "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" @@ -17,25 +15,30 @@ const ( blackExpireTime = time.Second * 60 * 60 * 12 ) -type BlackCache struct { - blackDB *relation2.BlackModel +type BlackCache interface { + //get blackIDs from cache + GetBlackIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) (blackIDs []string, err error) + //del user's blackIDs cache, exec when a user's black list changed + DelBlackIDs(ctx context.Context, userID string) (err error) +} + +type BlackCacheRedis struct { expireTime time.Duration rcClient *rockscache.Client } -func NewBlackCache(rdb redis.UniversalClient, blackDB *relation.BlackGorm, options rockscache.Options) *BlackCache { - return &BlackCache{ - blackDB: blackDB, +func NewBlackCacheRedis(rdb redis.UniversalClient, blackDB BlackCache, options rockscache.Options) *BlackCacheRedis { + return &BlackCacheRedis{ expireTime: blackExpireTime, rcClient: rockscache.NewClient(rdb, options), } } -func (b *BlackCache) getBlackIDsKey(ownerUserID string) string { +func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { return blackIDsKey + ownerUserID } -func (b *BlackCache) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { +func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { getBlackIDList := func() (string, error) { blackIDs, err := b.blackDB.GetBlackIDs(ctx, userID) if err != nil { @@ -58,7 +61,7 @@ func (b *BlackCache) GetBlackIDs(ctx context.Context, userID string) (blackIDs [ return blackIDs, utils.Wrap(err, "") } -func (b *BlackCache) DelBlackIDListFromCache(ctx context.Context, userID string) (err error) { +func (b *BlackCacheRedis) DelBlackIDs(ctx context.Context, userID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID) }() diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 19ce8449e..3a6b0e0ce 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -2,7 +2,7 @@ package cache import ( "Open_IM/pkg/common/db/relation" - relation2 "Open_IM/pkg/common/db/table/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" @@ -13,18 +13,39 @@ import ( "time" ) -type DBFun func() (string, error) +const ( + conversationKey = "CONVERSATION:" + conversationIDsKey = "CONVERSATION_IDS:" + recvMsgOptKey = "RECV_MSG_OPT:" + superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + conversationExpireTime = time.Second * 60 * 60 * 12 +) type ConversationCache interface { - GetUserConversationIDListFromCache(userID string, fn DBFun) ([]string, error) - DelUserConversationIDListFromCache(userID string) error - GetConversationFromCache(ownerUserID, conversationID string, fn DBFun) (*table.ConversationModel, error) - GetConversationsFromCache(ownerUserID string, conversationIDList []string, fn DBFun) ([]*table.ConversationModel, error) - GetUserAllConversationList(ownerUserID string, fn DBFun) ([]*table.ConversationModel, error) - DelConversationFromCache(ownerUserID, conversationID string) error - - GetUserConversationIDs(ctx context.Context, ownerUserID string, f func(ctx context.Context, userID string) ([]string, error)) ([]string, error) + // get user's conversationIDs from cache + GetUserConversationIDs(ctx context.Context, userID string, fn func(ctx context.Context, userID string) ([]string, error)) ([]string, error) + // del user's conversationIDs from cache, call when a user add or reduce a conversation + DelUserConversationIDs(ctx context.Context, userID string) error + // get one conversation from cache + GetConversation(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)) (*relationTb.ConversationModel, error) + // get one conversation from cache + GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string, fn func(ctx context.Context, ownerUserID, conversationIDs []string) ([]*relationTb.ConversationModel, error)) ([]*relationTb.ConversationModel, error) + // get one user's all conversations from cache + GetUserAllConversations(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserIDs string) ([]*relationTb.ConversationModel, error)) ([]*relationTb.ConversationModel, error) + // del one conversation from cache, call when one user's conversation Info changed + DelConversation(ctx context.Context, ownerUserID, conversationID string) error + // get user conversation recv msg from cache + GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string, fn func(ctx context.Context, ownerUserID, conversationID string) (opt int, err error)) (opt int, err error) + // del user recv msg opt from cache, call when user's conversation recv msg opt changed + DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error + // get one super group recv msg but do not notification userID list + GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (userIDs []string, err error)) (userIDs []string, err error) + // del one super group recv msg but do not notification userID list, call it when this list changed + DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) error + //GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) + //DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) } + type ConversationRedis struct { rcClient *rockscache.Client } @@ -33,27 +54,27 @@ func NewConversationRedis(rcClient *rockscache.Client) *ConversationRedis { return &ConversationRedis{rcClient: rcClient} } -func NewConversationCache(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationCache { - return &ConversationCache{conversationDB: conversationDB, expireTime: conversationExpireTime, rcClient: rockscache.NewClient(rdb, options)} +func NewNewConversationRedis(rdb redis.UniversalClient, conversationDB *relation.ConversationGorm, options rockscache.Options) *ConversationRedis { + return &ConversationRedis{conversationDB: conversationDB, expireTime: conversationExpireTime, rcClient: rockscache.NewClient(rdb, options)} } -func (c *ConversationCache) getConversationKey(ownerUserID, conversationID string) string { +func (c *ConversationRedis) getConversationKey(ownerUserID, conversationID string) string { return conversationKey + ownerUserID + ":" + conversationID } -func (c *ConversationCache) getConversationIDsKey(ownerUserID string) string { +func (c *ConversationRedis) getConversationIDsKey(ownerUserID string) string { return conversationIDsKey + ownerUserID } -func (c *ConversationCache) getRecvMsgOptKey(ownerUserID, conversationID string) string { +func (c *ConversationRedis) getRecvMsgOptKey(ownerUserID, conversationID string) string { return recvMsgOptKey + ownerUserID + ":" + conversationID } -func (c *ConversationCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { +func (c *ConversationRedis) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { return superGroupRecvMsgNotNotifyUserIDsKey + groupID } -func (c *ConversationCache) GetUserConversationIDs(ctx context.Context, ownerUserID string, f func(ctx context.Context, userID string) ([]string, error)) (conversationIDs []string, err error) { +func (c *ConversationRedis) GetUserConversationIDs(ctx context.Context, ownerUserID string, f func(userID string) ([]string, error)) (conversationIDs []string, err error) { //getConversationIDs := func() (string, error) { // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) // if err != nil { @@ -79,7 +100,7 @@ func (c *ConversationCache) GetUserConversationIDs(ctx context.Context, ownerUse }) } -func (c *ConversationCache) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { +func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUserID string) (conversationIDs []string, err error) { //getConversationIDs := func() (string, error) { // conversationIDs, err := relation.GetConversationIDsByUserID(ownerUserID) // if err != nil { @@ -149,14 +170,14 @@ func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin return t, nil } -func (c *ConversationCache) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) { +func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID) }() return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationIDsKey(ownerUserID)), "DelUserConversationIDs err") } -func (c *ConversationCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relation2.ConversationModel, err error) { +func (c *ConversationRedis) GetConversation(ctx context.Context, ownerUserID, conversationID string) (conversation *relationTb.Conversation, err error) { getConversation := func() (string, error) { conversation, err := relation.GetConversation(ownerUserID, conversationID) if err != nil { @@ -175,19 +196,19 @@ func (c *ConversationCache) GetConversation(ctx context.Context, ownerUserID, co if err != nil { return nil, err } - conversation = &relation2.ConversationModel{} + conversation = &relationTb.ConversationModel{} err = json.Unmarshal([]byte(conversationStr), &conversation) return conversation, utils.Wrap(err, "Unmarshal failed") } -func (c *ConversationCache) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { +func (c *ConversationRedis) DelConversation(ctx context.Context, ownerUserID, conversationID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationID", conversationID) }() return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelConversation err") } -func (c *ConversationCache) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relation2.ConversationModel, err error) { +func (c *ConversationRedis) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []relationTb.ConversationModel, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversationIDs", conversationIDs, "conversations", conversations) }() @@ -201,7 +222,7 @@ func (c *ConversationCache) GetConversations(ctx context.Context, ownerUserID st return conversations, nil } -func (c *ConversationCache) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relation2.ConversationModel, err error) { +func (c *ConversationRedis) GetUserAllConversations(ctx context.Context, ownerUserID string) (conversations []relationTb.ConversationModel, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "conversations", conversations) }() @@ -209,7 +230,7 @@ func (c *ConversationCache) GetUserAllConversations(ctx context.Context, ownerUs if err != nil { return nil, err } - var conversationIDs []relation2.ConversationModel + var conversationIDs []relationTb.ConversationModel for _, conversationID := range IDs { conversation, err := c.GetConversation(ctx, ownerUserID, conversationID) if err != nil { @@ -220,7 +241,7 @@ func (c *ConversationCache) GetUserAllConversations(ctx context.Context, ownerUs return conversationIDs, nil } -func (c *ConversationCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { +func (c *ConversationRedis) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { getConversation := func() (string, error) { conversation, err := relation.GetConversation(ownerUserID, conversationID) if err != nil { @@ -238,22 +259,22 @@ func (c *ConversationCache) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, return strconv.Atoi(optStr) } -func (c *ConversationCache) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { +func (c *ConversationRedis) DelUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) error { return utils.Wrap(c.rcClient.TagAsDeleted(c.getConversationKey(ownerUserID, conversationID)), "DelUserRecvMsgOpt failed") } -func (c *ConversationCache) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { +func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (userIDs []string, err error) { return nil, nil } -func (c *ConversationCache) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { +func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) (err error) { return nil } -func (c *ConversationCache) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) { +func (c *ConversationRedis) GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint32, err error) { return } -func (c *ConversationCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) { +func (c *ConversationRedis) DelSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) { return } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 4d70b1ff6..fb6c5c8c4 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -2,7 +2,7 @@ package cache import ( "Open_IM/pkg/common/db/relation" - relation2 "Open_IM/pkg/common/db/table/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" @@ -19,33 +19,42 @@ const ( friendKey = "FRIEND_INFO:" ) -type FriendCache struct { +type FriendCache interface { + GetFriendIDs(ctx context.Context, ownerUserID string, fn func(ctx context.Context, ownerUserID string) (friendIDs []string, err error)) (friendIDs []string, err error) + // call when friendID List changed + DelFriendIDs(ctx context.Context, ownerUserID string) (err error) + GetFriend(ctx context.Context, ownerUserID, friendUserID string, fn func(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error)) (friend *relationTb.FriendModel, err error) + // del friend when friend info changed or remove it + DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) +} + +type FriendCacheRedis struct { friendDB *relation.FriendGorm expireTime time.Duration rcClient *rockscache.Client } -func NewFriendCache(rdb redis.UniversalClient, friendDB *relation.FriendGorm, options rockscache.Options) *FriendCache { - return &FriendCache{ +func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB *relation.FriendGorm, options rockscache.Options) *FriendCacheRedis { + return &FriendCacheRedis{ friendDB: friendDB, expireTime: friendExpireTime, rcClient: rockscache.NewClient(rdb, options), } } -func (f *FriendCache) getFriendIDsKey(ownerUserID string) string { +func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string { return friendIDsKey + ownerUserID } -func (f *FriendCache) getTwoWayFriendsIDsKey(ownerUserID string) string { +func (f *FriendCacheRedis) getTwoWayFriendsIDsKey(ownerUserID string) string { return TwoWayFriendsIDsKey + ownerUserID } -func (f *FriendCache) getFriendKey(ownerUserID, friendUserID string) string { +func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string { return friendKey + ownerUserID + "-" + friendUserID } -func (f *FriendCache) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { +func (f *FriendCacheRedis) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) { getFriendIDs := func() (string, error) { friendIDs, err := f.friendDB.GetFriendIDs(ctx, ownerUserID) if err != nil { @@ -68,14 +77,14 @@ func (f *FriendCache) GetFriendIDs(ctx context.Context, ownerUserID string) (fri return friendIDs, utils.Wrap(err, "") } -func (f *FriendCache) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) { +func (f *FriendCacheRedis) DelFriendIDs(ctx context.Context, ownerUserID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID) }() return f.rcClient.TagAsDeleted(f.getFriendIDsKey(ownerUserID)) } -func (f *FriendCache) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) { +func (f *FriendCacheRedis) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string) (twoWayFriendIDs []string, err error) { friendIDs, err := f.GetFriendIDs(ctx, ownerUserID) if err != nil { return nil, err @@ -92,14 +101,14 @@ func (f *FriendCache) GetTwoWayFriendIDs(ctx context.Context, ownerUserID string return twoWayFriendIDs, nil } -func (f *FriendCache) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) (err error) { +func (f *FriendCacheRedis) DelTwoWayFriendIDs(ctx context.Context, ownerUserID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID) }() return f.rcClient.TagAsDeleted(f.getTwoWayFriendsIDsKey(ownerUserID)) } -func (f *FriendCache) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relation2.FriendModel, err error) { +func (f *FriendCacheRedis) GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error) { getFriend := func() (string, error) { friend, err = f.friendDB.Take(ctx, ownerUserID, friendUserID) if err != nil { @@ -115,12 +124,12 @@ func (f *FriendCache) GetFriend(ctx context.Context, ownerUserID, friendUserID s if err != nil { return nil, err } - friend = &relation2.FriendModel{} + friend = &relationTb.FriendModel{} err = json.Unmarshal([]byte(friendStr), friend) return friend, utils.Wrap(err, "") } -func (f *FriendCache) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) { +func (f *FriendCacheRedis) DelFriend(ctx context.Context, ownerUserID, friendUserID string) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID) }() diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 2a75d97c4..f7e267360 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -3,7 +3,7 @@ package cache import ( "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db/relation" - relation2 "Open_IM/pkg/common/db/table/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/db/unrelation" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" @@ -28,18 +28,25 @@ const ( groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" ) -type GroupCache struct { - group relation2.GroupModelInterface - groupMember relation2.GroupMemberModelInterface - groupRequest relation2.GroupRequestModelInterface +type GroupCache interface { + GetGroupsInfo(ctx context.Context, groupIDs []string, fn func(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error)) (groups []*relationTb.GroupModel, err error) + DelGroupsInfo(ctx context.Context, groupID string) (err error) + GetGroupInfo(ctx context.Context, groupID string, fn func(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)) (group *relationTb.GroupModel, err error) + DelGroupInfo(ctx context.Context, groupID string) (err error) +} + +type GroupCacheRedis struct { + group *relation.GroupGorm + groupMember *relation.GroupMemberGorm + groupRequest *relation.GroupRequestGorm mongoDB *unrelation.SuperGroupMongoDriver expireTime time.Duration redisClient *RedisClient rcClient *rockscache.Client } -func NewGroupCache(rdb redis.UniversalClient, groupDB relation2.GroupModelInterface, groupMemberDB relation2.GroupMemberModelInterface, groupRequestDB relation2.GroupRequestModelInterface, mongoClient *unrelation.SuperGroupMongoDriver, opts rockscache.Options) *GroupCache { - return &GroupCache{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime, +func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB *relation.GroupGorm, groupMemberDB *relation.GroupMemberGorm, groupRequestDB *relation.GroupRequestGorm, mongoClient *unrelation.SuperGroupMongoDriver, opts rockscache.Options) *GroupCacheRedis { + return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime, group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb), mongoDB: mongoClient, } diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 92a5e9168..58186453f 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -2,7 +2,7 @@ package cache import ( "Open_IM/pkg/common/db/relation" - relation2 "Open_IM/pkg/common/db/table/relation" + relationTb "Open_IM/pkg/common/db/table/relation" "Open_IM/pkg/common/tracelog" "Open_IM/pkg/utils" "context" @@ -44,7 +44,7 @@ func (u *UserCache) getUserGlobalRecvMsgOptKey(userID string) string { return userGlobalRecvMsgOptKey + userID } -func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relation2.UserModel, err error) { +func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error) { getUserInfo := func() (string, error) { userInfo, err := u.userDB.Take(ctx, userID) if err != nil { @@ -63,13 +63,13 @@ func (u *UserCache) GetUserInfo(ctx context.Context, userID string) (userInfo *r if err != nil { return nil, err } - userInfo = &relation2.UserModel{} + userInfo = &relationTb.UserModel{} err = json.Unmarshal([]byte(userInfoStr), userInfo) return userInfo, utils.Wrap(err, "") } -func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relation2.UserModel, error) { - var users []*relation2.UserModel +func (u *UserCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationTb.UserModel, error) { + var users []*relationTb.UserModel for _, userID := range userIDs { user, err := GetUserInfoFromCache(ctx, userID) if err != nil { diff --git a/pkg/common/db/controller/black.go b/pkg/common/db/controller/black.go index 1695ed624..6897b4974 100644 --- a/pkg/common/db/controller/black.go +++ b/pkg/common/db/controller/black.go @@ -1,6 +1,7 @@ package controller import ( + relation2 "Open_IM/pkg/common/db/relation" "Open_IM/pkg/common/db/table/relation" "context" "errors" @@ -27,17 +28,17 @@ func NewBlackController(db *gorm.DB) *BlackController { } // Create 增加黑名单 -func (b *BlackController) Create(ctx context.Context, blacks []*relation.Black) (err error) { +func (b *BlackController) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) { return b.database.Create(ctx, blacks) } // Delete 删除黑名单 -func (b *BlackController) Delete(ctx context.Context, blacks []*relation.Black) (err error) { +func (b *BlackController) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) { return b.database.Delete(ctx, blacks) } // FindOwnerBlacks 获取黑名单列表 -func (b *BlackController) FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blackList []*relation.Black, total int64, err error) { +func (b *BlackController) FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blackList []*relation.BlackModel, total int64, err error) { return b.database.FindOwnerBlacks(ctx, ownerUserID, pageNumber, showNumber) } @@ -48,21 +49,21 @@ func (b *BlackController) CheckIn(ctx context.Context, userID1, userID2 string) type BlackDatabaseInterface interface { // Create 增加黑名单 - Create(ctx context.Context, blacks []*relation.Black) (err error) + Create(ctx context.Context, blacks []*relation.BlackModel) (err error) // Delete 删除黑名单 - Delete(ctx context.Context, blacks []*relation.Black) (err error) + Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) // FindOwnerBlacks 获取黑名单列表 - FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*relation.Black, total int64, err error) + FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*relation.BlackModel, total int64, err error) // CheckIn 检查user2是否在user1的黑名单列表中(inUser1Blacks==true) 检查user1是否在user2的黑名单列表中(inUser2Blacks==true) CheckIn(ctx context.Context, userID1, userID2 string) (inUser1Blacks bool, inUser2Blacks bool, err error) } type BlackDatabase struct { - sqlDB *relation.Black + sqlDB *relation2.BlackGorm } func NewBlackDatabase(db *gorm.DB) *BlackDatabase { - sqlDB := relation.NewBlack(db) + sqlDB := relation2.NewBlackGorm(db) database := &BlackDatabase{ sqlDB: sqlDB, } @@ -70,17 +71,17 @@ func NewBlackDatabase(db *gorm.DB) *BlackDatabase { } // Create 增加黑名单 -func (b *BlackDatabase) Create(ctx context.Context, blacks []*relation.Black) (err error) { +func (b *BlackDatabase) Create(ctx context.Context, blacks []*relation.BlackModel) (err error) { return b.sqlDB.Create(ctx, blacks) } // Delete 删除黑名单 -func (b *BlackDatabase) Delete(ctx context.Context, blacks []*relation.Black) (err error) { +func (b *BlackDatabase) Delete(ctx context.Context, blacks []*relation.BlackModel) (err error) { return b.sqlDB.Delete(ctx, blacks) } // FindOwnerBlacks 获取黑名单列表 -func (b *BlackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*relation.Black, total int64, err error) { +func (b *BlackDatabase) FindOwnerBlacks(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (blacks []*relation.BlackModel, total int64, err error) { return b.sqlDB.FindOwnerBlacks(ctx, ownerUserID, pageNumber, showNumber) } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index 3ed221c45..c5fe8bc3b 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -3,7 +3,7 @@ package controller import ( "Open_IM/pkg/common/db/cache" "Open_IM/pkg/common/db/relation" - "Open_IM/pkg/common/db/table" + relationTb "Open_IM/pkg/common/db/table/relation" "context" ) @@ -13,15 +13,15 @@ type ConversationInterface interface { //UpdateUserConversationFiled 更新用户该会话的属性信息 UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error //CreateConversation 创建一批新的会话 - CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error + CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 - SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error + SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error //FindConversations 根据会话ID获取某个用户的多个会话 - FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) + FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) //GetUserAllConversation 获取一个用户在服务器上所有的会话 - GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) + GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 - SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error + SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error } type ConversationController struct { database ConversationDataBaseInterface @@ -39,22 +39,22 @@ func (c ConversationController) UpdateUsersConversationFiled(ctx context.Context panic("implement me") } -func (c ConversationController) CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error { +func (c ConversationController) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { panic("implement me") } -func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error { +func (c ConversationController) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { panic("implement me") } -func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) { +func (c ConversationController) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) { panic("implement me") } -func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) { +func (c ConversationController) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { panic("implement me") } -func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error { +func (c ConversationController) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { panic("implement me") } @@ -66,15 +66,15 @@ type ConversationDataBaseInterface interface { //UpdateUserConversationFiled 更新用户该会话的属性信息 UpdateUsersConversationFiled(ctx context.Context, UserIDList []string, conversationID string, args map[string]interface{}) error //CreateConversation 创建一批新的会话 - CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error + CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error //SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作 - SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error + SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error //FindConversations 根据会话ID获取某个用户的多个会话 - FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) + FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) //GetUserAllConversation 获取一个用户在服务器上所有的会话 - GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) + GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) //SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性 - SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error + SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error } type ConversationDataBase struct { db relation.Conversation @@ -89,23 +89,23 @@ func (c ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, panic("implement me") } -func (c ConversationDataBase) CreateConversation(ctx context.Context, conversations []*table.ConversationModel) error { +func (c ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error { panic("implement me") } -func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *table.ConversationModel) error { +func (c ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversation *relationTb.ConversationModel) error { panic("implement me") } -func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*table.ConversationModel, error) { +func (c ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationID []string) ([]*relationTb.ConversationModel, error) { panic("implement me") } -func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*table.ConversationModel, error) { +func (c ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { panic("implement me") } -func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*table.ConversationModel) error { +func (c ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error { panic("implement me") } diff --git a/pkg/common/db/controller/friend.go b/pkg/common/db/controller/friend.go index f236f31d3..ea322a9d3 100644 --- a/pkg/common/db/controller/friend.go +++ b/pkg/common/db/controller/friend.go @@ -26,15 +26,15 @@ type FriendInterface interface { // 更新好友备注 零值也支持 UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) // 获取ownerUserID的好友列表 无结果不返回错误 - FindOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) + PageOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) // friendUserID在哪些人的好友列表中 - FindInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) + PageInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) // 获取我发出去的好友申请 无结果不返回错误 - FindFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) + PageFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) // 获取我收到的的好友申请 无结果不返回错误 - FindFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) + PageFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) // 获取某人指定好友的信息 如果有一个不存在也返回错误 - FindFriends(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) + FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) } type FriendController struct { @@ -81,28 +81,28 @@ func (f *FriendController) UpdateRemark(ctx context.Context, ownerUserID, friend } // FindOwnerFriends 获取ownerUserID的好友列表 -func (f *FriendController) FindOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { - return f.database.FindOwnerFriends(ctx, ownerUserID, pageNumber, showNumber) +func (f *FriendController) PageOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { + return f.database.PageOwnerFriends(ctx, ownerUserID, pageNumber, showNumber) } // FindInWhoseFriends friendUserID在哪些人的好友列表中 -func (f *FriendController) FindInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { - return f.database.FindInWhoseFriends(ctx, friendUserID, pageNumber, showNumber) +func (f *FriendController) PageInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { + return f.database.PageInWhoseFriends(ctx, friendUserID, pageNumber, showNumber) } // FindFriendRequestFromMe 获取我发出去的好友申请 -func (f *FriendController) FindFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { - return f.database.FindFriendRequestFromMe(ctx, userID, pageNumber, showNumber) +func (f *FriendController) PageFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { + return f.database.PageFriendRequestFromMe(ctx, userID, pageNumber, showNumber) } // FindFriendRequestToMe 获取我收到的的好友申请 -func (f *FriendController) FindFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { - return f.database.FindFriendRequestToMe(ctx, userID, pageNumber, showNumber) +func (f *FriendController) PageFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { + return f.database.PageFriendRequestToMe(ctx, userID, pageNumber, showNumber) } // FindFriends 获取某人指定好友的信息 -func (f *FriendController) FindFriends(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) { - return f.database.FindFriends(ctx, ownerUserID, friendUserIDs) +func (f *FriendController) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) { + return f.database.FindFriendsWithError(ctx, ownerUserID, friendUserIDs) } type FriendDatabaseInterface interface { @@ -121,15 +121,15 @@ type FriendDatabaseInterface interface { // 更新好友备注 UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error) // 获取ownerUserID的好友列表 - FindOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) + PageOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) // friendUserID在哪些人的好友列表中 - FindInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) + PageInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) // 获取我发出去的好友申请 - FindFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) + PageFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) // 获取我收到的的好友申请 - FindFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) + PageFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) // 获取某人指定好友的信息 - FindFriends(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) + FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) } type FriendDatabase struct { @@ -302,27 +302,27 @@ func (f *FriendDatabase) UpdateRemark(ctx context.Context, ownerUserID, friendUs } // 获取ownerUserID的好友列表 无结果不返回错误 -func (f *FriendDatabase) FindOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { +func (f *FriendDatabase) PageOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { return f.friend.FindOwnerFriends(ctx, ownerUserID, pageNumber, showNumber) } // friendUserID在哪些人的好友列表中 -func (f *FriendDatabase) FindInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { +func (f *FriendDatabase) PageInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32) (friends []*relation.FriendModel, total int64, err error) { return f.friend.FindInWhoseFriends(ctx, friendUserID, pageNumber, showNumber) } // 获取我发出去的好友申请 无结果不返回错误 -func (f *FriendDatabase) FindFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { +func (f *FriendDatabase) PageFriendRequestFromMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { return f.friendRequest.FindFromUserID(ctx, userID, pageNumber, showNumber) } // 获取我收到的的好友申请 无结果不返回错误 -func (f *FriendDatabase) FindFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { +func (f *FriendDatabase) PageFriendRequestToMe(ctx context.Context, userID string, pageNumber, showNumber int32) (friends []*relation.FriendRequestModel, total int64, err error) { return f.friendRequest.FindToUserID(ctx, userID, pageNumber, showNumber) } // 获取某人指定好友的信息 如果有好友不存在,也返回错误 -func (f *FriendDatabase) FindFriends(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) { +func (f *FriendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) { friends, err = f.friend.FindFriends(ctx, ownerUserID, friendUserIDs) if err != nil { return diff --git a/pkg/common/db/controller/user.go b/pkg/common/db/controller/user.go index b6bf2dfbd..b5cef87f1 100644 --- a/pkg/common/db/controller/user.go +++ b/pkg/common/db/controller/user.go @@ -10,6 +10,8 @@ import ( type UserInterface interface { //获取指定用户的信息 如有userID未找到 也返回错误 + FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) + //获取指定用户的信息 如有userID未找到 不返回错误 Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) //插入多条 外部保证userID 不重复 且在db中不存在 Create(ctx context.Context, users []*relation2.UserModel) (err error) @@ -17,9 +19,9 @@ type UserInterface interface { Update(ctx context.Context, users []*relation2.UserModel) (err error) //更新(零值) 外部保证userID存在 UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) - //获取,如果没找到,不返回错误 - Get(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error) - //userIDs是否存在 只要有一个存在就为true + //如果没找到,不返回错误 + Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error) + //只要有一个存在就为true IsExist(ctx context.Context, userIDs []string) (exist bool, err error) } @@ -27,6 +29,11 @@ type UserController struct { database UserDatabaseInterface } +// 获取指定用户的信息 如有userID未找到 也返回错误 +func (u *UserController) FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { + return u.database.FindWithError(ctx, userIDs) + +} func (u *UserController) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { return u.database.Find(ctx, userIDs) } @@ -41,8 +48,8 @@ func (u *UserController) UpdateByMap(ctx context.Context, userID string, args ma return u.database.UpdateByMap(ctx, userID, args) } -func (u *UserController) Get(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error) { - return u.database.Get(ctx, pageNumber, showNumber) +func (u *UserController) Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error) { + return u.database.Page(ctx, pageNumber, showNumber) } func (u *UserController) IsExist(ctx context.Context, userIDs []string) (exist bool, err error) { @@ -54,11 +61,19 @@ func NewUserController(db *gorm.DB) *UserController { } type UserDatabaseInterface interface { + //获取指定用户的信息 如有userID未找到 也返回错误 + FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) + //获取指定用户的信息 如有userID未找到 不返回错误 Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) - Create(ctx context.Context, users []*relation2.UserModel) error + //插入多条 外部保证userID 不重复 且在db中不存在 + Create(ctx context.Context, users []*relation2.UserModel) (err error) + //更新(非零值) 外部保证userID存在 Update(ctx context.Context, users []*relation2.UserModel) (err error) + //更新(零值) 外部保证userID存在 UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) - Get(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error) + //如果没找到,不返回错误 + Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation2.UserModel, count int64, err error) + //只要有一个存在就为true IsExist(ctx context.Context, userIDs []string) (exist bool, err error) } @@ -75,7 +90,7 @@ func newUserDatabase(db *gorm.DB) *UserDatabase { } // 获取指定用户的信息 如有userID未找到 也返回错误 -func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { +func (u *UserDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { users, err = u.user.Find(ctx, userIDs) if err != nil { return @@ -86,6 +101,12 @@ func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*rel return } +// 获取指定用户的信息 如有userID未找到 不返回错误 +func (u *UserDatabase) Find(ctx context.Context, userIDs []string) (users []*relation2.UserModel, err error) { + users, err = u.user.Find(ctx, userIDs) + return +} + // 插入多条 外部保证userID 不重复 且在db中不存在 func (u *UserDatabase) Create(ctx context.Context, users []*relation2.UserModel) (err error) { return u.user.Create(ctx, users) @@ -102,8 +123,8 @@ func (u *UserDatabase) UpdateByMap(ctx context.Context, userID string, args map[ } // 获取,如果没找到,不返回错误 -func (u *UserDatabase) Get(ctx context.Context, showNumber, pageNumber int32) (users []*relation2.UserModel, count int64, err error) { - return u.user.Get(ctx, showNumber, pageNumber) +func (u *UserDatabase) Page(ctx context.Context, showNumber, pageNumber int32) (users []*relation2.UserModel, count int64, err error) { + return u.user.Page(ctx, showNumber, pageNumber) } // userIDs是否存在 只要有一个存在就为true diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index cb07bd7c7..3a4775367 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -1,8 +1,8 @@ package localcache import ( + discoveryRegistry "Open_IM/pkg/discovery_registry" "context" - "github.com/OpenIMSDK/openKeeper" "sync" ) @@ -13,16 +13,17 @@ type ConversationLocalCacheInterface interface { type ConversationLocalCache struct { lock sync.Mutex SuperGroupRecvMsgNotNotifyUserIDs map[string][]string - zkClient *openKeeper.ZkClient + client discoveryRegistry.SvcDiscoveryRegistry } -func NewConversationLocalCache(zkClient *openKeeper.ZkClient) ConversationLocalCache { +func NewConversationLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) ConversationLocalCache { return ConversationLocalCache{ SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0), - zkClient: zkClient, + client: client, } } func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) []string { + g.client.GetConn() return []string{} } diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go index c4606532d..87d9d2f27 100644 --- a/pkg/common/db/localcache/group.go +++ b/pkg/common/db/localcache/group.go @@ -3,10 +3,9 @@ package localcache import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" + discoveryRegistry "Open_IM/pkg/discovery_registry" "Open_IM/pkg/proto/group" "context" - "github.com/OpenIMSDK/openKeeper" - "google.golang.org/grpc" "sync" ) @@ -15,9 +14,9 @@ type GroupLocalCacheInterface interface { } type GroupLocalCache struct { - lock sync.Mutex - cache map[string]GroupMemberIDsHash - zkClient *openKeeper.ZkClient + lock sync.Mutex + cache map[string]GroupMemberIDsHash + client discoveryRegistry.SvcDiscoveryRegistry } type GroupMemberIDsHash struct { @@ -25,17 +24,17 @@ type GroupMemberIDsHash struct { userIDs []string } -func NewGroupMemberIDsLocalCache(zkClient *openKeeper.ZkClient) GroupLocalCache { +func NewGroupMemberIDsLocalCache(client discoveryRegistry.SvcDiscoveryRegistry) GroupLocalCache { return GroupLocalCache{ - cache: make(map[string]GroupMemberIDsHash, 0), - zkClient: zkClient, + cache: make(map[string]GroupMemberIDsHash, 0), + client: client, } } func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { g.lock.Lock() defer g.lock.Unlock() - conn, err := g.zkClient.GetConn(config.Config.RpcRegisterName.OpenImGroupName, nil) + conn, err := g.client.GetConn(config.Config.RpcRegisterName.OpenImGroupName, nil) if err != nil { return nil, err } diff --git a/pkg/common/db/relation/friend_model_k.go b/pkg/common/db/relation/friend_model_k.go index d88d4c6b1..b452bae4e 100644 --- a/pkg/common/db/relation/friend_model_k.go +++ b/pkg/common/db/relation/friend_model_k.go @@ -31,7 +31,7 @@ type FriendUser struct { } // 插入多条记录 -func (f *FriendGorm) Create(ctx context.Context, friends []*relation.FriendModel, tx ...*gorm.DB) (err error) { +func (f *FriendGorm) Create(ctx context.Context, friends []*relation.FriendModel, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "friends", friends) }() @@ -39,7 +39,7 @@ func (f *FriendGorm) Create(ctx context.Context, friends []*relation.FriendModel } // 删除ownerUserID指定的好友 -func (f *FriendGorm) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string, tx ...*gorm.DB) (err error) { +func (f *FriendGorm) Delete(ctx context.Context, ownerUserID string, friendUserIDs []string, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserIDs", friendUserIDs) }() @@ -48,7 +48,7 @@ func (f *FriendGorm) Delete(ctx context.Context, ownerUserID string, friendUserI } // 更新ownerUserID单个好友信息 更新零值 -func (f *FriendGorm) UpdateByMap(ctx context.Context, ownerUserID string, friendUserID string, args map[string]interface{}, tx ...*gorm.DB) (err error) { +func (f *FriendGorm) UpdateByMap(ctx context.Context, ownerUserID string, friendUserID string, args map[string]interface{}, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID, "args", args) }() @@ -56,7 +56,7 @@ func (f *FriendGorm) UpdateByMap(ctx context.Context, ownerUserID string, friend } // 更新好友信息的非零值 -func (f *FriendGorm) Update(ctx context.Context, friends []*relation.FriendModel, tx ...*gorm.DB) (err error) { +func (f *FriendGorm) Update(ctx context.Context, friends []*relation.FriendModel, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "friends", friends) }() @@ -64,7 +64,7 @@ func (f *FriendGorm) Update(ctx context.Context, friends []*relation.FriendModel } // 更新好友备注(也支持零值 ) -func (f *FriendGorm) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string, tx ...*gorm.DB) (err error) { +func (f *FriendGorm) UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID, "remark", remark) }() @@ -78,14 +78,14 @@ func (f *FriendGorm) UpdateRemark(ctx context.Context, ownerUserID, friendUserID } // 获取单个好友信息,如没找到 返回错误 -func (f *FriendGorm) Take(ctx context.Context, ownerUserID, friendUserID string, tx ...*gorm.DB) (friend *relation.FriendModel, err error) { +func (f *FriendGorm) Take(ctx context.Context, ownerUserID, friendUserID string, tx ...any) (friend *relation.FriendModel, err error) { friend = &relation.FriendModel{} defer tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserID", friendUserID, "friend", *friend) return friend, utils.Wrap(getDBConn(f.DB, tx).Where("owner_user_id = ? and friend_user_id", ownerUserID, friendUserID).Take(friend).Error, "") } // 查找好友关系,如果是双向关系,则都返回 -func (f *FriendGorm) FindUserState(ctx context.Context, userID1, userID2 string, tx ...*gorm.DB) (friends []*relation.FriendModel, err error) { +func (f *FriendGorm) FindUserState(ctx context.Context, userID1, userID2 string, tx ...any) (friends []*relation.FriendModel, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID1", userID1, "userID2", userID2, "friends", friends) }() @@ -93,7 +93,7 @@ func (f *FriendGorm) FindUserState(ctx context.Context, userID1, userID2 string, } // 获取 owner指定的好友列表 如果有friendUserIDs不存在,也不返回错误 -func (f *FriendGorm) FindFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, tx ...*gorm.DB) (friends []*relation.FriendModel, err error) { +func (f *FriendGorm) FindFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, tx ...any) (friends []*relation.FriendModel, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "friendUserIDs", friendUserIDs, "friends", friends) }() @@ -101,7 +101,7 @@ func (f *FriendGorm) FindFriends(ctx context.Context, ownerUserID string, friend } // 获取哪些人添加了friendUserID 如果有ownerUserIDs不存在,也不返回错误 -func (f *FriendGorm) FindReversalFriends(ctx context.Context, friendUserID string, ownerUserIDs []string, tx ...*gorm.DB) (friends []*relation.FriendModel, err error) { +func (f *FriendGorm) FindReversalFriends(ctx context.Context, friendUserID string, ownerUserIDs []string, tx ...any) (friends []*relation.FriendModel, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "friendUserID", friendUserID, "ownerUserIDs", ownerUserIDs, "friends", friends) }() @@ -109,7 +109,7 @@ func (f *FriendGorm) FindReversalFriends(ctx context.Context, friendUserID strin } // 获取ownerUserID好友列表 支持翻页 -func (f *FriendGorm) FindOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32, tx ...*gorm.DB) (friends []*relation.FriendModel, total int64, err error) { +func (f *FriendGorm) FindOwnerFriends(ctx context.Context, ownerUserID string, pageNumber, showNumber int32, tx ...any) (friends []*relation.FriendModel, total int64, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID, "pageNumber", pageNumber, "showNumber", showNumber, "friends", friends, "total", total) }() @@ -122,7 +122,7 @@ func (f *FriendGorm) FindOwnerFriends(ctx context.Context, ownerUserID string, p } // 获取哪些人添加了friendUserID 支持翻页 -func (f *FriendGorm) FindInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32, tx ...*gorm.DB) (friends []*relation.FriendModel, total int64, err error) { +func (f *FriendGorm) FindInWhoseFriends(ctx context.Context, friendUserID string, pageNumber, showNumber int32, tx ...any) (friends []*relation.FriendModel, total int64, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "friendUserID", friendUserID, "pageNumber", pageNumber, "showNumber", showNumber, "friends", friends, "total", total) }() diff --git a/pkg/common/db/relation/init_mysql.go b/pkg/common/db/relation/init_mysql.go index 6d6ec61ee..986060b0a 100644 --- a/pkg/common/db/relation/init_mysql.go +++ b/pkg/common/db/relation/init_mysql.go @@ -63,6 +63,9 @@ func (m *Mysql) InitConn() *Mysql { sqlDB.SetConnMaxLifetime(time.Second * time.Duration(config.Config.Mysql.DBMaxLifeTime)) sqlDB.SetMaxOpenConns(config.Config.Mysql.DBMaxOpenConns) sqlDB.SetMaxIdleConns(config.Config.Mysql.DBMaxIdleConns) + if db == nil { + panic("db is nil") + } m.SetGormConn(db) return m } @@ -89,8 +92,8 @@ func (w Writer) Printf(format string, args ...interface{}) { func getDBConn(db *gorm.DB, tx []any) *gorm.DB { if len(tx) > 0 { - if txDb, ok := tx[0].(*gorm.DB); ok { - return txDb + if txDB, ok := tx[0].(*gorm.DB); ok { + return txDB } } return db diff --git a/pkg/common/db/relation/user_model_k.go b/pkg/common/db/relation/user_model_k.go index b8ed1a252..c57adb7de 100644 --- a/pkg/common/db/relation/user_model_k.go +++ b/pkg/common/db/relation/user_model_k.go @@ -20,7 +20,7 @@ func NewUserGorm(db *gorm.DB) *UserGorm { } // 插入多条 -func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel, tx ...*gorm.DB) (err error) { +func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "users", users) }() @@ -28,7 +28,7 @@ func (u *UserGorm) Create(ctx context.Context, users []*relation.UserModel, tx . } // 更新用户信息 零值 -func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}, tx ...*gorm.DB) (err error) { +func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "args", args) }() @@ -36,7 +36,7 @@ func (u *UserGorm) UpdateByMap(ctx context.Context, userID string, args map[stri } // 更新多个用户信息 非零值 -func (u *UserGorm) Update(ctx context.Context, users []*relation.UserModel, tx ...*gorm.DB) (err error) { +func (u *UserGorm) Update(ctx context.Context, users []*relation.UserModel, tx ...any) (err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "users", users) }() @@ -44,7 +44,7 @@ func (u *UserGorm) Update(ctx context.Context, users []*relation.UserModel, tx . } // 获取指定用户信息 不存在,也不返回错误 -func (u *UserGorm) Find(ctx context.Context, userIDs []string, tx ...*gorm.DB) (users []*relation.UserModel, err error) { +func (u *UserGorm) Find(ctx context.Context, userIDs []string, tx ...any) (users []*relation.UserModel, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userIDs", userIDs, "users", users) }() @@ -53,7 +53,7 @@ func (u *UserGorm) Find(ctx context.Context, userIDs []string, tx ...*gorm.DB) ( } // 获取某个用户信息 不存在,则返回错误 -func (u *UserGorm) Take(ctx context.Context, userID string, tx ...*gorm.DB) (user *relation.UserModel, err error) { +func (u *UserGorm) Take(ctx context.Context, userID string, tx ...any) (user *relation.UserModel, err error) { user = &relation.UserModel{} defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userID", userID, "user", *user) @@ -63,7 +63,7 @@ func (u *UserGorm) Take(ctx context.Context, userID string, tx ...*gorm.DB) (use } // 通过名字查找用户 不存在,不返回错误 -func (u *UserGorm) GetByName(ctx context.Context, userName string, pageNumber, showNumber int32, tx ...*gorm.DB) (users []*relation.UserModel, count int64, err error) { +func (u *UserGorm) GetByName(ctx context.Context, userName string, pageNumber, showNumber int32, tx ...any) (users []*relation.UserModel, count int64, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "userName", userName, "pageNumber", pageNumber, "showNumber", showNumber, "users", users, "count", count) }() @@ -76,7 +76,7 @@ func (u *UserGorm) GetByName(ctx context.Context, userName string, pageNumber, s } // 通过名字或userID查找用户 不存在,不返回错误 -func (u *UserGorm) GetByNameAndID(ctx context.Context, content string, pageNumber, showNumber int32, tx ...*gorm.DB) (users []*relation.UserModel, count int64, err error) { +func (u *UserGorm) GetByNameAndID(ctx context.Context, content string, pageNumber, showNumber int32, tx ...any) (users []*relation.UserModel, count int64, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "content", content, "pageNumber", pageNumber, "showNumber", showNumber, "users", users, "count", count) }() @@ -89,7 +89,7 @@ func (u *UserGorm) GetByNameAndID(ctx context.Context, content string, pageNumbe } // 获取用户信息 不存在,不返回错误 -func (u *UserGorm) Get(ctx context.Context, pageNumber, showNumber int32, tx ...*gorm.DB) (users []*relation.UserModel, count int64, err error) { +func (u *UserGorm) Page(ctx context.Context, pageNumber, showNumber int32, tx ...any) (users []*relation.UserModel, count int64, err error) { defer func() { tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "pageNumber", pageNumber, "showNumber", showNumber, "users", users, "count", count) }() diff --git a/pkg/common/db/unrelation/init_mongo.go b/pkg/common/db/unrelation/mongo.go similarity index 92% rename from pkg/common/db/unrelation/init_mongo.go rename to pkg/common/db/unrelation/mongo.go index d7cfcae66..a359e11d0 100644 --- a/pkg/common/db/unrelation/init_mongo.go +++ b/pkg/common/db/unrelation/mongo.go @@ -136,3 +136,17 @@ func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...strin } return nil } + +func MongoTransaction(ctx context.Context, mgo *mongo.Client, fn func(ctx mongo.SessionContext) error) error { + sess, err := mgo.StartSession() + if err != nil { + return err + } + sCtx := mongo.NewSessionContext(ctx, sess) + defer sess.EndSession(sCtx) + if err := fn(sCtx); err != nil { + _ = sess.AbortTransaction(sCtx) + return err + } + return utils.Wrap(sess.CommitTransaction(sCtx), "") +} diff --git a/pkg/common/db/unrelation/mongo_model.go b/pkg/common/db/unrelation/mongo_model.go index 9cb4daaef..aee4ae29e 100644 --- a/pkg/common/db/unrelation/mongo_model.go +++ b/pkg/common/db/unrelation/mongo_model.go @@ -57,40 +57,6 @@ type GroupMember_x struct { var ErrMsgListNotExist = errors.New("user not have msg in mongoDB") -func (d *db.DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) { - return 1, nil - //var i, NB uint32 - //var seqUid string - //session := d.mgoSession.Clone() - //if session == nil { - // return MinSeq, errors.New("session == nil") - //} - //defer session.Close() - //c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - //MaxSeq, err := d.GetUserMaxSeq(uid) - //if err != nil && err != redis.ErrNil { - // return MinSeq, err - //} - //NB = uint32(MaxSeq / singleGocMsgNum) - //for i = 0; i <= NB; i++ { - // seqUid = indexGen(uid, i) - // n, err := c.Find(bson.M{"uid": seqUid}).Count() - // if err == nil && n != 0 { - // if i == 0 { - // MinSeq = 1 - // } else { - // MinSeq = uint32(i * singleGocMsgNum) - // } - // break - // } - //} - //return MinSeq, nil -} - -func (d *db.DataBases) GetMinSeqFromMongo2(uid string) (MinSeq uint32, err error) { - return 1, nil -} - // deleteMsgByLogic func (d *db.DataBases) DelMsgBySeqList(userID string, seqList []uint32, operationID string) (totalUnexistSeqList []uint32, err error) { log.Debug(operationID, utils.GetSelfFuncName(), "args ", userID, seqList) @@ -657,24 +623,6 @@ func (d *db.DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgData return nil } -func (d *db.DataBases) DelUserChat(uid string) error { - return nil - //session := d.mgoSession.Clone() - //if session == nil { - // return errors.New("session == nil") - //} - //defer session.Close() - // - //c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - // - //delTime := time.Now().Unix() - int64(config.Config.Mongo.DBRetainChatRecords)*24*3600 - //if err := c.Update(bson.M{"uid": uid}, bson.M{"$pull": bson.M{"msg": bson.M{"sendtime": bson.M{"$lte": delTime}}}}); err != nil { - // return err - //} - // - //return nil -} - func (d *db.DataBases) DelUserChatMongo2(uid string) error { ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cChat) @@ -687,19 +635,6 @@ func (d *db.DataBases) DelUserChatMongo2(uid string) error { return nil } -func (d *db.DataBases) MgoUserCount() (int, error) { - return 0, nil - //session := d.mgoSession.Clone() - //if session == nil { - // return 0, errors.New("session == nil") - //} - //defer session.Close() - // - //c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) - // - //return c.Find(nil).Count() -} - func (d *db.DataBases) MgoSkipUID(count int) (string, error) { return "", nil //session := d.mgoSession.Clone() @@ -715,249 +650,6 @@ func (d *db.DataBases) MgoSkipUID(count int) (string, error) { //return sChat.UID, nil } -func (d *db.DataBases) GetGroupMember(groupID string) []string { - return nil - //groupInfo := GroupMember_x{} - //groupInfo.GroupID = groupID - //groupInfo.UIDList = make([]string, 0) - // - //session := d.mgoSession.Clone() - //if session == nil { - // return groupInfo.UIDList - //} - //defer session.Close() - // - //c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) - // - //if err := c.Find(bson.M{"groupid": groupInfo.GroupID}).One(&groupInfo); err != nil { - // return groupInfo.UIDList - //} - // - //return groupInfo.UIDList -} - -func (d *db.DataBases) AddGroupMember(groupID, uid string) error { - return nil - //session := d.mgoSession.Clone() - //if session == nil { - // return errors.New("session == nil") - //} - //defer session.Close() - // - //c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) - // - //n, err := c.Find(bson.M{"groupid": groupID}).Count() - //if err != nil { - // return err - //} - // - //if n == 0 { - // groupInfo := GroupMember_x{} - // groupInfo.GroupID = groupID - // groupInfo.UIDList = append(groupInfo.UIDList, uid) - // err = c.Insert(&groupInfo) - // if err != nil { - // return err - // } - //} else { - // err = c.Update(bson.M{"groupid": groupID}, bson.M{"$addToSet": bson.M{"uidlist": uid}}) - // if err != nil { - // return err - // } - //} - // - //return nil -} - -func (d *db.DataBases) DelGroupMember(groupID, uid string) error { - return nil - //session := d.mgoSession.Clone() - //if session == nil { - // return errors.New("session == nil") - //} - //defer session.Close() - // - //c := session.DB(config.Config.Mongo.DBDatabase).C(cGroup) - // - //if err := c.Update(bson.M{"groupid": groupID}, bson.M{"$pull": bson.M{"uidlist": uid}}); err != nil { - // return err - //} - // - //return nil -} - -//type SuperGroup struct { -// GroupID string `bson:"group_id" json:"groupID"` -// MemberIDList []string `bson:"member_id_list" json:"memberIDList"` -//} -// -//type UserToSuperGroup struct { -// UserID string `bson:"user_id" json:"userID"` -// GroupIDList []string `bson:"group_id_list" json:"groupIDList"` -//} - -func (d *db.DataBases) CreateSuperGroup(groupID string, initMemberIDList []string, memberNumCount int) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) - session, err := d.mongoClient.StartSession() - if err != nil { - return utils.Wrap(err, "start session failed") - } - defer session.EndSession(ctx) - sCtx := mongo.NewSessionContext(ctx, session) - superGroup := SuperGroup{ - GroupID: groupID, - MemberIDList: initMemberIDList, - } - _, err = c.InsertOne(sCtx, superGroup) - if err != nil { - _ = session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - var users []UserToSuperGroup - for _, v := range initMemberIDList { - users = append(users, UserToSuperGroup{ - UserID: v, - }) - } - upsert := true - opts := &options.UpdateOptions{ - Upsert: &upsert, - } - c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) - //_, err = c.UpdateMany(sCtx, bson.M{"user_id": bson.M{"$in": initMemberIDList}}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) - //if err != nil { - // session.AbortTransaction(ctx) - // return utils.Wrap(err, "transaction failed") - //} - for _, userID := range initMemberIDList { - _, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) - if err != nil { - _ = session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - - } - return err -} - -func (d *db.DataBases) GetSuperGroup(groupID string) (SuperGroup, error) { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) - superGroup := SuperGroup{} - err := c.FindOne(ctx, bson.M{"group_id": groupID}).Decode(&superGroup) - return superGroup, err -} - -func (d *db.DataBases) AddUserToSuperGroup(groupID string, userIDList []string) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) - session, err := d.mongoClient.StartSession() - if err != nil { - return utils.Wrap(err, "start session failed") - } - defer session.EndSession(ctx) - sCtx := mongo.NewSessionContext(ctx, session) - if err != nil { - return utils.Wrap(err, "start transaction failed") - } - _, err = c.UpdateOne(sCtx, bson.M{"group_id": groupID}, bson.M{"$addToSet": bson.M{"member_id_list": bson.M{"$each": userIDList}}}) - if err != nil { - _ = session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - c = d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) - var users []UserToSuperGroup - for _, v := range userIDList { - users = append(users, UserToSuperGroup{ - UserID: v, - }) - } - upsert := true - opts := &options.UpdateOptions{ - Upsert: &upsert, - } - for _, userID := range userIDList { - _, err = c.UpdateOne(sCtx, bson.M{"user_id": userID}, bson.M{"$addToSet": bson.M{"group_id_list": groupID}}, opts) - if err != nil { - _ = session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - } - _ = session.CommitTransaction(ctx) - return err -} - -func (d *db.DataBases) RemoverUserFromSuperGroup(groupID string, userIDList []string) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) - session, err := d.mongoClient.StartSession() - if err != nil { - return utils.Wrap(err, "start session failed") - } - defer session.EndSession(ctx) - sCtx := mongo.NewSessionContext(ctx, session) - _, err = c.UpdateOne(ctx, bson.M{"group_id": groupID}, bson.M{"$pull": bson.M{"member_id_list": bson.M{"$in": userIDList}}}) - if err != nil { - _ = session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - err = d.RemoveGroupFromUser(ctx, sCtx, groupID, userIDList) - if err != nil { - _ = session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - _ = session.CommitTransaction(ctx) - return err -} - -func (d *db.DataBases) GetSuperGroupByUserID(userID string) (UserToSuperGroup, error) { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) - var user UserToSuperGroup - _ = c.FindOne(ctx, bson.M{"user_id": userID}).Decode(&user) - return user, nil -} - -func (d *db.DataBases) DeleteSuperGroup(groupID string) error { - ctx, _ := context.WithTimeout(context.Background(), time.Duration(config.Config.Mongo.DBTimeout)*time.Second) - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cSuperGroup) - session, err := d.mongoClient.StartSession() - if err != nil { - return utils.Wrap(err, "start session failed") - } - defer session.EndSession(ctx) - sCtx := mongo.NewSessionContext(ctx, session) - superGroup := &SuperGroup{} - result := c.FindOneAndDelete(sCtx, bson.M{"group_id": groupID}) - err = result.Decode(superGroup) - if err != nil { - session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - if err = d.RemoveGroupFromUser(ctx, sCtx, groupID, superGroup.MemberIDList); err != nil { - session.AbortTransaction(ctx) - return utils.Wrap(err, "transaction failed") - } - session.CommitTransaction(ctx) - return nil -} - -func (d *db.DataBases) RemoveGroupFromUser(ctx, sCtx context.Context, groupID string, userIDList []string) error { - var users []UserToSuperGroup - for _, v := range userIDList { - users = append(users, UserToSuperGroup{ - UserID: v, - }) - } - c := d.mongoClient.Database(config.Config.Mongo.DBDatabase).Collection(cUserToSuperGroup) - _, err := c.UpdateOne(sCtx, bson.M{"user_id": bson.M{"$in": userIDList}}, bson.M{"$pull": bson.M{"group_id_list": groupID}}) - if err != nil { - return utils.Wrap(err, "UpdateOne transaction failed") - } - return err -} - func generateTagID(tagName, userID string) string { return utils.Md5(tagName + userID + strconv.Itoa(rand.Int()) + time.Now().String()) } diff --git a/pkg/discovery_registry/discovery_register.go b/pkg/discovery_registry/discovery_register.go index be13103fc..56b2eaf34 100644 --- a/pkg/discovery_registry/discovery_register.go +++ b/pkg/discovery_registry/discovery_register.go @@ -1,54 +1,44 @@ package discoveryRegistry import ( - "Open_IM/pkg/common/config" - "Open_IM/pkg/utils" - "context" - "fmt" - "github.com/OpenIMSDK/getcdv3" - clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" - "time" - - "gopkg.in/yaml.v3" - "strings" ) type SvcDiscoveryRegistry interface { Register(serviceName, host string, port int, opts ...grpc.DialOption) error UnRegister() error GetConns(serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) - GetConn(serviceName string, strategy func(slice []*grpc.ClientConn) int, opts ...grpc.DialOption) (*grpc.ClientConn, error) + GetConn(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) //RegisterConf(conf []byte) error //LoadConf() ([]byte, error) } -func registerConf(key, conf string) { - etcdAddr := strings.Join(config.Config.Etcd.EtcdAddr, ",") - cli, err := clientv3.New(clientv3.Config{ - Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second}) - - if err != nil { - panic(err.Error()) - } - //lease - if _, err := cli.Put(context.Background(), key, conf); err != nil { - fmt.Println("panic, params: ") - panic(err.Error()) - } -} - -func RegisterConf() { - bytes, err := yaml.Marshal(config.Config) - if err != nil { - panic(err.Error()) - } - secretMD5 := utils.Md5(config.Config.Etcd.Secret) - confBytes, err := utils.AesEncrypt(bytes, []byte(secretMD5[0:16])) - if err != nil { - panic(err.Error()) - } - fmt.Println("start register", secretMD5, getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName)) - registerConf(getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName), string(confBytes)) - fmt.Println("etcd register conf ok") -} +//func registerConf(key, conf string) { +// etcdAddr := strings.Join(config.Config.Etcd.EtcdAddr, ",") +// cli, err := clientv3.New(clientv3.Config{ +// Endpoints: strings.Split(etcdAddr, ","), DialTimeout: 5 * time.Second}) +// +// if err != nil { +// panic(err.Error()) +// } +// //lease +// if _, err := cli.Put(context.Background(), key, conf); err != nil { +// fmt.Println("panic, params: ") +// panic(err.Error()) +// } +//} +// +//func RegisterConf() { +// bytes, err := yaml.Marshal(config.Config) +// if err != nil { +// panic(err.Error()) +// } +// secretMD5 := utils.Md5(config.Config.Etcd.Secret) +// confBytes, err := utils.AesEncrypt(bytes, []byte(secretMD5[0:16])) +// if err != nil { +// panic(err.Error()) +// } +// fmt.Println("start register", secretMD5, getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName)) +// registerConf(getcdv3.GetPrefix(config.Config.Etcd.EtcdSchema, config.ConfName), string(confBytes)) +// fmt.Println("etcd register conf ok") +//} diff --git a/pkg/utils/get_server_ip.go b/pkg/utils/get_server_ip.go index 6ae5d877f..b8056af9d 100644 --- a/pkg/utils/get_server_ip.go +++ b/pkg/utils/get_server_ip.go @@ -1,7 +1,6 @@ package utils import ( - "Open_IM/pkg/common/config" "errors" "net" ) @@ -24,15 +23,3 @@ func GetLocalIP() (string, error) { return "", errors.New("no ip") } - -func GetRpcIP() (string, error) { - registerIP := config.Config.RpcRegisterIP - if registerIP == "" { - ip, err := GetLocalIP() - if err != nil { - return "", err - } - registerIP = ip - } - return registerIP, nil -}