From e479a5f4881c12918a808cf6d05a503323f2a154 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 22 Apr 2025 15:55:09 +0800 Subject: [PATCH] feat: GroupApplicationAgreeMemberEnterNotification splitting, rpc body size limit --- config/share.yml | 4 +++ internal/rpc/group/notification.go | 14 ++++++++ pkg/common/config/config.go | 12 +++++-- pkg/common/startrpc/start.go | 53 +++++++++++++++++++++++++++++- 4 files changed, 79 insertions(+), 4 deletions(-) diff --git a/config/share.yml b/config/share.yml index 0913c1e88..2e9821436 100644 --- a/config/share.yml +++ b/config/share.yml @@ -7,3 +7,7 @@ multiLogin: policy: 1 # max num of tokens in one end maxNumOneEnd: 30 + +rpcMaxBodySize: + requestMaxBodySize: 8388608 + responseMaxBodySize: 8388608 diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index f9da88863..8c5ecb2f6 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -522,6 +522,20 @@ func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips } func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error { + const singleQuantity = 50 + for start := 0; start < len(entrantUserID); start += singleQuantity { + end := start + singleQuantity + if end > len(entrantUserID) { + end = len(entrantUserID) + } + if err := g.groupApplicationAgreeMemberEnterNotification(ctx, groupID, SendMessage, invitedOpUserID, entrantUserID[start:end]...); err != nil { + return err + } + } + return nil +} + +func (g *NotificationSender) groupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error { var err error defer func() { if err != nil { diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index fa93e3406..d5ae68ec0 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -348,9 +348,15 @@ type AfterConfig struct { } type Share struct { - Secret string `yaml:"secret"` - IMAdminUserID []string `yaml:"imAdminUserID"` - MultiLogin MultiLogin `yaml:"multiLogin"` + Secret string `yaml:"secret"` + IMAdminUserID []string `yaml:"imAdminUserID"` + MultiLogin MultiLogin `yaml:"multiLogin"` + RPCMaxBodySize MaxRequestBody `yaml:"rpcMaxBodySize"` +} + +type MaxRequestBody struct { + RequestMaxBodySize int `yaml:"requestMaxBodySize"` + ResponseMaxBodySize int `yaml:"responseMaxBodySize"` } type MultiLogin struct { diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index a69edae20..af50c408d 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -21,6 +21,7 @@ import ( "net" "os" "os/signal" + "reflect" "strconv" "syscall" "time" @@ -45,6 +46,36 @@ func init() { prommetrics.RegistryAll() } +func getConfigRpcMaxRequestBody(value reflect.Value) *conf.MaxRequestBody { + for value.Kind() == reflect.Pointer { + value = value.Elem() + } + if value.Kind() == reflect.Struct { + num := value.NumField() + for i := 0; i < num; i++ { + field := value.Field(i) + if !field.CanInterface() { + continue + } + for field.Kind() == reflect.Pointer { + field = field.Elem() + } + switch elem := field.Interface().(type) { + case conf.Share: + return &elem.RPCMaxBodySize + case conf.MaxRequestBody: + return &elem + } + if field.Kind() == reflect.Struct { + if elem := getConfigRpcMaxRequestBody(field); elem != nil { + return elem + } + } + } + } + return nil +} + func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, watchConfigNames []string, watchServiceNames []string, @@ -55,7 +86,24 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c conf.InitNotification(notification) } - options = append(options, mw.GrpcServer()) + maxRequestBody := getConfigRpcMaxRequestBody(reflect.ValueOf(config)) + + log.ZDebug(ctx, "rpc start", "rpcMaxRequestBody", maxRequestBody, "rpcRegisterName", rpcRegisterName, "registerIP", registerIP, "listenIP", listenIP) + + options = append(options, + mw.GrpcServer(), + ) + var clientOptions []grpc.DialOption + if maxRequestBody != nil { + if maxRequestBody.RequestMaxBodySize > 0 { + options = append(options, grpc.MaxRecvMsgSize(maxRequestBody.RequestMaxBodySize)) + clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxRequestBody.RequestMaxBodySize))) + } + if maxRequestBody.ResponseMaxBodySize > 0 { + options = append(options, grpc.MaxSendMsgSize(maxRequestBody.ResponseMaxBodySize)) + clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRequestBody.ResponseMaxBodySize))) + } + } registerIP, err := network.GetRpcRegisterIP(registerIP) if err != nil { @@ -84,6 +132,9 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")), ) + if len(clientOptions) > 0 { + client.AddOption(clientOptions...) + } ctx, cancel := context.WithCancelCause(ctx)