Merge remote-tracking branch 'origin/ver3' into ver3

pull/458/head
Gordon 2 years ago
commit 54ef87cd07

@ -11,6 +11,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -26,7 +27,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
cacheModel := cache.NewMsgCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
offlinePusher := NewOfflinePusher(cacheModel) offlinePusher := NewOfflinePusher(cacheModel)
database := controller.NewPushDatabase(cacheModel) database := controller.NewPushDatabase(cacheModel)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client)) groupRpcClient := rpcclient.NewGroupRpcClient(client)
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(&groupRpcClient), localcache.NewConversationLocalCache(&conversationRpcClient), &conversationRpcClient, &groupRpcClient, &msgRpcClient)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
go func() { go func() {

@ -30,7 +30,7 @@ type Pusher struct {
offlinePusher offlinepush.OfflinePusher offlinePusher offlinepush.OfflinePusher
groupLocalCache *localcache.GroupLocalCache groupLocalCache *localcache.GroupLocalCache
conversationLocalCache *localcache.ConversationLocalCache conversationLocalCache *localcache.ConversationLocalCache
msgClient *rpcclient.MessageRpcClient msgRpcClient *rpcclient.MessageRpcClient
conversationRpcClient *rpcclient.ConversationRpcClient conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
successCount int successCount int
@ -39,19 +39,17 @@ type Pusher struct {
var errNoOfflinePusher = errors.New("no offlinePusher is configured") var errNoOfflinePusher = errors.New("no offlinePusher is configured")
func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher { groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache,
msgClient := rpcclient.NewMessageRpcClient(discov) conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient) *Pusher {
conversationRpcClient := rpcclient.NewConversationRpcClient(discov)
groupRpcClient := rpcclient.NewGroupRpcClient(discov)
return &Pusher{ return &Pusher{
discov: discov, discov: discov,
database: database, database: database,
offlinePusher: offlinePusher, offlinePusher: offlinePusher,
groupLocalCache: groupLocalCache, groupLocalCache: groupLocalCache,
conversationLocalCache: conversationLocalCache, conversationLocalCache: conversationLocalCache,
msgClient: &msgClient, msgRpcClient: msgRpcClient,
conversationRpcClient: &conversationRpcClient, conversationRpcClient: conversationRpcClient,
groupRpcClient: &groupRpcClient, groupRpcClient: groupRpcClient,
} }
} }
@ -70,7 +68,7 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
maxSeq, err := p.msgClient.GetConversationMaxSeq(ctx, conevrsationID) maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID)
if err != nil { if err != nil {
return err return err
} }

@ -77,8 +77,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
MsgDatabase: msgDatabase, MsgDatabase: msgDatabase,
ExtendMsgDatabase: extendMsgDatabase, ExtendMsgDatabase: extendMsgDatabase,
RegisterCenter: client, RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(client), GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
ConversationLocalCache: localcache.NewConversationLocalCache(client), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
friend: &friendRpcClient, friend: &friendRpcClient,
MessageLocker: NewLockerMessage(cacheModel), MessageLocker: NewLockerMessage(cacheModel),
} }

@ -2,6 +2,7 @@ package a2r
import ( import (
"context" "context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/checker"
"github.com/OpenIMSDK/Open-IM-Server/pkg/apiresp" "github.com/OpenIMSDK/Open-IM-Server/pkg/apiresp"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
@ -21,12 +22,9 @@ func Call[A, B, C any](
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) // 参数错误 apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) // 参数错误
return return
} }
if check, ok := any(&req).(interface{ Check() error }); ok { if err := checker.Validate(&req); err != nil {
if err := check.Check(); err != nil { apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数校验失败
log.ZWarn(c, "custom check error", err, "req", req) return
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error())) // 参数校验失败
return
}
} }
data, err := rpc(client, c, &req) data, err := rpc(client, c, &req)
if err != nil { if err != nil {

@ -0,0 +1,14 @@
package checker
type Checker interface {
Check() error
}
func Validate(args any) error {
if checker, ok := args.(Checker); ok {
if err := checker.Check(); err != nil {
return err
}
}
return nil
}

@ -4,7 +4,6 @@ import (
"context" "context"
"sync" "sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
) )
@ -13,7 +12,7 @@ type ConversationLocalCache struct {
lock sync.Mutex lock sync.Mutex
superGroupRecvMsgNotNotifyUserIDs map[string]Hash superGroupRecvMsgNotNotifyUserIDs map[string]Hash
conversationIDs map[string]Hash conversationIDs map[string]Hash
client *rpcclient.Conversation client *rpcclient.ConversationRpcClient
} }
type Hash struct { type Hash struct {
@ -21,11 +20,11 @@ type Hash struct {
ids []string ids []string
} }
func NewConversationLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache { func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache {
return &ConversationLocalCache{ return &ConversationLocalCache{
superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
conversationIDs: make(map[string]Hash), conversationIDs: make(map[string]Hash),
client: rpcclient.NewConversation(discov), client: client,
} }
} }

@ -4,7 +4,6 @@ import (
"context" "context"
"sync" "sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
@ -13,7 +12,7 @@ import (
type GroupLocalCache struct { type GroupLocalCache struct {
lock sync.Mutex lock sync.Mutex
cache map[string]GroupMemberIDsHash cache map[string]GroupMemberIDsHash
client *rpcclient.Group client *rpcclient.GroupRpcClient
} }
type GroupMemberIDsHash struct { type GroupMemberIDsHash struct {
@ -21,8 +20,7 @@ type GroupMemberIDsHash struct {
userIDs []string userIDs []string
} }
func NewGroupLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *GroupLocalCache { func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache {
client := rpcclient.NewGroup(discov)
return &GroupLocalCache{ return &GroupLocalCache{
cache: make(map[string]GroupMemberIDsHash, 0), cache: make(map[string]GroupMemberIDsHash, 0),
client: client, client: client,

@ -3,6 +3,7 @@ package mw
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/checker"
"math" "math"
"runtime" "runtime"
"strings" "strings"
@ -92,7 +93,12 @@ func RpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary
} }
} }
log.ZInfo(ctx, "rpc server req", "funcName", funcName, "req", rpcString(req)) log.ZInfo(ctx, "rpc server req", "funcName", funcName, "req", rpcString(req))
resp, err = handler(ctx, req) resp, err = func() (interface{}, error) {
if err := checker.Validate(req); err != nil {
return nil, err
}
return handler(ctx, req)
}()
if err == nil { if err == nil {
log.ZInfo(ctx, "rpc server resp", "funcName", funcName, "resp", rpcString(resp)) log.ZInfo(ctx, "rpc server resp", "funcName", funcName, "resp", rpcString(resp))
return resp, nil return resp, nil

Loading…
Cancel
Save