diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 6890cc086..ca45a58fa 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -11,6 +11,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push" + "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "google.golang.org/grpc" ) @@ -26,7 +27,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e cacheModel := cache.NewMsgCacheModel(rdb) offlinePusher := NewOfflinePusher(cacheModel) database := controller.NewPushDatabase(cacheModel) - pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client)) + groupRpcClient := rpcclient.NewGroupRpcClient(client) + conversationRpcClient := rpcclient.NewConversationRpcClient(client) + msgRpcClient := rpcclient.NewMessageRpcClient(client) + pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(&groupRpcClient), localcache.NewConversationLocalCache(&conversationRpcClient), &conversationRpcClient, &groupRpcClient, &msgRpcClient) var wg sync.WaitGroup wg.Add(2) go func() { diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index df38f06cf..7f08ee0aa 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -30,7 +30,7 @@ type Pusher struct { offlinePusher offlinepush.OfflinePusher groupLocalCache *localcache.GroupLocalCache conversationLocalCache *localcache.ConversationLocalCache - msgClient *rpcclient.MessageRpcClient + msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient successCount int @@ -39,19 +39,17 @@ type Pusher struct { var errNoOfflinePusher = errors.New("no offlinePusher is configured") func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, - groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache) *Pusher { - msgClient := rpcclient.NewMessageRpcClient(discov) - conversationRpcClient := rpcclient.NewConversationRpcClient(discov) - groupRpcClient := rpcclient.NewGroupRpcClient(discov) + groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, + conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient) *Pusher { return &Pusher{ discov: discov, database: database, offlinePusher: offlinePusher, groupLocalCache: groupLocalCache, conversationLocalCache: conversationLocalCache, - msgClient: &msgClient, - conversationRpcClient: &conversationRpcClient, - groupRpcClient: &groupRpcClient, + msgRpcClient: msgRpcClient, + conversationRpcClient: conversationRpcClient, + groupRpcClient: groupRpcClient, } } @@ -70,7 +68,7 @@ func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { func (p *Pusher) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { conevrsationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID) - maxSeq, err := p.msgClient.GetConversationMaxSeq(ctx, conevrsationID) + maxSeq, err := p.msgRpcClient.GetConversationMaxSeq(ctx, conevrsationID) if err != nil { return err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 40e494fb1..c70d28e35 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -77,8 +77,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e MsgDatabase: msgDatabase, ExtendMsgDatabase: extendMsgDatabase, RegisterCenter: client, - GroupLocalCache: localcache.NewGroupLocalCache(client), - ConversationLocalCache: localcache.NewConversationLocalCache(client), + GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), + ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), friend: &friendRpcClient, MessageLocker: NewLockerMessage(cacheModel), } diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index 3cb647217..3c188c90e 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" ) @@ -13,7 +12,7 @@ type ConversationLocalCache struct { lock sync.Mutex superGroupRecvMsgNotNotifyUserIDs map[string]Hash conversationIDs map[string]Hash - client *rpcclient.Conversation + client *rpcclient.ConversationRpcClient } type Hash struct { @@ -21,11 +20,11 @@ type Hash struct { ids []string } -func NewConversationLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache { +func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache { return &ConversationLocalCache{ superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), conversationIDs: make(map[string]Hash), - client: rpcclient.NewConversation(discov), + client: client, } } diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go index 49443412c..ea66d122c 100644 --- a/pkg/common/db/localcache/group.go +++ b/pkg/common/db/localcache/group.go @@ -4,7 +4,6 @@ import ( "context" "sync" - "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" @@ -13,7 +12,7 @@ import ( type GroupLocalCache struct { lock sync.Mutex cache map[string]GroupMemberIDsHash - client *rpcclient.Group + client *rpcclient.GroupRpcClient } type GroupMemberIDsHash struct { @@ -21,8 +20,7 @@ type GroupMemberIDsHash struct { userIDs []string } -func NewGroupLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *GroupLocalCache { - client := rpcclient.NewGroup(discov) +func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache { return &GroupLocalCache{ cache: make(map[string]GroupMemberIDsHash, 0), client: client,