perf: add concurrency and pipeline modes for redis cache

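Message cache reads and writes now choose between two batch strategies,
selected by the new Redis.EnablePipeline config switch:

* pipeline mode (PipeGetMessagesBySeq / PipeSetMessageToCache): all GET/SET
  commands for a batch are queued on one pipeline and sent in a single
  round trip.
* parallel mode (ParallelGetMessagesBySeq / ParallelSetMessageToCache):
  commands fan out over an errgroup capped at concurrentLimit (3)
  goroutines.

The previously commented-out pipeline drafts are deleted.
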
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
pull/1338/head
rfyiamcool 2 years ago
parent 62e9980f3c
commit b9c18a601e

@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/dtm-labs/rockscache"
+	"golang.org/x/sync/errgroup"
 	unrelationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/unrelation"
@@ -62,6 +63,8 @@ const (
 	uidPidToken = "UID_PID_TOKEN_STATUS:"
 )
 
+var concurrentLimit = 3
+
 type SeqCache interface {
 	SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
 	GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
@@ -345,85 +348,165 @@ func (c *msgCache) allMessageCacheKey(conversationID string) string {
 }
 
 func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
+	if config.Config.Redis.EnablePipeline {
+		return c.PipeGetMessagesBySeq(ctx, conversationID, seqs)
+	}
+	return c.ParallelGetMessagesBySeq(ctx, conversationID, seqs)
+}
+
+func (c *msgCache) PipeGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
+	pipe := c.rdb.Pipeline()
+
+	results := []*redis.StringCmd{}
 	for _, seq := range seqs {
-		res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
-		if err != nil {
-			log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
+		results = append(results, pipe.Get(ctx, c.getMessageCacheKey(conversationID, seq)))
+	}
+
+	// one round trip for the whole batch; redis.Nil only means some keys were missing.
+	_, err = pipe.Exec(ctx)
+	if err != nil && err != redis.Nil {
+		return seqMsgs, failedSeqs, errs.Wrap(err, "pipe.get")
+	}
+
+	for idx, res := range results {
+		seq := seqs[idx]
+		if res.Err() != nil {
+			log.ZError(ctx, "GetMessagesBySeq failed", res.Err(), "conversationID", conversationID, "seq", seq)
 			failedSeqs = append(failedSeqs, seq)
 			continue
 		}
+
 		msg := sdkws.MsgData{}
-		if err = msgprocessor.String2Pb(res, &msg); err != nil {
+		if err = msgprocessor.String2Pb(res.Val(), &msg); err != nil {
 			log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
 			failedSeqs = append(failedSeqs, seq)
 			continue
 		}
+
 		if msg.Status == constant.MsgDeleted {
 			failedSeqs = append(failedSeqs, seq)
 			continue
 		}
+
 		seqMsgs = append(seqMsgs, &msg)
 	}
 	return
-	//pipe := c.rdb.Pipeline()
-	//for _, v := range seqs {
-	//	// MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
-	//	key := c.getMessageCacheKey(conversationID, v)
-	//	if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
-	//		return nil, nil, err
-	//	}
-	//}
-	//result, err := pipe.Exec(ctx)
-	//for i, v := range result {
-	//	cmd := v.(*redis.StringCmd)
-	//	if cmd.Err() != nil {
-	//		failedSeqs = append(failedSeqs, seqs[i])
-	//	} else {
-	//		msg := sdkws.MsgData{}
-	//		err = msgprocessor.String2Pb(cmd.Val(), &msg)
-	//		if err == nil {
-	//			if msg.Status != constant.MsgDeleted {
-	//				seqMsgs = append(seqMsgs, &msg)
-	//				continue
-	//			}
-	//		} else {
-	//			log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val())
-	//		}
-	//		failedSeqs = append(failedSeqs, seqs[i])
-	//	}
-	//}
-	//return seqMsgs, failedSeqs, err
+}
+
+func (c *msgCache) ParallelGetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
+	type entry struct {
+		err error
+		msg *sdkws.MsgData
+	}
+
+	wg := errgroup.Group{}
+	wg.SetLimit(concurrentLimit)
+
+	results := make([]entry, len(seqs)) // set slice len/cap to length of seqs.
+	for idx, seq := range seqs {
+		// closure safe var
+		idx := idx
+		seq := seq
+
+		wg.Go(func() error {
+			res, err := c.rdb.Get(ctx, c.getMessageCacheKey(conversationID, seq)).Result()
+			if err != nil {
+				log.ZError(ctx, "GetMessagesBySeq failed", err, "conversationID", conversationID, "seq", seq)
+				results[idx] = entry{err: err}
+				return nil
+			}
+
+			msg := sdkws.MsgData{}
+			if err = msgprocessor.String2Pb(res, &msg); err != nil {
+				log.ZError(ctx, "GetMessagesBySeq Unmarshal failed", err, "res", res, "conversationID", conversationID, "seq", seq)
+				results[idx] = entry{err: err}
+				return nil
+			}
+
+			if msg.Status == constant.MsgDeleted {
+				// err is nil here; use a sentinel so deleted messages land in failedSeqs.
+				results[idx] = entry{err: redis.Nil}
+				return nil
+			}
+
+			results[idx] = entry{msg: &msg}
+			return nil
+		})
+	}
+
+	// per-item errors are recorded in results, never returned, so Wait cannot fail.
+	_ = wg.Wait()
+
+	for idx, res := range results {
+		if res.err != nil {
+			failedSeqs = append(failedSeqs, seqs[idx])
+			continue
+		}
+
+		seqMsgs = append(seqMsgs, res.msg)
+	}
+
+	return
 }
 
 func (c *msgCache) SetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
+	if config.Config.Redis.EnablePipeline {
+		return c.PipeSetMessageToCache(ctx, conversationID, msgs)
+	}
+	return c.ParallelSetMessageToCache(ctx, conversationID, msgs)
+}
+
+func (c *msgCache) PipeSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
+	pipe := c.rdb.Pipeline()
 	for _, msg := range msgs {
 		s, err := msgprocessor.Pb2String(msg)
 		if err != nil {
-			return 0, errs.Wrap(err)
+			return 0, errs.Wrap(err, "pb.marshal")
 		}
+
+		key := c.getMessageCacheKey(conversationID, msg.Seq)
+		_ = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second)
+	}
+
+	results, err := pipe.Exec(ctx)
+	if err != nil {
+		return 0, errs.Wrap(err, "pipe.set")
+	}
+
+	for _, res := range results {
+		if res.Err() != nil {
+			// wrap the per-command error, not the (nil) Exec error.
+			return 0, errs.Wrap(res.Err(), "pipe.set")
+		}
+	}
+
+	return len(msgs), nil
+}
+
+func (c *msgCache) ParallelSetMessageToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
+	wg := errgroup.Group{}
+	wg.SetLimit(concurrentLimit)
+
+	for _, msg := range msgs {
+		msg := msg // closure safe var
+		wg.Go(func() error {
+			s, err := msgprocessor.Pb2String(msg)
+			if err != nil {
+				return errs.Wrap(err)
+			}
+
-		key := c.getMessageCacheKey(conversationID, msg.Seq)
-		if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
-			return 0, errs.Wrap(err)
-		}
+			key := c.getMessageCacheKey(conversationID, msg.Seq)
+			if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil {
+				return errs.Wrap(err)
+			}
+			return nil
+		})
 	}
+
+	err := wg.Wait()
+	if err != nil {
+		return 0, err
+	}
+
 	return len(msgs), nil
-	//pipe := c.rdb.Pipeline()
-	//var failedMsgs []*sdkws.MsgData
-	//for _, msg := range msgs {
-	//	key := c.getMessageCacheKey(conversationID, msg.Seq)
-	//	s, err := msgprocessor.Pb2String(msg)
-	//	if err != nil {
-	//		return 0, errs.Wrap(err)
-	//	}
-	//	err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err()
-	//	if err != nil {
-	//		failedMsgs = append(failedMsgs, msg)
-	//		log.ZWarn(ctx, "set msg 2 cache failed", err, "msg", failedMsgs)
-	//	}
-	//}
-	//_, err := pipe.Exec(ctx)
-	//return len(failedMsgs), err
 }
 
 func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
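
For reference, a minimal standalone sketch of the pipelined-read pattern used by PipeGetMessagesBySeq, assuming go-redis v9; the address, keys, and values are hypothetical and not part of this commit. Pipelining trades per-command error handling for a single batched round trip:

// Illustrative sketch only, not part of the commit.
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	keys := []string{"msg:1", "msg:2", "msg:3"} // hypothetical cache keys

	// Queue all GETs first; nothing is sent until Exec.
	pipe := rdb.Pipeline()
	cmds := make([]*redis.StringCmd, 0, len(keys))
	for _, k := range keys {
		cmds = append(cmds, pipe.Get(ctx, k))
	}

	// One round trip for the whole batch. redis.Nil from Exec only means
	// some keys were missing, so it is not treated as a hard failure.
	if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
		panic(err)
	}

	// Read results per command, in queue order.
	for i, cmd := range cmds {
		val, err := cmd.Result()
		if err != nil {
			fmt.Printf("%s: miss (%v)\n", keys[i], err)
			continue
		}
		fmt.Printf("%s: %s\n", keys[i], val)
	}
}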
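
And the bounded-concurrency counterpart behind the Parallel* variants, again an illustrative sketch assuming golang.org/x/sync/errgroup; the formatted string stands in for the per-key rdb.Get:

// Illustrative sketch only, not part of the commit.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	seqs := []int64{1, 2, 3, 4, 5}

	var g errgroup.Group
	g.SetLimit(3) // mirrors concurrentLimit: at most 3 fetches in flight

	// Pre-sized results slice: each goroutine writes only its own index,
	// so no mutex is needed.
	results := make([]string, len(seqs))
	for idx, seq := range seqs {
		idx, seq := idx, seq // closure-safe copies (pre-Go 1.22 loop semantics)
		g.Go(func() error {
			results[idx] = fmt.Sprintf("payload-%d", seq) // stand-in for a Redis GET
			return nil // per-item errors are recorded, not returned, so one miss never cancels the batch
		})
	}
	_ = g.Wait()

	for i, r := range results {
		fmt.Println(seqs[i], "=>", r)
	}
}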