From bff1266315654c0fd99d321889074f2231c26310 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 8 May 2024 16:12:37 +0800 Subject: [PATCH 1/4] fix: SearchMessage --- pkg/common/db/mgo/msg.go | 122 +++++++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 50 deletions(-) diff --git a/pkg/common/db/mgo/msg.go b/pkg/common/db/mgo/msg.go index 6fe24536b..17e493d33 100644 --- a/pkg/common/db/mgo/msg.go +++ b/pkg/common/db/mgo/msg.go @@ -267,58 +267,80 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do } func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*relation.MsgInfoModel, error) { - var pipe mongo.Pipeline - condition := bson.A{} - if req.SendTime != "" { - // Changed to keyed fields for bson.M to avoid govet errors - condition = append(condition, bson.M{"$eq": bson.A{bson.M{"$dateToString": bson.M{"format": "%Y-%m-%d", "date": bson.M{"$toDate": "$$item.msg.send_time"}}}, req.SendTime}}) + where := make(bson.A, 0, 6) + if req.RecvID != "" { + where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID}) + } + if req.SendID != "" { + where = append(where, bson.M{"msgs.msg.send_id": req.SendID}) } if req.ContentType != 0 { - condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.content_type", req.ContentType}}) + where = append(where, bson.M{"msgs.msg.content_type": req.ContentType}) } if req.SessionType != 0 { - condition = append(condition, bson.M{"$eq": bson.A{"$$item.msg.session_type", req.SessionType}}) + where = append(where, bson.M{"msgs.msg.session_type": req.SessionType}) } - if req.RecvID != "" { - condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.recv_id", "regex": req.RecvID}}) + if req.SendTime != "" { + sendTime, err := time.Parse(time.DateOnly, req.SendTime) + if err != nil { + return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error()) + } + where = append(where, + bson.M{ + "msgs.msg.send_time": bson.M{ + "$gte": sendTime.UnixMilli(), + }, + }, + bson.M{ + "msgs.msg.send_time": bson.M{ + "$lt": sendTime.Add(time.Hour * 24).UnixMilli(), + }, + }, + ) } - if req.SendID != "" { - condition = append(condition, bson.M{"$regexFind": bson.M{"input": "$$item.msg.send_id", "regex": req.SendID}}) + pipeline := bson.A{ + bson.M{ + "$unwind": "$msgs", + }, } - - or := bson.A{ - bson.M{"doc_id": bson.M{"$regex": "^si_", "$options": "i"}}, - bson.M{"doc_id": bson.M{"$regex": "^g_", "$options": "i"}}, - bson.M{"doc_id": bson.M{"$regex": "^sg_", "$options": "i"}}, + if len(where) > 0 { + pipeline = append(pipeline, bson.M{ + "$match": bson.M{"$and": where}, + }) } - - // Use bson.D with keyed fields to specify the order explicitly - pipe = mongo.Pipeline{ - {{"$match", bson.D{{Key: "$or", Value: or}}}}, - {{"$project", bson.D{ - {Key: "msgs", Value: bson.D{ - {Key: "$filter", Value: bson.D{ - {Key: "input", Value: "$msgs"}, - {Key: "as", Value: "item"}, - {Key: "cond", Value: bson.D{{Key: "$and", Value: condition}}}, - }}, - }}, - {Key: "doc_id", Value: 1}, - }}}, - {{"$unwind", bson.M{"path": "$msgs"}}}, - {{"$sort", bson.M{"msgs.msg.send_time": -1}}}, + pipeline = append(pipeline, + bson.M{ + "$project": bson.M{ + "_id": 0, + "msg": "$msgs.msg", + }, + }, + bson.M{ + "$count": "count", + }, + ) + count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline) + if err != nil { + return 0, nil, err } - type docModel struct { - DocID string `bson:"doc_id"` - Msg *relation.MsgInfoModel `bson:"msgs"` + if len(count) == 0 || count[0] == 0 { + return 0, nil, nil } - msgsDocs, err := mongoutil.Aggregate[*docModel](ctx, m.coll, pipe) + pipeline = pipeline[:len(pipeline)-1] + pipeline = append(pipeline, + bson.M{ + "$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(), + }, + bson.M{ + "$limit": req.Pagination.GetShowNumber(), + }, + ) + msgs, err := mongoutil.Aggregate[*relation.MsgInfoModel](ctx, m.coll, pipeline) if err != nil { return 0, nil, err } - msgs := make([]*relation.MsgInfoModel, 0) - for _, doc := range msgsDocs { - msgInfo := doc.Msg + for i := range msgs { + msgInfo := msgs[i] if msgInfo == nil || msgInfo.Msg == nil { continue } @@ -350,17 +372,17 @@ func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) ( } msgs = append(msgs, msgInfo) } - start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber - n := int32(len(msgs)) - if start >= n { - return n, []*relation.MsgInfoModel{}, nil - } - if start+req.Pagination.ShowNumber < n { - msgs = msgs[start : start+req.Pagination.ShowNumber] - } else { - msgs = msgs[start:] - } - return n, msgs, nil + //start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber + //n := int32(len(msgs)) + //if start >= n { + // return n, []*relation.MsgInfoModel{}, nil + //} + //if start+req.Pagination.ShowNumber < n { + // msgs = msgs[start : start+req.Pagination.ShowNumber] + //} else { + // msgs = msgs[start:] + //} + return count[0], msgs, nil } func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*relation.UserCount, dateCount map[string]int64, err error) { From 0d1eea3cdb94d826a606fe4a51dfc4a142fda9cf Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 14 May 2024 15:45:05 +0800 Subject: [PATCH 2/4] fix: s3 config --- config/openim-rpc-third.yml | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index bb41c93ae..bde38ccc4 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -29,19 +29,4 @@ object: accessKeyID: '' accessKeySecret: '' sessionToken: '' - publicRead: false - kodo: - endpoint: "webhook://s3.cn-east-1.qiniucs.com" - bucket: "demo-9999999" - bucketURL: "webhook://your.domain.com" - accessKeyID: '' - accessKeySecret: '' - sessionToken: '' - publicRead: false - aws: - endpoint: "''" - region: "us-east-1" - bucket: "demo-9999999" - accessKeyID: '' - accessKeySecret: '' - publicRead: false + publicRead: false \ No newline at end of file From bb6b2ec542b93c06a7a5b291a69c6fa06d6941f6 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 15 May 2024 10:30:37 +0800 Subject: [PATCH 3/4] fix: oss panic --- go.mod | 5 ++--- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index b522065da..3a6ae0bd9 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.65 - github.com/openimsdk/tools v0.0.49-alpha.2 + github.com/openimsdk/tools v0.0.49-alpha.19 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -44,7 +44,6 @@ require ( golang.org/x/sync v0.6.0 ) - require ( cloud.google.com/go v0.112.0 // indirect cloud.google.com/go/compute v1.23.3 // indirect @@ -75,7 +74,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-zookeeper/zk v1.0.3 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/s2a-go v0.1.7 // indirect diff --git a/go.sum b/go.sum index e2ecf7284..fa72d61a2 100644 --- a/go.sum +++ b/go.sum @@ -132,8 +132,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -283,8 +283,8 @@ github.com/openimsdk/gomake v0.0.13 h1:xLDe/moqgWpRoptHzI4packAWzs4C16b+sVY+txNJ github.com/openimsdk/gomake v0.0.13/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.65 h1:SPT9qyUsFRTTKSKb/FjpS+xr6sxz/Kbnu+su1bxYagc= github.com/openimsdk/protocol v0.0.65/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.2 h1:8IfV6o2ySU7C54sh/MG7ctEp1h3lSNe03OCUDWSk5Ws= -github.com/openimsdk/tools v0.0.49-alpha.2/go.mod h1:P4oGP1Pd+d4ctbLD5U/XQTgl8yu8Hd3skx640Fr69ko= +github.com/openimsdk/tools v0.0.49-alpha.19 h1:CbASL0yefRSVAmWPVeRnhF7wZKd6umLfz31CIhEgrBs= +github.com/openimsdk/tools v0.0.49-alpha.19/go.mod h1:g7mkHXYUPi0/8aAX8VPMHpnb3hqdV69Jph+bXOGvvNM= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= From 4265f99a7eef79bec0de175b97ec4b004808f55d Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Wed, 15 May 2024 10:37:35 +0800 Subject: [PATCH 4/4] kafka --- internal/msgtransfer/online_history_msg_handler.go | 2 +- internal/msgtransfer/online_msg_to_mongo_handler.go | 2 +- internal/push/push_handler.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 8691e92ab..df2660804 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -83,7 +83,7 @@ type OnlineHistoryRedisConsumerHandler struct { func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*OnlineHistoryRedisConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, true) if err != nil { return nil, err } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 978302e76..c9c035893 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -33,7 +33,7 @@ type OnlineHistoryMongoConsumerHandler struct { } func NewOnlineHistoryMongoConsumerHandler(kafkaConf *config.Kafka, database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) { - historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}) + historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToMongoGroupID, []string{kafkaConf.ToMongoTopic}, true) if err != nil { return nil, err } diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 3a9a696f6..bf0ede375 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -60,7 +60,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, var consumerHandler ConsumerHandler var err error consumerHandler.pushConsumerGroup, err = kafka.NewMConsumerGroup(config.KafkaConfig.Build(), config.KafkaConfig.ToPushGroupID, - []string{config.KafkaConfig.ToPushTopic}) + []string{config.KafkaConfig.ToPushTopic}, true) if err != nil { return nil, err }