feat: GroupApplicationAgreeMemberEnterNotification splitting, rpc body size limit

pull/3297/head
withchao 5 months ago
parent dea42ef84c
commit e479a5f488

@ -7,3 +7,7 @@ multiLogin:
policy: 1 policy: 1
# max num of tokens in one end # max num of tokens in one end
maxNumOneEnd: 30 maxNumOneEnd: 30
rpcMaxBodySize:
requestMaxBodySize: 8388608
responseMaxBodySize: 8388608

@ -522,6 +522,20 @@ func (g *NotificationSender) MemberKickedNotification(ctx context.Context, tips
} }
func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error { func (g *NotificationSender) GroupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error {
const singleQuantity = 50
for start := 0; start < len(entrantUserID); start += singleQuantity {
end := start + singleQuantity
if end > len(entrantUserID) {
end = len(entrantUserID)
}
if err := g.groupApplicationAgreeMemberEnterNotification(ctx, groupID, SendMessage, invitedOpUserID, entrantUserID[start:end]...); err != nil {
return err
}
}
return nil
}
func (g *NotificationSender) groupApplicationAgreeMemberEnterNotification(ctx context.Context, groupID string, SendMessage *bool, invitedOpUserID string, entrantUserID ...string) error {
var err error var err error
defer func() { defer func() {
if err != nil { if err != nil {

@ -351,6 +351,12 @@ type Share struct {
Secret string `yaml:"secret"` Secret string `yaml:"secret"`
IMAdminUserID []string `yaml:"imAdminUserID"` IMAdminUserID []string `yaml:"imAdminUserID"`
MultiLogin MultiLogin `yaml:"multiLogin"` MultiLogin MultiLogin `yaml:"multiLogin"`
RPCMaxBodySize MaxRequestBody `yaml:"rpcMaxBodySize"`
}
type MaxRequestBody struct {
RequestMaxBodySize int `yaml:"requestMaxBodySize"`
ResponseMaxBodySize int `yaml:"responseMaxBodySize"`
} }
type MultiLogin struct { type MultiLogin struct {

@ -21,6 +21,7 @@ import (
"net" "net"
"os" "os"
"os/signal" "os/signal"
"reflect"
"strconv" "strconv"
"syscall" "syscall"
"time" "time"
@ -45,6 +46,36 @@ func init() {
prommetrics.RegistryAll() prommetrics.RegistryAll()
} }
func getConfigRpcMaxRequestBody(value reflect.Value) *conf.MaxRequestBody {
for value.Kind() == reflect.Pointer {
value = value.Elem()
}
if value.Kind() == reflect.Struct {
num := value.NumField()
for i := 0; i < num; i++ {
field := value.Field(i)
if !field.CanInterface() {
continue
}
for field.Kind() == reflect.Pointer {
field = field.Elem()
}
switch elem := field.Interface().(type) {
case conf.Share:
return &elem.RPCMaxBodySize
case conf.MaxRequestBody:
return &elem
}
if field.Kind() == reflect.Struct {
if elem := getConfigRpcMaxRequestBody(field); elem != nil {
return elem
}
}
}
}
return nil
}
func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP,
registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T,
watchConfigNames []string, watchServiceNames []string, watchConfigNames []string, watchServiceNames []string,
@ -55,7 +86,24 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
conf.InitNotification(notification) conf.InitNotification(notification)
} }
options = append(options, mw.GrpcServer()) maxRequestBody := getConfigRpcMaxRequestBody(reflect.ValueOf(config))
log.ZDebug(ctx, "rpc start", "rpcMaxRequestBody", maxRequestBody, "rpcRegisterName", rpcRegisterName, "registerIP", registerIP, "listenIP", listenIP)
options = append(options,
mw.GrpcServer(),
)
var clientOptions []grpc.DialOption
if maxRequestBody != nil {
if maxRequestBody.RequestMaxBodySize > 0 {
options = append(options, grpc.MaxRecvMsgSize(maxRequestBody.RequestMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxRequestBody.RequestMaxBodySize)))
}
if maxRequestBody.ResponseMaxBodySize > 0 {
options = append(options, grpc.MaxSendMsgSize(maxRequestBody.ResponseMaxBodySize))
clientOptions = append(clientOptions, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRequestBody.ResponseMaxBodySize)))
}
}
registerIP, err := network.GetRpcRegisterIP(registerIP) registerIP, err := network.GetRpcRegisterIP(registerIP)
if err != nil { if err != nil {
@ -84,6 +132,9 @@ func Start[T any](ctx context.Context, disc *conf.Discovery, prometheusConfig *c
mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")),
) )
if len(clientOptions) > 0 {
client.AddOption(clientOptions...)
}
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)

Loading…
Cancel
Save