feat: Implement stress test v2.

pull/3292/head
Monet Lee 5 months ago
parent d0426caa44
commit 347ef4b3c5

@ -25,36 +25,18 @@ import (
"github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/system/program"
) )
// 测试第二期: // 1. Create 100K New Users
// 自动化测试第二期:需要多个会话 每个会话都会有新消息 // 2. Create 100 100K Groups
// 需求: // 3. Create 1000 999 Groups
// 1整体系统达到百万个群组。 // 4. Send message to 100K Groups every second
// 2每个人同事有 100个10万人群 1000个999人群。 // 5. Send message to 999 Groups every minute
// 3每个同事1万好友。
// 4每个群周期性发送消息。
// 如何测试:
// 1先让之前的测试继续运行达到万人就停止这样就有一个万人群了同事也有万人好友了。
// 2创建10万个新用户。
// 3同事和新用户建立100个10万人群。每秒钟一个。
// 4同事和新用户创建1000个999人群。每秒钟一个。
// 5等以上完成后10万人群每秒钟发送1条消息。999人群每分钟发送1条消息。
// TODO
// 不要出现第一个邀请的是0应该是从1开始邀请
// 最好是加一个检测,先判断这一批有没有在里面,有的话剔除
var ( var (
// Use default userIDs List for testing, need to be created. // Use default userIDs List for testing, need to be created.
TestTargetUserList = []string{ TestTargetUserList = []string{
// "<need-update-it>", // "<need-update-it>",
"6760971175",
"test_v3_u0",
"test_v3_u1",
"test_v3_u2",
"test_v3_u3",
} }
DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created. // DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created.
) )
var ( var (
@ -73,16 +55,13 @@ var (
) )
const ( const (
MaxUser = 1000 MaxUser = 100000
// MaxUser = 100000
Max100KGroup = 100 Max100KGroup = 100
Max999Group = 1000 Max999Group = 1000
MaxInviteUserLimit = 999 MaxInviteUserLimit = 999
CreateUserTicker = 1 * time.Second CreateUserTicker = 1 * time.Second
CreateGroupTicker = 1 * time.Second CreateGroupTicker = 1 * time.Second
// Create100KGroupTicker = 1 * time.Minute
// Create999GroupTicker = 1 * time.Minute
Create100KGroupTicker = 1 * time.Second Create100KGroupTicker = 1 * time.Second
Create999GroupTicker = 1 * time.Second Create999GroupTicker = 1 * time.Second
SendMsgTo100KGroupTicker = 1 * time.Second SendMsgTo100KGroupTicker = 1 * time.Second
@ -289,28 +268,57 @@ func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) err
return nil return nil
} }
// func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) {
// req := relation.ImportFriendReq{ needInviteUserIDs := make([]string, 0)
// OwnerUserID: userID,
// FriendUserIDs: TestTargetUserList, const maxBatchSize = 500
// } if len(userIDs) > maxBatchSize {
for i := 0; i < len(userIDs); i += maxBatchSize {
end := min(i+maxBatchSize, len(userIDs))
batchUserIDs := userIDs[i:end]
// _, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) // log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1,
// if err != nil { // "batchUserCount", len(batchUserIDs))
// return err
// }
// return nil // Process a single batch
// } batchReq := group.GetGroupMembersInfoReq{
GroupID: groupID,
UserIDs: batchUserIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq)
if err != nil {
log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1)
continue
}
data := &group.GetGroupMembersInfoResp{}
if err := json.Unmarshal(resp, &data); err != nil {
log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1)
continue
}
// Process the batch results
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range batchUserIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
}
return needInviteUserIDs, nil
}
func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) {
req := group.GetGroupMembersInfoReq{ req := group.GetGroupMembersInfoReq{
GroupID: groupID, GroupID: groupID,
UserIDs: userIDs, UserIDs: userIDs,
} }
needInviteUserIDs := make([]string, 0)
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req) resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -321,6 +329,17 @@ func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, u
return nil, err return nil, err
} }
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range userIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
return needInviteUserIDs, nil return needInviteUserIDs, nil
} }
@ -339,7 +358,8 @@ func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs
func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error { func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error {
contentObj := map[string]any{ contentObj := map[string]any{
"content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), // "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")),
"content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")),
} }
req := &apistruct.SendMsgReq{ req := &apistruct.SendMsgReq{
@ -366,8 +386,6 @@ func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string
// Max userIDs number is 1000 // Max userIDs number is 1000
func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) { func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) {
// groupID := fmt.Sprintf("StressTestGroup_v2_%d_%s", st.GroupCounter, time.Now().Format("20060102150405"))
groupInfo := &sdkws.GroupInfo{ groupInfo := &sdkws.GroupInfo{
GroupID: groupID, GroupID: groupID,
GroupName: groupID, GroupName: groupID,
@ -459,9 +477,6 @@ func main() {
fmt.Println("Admin Token:", st.AdminToken) fmt.Println("Admin Token:", st.AdminToken)
fmt.Println("ApiAddress:", ApiAddress) fmt.Println("ApiAddress:", ApiAddress)
// 最后一位 后面在拼接
// userID := fmt.Sprintf("v2_Stresstest_%d", st.UserCounter)
for i := range MaxUser { for i := range MaxUser {
userID := fmt.Sprintf("v2_StressTest_User_%d", i) userID := fmt.Sprintf("v2_StressTest_User_%d", i)
st.CreatedUsers = append(st.CreatedUsers, userID) st.CreatedUsers = append(st.CreatedUsers, userID)
@ -485,206 +500,237 @@ func main() {
end := min(i+batchSize, totalUsers) end := min(i+batchSize, totalUsers)
userBatch := st.CreatedUsers[i:end] userBatch := st.CreatedUsers[i:end]
log.ZInfo(st.Ctx, "创建用户批次", "批次", i/batchSize+1, "数量", len(userBatch)) log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch))
err = st.CreateUserBatch(st.Ctx, userBatch) err = st.CreateUserBatch(st.Ctx, userBatch)
if err != nil { if err != nil {
log.ZError(st.Ctx, "批量创建用户失败", err, "批次", i/batchSize+1) log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1)
} else { } else {
successCount += len(userBatch) successCount += len(userBatch)
log.ZInfo(st.Ctx, "批量创建用户成功", "批次", i/batchSize+1, log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1,
"进度", fmt.Sprintf("%d/%d", successCount, totalUsers)) "progress", fmt.Sprintf("%d/%d", successCount, totalUsers))
} }
}
// Execute create 100k group // Execute create 100k group
st.Wg.Add(1) st.Wg.Add(1)
go func() { go func() {
create100kGroupTicker := time.NewTicker(Create100KGroupTicker) defer st.Wg.Done()
defer create100kGroupTicker.Stop()
create100kGroupTicker := time.NewTicker(Create100KGroupTicker)
for i := range Max100KGroup { defer create100kGroupTicker.Stop()
select {
case <-st.Ctx.Done(): for i := range Max100KGroup {
log.ZInfo(st.Ctx, "Stop Create 100K Group") select {
return case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 100K Group")
case <-create100kGroupTicker.C: return
// Create 100K groups
st.Wg.Add(1) case <-create100kGroupTicker.C:
go func(idx int) { // Create 100K groups
defer st.Wg.Done() st.Wg.Add(1)
defer func() { go func(idx int) {
st.Create100kGroupCounter++ defer st.Wg.Done()
}() defer func() {
st.Create100kGroupCounter++
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx) }()
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx)
log.ZError(st.Ctx, "Create group failed.", err)
// continue if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
} }
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { if len(InviteUserIDs) == 0 {
InviteUserIDs := make([]string, 0) log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
} }
}(i)
}
}
}()
// create 999 groups InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
st.Wg.Add(1) if err != nil {
go func() { log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
create999GroupTicker := time.NewTicker(Create999GroupTicker) continue
defer create999GroupTicker.Stop()
for i := range Max999Group {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 999 Group")
return
case <-create999GroupTicker.C:
// Create 999 groups
st.Wg.Add(1)
go func(idx int) {
defer st.Wg.Done()
defer func() {
st.Create999GroupCounter++
}()
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx)
if groupID, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
} }
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0) if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
startIdx := i * MaxInviteUserLimit continue
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
} }
}(i)
}
}
}()
// Send message to 100K groups // Invite To Group
st.Wg.Wait() if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
fmt.Println("All groups created successfully, starting to send messages...") log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
}
}(i)
}
}
}()
var groups100K []string // create 999 groups
var groups999 []string st.Wg.Add(1)
go func() {
defer st.Wg.Done()
for i := range Max100KGroup { create999GroupTicker := time.NewTicker(Create999GroupTicker)
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i) defer create999GroupTicker.Stop()
groups100K = append(groups100K, groupID)
}
for i := range Max999Group { for i := range Max999Group {
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i) select {
groups999 = append(groups999, groupID) case <-st.Ctx.Done():
} log.ZInfo(st.Ctx, "Stop Create 999 Group")
return
case <-create999GroupTicker.C:
// Create 999 groups
st.Wg.Add(1)
go func(idx int) {
defer st.Wg.Done()
defer func() {
st.Create999GroupCounter++
}()
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx)
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
send100kGroupLimiter := make(chan struct{}, 20) startIdx := max(i*MaxInviteUserLimit, 1)
send999GroupLimiter := make(chan struct{}, 100) endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
// execute Send message to 100K groups for j := startIdx; j < endIdx; j++ {
go func() { userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
ticker := time.NewTicker(SendMsgTo100KGroupTicker) InviteUserIDs = append(InviteUserIDs, userCreatedID)
defer ticker.Stop() }
for { if len(InviteUserIDs) == 0 {
select { log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
case <-st.Ctx.Done(): continue
log.ZInfo(st.Ctx, "Stop Send Message to 100K Group") }
return
InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
case <-ticker.C: if err != nil {
// Send message to 100K groups log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
for _, groupID := range groups100K { continue
send100kGroupLimiter <- struct{}{} }
go func(groupID string) {
defer func() { <-send100kGroupLimiter }() if len(InviteUserIDs) == 0 {
if err := st.SendMsg(st.Ctx, st.AdminUserID, groupID); err != nil { log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
log.ZError(st.Ctx, "Send message to 100K group failed.", err) continue
} }
}(groupID)
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
} }
log.ZInfo(st.Ctx, "Send message to 100K groups successfully.") }(i)
}
}
}()
// Send message to 100K groups
st.Wg.Wait()
fmt.Println("All groups created successfully, starting to send messages...")
log.ZInfo(ctx, "All groups created successfully, starting to send messages...")
var groups100K []string
var groups999 []string
for i := range Max100KGroup {
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i)
groups100K = append(groups100K, groupID)
}
for i := range Max999Group {
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i)
groups999 = append(groups999, groupID)
}
send100kGroupLimiter := make(chan struct{}, 20)
send999GroupLimiter := make(chan struct{}, 100)
// execute Send message to 100K groups
go func() {
ticker := time.NewTicker(SendMsgTo100KGroupTicker)
defer ticker.Stop()
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 100K Group")
return
case <-ticker.C:
// Send message to 100K groups
for _, groupID := range groups100K {
send100kGroupLimiter <- struct{}{}
go func(groupID string) {
defer func() { <-send100kGroupLimiter }()
if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 100K group failed.", err)
}
}(groupID)
} }
// log.ZInfo(st.Ctx, "Send message to 100K groups successfully.")
} }
}() }
}()
// execute Send message to 999 groups // execute Send message to 999 groups
go func() { go func() {
ticker := time.NewTicker(SendMsgTo999GroupTicker) ticker := time.NewTicker(SendMsgTo999GroupTicker)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-st.Ctx.Done(): case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 999 Group") log.ZInfo(st.Ctx, "Stop Send Message to 999 Group")
return return
case <-ticker.C: case <-ticker.C:
// Send message to 999 groups // Send message to 999 groups
for _, groupID := range groups999 { for _, groupID := range groups999 {
send999GroupLimiter <- struct{}{} send999GroupLimiter <- struct{}{}
go func(groupID string) { go func(groupID string) {
defer func() { <-send999GroupLimiter }() defer func() { <-send999GroupLimiter }()
if err := st.SendMsg(st.Ctx, st.AdminUserID, groupID); err != nil { if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 999 group failed.", err) log.ZError(st.Ctx, "Send message to 999 group failed.", err)
} }
}(groupID) }(groupID)
}
log.ZInfo(st.Ctx, "Send message to 999 groups successfully.")
} }
// log.ZInfo(st.Ctx, "Send message to 999 groups successfully.")
} }
}() }
}()
}
<-st.Ctx.Done() <-st.Ctx.Done()
fmt.Println("Received signal to exit, shutting down...") fmt.Println("Received signal to exit, shutting down...")
} }

Loading…
Cancel
Save