diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 0e06c9624..4e9d66bcc 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -82,7 +82,7 @@ type OnlineHistoryRedisConsumerHandler struct { } func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic}) if err != nil { return nil, err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 39a6a4906..699211545 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic}) + historyConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic}) if err != nil { return nil, err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 647082966..abdb7ba89 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -35,7 +35,7 @@ type ConsumerHandler struct { } func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) { - pushConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic}) + pushConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic}) if err != nil { return nil, err } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index e01ff6fad..ff0c27576 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -17,6 +17,12 @@ package group import ( "context" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/encrypt" "math/big" "math/rand" "strconv" @@ -27,11 +33,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient/grouphash" @@ -40,13 +44,10 @@ import ( pbconversation "github.com/openimsdk/protocol/conversation" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discoveryregistry" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw/specialerror" - "github.com/openimsdk/tools/tx" - "github.com/openimsdk/tools/utils" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -60,36 +61,32 @@ type groupServer struct { config *config.GlobalConfig } -func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) +func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) if err != nil { return err } - rdb, err := cache.NewRedis(ctx, &config.Redis) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } - - groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase(config.Mongo.Database)) + groupDB, err := mgo.NewGroupMongo(mgocli.GetDB()) if err != nil { return err } - - groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase(config.Mongo.Database)) + groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB()) if err != nil { return err } - - groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase(config.Mongo.Database)) + groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB()) if err != nil { return err } - userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName) conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName) var gs groupServer - database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), grouphash.NewGroupHashFromGroupServer(&gs)) + database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) gs.db = database gs.User = userRpcClient gs.Notification = notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { @@ -97,7 +94,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discoveryreg if err != nil { return nil, err } - return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil + return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil }) gs.conversationRpcClient = conversationRpcClient gs.msgRpcClient = msgRpcClient @@ -156,7 +153,7 @@ func (s *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string if err != nil { return nil, err } - return utils.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, *sdkws.PublicUserInfo) { + return datautil.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, *sdkws.PublicUserInfo) { return e.UserID, e }), nil } @@ -169,7 +166,7 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error { if *groupID != "" { _, err := s.db.TakeGroup(ctx, *groupID) if err == nil { - return errs.ErrGroupIDExisted.WrapMsg("group id existed " + *groupID) + return servererrs.ErrGroupIDExisted.WrapMsg("group id existed " + *groupID) } else if s.IsNotFound(err) { return nil } else { @@ -177,7 +174,7 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error { } } for i := 0; i < 10; i++ { - id := utils.Md5(strings.Join([]string{mcontext.GetOperationID(ctx), strconv.FormatInt(time.Now().UnixNano(), 10), strconv.Itoa(rand.Int())}, ",;,")) + id := encrypt.Md5(strings.Join([]string{mcontext.GetOperationID(ctx), strconv.FormatInt(time.Now().UnixNano(), 10), strconv.Itoa(rand.Int())}, ",;,")) bi := big.NewInt(0) bi.SetString(id[0:8], 16) id = bi.String() @@ -191,7 +188,7 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error { return err } } - return errs.ErrData.WrapMsg("group id gen error") + return servererrs.ErrData.WrapMsg("group id gen error") } func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupReq) (*pbgroup.CreateGroupResp, error) { @@ -206,11 +203,11 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR } userIDs := append(append(req.MemberUserIDs, req.AdminUserIDs...), req.OwnerUserID) opUserID := mcontext.GetOpUserID(ctx) - if !utils.Contain(opUserID, userIDs...) { + if !datautil.Contain(opUserID, userIDs...) { userIDs = append(userIDs, opUserID) } - if utils.Duplicate(userIDs) { + if datautil.Duplicate(userIDs) { return nil, errs.ErrArgs.WrapMsg("group member repeated") } @@ -220,7 +217,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR } if len(userMap) != len(userIDs) { - return nil, errs.ErrUserIDNotFound.WrapMsg("user not found") + return nil, servererrs.ErrUserIDNotFound.WrapMsg("user not found") } config := &GroupEventCallbackConfig{ @@ -348,7 +345,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbgroup.GetJo if len(members) == 0 { return resp, nil } - groupIDs := utils.Slice(members, func(e *relationtb.GroupMemberModel) string { + groupIDs := datautil.Slice(members, func(e *relationtb.GroupMemberModel) string { return e.GroupID }) groups, err := s.db.FindGroup(ctx, groupIDs) @@ -366,10 +363,10 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbgroup.GetJo if err := s.PopulateGroupMember(ctx, members...); err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { + ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { return e.GroupID }) - resp.Groups = utils.Slice(utils.Order(groupIDs, groups, func(group *relationtb.GroupModel) string { + resp.Groups = datautil.Slice(datautil.Order(groupIDs, groups, func(group *relationtb.GroupModel) string { return group.GroupID }), func(group *relationtb.GroupModel) *sdkws.GroupInfo { var userID string @@ -387,7 +384,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite if len(req.InvitedUserIDs) == 0 { return nil, errs.ErrArgs.WrapMsg("user empty") } - if utils.Duplicate(req.InvitedUserIDs) { + if datautil.Duplicate(req.InvitedUserIDs) { return nil, errs.ErrArgs.WrapMsg("userID duplicate") } group, err := s.db.TakeGroup(ctx, req.GroupID) @@ -396,7 +393,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite } if group.Status == constant.GroupStatusDismissed { - return nil, errs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed") + return nil, servererrs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed") } userMap, err := s.User.GetUsersInfoMap(ctx, req.InvitedUserIDs) @@ -499,7 +496,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGro return nil, err } resp := &pbgroup.GetGroupAllMemberResp{} - resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { + resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { return convert.Db2PbGroupMember(e) }) return resp, nil @@ -538,13 +535,13 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbgroup.GetGr } } - GMembers := utils.Paginate(groupMembers, int(req.Pagination.GetPageNumber()), int(req.Pagination.GetShowNumber())) - resp.Members = utils.Batch(convert.Db2PbGroupMember, GMembers) + GMembers := datautil.Paginate(groupMembers, int(req.Pagination.GetPageNumber()), int(req.Pagination.GetShowNumber())) + resp.Members = datautil.Batch(convert.Db2PbGroupMember, GMembers) resp.Total = uint32(total) return resp, nil } resp.Total = uint32(total) - resp.Members = utils.Batch(convert.Db2PbGroupMember, members) + resp.Members = datautil.Batch(convert.Db2PbGroupMember, members) return resp, nil } @@ -557,11 +554,11 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou if len(req.KickedUserIDs) == 0 { return nil, errs.ErrArgs.WrapMsg("KickedUserIDs empty") } - if utils.IsDuplicateStringSlice(req.KickedUserIDs) { + if datautil.Duplicate(req.KickedUserIDs) { return nil, errs.ErrArgs.WrapMsg("KickedUserIDs duplicate") } opUserID := mcontext.GetOpUserID(ctx) - if utils.IsContain(opUserID, req.KickedUserIDs) { + if datautil.Contain(opUserID, req.KickedUserIDs...) { return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs") } members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID)) @@ -580,7 +577,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou for _, userID := range req.KickedUserIDs { member, ok := memberMap[userID] if !ok { - return nil, errs.ErrUserIDNotFound.WrapMsg(userID) + return nil, servererrs.ErrUserIDNotFound.WrapMsg(userID) } if !isAppManagerUid { if opMember == nil { @@ -673,7 +670,7 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbgroup.GetG if err := s.PopulateGroupMember(ctx, members...); err != nil { return nil, err } - resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { + resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { return convert.Db2PbGroupMember(e) }) return resp, nil @@ -702,20 +699,20 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup. for _, gr := range groupRequests { userIDs = append(userIDs, gr.UserID) } - userIDs = utils.Distinct(userIDs) + userIDs = datautil.Distinct(userIDs) userMap, err := s.User.GetPublicUserInfoMap(ctx, userIDs, true) if err != nil { return nil, err } - groups, err := s.db.FindGroup(ctx, utils.Distinct(groupIDs)) + groups, err := s.db.FindGroup(ctx, datautil.Distinct(groupIDs)) if err != nil { return nil, err } - groupMap := utils.SliceToMap(groups, func(e *relationtb.GroupModel) string { + groupMap := datautil.SliceToMap(groups, func(e *relationtb.GroupModel) string { return e.GroupID }) - if ids := utils.Single(utils.Keys(groupMap), groupIDs); len(ids) > 0 { - return nil, errs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) + if ids := datautil.Single(datautil.Keys(groupMap), groupIDs); len(ids) > 0 { + return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) } groupMemberNumMap, err := s.db.MapGroupMemberNum(ctx, groupIDs) if err != nil { @@ -728,10 +725,10 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup. if err := s.PopulateGroupMember(ctx, owners...); err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { + ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { return e.GroupID }) - resp.GroupRequests = utils.Slice(groupRequests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest { + resp.GroupRequests = datautil.Slice(groupRequests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest { var ownerUserID string if owner, ok := ownerMap[e.GroupID]; ok { ownerUserID = owner.UserID @@ -761,10 +758,10 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbgroup.GetGroupsI if err := s.PopulateGroupMember(ctx, owners...); err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { + ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { return e.GroupID }) - resp.GroupInfos = utils.Slice(groups, func(e *relationtb.GroupModel) *sdkws.GroupInfo { + resp.GroupInfos = datautil.Slice(groups, func(e *relationtb.GroupModel) *sdkws.GroupInfo { var ownerUserID string if owner, ok := ownerMap[e.GroupID]; ok { ownerUserID = owner.UserID @@ -775,7 +772,7 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbgroup.GetGroupsI } func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) (*pbgroup.GroupApplicationResponseResp, error) { - if !utils.Contain(req.HandleResult, constant.GroupResponseAgree, constant.GroupResponseRefuse) { + if !datautil.Contain(req.HandleResult, constant.GroupResponseAgree, constant.GroupResponseRefuse) { return nil, errs.ErrArgs.WrapMsg("HandleResult unknown") } if !authverify.IsAppManagerUid(ctx, &s.config.Manager, &s.config.IMAdmin) { @@ -796,7 +793,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup return nil, err } if groupRequest.HandleResult != 0 { - return nil, errs.ErrGroupRequestHandled.WrapMsg("group request already processed") + return nil, servererrs.ErrGroupRequestHandled.WrapMsg("group request already processed") } var inGroup bool if _, err := s.db.TakeGroupMember(ctx, req.GroupID, req.FromUserID); err == nil { @@ -864,7 +861,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) return nil, err } if group.Status == constant.GroupStatusDismissed { - return nil, errs.ErrDismissedAlready.Wrap() + return nil, servererrs.ErrDismissedAlready.Wrap() } reqCall := &callbackstruct.CallbackJoinGroupReq{ @@ -886,7 +883,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq) _, err = s.db.TakeGroupMember(ctx, req.GroupID, req.InviterUserID) if err == nil { return nil, errs.ErrArgs.Wrap() - } else if !s.IsNotFound(err) && utils.Unwrap(err) != errs.ErrRecordNotFound { + } else if !s.IsNotFound(err) && errs.Unwrap(err) != errs.ErrRecordNotFound { return nil, err } log.ZInfo(ctx, "JoinGroup.groupInfo", "group", group, "eq", group.NeedVerification == constant.Directly) @@ -1012,7 +1009,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf return nil, err } if group.Status == constant.GroupStatusDismissed { - return nil, errs.ErrDismissedAlready.Wrap() + return nil, servererrs.ErrDismissedAlready.Wrap() } resp := &pbgroup.SetGroupInfoResp{} @@ -1089,7 +1086,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans return nil, err } if group.Status == constant.GroupStatusDismissed { - return nil, errs.ErrDismissedAlready.Wrap() + return nil, servererrs.ErrDismissedAlready.Wrap() } if req.OldOwnerUserID == req.NewOwnerUserID { return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID == NewOwnerUserID") @@ -1101,8 +1098,8 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans if err := s.PopulateGroupMember(ctx, members...); err != nil { return nil, err } - memberMap := utils.SliceToMap(members, func(e *relationtb.GroupMemberModel) string { return e.UserID }) - if ids := utils.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, utils.Keys(memberMap)); len(ids) > 0 { + memberMap := datautil.SliceToMap(members, func(e *relationtb.GroupMemberModel) string { return e.UserID }) + if ids := datautil.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, datautil.Keys(memberMap)); len(ids) > 0 { return nil, errs.ErrArgs.WrapMsg("user not in group " + strings.Join(ids, ",")) } oldOwner := memberMap[req.OldOwnerUserID] @@ -1154,7 +1151,7 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbgroup.GetGroupsReq) return nil, err } - groupIDs := utils.Slice(group, func(e *relationtb.GroupModel) string { + groupIDs := datautil.Slice(group, func(e *relationtb.GroupModel) string { return e.GroupID }) @@ -1163,14 +1160,14 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbgroup.GetGroupsReq) return nil, err } - ownerMemberMap := utils.SliceToMap(ownerMembers, func(e *relationtb.GroupMemberModel) string { + ownerMemberMap := datautil.SliceToMap(ownerMembers, func(e *relationtb.GroupMemberModel) string { return e.GroupID }) groupMemberNumMap, err := s.db.MapGroupMemberNum(ctx, groupIDs) if err != nil { return nil, err } - resp.Groups = utils.Slice(group, func(group *relationtb.GroupModel) *pbgroup.CMSGroup { + resp.Groups = datautil.Slice(group, func(group *relationtb.GroupModel) *pbgroup.CMSGroup { var ( userID string username string @@ -1194,7 +1191,7 @@ func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbgroup.GetGr if err := s.PopulateGroupMember(ctx, members...); err != nil { return nil, err } - resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { + resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { return convert.Db2PbGroupMember(e) }) return resp, nil @@ -1214,14 +1211,14 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgrou if len(requests) == 0 { return resp, nil } - groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationtb.GroupRequestModel) string { + groupIDs := datautil.Distinct(datautil.Slice(requests, func(e *relationtb.GroupRequestModel) string { return e.GroupID })) groups, err := s.db.FindGroup(ctx, groupIDs) if err != nil { return nil, err } - groupMap := utils.SliceToMap(groups, func(e *relationtb.GroupModel) string { + groupMap := datautil.SliceToMap(groups, func(e *relationtb.GroupModel) string { return e.GroupID }) owners, err := s.db.FindGroupsOwner(ctx, groupIDs) @@ -1231,14 +1228,14 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgrou if err := s.PopulateGroupMember(ctx, owners...); err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { + ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { return e.GroupID }) groupMemberNum, err := s.db.MapGroupMemberNum(ctx, groupIDs) if err != nil { return nil, err } - resp.GroupRequests = utils.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest { + resp.GroupRequests = datautil.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest { var ownerUserID string if owner, ok := ownerMap[e.GroupID]; ok { ownerUserID = owner.UserID @@ -1267,7 +1264,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou return nil, err } if !req.DeleteMember && group.Status == constant.GroupStatusDismissed { - return nil, errs.ErrDismissedAlready.WrapMsg("group status is dismissed") + return nil, servererrs.ErrDismissedAlready.WrapMsg("group status is dismissed") } if err := s.db.DismissGroup(ctx, req.GroupID, req.DeleteMember); err != nil { return nil, err @@ -1504,7 +1501,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr return nil, err } } - if err := s.db.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbgroup.SetGroupMemberInfo) *relationtb.BatchUpdateGroupMember { + if err := s.db.UpdateGroupMembers(ctx, datautil.Slice(req.Members, func(e *pbgroup.SetGroupMemberInfo) *relationtb.BatchUpdateGroupMember { return &relationtb.BatchUpdateGroupMember{ GroupID: e.GroupID, UserID: e.UserID, @@ -1541,26 +1538,26 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbgroup.Get if len(req.GroupIDs) == 0 { return nil, errs.ErrArgs.WrapMsg("groupIDs empty") } - if utils.Duplicate(req.GroupIDs) { + if datautil.Duplicate(req.GroupIDs) { return nil, errs.ErrArgs.WrapMsg("groupIDs duplicate") } groups, err := s.db.FindGroup(ctx, req.GroupIDs) if err != nil { return nil, err } - if ids := utils.Single(req.GroupIDs, utils.Slice(groups, func(group *relationtb.GroupModel) string { + if ids := datautil.Single(req.GroupIDs, datautil.Slice(groups, func(group *relationtb.GroupModel) string { return group.GroupID })); len(ids) > 0 { - return nil, errs.ErrGroupIDNotFound.WrapMsg("not found group " + strings.Join(ids, ",")) + return nil, servererrs.ErrGroupIDNotFound.WrapMsg("not found group " + strings.Join(ids, ",")) } groupUserMap, err := s.db.MapGroupMemberUserID(ctx, req.GroupIDs) if err != nil { return nil, err } - if ids := utils.Single(req.GroupIDs, utils.Keys(groupUserMap)); len(ids) > 0 { - return nil, errs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %s not found member", strings.Join(ids, ","))) + if ids := datautil.Single(req.GroupIDs, datautil.Keys(groupUserMap)); len(ids) > 0 { + return nil, servererrs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %s not found member", strings.Join(ids, ","))) } - resp.GroupAbstractInfos = utils.Slice(groups, func(group *relationtb.GroupModel) *pbgroup.GroupAbstractInfo { + resp.GroupAbstractInfos = datautil.Slice(groups, func(group *relationtb.GroupModel) *pbgroup.GroupAbstractInfo { users := groupUserMap[group.GroupID] return convert.Db2PbGroupAbstractInfo(group.GroupID, users.MemberNum, users.Hash) }) @@ -1579,7 +1576,7 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbgroup.Ge if err := s.PopulateGroupMember(ctx, members...); err != nil { return nil, err } - resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { + resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { return convert.Db2PbGroupMember(e) }) return resp, nil @@ -1606,7 +1603,7 @@ func (s *groupServer) GetGroupMemberRoleLevel(ctx context.Context, req *pbgroup. if err := s.PopulateGroupMember(ctx, members...); err != nil { return nil, err } - resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { + resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo { return convert.Db2PbGroupMember(e) }) return resp, nil @@ -1621,18 +1618,18 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req * if len(requests) == 0 { return resp, nil } - groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationtb.GroupRequestModel) string { + groupIDs := datautil.Distinct(datautil.Slice(requests, func(e *relationtb.GroupRequestModel) string { return e.GroupID })) groups, err := s.db.FindGroup(ctx, groupIDs) if err != nil { return nil, err } - groupMap := utils.SliceToMap(groups, func(e *relationtb.GroupModel) string { + groupMap := datautil.SliceToMap(groups, func(e *relationtb.GroupModel) string { return e.GroupID }) - if ids := utils.Single(groupIDs, utils.Keys(groupMap)); len(ids) > 0 { - return nil, errs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) + if ids := datautil.Single(groupIDs, datautil.Keys(groupMap)); len(ids) > 0 { + return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) } owners, err := s.db.FindGroupsOwner(ctx, groupIDs) if err != nil { @@ -1641,14 +1638,14 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req * if err := s.PopulateGroupMember(ctx, owners...); err != nil { return nil, err } - ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { + ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string { return e.GroupID }) groupMemberNum, err := s.db.MapGroupMemberNum(ctx, groupIDs) if err != nil { return nil, err } - resp.GroupRequests = utils.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest { + resp.GroupRequests = datautil.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest { var ownerUserID string if owner, ok := ownerMap[e.GroupID]; ok { ownerUserID = owner.UserID diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go index b6f4d359f..e9df513aa 100644 --- a/internal/rpc/third/log.go +++ b/internal/rpc/third/log.go @@ -18,6 +18,9 @@ import ( "context" "crypto/rand" "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/stringutil" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -25,8 +28,6 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils" - utils2 "github.com/openimsdk/tools/utils" ) func genLogID() string { @@ -70,7 +71,7 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq) } } if log.LogID == "" { - return nil, errs.ErrData.WrapMsg("LogModel id gen error") + return nil, servererrs.ErrData.WrapMsg("LogModel id gen error") } DBlogs = append(DBlogs, &log) } @@ -94,7 +95,7 @@ func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq) for _, log := range logs { logIDs = append(logIDs, log.LogID) } - if ids := utils2.Single(req.LogIDs, logIDs); len(ids) > 0 { + if ids := datautil.Single(req.LogIDs, logIDs); len(ids) > 0 { return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("logIDs not found%#v", ids)) } err = t.thirdDatabase.DeleteLogs(ctx, req.LogIDs, userID) @@ -110,7 +111,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo { return &third.LogInfo{ Filename: log.FileName, UserID: log.UserID, - Platform: utils.StringToInt32(log.Platform), + Platform: stringutil.StringToInt32(log.Platform), Url: log.Url, CreateTime: log.CreateTime.UnixMilli(), LogID: log.LogID, @@ -119,7 +120,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo { Ex: log.Ex, } } - return utils.Slice(logs, db2pbForLogInfo) + return datautil.Slice(logs, db2pbForLogInfo) } func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) (*third.SearchLogsResp, error) { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 2a5e8022b..30f83beab 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -17,6 +17,9 @@ package third import ( "context" "fmt" + "github.com/openimsdk/tools/db/mongoutil" + "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/discovery" "net/url" "time" @@ -28,28 +31,26 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cos" "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio" "github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/oss" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/third" - "github.com/openimsdk/tools/discoveryregistry" "github.com/openimsdk/tools/errs" "google.golang.org/grpc" ) -func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { - rdb, err := cache.NewRedis(ctx, &config.Redis) +func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { + mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build()) if err != nil { return err } - mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo) + rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build()) if err != nil { return err } - logdb, err := mgo.NewLogMongo(mongo.GetDatabase(config.Mongo.Database)) + logdb, err := mgo.NewLogMongo(mgocli.GetDB()) if err != nil { return err } - s3db, err := mgo.NewS3Mongo(mongo.GetDatabase(config.Mongo.Database)) + s3db, err := mgo.NewS3Mongo(mgocli.GetDB()) if err != nil { return err } diff --git a/pkg/common/db/controller/group.go b/pkg/common/db/controller/group.go index 1a11426ad..cddc22a7e 100644 --- a/pkg/common/db/controller/group.go +++ b/pkg/common/db/controller/group.go @@ -17,6 +17,7 @@ package controller import ( "context" "github.com/openimsdk/tools/db/pagination" + "github.com/openimsdk/tools/db/tx" "github.com/openimsdk/tools/utils/datautil" "time" @@ -24,7 +25,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation" "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/tools/db" "github.com/redis/go-redis/v9" ) @@ -110,7 +110,7 @@ func NewGroupDatabase( groupDB relationtb.GroupModelInterface, groupMemberDB relationtb.GroupMemberModelInterface, groupRequestDB relationtb.GroupRequestModelInterface, - ctxTx db.CtxTx, + ctxTx tx.Tx, groupHash cache.GroupHash, ) GroupDatabase { rcOptions := rockscache.NewDefaultOptions() @@ -129,7 +129,7 @@ type groupDatabase struct { groupDB relationtb.GroupModelInterface groupMemberDB relationtb.GroupMemberModelInterface groupRequestDB relationtb.GroupRequestModelInterface - ctxTx db.CtxTx + ctxTx tx.Tx cache cache.GroupCache } diff --git a/pkg/common/kafka/consumer_group.go b/pkg/common/kafka/consumer_group.go index 3741c95d4..ecb6b1f7b 100644 --- a/pkg/common/kafka/consumer_group.go +++ b/pkg/common/kafka/consumer_group.go @@ -18,8 +18,8 @@ import ( "context" "errors" "github.com/IBM/sarama" - "github.com/openimsdk/tools/db/kafka" "github.com/openimsdk/tools/log" + kfk "github.com/openimsdk/tools/mq/kafka" ) type MConsumerGroup struct { @@ -36,12 +36,12 @@ type MConsumerGroupConfig struct { Password string } -func NewMConsumerGroup(conf kafka.Config, groupID string, topics []string) (*MConsumerGroup, error) { - kfk, err := kafka.BuildConsumerGroupConfig(conf, sarama.OffsetNewest) +func NewMConsumerGroup(conf *kfk.Config, groupID string, topics []string) (*MConsumerGroup, error) { + config, err := kfk.BuildConsumerGroupConfig(conf, sarama.OffsetNewest) if err != nil { return nil, err } - group, err := kafka.NewConsumerGroup(kfk, conf.Addr, groupID) + group, err := kfk.NewConsumerGroup(config, conf.Addr, groupID) if err != nil { return nil, err } diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 42bb14b4e..eedc17202 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -19,15 +19,13 @@ import ( "errors" "github.com/IBM/sarama" "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/tools/db/kafka" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" + kfk "github.com/openimsdk/tools/mq/kafka" "google.golang.org/protobuf/proto" ) -const maxRetry = 10 // number of retries - var errEmptyMsg = errors.New("kafka binary msg is empty") // Producer represents a Kafka producer. @@ -45,19 +43,19 @@ type ProducerConfig struct { Password string } -func BuildProducerConfig(conf kafka.Config) (*sarama.Config, error) { - return kafka.BuildProducerConfig(conf) +func BuildProducerConfig(conf kfk.Config) (*sarama.Config, error) { + return kfk.BuildProducerConfig(conf) } -func NewKafkaProducer(kfk *sarama.Config, addr []string, topic string) (*Producer, error) { - producer, err := kafka.NewProducer(kfk, addr) +func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) { + producer, err := kfk.NewProducer(config, addr) if err != nil { return nil, err } return &Producer{ addr: addr, topic: topic, - config: kfk, + config: config, producer: producer, }, nil } diff --git a/pkg/common/kafka/util.go b/pkg/common/kafka/util.go deleted file mode 100644 index 4e2a02714..000000000 --- a/pkg/common/kafka/util.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka - -import ( - "fmt" - "os" - "strings" - - "github.com/IBM/sarama" - "github.com/openimsdk/open-im-server/v3/pkg/common/tls" -) - -type TLSConfig struct { - CACrt string - ClientCrt string - ClientKey string - ClientKeyPwd string - InsecureSkipVerify bool -} - -// SetupTLSConfig set up the TLS config from config file. -func SetupTLSConfig(cfg *sarama.Config, tlsConfig *TLSConfig) error { - if tlsConfig != nil { - cfg.Net.TLS.Enable = true - tlsConfig, err := tls.NewTLSConfig( - tlsConfig.ClientCrt, - tlsConfig.ClientKey, - tlsConfig.CACrt, - []byte(tlsConfig.ClientKeyPwd), - tlsConfig.InsecureSkipVerify, - ) - if err != nil { - return err - } - cfg.Net.TLS.Config = tlsConfig - } - return nil -} - -// getEnvOrConfig returns the value of the environment variable if it exists, -// otherwise, it returns the value from the configuration file. -func getEnvOrConfig(envName string, configValue string) string { - if value, exists := os.LookupEnv(envName); exists { - return value - } - return configValue -} - -// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables. -// If the environment variables are not set, it returns the fallback value. -func getKafkaAddrFromEnv(fallback []string) []string { - envAddr := os.Getenv("KAFKA_ADDRESS") - envPort := os.Getenv("KAFKA_PORT") - - if envAddr != "" && envPort != "" { - addresses := strings.Split(envAddr, ",") - for i, addr := range addresses { - addresses[i] = fmt.Sprintf("%s:%s", addr, envPort) - } - return addresses - } - - return fallback -} diff --git a/pkg/rpcclient/group.go b/pkg/rpcclient/group.go index 1756211db..04f84cf52 100644 --- a/pkg/rpcclient/group.go +++ b/pkg/rpcclient/group.go @@ -16,26 +16,26 @@ package rpcclient import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/system/program" + "github.com/openimsdk/tools/utils/datautil" "strings" - util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discoveryregistry" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils" ) type Group struct { Client group.GroupClient - discov discoveryregistry.SvcDiscoveryRegistry + discov discovery.SvcDiscoveryRegistry } -func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Group { +func NewGroup(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Group { conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { - util.ExitWithError(err) + program.ExitWithError(err) } client := group.NewGroupClient(conn) return &Group{discov: discov, Client: client} @@ -43,7 +43,7 @@ func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName str type GroupRpcClient Group -func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) GroupRpcClient { +func NewGroupRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) GroupRpcClient { return GroupRpcClient(*NewGroup(discov, rpcRegisterName)) } @@ -55,10 +55,10 @@ func (g *GroupRpcClient) GetGroupInfos(ctx context.Context, groupIDs []string, c return nil, err } if complete { - if ids := utils.Single(groupIDs, utils.Slice(resp.GroupInfos, func(e *sdkws.GroupInfo) string { + if ids := datautil.Single(groupIDs, datautil.Slice(resp.GroupInfos, func(e *sdkws.GroupInfo) string { return e.GroupID })); len(ids) > 0 { - return nil, errs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) + return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) } } return resp.GroupInfos, nil @@ -81,7 +81,7 @@ func (g *GroupRpcClient) GetGroupInfoMap( if err != nil { return nil, err } - return utils.SliceToMap(groups, func(e *sdkws.GroupInfo) string { + return datautil.SliceToMap(groups, func(e *sdkws.GroupInfo) string { return e.GroupID }), nil } @@ -100,10 +100,10 @@ func (g *GroupRpcClient) GetGroupMemberInfos( return nil, err } if complete { - if ids := utils.Single(userIDs, utils.Slice(resp.Members, func(e *sdkws.GroupMemberFullInfo) string { + if ids := datautil.Single(userIDs, datautil.Slice(resp.Members, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })); len(ids) > 0 { - return nil, errs.ErrNotInGroupYet.WrapMsg(strings.Join(ids, ",")) + return nil, servererrs.ErrNotInGroupYet.WrapMsg(strings.Join(ids, ",")) } } return resp.Members, nil @@ -131,7 +131,7 @@ func (g *GroupRpcClient) GetGroupMemberInfoMap( if err != nil { return nil, err } - return utils.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string { + return datautil.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID }), nil } diff --git a/pkg/rpcclient/third.go b/pkg/rpcclient/third.go index 7995e452f..f505f048b 100644 --- a/pkg/rpcclient/third.go +++ b/pkg/rpcclient/third.go @@ -16,28 +16,28 @@ package rpcclient import ( "context" + "github.com/openimsdk/tools/system/program" - util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil" "github.com/openimsdk/protocol/third" - "github.com/openimsdk/tools/discoveryregistry" + "github.com/openimsdk/tools/discovery" "google.golang.org/grpc" ) type Third struct { conn grpc.ClientConnInterface Client third.ThirdClient - discov discoveryregistry.SvcDiscoveryRegistry + discov discovery.SvcDiscoveryRegistry GrafanaUrl string } -func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl string) *Third { +func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl string) *Third { conn, err := discov.GetConn(context.Background(), rpcRegisterName) if err != nil { - util.ExitWithError(err) + program.ExitWithError(err) } client := third.NewThirdClient(conn) if err != nil { - util.ExitWithError(err) + program.ExitWithError(err) } return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl} } diff --git a/tools/component/component.go b/tools/component/component.go index 485573837..c0106eb0f 100644 --- a/tools/component/component.go +++ b/tools/component/component.go @@ -15,15 +15,16 @@ package main import ( + "context" "errors" "flag" "fmt" + kfk "github.com/openimsdk/tools/mq/kafka" "os" "strconv" "strings" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/kafka" "gopkg.in/yaml.v2" "github.com/openimsdk/tools/component" @@ -214,70 +215,14 @@ func checkZookeeper(config *config.GlobalConfig) error { // checkKafka checks the Kafka connection func checkKafka(config *config.GlobalConfig) error { - // Prioritize environment variables - kafkaStu := &component.Kafka{ - Username: config.Kafka.Username, - Password: config.Kafka.Password, - Addr: config.Kafka.Addr, - } - - kafkaClient, err := component.CheckKafka(kafkaStu) - if err != nil { - return err - } - defer kafkaClient.Close() - - // Verify if necessary topics exist - topics, err := kafkaClient.Topics() - if err != nil { - return errs.Wrap(err) - } - - requiredTopics := []string{ + topics := []string{ config.Kafka.MsgToMongo.Topic, config.Kafka.MsgToPush.Topic, config.Kafka.LatestMsgToRedis.Topic, } - - for _, requiredTopic := range requiredTopics { - if !isTopicPresent(requiredTopic, topics) { - return errs.WrapMsg(nil, "Kafka missing required topic", "topic", requiredTopic, "availableTopics", strings.Join(topics, ", ")) - } - } - - type Item struct { - Topic string - GroupID string - } - - items := []Item{ - { - Topic: config.Kafka.LatestMsgToRedis.Topic, - GroupID: config.Kafka.ConsumerGroupID.MsgToRedis, - }, - - { - Topic: config.Kafka.MsgToMongo.Topic, - GroupID: config.Kafka.ConsumerGroupID.MsgToMongo, - }, - - { - Topic: config.Kafka.MsgToPush.Topic, - GroupID: config.Kafka.ConsumerGroupID.MsgToPush, - }, - } - - for _, item := range items { - cg, err := kafka.NewMConsumerGroup(config.Kafka.Config, item.GroupID, []string{item.Topic}) - if err != nil { - return err - } - if err := cg.Close(); err != nil { - return err - } - } - - return nil + ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) + defer cancel() + return kfk.CheckKafka(ctx, &config.Kafka.Config, topics) } // isTopicPresent checks if a topic is present in the list of topics