pull/2447/head
withchao 1 year ago
parent a04e0e8ec2
commit 60eb5e913e

@ -16,6 +16,7 @@ package friend
import ( import (
"context" "context"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
@ -49,6 +50,7 @@ type friendServer struct {
RegisterCenter discovery.SvcDiscoveryRegistry RegisterCenter discovery.SvcDiscoveryRegistry
config *Config config *Config
webhookClient *webhook.Client webhookClient *webhook.Client
queue *memamq.MemoryQueue
} }
type Config struct { type Config struct {
@ -118,8 +120,8 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation), conversationRpcClient: rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation),
config: config, config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
queue: memamq.NewMemoryQueue(128, 1024*8),
}) })
return nil return nil
} }

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/util/hashutil" "github.com/openimsdk/open-im-server/v3/pkg/util/hashutil"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
"slices" "slices"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
@ -17,13 +18,22 @@ func (s *friendServer) NotificationUserInfoUpdate(ctx context.Context, req *rela
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(userIDs) > 0 {
friendUserIDs := []string{req.UserID}
noCancelCtx := context.WithoutCancel(ctx)
err := s.queue.PushCtx(ctx, func() {
for _, userID := range userIDs { for _, userID := range userIDs {
if err := s.db.OwnerIncrVersion(ctx, userID, []string{req.UserID}, model.VersionStateUpdate); err != nil { if err := s.db.OwnerIncrVersion(noCancelCtx, userID, friendUserIDs, model.VersionStateUpdate); err != nil {
return nil, err log.ZError(ctx, "OwnerIncrVersion", err, "userID", userID, "friendUserIDs", friendUserIDs)
} }
} }
for _, userID := range userIDs { for _, userID := range userIDs {
s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserID, userID) s.notificationSender.FriendInfoUpdatedNotification(noCancelCtx, req.UserID, userID)
}
})
if err != nil {
log.ZError(ctx, "NotificationUserInfoUpdate timeout", err, "userID", req.UserID)
}
} }
return &relation.NotificationUserInfoUpdateResp{}, nil return &relation.NotificationUserInfoUpdateResp{}, nil
} }

@ -111,7 +111,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (resp *msg.SearchMessageResp, err error) { func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (resp *msg.SearchMessageResp, err error) {
var chatLogs []*sdkws.MsgData var chatLogs []*sdkws.MsgData
var total int32 var total int64
resp = &msg.SearchMessageResp{} resp = &msg.SearchMessageResp{}
if total, chatLogs, err = m.MsgDatabase.SearchMessage(ctx, req); err != nil { if total, chatLogs, err = m.MsgDatabase.SearchMessage(ctx, req); err != nil {
return nil, err return nil, err
@ -194,7 +194,7 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq
} }
resp.ChatLogs = append(resp.ChatLogs, pbchatLog) resp.ChatLogs = append(resp.ChatLogs, pbchatLog)
} }
resp.ChatLogsNum = total resp.ChatLogsNum = int32(total)
return resp, nil return resp, nil
} }

@ -45,7 +45,7 @@ func TestName(t *testing.T) {
} }
seqUser := NewSeqUserCacheRedis(rdb, mgoSeqUser) seqUser := NewSeqUserCacheRedis(rdb, mgoSeqUser)
res, err := seqUser.GetReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"}) res, err := seqUser.GetUserReadSeqs(ctx, "2110910952", []string{"sg_2920732023", "sg_345762580"})
if err != nil { if err != nil {
panic(err) panic(err)
} }

@ -4,7 +4,10 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
mgo2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log" "log"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
@ -77,3 +80,32 @@ func TestRecvOnline(t *testing.T) {
fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload) fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
} }
} }
func TestName1(t *testing.T) {
opt := &redis.Options{
Addr: "172.16.8.48:16379",
Password: "openIM123",
DB: 0,
}
rdb := redis.NewClient(opt)
mgo, err := mongo.Connect(context.Background(),
options.Client().
ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").
SetConnectTimeout(5*time.Second))
if err != nil {
panic(err)
}
model, err := mgo2.NewSeqUserMongo(mgo.Database("openim_v3"))
if err != nil {
panic(err)
}
seq := NewSeqUserCacheRedis(rdb, model)
res, err := seq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"})
if err != nil {
panic(err)
}
t.Log(res)
}

@ -84,7 +84,7 @@ type CommonMsgDatabase interface {
//GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) //GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error) GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error)
FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error)
// to mq // to mq
@ -878,7 +878,7 @@ func (db *commonMsgDatabase) RangeGroupSendCount(
return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber)
} }
func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error) { func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*sdkws.MsgData, err error) {
var totalMsgs []*sdkws.MsgData var totalMsgs []*sdkws.MsgData
total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req) total, msgs, err := db.msgDocDatabase.SearchMessage(ctx, req)
if err != nil { if err != nil {

@ -278,124 +278,409 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do
return nil return nil
} }
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) { //func (m *MsgMgo) searchCount(ctx context.Context, filter any) (int64, error) {
where := make(bson.A, 0, 6) //
// return nil, nil
//}
//func (m *MsgMgo) searchMessage(ctx context.Context, filter any, nextID primitive.ObjectID, content bool, limit int) (int64, []*model.MsgInfoModel, primitive.ObjectID, error) {
// var pipeline bson.A
// if !nextID.IsZero() {
// pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
// }
// pipeline = append(pipeline,
// bson.M{"$match": filter},
// bson.M{"$limit": limit},
// bson.M{"$unwind": "$msgs"},
// bson.M{"$match": filter},
// bson.M{
// "$group": bson.M{
// "_id": "$_id",
// "doc_id": bson.M{
// "$first": "$doc_id",
// },
// "msgs": bson.M{"$push": "$msgs"},
// },
// },
// )
// if !content {
// pipeline = append(pipeline,
// bson.M{
// "$project": bson.M{
// "_id": 1,
// "count": bson.M{"$size": "$msgs"},
// },
// },
// )
// type result struct {
// ID primitive.ObjectID `bson:"_id"`
// Count int64 `bson:"count"`
// }
// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
// if err != nil {
// return 0, nil, primitive.ObjectID{}, err
// }
// if len(res) == 0 {
// return 0, nil, primitive.ObjectID{}, nil
// }
// var count int64
// for _, r := range res {
// count += r.Count
// }
// return count, nil, res[len(res)-1].ID, nil
// }
// type result struct {
// ID primitive.ObjectID `bson:"_id"`
// Msg []*model.MsgInfoModel `bson:"msgs"`
// }
// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline)
// if err != nil {
// return 0, nil, primitive.ObjectID{}, err
// }
// if len(res) == 0 {
// return 0, nil, primitive.ObjectID{}, err
// }
// var count int
// for _, r := range res {
// count += len(r.Msg)
// }
// msgs := make([]*model.MsgInfoModel, 0, count)
// for _, r := range res {
// msgs = append(msgs, r.Msg...)
// }
// return int64(count), msgs, res[len(res)-1].ID, nil
//}
/*
db.msg3.aggregate(
[
{
"$match": {
"doc_id": "si_7009965934_8710838466:0"
},
}
]
)
*/
type searchMessageIndex struct {
ID primitive.ObjectID `bson:"_id"`
Index []int64 `bson:"index"`
}
func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID primitive.ObjectID, limit int) ([]searchMessageIndex, error) {
var pipeline bson.A
if !nextID.IsZero() {
pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
}
pipeline = append(pipeline,
bson.M{"$sort": bson.M{"_id": 1}},
bson.M{"$match": filter},
bson.M{"$limit": limit},
bson.M{
"$project": bson.M{
"_id": 1,
"msgs": bson.M{
"$map": bson.M{
"input": "$msgs",
"as": "msg",
"in": bson.M{
"$mergeObjects": bson.A{
"$$msg",
bson.M{
"_search_temp_index": bson.M{
"$indexOfArray": bson.A{
"$msgs", "$$msg",
},
},
},
},
},
},
},
},
},
bson.M{"$unwind": "$msgs"},
bson.M{"$match": filter},
bson.M{
"$project": bson.M{
"_id": 1,
"msgs._search_temp_index": 1,
},
},
bson.M{
"$group": bson.M{
"_id": "$_id",
"index": bson.M{"$push": "$msgs._search_temp_index"},
},
},
bson.M{"$sort": bson.M{"_id": 1}},
)
return mongoutil.Aggregate[searchMessageIndex](ctx, m.coll, pipeline)
}
func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []searchMessageIndex, error) {
filter := bson.M{}
if req.RecvID != "" { if req.RecvID != "" {
where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID}) filter["$or"] = bson.A{
bson.M{"msgs.msg.recv_id": req.RecvID},
bson.M{"msgs.msg.group_id": req.RecvID},
}
} }
if req.SendID != "" { if req.SendID != "" {
where = append(where, bson.M{"msgs.msg.send_id": req.SendID}) filter["msgs.msg.send_id"] = req.SendID
} }
if req.ContentType != 0 { if req.ContentType != 0 {
where = append(where, bson.M{"msgs.msg.content_type": req.ContentType}) filter["msgs.msg.content_type"] = req.ContentType
} }
if req.SessionType != 0 { if req.SessionType != 0 {
where = append(where, bson.M{"msgs.msg.session_type": req.SessionType}) filter["msgs.msg.session_type"] = req.SessionType
} }
if req.SendTime != "" { if req.SendTime != "" {
sendTime, err := time.Parse(time.DateOnly, req.SendTime) sendTime, err := time.Parse(time.DateOnly, req.SendTime)
if err != nil { if err != nil {
return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error()) return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
} }
where = append(where, filter["$and"] = bson.A{
bson.M{ bson.M{"msgs.msg.send_time": bson.M{
"msgs.msg.send_time": bson.M{
"$gte": sendTime.UnixMilli(), "$gte": sendTime.UnixMilli(),
}, }},
},
bson.M{ bson.M{
"msgs.msg.send_time": bson.M{ "msgs.msg.send_time": bson.M{
"$lt": sendTime.Add(time.Hour * 24).UnixMilli(), "$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
}, },
}, },
}
}
var (
nextID primitive.ObjectID
count int
dataRange []searchMessageIndex
skip = int((req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber())
) )
_, _ = dataRange, skip
const maxDoc = 50
data := make([]searchMessageIndex, 0, req.Pagination.GetShowNumber())
push := cap(data)
for i := 0; ; i++ {
res, err := m.searchMessageIndex(ctx, filter, nextID, maxDoc)
if err != nil {
return 0, nil, err
} }
pipeline := bson.A{ if len(res) > 0 {
bson.M{ nextID = res[len(res)-1].ID
"$unwind": "$msgs", }
}, for _, r := range res {
var dataIndex []int64
for _, index := range r.Index {
if push > 0 && count >= skip {
dataIndex = append(dataIndex, index)
push--
} }
if len(where) > 0 { count++
pipeline = append(pipeline, bson.M{ }
"$match": bson.M{"$and": where}, if len(dataIndex) > 0 {
data = append(data, searchMessageIndex{
ID: r.ID,
Index: dataIndex,
}) })
} }
pipeline = append(pipeline, }
bson.M{ if push <= 0 {
"$project": bson.M{ push--
"_id": 0, }
"msg": "$msgs.msg", if len(res) < maxDoc || push < -10 {
}, return int64(count), data, nil
}, }
bson.M{ }
"$count": "count", }
},
) func (m *MsgMgo) getDocRange(ctx context.Context, id primitive.ObjectID, index []int64) ([]*model.MsgInfoModel, error) {
count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline) if len(index) == 0 {
return nil, nil
}
pipeline := bson.A{
bson.M{"$match": bson.M{"_id": id}},
bson.M{"$project": "$msgs"},
}
msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
if err != nil {
return nil, err
}
return msgs, nil
}
func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) {
count, data, err := m.searchMessage(ctx, req)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
if len(count) == 0 || count[0] == 0 { var msgs []*model.MsgInfoModel
return 0, nil, nil if len(data) > 0 {
var n int
for _, d := range data {
n += len(d.Index)
} }
pipeline = pipeline[:len(pipeline)-1] msgs = make([]*model.MsgInfoModel, 0, n)
pipeline = append(pipeline, }
bson.M{ for _, val := range data {
"$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(), res, err := mongoutil.FindOne[*model.MsgDocModel](ctx, m.coll, bson.M{"_id": val.ID})
},
bson.M{
"$limit": req.Pagination.GetShowNumber(),
},
)
msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
for i := range msgs { for _, i := range val.Index {
msgInfo := msgs[i] if i >= int64(len(res.Msg)) {
if msgInfo == nil || msgInfo.Msg == nil {
continue continue
} }
if msgInfo.Revoke != nil { msgs = append(msgs, res.Msg[i])
revokeContent := sdkws.MessageRevokedContent{
RevokerID: msgInfo.Revoke.UserID,
RevokerRole: msgInfo.Revoke.Role,
ClientMsgID: msgInfo.Msg.ClientMsgID,
RevokerNickname: msgInfo.Revoke.Nickname,
RevokeTime: msgInfo.Revoke.Time,
SourceMessageSendTime: msgInfo.Msg.SendTime,
SourceMessageSendID: msgInfo.Msg.SendID,
SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
SessionType: msgInfo.Msg.SessionType,
Seq: msgInfo.Msg.Seq,
Ex: msgInfo.Msg.Ex,
} }
data, err := jsonutil.JsonMarshal(&revokeContent)
if err != nil {
return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
} }
elem := sdkws.NotificationElem{Detail: string(data)} return count, msgs, nil
content, err := jsonutil.JsonMarshal(&elem)
if err != nil {
return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
}
msgInfo.Msg.ContentType = constant.MsgRevokeNotification
msgInfo.Msg.Content = string(content)
}
}
//start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
//n := int32(len(msgs))
//if start >= n {
// return n, []*relation.MsgInfoModel{}, nil
//}
//if start+req.Pagination.ShowNumber < n {
// msgs = msgs[start : start+req.Pagination.ShowNumber]
//} else {
// msgs = msgs[start:]
//}
return count[0], msgs, nil
} }
//func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) {
// where := make(bson.A, 0, 6)
// if req.RecvID != "" {
// if req.SessionType == constant.ReadGroupChatType {
// where = append(where, bson.M{
// "$or": bson.A{
// bson.M{"doc_id": "^n_" + req.RecvID + ":"},
// bson.M{"doc_id": "^sg_" + req.RecvID + ":"},
// },
// })
// } else {
// where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID})
// }
// }
// if req.SendID != "" {
// where = append(where, bson.M{"msgs.msg.send_id": req.SendID})
// }
// if req.ContentType != 0 {
// where = append(where, bson.M{"msgs.msg.content_type": req.ContentType})
// }
// if req.SessionType != 0 {
// where = append(where, bson.M{"msgs.msg.session_type": req.SessionType})
// }
// if req.SendTime != "" {
// sendTime, err := time.Parse(time.DateOnly, req.SendTime)
// if err != nil {
// return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error())
// }
// where = append(where,
// bson.M{
// "msgs.msg.send_time": bson.M{
// "$gte": sendTime.UnixMilli(),
// },
// },
// bson.M{
// "msgs.msg.send_time": bson.M{
// "$lt": sendTime.Add(time.Hour * 24).UnixMilli(),
// },
// },
// )
// }
// opt := options.Find().SetLimit(100)
// res, err := mongoutil.Find[model.MsgDocModel](ctx, m.coll, bson.M{"$and": where}, opt)
// if err != nil {
// return 0, nil, err
// }
// _ = res
// fmt.Println()
//
// return 0, nil, nil
// pipeline := bson.A{
// bson.M{
// "$unwind": "$msgs",
// },
// }
// if len(where) > 0 {
// pipeline = append(pipeline, bson.M{
// "$match": bson.M{"$and": where},
// })
// }
// pipeline = append(pipeline,
// bson.M{
// "$project": bson.M{
// "_id": 0,
// "msg": "$msgs.msg",
// },
// },
// bson.M{
// "$count": "count",
// },
// )
// //count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline)
// //if err != nil {
// // return 0, nil, err
// //}
// //if len(count) == 0 || count[0] == 0 {
// // return 0, nil, nil
// //}
// count := []int32{0}
// pipeline = pipeline[:len(pipeline)-1]
// pipeline = append(pipeline,
// bson.M{
// "$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(),
// },
// bson.M{
// "$limit": req.Pagination.GetShowNumber(),
// },
// )
// msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline)
// if err != nil {
// return 0, nil, err
// }
// for i := range msgs {
// msgInfo := msgs[i]
// if msgInfo == nil || msgInfo.Msg == nil {
// continue
// }
// if msgInfo.Revoke != nil {
// revokeContent := sdkws.MessageRevokedContent{
// RevokerID: msgInfo.Revoke.UserID,
// RevokerRole: msgInfo.Revoke.Role,
// ClientMsgID: msgInfo.Msg.ClientMsgID,
// RevokerNickname: msgInfo.Revoke.Nickname,
// RevokeTime: msgInfo.Revoke.Time,
// SourceMessageSendTime: msgInfo.Msg.SendTime,
// SourceMessageSendID: msgInfo.Msg.SendID,
// SourceMessageSenderNickname: msgInfo.Msg.SenderNickname,
// SessionType: msgInfo.Msg.SessionType,
// Seq: msgInfo.Msg.Seq,
// Ex: msgInfo.Msg.Ex,
// }
// data, err := jsonutil.JsonMarshal(&revokeContent)
// if err != nil {
// return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent")
// }
// elem := sdkws.NotificationElem{Detail: string(data)}
// content, err := jsonutil.JsonMarshal(&elem)
// if err != nil {
// return 0, nil, errs.WrapMsg(err, "json.Marshal elem")
// }
// msgInfo.Msg.ContentType = constant.MsgRevokeNotification
// msgInfo.Msg.Content = string(content)
// }
// }
// //start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber
// //n := int32(len(msgs))
// //if start >= n {
// // return n, []*relation.MsgInfoModel{}, nil
// //}
// //if start+req.Pagination.ShowNumber < n {
// // msgs = msgs[start : start+req.Pagination.ShowNumber]
// //} else {
// // msgs = msgs[start:]
// //}
// return count[0], msgs, nil
//}
func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) {
var sort int var sort int
if ase { if ase {

@ -0,0 +1,75 @@
package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/db/mongoutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"math/rand"
"strconv"
"testing"
"time"
)
func TestName1(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
defer cancel()
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
v := &MsgMgo{
coll: cli.Database("openim_v3").Collection("msg3"),
}
req := &msg.SearchMessageReq{
//RecvID: "3187706596",
//SendID: "7009965934",
ContentType: 101,
//SendTime: "2024-05-06",
//SessionType: 3,
Pagination: &sdkws.RequestPagination{
PageNumber: 1,
ShowNumber: 10,
},
}
total, res, err := v.SearchMessage(ctx, req)
if err != nil {
panic(err)
}
for i, re := range res {
t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content)
}
t.Log(total)
}
func TestName10(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second)))
v := &MsgMgo{
coll: cli.Database("openim_v3").Collection("msg3"),
}
opt := options.Find().SetLimit(1000)
res, err := mongoutil.Find[model.MsgDocModel](ctx, v.coll, bson.M{}, opt)
if err != nil {
panic(err)
}
ctx = context.Background()
for i := 0; i < 100000; i++ {
for j := range res {
res[j].DocID = strconv.FormatUint(rand.Uint64(), 10) + ":0"
}
if err := mongoutil.InsertMany(ctx, v.coll, res); err != nil {
panic(err)
}
t.Log("====>", time.Now(), i)
}
}

@ -26,7 +26,7 @@ func Mongodb() *mongo.Database {
func TestUserSeq(t *testing.T) { func TestUserSeq(t *testing.T) {
uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo) uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
t.Log(uSeq.SetMinSeq(context.Background(), "1000", "2000", 4)) t.Log(uSeq.SetUserMinSeq(context.Background(), "1000", "2000", 4))
} }
func TestConversationSeq(t *testing.T) { func TestConversationSeq(t *testing.T) {
@ -35,3 +35,8 @@ func TestConversationSeq(t *testing.T) {
t.Log(cSeq.Malloc(context.Background(), "2000", 10)) t.Log(cSeq.Malloc(context.Background(), "2000", 10))
t.Log(cSeq.GetMaxSeq(context.Background(), "2000")) t.Log(cSeq.GetMaxSeq(context.Background(), "2000"))
} }
func TestUserGetUserReadSeqs(t *testing.T) {
uSeq := Result(NewSeqUserMongo(Mongodb())).(*seqUserMongo)
t.Log(uSeq.GetUserReadSeqs(context.Background(), "2110910952", []string{"sg_345762580", "2000", "3000"}))
}

@ -88,6 +88,14 @@ func (s *seqUserMongo) GetUserReadSeq(ctx context.Context, conversationID string
return s.getSeq(ctx, conversationID, userID, "read_seq") return s.getSeq(ctx, conversationID, userID, "read_seq")
} }
func (s *seqUserMongo) notFoundSet0(seq map[string]int64, conversationIDs []string) {
for _, conversationID := range conversationIDs {
if _, ok := seq[conversationID]; !ok {
seq[conversationID] = 0
}
}
}
func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) { func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conversationID []string) (map[string]int64, error) {
if len(conversationID) == 0 { if len(conversationID) == 0 {
return map[string]int64{}, nil return map[string]int64{}, nil
@ -102,6 +110,7 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve
for _, seq := range seqs { for _, seq := range seqs {
res[seq.ConversationID] = seq.ReadSeq res[seq.ConversationID] = seq.ReadSeq
} }
s.notFoundSet0(res, conversationID)
return res, nil return res, nil
} }

@ -37,7 +37,7 @@ type Msg interface {
GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error)
DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error
MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error
SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error)
RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error)
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)

Loading…
Cancel
Save