From 154a19eed1ec3b5398786424c13aef13a7b7cf7c Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 29 Nov 2024 11:56:37 +0800 Subject: [PATCH] update gRPC Implement. --- internal/msggateway/hub_server.go | 1 + internal/push/push.go | 1 + internal/rpc/auth/auth.go | 1 + internal/rpc/conversation/conversation.go | 1 + internal/rpc/group/group.go | 1 + internal/rpc/msg/server.go | 1 + internal/rpc/relation/friend.go | 1 + internal/rpc/third/s3.go | 101 ++++++++++++---------- internal/rpc/third/third.go | 1 + internal/rpc/user/user.go | 1 + 10 files changed, 66 insertions(+), 44 deletions(-) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 23d915013..8eff32899 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -56,6 +56,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { } type Server struct { + msggateway.UnimplementedMsgGatewayServer rpcPort int LongConnServer LongConnServer config *Config diff --git a/internal/push/push.go b/internal/push/push.go index 850f91d22..7f14bced7 100644 --- a/internal/push/push.go +++ b/internal/push/push.go @@ -14,6 +14,7 @@ import ( ) type pushServer struct { + pbpush.UnimplementedPushMsgServiceServer database controller.PushDatabase disCov discovery.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index a1acfd931..efa8e5d80 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -40,6 +40,7 @@ import ( ) type authServer struct { + pbauth.UnimplementedAuthServer authDatabase controller.AuthDatabase userRpcClient *rpcclient.UserRpcClient RegisterCenter discovery.SvcDiscoveryRegistry diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 0c8a6fd85..ce720e640 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -43,6 +43,7 @@ import ( ) type conversationServer struct { + pbconversation.UnimplementedConversationServer msgRpcClient *rpcclient.MessageRpcClient user *rpcclient.UserRpcClient groupRpcClient *rpcclient.GroupRpcClient diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index b5ab1b209..9e610df0f 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -57,6 +57,7 @@ import ( ) type groupServer struct { + pbgroup.UnimplementedGroupServer db controller.GroupDatabase user rpcclient.UserRpcClient notification *GroupNotificationSender diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index bf8781747..6d5922ce3 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -55,6 +55,7 @@ type ( msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications. config *Config // Global configuration settings. webhookClient *webhook.Client + msg.UnimplementedMsgServer } Config struct { diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 2f4843a8e..036c7aff5 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -43,6 +43,7 @@ import ( ) type friendServer struct { + relation.UnimplementedFriendServer db controller.FriendDatabase blackDatabase controller.BlackDatabase userRpcClient *rpcclient.UserRpcClient diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index f9625d57f..ec3cdc032 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -290,74 +290,87 @@ func (t *thirdServer) apiAddress(prefix, name string) string { func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { var conf config.Third expireTime := time.UnixMilli(req.ExpireTime) - var deltotal int - excuteNum := 5 findPagination := &sdkws.RequestPagination{ PageNumber: 1, - ShowNumber: 1000, + ShowNumber: 500, } - for i := 0; i < excuteNum; i++ { - // Find all expired data in S3 database - total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination) + log.ZDebug(ctx, "del type is ", "needDelType", req.ObjectGroup) + + // Find all expired data in S3 database + total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination) + if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + return nil, errs.Wrap(err) + } + + if total == 0 { + log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total) + return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil + } + + needDelObjectKeys := make([]string, len(models)) + for _, model := range models { + needDelObjectKeys = append(needDelObjectKeys, model.Key) + } + + // Remove duplicate keys, have the same key use in different models + needDelObjectKeys = datautil.Distinct(needDelObjectKeys) + + for _, key := range needDelObjectKeys { + // Find all models by key + keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key) if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { return nil, errs.Wrap(err) } - needDelObjectKeys := make([]string, len(models)) - for _, model := range models { - needDelObjectKeys = append(needDelObjectKeys, model.Key) - } - // Remove duplicate keys, have the same key use in different models - needDelObjectKeys = datautil.Distinct(needDelObjectKeys) - - for _, key := range needDelObjectKeys { - // Find all models by key - keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key) - if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { - return nil, errs.Wrap(err) + // check keyModels, if all keyModels. + needDelKey := true // Default can delete + for _, keymodel := range keyModels { + // If group is empty or CreateTime is after expireTime, can't delete this key + if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) { + needDelKey = false + break } + } - // check keyModels, if all keyModels. - needDelKey := true // Default can delete - for _, model := range keyModels { - // If group is empty or CreateTime is after expireTime, can't delete this key - if model.Group == "" || model.CreateTime.After(expireTime) { - needDelKey = false - break + // If this object is not referenced by not expire data, delete it + if needDelKey && t.minio != nil { + // If have a thumbnail, delete it + thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) + if thumbnailKey != "" { + err := t.s3dataBase.DeleteObject(ctx, thumbnailKey) + if err != nil { + log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey) } } - // If this object is not referenced by not expire data, delete it - if needDelKey && t.minio != nil { - thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) - - t.s3dataBase.DeleteObject(ctx, thumbnailKey) - t.s3dataBase.DeleteObject(ctx, key) - - t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key) + // Delete object + err = t.s3dataBase.DeleteObject(ctx, key) + if err != nil { + log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key) } - } - for _, model := range models { - // Delete all expired data row in S3 database - err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + // Delete cache key + err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key) if err != nil { - return nil, errs.Wrap(err) + log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key) } } + } - if total < int64(findPagination.ShowNumber) { - break + // handle delete data in S3 database + for _, model := range models { + // Delete all expired data row in S3 database + err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + if err != nil { + return nil, errs.Wrap(err) } - - deltotal += int(total) } - log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal) + log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total) - return &third.DeleteOutdatedDataResp{}, nil + return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil } type FormDataMate struct { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index d37689b31..4206a2d6f 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -38,6 +38,7 @@ import ( ) type thirdServer struct { + third.UnimplementedThirdServer thirdDatabase controller.ThirdDatabase s3dataBase controller.S3Database userRpcClient rpcclient.UserRpcClient diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index b47c516d9..2dfbb01df 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -52,6 +52,7 @@ import ( ) type userServer struct { + pbuser.UnimplementedUserServer online cache.OnlineCache db controller.UserDatabase friendNotificationSender *relation.FriendNotificationSender