|
|
@ -5,6 +5,7 @@ import (
|
|
|
|
"encoding/json"
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
|
|
|
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
|
|
|
|
|
|
|
|
|
|
@ -308,3 +309,168 @@ func (m *MsgMongoDriver) MarkSingleChatMsgsAsRead(ctx context.Context, userID st
|
|
|
|
_, err := m.MsgCollection.BulkWrite(ctx, updates)
|
|
|
|
_, err := m.MsgCollection.BulkWrite(ctx, updates)
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (m *MsgMongoDriver) RangeCount(ctx context.Context, start time.Time, end time.Time) (int64, error) {
|
|
|
|
|
|
|
|
type Total struct {
|
|
|
|
|
|
|
|
Total int64 `bson:"total"`
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
pipeline := bson.A{
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$addFields": bson.M{
|
|
|
|
|
|
|
|
"msgs": bson.M{
|
|
|
|
|
|
|
|
"$filter": bson.M{
|
|
|
|
|
|
|
|
"input": "$msgs",
|
|
|
|
|
|
|
|
"as": "item",
|
|
|
|
|
|
|
|
"cond": bson.M{
|
|
|
|
|
|
|
|
"$and": bson.A{
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$gte": bson.A{
|
|
|
|
|
|
|
|
"$$item.msg.send_time", start.UnixMilli(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
"$lt": bson.A{
|
|
|
|
|
|
|
|
"$$item.msg.send_time", end.UnixMilli(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$group": bson.M{
|
|
|
|
|
|
|
|
"_id": nil,
|
|
|
|
|
|
|
|
"total": bson.M{
|
|
|
|
|
|
|
|
"$sum": bson.M{
|
|
|
|
|
|
|
|
"$size": "$msgs",
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
cur, err := m.MsgCollection.Aggregate(ctx, pipeline)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return 0, errs.Wrap(err)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
defer cur.Close(ctx)
|
|
|
|
|
|
|
|
var total []Total
|
|
|
|
|
|
|
|
if err := cur.All(ctx, &total); err != nil {
|
|
|
|
|
|
|
|
return 0, err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(total) == 0 {
|
|
|
|
|
|
|
|
return 0, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return total[0].Total, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (m *MsgMongoDriver) RangeUserSendCount(ctx context.Context, ase bool, start time.Time, end time.Time) (int64, []*table.UserCount, error) {
|
|
|
|
|
|
|
|
var sort int
|
|
|
|
|
|
|
|
if ase {
|
|
|
|
|
|
|
|
sort = -1
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
sort = 1
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
type Result struct {
|
|
|
|
|
|
|
|
Total int64 `bson:"result"`
|
|
|
|
|
|
|
|
Result []struct {
|
|
|
|
|
|
|
|
UserID string `bson:"_id"`
|
|
|
|
|
|
|
|
Count int64 `bson:"count"`
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
pipeline := bson.A{
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$addFields": bson.M{
|
|
|
|
|
|
|
|
"msgs": bson.M{
|
|
|
|
|
|
|
|
"$filter": bson.M{
|
|
|
|
|
|
|
|
"input": "$msgs",
|
|
|
|
|
|
|
|
"as": "item",
|
|
|
|
|
|
|
|
"cond": bson.M{
|
|
|
|
|
|
|
|
"$and": bson.A{
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$gte": bson.A{
|
|
|
|
|
|
|
|
"$$item.msg.send_time", start.UnixMilli(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
"$lt": bson.A{
|
|
|
|
|
|
|
|
"$$item.msg.send_time", end.UnixMilli(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"project": bson.M{
|
|
|
|
|
|
|
|
"_id": 0,
|
|
|
|
|
|
|
|
"doc_id": 0,
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$project": bson.M{
|
|
|
|
|
|
|
|
"msgs": bson.M{
|
|
|
|
|
|
|
|
"$map": bson.M{
|
|
|
|
|
|
|
|
"input": "$msgs",
|
|
|
|
|
|
|
|
"as": "item",
|
|
|
|
|
|
|
|
"in": "$$item.msg.send_id",
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$unwind": "$msgs",
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$sortByCount": "$msgs",
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"sort": bson.M{
|
|
|
|
|
|
|
|
"count": sort,
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$group": bson.M{
|
|
|
|
|
|
|
|
"_id": nil,
|
|
|
|
|
|
|
|
"result": bson.M{
|
|
|
|
|
|
|
|
"$push": "$$ROOT",
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"addFields": bson.M{
|
|
|
|
|
|
|
|
"result": bson.M{
|
|
|
|
|
|
|
|
"$size": "$result",
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
bson.M{
|
|
|
|
|
|
|
|
"$addFields": bson.M{
|
|
|
|
|
|
|
|
"result": bson.M{
|
|
|
|
|
|
|
|
"$slice": bson.A{
|
|
|
|
|
|
|
|
"$result", 0, 10,
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
cur, err := m.MsgCollection.Aggregate(ctx, pipeline)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return 0, nil, errs.Wrap(err)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
defer cur.Close(ctx)
|
|
|
|
|
|
|
|
var result []Result
|
|
|
|
|
|
|
|
if err := cur.All(ctx, &result); err != nil {
|
|
|
|
|
|
|
|
return 0, nil, err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(result) == 0 {
|
|
|
|
|
|
|
|
return 0, nil, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
res := make([]*table.UserCount, len(result[0].Result))
|
|
|
|
|
|
|
|
for i, r := range result[0].Result {
|
|
|
|
|
|
|
|
res[i] = &table.UserCount{
|
|
|
|
|
|
|
|
UserID: r.UserID,
|
|
|
|
|
|
|
|
Count: r.Count,
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return result[0].Total, res, nil
|
|
|
|
|
|
|
|
}
|
|
|
|