From a64b04650478d90b3767862b190a19ef32e33b24 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 20 Oct 2023 17:25:58 +0800 Subject: [PATCH] fix: redis remove pipeline --- pkg/common/db/cache/msg.go | 277 +++++++++++++++++++++++++------------ 1 file changed, 190 insertions(+), 87 deletions(-) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 6726b343d..41618acc2 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -158,28 +158,41 @@ func (c *msgCache) getSeq(ctx context.Context, conversationID string, getkey fun } func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s string) string) (m map[string]int64, err error) { - pipe := c.rdb.Pipeline() - for _, v := range items { - if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { - return nil, errs.Wrap(err) - } - } - result, err := pipe.Exec(ctx) - if err != nil && err != redis.Nil { - return nil, errs.Wrap(err) - } m = make(map[string]int64, len(items)) - for i, v := range result { - seq := v.(*redis.StringCmd) - if seq.Err() != nil && seq.Err() != redis.Nil { - return nil, errs.Wrap(v.Err()) + for i, v := range items { + res, err := c.rdb.Get(ctx, getkey(v)).Result() + if err != nil && err != redis.Nil { + return nil, errs.Wrap(err) } - val := utils.StringToInt64(seq.Val()) + val := utils.StringToInt64(res) if val != 0 { m[items[i]] = val } } return m, nil + + //pipe := c.rdb.Pipeline() + //for _, v := range items { + // if err := pipe.Get(ctx, getkey(v)).Err(); err != nil && err != redis.Nil { + // return nil, errs.Wrap(err) + // } + //} + //result, err := pipe.Exec(ctx) + //if err != nil && err != redis.Nil { + // return nil, errs.Wrap(err) + //} + //m = make(map[string]int64, len(items)) + //for i, v := range result { + // seq := v.(*redis.StringCmd) + // if seq.Err() != nil && seq.Err() != redis.Nil { + // return nil, errs.Wrap(v.Err()) + // } + // val := utils.StringToInt64(seq.Val()) + // if val != 0 { + // m[items[i]] = val + // } + //} + //return m, nil } func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error { @@ -199,15 +212,21 @@ func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq } func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error { - pipe := c.rdb.Pipeline() - for k, seq := range seqs { - err := pipe.Set(ctx, getkey(k), seq, 0).Err() - if err != nil { + for conversationID, seq := range seqs { + if err := c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err(); err != nil { return errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return err + return nil + //pipe := c.rdb.Pipeline() + //for k, seq := range seqs { + // err := pipe.Set(ctx, getkey(k), seq, 0).Err() + // if err != nil { + // return errs.Wrap(err) + // } + //} + //_, err := pipe.Exec(ctx) + //return err } func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { @@ -319,53 +338,84 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string { } func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - pipe := c.rdb.Pipeline() - for _, v := range seqs { - // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 - key := c.getMessageCacheKey(conversationID, v) - if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { - return nil, nil, err + for _, seq := range seqs { + res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result() + if err != nil { + log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq) + failedSeqs = append(failedSeqs, seq) + continue } - } - result, err := pipe.Exec(ctx) - for i, v := range result { - cmd := v.(*redis.StringCmd) - if cmd.Err() != nil { - failedSeqs = append(failedSeqs, seqs[i]) - } else { - msg := sdkws.MsgData{} - err = msgprocessor.String2Pb(cmd.Val(), &msg) - if err == nil { - if msg.Status != constant.MsgDeleted { - seqMsgs = append(seqMsgs, &msg) - continue - } - } else { - log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) - } - failedSeqs = append(failedSeqs, seqs[i]) + msg := sdkws.MsgData{} + if err = msgprocessor.String2Pb(res, &msg); err != nil { + log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq) + failedSeqs = append(failedSeqs, seq) + continue } + if msg.Status == constant.MsgDeleted { + failedSeqs = append(failedSeqs, seq) + continue + } + seqMsgs = append(seqMsgs, &msg) } - return seqMsgs, failedSeqs, err + return + //pipe := c.rdb.Pipeline() + //for _, v := range seqs { + // // MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 + // key := c.getMessageCacheKey(conversationID, v) + // if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { + // return nil, nil, err + // } + //} + //result, err := pipe.Exec(ctx) + //for i, v := range result { + // cmd := v.(*redis.StringCmd) + // if cmd.Err() != nil { + // failedSeqs = append(failedSeqs, seqs[i]) + // } else { + // msg := sdkws.MsgData{} + // err = msgprocessor.String2Pb(cmd.Val(), &msg) + // if err == nil { + // if msg.Status != constant.MsgDeleted { + // seqMsgs = append(seqMsgs, &msg) + // continue + // } + // } else { + // log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) + // } + // failedSeqs = append(failedSeqs, seqs[i]) + // } + //} + //return seqMsgs, failedSeqs, err } func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { - pipe := c.rdb.Pipeline() - var failedMsgs []*sdkws.MsgData for _, msg := range msgs { - key := c.getMessageCacheKey(conversationID, msg.Seq) s, err := msgprocessor.Pb2String(msg) if err != nil { return 0, errs.Wrap(err) } - err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() - if err != nil { - failedMsgs = append(failedMsgs, msg) - log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs) + key := c.getMessageCacheKey(conversationID, msg.Seq) + if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return 0, errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return len(failedMsgs), err + return len(msgs), nil + //pipe := c.rdb.Pipeline() + //var failedMsgs []*sdkws.MsgData + //for _, msg := range msgs { + // key := c.getMessageCacheKey(conversationID, msg.Seq) + // s, err := msgprocessor.Pb2String(msg) + // if err != nil { + // return 0, errs.Wrap(err) + // } + // err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() + // if err != nil { + // failedMsgs = append(failedMsgs, msg) + // log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs) + // } + //} + //_, err := pipe.Exec(ctx) + //return len(failedMsgs), err } func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { @@ -377,27 +427,46 @@ func (c *msgCache) getUserDelList(conversationID, userID string) string { } func (c *msgCache) UserDeleteMsgs(ctx context.Context, conversationID string, seqs []int64, userID string) error { - pipe := c.rdb.Pipeline() for _, seq := range seqs { delUserListKey := c.getMessageDelUserListKey(conversationID, seq) userDelListKey := c.getUserDelList(conversationID, userID) - err := pipe.SAdd(ctx, delUserListKey, userID).Err() + err := c.rdb.SAdd(ctx, delUserListKey, userID).Err() if err != nil { return errs.Wrap(err) } - err = pipe.SAdd(ctx, userDelListKey, seq).Err() + err = c.rdb.SAdd(ctx, userDelListKey, seq).Err() if err != nil { return errs.Wrap(err) } - if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } - if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + if err := c.rdb.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { return errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return errs.Wrap(err) + return nil + //pipe := c.rdb.Pipeline() + //for _, seq := range seqs { + // delUserListKey := c.getMessageDelUserListKey(conversationID, seq) + // userDelListKey := c.getUserDelList(conversationID, userID) + // err := pipe.SAdd(ctx, delUserListKey, userID).Err() + // if err != nil { + // return errs.Wrap(err) + // } + // err = pipe.SAdd(ctx, userDelListKey, seq).Err() + // if err != nil { + // return errs.Wrap(err) + // } + // if err := pipe.Expire(ctx, delUserListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + // return errs.Wrap(err) + // } + // if err := pipe.Expire(ctx, userDelListKey, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + // return errs.Wrap(err) + // } + //} + //_, err := pipe.Exec(ctx) + //return errs.Wrap(err) } func (c *msgCache) GetUserDelList(ctx context.Context, userID, conversationID string) (seqs []int64, err error) { @@ -420,46 +489,74 @@ func (c *msgCache) DelUserDeleteMsgsList(ctx context.Context, conversationID str continue } if len(delUsers) > 0 { - pipe := c.rdb.Pipeline() var failedFlag bool for _, userID := range delUsers { - err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err() + err = c.rdb.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err() if err != nil { failedFlag = true - log.ZWarn( - ctx, - "DelUserDeleteMsgsList failed", - err, - "conversationID", - conversationID, - "seq", - seq, - "userID", - userID, - ) + log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq, "userID", userID) } } if !failedFlag { - if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { + if err := c.rdb.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) } } - if _, err := pipe.Exec(ctx); err != nil { - log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq) - } } } + //for _, seq := range seqs { + // delUsers, err := c.rdb.SMembers(ctx, c.getMessageDelUserListKey(conversationID, seq)).Result() + // if err != nil { + // log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + // continue + // } + // if len(delUsers) > 0 { + // pipe := c.rdb.Pipeline() + // var failedFlag bool + // for _, userID := range delUsers { + // err = pipe.SRem(ctx, c.getUserDelList(conversationID, userID), seq).Err() + // if err != nil { + // failedFlag = true + // log.ZWarn( + // ctx, + // "DelUserDeleteMsgsList failed", + // err, + // "conversationID", + // conversationID, + // "seq", + // seq, + // "userID", + // userID, + // ) + // } + // } + // if !failedFlag { + // if err := pipe.Del(ctx, c.getMessageDelUserListKey(conversationID, seq)).Err(); err != nil { + // log.ZWarn(ctx, "DelUserDeleteMsgsList failed", err, "conversationID", conversationID, "seq", seq) + // } + // } + // if _, err := pipe.Exec(ctx); err != nil { + // log.ZError(ctx, "pipe exec failed", err, "conversationID", conversationID, "seq", seq) + // } + // } + //} } func (c *msgCache) DeleteMessages(ctx context.Context, conversationID string, seqs []int64) error { - pipe := c.rdb.Pipeline() for _, seq := range seqs { - if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { + if err := c.rdb.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { return errs.Wrap(err) } } - _, err := pipe.Exec(ctx) - return errs.Wrap(err) + return nil + //pipe := c.rdb.Pipeline() + //for _, seq := range seqs { + // if err := pipe.Del(ctx, c.getMessageCacheKey(conversationID, seq)).Err(); err != nil { + // return errs.Wrap(err) + // } + //} + //_, err := pipe.Exec(ctx) + //return errs.Wrap(err) } func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversationID string) error { @@ -470,14 +567,20 @@ func (c *msgCache) CleanUpOneConversationAllMsg(ctx context.Context, conversatio if err != nil { return errs.Wrap(err) } - pipe := c.rdb.Pipeline() for _, v := range vals { - if err := pipe.Del(ctx, v).Err(); err != nil { + if err := c.rdb.Del(ctx, v).Err(); err != nil { return errs.Wrap(err) } } - _, err = pipe.Exec(ctx) - return errs.Wrap(err) + return nil + //pipe := c.rdb.Pipeline() + //for _, v := range vals { + // if err := pipe.Del(ctx, v).Err(); err != nil { + // return errs.Wrap(err) + // } + //} + //_, err = pipe.Exec(ctx) + //return errs.Wrap(err) } func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {