You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Open-IM-Server/internal/rpc/msg/server.go

99 lines
3.7 KiB

package msg
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"google.golang.org/grpc"
)
type MessageInterceptorChain []MessageInterceptorFunc
type msgServer struct {
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
MsgDatabase controller.CommonMsgDatabase
ExtendMsgDatabase controller.ExtendMsgDatabase
Group *rpcclient.GroupClient
User *rpcclient.UserClient
Conversation *rpcclient.ConversationClient
friend *rpcclient.FriendClient
black *rpcclient.BlackClient
GroupLocalCache *localcache.GroupLocalCache
ConversationLocalCache *localcache.ConversationLocalCache
MessageLocker MessageLocker
Handlers MessageInterceptorChain
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
m.Handlers = append(m.Handlers, interceptorFunc...)
}
func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsgReq) error {
for _, handler := range m.Handlers {
msgData, err := handler(ctx, req)
if err != nil {
return err
}
req.MsgData = msgData
}
return nil
}
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
if err != nil {
return err
}
mongo, err := unrelation.NewMongo()
if err != nil {
return err
}
cacheModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCacheModel, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, cacheModel)
s := &msgServer{
Conversation: rpcclient.NewConversationClient(client),
User: rpcclient.NewUserClient(client),
Group: rpcclient.NewGroupClient(client),
MsgDatabase: msgDatabase,
ExtendMsgDatabase: extendMsgDatabase,
RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(client),
ConversationLocalCache: localcache.NewConversationLocalCache(client),
black: rpcclient.NewBlackClient(client),
friend: rpcclient.NewFriendClient(client),
MessageLocker: NewLockerMessage(cacheModel),
}
s.addInterceptorHandler(MessageHasReadEnabled, MessageModifyCallback)
s.initPrometheus()
msg.RegisterMsgServer(server, s)
return nil
}
func (m *msgServer) initPrometheus() {
prome.NewMsgPullFromRedisSuccessCounter()
prome.NewMsgPullFromRedisFailedCounter()
prome.NewMsgPullFromMongoSuccessCounter()
prome.NewMsgPullFromMongoFailedCounter()
prome.NewSingleChatMsgRecvSuccessCounter()
prome.NewGroupChatMsgRecvSuccessCounter()
prome.NewWorkSuperGroupChatMsgRecvSuccessCounter()
prome.NewSingleChatMsgProcessSuccessCounter()
prome.NewSingleChatMsgProcessFailedCounter()
prome.NewGroupChatMsgProcessSuccessCounter()
prome.NewGroupChatMsgProcessFailedCounter()
prome.NewWorkSuperGroupChatMsgProcessSuccessCounter()
prome.NewWorkSuperGroupChatMsgProcessFailedCounter()
}