diff --git a/.github/workflows/update-version-file-on-release.yml b/.github/workflows/update-version-file-on-release.yml index b7e2a4409..039ff3d4b 100644 --- a/.github/workflows/update-version-file-on-release.yml +++ b/.github/workflows/update-version-file-on-release.yml @@ -96,13 +96,13 @@ jobs: repo, per_page: 100 }); - + release = releases.data.find(r => r.draft && r.tag_name === tagName); if (!release) { throw new Error(`No release found with tag ${tagName}`); } } - + await github.rest.repos.updateRelease({ owner, repo, @@ -110,10 +110,10 @@ jobs: draft: false, prerelease: release.prerelease }); - + const status = release.draft ? "was draft" : "was already published"; core.info(`Release ${tagName} ensured to be published (${status}).`); - + } catch (error) { core.warning(`Could not find or update release for tag ${tagName}: ${error.message}`); } diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index ffe8943df..e05eeb7e4 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -1,6 +1,6 @@ rpc: # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP - registerIP: + registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 # autoSetPorts indicates whether to automatically set the ports diff --git a/go.mod b/go.mod index de5c4f92f..f8c5fa73c 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( require github.com/google/uuid v1.6.0 require ( + github.com/IBM/sarama v1.43.0 github.com/fatih/color v1.14.1 github.com/gin-contrib/gzip v1.0.1 github.com/go-redis/redis v6.15.9+incompatible @@ -54,7 +55,6 @@ require ( cloud.google.com/go/iam v1.1.7 // indirect cloud.google.com/go/longrunning v0.5.5 // indirect cloud.google.com/go/storage v1.40.0 // indirect - github.com/IBM/sarama v1.43.0 // indirect github.com/MicahParks/keyfunc v1.9.0 // indirect github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect github.com/aws/aws-sdk-go-v2 v1.32.5 // indirect diff --git a/internal/api/router.go b/internal/api/router.go index b3bad136e..81787f9a9 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -290,6 +290,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) conversationGroup.POST("/delete_conversations", c.DeleteConversations) + conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) } { diff --git a/internal/rpc/conversation/sync.go b/internal/rpc/conversation/sync.go index a24dd85c6..85128f719 100644 --- a/internal/rpc/conversation/sync.go +++ b/internal/rpc/conversation/sync.go @@ -27,7 +27,7 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re conversationIDs = nil } return &conversation.GetFullOwnerConversationIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, ConversationIDs: conversationIDs, diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index b864fbf53..92c7a60ce 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -34,7 +34,7 @@ func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgrou userIDs = nil } return &pbgroup.GetFullGroupMemberUserIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, UserIDs: userIDs, @@ -58,7 +58,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF groupIDs = nil } return &pbgroup.GetFullJoinGroupIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, GroupIDs: groupIDs, diff --git a/internal/rpc/relation/sync.go b/internal/rpc/relation/sync.go index 79fa0858c..187f6238d 100644 --- a/internal/rpc/relation/sync.go +++ b/internal/rpc/relation/sync.go @@ -56,7 +56,7 @@ func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.G userIDs = nil } return &relation.GetFullFriendUserIDsResp{ - Version: idHash, + Version: uint64(vl.Version), VersionID: vl.ID.Hex(), Equal: req.IdHash == idHash, UserIDs: userIDs, diff --git a/pkg/common/storage/cache/redis/token.go b/pkg/common/storage/cache/redis/token.go index 9ad4c319f..3a9967e90 100644 --- a/pkg/common/storage/cache/redis/token.go +++ b/pkg/common/storage/cache/redis/token.go @@ -220,7 +220,6 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil { return errs.Wrap(err) } - if c.localCache != nil { c.removeLocalTokenCache(ctx, key) } diff --git a/pkg/common/storage/kafka/config.go b/pkg/common/storage/kafka/config.go new file mode 100644 index 000000000..1c9c4b0a0 --- /dev/null +++ b/pkg/common/storage/kafka/config.go @@ -0,0 +1,33 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +type TLSConfig struct { + EnableTLS bool `yaml:"enableTLS"` + CACrt string `yaml:"caCrt"` + ClientCrt string `yaml:"clientCrt"` + ClientKey string `yaml:"clientKey"` + ClientKeyPwd string `yaml:"clientKeyPwd"` + InsecureSkipVerify bool `yaml:"insecureSkipVerify"` +} + +type Config struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + ProducerAck string `yaml:"producerAck"` + CompressType string `yaml:"compressType"` + Addr []string `yaml:"addr"` + TLS TLSConfig `yaml:"tls"` +} diff --git a/pkg/common/storage/kafka/consumer_group.go b/pkg/common/storage/kafka/consumer_group.go new file mode 100644 index 000000000..f0e84bbc9 --- /dev/null +++ b/pkg/common/storage/kafka/consumer_group.go @@ -0,0 +1,68 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "errors" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/log" +) + +type MConsumerGroup struct { + sarama.ConsumerGroup + groupID string + topics []string +} + +func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) { + config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable) + if err != nil { + return nil, err + } + group, err := NewConsumerGroup(config, conf.Addr, groupID) + if err != nil { + return nil, err + } + return &MConsumerGroup{ + ConsumerGroup: group, + groupID: groupID, + topics: topics, + }, nil +} + +func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context { + return GetContextWithMQHeader(cMsg.Headers) +} + +func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) { + for { + err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler) + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + if errors.Is(err, context.Canceled) { + return + } + if err != nil { + log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID) + } + } +} + +func (mc *MConsumerGroup) Close() error { + return mc.ConsumerGroup.Close() +} diff --git a/pkg/common/storage/kafka/producer.go b/pkg/common/storage/kafka/producer.go new file mode 100644 index 000000000..5f6be29ed --- /dev/null +++ b/pkg/common/storage/kafka/producer.go @@ -0,0 +1,82 @@ +// Copyright © 2023 OpenIM. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" + "google.golang.org/protobuf/proto" +) + +// Producer represents a Kafka producer. +type Producer struct { + addr []string + topic string + config *sarama.Config + producer sarama.SyncProducer +} + +func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) { + producer, err := NewProducer(config, addr) + if err != nil { + return nil, err + } + return &Producer{ + addr: addr, + topic: topic, + config: config, + producer: producer, + }, nil +} + +// SendMessage sends a message to the Kafka topic configured in the Producer. +func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) { + // Marshal the protobuf message + bMsg, err := proto.Marshal(msg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err") + } + if len(bMsg) == 0 { + return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err") + } + + // Prepare Kafka message + kMsg := &sarama.ProducerMessage{ + Topic: p.topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(bMsg), + } + + // Validate message key and value + if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { + return 0, 0, errs.Wrap(errEmptyMsg) + } + + // Attach context metadata as headers + header, err := GetMQHeaderWithContext(ctx) + if err != nil { + return 0, 0, err + } + kMsg.Headers = header + + // Send the message + partition, offset, err := p.producer.SendMessage(kMsg) + if err != nil { + return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error") + } + + return partition, offset, nil +} diff --git a/pkg/common/storage/kafka/sarama.go b/pkg/common/storage/kafka/sarama.go new file mode 100644 index 000000000..23220b4d0 --- /dev/null +++ b/pkg/common/storage/kafka/sarama.go @@ -0,0 +1,85 @@ +package kafka + +import ( + "bytes" + "strings" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Version = sarama.V2_0_0_0 + kfk.Consumer.Offsets.Initial = initial + kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable + kfk.Consumer.Return.Errors = false + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) { + cg, err := sarama.NewConsumerGroup(addr, groupID, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf) + } + return cg, nil +} + +func BuildProducerConfig(conf Config) (*sarama.Config, error) { + kfk := sarama.NewConfig() + kfk.Producer.Return.Successes = true + kfk.Producer.Return.Errors = true + kfk.Producer.Partitioner = sarama.NewHashPartitioner + if conf.Username != "" || conf.Password != "" { + kfk.Net.SASL.Enable = true + kfk.Net.SASL.User = conf.Username + kfk.Net.SASL.Password = conf.Password + } + switch strings.ToLower(conf.ProducerAck) { + case "no_response": + kfk.Producer.RequiredAcks = sarama.NoResponse + case "wait_for_local": + kfk.Producer.RequiredAcks = sarama.WaitForLocal + case "wait_for_all": + kfk.Producer.RequiredAcks = sarama.WaitForAll + default: + kfk.Producer.RequiredAcks = sarama.WaitForAll + } + if conf.CompressType == "" { + kfk.Producer.Compression = sarama.CompressionNone + } else { + if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil { + return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType) + } + } + if conf.TLS.EnableTLS { + tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify) + if err != nil { + return nil, err + } + kfk.Net.TLS.Config = tls + kfk.Net.TLS.Enable = true + } + return kfk, nil +} + +func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) { + producer, err := sarama.NewSyncProducer(addr, conf) + if err != nil { + return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf) + } + return producer, nil +} diff --git a/pkg/common/storage/kafka/tls.go b/pkg/common/storage/kafka/tls.go new file mode 100644 index 000000000..00c89dcc1 --- /dev/null +++ b/pkg/common/storage/kafka/tls.go @@ -0,0 +1,83 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "crypto/tls" + "crypto/x509" + "encoding/pem" + "os" + + "github.com/openimsdk/tools/errs" +) + +// decryptPEM decrypts a PEM block using a password. +func decryptPEM(data []byte, passphrase []byte) ([]byte, error) { + if len(passphrase) == 0 { + return data, nil + } + b, _ := pem.Decode(data) + d, err := x509.DecryptPEMBlock(b, passphrase) + if err != nil { + return nil, errs.WrapMsg(err, "DecryptPEMBlock failed") + } + return pem.EncodeToMemory(&pem.Block{ + Type: b.Type, + Bytes: d, + }), nil +} + +func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "path", path) + } + return decryptPEM(data, pwd) +} + +// newTLSConfig setup the TLS config from general config file. +func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) { + var tlsConfig tls.Config + if clientCertFile != "" && clientKeyFile != "" { + certPEMBlock, err := os.ReadFile(clientCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile) + } + keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd) + if err != nil { + return nil, err + } + + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + return nil, errs.WrapMsg(err, "X509KeyPair failed") + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + + if caCertFile != "" { + caCert, err := os.ReadFile(caCertFile) + if err != nil { + return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile) + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, errs.New("AppendCertsFromPEM failed") + } + tlsConfig.RootCAs = caCertPool + } + tlsConfig.InsecureSkipVerify = insecureSkipVerify + return &tlsConfig, nil +} diff --git a/pkg/common/storage/kafka/util.go b/pkg/common/storage/kafka/util.go new file mode 100644 index 000000000..61abe5450 --- /dev/null +++ b/pkg/common/storage/kafka/util.go @@ -0,0 +1,34 @@ +package kafka + +import ( + "context" + "errors" + "github.com/IBM/sarama" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/mcontext" +) + +var errEmptyMsg = errors.New("kafka binary msg is empty") + +// GetMQHeaderWithContext extracts message queue headers from the context. +func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) { + operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx) + if err != nil { + return nil, err + } + return []sarama.RecordHeader{ + {Key: []byte(constant.OperationID), Value: []byte(operationID)}, + {Key: []byte(constant.OpUserID), Value: []byte(opUserID)}, + {Key: []byte(constant.OpUserPlatform), Value: []byte(platform)}, + {Key: []byte(constant.ConnID), Value: []byte(connID)}, + }, nil +} + +// GetContextWithMQHeader creates a context from message queue headers. +func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context { + var values []string + for _, recordHeader := range header { + values = append(values, string(recordHeader.Value)) + } + return mcontext.WithMustInfoCtx(values) // Attach extracted values to context +} diff --git a/pkg/common/storage/kafka/verify.go b/pkg/common/storage/kafka/verify.go new file mode 100644 index 000000000..0a09eed4e --- /dev/null +++ b/pkg/common/storage/kafka/verify.go @@ -0,0 +1,79 @@ +// Copyright © 2024 OpenIM open source community. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "fmt" + + "github.com/IBM/sarama" + "github.com/openimsdk/tools/errs" +) + +func CheckTopics(ctx context.Context, conf *Config, topics []string) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + existingTopics, err := cli.Topics() + if err != nil { + return errs.WrapMsg(err, "Failed to list topics") + } + + existingTopicsMap := make(map[string]bool) + for _, t := range existingTopics { + existingTopicsMap[t] = true + } + + for _, topic := range topics { + if !existingTopicsMap[topic] { + return errs.New("topic not exist", "topic", topic).Wrap() + } + } + return nil +} + +func CheckHealth(ctx context.Context, conf *Config) error { + kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false) + if err != nil { + return err + } + cli, err := sarama.NewClient(conf.Addr, kfk) + if err != nil { + return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf)) + } + defer cli.Close() + + // Get broker list + brokers := cli.Brokers() + if len(brokers) == 0 { + return errs.New("no brokers found").Wrap() + } + + // Check if all brokers are reachable + for _, broker := range brokers { + if err := broker.Open(kfk); err != nil { + return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr()) + } + } + + return nil +} diff --git a/tools/seq/internal/seq.go b/tools/seq/internal/seq.go index e90c4a41b..b14ebbf70 100644 --- a/tools/seq/internal/seq.go +++ b/tools/seq/internal/seq.go @@ -72,7 +72,7 @@ func Main(conf string, del time.Duration) error { if err != nil { return err } - + mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName) if err != nil { return err diff --git a/tools/stress-test-v2/main.go b/tools/stress-test-v2/main.go new file mode 100644 index 000000000..8adcbd62c --- /dev/null +++ b/tools/stress-test-v2/main.go @@ -0,0 +1,735 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/protocol/auth" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/sdkws" + pbuser "github.com/openimsdk/protocol/user" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/system/program" +) + +// 1. Create 100K New Users +// 2. Create 100 100K Groups +// 3. Create 1000 999 Groups +// 4. Send message to 100K Groups every second +// 5. Send message to 999 Groups every minute + +var ( + // Use default userIDs List for testing, need to be created. + TestTargetUserList = []string{ + // "", + } + // DefaultGroupID = "" // Use default group ID for testing, need to be created. +) + +var ( + ApiAddress string + + // API method + GetAdminToken = "/auth/get_admin_token" + UserCheck = "/user/account_check" + CreateUser = "/user/user_register" + ImportFriend = "/friend/import_friend" + InviteToGroup = "/group/invite_user_to_group" + GetGroupMemberInfo = "/group/get_group_members_info" + SendMsg = "/msg/send_msg" + CreateGroup = "/group/create_group" + GetUserToken = "/auth/user_token" +) + +const ( + MaxUser = 100000 + Max100KGroup = 100 + Max999Group = 1000 + MaxInviteUserLimit = 999 + + CreateUserTicker = 1 * time.Second + CreateGroupTicker = 1 * time.Second + Create100KGroupTicker = 1 * time.Second + Create999GroupTicker = 1 * time.Second + SendMsgTo100KGroupTicker = 1 * time.Second + SendMsgTo999GroupTicker = 1 * time.Minute +) + +type BaseResp struct { + ErrCode int `json:"errCode"` + ErrMsg string `json:"errMsg"` + Data json.RawMessage `json:"data"` +} + +type StressTest struct { + Conf *conf + AdminUserID string + AdminToken string + DefaultGroupID string + DefaultUserID string + UserCounter int + CreateUserCounter int + Create100kGroupCounter int + Create999GroupCounter int + MsgCounter int + CreatedUsers []string + CreatedGroups []string + Mutex sync.Mutex + Ctx context.Context + Cancel context.CancelFunc + HttpClient *http.Client + Wg sync.WaitGroup + Once sync.Once +} + +type conf struct { + Share config.Share + Api config.API +} + +func initConfig(configDir string) (*config.Share, *config.API, error) { + var ( + share = &config.Share{} + apiConfig = &config.API{} + ) + + err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) + if err != nil { + return nil, nil, err + } + + err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) + if err != nil { + return nil, nil, err + } + + return share, apiConfig, nil +} + +// Post Request +func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { + // Marshal body + jsonBody, err := json.Marshal(reqbody) + if err != nil { + log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) + return nil, err + } + + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("operationID", st.AdminUserID) + if st.AdminToken != "" { + req.Header.Set("token", st.AdminToken) + } + + // log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) + + resp, err := st.HttpClient.Do(req) + if err != nil { + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) + return nil, err + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + log.ZError(ctx, "Failed to read response body", err, "url", url) + return nil, err + } + + var baseResp BaseResp + if err := json.Unmarshal(respBody, &baseResp); err != nil { + log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) + return nil, err + } + + if baseResp.ErrCode != 0 { + err = fmt.Errorf(baseResp.ErrMsg) + log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) + return nil, err + } + + return baseResp.Data, nil +} + +func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { + req := auth.GetAdminTokenReq{ + Secret: st.Conf.Share.Secret, + UserID: st.AdminUserID, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) + if err != nil { + return "", err + } + + data := &auth.GetAdminTokenResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return "", err + } + + return data.Token, nil +} + +func (st *StressTest) CheckUser(ctx context.Context, userIDs []string) ([]string, error) { + req := pbuser.AccountCheckReq{ + CheckUserIDs: userIDs, + } + + resp, err := st.PostRequest(ctx, ApiAddress+UserCheck, &req) + if err != nil { + return nil, err + } + + data := &pbuser.AccountCheckResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return nil, err + } + + unRegisteredUserIDs := make([]string, 0) + + for _, res := range data.Results { + if res.AccountStatus == constant.UnRegistered { + unRegisteredUserIDs = append(unRegisteredUserIDs, res.UserID) + } + } + + return unRegisteredUserIDs, nil +} + +func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { + user := &sdkws.UserInfo{ + UserID: userID, + Nickname: userID, + } + + req := pbuser.UserRegisterReq{ + Users: []*sdkws.UserInfo{user}, + } + + _, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) + if err != nil { + return "", err + } + + st.UserCounter++ + return userID, nil +} + +func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) error { + // The method can import a large number of users at once. + var userList []*sdkws.UserInfo + + defer st.Once.Do( + func() { + st.DefaultUserID = userIDs[0] + fmt.Println("Default Send User Created ID:", st.DefaultUserID) + }) + + needUserIDs, err := st.CheckUser(ctx, userIDs) + if err != nil { + return err + } + + for _, userID := range needUserIDs { + user := &sdkws.UserInfo{ + UserID: userID, + Nickname: userID, + } + userList = append(userList, user) + } + + req := pbuser.UserRegisterReq{ + Users: userList, + } + + _, err = st.PostRequest(ctx, ApiAddress+CreateUser, &req) + if err != nil { + return err + } + + st.UserCounter += len(userList) + return nil +} + +func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) { + needInviteUserIDs := make([]string, 0) + + const maxBatchSize = 500 + if len(userIDs) > maxBatchSize { + for i := 0; i < len(userIDs); i += maxBatchSize { + end := min(i+maxBatchSize, len(userIDs)) + batchUserIDs := userIDs[i:end] + + // log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1, + // "batchUserCount", len(batchUserIDs)) + + // Process a single batch + batchReq := group.GetGroupMembersInfoReq{ + GroupID: groupID, + UserIDs: batchUserIDs, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq) + if err != nil { + log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1) + continue + } + + data := &group.GetGroupMembersInfoResp{} + if err := json.Unmarshal(resp, &data); err != nil { + log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1) + continue + } + + // Process the batch results + existingMembers := make(map[string]bool) + for _, member := range data.Members { + existingMembers[member.UserID] = true + } + + for _, userID := range batchUserIDs { + if !existingMembers[userID] { + needInviteUserIDs = append(needInviteUserIDs, userID) + } + } + } + + return needInviteUserIDs, nil + } + + req := group.GetGroupMembersInfoReq{ + GroupID: groupID, + UserIDs: userIDs, + } + + resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req) + if err != nil { + return nil, err + } + + data := &group.GetGroupMembersInfoResp{} + if err := json.Unmarshal(resp, &data); err != nil { + return nil, err + } + + existingMembers := make(map[string]bool) + for _, member := range data.Members { + existingMembers[member.UserID] = true + } + + for _, userID := range userIDs { + if !existingMembers[userID] { + needInviteUserIDs = append(needInviteUserIDs, userID) + } + } + + return needInviteUserIDs, nil +} + +func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs []string) error { + req := group.InviteUserToGroupReq{ + GroupID: groupID, + InvitedUserIDs: userIDs, + } + _, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) + if err != nil { + return err + } + + return nil +} + +func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error { + contentObj := map[string]any{ + // "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), + "content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")), + } + + req := &apistruct.SendMsgReq{ + SendMsg: apistruct.SendMsg{ + SendID: userID, + SenderNickname: userID, + GroupID: groupID, + ContentType: constant.Text, + SessionType: constant.ReadGroupChatType, + Content: contentObj, + }, + } + + _, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) + if err != nil { + log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) + return err + } + + st.MsgCounter++ + + return nil +} + +// Max userIDs number is 1000 +func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) { + groupInfo := &sdkws.GroupInfo{ + GroupID: groupID, + GroupName: groupID, + GroupType: constant.WorkingGroup, + } + + req := group.CreateGroupReq{ + OwnerUserID: userID, + MemberUserIDs: userIDsList, + GroupInfo: groupInfo, + } + + resp := group.CreateGroupResp{} + + response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) + if err != nil { + return "", err + } + + if err := json.Unmarshal(response, &resp); err != nil { + return "", err + } + + // st.GroupCounter++ + + return resp.GroupInfo.GroupID, nil +} + +func main() { + var configPath string + // defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") + // flag.StringVar(&configPath, "c", defaultConfigDir, "config path") + flag.StringVar(&configPath, "c", "", "config path") + flag.Parse() + + if configPath == "" { + _, _ = fmt.Fprintln(os.Stderr, "config path is empty") + os.Exit(1) + return + } + + fmt.Printf(" Config Path: %s\n", configPath) + + share, apiConfig, err := initConfig(configPath) + if err != nil { + program.ExitWithError(err) + return + } + + ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) + + ctx, cancel := context.WithCancel(context.Background()) + // ch := make(chan struct{}) + + st := &StressTest{ + Conf: &conf{ + Share: *share, + Api: *apiConfig, + }, + AdminUserID: share.IMAdminUser.UserIDs[0], + Ctx: ctx, + Cancel: cancel, + HttpClient: &http.Client{ + Timeout: 50 * time.Second, + }, + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + fmt.Println("\nReceived stop signal, stopping...") + + go func() { + // time.Sleep(5 * time.Second) + fmt.Println("Force exit") + os.Exit(0) + }() + + st.Cancel() + }() + + token, err := st.GetAdminToken(st.Ctx) + if err != nil { + log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) + } + + st.AdminToken = token + fmt.Println("Admin Token:", st.AdminToken) + fmt.Println("ApiAddress:", ApiAddress) + for i := 0; i < MaxUser; i++ { + userID := fmt.Sprintf("v2_StressTest_User_%d", i) + st.CreatedUsers = append(st.CreatedUsers, userID) + st.CreateUserCounter++ + } + + // err = st.CreateUserBatch(st.Ctx, st.CreatedUsers) + // if err != nil { + // log.ZError(ctx, "Create user failed.", err) + // } + + const batchSize = 1000 + totalUsers := len(st.CreatedUsers) + successCount := 0 + + if st.DefaultUserID == "" && len(st.CreatedUsers) > 0 { + st.DefaultUserID = st.CreatedUsers[0] + } + + for i := 0; i < totalUsers; i += batchSize { + end := min(i+batchSize, totalUsers) + + userBatch := st.CreatedUsers[i:end] + log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch)) + + err = st.CreateUserBatch(st.Ctx, userBatch) + if err != nil { + log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1) + } else { + successCount += len(userBatch) + log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1, + "progress", fmt.Sprintf("%d/%d", successCount, totalUsers)) + } + } + + // Execute create 100k group + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + create100kGroupTicker := time.NewTicker(Create100KGroupTicker) + defer create100kGroupTicker.Stop() + + for i := 0; i < Max100KGroup; i++ { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create 100K Group") + return + + case <-create100kGroupTicker.C: + // Create 100K groups + st.Wg.Add(1) + go func(idx int) { + defer st.Wg.Done() + defer func() { + st.Create100kGroupCounter++ + }() + + groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx) + + if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { + log.ZError(st.Ctx, "Create group failed.", err) + // continue + } + + for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { + InviteUserIDs := make([]string, 0) + // ensure TargetUserList is in group + InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) + + startIdx := max(i*MaxInviteUserLimit, 1) + endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) + + for j := startIdx; j < endIdx; j++ { + userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) + InviteUserIDs = append(InviteUserIDs, userCreatedID) + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) + if err != nil { + log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) + continue + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + // Invite To Group + if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) + continue + // os.Exit(1) + // return + } + } + }(i) + } + } + }() + + // create 999 groups + st.Wg.Add(1) + go func() { + defer st.Wg.Done() + + create999GroupTicker := time.NewTicker(Create999GroupTicker) + defer create999GroupTicker.Stop() + + for i := 0; i < Max999Group; i++ { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Create 999 Group") + return + + case <-create999GroupTicker.C: + // Create 999 groups + st.Wg.Add(1) + go func(idx int) { + defer st.Wg.Done() + defer func() { + st.Create999GroupCounter++ + }() + + groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx) + + if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil { + log.ZError(st.Ctx, "Create group failed.", err) + // continue + } + for i := 0; i < MaxUser/MaxInviteUserLimit; i++ { + InviteUserIDs := make([]string, 0) + // ensure TargetUserList is in group + InviteUserIDs = append(InviteUserIDs, TestTargetUserList...) + + startIdx := max(i*MaxInviteUserLimit, 1) + endIdx := min((i+1)*MaxInviteUserLimit, MaxUser) + + for j := startIdx; j < endIdx; j++ { + userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j) + InviteUserIDs = append(InviteUserIDs, userCreatedID) + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs) + if err != nil { + log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID) + continue + } + + if len(InviteUserIDs) == 0 { + log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID) + continue + } + + // Invite To Group + if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil { + log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs) + continue + // os.Exit(1) + // return + } + } + }(i) + } + } + }() + + // Send message to 100K groups + st.Wg.Wait() + fmt.Println("All groups created successfully, starting to send messages...") + log.ZInfo(ctx, "All groups created successfully, starting to send messages...") + + var groups100K []string + var groups999 []string + + for i := 0; i < Max100KGroup; i++ { + groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i) + groups100K = append(groups100K, groupID) + } + + for i := 0; i < Max999Group; i++ { + groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i) + groups999 = append(groups999, groupID) + } + + send100kGroupLimiter := make(chan struct{}, 20) + send999GroupLimiter := make(chan struct{}, 100) + + // execute Send message to 100K groups + go func() { + ticker := time.NewTicker(SendMsgTo100KGroupTicker) + defer ticker.Stop() + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send Message to 100K Group") + return + + case <-ticker.C: + // Send message to 100K groups + for _, groupID := range groups100K { + send100kGroupLimiter <- struct{}{} + go func(groupID string) { + defer func() { <-send100kGroupLimiter }() + if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { + log.ZError(st.Ctx, "Send message to 100K group failed.", err) + } + }(groupID) + } + // log.ZInfo(st.Ctx, "Send message to 100K groups successfully.") + } + } + }() + + // execute Send message to 999 groups + go func() { + ticker := time.NewTicker(SendMsgTo999GroupTicker) + defer ticker.Stop() + + for { + select { + case <-st.Ctx.Done(): + log.ZInfo(st.Ctx, "Stop Send Message to 999 Group") + return + + case <-ticker.C: + // Send message to 999 groups + for _, groupID := range groups999 { + send999GroupLimiter <- struct{}{} + go func(groupID string) { + defer func() { <-send999GroupLimiter }() + + if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil { + log.ZError(st.Ctx, "Send message to 999 group failed.", err) + } + }(groupID) + } + // log.ZInfo(st.Ctx, "Send message to 999 groups successfully.") + } + } + }() + + <-st.Ctx.Done() + fmt.Println("Received signal to exit, shutting down...") +}