From 347ef4b3c5f2af60a4ed9196d15163e4c0a6d58f Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Tue, 22 Apr 2025 14:53:30 +0800 Subject: [PATCH] feat: Implement stress test v2. --- tools/stress-test-v2/main.go | 494 +++++++++++++++++++---------------- 1 file changed, 270 insertions(+), 224 deletions(-) diff --git a/tools/stress-test-v2/main.go b/tools/stress-test-v2/main.go index fc004de44..0c309b9c9 100644 --- a/tools/stress-test-v2/main.go +++ b/tools/stress-test-v2/main.go @@ -25,36 +25,18 @@ import ( "github.com/openimsdk/tools/system/program" ) -// 测试第二期: -// 自动化测试第二期:需要多个会话 每个会话都会有新消息 -// 需求: -// (1)整体系统达到百万个群组。 -// (2)每个人同事有 100个10万人群, 1000个999人群。 -// (3)每个同事1万好友。 -// (4)每个群周期性发送消息。 - -// 如何测试: - -// (1)先让之前的测试继续运行,达到万人就停止,这样就有一个万人群了,同事也有万人好友了。 -// (2)创建10万个新用户。 -// (3)同事和新用户,建立100个10万人群。每秒钟一个。 -// (4)同事和新用户,创建1000个999人群。每秒钟一个。 -// (5)等以上完成后,10万人群,每秒钟发送1条消息。999人群,每分钟发送1条消息。 - -// TODO -// 不要出现第一个邀请的是0,应该是从1开始邀请 -// 最好是加一个检测,先判断这一批有没有在里面,有的话剔除 +// 1. Create 100K New Users +// 2. Create 100 100K Groups +// 3. Create 1000 999 Groups +// 4. Send message to 100K Groups every second +// 5. Send message to 999 Groups every minute + var ( // Use default userIDs List for testing, need to be created. TestTargetUserList = []string{ // "", - "6760971175", - "test_v3_u0", - "test_v3_u1", - "test_v3_u2", - "test_v3_u3", } - DefaultGroupID = "" // Use default group ID for testing, need to be created. + // DefaultGroupID = "" // Use default group ID for testing, need to be created. ) var ( @@ -73,16 +55,13 @@ var ( ) const ( - MaxUser = 1000 - // MaxUser = 100000 + MaxUser = 100000 Max100KGroup = 100 Max999Group = 1000 MaxInviteUserLimit = 999 - CreateUserTicker = 1 * time.Second - CreateGroupTicker = 1 * time.Second - // Create100KGroupTicker = 1 * time.Minute - // Create999GroupTicker = 1 * time.Minute + CreateUserTicker = 1 * time.Second + CreateGroupTicker = 1 * time.Second Create100KGroupTicker = 1 * time.Second Create999GroupTicker = 1 * time.Second SendMsgTo100KGroupTicker = 1 * time.Second @@ -289,28 +268,57 @@ func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) err return nil } -// func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { -// req := relation.ImportFriendReq{ -// OwnerUserID: userID, -// FriendUserIDs: TestTargetUserList, -// } +func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) { + needInviteUserIDs := make([]string, 0) + + const maxBatchSize = 500 + if len(userIDs) > maxBatchSize { + for i := 0; i < len(userIDs); i += maxBatchSize { + end := min(i+maxBatchSize, len(userIDs)) + batchUserIDs := userIDs[i:end] -// _, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) -// if err != nil { -// return err -// } + // log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1, + // "batchUserCount", len(batchUserIDs)) -// return nil -// } + // Process a single batch + batchReq := group.GetGroupMembersInfoReq{ + GroupID: groupID, + UserIDs: batchUserIDs, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq) + if err != nil { + log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1) + continue + } + + data := &group.GetGroupMembersInfoResp{} + if err := json.Unmarshal(resp, &data); err != nil { + log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1) + continue + } + + // Process the batch results + existingMembers := make(map[string]bool) + for _, member := range data.Members { + existingMembers[member.UserID] = true + } + + for _, userID := range batchUserIDs { + if !existingMembers[userID] { + needInviteUserIDs = append(needInviteUserIDs, userID) + } + } + } + + return needInviteUserIDs, nil + } -func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) { req := group.GetGroupMembersInfoReq{ GroupID: groupID, UserIDs: userIDs, } - needInviteUserIDs := make([]string, 0) - resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req) if err != nil { return nil, err @@ -321,6 +329,17 @@ func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, u return nil, err } + existingMembers := make(map[string]bool) + for _, member := range data.Members { + existingMembers[member.UserID] = true + } + + for _, userID := range userIDs { + if !existingMembers[userID] { + needInviteUserIDs = append(needInviteUserIDs, userID) + } + } + return needInviteUserIDs, nil } @@ -339,7 +358,8 @@ func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error { contentObj := map[string]any{ - "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), + // "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), + "content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")), } req := &apistruct.SendMsgReq{ @@ -366,8 +386,6 @@ func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string // Max userIDs number is 1000 func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) { - // groupID := fmt.Sprintf("StressTestGroup_v2_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) - groupInfo := &sdkws.GroupInfo{ GroupID: groupID, GroupName: groupID, @@ -459,9 +477,6 @@ func main() { fmt.Println("Admin Token:", st.AdminToken) fmt.Println("ApiAddress:", ApiAddress) - // 最后一位 后面在拼接 - // userID := fmt.Sprintf("v2_Stresstest_%d", st.UserCounter) - for i := range MaxUser { userID := fmt.Sprintf("v2_StressTest_User_%d", i) st.CreatedUsers = append(st.CreatedUsers, userID) @@ -485,206 +500,237 @@ func main() { end := min(i+batchSize, totalUsers) userBatch := st.CreatedUsers[i:end] - log.ZInfo(st.Ctx, "创建用户批次", "批次", i/batchSize+1, "数量", len(userBatch)) + log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch)) err = st.CreateUserBatch(st.Ctx, userBatch) if err != nil { - log.ZError(st.Ctx, "批量创建用户失败", err, "批次", i/batchSize+1) + log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1) } else { successCount += len(userBatch) - log.ZInfo(st.Ctx, "批量创建用户成功", "批次", i/batchSize+1, - "进度", fmt.Sprintf("%d/%d", successCount, totalUsers)) + log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1, + "progress", fmt.Sprintf("%d/%d", successCount, totalUsers)) } + } - // Execute create 100k group - st.Wg.Add(1) - go func() { - create100kGroupTicker := time.NewTicker(Create100KGroupTicker) - defer create100kGroupTicker.Stop() - - for i := range Max100KGroup { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Create 100K Group") - return - - case <-create100kGroupTicker.C: - // Create 100K groups - st.Wg.Add(1) - go func(idx int) { - defer st.Wg.Done() - defer func() { - st.Create100kGroupCounter++ - }() - - groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx) - - if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { - log.ZError(st.Ctx, "Create group failed.", err) - // continue + // Execute create 100k group + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + create100kGroupTicker := time.NewTicker(Create100KGroupTicker) + defer create100kGroupTicker.Stop() + + for i := range Max100KGroup { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create 100K Group") + return + + case <-create100kGroupTicker.C: + // Create 100K groups + st.Wg.Add(1) + go func(idx int) { + defer st.Wg.Done() + defer func() { + st.Create100kGroupCounter++ + }() + + groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx) + + if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { + log.ZError(st.Ctx, "Create group failed.", err) + // continue + } + + for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { + InviteUserIDs := make([]string, 0) + // ensure TargetUserList is in group + InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) + + startIdx := max(i*MaxInviteUserLimit, 1) + endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) + + for j := startIdx; j < endIdx; j++ { + userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) + InviteUserIDs = append(InviteUserIDs, userCreatedID) } - for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { - InviteUserIDs := make([]string, 0) - - startIdx := max(i*MaxInviteUserLimit, 1) - endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) - - for j := startIdx; j < endIdx; j++ { - userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) - InviteUserIDs = append(InviteUserIDs, userCreatedID) - } - - if len(InviteUserIDs) == 0 { - log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) - continue - } - - // Invite To Group - if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { - log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) - continue - // os.Exit(1) - // return - } + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue } - }(i) - } - } - }() - // create 999 groups - st.Wg.Add(1) - go func() { - create999GroupTicker := time.NewTicker(Create999GroupTicker) - defer create999GroupTicker.Stop() - - for i := range Max999Group { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Create 999 Group") - return - - case <-create999GroupTicker.C: - // Create 999 groups - st.Wg.Add(1) - go func(idx int) { - defer st.Wg.Done() - defer func() { - st.Create999GroupCounter++ - }() - - groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx) - - if groupID, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { - log.ZError(st.Ctx, "Create group failed.", err) - // continue + InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) + if err != nil { + log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) + continue } - for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { - InviteUserIDs := make([]string, 0) - - startIdx := i * MaxInviteUserLimit - endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) - - for j := startIdx; j < endIdx; j++ { - userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) - InviteUserIDs = append(InviteUserIDs, userCreatedID) - } - - if len(InviteUserIDs) == 0 { - log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) - continue - } - - // Invite To Group - if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { - log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) - continue - // os.Exit(1) - // return - } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue } - }(i) - } - } - }() - // Send message to 100K groups - st.Wg.Wait() - fmt.Println("All groups created successfully, starting to send messages...") + // Invite To Group + if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) + continue + // os.Exit(1) + // return + } + } + }(i) + } + } + }() - var groups100K []string - var groups999 []string + // create 999 groups + st.Wg.Add(1) + go func() { + defer st.Wg.Done() - for i := range Max100KGroup { - groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i) - groups100K = append(groups100K, groupID) - } + create999GroupTicker := time.NewTicker(Create999GroupTicker) + defer create999GroupTicker.Stop() for i := range Max999Group { - groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i) - groups999 = append(groups999, groupID) - } + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create 999 Group") + return + + case <-create999GroupTicker.C: + // Create 999 groups + st.Wg.Add(1) + go func(idx int) { + defer st.Wg.Done() + defer func() { + st.Create999GroupCounter++ + }() + + groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx) + + if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { + log.ZError(st.Ctx, "Create group failed.", err) + // continue + } + for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { + InviteUserIDs := make([]string, 0) + // ensure TargetUserList is in group + InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) - send100kGroupLimiter := make(chan struct{}, 20) - send999GroupLimiter := make(chan struct{}, 100) + startIdx := max(i*MaxInviteUserLimit, 1) + endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) - // execute Send message to 100K groups - go func() { - ticker := time.NewTicker(SendMsgTo100KGroupTicker) - defer ticker.Stop() - - for { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Send Message to 100K Group") - return - - case <-ticker.C: - // Send message to 100K groups - for _, groupID := range groups100K { - send100kGroupLimiter <- struct{}{} - go func(groupID string) { - defer func() { <-send100kGroupLimiter }() - if err := st.SendMsg(st.Ctx, st.AdminUserID, groupID); err != nil { - log.ZError(st.Ctx, "Send message to 100K group failed.", err) - } - }(groupID) + for j := startIdx; j < endIdx; j++ { + userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) + InviteUserIDs = append(InviteUserIDs, userCreatedID) + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) + if err != nil { + log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) + continue + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + // Invite To Group + if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) + continue + // os.Exit(1) + // return + } } - log.ZInfo(st.Ctx, "Send message to 100K groups successfully.") + }(i) + } + } + }() + + // Send message to 100K groups + st.Wg.Wait() + fmt.Println("All groups created successfully, starting to send messages...") + log.ZInfo(ctx, "All groups created successfully, starting to send messages...") + + var groups100K []string + var groups999 []string + + for i := range Max100KGroup { + groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i) + groups100K = append(groups100K, groupID) + } + + for i := range Max999Group { + groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i) + groups999 = append(groups999, groupID) + } + + send100kGroupLimiter := make(chan struct{}, 20) + send999GroupLimiter := make(chan struct{}, 100) + + // execute Send message to 100K groups + go func() { + ticker := time.NewTicker(SendMsgTo100KGroupTicker) + defer ticker.Stop() + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send Message to 100K Group") + return + + case <-ticker.C: + // Send message to 100K groups + for _, groupID := range groups100K { + send100kGroupLimiter <- struct{}{} + go func(groupID string) { + defer func() { <-send100kGroupLimiter }() + if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { + log.ZError(st.Ctx, "Send message to 100K group failed.", err) + } + }(groupID) } + // log.ZInfo(st.Ctx, "Send message to 100K groups successfully.") } - }() + } + }() - // execute Send message to 999 groups - go func() { - ticker := time.NewTicker(SendMsgTo999GroupTicker) - defer ticker.Stop() - - for { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Send Message to 999 Group") - return - - case <-ticker.C: - // Send message to 999 groups - for _, groupID := range groups999 { - send999GroupLimiter <- struct{}{} - go func(groupID string) { - defer func() { <-send999GroupLimiter }() - - if err := st.SendMsg(st.Ctx, st.AdminUserID, groupID); err != nil { - log.ZError(st.Ctx, "Send message to 999 group failed.", err) - } - }(groupID) - } - log.ZInfo(st.Ctx, "Send message to 999 groups successfully.") + // execute Send message to 999 groups + go func() { + ticker := time.NewTicker(SendMsgTo999GroupTicker) + defer ticker.Stop() + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send Message to 999 Group") + return + + case <-ticker.C: + // Send message to 999 groups + for _, groupID := range groups999 { + send999GroupLimiter <- struct{}{} + go func(groupID string) { + defer func() { <-send999GroupLimiter }() + + if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { + log.ZError(st.Ctx, "Send message to 999 group failed.", err) + } + }(groupID) } + // log.ZInfo(st.Ctx, "Send message to 999 groups successfully.") } - }() + } + }() - } <-st.Ctx.Done() fmt.Println("Received signal to exit, shutting down...") }