feat: cache batch

pull/2608/head
icey-yu 5 months ago
parent 176eb00941
commit 7f8d26c36b

@ -185,21 +185,12 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
} }
func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) { func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) {
var (
onlineUserIDs []string onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs)
offlineUserIDs []string if err != nil {
) return nil, err
for _, userID := range pushToUserIDs {
online, err := c.onlineCache.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
} else {
offlineUserIDs = append(offlineUserIDs, userID)
}
} }
log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs) log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
var result []*msggateway.SingleMsgToUserResults var result []*msggateway.SingleMsgToUserResults
if len(onlineUserIDs) > 0 { if len(onlineUserIDs) > 0 {
@ -398,6 +389,7 @@ func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, conten
} }
return return
} }
func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error { func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID) maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
@ -406,6 +398,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context,
} }
return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq) return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq)
} }
func unmarshalNotificationElem(bytes []byte, t any) error { func unmarshalNotificationElem(bytes []byte, t any) error {
var notification sdkws.NotificationElem var notification sdkws.NotificationElem
if err := json.Unmarshal(bytes, &notification); err != nil { if err := json.Unmarshal(bytes, &notification); err != nil {

@ -88,6 +88,22 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return v.value, v.err return v.value, v.err
} }
func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func() ([]V, error)) ([]V, error) {
return nil, nil
}
func (x *LayLRU[K, V]) SetHasBatch(data map[K]V) bool {
x.lock.Lock()
defer x.lock.Unlock()
for key, value := range data {
if x.core.Contains(key) {
x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
return true
}
}
return false
}
//func (x *LayLRU[K, V]) Set(key K, value V) { //func (x *LayLRU[K, V]) Set(key K, value V) {
// x.lock.Lock() // x.lock.Lock()
// x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) // x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})

@ -77,6 +77,15 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
return len(platformIDs) > 0, nil return len(platformIDs) > 0, nil
} }
func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]string, []string, error) {
var (
onlineUserIDS []string
offlineUserIDs []string
)
return onlineUserIDS, offlineUserIDs, nil
}
//func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) { //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
// onlineUserIDs := make([]string, 0, len(userIDs)) // onlineUserIDs := make([]string, 0, len(userIDs))
// for _, userID := range userIDs { // for _, userID := range userIDs {

Loading…
Cancel
Save