From 67319031ea53718e3a3eb6ef63b1c7af5021a265 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Tue, 30 Aug 2022 21:22:14 +0800 Subject: [PATCH] log --- config/config.yaml | 6 +-- internal/msg_gateway/gate/logic.go | 4 +- internal/push/logic/init.go | 17 +++++--- internal/push/logic/push_to_client.go | 62 ++++++++++++++++++++++++++- 4 files changed, 75 insertions(+), 14 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 156b5a94e..55a96f0b8 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -17,9 +17,9 @@ mysql: dbMysqlDatabaseName: openIM_v2 #默认即可 dbTableName: eMsg #默认即可 dbMsgTableNum: 1 - dbMaxOpenConns: 2000 - dbMaxIdleConns: 100 - dbMaxLifeTime: 3600 + dbMaxOpenConns: 100 + dbMaxIdleConns: 10 + dbMaxLifeTime: 5 mongo: dbUri: ""#当dbUri值不为空则直接使用该值 diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index a6f3a1974..2ee9be712 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -153,6 +153,7 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) { ws.pullMsgBySeqListResp(conn, m, nReply) } } + func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullMessageBySeqListResp) { log.NewInfo(m.OperationID, "pullMsgBySeqListResp come here ", pb.String()) c, _ := proto.Marshal(pb) @@ -166,10 +167,9 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM } log.NewInfo(m.OperationID, "pullMsgBySeqListResp all data is ", mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg, len(mReply.Data)) - ws.sendMsg(conn, mReply) - } + func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { sendMsgAllCountLock.Lock() sendMsgAllCount++ diff --git a/internal/push/logic/init.go b/internal/push/logic/init.go index 6d6229dca..2a147fd27 100644 --- a/internal/push/logic/init.go +++ b/internal/push/logic/init.go @@ -16,22 +16,25 @@ import ( "Open_IM/pkg/common/kafka" "Open_IM/pkg/statistics" "fmt" + "sync" ) var ( - rpcServer RPCServer - pushCh PushConsumerHandler - pushTerminal []int32 - producer *kafka.Producer - offlinePusher pusher.OfflinePusher - successCount uint64 + rpcServer RPCServer + pushCh PushConsumerHandler + pushTerminal []int32 + producer *kafka.Producer + offlinePusher pusher.OfflinePusher + successCount uint64 + CacheGroupMemberUserIDList map[string]*GroupMemberUserIDListHash + CacheGroupMtx sync.RWMutex ) func Init(rpcPort int) { - rpcServer.Init(rpcPort) pushCh.Init() pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID} + CacheGroupMemberUserIDList = make(map[string]*GroupMemberUserIDListHash, 0) } func init() { producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic) diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 766704e1b..04c9ef664 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -11,6 +11,7 @@ import ( "Open_IM/pkg/common/config" "Open_IM/pkg/common/constant" "Open_IM/pkg/common/db" + rocksCache "Open_IM/pkg/common/db/rocks_cache" "Open_IM/pkg/common/log" "Open_IM/pkg/grpc-etcdv3/getcdv3" pbCache "Open_IM/pkg/proto/cache" @@ -21,9 +22,9 @@ import ( "Open_IM/pkg/utils" "context" "encoding/json" - "strings" - + "errors" "github.com/golang/protobuf/proto" + "strings" ) type OpenIMContent struct { @@ -38,8 +39,65 @@ type AtContent struct { IsAtSelf bool `json:"isAtSelf"` } +type GroupMemberUserIDListHash struct { + MemberListHash uint64 + UserIDList []string +} + //var grpcCons []*grpc.ClientConn +func GetGroupMemberUserIDList(groupID string, operationID string) ([]string, error) { + groupHashRemote, err := GetGroupMemberUserIDListHashFromRemote(groupID) + if err != nil { + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + delete(CacheGroupMemberUserIDList, groupID) + log.Error(operationID, "GetGroupMemberUserIDListHashFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + + CacheGroupMtx.Lock() + defer CacheGroupMtx.Unlock() + groupInLocalCache, ok := CacheGroupMemberUserIDList[groupID] + if ok && groupInLocalCache.MemberListHash == groupHashRemote { + return groupInLocalCache.UserIDList, nil + } + + memberUserIDListRemote, err := GetGroupMemberUserIDListFromRemote(groupID, operationID) + if err != nil { + log.Error(operationID, "GetGroupMemberUserIDListFromRemote failed ", err.Error(), groupID) + return nil, utils.Wrap(err, groupID) + } + CacheGroupMemberUserIDList[groupID] = &GroupMemberUserIDListHash{MemberListHash: groupHashRemote, UserIDList: memberUserIDListRemote} + return memberUserIDListRemote, nil +} + +func GetGroupMemberUserIDListHashFromRemote(groupID string) (uint64, error) { + return rocksCache.GetGroupMemberListHashFromCache(groupID) +} + +func GetGroupMemberUserIDListFromRemote(groupID string, operationID string) ([]string, error) { + getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: operationID, GroupID: groupID} + etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, operationID) + if etcdConn == nil { + errMsg := operationID + "getcdv3.GetDefaultConn == nil" + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + client := pbCache.NewCacheClient(etcdConn) + cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq) + if err != nil { + log.NewError(operationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error()) + return nil, utils.Wrap(err, "GetGroupMemberIDListFromCache rpc call failed") + } + if cacheResp.CommonResp.ErrCode != 0 { + errMsg := operationID + "GetGroupMemberIDListFromCache rpc logic call failed " + cacheResp.CommonResp.ErrMsg + log.NewError(operationID, errMsg) + return nil, errors.New("errMsg") + } + return cacheResp.UserIDList, nil +} + func MsgToUser(pushMsg *pbPush.PushMsgReq) { var wsResult []*pbRelay.SingelMsgToUserResultList isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)