From 5c52840d670e1a280335015d648bc090577cb068 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Fri, 26 Jul 2024 15:53:25 +0800 Subject: [PATCH] fix: user seq, asynchronous friend notification, message search (#2447) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * friend incr sync * friend incr sync * friend incr sync * friend incr sync * friend incr sync * mage * optimization version log * optimization version log * sync * sync * sync * group sync * sync option * sync option * refactor: replace `friend` package with `realtion`. * refactor: update lastest commit to relation. * sync option * sync option * sync option * sync * sync * go.mod * seq * update: go mod * refactor: change incremental to full * feat: get full friend user ids * feat: api and config * seq * group version * merge * seq * seq * seq * fix: sort by id avoid unstable sort friends. * group * group * group * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * user version * seq * seq * seq user * user online * implement minio expire delete. * user online * config * fix * fix * implement minio expire delete logic. * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * feat: implement scheduled delete outdated object in minio. * update gomake version * update gomake version * implement FindExpires pagination. * remove unnesseary incr. * fix uncorrect args call. * online push * online push * online push * resolving conflicts * resolving conflicts * test * api prommetrics * api prommetrics * api prommetrics * api prommetrics * api prommetrics * rpc prommetrics * rpc prommetrics * online status * online status * online status * online status * sub * conversation version incremental * merge seq * merge online * merge online * merge online * merge seq * GetOwnerConversation * fix: change incremental syncer router name. * rockscache batch get * rockscache seq batch get * fix: GetMsgDocModelByIndex bug * update go.mod * update go.mod * merge * feat: prometheus * feat: prometheus * group member sort * sub * sub * fix: seq conversion bug * fix: redis pipe exec * sort version * sort version * sort version * remove old version online subscription * remove old version online subscription * version log index * version log index * batch push * batch push * seq void filling * fix: batchGetMaxSeq * fix: batchGetMaxSeq * cache db error log * 111 * fix bug --------- Co-authored-by: withchao Co-authored-by: Monet Lee Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> Co-authored-by: icey-yu <1186114839@qq.com> --- internal/rpc/friend/friend.go | 4 +- internal/rpc/friend/sync.go | 22 +- internal/rpc/msg/sync_msg.go | 4 +- pkg/common/storage/cache/redis/batch_test.go | 2 +- .../storage/cache/redis/seq_user_test.go | 32 ++ pkg/common/storage/controller/msg.go | 4 +- pkg/common/storage/database/mgo/msg.go | 457 ++++++++++++++---- pkg/common/storage/database/mgo/msg_test.go | 75 +++ .../database/mgo/seq_conversation_test.go | 7 +- pkg/common/storage/database/mgo/seq_user.go | 9 + pkg/common/storage/database/msg.go | 2 +- 11 files changed, 518 insertions(+), 100 deletions(-) create mode 100644 pkg/common/storage/database/mgo/msg_test.go diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 622e19f42..bdb786bca 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -16,6 +16,7 @@ package friend import ( "context" + "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" @@ -49,6 +50,7 @@ type friendServer struct { RegisterCenter discovery.SvcDiscoveryRegistry config *Config webhookClient *webhook.Client + queue *memamq.MemoryQueue } type Config struct { @@ -118,8 +120,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), config: config, webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), + queue: memamq.NewMemoryQueue(128, 1024*8), }) - return nil } diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go index 145c287da..902cc7303 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/friend/sync.go @@ -4,6 +4,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/log" "slices" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" @@ -17,14 +18,23 @@ func (s *friendServer) NotificationUserInfoUpdate(ctx context.Context, req *rela if err != nil { return nil, err } - for _, userID := range userIDs { - if err := s.db.OwnerIncrVersion(ctx, userID, []string{req.UserID}, model.VersionStateUpdate); err != nil { - return nil, err + if len(userIDs) > 0 { + friendUserIDs := []string{req.UserID} + noCancelCtx := context.WithoutCancel(ctx) + err := s.queue.PushCtx(ctx, func() { + for _, userID := range userIDs { + if err := s.db.OwnerIncrVersion(noCancelCtx, userID, friendUserIDs, model.VersionStateUpdate); err != nil { + log.ZError(ctx, "OwnerIncrVersion", err, "userID", userID, "friendUserIDs", friendUserIDs) + } + } + for _, userID := range userIDs { + s.notificationSender.FriendInfoUpdatedNotification(noCancelCtx, req.UserID, userID) + } + }) + if err != nil { + log.ZError(ctx, "NotificationUserInfoUpdate timeout", err, "userID", req.UserID) } } - for _, userID := range userIDs { - s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserID, userID) - } return &relation.NotificationUserInfoUpdateResp{}, nil } diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index afb79506e..f5b5ebda5 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -111,7 +111,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (resp *msg.SearchMessageResp, err error) { var chatLogs []*sdkws.MsgData - var total int32 + var total int64 resp = &msg.SearchMessageResp{} if total, chatLogs, err = m.MsgDatabase.SearchMessage(ctx, req); err != nil { return nil, err @@ -194,7 +194,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq } resp.ChatLogs = append(resp.ChatLogs, pbchatLog) } - resp.ChatLogsNum = total + resp.ChatLogsNum = int32(total) return resp, nil } diff --git a/pkg/common/storage/cache/redis/batch_test.go b/pkg/common/storage/cache/redis/batch_test.go index e4caa2a21..bbb6d76f1 100644 --- a/pkg/common/storage/cache/redis/batch_test.go +++ b/pkg/common/storage/cache/redis/batch_test.go @@ -45,7 +45,7 @@ func TestName(t *testing.T) { } seqUser := NewSeqUserCacheRedis(rdb, mgoSeqUser) - res, err := seqUser.GetReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"}) + res, err := seqUser.GetUserReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"}) if err != nil { panic(err) } diff --git a/pkg/common/storage/cache/redis/seq_user_test.go b/pkg/common/storage/cache/redis/seq_user_test.go index e4fd95922..0059c81db 100644 --- a/pkg/common/storage/cache/redis/seq_user_test.go +++ b/pkg/common/storage/cache/redis/seq_user_test.go @@ -4,7 +4,10 @@ import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + mgo2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/redis/go-redis/v9" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "log" "strconv" "sync/atomic" @@ -77,3 +80,32 @@ func TestRecvOnline(t *testing.T) { fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload) } } + +func TestName1(t *testing.T) { + opt := &redis.Options{ + Addr: "172.16.8.48:16379", + Password: "openIM123", + DB: 0, + } + rdb := redis.NewClient(opt) + + mgo, err := mongo.Connect(context.Background(), + options.Client(). + ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100"). + SetConnectTimeout(5*time.Second)) + if err != nil { + panic(err) + } + model, err := mgo2.NewSeqUserMongo(mgo.Database("openim_v3")) + if err != nil { + panic(err) + } + seq := NewSeqUserCacheRedis(rdb, model) + + res, err := seq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"}) + if err != nil { + panic(err) + } + t.Log(res) + +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index a6f0dbbb0..4ea74ef69 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -84,7 +84,7 @@ type CommonMsgDatabase interface { //GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) - SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) + SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error) FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) // to mq @@ -878,7 +878,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount( return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) } -func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) { +func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error) { var totalMsgs []*sdkws.MsgData total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req) if err != nil { diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index 2d1819c3a..03f47c503 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -278,124 +278,409 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do return nil } -func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) { - where := make(bson.A, 0, 6) +//func (m *MsgMgo) searchCount(ctx context.Context, filter any) (int64, error) { +// +// return nil, nil +//} + +//func (m *MsgMgo) searchMessage(ctx context.Context, filter any, nextID primitive.ObjectID, content bool, limit int) (int64, []*model.MsgInfoModel, primitive.ObjectID, error) { +// var pipeline bson.A +// if !nextID.IsZero() { +// pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}}) +// } +// pipeline = append(pipeline, +// bson.M{"$match": filter}, +// bson.M{"$limit": limit}, +// bson.M{"$unwind": "$msgs"}, +// bson.M{"$match": filter}, +// bson.M{ +// "$group": bson.M{ +// "_id": "$_id", +// "doc_id": bson.M{ +// "$first": "$doc_id", +// }, +// "msgs": bson.M{"$push": "$msgs"}, +// }, +// }, +// ) +// if !content { +// pipeline = append(pipeline, +// bson.M{ +// "$project": bson.M{ +// "_id": 1, +// "count": bson.M{"$size": "$msgs"}, +// }, +// }, +// ) +// type result struct { +// ID primitive.ObjectID `bson:"_id"` +// Count int64 `bson:"count"` +// } +// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline) +// if err != nil { +// return 0, nil, primitive.ObjectID{}, err +// } +// if len(res) == 0 { +// return 0, nil, primitive.ObjectID{}, nil +// } +// var count int64 +// for _, r := range res { +// count += r.Count +// } +// return count, nil, res[len(res)-1].ID, nil +// } +// type result struct { +// ID primitive.ObjectID `bson:"_id"` +// Msg []*model.MsgInfoModel `bson:"msgs"` +// } +// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline) +// if err != nil { +// return 0, nil, primitive.ObjectID{}, err +// } +// if len(res) == 0 { +// return 0, nil, primitive.ObjectID{}, err +// } +// var count int +// for _, r := range res { +// count += len(r.Msg) +// } +// msgs := make([]*model.MsgInfoModel, 0, count) +// for _, r := range res { +// msgs = append(msgs, r.Msg...) +// } +// return int64(count), msgs, res[len(res)-1].ID, nil +//} + +/* + +db.msg3.aggregate( + [ + { + "$match": { + "doc_id": "si_7009965934_8710838466:0" + }, + + } + ] +) + + +*/ + +type searchMessageIndex struct { + ID primitive.ObjectID `bson:"_id"` + Index []int64 `bson:"index"` +} + +func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID primitive.ObjectID, limit int) ([]searchMessageIndex, error) { + var pipeline bson.A + if !nextID.IsZero() { + pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}}) + } + pipeline = append(pipeline, + bson.M{"$sort": bson.M{"_id": 1}}, + bson.M{"$match": filter}, + bson.M{"$limit": limit}, + bson.M{ + "$project": bson.M{ + "_id": 1, + "msgs": bson.M{ + "$map": bson.M{ + "input": "$msgs", + "as": "msg", + "in": bson.M{ + "$mergeObjects": bson.A{ + "$$msg", + bson.M{ + "_search_temp_index": bson.M{ + "$indexOfArray": bson.A{ + "$msgs", "$$msg", + }, + }, + }, + }, + }, + }, + }, + }, + }, + bson.M{"$unwind": "$msgs"}, + bson.M{"$match": filter}, + bson.M{ + "$project": bson.M{ + "_id": 1, + "msgs._search_temp_index": 1, + }, + }, + bson.M{ + "$group": bson.M{ + "_id": "$_id", + "index": bson.M{"$push": "$msgs._search_temp_index"}, + }, + }, + bson.M{"$sort": bson.M{"_id": 1}}, + ) + return mongoutil.Aggregate[searchMessageIndex](ctx, m.coll, pipeline) +} + +func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []searchMessageIndex, error) { + filter := bson.M{} if req.RecvID != "" { - where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID}) + filter["$or"] = bson.A{ + bson.M{"msgs.msg.recv_id": req.RecvID}, + bson.M{"msgs.msg.group_id": req.RecvID}, + } } if req.SendID != "" { - where = append(where, bson.M{"msgs.msg.send_id": req.SendID}) + filter["msgs.msg.send_id"] = req.SendID } if req.ContentType != 0 { - where = append(where, bson.M{"msgs.msg.content_type": req.ContentType}) + filter["msgs.msg.content_type"] = req.ContentType } if req.SessionType != 0 { - where = append(where, bson.M{"msgs.msg.session_type": req.SessionType}) + filter["msgs.msg.session_type"] = req.SessionType } if req.SendTime != "" { sendTime, err := time.Parse(time.DateOnly, req.SendTime) if err != nil { return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error()) } - where = append(where, - bson.M{ - "msgs.msg.send_time": bson.M{ - "$gte": sendTime.UnixMilli(), - }, - }, + filter["$and"] = bson.A{ + bson.M{"msgs.msg.send_time": bson.M{ + "$gte": sendTime.UnixMilli(), + }}, bson.M{ "msgs.msg.send_time": bson.M{ "$lt": sendTime.Add(time.Hour * 24).UnixMilli(), }, }, - ) - } - pipeline := bson.A{ - bson.M{ - "$unwind": "$msgs", - }, - } - if len(where) > 0 { - pipeline = append(pipeline, bson.M{ - "$match": bson.M{"$and": where}, - }) + } } - pipeline = append(pipeline, - bson.M{ - "$project": bson.M{ - "_id": 0, - "msg": "$msgs.msg", - }, - }, - bson.M{ - "$count": "count", - }, + + var ( + nextID primitive.ObjectID + count int + dataRange []searchMessageIndex + skip = int((req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber()) ) - count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline) - if err != nil { - return 0, nil, err + _, _ = dataRange, skip + const maxDoc = 50 + data := make([]searchMessageIndex, 0, req.Pagination.GetShowNumber()) + push := cap(data) + for i := 0; ; i++ { + res, err := m.searchMessageIndex(ctx, filter, nextID, maxDoc) + if err != nil { + return 0, nil, err + } + if len(res) > 0 { + nextID = res[len(res)-1].ID + } + for _, r := range res { + var dataIndex []int64 + for _, index := range r.Index { + if push > 0 && count >= skip { + dataIndex = append(dataIndex, index) + push-- + } + count++ + } + if len(dataIndex) > 0 { + data = append(data, searchMessageIndex{ + ID: r.ID, + Index: dataIndex, + }) + } + } + if push <= 0 { + push-- + } + if len(res) < maxDoc || push < -10 { + return int64(count), data, nil + } + } +} + +func (m *MsgMgo) getDocRange(ctx context.Context, id primitive.ObjectID, index []int64) ([]*model.MsgInfoModel, error) { + if len(index) == 0 { + return nil, nil } - if len(count) == 0 || count[0] == 0 { - return 0, nil, nil + + pipeline := bson.A{ + bson.M{"$match": bson.M{"_id": id}}, + bson.M{"$project": "$msgs"}, } - pipeline = pipeline[:len(pipeline)-1] - pipeline = append(pipeline, - bson.M{ - "$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(), - }, - bson.M{ - "$limit": req.Pagination.GetShowNumber(), - }, - ) msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline) + if err != nil { + return nil, err + } + return msgs, nil +} + +func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) { + count, data, err := m.searchMessage(ctx, req) if err != nil { return 0, nil, err } - for i := range msgs { - msgInfo := msgs[i] - if msgInfo == nil || msgInfo.Msg == nil { - continue + var msgs []*model.MsgInfoModel + if len(data) > 0 { + var n int + for _, d := range data { + n += len(d.Index) } - if msgInfo.Revoke != nil { - revokeContent := sdkws.MessageRevokedContent{ - RevokerID: msgInfo.Revoke.UserID, - RevokerRole: msgInfo.Revoke.Role, - ClientMsgID: msgInfo.Msg.ClientMsgID, - RevokerNickname: msgInfo.Revoke.Nickname, - RevokeTime: msgInfo.Revoke.Time, - SourceMessageSendTime: msgInfo.Msg.SendTime, - SourceMessageSendID: msgInfo.Msg.SendID, - SourceMessageSenderNickname: msgInfo.Msg.SenderNickname, - SessionType: msgInfo.Msg.SessionType, - Seq: msgInfo.Msg.Seq, - Ex: msgInfo.Msg.Ex, - } - data, err := jsonutil.JsonMarshal(&revokeContent) - if err != nil { - return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent") - } - elem := sdkws.NotificationElem{Detail: string(data)} - content, err := jsonutil.JsonMarshal(&elem) - if err != nil { - return 0, nil, errs.WrapMsg(err, "json.Marshal elem") + msgs = make([]*model.MsgInfoModel, 0, n) + } + for _, val := range data { + res, err := mongoutil.FindOne[*model.MsgDocModel](ctx, m.coll, bson.M{"_id": val.ID}) + if err != nil { + return 0, nil, err + } + for _, i := range val.Index { + if i >= int64(len(res.Msg)) { + continue } - msgInfo.Msg.ContentType = constant.MsgRevokeNotification - msgInfo.Msg.Content = string(content) + msgs = append(msgs, res.Msg[i]) } } - //start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber - //n := int32(len(msgs)) - //if start >= n { - // return n, []*relation.MsgInfoModel{}, nil - //} - //if start+req.Pagination.ShowNumber < n { - // msgs = msgs[start : start+req.Pagination.ShowNumber] - //} else { - // msgs = msgs[start:] - //} - return count[0], msgs, nil + return count, msgs, nil } +//func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) { +// where := make(bson.A, 0, 6) +// if req.RecvID != "" { +// if req.SessionType == constant.ReadGroupChatType { +// where = append(where, bson.M{ +// "$or": bson.A{ +// bson.M{"doc_id": "^n_" + req.RecvID + ":"}, +// bson.M{"doc_id": "^sg_" + req.RecvID + ":"}, +// }, +// }) +// } else { +// where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID}) +// } +// } +// if req.SendID != "" { +// where = append(where, bson.M{"msgs.msg.send_id": req.SendID}) +// } +// if req.ContentType != 0 { +// where = append(where, bson.M{"msgs.msg.content_type": req.ContentType}) +// } +// if req.SessionType != 0 { +// where = append(where, bson.M{"msgs.msg.session_type": req.SessionType}) +// } +// if req.SendTime != "" { +// sendTime, err := time.Parse(time.DateOnly, req.SendTime) +// if err != nil { +// return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error()) +// } +// where = append(where, +// bson.M{ +// "msgs.msg.send_time": bson.M{ +// "$gte": sendTime.UnixMilli(), +// }, +// }, +// bson.M{ +// "msgs.msg.send_time": bson.M{ +// "$lt": sendTime.Add(time.Hour * 24).UnixMilli(), +// }, +// }, +// ) +// } +// opt := options.Find().SetLimit(100) +// res, err := mongoutil.Find[model.MsgDocModel](ctx, m.coll, bson.M{"$and": where}, opt) +// if err != nil { +// return 0, nil, err +// } +// _ = res +// fmt.Println() +// +// return 0, nil, nil +// pipeline := bson.A{ +// bson.M{ +// "$unwind": "$msgs", +// }, +// } +// if len(where) > 0 { +// pipeline = append(pipeline, bson.M{ +// "$match": bson.M{"$and": where}, +// }) +// } +// pipeline = append(pipeline, +// bson.M{ +// "$project": bson.M{ +// "_id": 0, +// "msg": "$msgs.msg", +// }, +// }, +// bson.M{ +// "$count": "count", +// }, +// ) +// //count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline) +// //if err != nil { +// // return 0, nil, err +// //} +// //if len(count) == 0 || count[0] == 0 { +// // return 0, nil, nil +// //} +// count := []int32{0} +// pipeline = pipeline[:len(pipeline)-1] +// pipeline = append(pipeline, +// bson.M{ +// "$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(), +// }, +// bson.M{ +// "$limit": req.Pagination.GetShowNumber(), +// }, +// ) +// msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline) +// if err != nil { +// return 0, nil, err +// } +// for i := range msgs { +// msgInfo := msgs[i] +// if msgInfo == nil || msgInfo.Msg == nil { +// continue +// } +// if msgInfo.Revoke != nil { +// revokeContent := sdkws.MessageRevokedContent{ +// RevokerID: msgInfo.Revoke.UserID, +// RevokerRole: msgInfo.Revoke.Role, +// ClientMsgID: msgInfo.Msg.ClientMsgID, +// RevokerNickname: msgInfo.Revoke.Nickname, +// RevokeTime: msgInfo.Revoke.Time, +// SourceMessageSendTime: msgInfo.Msg.SendTime, +// SourceMessageSendID: msgInfo.Msg.SendID, +// SourceMessageSenderNickname: msgInfo.Msg.SenderNickname, +// SessionType: msgInfo.Msg.SessionType, +// Seq: msgInfo.Msg.Seq, +// Ex: msgInfo.Msg.Ex, +// } +// data, err := jsonutil.JsonMarshal(&revokeContent) +// if err != nil { +// return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent") +// } +// elem := sdkws.NotificationElem{Detail: string(data)} +// content, err := jsonutil.JsonMarshal(&elem) +// if err != nil { +// return 0, nil, errs.WrapMsg(err, "json.Marshal elem") +// } +// msgInfo.Msg.ContentType = constant.MsgRevokeNotification +// msgInfo.Msg.Content = string(content) +// } +// } +// //start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber +// //n := int32(len(msgs)) +// //if start >= n { +// // return n, []*relation.MsgInfoModel{}, nil +// //} +// //if start+req.Pagination.ShowNumber < n { +// // msgs = msgs[start : start+req.Pagination.ShowNumber] +// //} else { +// // msgs = msgs[start:] +// //} +// return count[0], msgs, nil +//} + func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { var sort int if ase { diff --git a/pkg/common/storage/database/mgo/msg_test.go b/pkg/common/storage/database/mgo/msg_test.go new file mode 100644 index 000000000..5aed4dc51 --- /dev/null +++ b/pkg/common/storage/database/mgo/msg_test.go @@ -0,0 +1,75 @@ +package mgo + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/db/mongoutil" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "math/rand" + "strconv" + "testing" + "time" +) + +func TestName1(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + defer cancel() + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + + v := &MsgMgo{ + coll: cli.Database("openim_v3").Collection("msg3"), + } + + req := &msg.SearchMessageReq{ + //RecvID: "3187706596", + //SendID: "7009965934", + ContentType: 101, + //SendTime: "2024-05-06", + //SessionType: 3, + Pagination: &sdkws.RequestPagination{ + PageNumber: 1, + ShowNumber: 10, + }, + } + total, res, err := v.SearchMessage(ctx, req) + if err != nil { + panic(err) + } + + for i, re := range res { + t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content) + } + + t.Log(total) +} + +func TestName10(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + + v := &MsgMgo{ + coll: cli.Database("openim_v3").Collection("msg3"), + } + opt := options.Find().SetLimit(1000) + + res, err := mongoutil.Find[model.MsgDocModel](ctx, v.coll, bson.M{}, opt) + if err != nil { + panic(err) + } + ctx = context.Background() + for i := 0; i < 100000; i++ { + for j := range res { + res[j].DocID = strconv.FormatUint(rand.Uint64(), 10) + ":0" + } + if err := mongoutil.InsertMany(ctx, v.coll, res); err != nil { + panic(err) + } + t.Log("====>", time.Now(), i) + } + +} diff --git a/pkg/common/storage/database/mgo/seq_conversation_test.go b/pkg/common/storage/database/mgo/seq_conversation_test.go index 5167314da..42507a693 100644 --- a/pkg/common/storage/database/mgo/seq_conversation_test.go +++ b/pkg/common/storage/database/mgo/seq_conversation_test.go @@ -26,7 +26,7 @@ func Mongodb() *mongo.Database { func TestUserSeq(t *testing.T) { uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo) - t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4)) + t.Log(uSeq.SetUserMinSeq(context.Background(), "1000", "2000", 4)) } func TestConversationSeq(t *testing.T) { @@ -35,3 +35,8 @@ func TestConversationSeq(t *testing.T) { t.Log(cSeq.Malloc(context.Background(), "2000", 10)) t.Log(cSeq.GetMaxSeq(context.Background(), "2000")) } + +func TestUserGetUserReadSeqs(t *testing.T) { + uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo) + t.Log(uSeq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"})) +} diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go index 7c9a8f133..9faad416a 100644 --- a/pkg/common/storage/database/mgo/seq_user.go +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -88,6 +88,14 @@ func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string return s.getSeq(ctx, conversationID, userID, "read_seq") } +func (s *seqUserMongo) notFoundSet0(seq map[string]int64, conversationIDs []string) { + for _, conversationID := range conversationIDs { + if _, ok := seq[conversationID]; !ok { + seq[conversationID] = 0 + } + } +} + func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) { if len(conversationID) == 0 { return map[string]int64{}, nil @@ -102,6 +110,7 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve for _, seq := range seqs { res[seq.ConversationID] = seq.ReadSeq } + s.notFoundSet0(res, conversationID) return res, nil } diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index b402f3ac7..84f3a9e3e 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -37,7 +37,7 @@ type Msg interface { GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error - SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) + SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)