Merge remote-tracking branch 'origin/ver3' into ver3

pull/458/head
withchao 2 years ago
commit 48596a4769

@ -11,6 +11,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -26,7 +27,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
cacheModel := cache.NewMsgCacheModel(rdb) cacheModel := cache.NewMsgCacheModel(rdb)
offlinePusher := NewOfflinePusher(cacheModel) offlinePusher := NewOfflinePusher(cacheModel)
database := controller.NewPushDatabase(cacheModel) database := controller.NewPushDatabase(cacheModel)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client)) groupRpcClient := rpcclient.NewGroupRpcClient(client)
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(&groupRpcClient), localcache.NewConversationLocalCache(&conversationRpcClient), &conversationRpcClient, &groupRpcClient, &msgRpcClient)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
go func() { go func() {

@ -30,7 +30,7 @@ type Pusher struct {
offlinePusher offlinepush.OfflinePusher offlinePusher offlinepush.OfflinePusher
groupLocalCache *localcache.GroupLocalCache groupLocalCache *localcache.GroupLocalCache
conversationLocalCache *localcache.ConversationLocalCache conversationLocalCache *localcache.ConversationLocalCache
msgClient *rpcclient.MessageRpcClient msgRpcClient *rpcclient.MessageRpcClient
conversationRpcClient *rpcclient.ConversationRpcClient conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
successCount int successCount int
@ -39,19 +39,17 @@ type Pusher struct {
var errNoOfflinePusher = errors.New("no offlinePusher is configured") var errNoOfflinePusher = errors.New("no offlinePusher is configured")
func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher { groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache,
msgClient := rpcclient.NewMessageRpcClient(discov) conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient) *Pusher {
conversationRpcClient := rpcclient.NewConversationRpcClient(discov)
groupRpcClient := rpcclient.NewGroupRpcClient(discov)
return &Pusher{ return &Pusher{
discov: discov, discov: discov,
database: database, database: database,
offlinePusher: offlinePusher, offlinePusher: offlinePusher,
groupLocalCache: groupLocalCache, groupLocalCache: groupLocalCache,
conversationLocalCache: conversationLocalCache, conversationLocalCache: conversationLocalCache,
msgClient: &msgClient, msgRpcClient: msgRpcClient,
conversationRpcClient: &conversationRpcClient, conversationRpcClient: conversationRpcClient,
groupRpcClient: &groupRpcClient, groupRpcClient: groupRpcClient,
} }
} }
@ -70,7 +68,7 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher {
func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
maxSeq, err := p.msgClient.GetConversationMaxSeq(ctx, conevrsationID) maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID)
if err != nil { if err != nil {
return err return err
} }

@ -77,8 +77,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
MsgDatabase: msgDatabase, MsgDatabase: msgDatabase,
ExtendMsgDatabase: extendMsgDatabase, ExtendMsgDatabase: extendMsgDatabase,
RegisterCenter: client, RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(client), GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
ConversationLocalCache: localcache.NewConversationLocalCache(client), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
friend: &friendRpcClient, friend: &friendRpcClient,
MessageLocker: NewLockerMessage(cacheModel), MessageLocker: NewLockerMessage(cacheModel),
} }

@ -4,7 +4,6 @@ import (
"context" "context"
"sync" "sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
) )
@ -13,7 +12,7 @@ type ConversationLocalCache struct {
lock sync.Mutex lock sync.Mutex
superGroupRecvMsgNotNotifyUserIDs map[string]Hash superGroupRecvMsgNotNotifyUserIDs map[string]Hash
conversationIDs map[string]Hash conversationIDs map[string]Hash
client *rpcclient.Conversation client *rpcclient.ConversationRpcClient
} }
type Hash struct { type Hash struct {
@ -21,11 +20,11 @@ type Hash struct {
ids []string ids []string
} }
func NewConversationLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache { func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache {
return &ConversationLocalCache{ return &ConversationLocalCache{
superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
conversationIDs: make(map[string]Hash), conversationIDs: make(map[string]Hash),
client: rpcclient.NewConversation(discov), client: client,
} }
} }

@ -4,7 +4,6 @@ import (
"context" "context"
"sync" "sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
@ -13,7 +12,7 @@ import (
type GroupLocalCache struct { type GroupLocalCache struct {
lock sync.Mutex lock sync.Mutex
cache map[string]GroupMemberIDsHash cache map[string]GroupMemberIDsHash
client *rpcclient.Group client *rpcclient.GroupRpcClient
} }
type GroupMemberIDsHash struct { type GroupMemberIDsHash struct {
@ -21,8 +20,7 @@ type GroupMemberIDsHash struct {
userIDs []string userIDs []string
} }
func NewGroupLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *GroupLocalCache { func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache {
client := rpcclient.NewGroup(discov)
return &GroupLocalCache{ return &GroupLocalCache{
cache: make(map[string]GroupMemberIDsHash, 0), cache: make(map[string]GroupMemberIDsHash, 0),
client: client, client: client,

Loading…
Cancel
Save