From 979c3104c707d78b20f999f9f3c22d135d0ccefa Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 12 Feb 2025 17:30:52 +0800 Subject: [PATCH 1/6] fix: msg cache --- internal/msgtransfer/online_msg_to_mongo_handler.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index d8836d54e..279ec70c6 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -77,6 +77,10 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont for _, msg := range msgFromMQ.MsgData { seqs = append(seqs, msg.Seq) } + if err := mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs); err != nil { + log.ZError(ctx, "remove cache msg from redis err", err, "msg", + msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) + } } func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } From 7f0deee5bfd3ff15255f2a1f85d6b4e9a8929f3e Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 12 Feb 2025 18:17:47 +0800 Subject: [PATCH 2/6] fix: msg cache --- pkg/common/storage/controller/msg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index b92f9b510..0069dc7cc 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -310,7 +310,7 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ log.ZError(ctx, "json.Unmarshal", err) return } - if quoteMsg.QuoteMessage == nil || quoteMsg.QuoteMessage.Content == "" { + if quoteMsg.QuoteMessage == nil { return } if quoteMsg.QuoteMessage.Content == "e30=" { From d32353658832f07f36e6b5441f4a97bfceeb6ec2 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 12 Feb 2025 18:24:15 +0800 Subject: [PATCH 3/6] fix: msg cache --- .../msgtransfer/online_msg_to_mongo_handler.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 279ec70c6..8405be7fe 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -73,14 +73,14 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont } else { prommetrics.MsgInsertMongoSuccessCounter.Inc() } - var seqs []int64 - for _, msg := range msgFromMQ.MsgData { - seqs = append(seqs, msg.Seq) - } - if err := mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs); err != nil { - log.ZError(ctx, "remove cache msg from redis err", err, "msg", - msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) - } + //var seqs []int64 + //for _, msg := range msgFromMQ.MsgData { + // seqs = append(seqs, msg.Seq) + //} + //if err := mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs); err != nil { + // log.ZError(ctx, "remove cache msg from redis err", err, "msg", + // msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID) + //} } func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } From 5079b99e9785e84b9932de7d40546b5b4b65e999 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 15 May 2025 11:42:42 +0800 Subject: [PATCH 4/6] fix: merge conflicts --- go.mod | 6 +- go.sum | 4 +- internal/msggateway/init.go | 1 - internal/rpc/group/notification.go | 67 +++--- magefile.go | 13 +- pkg/common/config/config.go | 261 ++++++----------------- pkg/common/config/load_config_test.go | 42 +++- pkg/common/config/parse.go | 12 +- pkg/common/startrpc/start.go | 97 ++++++++- pkg/common/storage/cache/mcache/token.go | 38 +++- pkg/notification/msg.go | 17 +- pkg/rpcli/auth.go | 1 + 12 files changed, 303 insertions(+), 256 deletions(-) diff --git a/go.mod b/go.mod index 8250bfece..88289d2a9 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.71 - github.com/openimsdk/tools v0.0.50-alpha.74 + github.com/openimsdk/protocol v0.0.73-alpha.6 + github.com/openimsdk/tools v0.0.50-alpha.81 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -27,6 +27,7 @@ require ( require github.com/google/uuid v1.6.0 require ( + github.com/IBM/sarama v1.43.0 github.com/fatih/color v1.14.1 github.com/gin-contrib/gzip v1.0.1 github.com/go-redis/redis v6.15.9+incompatible @@ -54,7 +55,6 @@ require ( cloud.google.com/go/iam v1.1.7 // indirect cloud.google.com/go/longrunning v0.5.5 // indirect cloud.google.com/go/storage v1.40.0 // indirect - github.com/IBM/sarama v1.43.0 // indirect github.com/MicahParks/keyfunc v1.9.0 // indirect github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect github.com/aws/aws-sdk-go-v2 v1.32.5 // indirect diff --git a/go.sum b/go.sum index c80690f80..6bc410a2d 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5b github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.73-alpha.6 h1:sna9coWG7HN1zObBPtvG0Ki/vzqHXiB4qKbA5P3w7kc= github.com/openimsdk/protocol v0.0.73-alpha.6/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.79 h1:jxYEbrzaze4Z2r4NrKad816buZ690ix0L9MTOOOH3ik= -github.com/openimsdk/tools v0.0.50-alpha.79/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/tools v0.0.50-alpha.81 h1:VbuJKtigNXLkCKB/Q6f2UHsqoSaTOAwS8F51c1nhOCA= +github.com/openimsdk/tools v0.0.50-alpha.81/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 478566eaa..8772693cc 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/datautil" ) type Config struct { diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index a6c715735..0a18371f9 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -22,6 +22,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "go.mongodb.org/mongo-driver/mongo" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -41,7 +43,6 @@ import ( "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/stringutil" - "go.mongodb.org/mongo-driver/mongo" ) // GroupApplicationReceiver @@ -52,11 +53,11 @@ const ( func NewNotificationSender(db controller.GroupDatabase, config *Config, userClient *rpcli.UserClient, msgClient *rpcli.MsgClient, conversationClient *rpcli.ConversationClient) *NotificationSender { return &NotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, - rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + NotificationSender: notification.NewNotificationSender(&config.NotificationConfig, + notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { return msgClient.SendMsg(ctx, req) }), - rpcclient.WithUserRpcClient(userClient.GetUserInfo), + notification.WithUserRpcClient(userClient.GetUserInfo), ), getUsersInfo: func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) { users, err := userClient.GetUsersInfo(ctx, userIDs) @@ -73,7 +74,7 @@ func NewNotificationSender(db controller.GroupDatabase, config *Config, userClie } type NotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) db controller.GroupDatabase config *Config @@ -233,12 +234,12 @@ func (g *NotificationSender) groupMemberDB2PB(member *model.GroupMember, appMang return result, nil } */ -func (g *NotificationSender) fillOpUser(ctx context.Context, opUser **sdkws.GroupMemberFullInfo, groupID string) (err error) { - return g.fillOpUserByUserID(ctx, mcontext.GetOpUserID(ctx), opUser, groupID) +func (g *NotificationSender) fillOpUser(ctx context.Context, targetUser **sdkws.GroupMemberFullInfo, groupID string) (err error) { + return g.fillUserByUserID(ctx, mcontext.GetOpUserID(ctx), targetUser, groupID) } -func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID string, opUser **sdkws.GroupMemberFullInfo, groupID string) error { - if opUser == nil { +func (g *NotificationSender) fillUserByUserID(ctx context.Context, userID string, targetUser **sdkws.GroupMemberFullInfo, groupID string) error { + if targetUser == nil { return errs.ErrInternalServer.WrapMsg("**sdkws.GroupMemberFullInfo is nil") } if groupID != "" { @@ -252,7 +253,7 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri } else { member, err := g.db.TakeGroupMember(ctx, groupID, userID) if err == nil { - *opUser = g.groupMemberDB2PB(member, 0) + *targetUser = g.groupMemberDB2PB(member, 0) } else if !(errors.Is(err, mongo.ErrNoDocuments) || errs.ErrRecordNotFound.Is(err)) { return err } @@ -262,8 +263,8 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri if err != nil { return err } - if *opUser == nil { - *opUser = &sdkws.GroupMemberFullInfo{ + if *targetUser == nil { + *targetUser = &sdkws.GroupMemberFullInfo{ GroupID: groupID, UserID: userID, Nickname: user.Nickname, @@ -271,11 +272,11 @@ func (g *NotificationSender) fillOpUserByUserID(ctx context.Context, userID stri OperatorUserID: userID, } } else { - if (*opUser).Nickname == "" { - (*opUser).Nickname = user.Nickname + if (*targetUser).Nickname == "" { + (*targetUser).Nickname = user.Nickname } - if (*opUser).FaceURL == "" { - (*opUser).FaceURL = user.FaceURL + if (*targetUser).FaceURL == "" { + (*targetUser).FaceURL = user.FaceURL } } return nil @@ -308,7 +309,7 @@ func (g *NotificationSender) setSortVersion(ctx context.Context, version *uint64 } } -func (g *NotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips) { +func (g *NotificationSender) GroupCreatedNotification(ctx context.Context, tips *sdkws.GroupCreatedTips, SendMessage *bool) { var err error defer func() { if err != nil { @@ -319,7 +320,7 @@ func (g *NotificationSender) GroupCreatedNotification(ctx context.Context, tips return } g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) - g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupCreatedNotification, tips, notification.WithSendMessage(SendMessage)) } func (g *NotificationSender) GroupInfoSetNotification(ctx context.Context, tips *sdkws.GroupInfoSetTips) { @@ -333,7 +334,7 @@ func (g *NotificationSender) GroupInfoSetNotification(ctx context.Context, tips return } g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) - g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, rpcclient.WithRpcGetUserName()) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNotification, tips, notification.WithRpcGetUserName()) } func (g *NotificationSender) GroupInfoSetNameNotification(ctx context.Context, tips *sdkws.GroupInfoSetNameTips) { @@ -350,7 +351,7 @@ func (g *NotificationSender) GroupInfoSetNameNotification(ctx context.Context, t g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetNameNotification, tips) } -func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips) { +func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Context, tips *sdkws.GroupInfoSetAnnouncementTips, sendMessage *bool) { var err error defer func() { if err != nil { @@ -361,7 +362,7 @@ func (g *NotificationSender) GroupInfoSetAnnouncementNotification(ctx context.Co return } g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) - g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, rpcclient.WithRpcGetUserName()) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupInfoSetAnnouncementNotification, tips, notification.WithRpcGetUserName(), notification.WithSendMessage(sendMessage)) } func (g *NotificationSender) JoinGroupApplicationNotification(ctx context.Context, req *pbgroup.JoinGroupReq) { @@ -506,7 +507,7 @@ func (g *NotificationSender) GroupOwnerTransferredNotification(ctx context.Conte g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupOwnerTransferredNotification, tips) } -func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips) { +func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips *sdkws.MemberKickedTips, SendMessage *bool) { var err error defer func() { if err != nil { @@ -517,7 +518,7 @@ func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips return } g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) - g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips, notification.WithSendMessage(SendMessage)) } func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error { @@ -561,20 +562,18 @@ func (g *NotificationSender) groupApplicationAgreeMemberEnterNotification(ctx co InvitedUserList: users, } opUserID := mcontext.GetOpUserID(ctx) - if err = g.fillOpUserByUserID(ctx, opUserID, &tips.OpUser, tips.Group.GroupID); err != nil { + if err = g.fillUserByUserID(ctx, opUserID, &tips.OpUser, tips.Group.GroupID); err != nil { return nil } - switch { - case invitedOpUserID == "": - case invitedOpUserID == opUserID: + if invitedOpUserID == opUserID { tips.InviterUser = tips.OpUser - default: - if err = g.fillOpUserByUserID(ctx, invitedOpUserID, &tips.InviterUser, tips.Group.GroupID); err != nil { + } else { + if err = g.fillUserByUserID(ctx, invitedOpUserID, &tips.InviterUser, tips.Group.GroupID); err != nil { return err } } g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) - g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips, notification.WithSendMessage(SendMessage)) return nil } @@ -619,7 +618,7 @@ func (g *NotificationSender) MemberEnterNotification(ctx context.Context, groupI return nil } -func (g *NotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) { +func (g *NotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips, SendMessage *bool) { var err error defer func() { if err != nil { @@ -629,7 +628,7 @@ func (g *NotificationSender) GroupDismissedNotification(ctx context.Context, tip if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } - g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips) + g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.GroupDismissedNotification, tips, notification.WithSendMessage(SendMessage)) } func (g *NotificationSender) GroupMemberMutedNotification(ctx context.Context, groupID, groupMemberUserID string, mutedSeconds uint32) { @@ -786,7 +785,7 @@ func (g *NotificationSender) GroupMemberSetToAdminNotification(ctx context.Conte if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } - g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) + g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToAdminNotification, tips) } @@ -811,6 +810,6 @@ func (g *NotificationSender) GroupMemberSetToOrdinaryUserNotification(ctx contex if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { return } - g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) + g.setSortVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID, &tips.GroupSortVersion) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.GroupMemberSetToOrdinaryUserNotification, tips) } diff --git a/magefile.go b/magefile.go index a8a1c4040..d0e77f6fc 100644 --- a/magefile.go +++ b/magefile.go @@ -4,14 +4,23 @@ package main import ( - "github.com/openimsdk/gomake/mageutil" + "flag" "os" + + "github.com/openimsdk/gomake/mageutil" ) var Default = Build func Build() { - mageutil.Build() + flag.Parse() + + bin := flag.Args() + if len(bin) != 0 { + bin = bin[1:] + } + + mageutil.Build(bin) } func Start() { diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 6dbab944b..d5ae68ec0 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -18,9 +18,9 @@ import ( "strings" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" + "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/aws" "github.com/openimsdk/tools/s3/cos" "github.com/openimsdk/tools/s3/kodo" @@ -334,25 +334,29 @@ type Redis struct { } type BeforeConfig struct { - Enable bool `yaml:"enable"` - Timeout int `yaml:"timeout"` - FailedContinue bool `yaml:"failedContinue"` - AllowedTypes []string `yaml:"allowedTypes"` - DeniedTypes []string `yaml:"deniedTypes"` + Enable bool `yaml:"enable"` + Timeout int `yaml:"timeout"` + FailedContinue bool `yaml:"failedContinue"` + DeniedTypes []int32 `yaml:"deniedTypes"` } type AfterConfig struct { Enable bool `yaml:"enable"` Timeout int `yaml:"timeout"` AttentionIds []string `yaml:"attentionIds"` - AllowedTypes []string `yaml:"allowedTypes"` - DeniedTypes []string `yaml:"deniedTypes"` + DeniedTypes []int32 `yaml:"deniedTypes"` } type Share struct { - Secret string `yaml:"secret"` - IMAdminUserID []string `yaml:"imAdminUserID"` - MultiLogin MultiLogin `yaml:"multiLogin"` + Secret string `yaml:"secret"` + IMAdminUserID []string `yaml:"imAdminUserID"` + MultiLogin MultiLogin `yaml:"multiLogin"` + RPCMaxBodySize MaxRequestBody `yaml:"rpcMaxBodySize"` +} + +type MaxRequestBody struct { + RequestMaxBodySize int `yaml:"requestMaxBodySize"` + ResponseMaxBodySize int `yaml:"responseMaxBodySize"` } type MultiLogin struct { @@ -372,7 +376,7 @@ type RpcService struct { Third string `yaml:"third"` } -func (r *RpcRegisterName) GetServiceNames() []string { +func (r *RpcService) GetServiceNames() []string { return []string{ r.User, r.Friend, @@ -388,55 +392,59 @@ func (r *RpcRegisterName) GetServiceNames() []string { // FullConfig stores all configurations for before and after events type Webhooks struct { - URL string `yaml:"url"` - BeforeSendSingleMsg BeforeConfig `yaml:"beforeSendSingleMsg"` - BeforeUpdateUserInfoEx BeforeConfig `yaml:"beforeUpdateUserInfoEx"` - AfterUpdateUserInfoEx AfterConfig `yaml:"afterUpdateUserInfoEx"` - AfterSendSingleMsg AfterConfig `yaml:"afterSendSingleMsg"` - BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` - BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` - AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` - AfterUserOnline AfterConfig `yaml:"afterUserOnline"` - AfterUserOffline AfterConfig `yaml:"afterUserOffline"` - AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"` - BeforeOfflinePush BeforeConfig `yaml:"beforeOfflinePush"` - BeforeOnlinePush BeforeConfig `yaml:"beforeOnlinePush"` - BeforeGroupOnlinePush BeforeConfig `yaml:"beforeGroupOnlinePush"` - BeforeAddFriend BeforeConfig `yaml:"beforeAddFriend"` - BeforeUpdateUserInfo BeforeConfig `yaml:"beforeUpdateUserInfo"` - AfterUpdateUserInfo AfterConfig `yaml:"afterUpdateUserInfo"` - BeforeCreateGroup BeforeConfig `yaml:"beforeCreateGroup"` - AfterCreateGroup AfterConfig `yaml:"afterCreateGroup"` - BeforeMemberJoinGroup BeforeConfig `yaml:"beforeMemberJoinGroup"` - BeforeSetGroupMemberInfo BeforeConfig `yaml:"beforeSetGroupMemberInfo"` - AfterSetGroupMemberInfo AfterConfig `yaml:"afterSetGroupMemberInfo"` - AfterQuitGroup AfterConfig `yaml:"afterQuitGroup"` - AfterKickGroupMember AfterConfig `yaml:"afterKickGroupMember"` - AfterDismissGroup AfterConfig `yaml:"afterDismissGroup"` - BeforeApplyJoinGroup BeforeConfig `yaml:"beforeApplyJoinGroup"` - AfterGroupMsgRead AfterConfig `yaml:"afterGroupMsgRead"` - AfterSingleMsgRead AfterConfig `yaml:"afterSingleMsgRead"` - BeforeUserRegister BeforeConfig `yaml:"beforeUserRegister"` - AfterUserRegister AfterConfig `yaml:"afterUserRegister"` - AfterTransferGroupOwner AfterConfig `yaml:"afterTransferGroupOwner"` - BeforeSetFriendRemark BeforeConfig `yaml:"beforeSetFriendRemark"` - AfterSetFriendRemark AfterConfig `yaml:"afterSetFriendRemark"` - AfterGroupMsgRevoke AfterConfig `yaml:"afterGroupMsgRevoke"` - AfterJoinGroup AfterConfig `yaml:"afterJoinGroup"` - BeforeInviteUserToGroup BeforeConfig `yaml:"beforeInviteUserToGroup"` - AfterSetGroupInfo AfterConfig `yaml:"afterSetGroupInfo"` - BeforeSetGroupInfo BeforeConfig `yaml:"beforeSetGroupInfo"` - AfterSetGroupInfoEx AfterConfig `yaml:"afterSetGroupInfoEx"` - BeforeSetGroupInfoEx BeforeConfig `yaml:"beforeSetGroupInfoEx"` - AfterRevokeMsg AfterConfig `yaml:"afterRevokeMsg"` - BeforeAddBlack BeforeConfig `yaml:"beforeAddBlack"` - AfterAddFriend AfterConfig `yaml:"afterAddFriend"` - BeforeAddFriendAgree BeforeConfig `yaml:"beforeAddFriendAgree"` - AfterAddFriendAgree AfterConfig `yaml:"afterAddFriendAgree"` - AfterDeleteFriend AfterConfig `yaml:"afterDeleteFriend"` - BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"` - AfterImportFriends AfterConfig `yaml:"afterImportFriends"` - AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"` + URL string `yaml:"url"` + BeforeSendSingleMsg BeforeConfig `yaml:"beforeSendSingleMsg"` + BeforeUpdateUserInfoEx BeforeConfig `yaml:"beforeUpdateUserInfoEx"` + AfterUpdateUserInfoEx AfterConfig `yaml:"afterUpdateUserInfoEx"` + AfterSendSingleMsg AfterConfig `yaml:"afterSendSingleMsg"` + BeforeSendGroupMsg BeforeConfig `yaml:"beforeSendGroupMsg"` + BeforeMsgModify BeforeConfig `yaml:"beforeMsgModify"` + AfterSendGroupMsg AfterConfig `yaml:"afterSendGroupMsg"` + AfterUserOnline AfterConfig `yaml:"afterUserOnline"` + AfterUserOffline AfterConfig `yaml:"afterUserOffline"` + AfterUserKickOff AfterConfig `yaml:"afterUserKickOff"` + BeforeOfflinePush BeforeConfig `yaml:"beforeOfflinePush"` + BeforeOnlinePush BeforeConfig `yaml:"beforeOnlinePush"` + BeforeGroupOnlinePush BeforeConfig `yaml:"beforeGroupOnlinePush"` + BeforeAddFriend BeforeConfig `yaml:"beforeAddFriend"` + BeforeUpdateUserInfo BeforeConfig `yaml:"beforeUpdateUserInfo"` + AfterUpdateUserInfo AfterConfig `yaml:"afterUpdateUserInfo"` + BeforeCreateGroup BeforeConfig `yaml:"beforeCreateGroup"` + AfterCreateGroup AfterConfig `yaml:"afterCreateGroup"` + BeforeMemberJoinGroup BeforeConfig `yaml:"beforeMemberJoinGroup"` + BeforeSetGroupMemberInfo BeforeConfig `yaml:"beforeSetGroupMemberInfo"` + AfterSetGroupMemberInfo AfterConfig `yaml:"afterSetGroupMemberInfo"` + AfterQuitGroup AfterConfig `yaml:"afterQuitGroup"` + AfterKickGroupMember AfterConfig `yaml:"afterKickGroupMember"` + AfterDismissGroup AfterConfig `yaml:"afterDismissGroup"` + BeforeApplyJoinGroup BeforeConfig `yaml:"beforeApplyJoinGroup"` + AfterGroupMsgRead AfterConfig `yaml:"afterGroupMsgRead"` + AfterSingleMsgRead AfterConfig `yaml:"afterSingleMsgRead"` + BeforeUserRegister BeforeConfig `yaml:"beforeUserRegister"` + AfterUserRegister AfterConfig `yaml:"afterUserRegister"` + AfterTransferGroupOwner AfterConfig `yaml:"afterTransferGroupOwner"` + BeforeSetFriendRemark BeforeConfig `yaml:"beforeSetFriendRemark"` + AfterSetFriendRemark AfterConfig `yaml:"afterSetFriendRemark"` + AfterGroupMsgRevoke AfterConfig `yaml:"afterGroupMsgRevoke"` + AfterJoinGroup AfterConfig `yaml:"afterJoinGroup"` + BeforeInviteUserToGroup BeforeConfig `yaml:"beforeInviteUserToGroup"` + AfterSetGroupInfo AfterConfig `yaml:"afterSetGroupInfo"` + BeforeSetGroupInfo BeforeConfig `yaml:"beforeSetGroupInfo"` + AfterSetGroupInfoEx AfterConfig `yaml:"afterSetGroupInfoEx"` + BeforeSetGroupInfoEx BeforeConfig `yaml:"beforeSetGroupInfoEx"` + AfterRevokeMsg AfterConfig `yaml:"afterRevokeMsg"` + BeforeAddBlack BeforeConfig `yaml:"beforeAddBlack"` + AfterAddFriend AfterConfig `yaml:"afterAddFriend"` + BeforeAddFriendAgree BeforeConfig `yaml:"beforeAddFriendAgree"` + AfterAddFriendAgree AfterConfig `yaml:"afterAddFriendAgree"` + AfterDeleteFriend AfterConfig `yaml:"afterDeleteFriend"` + BeforeImportFriends BeforeConfig `yaml:"beforeImportFriends"` + AfterImportFriends AfterConfig `yaml:"afterImportFriends"` + AfterRemoveBlack AfterConfig `yaml:"afterRemoveBlack"` + BeforeCreateSingleChatConversations BeforeConfig `yaml:"beforeCreateSingleChatConversations"` + AfterCreateSingleChatConversations AfterConfig `yaml:"afterCreateSingleChatConversations"` + BeforeCreateGroupChatConversations BeforeConfig `yaml:"beforeCreateGroupChatConversations"` + AfterCreateGroupChatConversations AfterConfig `yaml:"afterCreateGroupChatConversations"` } type ZooKeeper struct { @@ -457,23 +465,6 @@ type Kubernetes struct { Namespace string `yaml:"namespace"` } -func (r *RpcService) GetServiceNames() []string { - return []string{ - r.User, - r.Friend, - r.Msg, - r.Push, - r.MessageGateway, - r.Group, - r.Auth, - r.Conversation, - r.Third, - } -} - -type Kubernetes struct { - Namespace string `yaml:"namespace"` -} type Etcd struct { RootDirectory string `yaml:"rootDirectory"` Address []string `yaml:"address"` @@ -541,6 +532,7 @@ func (m *Minio) Build() *minio.Config { SignEndpoint: formatEndpoint(m.ExternalAddress), } } + func (c *Cos) Build() *cos.Config { return &cos.Config{ BucketURL: c.BucketURL, @@ -597,119 +589,6 @@ func (l *CacheConfig) Enable() bool { return l.Topic != "" && l.SlotNum > 0 && l.SlotSize > 0 } -const ( - DiscoveryConfigFilename = "discovery.yml" - KafkaConfigFileName = "kafka.yml" - LocalCacheConfigFileName = "local-cache.yml" - LogConfigFileName = "log.yml" - MinioConfigFileName = "minio.yml" - MongodbConfigFileName = "mongodb.yml" - OpenIMAPICfgFileName = "openim-api.yml" - OpenIMCronTaskCfgFileName = "openim-crontask.yml" - OpenIMMsgGatewayCfgFileName = "openim-msggateway.yml" - OpenIMMsgTransferCfgFileName = "openim-msgtransfer.yml" - OpenIMPushCfgFileName = "openim-push.yml" - OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml" - OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml" - OpenIMRPCFriendCfgFileName = "openim-rpc-friend.yml" - OpenIMRPCGroupCfgFileName = "openim-rpc-group.yml" - OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml" - OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml" - OpenIMRPCUserCfgFileName = "openim-rpc-user.yml" - RedisConfigFileName = "redis.yml" - ShareFileName = "share.yml" - WebhooksConfigFileName = "webhooks.yml" - NotificationFileName = "notification.yml" -) - -func (d *Discovery) GetConfigFileName() string { - return DiscoveryConfigFilename -} - -func (k *Kafka) GetConfigFileName() string { - return KafkaConfigFileName -} - -func (lc *LocalCache) GetConfigFileName() string { - return LocalCacheConfigFileName -} - -func (l *Log) GetConfigFileName() string { - return LogConfigFileName -} - -func (m *Minio) GetConfigFileName() string { - return MinioConfigFileName -} - -func (m *Mongo) GetConfigFileName() string { - return MongodbConfigFileName -} - -func (n *Notification) GetConfigFileName() string { - return NotificationFileName -} - -func (a *API) GetConfigFileName() string { - return OpenIMAPICfgFileName -} - -func (ct *CronTask) GetConfigFileName() string { - return OpenIMCronTaskCfgFileName -} - -func (mg *MsgGateway) GetConfigFileName() string { - return OpenIMMsgGatewayCfgFileName -} - -func (mt *MsgTransfer) GetConfigFileName() string { - return OpenIMMsgTransferCfgFileName -} - -func (p *Push) GetConfigFileName() string { - return OpenIMPushCfgFileName -} - -func (a *Auth) GetConfigFileName() string { - return OpenIMRPCAuthCfgFileName -} - -func (c *Conversation) GetConfigFileName() string { - return OpenIMRPCConversationCfgFileName -} - -func (f *Friend) GetConfigFileName() string { - return OpenIMRPCFriendCfgFileName -} - -func (g *Group) GetConfigFileName() string { - return OpenIMRPCGroupCfgFileName -} - -func (m *Msg) GetConfigFileName() string { - return OpenIMRPCMsgCfgFileName -} - -func (t *Third) GetConfigFileName() string { - return OpenIMRPCThirdCfgFileName -} - -func (u *User) GetConfigFileName() string { - return OpenIMRPCUserCfgFileName -} - -func (r *Redis) GetConfigFileName() string { - return RedisConfigFileName -} - -func (s *Share) GetConfigFileName() string { - return ShareFileName -} - -func (w *Webhooks) GetConfigFileName() string { - return WebhooksConfigFileName -} - func InitNotification(notification *Notification) { notification.GroupCreated.UnreadCount = false notification.GroupCreated.ReliabilityLevel = 1 diff --git a/pkg/common/config/load_config_test.go b/pkg/common/config/load_config_test.go index 763bffd9f..f11d91dad 100644 --- a/pkg/common/config/load_config_test.go +++ b/pkg/common/config/load_config_test.go @@ -1,27 +1,51 @@ package config import ( - "github.com/stretchr/testify/assert" + "os" "testing" + + "github.com/stretchr/testify/assert" ) func TestLoadLogConfig(t *testing.T) { var log Log - err := LoadConfig("../../../config/log.yml", "IMENV_LOG", &log) + os.Setenv("IMENV_LOG_REMAINLOGLEVEL", "5") + err := Load("../../../config/", "log.yml", "IMENV_LOG", &log) + assert.Nil(t, err) + t.Log(log.RemainLogLevel) + // assert.Equal(t, "../../../../logs/", log.StorageLocation) +} + +func TestLoadMongoConfig(t *testing.T) { + var mongo Mongo + // os.Setenv("DEPLOYMENT_TYPE", "kubernetes") + os.Setenv("IMENV_MONGODB_PASSWORD", "openIM1231231") + // os.Setenv("IMENV_MONGODB_URI", "openIM123") + // os.Setenv("IMENV_MONGODB_USERNAME", "openIM123") + err := Load("../../../config/", "mongodb.yml", "IMENV_MONGODB", &mongo) + // err := LoadApiConfig("../../../config/mongodb.yml", "IMENV_MONGODB", &mongo) + assert.Nil(t, err) - assert.Equal(t, "../../../../logs/", log.StorageLocation) + t.Log(mongo.Password) + // assert.Equal(t, "openIM123", mongo.Password) + t.Log(os.Getenv("IMENV_MONGODB_PASSWORD")) + t.Log(mongo) + // //export IMENV_OPENIM_RPC_USER_RPC_LISTENIP="0.0.0.0" + // assert.Equal(t, "0.0.0.0", user.RPC.ListenIP) + // //export IMENV_OPENIM_RPC_USER_RPC_PORTS="10110,10111,10112" + // assert.Equal(t, []int{10110, 10111, 10112}, user.RPC.Ports) } func TestLoadMinioConfig(t *testing.T) { var storageConfig Minio - err := LoadConfig("../../../config/minio.yml", "IMENV_MINIO", &storageConfig) + err := Load("../../../config/minio.yml", "IMENV_MINIO", "", &storageConfig) assert.Nil(t, err) assert.Equal(t, "openim", storageConfig.Bucket) } func TestLoadWebhooksConfig(t *testing.T) { var webhooks Webhooks - err := LoadConfig("../../../config/webhooks.yml", "IMENV_WEBHOOKS", &webhooks) + err := Load("../../../config/webhooks.yml", "IMENV_WEBHOOKS", "", &webhooks) assert.Nil(t, err) assert.Equal(t, 5, webhooks.BeforeAddBlack.Timeout) @@ -29,7 +53,7 @@ func TestLoadWebhooksConfig(t *testing.T) { func TestLoadOpenIMRpcUserConfig(t *testing.T) { var user User - err := LoadConfig("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", &user) + err := Load("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", "", &user) assert.Nil(t, err) //export IMENV_OPENIM_RPC_USER_RPC_LISTENIP="0.0.0.0" assert.Equal(t, "0.0.0.0", user.RPC.ListenIP) @@ -39,14 +63,14 @@ func TestLoadOpenIMRpcUserConfig(t *testing.T) { func TestLoadNotificationConfig(t *testing.T) { var noti Notification - err := LoadConfig("../../../config/notification.yml", "IMENV_NOTIFICATION", ¬i) + err := Load("../../../config/notification.yml", "IMENV_NOTIFICATION", "", ¬i) assert.Nil(t, err) assert.Equal(t, "Your friend's profile has been changed", noti.FriendRemarkSet.OfflinePush.Title) } func TestLoadOpenIMThirdConfig(t *testing.T) { var third Third - err := LoadConfig("../../../config/openim-rpc-third.yml", "IMENV_OPENIM_RPC_THIRD", &third) + err := Load("../../../config/openim-rpc-third.yml", "IMENV_OPENIM_RPC_THIRD", "", &third) assert.Nil(t, err) assert.Equal(t, "enabled", third.Object.Enable) assert.Equal(t, "https://oss-cn-chengdu.aliyuncs.com", third.Object.Oss.Endpoint) @@ -62,7 +86,7 @@ func TestLoadOpenIMThirdConfig(t *testing.T) { func TestTransferConfig(t *testing.T) { var tran MsgTransfer - err := LoadConfig("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", &tran) + err := Load("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", "", &tran) assert.Nil(t, err) assert.Equal(t, true, tran.Prometheus.Enable) assert.Equal(t, true, tran.Prometheus.AutoSetPorts) diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index 42cca08cd..df4ecc5a0 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -18,15 +18,16 @@ import ( "os" "path/filepath" + "gopkg.in/yaml.v3" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/field" - "gopkg.in/yaml.v3" ) const ( - FileName = "config.yaml" + DefaultFolderPath = "../config/" ) // return absolude path join ../config/, this is k8s container config path. @@ -56,10 +57,13 @@ func GetProjectRoot() (string, error) { return projectRoot, nil } -func GetOptionsByNotification(cfg NotificationConfig) msgprocessor.Options { +func GetOptionsByNotification(cfg NotificationConfig, sendMessage *bool) msgprocessor.Options { opts := msgprocessor.NewOptions() - if cfg.UnreadCount { + if sendMessage != nil { + cfg.IsSendMsg = *sendMessage + } + if cfg.IsSendMsg { opts = msgprocessor.WithOptions(opts, msgprocessor.WithUnreadCount(true)) } if cfg.OfflinePush.Enable { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 4c0265b6e..03621343b 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -37,7 +37,8 @@ import ( "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mw" + grpccli "github.com/openimsdk/tools/mw/grpc/client" + grpcsrv "github.com/openimsdk/tools/mw/grpc/server" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -46,6 +47,64 @@ func init() { prommetrics.RegistryAll() } +func getConfigRpcMaxRequestBody(value reflect.Value) *conf.MaxRequestBody { + for value.Kind() == reflect.Pointer { + value = value.Elem() + } + if value.Kind() == reflect.Struct { + num := value.NumField() + for i := 0; i < num; i++ { + field := value.Field(i) + if !field.CanInterface() { + continue + } + for field.Kind() == reflect.Pointer { + field = field.Elem() + } + switch elem := field.Interface().(type) { + case conf.Share: + return &elem.RPCMaxBodySize + case conf.MaxRequestBody: + return &elem + } + if field.Kind() == reflect.Struct { + if elem := getConfigRpcMaxRequestBody(field); elem != nil { + return elem + } + } + } + } + return nil +} + +func getConfigShare(value reflect.Value) *conf.Share { + for value.Kind() == reflect.Pointer { + value = value.Elem() + } + if value.Kind() == reflect.Struct { + num := value.NumField() + for i := 0; i < num; i++ { + field := value.Field(i) + if !field.CanInterface() { + continue + } + for field.Kind() == reflect.Pointer { + field = field.Elem() + } + switch elem := field.Interface().(type) { + case conf.Share: + return &elem + } + if field.Kind() == reflect.Struct { + if elem := getConfigShare(field); elem != nil { + return elem + } + } + } + } + return nil +} + func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, @@ -56,7 +115,32 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c conf.InitNotification(notification) } - options = append(options, mw.GrpcServer()) + maxRequestBody := getConfigRpcMaxRequestBody(reflect.ValueOf(config)) + shareConfig := getConfigShare(reflect.ValueOf(config)) + + log.ZDebug(ctx, "rpc start", "rpcMaxRequestBody", maxRequestBody, "rpcRegisterName", rpcRegisterName, "registerIP", registerIP, "listenIP", listenIP) + + options = append(options, + grpcsrv.GrpcServerMetadataContext(), + grpcsrv.GrpcServerLogger(), + grpcsrv.GrpcServerErrorConvert(), + grpcsrv.GrpcServerRequestValidate(), + grpcsrv.GrpcServerPanicCapture(), + ) + if shareConfig != nil && len(shareConfig.IMAdminUserID) > 0 { + options = append(options, grpcServerIMAdminUserID(shareConfig.IMAdminUserID)) + } + var clientOptions []grpc.DialOption + if maxRequestBody != nil { + if maxRequestBody.RequestMaxBodySize > 0 { + options = append(options, grpc.MaxRecvMsgSize(maxRequestBody.RequestMaxBodySize)) + clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxRequestBody.RequestMaxBodySize))) + } + if maxRequestBody.ResponseMaxBodySize > 0 { + options = append(options, grpc.MaxSendMsgSize(maxRequestBody.ResponseMaxBodySize)) + clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRequestBody.ResponseMaxBodySize))) + } + } registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { @@ -82,9 +166,16 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c defer client.Close() client.AddOption( - mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")), + + grpccli.GrpcClientLogger(), + grpccli.GrpcClientContext(), + grpccli.GrpcClientErrorConvert(), ) + if len(clientOptions) > 0 { + client.AddOption(clientOptions...) + } ctx, cancel := context.WithCancelCause(ctx) diff --git a/pkg/common/storage/cache/mcache/token.go b/pkg/common/storage/cache/mcache/token.go index d7ae29cfc..98b9cc066 100644 --- a/pkg/common/storage/cache/mcache/token.go +++ b/pkg/common/storage/cache/mcache/token.go @@ -27,7 +27,6 @@ type tokenCache struct { func (x *tokenCache) getTokenKey(userID string, platformID int, token string) string { return cachekey.GetTokenKey(userID, platformID) + ":" + token - } func (x *tokenCache) SetTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { @@ -57,6 +56,14 @@ func (x *tokenCache) GetTokensWithoutError(ctx context.Context, userID string, p return mm, nil } +func (x *tokenCache) HasTemporaryToken(ctx context.Context, userID string, platformID int, token string) error { + key := cachekey.GetTemporaryTokenKey(userID, platformID, token) + if _, err := x.cache.Get(ctx, []string{key}); err != nil { + return err + } + return nil +} + func (x *tokenCache) GetAllTokensWithoutError(ctx context.Context, userID string) (map[int]map[string]int, error) { prefix := cachekey.UidPidToken + userID + ":" tokens, err := x.cache.Prefix(ctx, prefix) @@ -128,3 +135,32 @@ func (x *tokenCache) DeleteTokenByUidPid(ctx context.Context, userID string, pla func (x *tokenCache) getExpireTime(t int64) time.Duration { return time.Hour * 24 * time.Duration(t) } + +func (x *tokenCache) DeleteTokenByTokenMap(ctx context.Context, userID string, tokens map[int][]string) error { + keys := make([]string, 0, len(tokens)) + for platformID, ts := range tokens { + for _, t := range ts { + keys = append(keys, x.getTokenKey(userID, platformID, t)) + } + } + return x.cache.Del(ctx, keys) +} + +func (x *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, platformID int, fields []string) error { + keys := make([]string, 0, len(fields)) + for _, f := range fields { + keys = append(keys, x.getTokenKey(userID, platformID, f)) + } + if err := x.cache.Del(ctx, keys); err != nil { + return err + } + + for _, f := range fields { + k := cachekey.GetTemporaryTokenKey(userID, platformID, f) + if err := x.cache.Set(ctx, k, "", time.Minute*5); err != nil { + return errs.Wrap(err) + } + } + + return nil +} diff --git a/pkg/notification/msg.go b/pkg/notification/msg.go index 0795982c8..ba8a9185a 100644 --- a/pkg/notification/msg.go +++ b/pkg/notification/msg.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rpcclient +package notification import ( "context" @@ -179,19 +179,24 @@ func NewNotificationSender(conf *config.Notification, opts ...NotificationSender } type notificationOpt struct { - WithRpcGetUsername bool + RpcGetUsername bool + SendMessage *bool } type NotificationOptions func(*notificationOpt) func WithRpcGetUserName() NotificationOptions { return func(opt *notificationOpt) { - opt.WithRpcGetUsername = true + opt.RpcGetUsername = true + } +} +func WithSendMessage(sendMessage *bool) NotificationOptions { + return func(opt *notificationOpt) { + opt.SendMessage = sendMessage } } func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) { - //ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) ctx = context.WithoutCancel(ctx) ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5)) defer cancel() @@ -208,7 +213,7 @@ func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, co var req msg.SendMsgReq var msg sdkws.MsgData var userInfo *sdkws.UserInfo - if notificationOpt.WithRpcGetUsername && s.getUserInfo != nil { + if notificationOpt.RpcGetUsername && s.getUserInfo != nil { userInfo, err = s.getUserInfo(ctx, sendID) if err != nil { log.ZWarn(ctx, "getUserInfo failed", err, "sendID", sendID) @@ -233,7 +238,7 @@ func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, co if sendID == recvID && contentType == constant.HasReadReceipt { optionsConfig.ReliabilityLevel = constant.UnreliableNotification } - options := config.GetOptionsByNotification(optionsConfig) + options := config.GetOptionsByNotification(optionsConfig, notificationOpt.SendMessage) s.SetOptionsByContentType(ctx, options, contentType) msg.Options = options // fill Notification OfflinePush by config diff --git a/pkg/rpcli/auth.go b/pkg/rpcli/auth.go index 17fc6ea28..35db552b7 100644 --- a/pkg/rpcli/auth.go +++ b/pkg/rpcli/auth.go @@ -2,6 +2,7 @@ package rpcli import ( "context" + "github.com/openimsdk/protocol/auth" "google.golang.org/grpc" ) From 6750e4002276153da8847acc8daca065629b43a8 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 15 May 2025 14:23:27 +0800 Subject: [PATCH 5/6] fix: merge conflicts --- internal/api/msg.go | 3 +- internal/api/router.go | 1 - .../msgtransfer/online_history_msg_handler.go | 1 - .../online_msg_to_mongo_handler.go | 1 - internal/push/offlinepush_handler.go | 1 - internal/push/push_handler.go | 1 - internal/rpc/conversation/notification.go | 7 +- internal/rpc/group/db_map.go | 24 +- internal/rpc/group/group.go | 74 +-- internal/rpc/group/sync.go | 44 +- internal/rpc/msg/notification.go | 6 +- internal/rpc/msg/server.go | 11 +- internal/rpc/relation/notification.go | 5 +- internal/rpc/user/notification.go | 5 +- pkg/common/storage/controller/auth.go | 24 +- pkg/common/storage/controller/msg.go | 1 - pkg/common/storage/controller/msg_transfer.go | 1 - pkg/common/storage/controller/push.go | 1 - tools/check-component/main.go | 2 +- tools/stress-test/README.md | 25 - tools/stress-test/main.go | 459 ------------------ 21 files changed, 125 insertions(+), 572 deletions(-) delete mode 100644 tools/stress-test/README.md delete mode 100755 tools/stress-test/main.go diff --git a/internal/api/msg.go b/internal/api/msg.go index 8be4832e6..5349faf87 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -379,7 +379,7 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) { IsSendMsg: req.SendMsg, ReliabilityLevel: *req.ReliabilityLevel, UnreadCount: false, - }), + }, nil), }, } respPb, err := m.Client.SendMsg(c, &sendMsgReq) @@ -524,6 +524,7 @@ func (m *MessageApi) SendSimpleMessage(c *gin.Context) { apiresp.GinError(c, err) return } + m.ginRespSendMsg(c, sendReq, respPb) } diff --git a/internal/api/router.go b/internal/api/router.go index 7afe61fec..31c535e53 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -12,7 +12,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - clientv3 "go.etcd.io/etcd/client/v3" "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" "github.com/openimsdk/open-im-server/v3/pkg/common/config" diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 41bbe081c..a2d0cca67 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -28,7 +28,6 @@ import ( "github.com/go-redis/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" "github.com/openimsdk/protocol/constant" diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 407a4ae28..ae14d02a1 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -19,7 +19,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "google.golang.org/protobuf/proto" diff --git a/internal/push/offlinepush_handler.go b/internal/push/offlinepush_handler.go index d99f2647d..eaf6b8ed8 100644 --- a/internal/push/offlinepush_handler.go +++ b/internal/push/offlinepush_handler.go @@ -6,7 +6,6 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbpush "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 92eec3cb1..418c4c7f2 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -9,7 +9,6 @@ import ( "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index c6368b916..370865c1a 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -16,21 +16,22 @@ package conversation import ( "context" + + "github.com/openimsdk/open-im-server/v3/pkg/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/notification" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" ) type ConversationNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender } func NewConversationNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient) *ConversationNotificationSender { - return &ConversationNotificationSender{rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + return &ConversationNotificationSender{notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { return msgClient.SendMsg(ctx, req) }))} } diff --git a/internal/rpc/group/db_map.go b/internal/rpc/group/db_map.go index e99f9e772..7504bc851 100644 --- a/internal/rpc/group/db_map.go +++ b/internal/rpc/group/db_map.go @@ -16,6 +16,7 @@ package group import ( "context" + "strings" "time" pbgroup "github.com/openimsdk/protocol/group" @@ -55,41 +56,52 @@ func UpdateGroupInfoMap(ctx context.Context, group *sdkws.GroupInfoForSet) map[s return m } -func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (map[string]any, error) { - m := make(map[string]any) +func UpdateGroupInfoExMap(ctx context.Context, group *pbgroup.SetGroupInfoExReq) (m map[string]any, normalFlag, groupNameFlag, notificationFlag bool, err error) { + m = make(map[string]any) if group.GroupName != nil { - if group.GroupName.Value != "" { + if strings.TrimSpace(group.GroupName.Value) != "" { m["group_name"] = group.GroupName.Value + groupNameFlag = true } else { - return nil, errs.ErrArgs.WrapMsg("group name is empty") + return nil, normalFlag, notificationFlag, groupNameFlag, errs.ErrArgs.WrapMsg("group name is empty") } } + if group.Notification != nil { + notificationFlag = true + group.Notification.Value = strings.TrimSpace(group.Notification.Value) // if Notification only contains spaces, set it to empty string + m["notification"] = group.Notification.Value - m["notification_update_time"] = time.Now() m["notification_user_id"] = mcontext.GetOpUserID(ctx) + m["notification_update_time"] = time.Now() } if group.Introduction != nil { m["introduction"] = group.Introduction.Value + normalFlag = true } if group.FaceURL != nil { m["face_url"] = group.FaceURL.Value + normalFlag = true } if group.NeedVerification != nil { m["need_verification"] = group.NeedVerification.Value + normalFlag = true } if group.LookMemberInfo != nil { m["look_member_info"] = group.LookMemberInfo.Value + normalFlag = true } if group.ApplyMemberFriend != nil { m["apply_member_friend"] = group.ApplyMemberFriend.Value + normalFlag = true } if group.Ex != nil { m["ex"] = group.Ex.Value + normalFlag = true } - return m, nil + return m, normalFlag, groupNameFlag, notificationFlag, nil } func UpdateGroupStatusMap(status int) map[string]any { diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 1255e4eba..10cdc2546 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -26,6 +26,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "google.golang.org/grpc" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -51,7 +53,6 @@ import ( "github.com/openimsdk/tools/mw/specialerror" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/encrypt" - "google.golang.org/grpc" ) type groupServer struct { @@ -284,13 +285,14 @@ func (g *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR break } } - g.notification.GroupCreatedNotification(ctx, tips) + g.notification.GroupCreatedNotification(ctx, tips, req.SendMessage) if req.GroupInfo.Notification != "" { + notificationFlag := true g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{ Group: tips.Group, OpUser: tips.OpUser, - }) + }, ¬ificationFlag) } reqCallBackAfter := &pbgroup.CreateGroupReq{ @@ -613,7 +615,7 @@ func (g *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou for _, userID := range req.KickedUserIDs { tips.KickedUserList = append(tips.KickedUserList, convert.Db2PbGroupMember(memberMap[userID])) } - g.notification.MemberKickedNotification(ctx, tips) + g.notification.MemberKickedNotification(ctx, tips, req.SendMessage) if err := g.deleteMemberAndSetConversationSeq(ctx, req.GroupID, req.KickedUserIDs); err != nil { return nil, err } @@ -822,8 +824,14 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup if member == nil { log.ZDebug(ctx, "GroupApplicationResponse", "member is nil") } else { - if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, groupRequest.InviterUserID, req.FromUserID); err != nil { - return nil, err + if groupRequest.InviterUserID == "" { + if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil { + return nil, err + } + } else { + if err = g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, nil, groupRequest.InviterUserID, req.FromUserID); err != nil { + return nil, err + } } } case constant.GroupResponseRefuse: @@ -1025,7 +1033,8 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) } }() - g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + notficationFlag := true + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ficationFlag) } if req.GroupInfoForSet.GroupName != "" { num-- @@ -1086,7 +1095,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI return nil, err } - updatedData, err := UpdateGroupInfoExMap(ctx, req) + updatedData, normalFlag, groupNameFlag, notificationFlag, err := UpdateGroupInfoExMap(ctx, req) if len(updatedData) == 0 { return &pbgroup.SetGroupInfoExResp{}, nil } @@ -1114,41 +1123,38 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI tips.OpUser = g.groupMemberDB2PB(opMember, 0) } - num := len(updatedData) - - if req.Notification != nil { - num -= 3 - + if notificationFlag { if req.Notification.Value != "" { - func() { - conversation := &pbconv.ConversationReq{ - ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), - ConversationType: constant.ReadGroupChatType, - GroupID: req.GroupID, - } + conversation := &pbconv.ConversationReq{ + ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID), + ConversationType: constant.ReadGroupChatType, + GroupID: req.GroupID, + } - resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) - if err != nil { - log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) - return - } + resp, err := g.GetGroupMemberUserIDs(ctx, &pbgroup.GetGroupMemberUserIDsReq{GroupID: req.GroupID}) + if err != nil { + log.ZWarn(ctx, "GetGroupMemberIDs is failed.", err) + return nil, err + } - conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} - if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { - log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) - } - }() + conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification} + if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil { + log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation) + } - g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}) + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) + } else { + notificationFlag = false + g.notification.GroupInfoSetAnnouncementNotification(ctx, &sdkws.GroupInfoSetAnnouncementTips{Group: tips.Group, OpUser: tips.OpUser}, ¬ificationFlag) } } - if req.GroupName != nil { - num-- + if groupNameFlag { g.notification.GroupInfoSetNameNotification(ctx, &sdkws.GroupInfoSetNameTips{Group: tips.Group, OpUser: tips.OpUser}) } - if num > 0 { + // if updatedData > 0, send the normal notification + if normalFlag { g.notification.GroupInfoSetNotification(ctx, tips) } @@ -1369,7 +1375,7 @@ func (g *groupServer) DismissGroup(ctx context.Context, req *pbgroup.DismissGrou if mcontext.GetOpUserID(ctx) == owner.UserID { tips.OpUser = g.groupMemberDB2PB(owner, 0) } - g.notification.GroupDismissedNotification(ctx, tips) + g.notification.GroupDismissedNotification(ctx, tips, req.SendMessage) } membersID, err := g.db.FindGroupMemberUserID(ctx, group.GroupID) if err != nil { diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index ed608dea3..822b15307 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -52,7 +52,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF if err != nil { return nil, err } - groupIDs, err := s.db.FindJoinGroupID(ctx, req.UserID) + groupIDs, err := g.db.FindJoinGroupID(ctx, req.UserID) if err != nil { return nil, err } @@ -68,8 +68,8 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF }, nil } -func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { - group, err := s.db.TakeGroup(ctx, req.GroupID) +func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { + group, err := g.db.TakeGroup(ctx, req.GroupID) if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou VersionID: req.VersionID, VersionNumber: req.Version, Version: func(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { - vl, err := s.db.FindMemberIncrVersion(ctx, groupID, version, limit) + vl, err := g.db.FindMemberIncrVersion(ctx, groupID, version, limit) if err != nil { return nil, err } @@ -112,9 +112,9 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou } return vl, nil }, - CacheMaxVersion: s.db.FindMaxGroupMemberVersionCache, + CacheMaxVersion: g.db.FindMaxGroupMemberVersionCache, Find: func(ctx context.Context, ids []string) ([]*sdkws.GroupMemberFullInfo, error) { - return s.getGroupMembersInfo(ctx, req.GroupID, ids) + return g.getGroupMembersInfo(ctx, req.GroupID, ids) }, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { return &pbgroup.GetIncrementalGroupMemberResp{ @@ -133,15 +133,15 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou return nil, err } if resp.Full || hasGroupUpdate { - count, err := s.db.FindGroupMemberNum(ctx, group.GroupID) + count, err := g.db.FindGroupMemberNum(ctx, group.GroupID) if err != nil { return nil, err } - owner, err := s.db.TakeGroupOwner(ctx, group.GroupID) + owner, err := g.db.TakeGroupOwner(ctx, group.GroupID) if err != nil { return nil, err } - resp.Group = s.groupDB2PB(group, owner.UserID, count) + resp.Group = g.groupDB2PB(group, owner.UserID, count) } return resp, nil } @@ -155,9 +155,9 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. VersionKey: req.UserID, VersionID: req.VersionID, VersionNumber: req.Version, - Version: s.db.FindJoinIncrVersion, - CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache, - Find: s.getGroupsInfo, + Version: g.db.FindJoinIncrVersion, + CacheMaxVersion: g.db.FindMaxJoinGroupVersionCache, + Find: g.getGroupsInfo, Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp { return &pbgroup.GetIncrementalJoinGroupResp{ VersionID: version.ID.Hex(), @@ -171,3 +171,23 @@ func (g *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. } return opt.Build() } + +func (g *groupServer) BatchGetIncrementalGroupMember(ctx context.Context, req *pbgroup.BatchGetIncrementalGroupMemberReq) (*pbgroup.BatchGetIncrementalGroupMemberResp, error) { + var num int + resp := make(map[string]*pbgroup.GetIncrementalGroupMemberResp) + for _, memberReq := range req.ReqList { + if _, ok := resp[memberReq.GroupID]; ok { + continue + } + memberResp, err := g.GetIncrementalGroupMember(ctx, memberReq) + if err != nil { + return nil, err + } + resp[memberReq.GroupID] = memberResp + num += len(memberResp.Insert) + len(memberResp.Update) + len(memberResp.Delete) + if num >= versionSyncLimit { + break + } + } + return &pbgroup.BatchGetIncrementalGroupMemberResp{RespList: resp}, nil +} diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go index d5604286a..0418823d6 100644 --- a/internal/rpc/msg/notification.go +++ b/internal/rpc/msg/notification.go @@ -23,11 +23,11 @@ import ( ) type MsgNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender } -func NewMsgNotificationSender(config *Config, opts ...rpcclient.NotificationSenderOptions) *MsgNotificationSender { - return &MsgNotificationSender{rpcclient.NewNotificationSender(&config.NotificationConfig, opts...)} +func NewMsgNotificationSender(config *Config, opts ...notification.NotificationSenderOptions) *MsgNotificationSender { + return &MsgNotificationSender{notification.NewNotificationSender(&config.NotificationConfig, opts...)} } func (m *MsgNotificationSender) UserDeleteMsgsNotification(ctx context.Context, userID, conversationID string, seqs []int64) { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index fd360295f..5ebe79dd9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -58,15 +58,14 @@ type Config struct { // MsgServer encapsulates dependencies required for message handling. type msgServer struct { msg.UnimplementedMsgServer - RegisterCenter discovery.Conn // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. - StreamMsgDatabase controller.StreamMsgDatabase + RegisterCenter discovery.Conn // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. UserLocalCache *rpccache.UserLocalCache // Local cache for user data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data. Handlers MessageInterceptorChain // Chain of handlers for processing messages. - notificationSender *rpcclient.NotificationSender // RPC client for sending notifications. + notificationSender *notification.NotificationSender // RPC client for sending notifications. msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. config *Config // Global configuration settings. webhookClient *webhook.Client @@ -147,8 +146,8 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr conversationClient: conversationClient, } - s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg)) - s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg)) + s.notificationSender = notification.NewNotificationSender(&config.NotificationConfig, notification.WithLocalSendMsg(s.SendMsg)) + s.msgNotificationSender = NewMsgNotificationSender(config, notification.WithLocalSendMsg(s.SendMsg)) msg.RegisterMsgServer(server, s) diff --git a/internal/rpc/relation/notification.go b/internal/rpc/relation/notification.go index a34a4d322..caf2dafe1 100644 --- a/internal/rpc/relation/notification.go +++ b/internal/rpc/relation/notification.go @@ -16,6 +16,7 @@ package relation import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/msg" @@ -36,7 +37,7 @@ import ( ) type FriendNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender // Target not found err getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) // db controller @@ -89,7 +90,7 @@ func WithRpcFunc( func NewFriendNotificationSender(conf *config.Notification, msgClient *rpcli.MsgClient, opts ...friendNotificationSenderOptions) *FriendNotificationSender { f := &FriendNotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(conf, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + NotificationSender: notification.NewNotificationSender(conf, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { return msgClient.SendMsg(ctx, req) })), } diff --git a/internal/rpc/user/notification.go b/internal/rpc/user/notification.go index 03fdf95bd..4fb214f74 100644 --- a/internal/rpc/user/notification.go +++ b/internal/rpc/user/notification.go @@ -16,6 +16,7 @@ package user import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/msg" @@ -29,7 +30,7 @@ import ( ) type UserNotificationSender struct { - *rpcclient.NotificationSender + *notification.NotificationSender getUsersInfo func(ctx context.Context, userIDs []string) ([]common_user.CommonUser, error) // db controller db controller.UserDatabase @@ -63,7 +64,7 @@ func WithUserFunc( func NewUserNotificationSender(config *Config, msgClient *rpcli.MsgClient, opts ...userNotificationSenderOptions) *UserNotificationSender { f := &UserNotificationSender{ - NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { + NotificationSender: notification.NewNotificationSender(&config.NotificationConfig, notification.WithRpcClient(func(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { return msgClient.SendMsg(ctx, req) })), } diff --git a/pkg/common/storage/controller/auth.go b/pkg/common/storage/controller/auth.go index a3260d1bc..488a116c3 100644 --- a/pkg/common/storage/controller/auth.go +++ b/pkg/common/storage/controller/auth.go @@ -44,7 +44,8 @@ func NewAuthDatabase(cache cache.TokenModel, accessSecret string, accessExpire i return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire, multiLogin: multiLoginConfig{ Policy: multiLogin.Policy, MaxNumOneEnd: multiLogin.MaxNumOneEnd, - }, adminUserIDs: adminUserIDs, + }, + adminUserIDs: adminUserIDs, } } @@ -90,23 +91,25 @@ func (a *authDatabase) CreateToken(ctx context.Context, userID string, platformI return "", err } - deleteTokenKey, kickedTokenKey, err := a.checkToken(ctx, tokens, platformID) + deleteTokenKey, kickedTokenKey, adminTokens, err := a.checkToken(ctx, tokens, platformID) if err != nil { return "", err } if len(deleteTokenKey) != 0 { - err = a.cache.DeleteTokenByUidPid(ctx, userID, platformID, deleteTokenKey) + err = a.cache.DeleteTokenByTokenMap(ctx, userID, deleteTokenKey) if err != nil { return "", err } } if len(kickedTokenKey) != 0 { - for _, k := range kickedTokenKey { - err := a.cache.SetTokenFlagEx(ctx, userID, platformID, k, constant.KickedToken) - if err != nil { - return "", err + for plt, ks := range kickedTokenKey { + for _, k := range ks { + err := a.cache.SetTokenFlagEx(ctx, userID, plt, k, constant.KickedToken) + if err != nil { + return "", err + } + log.ZDebug(ctx, "kicked token in create token", "token", k) } - log.ZDebug(ctx, "kicked token in create token", "token", k) } } if len(adminTokens) != 0 { @@ -242,8 +245,9 @@ func (a *authDatabase) checkToken(ctx context.Context, tokens map[int]map[string //if l > adminTokenMaxNum { // kickToken = append(kickToken, adminToken[:l-adminTokenMaxNum]...) //} + var deleteAdminToken []string if platformID == constant.AdminPlatformID { - kickToken = append(kickToken, adminToken...) + deleteAdminToken = adminToken } - return deleteToken, kickToken, nil + return deleteToken, kickToken, deleteAdminToken, nil } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 117d6492d..53dd7f13d 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -35,7 +35,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/constant" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index 011182b10..28392d66e 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -11,7 +11,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" diff --git a/pkg/common/storage/controller/push.go b/pkg/common/storage/controller/push.go index d792346e8..ce62a7258 100644 --- a/pkg/common/storage/controller/push.go +++ b/pkg/common/storage/controller/push.go @@ -18,7 +18,6 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/log" diff --git a/tools/check-component/main.go b/tools/check-component/main.go index cce2e7c95..993f549be 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -25,11 +25,11 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/discovery/zookeeper" + "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/system/program" ) diff --git a/tools/stress-test/README.md b/tools/stress-test/README.md deleted file mode 100644 index 531233a20..000000000 --- a/tools/stress-test/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# Stress Test - -## Usage - -You need set `TestTargetUserList` and `DefaultGroupID` variables. - -### Build - -```bash -go build -o _output/bin/tools/linux/amd64/stress-test tools/stress-test/main.go - -# or - -go build -o tools/stress-test/stress-test tools/stress-test/main.go -``` - -### Excute - -```bash -_output/bin/tools/linux/amd64/stress-test -c config/ - -#or - -tools/stress-test/stress-test -c config/ -``` diff --git a/tools/stress-test/main.go b/tools/stress-test/main.go deleted file mode 100755 index f845b5e93..000000000 --- a/tools/stress-test/main.go +++ /dev/null @@ -1,459 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "flag" - "fmt" - "io" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/apistruct" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/protocol/auth" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/relation" - "github.com/openimsdk/protocol/sdkws" - pbuser "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/system/program" -) - -/* - 1. Create one user every minute - 2. Import target users as friends - 3. Add users to the default group - 4. Send a message to the default group every second, containing index and current timestamp - 5. Create a new group every minute and invite target users to join -*/ - -// !!! ATTENTION: This variable is must be added! -var ( - // Use default userIDs List for testing, need to be created. - TestTargetUserList = []string{ - "", - } - DefaultGroupID = "" // Use default group ID for testing, need to be created. -) - -var ( - ApiAddress string - - // API method - GetAdminToken = "/auth/get_admin_token" - CreateUser = "/user/user_register" - ImportFriend = "/friend/import_friend" - InviteToGroup = "/group/invite_user_to_group" - SendMsg = "/msg/send_msg" - CreateGroup = "/group/create_group" - GetUserToken = "/auth/user_token" -) - -const ( - MaxUser = 10000 - MaxGroup = 1000 - - CreateUserTicker = 1 * time.Minute // Ticker is 1min in create user - SendMessageTicker = 1 * time.Second // Ticker is 1s in send message - CreateGroupTicker = 1 * time.Minute -) - -type BaseResp struct { - ErrCode int `json:"errCode"` - ErrMsg string `json:"errMsg"` - Data json.RawMessage `json:"data"` -} - -type StressTest struct { - Conf *conf - AdminUserID string - AdminToken string - DefaultGroupID string - DefaultUserID string - UserCounter int - GroupCounter int - MsgCounter int - CreatedUsers []string - CreatedGroups []string - Mutex sync.Mutex - Ctx context.Context - Cancel context.CancelFunc - HttpClient *http.Client - Wg sync.WaitGroup - Once sync.Once -} - -type conf struct { - Share config.Share - Api config.API -} - -func initConfig(configDir string) (*config.Share, *config.API, error) { - var ( - share = &config.Share{} - apiConfig = &config.API{} - ) - - err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share) - if err != nil { - return nil, nil, err - } - - err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig) - if err != nil { - return nil, nil, err - } - - return share, apiConfig, nil -} - -// Post Request -func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) { - // Marshal body - jsonBody, err := json.Marshal(reqbody) - if err != nil { - log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody) - return nil, err - } - - req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody)) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("operationID", st.AdminUserID) - if st.AdminToken != "" { - req.Header.Set("token", st.AdminToken) - } - - // log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken) - - resp, err := st.HttpClient.Do(req) - if err != nil { - log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody) - return nil, err - } - defer resp.Body.Close() - - respBody, err := io.ReadAll(resp.Body) - if err != nil { - log.ZError(ctx, "Failed to read response body", err, "url", url) - return nil, err - } - - var baseResp BaseResp - if err := json.Unmarshal(respBody, &baseResp); err != nil { - log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody)) - return nil, err - } - - if baseResp.ErrCode != 0 { - err = fmt.Errorf(baseResp.ErrMsg) - log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp) - return nil, err - } - - return baseResp.Data, nil -} - -func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) { - req := auth.GetAdminTokenReq{ - Secret: st.Conf.Share.Secret, - UserID: st.AdminUserID, - } - - resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req) - if err != nil { - return "", err - } - - data := &auth.GetAdminTokenResp{} - if err := json.Unmarshal(resp, &data); err != nil { - return "", err - } - - return data.Token, nil -} - -func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) { - user := &sdkws.UserInfo{ - UserID: userID, - Nickname: userID, - } - - req := pbuser.UserRegisterReq{ - Users: []*sdkws.UserInfo{user}, - } - - _, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req) - if err != nil { - return "", err - } - - st.UserCounter++ - return userID, nil -} - -func (st *StressTest) ImportFriend(ctx context.Context, userID string) error { - req := relation.ImportFriendReq{ - OwnerUserID: userID, - FriendUserIDs: TestTargetUserList, - } - - _, err := st.PostRequest(ctx, ApiAddress+ImportFriend, &req) - if err != nil { - return err - } - - return nil -} - -func (st *StressTest) InviteToGroup(ctx context.Context, userID string) error { - req := group.InviteUserToGroupReq{ - GroupID: st.DefaultGroupID, - InvitedUserIDs: []string{userID}, - } - _, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req) - if err != nil { - return err - } - - return nil -} - -func (st *StressTest) SendMsg(ctx context.Context, userID string) error { - contentObj := map[string]any{ - "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")), - } - - req := &apistruct.SendMsgReq{ - SendMsg: apistruct.SendMsg{ - SendID: userID, - SenderNickname: userID, - GroupID: st.DefaultGroupID, - ContentType: constant.Text, - SessionType: constant.ReadGroupChatType, - Content: contentObj, - }, - } - - _, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req) - if err != nil { - log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req) - return err - } - - st.MsgCounter++ - - return nil -} - -func (st *StressTest) CreateGroup(ctx context.Context, userID string) (string, error) { - groupID := fmt.Sprintf("StressTestGroup_%d_%s", st.GroupCounter, time.Now().Format("20060102150405")) - - groupInfo := &sdkws.GroupInfo{ - GroupID: groupID, - GroupName: groupID, - GroupType: constant.WorkingGroup, - } - - req := group.CreateGroupReq{ - OwnerUserID: userID, - MemberUserIDs: TestTargetUserList, - GroupInfo: groupInfo, - } - - resp := group.CreateGroupResp{} - - response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req) - if err != nil { - return "", err - } - - if err := json.Unmarshal(response, &resp); err != nil { - return "", err - } - - st.GroupCounter++ - - return resp.GroupInfo.GroupID, nil -} - -func main() { - var configPath string - // defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config") - // flag.StringVar(&configPath, "c", defaultConfigDir, "config path") - flag.StringVar(&configPath, "c", "", "config path") - flag.Parse() - - if configPath == "" { - _, _ = fmt.Fprintln(os.Stderr, "config path is empty") - os.Exit(1) - return - } - - fmt.Printf(" Config Path: %s\n", configPath) - - share, apiConfig, err := initConfig(configPath) - if err != nil { - program.ExitWithError(err) - return - } - - ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0])) - - ctx, cancel := context.WithCancel(context.Background()) - ch := make(chan struct{}) - - defer cancel() - - st := &StressTest{ - Conf: &conf{ - Share: *share, - Api: *apiConfig, - }, - AdminUserID: share.IMAdminUserID[0], - Ctx: ctx, - Cancel: cancel, - HttpClient: &http.Client{ - Timeout: 50 * time.Second, - }, - } - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - fmt.Println("\nReceived stop signal, stopping...") - - select { - case <-ch: - default: - close(ch) - } - - st.Cancel() - }() - - token, err := st.GetAdminToken(st.Ctx) - if err != nil { - log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID) - } - - st.AdminToken = token - fmt.Println("Admin Token:", st.AdminToken) - fmt.Println("ApiAddress:", ApiAddress) - - st.DefaultGroupID = DefaultGroupID - - st.Wg.Add(1) - go func() { - defer st.Wg.Done() - - ticker := time.NewTicker(CreateUserTicker) - defer ticker.Stop() - - for st.UserCounter < MaxUser { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Create user", "reason", "context done") - return - - case <-ticker.C: - // Create User - userID := fmt.Sprintf("%d_Stresstest_%s", st.UserCounter, time.Now().Format("0102150405")) - - userCreatedID, err := st.CreateUser(st.Ctx, userID) - if err != nil { - log.ZError(st.Ctx, "Create User failed.", err, "UserID", userID) - os.Exit(1) - return - } - // fmt.Println("User Created ID:", userCreatedID) - - // Import Friend - if err = st.ImportFriend(st.Ctx, userCreatedID); err != nil { - log.ZError(st.Ctx, "Import Friend failed.", err, "UserID", userCreatedID) - os.Exit(1) - return - } - - // Invite To Group - if err = st.InviteToGroup(st.Ctx, userCreatedID); err != nil { - log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", userCreatedID) - os.Exit(1) - return - } - - st.Once.Do(func() { - st.DefaultUserID = userCreatedID - fmt.Println("Default Send User Created ID:", userCreatedID) - close(ch) - }) - } - } - }() - - st.Wg.Add(1) - go func() { - defer st.Wg.Done() - - ticker := time.NewTicker(SendMessageTicker) - defer ticker.Stop() - <-ch - - for { - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Send message", "reason", "context done") - return - - case <-ticker.C: - // Send Message - if err = st.SendMsg(st.Ctx, st.DefaultSendUserID); err != nil { - log.ZError(st.Ctx, "Send Message failed.", err, "UserID", st.DefaultSendUserID) - continue - } - } - } - }() - - st.Wg.Add(1) - go func() { - defer st.Wg.Done() - - ticker := time.NewTicker(CreateGroupTicker) - defer ticker.Stop() - <-ch - - for st.GroupCounter < MaxGroup { - - select { - case <-st.Ctx.Done(): - log.ZInfo(st.Ctx, "Stop Create Group", "reason", "context done") - return - - case <-ticker.C: - - // Create Group - _, err := st.CreateGroup(st.Ctx, st.DefaultUserID) - if err != nil { - log.ZError(st.Ctx, "Create Group failed.", err, "UserID", st.DefaultUserID) - os.Exit(1) - return - } - - // fmt.Println("Group Created ID:", groupID) - } - } - }() - - st.Wg.Wait() -} From b40bc731e670677fae68f8bc7b8289557c0fb90f Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 15 May 2025 14:26:30 +0800 Subject: [PATCH 6/6] fix: merge conflicts --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index 88289d2a9..218f77a3d 100644 --- a/go.mod +++ b/go.mod @@ -219,5 +219,3 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) - -//replace github.com/openimsdk/tools => /Users/chao/Desktop/code/tools