From c37e0227ffdccf1cfdc983af9ef3eae1f586a227 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 29 Aug 2024 12:17:19 +0800 Subject: [PATCH 1/3] fix: delay deleteObject func. (#2566) * refactor: refactor workflows contents. * add tool workflows. * update field. * fix: remove chat error. * Fix err. * fix error. * remove cn comment. * update workflows files. * update infra config. * move workflows. * feat: update bot. * fix: solve uncorrect outdated msg get. * update get docIDs logic. * update * update skip logic. * fix * update. * fix: delay deleteObject func. * remove unused content. --- internal/tools/cron_task.go | 41 ++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index afbaf34b4..337272d69 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -25,7 +25,6 @@ import ( pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mw" "google.golang.org/grpc" @@ -59,10 +58,10 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return err } - thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) - if err != nil { - return err - } + // thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) + // if err != nil { + // return err + // } conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { @@ -71,7 +70,7 @@ func Start(ctx context.Context, config *CronTaskConfig) error { msgClient := msg.NewMsgClient(msgConn) conversationClient := pbconversation.NewConversationClient(conversationConn) - thirdClient := third.NewThirdClient(thirdConn) + // thirdClient := third.NewThirdClient(thirdConn) crontab := cron.New() @@ -115,21 +114,21 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return errs.Wrap(err) } - // scheduled delete outdated file Objects and their datas in specific time. - deleteObjectFunc := func() { - now := time.Now() - deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) - ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) - log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) - if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { - log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) - return - } - log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) - } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { - return errs.Wrap(err) - } + // // scheduled delete outdated file Objects and their datas in specific time. + // deleteObjectFunc := func() { + // now := time.Now() + // deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) + // ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) + // log.ZInfo(ctx, "deleteoutDatedData ", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + // if _, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli()}); err != nil { + // log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) + // return + // } + // log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) + // } + // if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { + // return errs.Wrap(err) + // } log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) crontab.Start() From 275491a1b5c10abb008afb79bf6c0665ac2d6628 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Thu, 29 Aug 2024 15:18:23 +0800 Subject: [PATCH 2/3] fix: memory queue optimization (#2568) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues ​​length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size --------- Co-authored-by: withchao --- internal/rpc/relation/friend.go | 2 +- pkg/common/storage/controller/friend.go | 2 +- pkg/rpcclient/msg.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 3d29ad337..6b52181b6 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -121,7 +121,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), - queue: memamq.NewMemoryQueue(128, 1024*8), + queue: memamq.NewMemoryQueue(16, 1024*1024), }) return nil } diff --git a/pkg/common/storage/controller/friend.go b/pkg/common/storage/controller/friend.go index 94cb7d661..88a5fc863 100644 --- a/pkg/common/storage/controller/friend.go +++ b/pkg/common/storage/controller/friend.go @@ -160,7 +160,7 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string, if err != nil { return err } - opUserID := mcontext.GetOperationID(ctx) + opUserID := mcontext.GetOpUserID(ctx) friends := make([]*model.Friend, 0, len(friendUserIDs)*2) myFriendsSet := datautil.SliceSetAny(myFriends, func(friend *model.Friend) string { return friend.FriendUserID diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index 958cb69a6..715014800 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -23,7 +23,6 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/idutil" @@ -270,8 +269,8 @@ func WithUserRpcClient(userRpcClient *UserRpcClient) NotificationSenderOptions { } const ( - notificationWorkerCount = 2 - notificationBufferSize = 200 + notificationWorkerCount = 16 + notificationBufferSize = 1024 * 1024 * 2 ) func NewNotificationSender(conf *config.Notification, opts ...NotificationSenderOptions) *NotificationSender { @@ -298,7 +297,8 @@ func WithRpcGetUserName() NotificationOptions { } func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, opts ...NotificationOptions) { - ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) + //ctx = mcontext.WithMustInfoCtx([]string{mcontext.GetOperationID(ctx), mcontext.GetOpUserID(ctx), mcontext.GetOpUserPlatform(ctx), mcontext.GetConnID(ctx)}) + ctx = context.WithoutCancel(ctx) ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5)) defer cancel() n := sdkws.NotificationElem{Detail: jsonutil.StructToJsonString(m)} From 8c7de0416bea15f0f6758e796ab20e169e4b1eae Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Fri, 30 Aug 2024 17:19:41 +0800 Subject: [PATCH 3/3] fix: fill opUser in invite tips (#2578) * fix: fill opUser in invite tips * fix: del code --- internal/rpc/group/notification.go | 29 ++++++----------------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index e87e7c495..4a69b6aed 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -535,28 +535,7 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil { return err } - opUserID := mcontext.GetOpUserID(ctx) - var opUser *sdkws.GroupMemberFullInfo - if authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) { - opUser = &sdkws.GroupMemberFullInfo{ - GroupID: groupID, - UserID: opUserID, - AppMangerLevel: constant.AppAdmin, - } - } else { - users, err := g.getGroupMembers(ctx, groupID, []string{opUserID}) - if err != nil { - return err - } - if len(users) == 0 { - opUser = &sdkws.GroupMemberFullInfo{ - GroupID: groupID, - UserID: opUserID, - } - } else { - opUser = users[0] - } - } + var group *sdkws.GroupInfo group, err = g.getGroupInfo(ctx, groupID) if err != nil { @@ -566,7 +545,11 @@ func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, g if err != nil { return err } - tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users, OpUser: opUser} + + tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users} + if err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID); err != nil { + return nil + } g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID) g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips) return nil