feat: kafka group third

pull/2148/head
withchao 2 years ago
parent e0ad9847c5
commit 8b3e8ff549

@ -82,7 +82,7 @@ type OnlineHistoryRedisConsumerHandler struct {
}
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) {
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic})
historyConsumerGroup, err := kafka.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToRedis, []string{kafkaConf.LatestMsgToRedis.Topic})
if err != nil {
return nil, err
}

@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct {
}
func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic})
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToMongo, []string{kafkaConf.MsgToMongo.Topic})
if err != nil {
return nil, err
}

@ -35,7 +35,7 @@ type ConsumerHandler struct {
}
func NewConsumerHandler(kafkaConf *config.Kafka, pusher *Pusher) (*ConsumerHandler, error) {
pushConsumerGroup, err := kfk.NewMConsumerGroup(kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic})
pushConsumerGroup, err := kfk.NewMConsumerGroup(&kafkaConf.Config, kafkaConf.ConsumerGroupID.MsgToPush, []string{kafkaConf.MsgToPush.Topic})
if err != nil {
return nil, err
}

@ -17,6 +17,12 @@ package group
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/encrypt"
"math/big"
"math/rand"
"strconv"
@ -27,11 +33,9 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/grouphash"
@ -40,13 +44,10 @@ import (
pbconversation "github.com/openimsdk/protocol/conversation"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw/specialerror"
"github.com/openimsdk/tools/tx"
"github.com/openimsdk/tools/utils"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
)
@ -60,36 +61,32 @@ type groupServer struct {
config *config.GlobalConfig
}
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
if err != nil {
return err
}
rdb, err := cache.NewRedis(ctx, &config.Redis)
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil {
return err
}
groupDB, err := mgo.NewGroupMongo(mongo.GetDatabase(config.Mongo.Database))
groupDB, err := mgo.NewGroupMongo(mgocli.GetDB())
if err != nil {
return err
}
groupMemberDB, err := mgo.NewGroupMember(mongo.GetDatabase(config.Mongo.Database))
groupMemberDB, err := mgo.NewGroupMember(mgocli.GetDB())
if err != nil {
return err
}
groupRequestDB, err := mgo.NewGroupRequestMgo(mongo.GetDatabase(config.Mongo.Database))
groupRequestDB, err := mgo.NewGroupRequestMgo(mgocli.GetDB())
if err != nil {
return err
}
userRpcClient := rpcclient.NewUserRpcClient(client, config.RpcRegisterName.OpenImUserName, &config.Manager, &config.IMAdmin)
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.RpcRegisterName.OpenImMsgName)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.RpcRegisterName.OpenImConversationName)
var gs groupServer
database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, tx.NewMongo(mongo.GetClient()), grouphash.NewGroupHashFromGroupServer(&gs))
database := controller.NewGroupDatabase(rdb, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.User = userRpcClient
gs.Notification = notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
@ -97,7 +94,7 @@ func Start(ctx context.Context, config *config.GlobalConfig, client discoveryreg
if err != nil {
return nil, err
}
return utils.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
})
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
@ -156,7 +153,7 @@ func (s *groupServer) GetPublicUserInfoMap(ctx context.Context, userIDs []string
if err != nil {
return nil, err
}
return utils.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, *sdkws.PublicUserInfo) {
return datautil.SliceToMapAny(users, func(e *sdkws.PublicUserInfo) (string, *sdkws.PublicUserInfo) {
return e.UserID, e
}), nil
}
@ -169,7 +166,7 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error {
if *groupID != "" {
_, err := s.db.TakeGroup(ctx, *groupID)
if err == nil {
return errs.ErrGroupIDExisted.WrapMsg("group id existed " + *groupID)
return servererrs.ErrGroupIDExisted.WrapMsg("group id existed " + *groupID)
} else if s.IsNotFound(err) {
return nil
} else {
@ -177,7 +174,7 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error {
}
}
for i := 0; i < 10; i++ {
id := utils.Md5(strings.Join([]string{mcontext.GetOperationID(ctx), strconv.FormatInt(time.Now().UnixNano(), 10), strconv.Itoa(rand.Int())}, ",;,"))
id := encrypt.Md5(strings.Join([]string{mcontext.GetOperationID(ctx), strconv.FormatInt(time.Now().UnixNano(), 10), strconv.Itoa(rand.Int())}, ",;,"))
bi := big.NewInt(0)
bi.SetString(id[0:8], 16)
id = bi.String()
@ -191,7 +188,7 @@ func (s *groupServer) GenGroupID(ctx context.Context, groupID *string) error {
return err
}
}
return errs.ErrData.WrapMsg("group id gen error")
return servererrs.ErrData.WrapMsg("group id gen error")
}
func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupReq) (*pbgroup.CreateGroupResp, error) {
@ -206,11 +203,11 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
}
userIDs := append(append(req.MemberUserIDs, req.AdminUserIDs...), req.OwnerUserID)
opUserID := mcontext.GetOpUserID(ctx)
if !utils.Contain(opUserID, userIDs...) {
if !datautil.Contain(opUserID, userIDs...) {
userIDs = append(userIDs, opUserID)
}
if utils.Duplicate(userIDs) {
if datautil.Duplicate(userIDs) {
return nil, errs.ErrArgs.WrapMsg("group member repeated")
}
@ -220,7 +217,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
}
if len(userMap) != len(userIDs) {
return nil, errs.ErrUserIDNotFound.WrapMsg("user not found")
return nil, servererrs.ErrUserIDNotFound.WrapMsg("user not found")
}
config := &GroupEventCallbackConfig{
@ -348,7 +345,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbgroup.GetJo
if len(members) == 0 {
return resp, nil
}
groupIDs := utils.Slice(members, func(e *relationtb.GroupMemberModel) string {
groupIDs := datautil.Slice(members, func(e *relationtb.GroupMemberModel) string {
return e.GroupID
})
groups, err := s.db.FindGroup(ctx, groupIDs)
@ -366,10 +363,10 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbgroup.GetJo
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
return e.GroupID
})
resp.Groups = utils.Slice(utils.Order(groupIDs, groups, func(group *relationtb.GroupModel) string {
resp.Groups = datautil.Slice(datautil.Order(groupIDs, groups, func(group *relationtb.GroupModel) string {
return group.GroupID
}), func(group *relationtb.GroupModel) *sdkws.GroupInfo {
var userID string
@ -387,7 +384,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
if len(req.InvitedUserIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("user empty")
}
if utils.Duplicate(req.InvitedUserIDs) {
if datautil.Duplicate(req.InvitedUserIDs) {
return nil, errs.ErrArgs.WrapMsg("userID duplicate")
}
group, err := s.db.TakeGroup(ctx, req.GroupID)
@ -396,7 +393,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
}
if group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed")
return nil, servererrs.ErrDismissedAlready.WrapMsg("group dismissed checking group status found it dismissed")
}
userMap, err := s.User.GetUsersInfoMap(ctx, req.InvitedUserIDs)
@ -499,7 +496,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGro
return nil, err
}
resp := &pbgroup.GetGroupAllMemberResp{}
resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
return convert.Db2PbGroupMember(e)
})
return resp, nil
@ -538,13 +535,13 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbgroup.GetGr
}
}
GMembers := utils.Paginate(groupMembers, int(req.Pagination.GetPageNumber()), int(req.Pagination.GetShowNumber()))
resp.Members = utils.Batch(convert.Db2PbGroupMember, GMembers)
GMembers := datautil.Paginate(groupMembers, int(req.Pagination.GetPageNumber()), int(req.Pagination.GetShowNumber()))
resp.Members = datautil.Batch(convert.Db2PbGroupMember, GMembers)
resp.Total = uint32(total)
return resp, nil
}
resp.Total = uint32(total)
resp.Members = utils.Batch(convert.Db2PbGroupMember, members)
resp.Members = datautil.Batch(convert.Db2PbGroupMember, members)
return resp, nil
}
@ -557,11 +554,11 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
if len(req.KickedUserIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("KickedUserIDs empty")
}
if utils.IsDuplicateStringSlice(req.KickedUserIDs) {
if datautil.Duplicate(req.KickedUserIDs) {
return nil, errs.ErrArgs.WrapMsg("KickedUserIDs duplicate")
}
opUserID := mcontext.GetOpUserID(ctx)
if utils.IsContain(opUserID, req.KickedUserIDs) {
if datautil.Contain(opUserID, req.KickedUserIDs...) {
return nil, errs.ErrArgs.WrapMsg("opUserID in KickedUserIDs")
}
members, err := s.db.FindGroupMembers(ctx, req.GroupID, append(req.KickedUserIDs, opUserID))
@ -580,7 +577,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
for _, userID := range req.KickedUserIDs {
member, ok := memberMap[userID]
if !ok {
return nil, errs.ErrUserIDNotFound.WrapMsg(userID)
return nil, servererrs.ErrUserIDNotFound.WrapMsg(userID)
}
if !isAppManagerUid {
if opMember == nil {
@ -673,7 +670,7 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbgroup.GetG
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
return convert.Db2PbGroupMember(e)
})
return resp, nil
@ -702,20 +699,20 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup.
for _, gr := range groupRequests {
userIDs = append(userIDs, gr.UserID)
}
userIDs = utils.Distinct(userIDs)
userIDs = datautil.Distinct(userIDs)
userMap, err := s.User.GetPublicUserInfoMap(ctx, userIDs, true)
if err != nil {
return nil, err
}
groups, err := s.db.FindGroup(ctx, utils.Distinct(groupIDs))
groups, err := s.db.FindGroup(ctx, datautil.Distinct(groupIDs))
if err != nil {
return nil, err
}
groupMap := utils.SliceToMap(groups, func(e *relationtb.GroupModel) string {
groupMap := datautil.SliceToMap(groups, func(e *relationtb.GroupModel) string {
return e.GroupID
})
if ids := utils.Single(utils.Keys(groupMap), groupIDs); len(ids) > 0 {
return nil, errs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
if ids := datautil.Single(datautil.Keys(groupMap), groupIDs); len(ids) > 0 {
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
}
groupMemberNumMap, err := s.db.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
@ -728,10 +725,10 @@ func (s *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup.
if err := s.PopulateGroupMember(ctx, owners...); err != nil {
return nil, err
}
ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
return e.GroupID
})
resp.GroupRequests = utils.Slice(groupRequests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest {
resp.GroupRequests = datautil.Slice(groupRequests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest {
var ownerUserID string
if owner, ok := ownerMap[e.GroupID]; ok {
ownerUserID = owner.UserID
@ -761,10 +758,10 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbgroup.GetGroupsI
if err := s.PopulateGroupMember(ctx, owners...); err != nil {
return nil, err
}
ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
return e.GroupID
})
resp.GroupInfos = utils.Slice(groups, func(e *relationtb.GroupModel) *sdkws.GroupInfo {
resp.GroupInfos = datautil.Slice(groups, func(e *relationtb.GroupModel) *sdkws.GroupInfo {
var ownerUserID string
if owner, ok := ownerMap[e.GroupID]; ok {
ownerUserID = owner.UserID
@ -775,7 +772,7 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbgroup.GetGroupsI
}
func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup.GroupApplicationResponseReq) (*pbgroup.GroupApplicationResponseResp, error) {
if !utils.Contain(req.HandleResult, constant.GroupResponseAgree, constant.GroupResponseRefuse) {
if !datautil.Contain(req.HandleResult, constant.GroupResponseAgree, constant.GroupResponseRefuse) {
return nil, errs.ErrArgs.WrapMsg("HandleResult unknown")
}
if !authverify.IsAppManagerUid(ctx, &s.config.Manager, &s.config.IMAdmin) {
@ -796,7 +793,7 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
return nil, err
}
if groupRequest.HandleResult != 0 {
return nil, errs.ErrGroupRequestHandled.WrapMsg("group request already processed")
return nil, servererrs.ErrGroupRequestHandled.WrapMsg("group request already processed")
}
var inGroup bool
if _, err := s.db.TakeGroupMember(ctx, req.GroupID, req.FromUserID); err == nil {
@ -864,7 +861,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrDismissedAlready.Wrap()
return nil, servererrs.ErrDismissedAlready.Wrap()
}
reqCall := &callbackstruct.CallbackJoinGroupReq{
@ -886,7 +883,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
_, err = s.db.TakeGroupMember(ctx, req.GroupID, req.InviterUserID)
if err == nil {
return nil, errs.ErrArgs.Wrap()
} else if !s.IsNotFound(err) && utils.Unwrap(err) != errs.ErrRecordNotFound {
} else if !s.IsNotFound(err) && errs.Unwrap(err) != errs.ErrRecordNotFound {
return nil, err
}
log.ZInfo(ctx, "JoinGroup.groupInfo", "group", group, "eq", group.NeedVerification == constant.Directly)
@ -1012,7 +1009,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrDismissedAlready.Wrap()
return nil, servererrs.ErrDismissedAlready.Wrap()
}
resp := &pbgroup.SetGroupInfoResp{}
@ -1089,7 +1086,7 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
return nil, err
}
if group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrDismissedAlready.Wrap()
return nil, servererrs.ErrDismissedAlready.Wrap()
}
if req.OldOwnerUserID == req.NewOwnerUserID {
return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID == NewOwnerUserID")
@ -1101,8 +1098,8 @@ func (s *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
memberMap := utils.SliceToMap(members, func(e *relationtb.GroupMemberModel) string { return e.UserID })
if ids := utils.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, utils.Keys(memberMap)); len(ids) > 0 {
memberMap := datautil.SliceToMap(members, func(e *relationtb.GroupMemberModel) string { return e.UserID })
if ids := datautil.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, datautil.Keys(memberMap)); len(ids) > 0 {
return nil, errs.ErrArgs.WrapMsg("user not in group " + strings.Join(ids, ","))
}
oldOwner := memberMap[req.OldOwnerUserID]
@ -1154,7 +1151,7 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbgroup.GetGroupsReq)
return nil, err
}
groupIDs := utils.Slice(group, func(e *relationtb.GroupModel) string {
groupIDs := datautil.Slice(group, func(e *relationtb.GroupModel) string {
return e.GroupID
})
@ -1163,14 +1160,14 @@ func (s *groupServer) GetGroups(ctx context.Context, req *pbgroup.GetGroupsReq)
return nil, err
}
ownerMemberMap := utils.SliceToMap(ownerMembers, func(e *relationtb.GroupMemberModel) string {
ownerMemberMap := datautil.SliceToMap(ownerMembers, func(e *relationtb.GroupMemberModel) string {
return e.GroupID
})
groupMemberNumMap, err := s.db.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
return nil, err
}
resp.Groups = utils.Slice(group, func(group *relationtb.GroupModel) *pbgroup.CMSGroup {
resp.Groups = datautil.Slice(group, func(group *relationtb.GroupModel) *pbgroup.CMSGroup {
var (
userID string
username string
@ -1194,7 +1191,7 @@ func (s *groupServer) GetGroupMembersCMS(ctx context.Context, req *pbgroup.GetGr
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
return convert.Db2PbGroupMember(e)
})
return resp, nil
@ -1214,14 +1211,14 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgrou
if len(requests) == 0 {
return resp, nil
}
groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationtb.GroupRequestModel) string {
groupIDs := datautil.Distinct(datautil.Slice(requests, func(e *relationtb.GroupRequestModel) string {
return e.GroupID
}))
groups, err := s.db.FindGroup(ctx, groupIDs)
if err != nil {
return nil, err
}
groupMap := utils.SliceToMap(groups, func(e *relationtb.GroupModel) string {
groupMap := datautil.SliceToMap(groups, func(e *relationtb.GroupModel) string {
return e.GroupID
})
owners, err := s.db.FindGroupsOwner(ctx, groupIDs)
@ -1231,14 +1228,14 @@ func (s *groupServer) GetUserReqApplicationList(ctx context.Context, req *pbgrou
if err := s.PopulateGroupMember(ctx, owners...); err != nil {
return nil, err
}
ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
return e.GroupID
})
groupMemberNum, err := s.db.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
return nil, err
}
resp.GroupRequests = utils.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest {
resp.GroupRequests = datautil.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest {
var ownerUserID string
if owner, ok := ownerMap[e.GroupID]; ok {
ownerUserID = owner.UserID
@ -1267,7 +1264,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou
return nil, err
}
if !req.DeleteMember && group.Status == constant.GroupStatusDismissed {
return nil, errs.ErrDismissedAlready.WrapMsg("group status is dismissed")
return nil, servererrs.ErrDismissedAlready.WrapMsg("group status is dismissed")
}
if err := s.db.DismissGroup(ctx, req.GroupID, req.DeleteMember); err != nil {
return nil, err
@ -1504,7 +1501,7 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr
return nil, err
}
}
if err := s.db.UpdateGroupMembers(ctx, utils.Slice(req.Members, func(e *pbgroup.SetGroupMemberInfo) *relationtb.BatchUpdateGroupMember {
if err := s.db.UpdateGroupMembers(ctx, datautil.Slice(req.Members, func(e *pbgroup.SetGroupMemberInfo) *relationtb.BatchUpdateGroupMember {
return &relationtb.BatchUpdateGroupMember{
GroupID: e.GroupID,
UserID: e.UserID,
@ -1541,26 +1538,26 @@ func (s *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbgroup.Get
if len(req.GroupIDs) == 0 {
return nil, errs.ErrArgs.WrapMsg("groupIDs empty")
}
if utils.Duplicate(req.GroupIDs) {
if datautil.Duplicate(req.GroupIDs) {
return nil, errs.ErrArgs.WrapMsg("groupIDs duplicate")
}
groups, err := s.db.FindGroup(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
if ids := utils.Single(req.GroupIDs, utils.Slice(groups, func(group *relationtb.GroupModel) string {
if ids := datautil.Single(req.GroupIDs, datautil.Slice(groups, func(group *relationtb.GroupModel) string {
return group.GroupID
})); len(ids) > 0 {
return nil, errs.ErrGroupIDNotFound.WrapMsg("not found group " + strings.Join(ids, ","))
return nil, servererrs.ErrGroupIDNotFound.WrapMsg("not found group " + strings.Join(ids, ","))
}
groupUserMap, err := s.db.MapGroupMemberUserID(ctx, req.GroupIDs)
if err != nil {
return nil, err
}
if ids := utils.Single(req.GroupIDs, utils.Keys(groupUserMap)); len(ids) > 0 {
return nil, errs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %s not found member", strings.Join(ids, ",")))
if ids := datautil.Single(req.GroupIDs, datautil.Keys(groupUserMap)); len(ids) > 0 {
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(fmt.Sprintf("group %s not found member", strings.Join(ids, ",")))
}
resp.GroupAbstractInfos = utils.Slice(groups, func(group *relationtb.GroupModel) *pbgroup.GroupAbstractInfo {
resp.GroupAbstractInfos = datautil.Slice(groups, func(group *relationtb.GroupModel) *pbgroup.GroupAbstractInfo {
users := groupUserMap[group.GroupID]
return convert.Db2PbGroupAbstractInfo(group.GroupID, users.MemberNum, users.Hash)
})
@ -1579,7 +1576,7 @@ func (s *groupServer) GetUserInGroupMembers(ctx context.Context, req *pbgroup.Ge
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
return convert.Db2PbGroupMember(e)
})
return resp, nil
@ -1606,7 +1603,7 @@ func (s *groupServer) GetGroupMemberRoleLevel(ctx context.Context, req *pbgroup.
if err := s.PopulateGroupMember(ctx, members...); err != nil {
return nil, err
}
resp.Members = utils.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
resp.Members = datautil.Slice(members, func(e *relationtb.GroupMemberModel) *sdkws.GroupMemberFullInfo {
return convert.Db2PbGroupMember(e)
})
return resp, nil
@ -1621,18 +1618,18 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
if len(requests) == 0 {
return resp, nil
}
groupIDs := utils.Distinct(utils.Slice(requests, func(e *relationtb.GroupRequestModel) string {
groupIDs := datautil.Distinct(datautil.Slice(requests, func(e *relationtb.GroupRequestModel) string {
return e.GroupID
}))
groups, err := s.db.FindGroup(ctx, groupIDs)
if err != nil {
return nil, err
}
groupMap := utils.SliceToMap(groups, func(e *relationtb.GroupModel) string {
groupMap := datautil.SliceToMap(groups, func(e *relationtb.GroupModel) string {
return e.GroupID
})
if ids := utils.Single(groupIDs, utils.Keys(groupMap)); len(ids) > 0 {
return nil, errs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
if ids := datautil.Single(groupIDs, datautil.Keys(groupMap)); len(ids) > 0 {
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
}
owners, err := s.db.FindGroupsOwner(ctx, groupIDs)
if err != nil {
@ -1641,14 +1638,14 @@ func (s *groupServer) GetGroupUsersReqApplicationList(ctx context.Context, req *
if err := s.PopulateGroupMember(ctx, owners...); err != nil {
return nil, err
}
ownerMap := utils.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
ownerMap := datautil.SliceToMap(owners, func(e *relationtb.GroupMemberModel) string {
return e.GroupID
})
groupMemberNum, err := s.db.MapGroupMemberNum(ctx, groupIDs)
if err != nil {
return nil, err
}
resp.GroupRequests = utils.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest {
resp.GroupRequests = datautil.Slice(requests, func(e *relationtb.GroupRequestModel) *sdkws.GroupRequest {
var ownerUserID string
if owner, ok := ownerMap[e.GroupID]; ok {
ownerUserID = owner.UserID

@ -18,6 +18,9 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@ -25,8 +28,6 @@ import (
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils"
utils2 "github.com/openimsdk/tools/utils"
)
func genLogID() string {
@ -70,7 +71,7 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
}
}
if log.LogID == "" {
return nil, errs.ErrData.WrapMsg("LogModel id gen error")
return nil, servererrs.ErrData.WrapMsg("LogModel id gen error")
}
DBlogs = append(DBlogs, &log)
}
@ -94,7 +95,7 @@ func (t *thirdServer) DeleteLogs(ctx context.Context, req *third.DeleteLogsReq)
for _, log := range logs {
logIDs = append(logIDs, log.LogID)
}
if ids := utils2.Single(req.LogIDs, logIDs); len(ids) > 0 {
if ids := datautil.Single(req.LogIDs, logIDs); len(ids) > 0 {
return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("logIDs not found%#v", ids))
}
err = t.thirdDatabase.DeleteLogs(ctx, req.LogIDs, userID)
@ -110,7 +111,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo {
return &third.LogInfo{
Filename: log.FileName,
UserID: log.UserID,
Platform: utils.StringToInt32(log.Platform),
Platform: stringutil.StringToInt32(log.Platform),
Url: log.Url,
CreateTime: log.CreateTime.UnixMilli(),
LogID: log.LogID,
@ -119,7 +120,7 @@ func dbToPbLogInfos(logs []*relationtb.LogModel) []*third.LogInfo {
Ex: log.Ex,
}
}
return utils.Slice(logs, db2pbForLogInfo)
return datautil.Slice(logs, db2pbForLogInfo)
}
func (t *thirdServer) SearchLogs(ctx context.Context, req *third.SearchLogsReq) (*third.SearchLogsResp, error) {

@ -17,6 +17,9 @@ package third
import (
"context"
"fmt"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"net/url"
"time"
@ -28,28 +31,26 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/cos"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/minio"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/s3/oss"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/errs"
"google.golang.org/grpc"
)
func Start(ctx context.Context, config *config.GlobalConfig, client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis(ctx, &config.Redis)
func Start(ctx context.Context, config *config.GlobalConfig, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
mgocli, err := mongoutil.NewMongoDB(ctx, config.Mongo.Build())
if err != nil {
return err
}
mongo, err := unrelation.NewMongoDB(ctx, &config.Mongo)
rdb, err := redisutil.NewRedisClient(ctx, config.Redis.Build())
if err != nil {
return err
}
logdb, err := mgo.NewLogMongo(mongo.GetDatabase(config.Mongo.Database))
logdb, err := mgo.NewLogMongo(mgocli.GetDB())
if err != nil {
return err
}
s3db, err := mgo.NewS3Mongo(mongo.GetDatabase(config.Mongo.Database))
s3db, err := mgo.NewS3Mongo(mgocli.GetDB())
if err != nil {
return err
}

@ -17,6 +17,7 @@ package controller
import (
"context"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/db/tx"
"github.com/openimsdk/tools/utils/datautil"
"time"
@ -24,7 +25,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db"
"github.com/redis/go-redis/v9"
)
@ -110,7 +110,7 @@ func NewGroupDatabase(
groupDB relationtb.GroupModelInterface,
groupMemberDB relationtb.GroupMemberModelInterface,
groupRequestDB relationtb.GroupRequestModelInterface,
ctxTx db.CtxTx,
ctxTx tx.Tx,
groupHash cache.GroupHash,
) GroupDatabase {
rcOptions := rockscache.NewDefaultOptions()
@ -129,7 +129,7 @@ type groupDatabase struct {
groupDB relationtb.GroupModelInterface
groupMemberDB relationtb.GroupMemberModelInterface
groupRequestDB relationtb.GroupRequestModelInterface
ctxTx db.CtxTx
ctxTx tx.Tx
cache cache.GroupCache
}

@ -18,8 +18,8 @@ import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/db/kafka"
"github.com/openimsdk/tools/log"
kfk "github.com/openimsdk/tools/mq/kafka"
)
type MConsumerGroup struct {
@ -36,12 +36,12 @@ type MConsumerGroupConfig struct {
Password string
}
func NewMConsumerGroup(conf kafka.Config, groupID string, topics []string) (*MConsumerGroup, error) {
kfk, err := kafka.BuildConsumerGroupConfig(conf, sarama.OffsetNewest)
func NewMConsumerGroup(conf *kfk.Config, groupID string, topics []string) (*MConsumerGroup, error) {
config, err := kfk.BuildConsumerGroupConfig(conf, sarama.OffsetNewest)
if err != nil {
return nil, err
}
group, err := kafka.NewConsumerGroup(kfk, conf.Addr, groupID)
group, err := kfk.NewConsumerGroup(config, conf.Addr, groupID)
if err != nil {
return nil, err
}

@ -19,15 +19,13 @@ import (
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/kafka"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
kfk "github.com/openimsdk/tools/mq/kafka"
"google.golang.org/protobuf/proto"
)
const maxRetry = 10 // number of retries
var errEmptyMsg = errors.New("kafka binary msg is empty")
// Producer represents a Kafka producer.
@ -45,19 +43,19 @@ type ProducerConfig struct {
Password string
}
func BuildProducerConfig(conf kafka.Config) (*sarama.Config, error) {
return kafka.BuildProducerConfig(conf)
func BuildProducerConfig(conf kfk.Config) (*sarama.Config, error) {
return kfk.BuildProducerConfig(conf)
}
func NewKafkaProducer(kfk *sarama.Config, addr []string, topic string) (*Producer, error) {
producer, err := kafka.NewProducer(kfk, addr)
func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) {
producer, err := kfk.NewProducer(config, addr)
if err != nil {
return nil, err
}
return &Producer{
addr: addr,
topic: topic,
config: kfk,
config: config,
producer: producer,
}, nil
}

@ -1,77 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"fmt"
"os"
"strings"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/tls"
)
type TLSConfig struct {
CACrt string
ClientCrt string
ClientKey string
ClientKeyPwd string
InsecureSkipVerify bool
}
// SetupTLSConfig set up the TLS config from config file.
func SetupTLSConfig(cfg *sarama.Config, tlsConfig *TLSConfig) error {
if tlsConfig != nil {
cfg.Net.TLS.Enable = true
tlsConfig, err := tls.NewTLSConfig(
tlsConfig.ClientCrt,
tlsConfig.ClientKey,
tlsConfig.CACrt,
[]byte(tlsConfig.ClientKeyPwd),
tlsConfig.InsecureSkipVerify,
)
if err != nil {
return err
}
cfg.Net.TLS.Config = tlsConfig
}
return nil
}
// getEnvOrConfig returns the value of the environment variable if it exists,
// otherwise, it returns the value from the configuration file.
func getEnvOrConfig(envName string, configValue string) string {
if value, exists := os.LookupEnv(envName); exists {
return value
}
return configValue
}
// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getKafkaAddrFromEnv(fallback []string) []string {
envAddr := os.Getenv("KAFKA_ADDRESS")
envPort := os.Getenv("KAFKA_PORT")
if envAddr != "" && envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = fmt.Sprintf("%s:%s", addr, envPort)
}
return addresses
}
return fallback
}

@ -16,26 +16,26 @@ package rpcclient
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/datautil"
"strings"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils"
)
type Group struct {
Client group.GroupClient
discov discoveryregistry.SvcDiscoveryRegistry
discov discovery.SvcDiscoveryRegistry
}
func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *Group {
func NewGroup(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Group {
conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil {
util.ExitWithError(err)
program.ExitWithError(err)
}
client := group.NewGroupClient(conn)
return &Group{discov: discov, Client: client}
@ -43,7 +43,7 @@ func NewGroup(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName str
type GroupRpcClient Group
func NewGroupRpcClient(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) GroupRpcClient {
func NewGroupRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) GroupRpcClient {
return GroupRpcClient(*NewGroup(discov, rpcRegisterName))
}
@ -55,10 +55,10 @@ func (g *GroupRpcClient) GetGroupInfos(ctx context.Context, groupIDs []string, c
return nil, err
}
if complete {
if ids := utils.Single(groupIDs, utils.Slice(resp.GroupInfos, func(e *sdkws.GroupInfo) string {
if ids := datautil.Single(groupIDs, datautil.Slice(resp.GroupInfos, func(e *sdkws.GroupInfo) string {
return e.GroupID
})); len(ids) > 0 {
return nil, errs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ","))
}
}
return resp.GroupInfos, nil
@ -81,7 +81,7 @@ func (g *GroupRpcClient) GetGroupInfoMap(
if err != nil {
return nil, err
}
return utils.SliceToMap(groups, func(e *sdkws.GroupInfo) string {
return datautil.SliceToMap(groups, func(e *sdkws.GroupInfo) string {
return e.GroupID
}), nil
}
@ -100,10 +100,10 @@ func (g *GroupRpcClient) GetGroupMemberInfos(
return nil, err
}
if complete {
if ids := utils.Single(userIDs, utils.Slice(resp.Members, func(e *sdkws.GroupMemberFullInfo) string {
if ids := datautil.Single(userIDs, datautil.Slice(resp.Members, func(e *sdkws.GroupMemberFullInfo) string {
return e.UserID
})); len(ids) > 0 {
return nil, errs.ErrNotInGroupYet.WrapMsg(strings.Join(ids, ","))
return nil, servererrs.ErrNotInGroupYet.WrapMsg(strings.Join(ids, ","))
}
}
return resp.Members, nil
@ -131,7 +131,7 @@ func (g *GroupRpcClient) GetGroupMemberInfoMap(
if err != nil {
return nil, err
}
return utils.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string {
return datautil.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string {
return e.UserID
}), nil
}

@ -16,28 +16,28 @@ package rpcclient
import (
"context"
"github.com/openimsdk/tools/system/program"
util "github.com/openimsdk/open-im-server/v3/pkg/util/genutil"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/discoveryregistry"
"github.com/openimsdk/tools/discovery"
"google.golang.org/grpc"
)
type Third struct {
conn grpc.ClientConnInterface
Client third.ThirdClient
discov discoveryregistry.SvcDiscoveryRegistry
discov discovery.SvcDiscoveryRegistry
GrafanaUrl string
}
func NewThird(discov discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl string) *Third {
func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl string) *Third {
conn, err := discov.GetConn(context.Background(), rpcRegisterName)
if err != nil {
util.ExitWithError(err)
program.ExitWithError(err)
}
client := third.NewThirdClient(conn)
if err != nil {
util.ExitWithError(err)
program.ExitWithError(err)
}
return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl}
}

@ -15,15 +15,16 @@
package main
import (
"context"
"errors"
"flag"
"fmt"
kfk "github.com/openimsdk/tools/mq/kafka"
"os"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/kafka"
"gopkg.in/yaml.v2"
"github.com/openimsdk/tools/component"
@ -214,70 +215,14 @@ func checkZookeeper(config *config.GlobalConfig) error {
// checkKafka checks the Kafka connection
func checkKafka(config *config.GlobalConfig) error {
// Prioritize environment variables
kafkaStu := &component.Kafka{
Username: config.Kafka.Username,
Password: config.Kafka.Password,
Addr: config.Kafka.Addr,
}
kafkaClient, err := component.CheckKafka(kafkaStu)
if err != nil {
return err
}
defer kafkaClient.Close()
// Verify if necessary topics exist
topics, err := kafkaClient.Topics()
if err != nil {
return errs.Wrap(err)
}
requiredTopics := []string{
topics := []string{
config.Kafka.MsgToMongo.Topic,
config.Kafka.MsgToPush.Topic,
config.Kafka.LatestMsgToRedis.Topic,
}
for _, requiredTopic := range requiredTopics {
if !isTopicPresent(requiredTopic, topics) {
return errs.WrapMsg(nil, "Kafka missing required topic", "topic", requiredTopic, "availableTopics", strings.Join(topics, ", "))
}
}
type Item struct {
Topic string
GroupID string
}
items := []Item{
{
Topic: config.Kafka.LatestMsgToRedis.Topic,
GroupID: config.Kafka.ConsumerGroupID.MsgToRedis,
},
{
Topic: config.Kafka.MsgToMongo.Topic,
GroupID: config.Kafka.ConsumerGroupID.MsgToMongo,
},
{
Topic: config.Kafka.MsgToPush.Topic,
GroupID: config.Kafka.ConsumerGroupID.MsgToPush,
},
}
for _, item := range items {
cg, err := kafka.NewMConsumerGroup(config.Kafka.Config, item.GroupID, []string{item.Topic})
if err != nil {
return err
}
if err := cg.Close(); err != nil {
return err
}
}
return nil
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
return kfk.CheckKafka(ctx, &config.Kafka.Config, topics)
}
// isTopicPresent checks if a topic is present in the list of topics

Loading…
Cancel
Save