redis msg cache

pull/3007/head
withchao 9 months ago
parent 4d1655e32c
commit 9e64d8a2ca

@ -92,11 +92,11 @@ func Start(ctx context.Context, index int, config *Config) error {
} }
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err
} }
msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err

@ -89,7 +89,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil { if err != nil {
return err return err
} }
msgModel := redis.NewMsgCache(rdb) msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil { if err != nil {
return err return err

@ -19,17 +19,12 @@ import (
) )
const ( const (
messageCache = "MESSAGE_CACHE:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
messageCacheV2 = "MESSAGE_CACHE_V2:" messageCache = "MSG_CACHE:"
) )
func GetMessageCacheKey(conversationID string, seq int64) string { func GetMsgCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + "_" + strconv.Itoa(int(seq)) return messageCache + conversationID + ":" + strconv.Itoa(int(seq))
}
func GetMessageCacheKeyV2(conversationID string, seq int64) string {
return messageCacheV2 + conversationID + "_" + strconv.Itoa(int(seq))
} }
func GetSendMsgKey(id string) string { func GetSendMsgKey(id string) string {

@ -17,13 +17,9 @@ package cache
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/sdkws"
) )
type MsgCache interface { type MsgCache interface {
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error) GetSendMsgStatus(ctx context.Context, id string) (int32, error)

@ -7,8 +7,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
@ -18,61 +16,24 @@ import (
// msgCacheTimeout is expiration time of message cache, 86400 seconds // msgCacheTimeout is expiration time of message cache, 86400 seconds
const msgCacheTimeout = time.Hour * 24 const msgCacheTimeout = time.Hour * 24
func NewMsgCache(client redis.UniversalClient) cache.MsgCache { func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache {
return &msgCache{rdb: client} return &msgCache{
rdb: client,
rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()),
msgDocDatabase: db,
}
} }
type msgCache struct { type msgCache struct {
rdb redis.UniversalClient rdb redis.UniversalClient
rcClient *rockscache.Client rcClient *rockscache.Client
msgDocDatabase database.Msg msgDocDatabase database.Msg
msgTable model.MsgDocModel
}
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
return cachekey.GetMessageCacheKey(conversationID, seq)
} }
func (c *msgCache) getSendMsgKey(id string) string { func (c *msgCache) getSendMsgKey(id string) string {
return cachekey.GetSendMsgKey(id) return cachekey.GetSendMsgKey(id)
} }
func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string {
return c.getMessageCacheKey(conversationID, msg.Seq)
})
keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string {
return c.getMessageCacheKey(conversationID, msg.Seq)
})
err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
var values []string
for _, key := range keys {
if msg, ok := msgMap[key]; ok {
s, err := msgprocessor.Pb2String(msg)
if err != nil {
return err
}
values = append(values, s)
}
}
return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, int(msgCacheTimeout/time.Second))
})
if err != nil {
return 0, err
}
return len(msgs), nil
}
func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
var keys []string
for _, seq := range seqs {
keys = append(keys, c.getMessageCacheKey(conversationID, seq))
}
return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
return LuaDeleteBatch(ctx, c.rdb, keys)
})
}
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err()) return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
} }
@ -82,49 +43,12 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
return int32(result), errs.Wrap(err) return int32(result), errs.Wrap(err)
} }
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
var keys []string
keySeqMap := make(map[string]int64, 10)
for _, seq := range seqs {
key := c.getMessageCacheKey(conversationID, seq)
keys = append(keys, key)
keySeqMap[key] = seq
}
err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
result, err := LuaGetBatch(ctx, c.rdb, keys)
if err != nil {
return err
}
for i, value := range result {
seq := keySeqMap[keys[i]]
if value == nil {
failedSeqs = append(failedSeqs, seq)
continue
}
msg := &sdkws.MsgData{}
msgString, ok := value.(string)
if !ok || msgprocessor.String2Pb(msgString, msg) != nil {
failedSeqs = append(failedSeqs, seq)
continue
}
seqMsgs = append(seqMsgs, msg)
}
return nil
})
if err != nil {
return nil, nil, err
}
return seqMsgs, failedSeqs, nil
}
func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
if len(seqs) == 0 { if len(seqs) == 0 {
return nil, nil return nil, nil
} }
getKey := func(seq int64) string { getKey := func(seq int64) string {
return cachekey.GetMessageCacheKeyV2(conversationID, seq) return cachekey.GetMsgCacheKey(conversationID, seq)
} }
getMsgID := func(msg *model.MsgInfoModel) int64 { getMsgID := func(msg *model.MsgInfoModel) int64 {
return msg.Msg.Seq return msg.Msg.Seq
@ -140,7 +64,7 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string,
return nil return nil
} }
keys := datautil.Slice(seqs, func(seq int64) string { keys := datautil.Slice(seqs, func(seq int64) string {
return cachekey.GetMessageCacheKeyV2(conversationID, seq) return cachekey.GetMsgCacheKey(conversationID, seq)
}) })
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys) slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys)
if err != nil { if err != nil {

@ -1,133 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package redis
import (
"context"
"fmt"
"github.com/openimsdk/protocol/sdkws"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"testing"
)
func Test_msgCache_SetMessagesToCache(t *testing.T) {
type fields struct {
rdb redis.UniversalClient
}
type args struct {
ctx context.Context
conversationID string
msgs []*sdkws.MsgData
}
tests := []struct {
name string
fields fields
args args
want int
wantErr assert.ErrorAssertionFunc
}{
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Username: "", Password: "openIM123", DB: 0})}, args{context.Background(),
"cid", []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}}, 3, assert.NoError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &msgCache{
rdb: tt.fields.rdb,
}
got, err := c.SetMessagesToCache(tt.args.ctx, tt.args.conversationID, tt.args.msgs)
if !tt.wantErr(t, err, fmt.Sprintf("SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)) {
return
}
assert.Equalf(t, tt.want, got, "SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)
})
}
}
func Test_msgCache_GetMessagesBySeq(t *testing.T) {
type fields struct {
rdb redis.UniversalClient
}
type args struct {
ctx context.Context
conversationID string
seqs []int64
}
var failedSeq []int64
tests := []struct {
name string
fields fields
args args
wantSeqMsgs []*sdkws.MsgData
wantFailedSeqs []int64
wantErr assert.ErrorAssertionFunc
}{
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})},
args{context.Background(), "cid", []int64{1, 2, 3}},
[]*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}, failedSeq, assert.NoError},
{"test2", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})},
args{context.Background(), "cid", []int64{4, 5, 6}},
nil, []int64{4, 5, 6}, assert.NoError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &msgCache{
rdb: tt.fields.rdb,
}
gotSeqMsgs, gotFailedSeqs, err := c.GetMessagesBySeq(tt.args.ctx, tt.args.conversationID, tt.args.seqs)
if !tt.wantErr(t, err, fmt.Sprintf("GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) {
return
}
equalMsgDataSlices(t, tt.wantSeqMsgs, gotSeqMsgs)
assert.Equalf(t, tt.wantFailedSeqs, gotFailedSeqs, "GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)
})
}
}
func equalMsgDataSlices(t *testing.T, expected, actual []*sdkws.MsgData) {
assert.Equal(t, len(expected), len(actual), "Slices have different lengths")
for i := range expected {
assert.True(t, proto.Equal(expected[i], actual[i]), "Element %d not equal: expected %v, got %v", i, expected[i], actual[i])
}
}
func Test_msgCache_DeleteMessagesFromCache(t *testing.T) {
type fields struct {
rdb redis.UniversalClient
}
type args struct {
ctx context.Context
conversationID string
seqs []int64
}
tests := []struct {
name string
fields fields
args args
wantErr assert.ErrorAssertionFunc
}{
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123"})},
args{context.Background(), "cid", []int64{1, 2, 3}}, assert.NoError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &msgCache{
rdb: tt.fields.rdb,
}
tt.wantErr(t, c.DeleteMessagesFromCache(tt.args.ctx, tt.args.conversationID, tt.args.seqs),
fmt.Sprintf("DeleteMessagesFromCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs))
})
}
}

@ -58,8 +58,6 @@ type CommonMsgDatabase interface {
GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error)
GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error)
// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages.
ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error)
// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers.
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers.
@ -112,7 +110,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
} }
return &commonMsgDatabase{ return &commonMsgDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
msg: msg, msgCache: msg,
seqUser: seqUser, seqUser: seqUser,
seqConversation: seqConversation, seqConversation: seqConversation,
producer: producerToRedis, producer: producerToRedis,
@ -122,7 +120,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser
type commonMsgDatabase struct { type commonMsgDatabase struct {
msgDocDatabase database.Msg msgDocDatabase database.Msg
msgTable model.MsgDocModel msgTable model.MsgDocModel
msg cache.MsgCache msgCache cache.MsgCache
seqConversation cache.SeqConversationCache seqConversation cache.SeqConversationCache
seqUser cache.SeqUser seqUser cache.SeqUser
producer *kafka.Producer producer *kafka.Producer
@ -239,7 +237,7 @@ func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID strin
if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil { if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil {
return err return err
} }
return db.msg.DelMessageBySeqs(ctx, conversationID, []int64{seq}) return db.msgCache.DelMessageBySeqs(ctx, conversationID, []int64{seq})
} }
func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error {
@ -254,7 +252,7 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI
return err return err
} }
} }
return db.msg.DelMessageBySeqs(ctx, conversationID, totalSeqs) return db.msgCache.DelMessageBySeqs(ctx, conversationID, totalSeqs)
} }
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
@ -533,82 +531,8 @@ func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, use
return isEnd, endSeq, successMsgs, nil return isEnd, endSeq, successMsgs, nil
} }
func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) {
var index int64
for {
// from oldest 2 newest, ASC
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
if err == model.ErrMsgListNotExist {
log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index)
} else {
log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index)
}
}
// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion
break
}
index++
// && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli()
if len(msgDocModel.Msg) > 0 {
i := 0
var over bool
for _, msg := range msgDocModel.Msg {
i++
// over clear time, need to clear
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() {
// if msg is not in del list, add to del list
if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) {
seqs = append(seqs, msg.Msg.Seq)
}
} else {
log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i)
over = true
break
}
}
if over {
break
}
}
}
log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs)
// have msg need to destruct
if len(seqs) > 0 {
// update min seq to clear after
userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before
if err != nil {
return nil, err
}
// if before < after, update min seq
if currentUserMinSeq < userMinSeq {
if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
}
}
}
return seqs, nil
}
// this is struct for recursion.
type delMsgRecursionStruct struct {
minSeq int64
delDocIDs []string
}
func (d *delMsgRecursionStruct) getSetMinSeq() int64 {
return d.minSeq
}
func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error {
if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil { if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs); err != nil {
return err return err
} }
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) {
@ -620,11 +544,11 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve
return err return err
} }
} }
return db.msg.DelMessageBySeqs(ctx, conversationID, allSeqs) return db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs)
} }
func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error {
if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs); err != nil { if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil {
return err return err
} }
for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) {
@ -634,7 +558,7 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st
} }
} }
} }
return db.msg.DelMessageBySeqs(ctx, conversationID, seqs) return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
} }
func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
@ -678,11 +602,11 @@ func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, c
} }
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.msg.SetSendMsgStatus(ctx, id, status) return db.msgCache.SetSendMsgStatus(ctx, id, status)
} }
func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) {
return db.msg.GetSendMsgStatus(ctx, id) return db.msgCache.GetSendMsgStatus(ctx, id)
} }
func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) {
@ -812,7 +736,7 @@ func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error
if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil { if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil {
return err return err
} }
return db.msg.DelMessageBySeqs(ctx, conversationID, seqs) return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
} }
func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
@ -872,7 +796,7 @@ func (db *commonMsgDatabase) handlerQuote(ctx context.Context, userID, conversat
} }
func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) { func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) {
msgs, err := db.msg.GetMessageBySeqs(ctx, conversationID, seqs) msgs, err := db.msgCache.GetMessageBySeqs(ctx, conversationID, seqs)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -5,7 +5,6 @@ import (
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@ -50,7 +49,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse
} }
return &msgTransferDatabase{ return &msgTransferDatabase{
msgDocDatabase: msgDocModel, msgDocDatabase: msgDocModel,
msg: msg, msgCache: msg,
seqUser: seqUser, seqUser: seqUser,
seqConversation: seqConversation, seqConversation: seqConversation,
producerToMongo: producerToMongo, producerToMongo: producerToMongo,
@ -61,7 +60,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse
type msgTransferDatabase struct { type msgTransferDatabase struct {
msgDocDatabase database.Msg msgDocDatabase database.Msg
msgTable model.MsgDocModel msgTable model.MsgDocModel
msg cache.MsgCache msgCache cache.MsgCache
seqConversation cache.SeqConversationCache seqConversation cache.SeqConversationCache
seqUser cache.SeqUser seqUser cache.SeqUser
producerToMongo *kafka.Producer producerToMongo *kafka.Producer
@ -73,10 +72,12 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat
return errs.ErrArgs.WrapMsg("msgList is empty") return errs.ErrArgs.WrapMsg("msgList is empty")
} }
msgs := make([]any, len(msgList)) msgs := make([]any, len(msgList))
seqs := make([]int64, len(msgList))
for i, msg := range msgList { for i, msg := range msgList {
if msg == nil { if msg == nil {
continue continue
} }
seqs[i] = msg.Seq
var offlinePushModel *model.OfflinePushModel var offlinePushModel *model.OfflinePushModel
if msg.OfflinePushInfo != nil { if msg.OfflinePushInfo != nil {
offlinePushModel = &model.OfflinePushModel{ offlinePushModel = &model.OfflinePushModel{
@ -114,7 +115,10 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat
Ex: msg.Ex, Ex: msg.Ex,
} }
} }
return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) if err := db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq); err != nil {
return err
}
return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
} }
func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error {
@ -219,7 +223,7 @@ func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversatio
} }
func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
} }
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) { func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) {
@ -238,20 +242,17 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
isNew = currentMaxSeq == 0 isNew = currentMaxSeq == 0
lastMaxSeq := currentMaxSeq lastMaxSeq := currentMaxSeq
userSeqMap := make(map[string]int64) userSeqMap := make(map[string]int64)
seqs := make([]int64, 0, lenList)
for _, m := range msgs { for _, m := range msgs {
currentMaxSeq++ currentMaxSeq++
m.Seq = currentMaxSeq m.Seq = currentMaxSeq
userSeqMap[m.SendID] = m.Seq userSeqMap[m.SendID] = m.Seq
seqs = append(seqs, m.Seq)
} }
if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil {
failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs) return 0, false, nil, err
if err != nil {
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
} else {
prommetrics.MsgInsertRedisSuccessCounter.Inc()
} }
return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err) return lastMaxSeq, isNew, userSeqMap, nil
} }
func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {

Loading…
Cancel
Save