redis msg cache

pull/3007/head
withchao 9 months ago
parent 2d9a945791
commit ca8d55138b

@ -77,27 +77,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
for _, msg := range msgFromMQ.MsgData { for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq) seqs = append(seqs, msg.Seq)
} }
err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil {
log.ZError(
ctx,
"remove cache msg from redis err",
err,
"msg",
msgFromMQ.MsgData,
"conversationID",
msgFromMQ.ConversationID,
)
}
} }
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim( func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error { // an instance in the consumer group
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
for msg := range claim.Messages() { for msg := range claim.Messages() {

@ -25,4 +25,5 @@ type MsgCache interface {
GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error
} }

@ -2,6 +2,7 @@ package redis
import ( import (
"context" "context"
"encoding/json"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
@ -77,3 +78,19 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string,
} }
return nil return nil
} }
func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error {
for _, msg := range msgs {
if msg == nil || msg.Seq <= 0 {
continue
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Seq), string(data), msgCacheTimeout); err != nil {
return err
}
}
return nil
}

@ -2,7 +2,9 @@ package controller
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
@ -252,6 +254,9 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil { if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil {
return 0, false, nil, err return 0, false, nil, err
} }
if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, convert.MsgPb2DB)); err != nil {
return 0, false, nil, err
}
return lastMaxSeq, isNew, userSeqMap, nil return lastMaxSeq, isNew, userSeqMap, nil
} }

Loading…
Cancel
Save