Merge pull request #118 from alimy/pr-meilisearch

optimize search logic to abstract in a new service interface
pull/121/head
Michael Li 2 years ago committed by GitHub
commit 8edda3be81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -34,5 +34,6 @@ func (a IndexActionT) String() string {
type CacheIndexService interface { type CacheIndexService interface {
VersionInfo VersionInfo
IndexPostsService IndexPostsService
SendAction(active IndexActionT) SendAction(active IndexActionT)
} }

@ -7,7 +7,6 @@ import (
// DataService data service interface that process data related logic on database // DataService data service interface that process data related logic on database
type DataService interface { type DataService interface {
WalletService WalletService
SearchService
IndexPostsService IndexPostsService
GetComments(conditions *model.ConditionsT, offset, limit int) ([]*model.Comment, error) GetComments(conditions *model.ConditionsT, offset, limit int) ([]*model.Comment, error)
@ -38,6 +37,7 @@ type DataService interface {
GetPostByID(id int64) (*model.Post, error) GetPostByID(id int64) (*model.Post, error)
GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error) GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error)
MergePosts(posts []*model.Post) ([]*model.PostFormated, error) MergePosts(posts []*model.Post) ([]*model.PostFormated, error)
RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error)
GetPostCount(conditions *model.ConditionsT) (int64, error) GetPostCount(conditions *model.ConditionsT) (int64, error)
UpdatePost(post *model.Post) error UpdatePost(post *model.Post) error
GetUserPostStar(postID, userID int64) (*model.PostStar, error) GetUserPostStar(postID, userID int64) (*model.PostStar, error)

@ -2,7 +2,6 @@ package core
import ( import (
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/zinc"
) )
const ( const (
@ -12,18 +11,23 @@ const (
type SearchType string type SearchType string
type QueryT struct { type QueryReq struct {
Query string Query string
Visibility []model.PostVisibleT Visibility []model.PostVisibleT
Type SearchType Type SearchType
} }
// SearchService search service interface that implement base zinc type QueryResp struct {
type SearchService interface { Items []*model.PostFormated
CreateSearchIndex(indexName string) Total int64
BulkPushDoc(data []map[string]interface{}) (bool, error) }
DelDoc(indexName, id string) error
QueryAll(q *QueryT, indexName string, offset, limit int) (*zinc.QueryResultT, error) // TweetSearchService tweet search service interface
QuerySearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) type TweetSearchService interface {
QueryTagSearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) VersionInfo
IndexName() string
AddDocuments(documents []map[string]interface{}, primaryKey ...string) (bool, error)
DeleteDocuments(identifiers []string) error
Search(q *QueryReq, offset, limit int) (*QueryResp, error)
} }

@ -6,6 +6,8 @@ import (
// ObjectStorageService storage service interface that implement base AliOSS、MINIO or other // ObjectStorageService storage service interface that implement base AliOSS、MINIO or other
type ObjectStorageService interface { type ObjectStorageService interface {
VersionInfo
PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error)
SignURL(objectKey string, expiredInSec int64) (string, error) SignURL(objectKey string, expiredInSec int64) (string, error)
ObjectURL(objetKey string) string ObjectURL(objetKey string) string

@ -0,0 +1,40 @@
package dao
import (
"sync/atomic"
"time"
"github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
var (
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
_ core.CacheIndexService = (*bigCacheIndexServant)(nil)
)
type postsEntry struct {
key string
posts []*model.PostFormated
}
type indexPostsFunc func(int64, int, int) ([]*model.PostFormated, error)
type bigCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
}
type simpleCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
indexPosts []*model.PostFormated
atomicIndex atomic.Value
maxIndexSize int
checkTick *time.Ticker
expireIndexTick *time.Ticker
}

@ -1,15 +1,8 @@
package dao package dao
import ( import (
"sync/atomic"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/allegro/bigcache/v3"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/zinc" "github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"gorm.io/gorm" "gorm.io/gorm"
@ -17,13 +10,8 @@ import (
var ( var (
_ core.DataService = (*dataServant)(nil) _ core.DataService = (*dataServant)(nil)
_ core.ObjectStorageService = (*aliossServant)(nil)
_ core.ObjectStorageService = (*minioServant)(nil)
_ core.ObjectStorageService = (*s3Servant)(nil)
_ core.ObjectStorageService = (*localossServant)(nil)
_ core.AttachmentCheckService = (*attachmentCheckServant)(nil) _ core.AttachmentCheckService = (*attachmentCheckServant)(nil)
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil) _ core.TweetSearchService = (*zincTweetSearchServant)(nil)
_ core.CacheIndexService = (*bigCacheIndexServant)(nil)
) )
type dataServant struct { type dataServant struct {
@ -34,55 +22,15 @@ type dataServant struct {
zinc *zinc.ZincClient zinc *zinc.ZincClient
} }
type indexPostsFunc func(int64, int, int) ([]*model.PostFormated, error)
type simpleCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
indexPosts []*model.PostFormated
atomicIndex atomic.Value
maxIndexSize int
checkTick *time.Ticker
expireIndexTick *time.Ticker
}
type postsEntry struct {
key string
posts []*model.PostFormated
}
type bigCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
}
type localossServant struct {
savePath string
domain string
}
type aliossServant struct {
bucket *oss.Bucket
domain string
}
type minioServant struct {
client *minio.Client
bucket string
domain string
}
type s3Servant = minioServant
type attachmentCheckServant struct { type attachmentCheckServant struct {
domain string domain string
} }
func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService { func NewDataService() core.DataService {
client := zinc.NewClient(conf.ZincSetting)
ds := &dataServant{ ds := &dataServant{
engine: engine, engine: conf.DBEngine,
zinc: zinc, zinc: client,
} }
// initialize CacheIndex if needed // initialize CacheIndex if needed
@ -96,33 +44,12 @@ func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService {
} }
if ds.useCacheIndex { if ds.useCacheIndex {
logrus.Infof("use cache index service by %s for version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version()) logrus.Infof("use %s as cache index service by version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version())
} }
return ds return ds
} }
func NewObjectStorageService() (oss core.ObjectStorageService) {
if conf.CfgIf("AliOSS") {
oss = newAliossServent()
logrus.Infoln("use AliOSS as object storage")
} else if conf.CfgIf("MinIO") {
oss = newMinioServeant()
logrus.Infoln("use MinIO as object storage")
} else if conf.CfgIf("S3") {
oss = newS3Servent()
logrus.Infoln("use S3 as object storage")
} else if conf.CfgIf("LocalOSS") {
oss = newLocalossServent()
logrus.Infoln("use LocalOSS as object storage")
} else {
// default use AliOSS
oss = newAliossServent()
logrus.Infoln("use default AliOSS as object storage")
}
return
}
func NewAttachmentCheckerService() core.AttachmentCheckService { func NewAttachmentCheckerService() core.AttachmentCheckService {
return &attachmentCheckServant{ return &attachmentCheckServant{
domain: getOssDomain(), domain: getOssDomain(),

@ -0,0 +1,54 @@
package dao
import (
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
var (
_ core.ObjectStorageService = (*aliossServant)(nil)
_ core.ObjectStorageService = (*minioServant)(nil)
_ core.ObjectStorageService = (*s3Servant)(nil)
_ core.ObjectStorageService = (*localossServant)(nil)
)
type localossServant struct {
savePath string
domain string
}
type aliossServant struct {
bucket *oss.Bucket
domain string
}
type minioServant struct {
client *minio.Client
bucket string
domain string
}
type s3Servant = minioServant
func NewObjectStorageService() (oss core.ObjectStorageService) {
if conf.CfgIf("AliOSS") {
oss = newAliossServent()
} else if conf.CfgIf("MinIO") {
oss = newMinioServeant()
} else if conf.CfgIf("S3") {
oss = newS3Servent()
logrus.Infof("use S3 as object storage by version %s", oss.Version())
return
} else if conf.CfgIf("LocalOSS") {
oss = newLocalossServent()
} else {
// default use AliOSS as object storage service
oss = newAliossServent()
logrus.Infof("use default AliOSS as object storage by version %s", oss.Version())
}
logrus.Infof("use %s as object storage by version %s", oss.Name(), oss.Version())
return
}

@ -5,6 +5,7 @@ import (
"net/url" "net/url"
"strings" "strings"
"github.com/Masterminds/semver/v3"
"github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -27,6 +28,14 @@ func newAliossServent() *aliossServant {
} }
} }
func (s *aliossServant) Name() string {
return "AliOSS"
}
func (s *aliossServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *aliossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) { func (s *aliossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) {
err := s.bucket.PutObject(objectKey, reader, oss.ContentLength(objectSize), oss.ContentType(contentType)) err := s.bucket.PutObject(objectKey, reader, oss.ContentLength(objectSize), oss.ContentType(contentType))
if err != nil { if err != nil {

@ -9,6 +9,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -25,6 +26,14 @@ func newLocalossServent() *localossServant {
} }
} }
func (s *localossServant) Name() string {
return "LocalOSS"
}
func (s *localossServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *localossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) { func (s *localossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) {
saveDir := s.savePath + filepath.Dir(objectKey) saveDir := s.savePath + filepath.Dir(objectKey)
err := os.MkdirAll(saveDir, 0750) err := os.MkdirAll(saveDir, 0750)

@ -7,6 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/Masterminds/semver/v3"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
@ -29,6 +30,14 @@ func newMinioServeant() *minioServant {
} }
} }
func (s *minioServant) Name() string {
return "MinIO"
}
func (s *minioServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *minioServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) { func (s *minioServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) {
uploadInfo, err := s.client.PutObject(context.Background(), s.bucket, objectKey, reader, objectSize, minio.PutObjectOptions{ContentType: contentType}) uploadInfo, err := s.client.PutObject(context.Background(), s.bucket, objectKey, reader, objectSize, minio.PutObjectOptions{ContentType: contentType})
if err != nil { if err != nil {
@ -37,6 +46,7 @@ func (s *minioServant) PutObject(objectKey string, reader io.Reader, objectSize
logrus.Infoln("Successfully uploaded bytes: ", uploadInfo) logrus.Infoln("Successfully uploaded bytes: ", uploadInfo)
return s.domain + objectKey, nil return s.domain + objectKey, nil
} }
func (s *minioServant) SignURL(objectKey string, expiredInSec int64) (string, error) { func (s *minioServant) SignURL(objectKey string, expiredInSec int64) (string, error) {
// TODO: Set request parameters for content-disposition. // TODO: Set request parameters for content-disposition.
reqParams := make(url.Values) reqParams := make(url.Values)

@ -207,3 +207,80 @@ func (d *dataServant) GetPostAttatchmentBill(postID, userID int64) (*model.PostA
return bill.Get(d.engine) return bill.Get(d.engine)
} }
// MergePosts post数据整合
func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
postsFormated := make([]*model.PostFormated, 0, len(posts))
for _, post := range posts {
postFormated := post.Format()
postFormated.User = userMap[post.UserID]
postFormated.Contents = contentMap[post.ID]
postsFormated = append(postsFormated, postFormated)
}
return postsFormated, nil
}
// RevampPosts post数据整形修复
func (d *dataServant) RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
for _, post := range posts {
post.User = userMap[post.UserID]
post.Contents = contentMap[post.ID]
}
return posts, nil
}

@ -17,45 +17,6 @@ func (d *dataServant) IndexPosts(userId int64, offset int, limit int) ([]*model.
return d.getIndexPosts(userId, offset, limit) return d.getIndexPosts(userId, offset, limit)
} }
func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
postsFormated := make([]*model.PostFormated, 0, len(posts))
for _, post := range posts {
postFormated := post.Format()
postFormated.User = userMap[post.UserID]
postFormated.Contents = contentMap[post.ID]
postsFormated = append(postsFormated, postFormated)
}
return postsFormated, nil
}
// getIndexPosts _userId保留未来使用 // getIndexPosts _userId保留未来使用
// TODO: 未来可能根据userId查询广场推文列表简单做到不同用户的主页都是不同的 // TODO: 未来可能根据userId查询广场推文列表简单做到不同用户的主页都是不同的
func (d *dataServant) getIndexPosts(_userId int64, offset int, limit int) ([]*model.PostFormated, error) { func (d *dataServant) getIndexPosts(_userId int64, offset int, limit int) ([]*model.PostFormated, error) {

@ -1,165 +1,28 @@
package dao package dao
import ( import (
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/pkg/zinc" "github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus"
) )
func (d *dataServant) CreateSearchIndex(indexName string) { var (
// 不存在则创建索引 _ core.TweetSearchService = (*zincTweetSearchServant)(nil)
d.zinc.CreateIndex(indexName, &zinc.ZincIndexProperty{ )
"id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
Sortable: true,
},
"user_id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
},
"comment_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"collection_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"upvote_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_top": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_essence": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"content": &zinc.ZincIndexPropertyT{
Type: "text",
Index: true,
Store: true,
Aggregatable: true,
Highlightable: true,
Analyzer: "gse_search",
SearchAnalyzer: "gse_standard",
},
"tags": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"ip_loc": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"latest_replied_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"attachment_price": &zinc.ZincIndexPropertyT{
Type: "numeric",
Sortable: true,
Store: true,
},
"created_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"modified_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
})
}
func (d *dataServant) BulkPushDoc(data []map[string]interface{}) (bool, error) {
return d.zinc.BulkPushDoc(data)
}
func (d *dataServant) DelDoc(indexName, id string) error {
return d.zinc.DelDoc(indexName, id)
}
func (d *dataServant) QueryAll(q *core.QueryT, indexName string, offset, limit int) (*zinc.QueryResultT, error) {
// 普通搜索
if q.Type == core.SearchTypeDefault && q.Query != "" {
return d.QuerySearch(indexName, q.Query, offset, limit)
}
// Tag分类
if q.Type == core.SearchTypeTag && q.Query != "" {
return d.QueryTagSearch(indexName, q.Query, offset, limit)
}
queryMap := map[string]interface{}{
"query": map[string]interface{}{
"match_all": map[string]string{},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
}
rsp, err := d.zinc.EsQuery(indexName, queryMap)
if err != nil {
return nil, err
}
return rsp, err
}
func (d *dataServant) QuerySearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) {
rsp, err := d.zinc.EsQuery(indexName, map[string]interface{}{
"query": map[string]interface{}{
"match_phrase": map[string]interface{}{
"content": query,
},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
})
if err != nil {
return nil, err
}
return rsp, err type zincTweetSearchServant struct {
indexName string
client *zinc.ZincClient
} }
func (d *dataServant) QueryTagSearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) { func NewTweetSearchService() (ts core.TweetSearchService) {
rsp, err := d.zinc.ApiQuery(indexName, map[string]interface{}{ if conf.CfgIf("Zinc") {
"search_type": "querystring", ts = newZincTweetSearchServant()
"query": map[string]interface{}{ } else {
"term": "tags." + query + ":1", // default use Zinc as tweet search service
}, ts = newZincTweetSearchServant()
"sort_fields": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"max_results": limit,
})
if err != nil {
return nil, err
} }
logrus.Infof("use %s as tweet search serice by version %s", ts.Name(), ts.Version())
return rsp, err return
} }

@ -0,0 +1,219 @@
package dao
import (
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/json"
"github.com/rocboss/paopao-ce/pkg/zinc"
)
func newZincTweetSearchServant() *zincTweetSearchServant {
s := conf.ZincSetting
zts := &zincTweetSearchServant{
indexName: s.Index,
client: zinc.NewClient(s),
}
zts.createIndex()
return zts
}
func (s *zincTweetSearchServant) Name() string {
return "Zinc"
}
func (s *zincTweetSearchServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *zincTweetSearchServant) IndexName() string {
return s.indexName
}
func (s *zincTweetSearchServant) AddDocuments(data []map[string]interface{}, primaryKey ...string) (bool, error) {
return s.client.BulkPushDoc(data)
}
func (s *zincTweetSearchServant) DeleteDocuments(identifiers []string) error {
for _, id := range identifiers {
if err := s.client.DelDoc(s.indexName, id); err != nil {
return err
}
}
return nil
}
func (s *zincTweetSearchServant) Search(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
if q.Type == core.SearchTypeDefault && q.Query != "" {
return s.queryByContent(q, offset, limit)
} else if q.Type == core.SearchTypeTag && q.Query != "" {
return s.queryByTag(q, offset, limit)
}
return s.queryAny(offset, limit)
}
func (s *zincTweetSearchServant) queryByContent(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
resp, err := s.client.EsQuery(s.indexName, map[string]interface{}{
"query": map[string]interface{}{
"match_phrase": map[string]interface{}{
"content": q.Query,
},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
})
if err != nil {
return nil, err
}
return s.postsFrom(resp)
}
func (s *zincTweetSearchServant) queryByTag(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
resp, err := s.client.ApiQuery(s.indexName, map[string]interface{}{
"search_type": "querystring",
"query": map[string]interface{}{
"term": "tags." + q.Query + ":1",
},
"sort_fields": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"max_results": limit,
})
if err != nil {
return nil, err
}
return s.postsFrom(resp)
}
func (s *zincTweetSearchServant) queryAny(offset, limit int) (*core.QueryResp, error) {
queryMap := map[string]interface{}{
"query": map[string]interface{}{
"match_all": map[string]string{},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
}
resp, err := s.client.EsQuery(s.indexName, queryMap)
if err != nil {
return nil, err
}
return s.postsFrom(resp)
}
func (s *zincTweetSearchServant) postsFrom(resp *zinc.QueryResultT) (*core.QueryResp, error) {
posts := make([]*model.PostFormated, 0, len(resp.Hits.Hits))
for _, hit := range resp.Hits.Hits {
item := &model.PostFormated{}
raw, err := json.Marshal(hit.Source)
if err != nil {
return nil, err
}
if err = json.Unmarshal(raw, item); err != nil {
return nil, err
}
posts = append(posts, item)
}
return &core.QueryResp{
Items: posts,
Total: resp.Hits.Total.Value,
}, nil
}
func (s *zincTweetSearchServant) createIndex() {
// 不存在则创建索引
s.client.CreateIndex(s.indexName, &zinc.ZincIndexProperty{
"id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
Sortable: true,
},
"user_id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
},
"comment_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"collection_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"upvote_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"visibility": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_top": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_essence": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"content": &zinc.ZincIndexPropertyT{
Type: "text",
Index: true,
Store: true,
Aggregatable: true,
Highlightable: true,
Analyzer: "gse_search",
SearchAnalyzer: "gse_standard",
},
"tags": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"ip_loc": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"latest_replied_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"attachment_price": &zinc.ZincIndexPropertyT{
Type: "numeric",
Sortable: true,
Store: true,
},
"created_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"modified_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
})
}

@ -14,7 +14,7 @@ import (
func GetPostList(c *gin.Context) { func GetPostList(c *gin.Context) {
response := app.NewResponse(c) response := app.NewResponse(c)
q := &core.QueryT{ q := &core.QueryReq{
Query: c.Query("query"), Query: c.Query("query"),
Type: "search", Type: "search",
} }
@ -23,8 +23,8 @@ func GetPostList(c *gin.Context) {
} }
userId, _ := userIdFrom(c) userId, _ := userIdFrom(c)
if q.Query == "" && q.Type == "search" {
offset, limit := app.GetPageOffset(c) offset, limit := app.GetPageOffset(c)
if q.Query == "" && q.Type == "search" {
posts, err := service.GetIndexPosts(userId, offset, limit) posts, err := service.GetIndexPosts(userId, offset, limit)
if err != nil { if err != nil {
logrus.Errorf("service.GetPostList err: %v\n", err) logrus.Errorf("service.GetPostList err: %v\n", err)
@ -38,7 +38,7 @@ func GetPostList(c *gin.Context) {
response.ToResponseList(posts, totalRows) response.ToResponseList(posts, totalRows)
} else { } else {
posts, totalRows, err := service.GetPostListFromSearch(q, (app.GetPage(c)-1)*app.GetPageSize(c), app.GetPageSize(c)) posts, totalRows, err := service.GetPostListFromSearch(q, offset, limit)
if err != nil { if err != nil {
logrus.Errorf("service.GetPostListFromSearch err: %v\n", err) logrus.Errorf("service.GetPostListFromSearch err: %v\n", err)

@ -478,32 +478,24 @@ func GetPostCount(conditions *model.ConditionsT) (int64, error) {
return ds.GetPostCount(conditions) return ds.GetPostCount(conditions)
} }
func GetPostListFromSearch(q *core.QueryT, offset, limit int) ([]*model.PostFormated, int64, error) { func GetPostListFromSearch(q *core.QueryReq, offset, limit int) ([]*model.PostFormated, int64, error) {
queryResult, err := ds.QueryAll(q, conf.ZincSetting.Index, offset, limit) resp, err := ts.Search(q, offset, limit)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
posts, err := ds.RevampPosts(resp.Items)
posts, err := FormatZincPost(queryResult)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
return posts, resp.Total, nil
return posts, queryResult.Hits.Total.Value, nil
} }
func GetPostListFromSearchByQuery(query string, offset, limit int) ([]*model.PostFormated, int64, error) { func GetPostListFromSearchByQuery(query string, offset, limit int) ([]*model.PostFormated, int64, error) {
queryResult, err := ds.QuerySearch(conf.ZincSetting.Index, query, offset, limit) q := &core.QueryReq{
if err != nil { Query: query,
return nil, 0, err Type: "search",
}
posts, err := FormatZincPost(queryResult)
if err != nil {
return nil, 0, err
} }
return GetPostListFromSearch(q, offset, limit)
return posts, queryResult.Hits.Total.Value, nil
} }
func PushPostToSearch(post *model.Post) { func PushPostToSearch(post *model.Post) {
@ -512,8 +504,6 @@ func PushPostToSearch(post *model.Post) {
return return
} }
indexName := conf.ZincSetting.Index
postFormated := post.Format() postFormated := post.Format()
postFormated.User = &model.UserFormated{ postFormated.User = &model.UserFormated{
ID: post.UserID, ID: post.UserID,
@ -539,7 +529,7 @@ func PushPostToSearch(post *model.Post) {
data := []map[string]interface{}{} data := []map[string]interface{}{}
data = append(data, map[string]interface{}{ data = append(data, map[string]interface{}{
"index": map[string]interface{}{ "index": map[string]interface{}{
"_index": indexName, "_index": ts.IndexName(),
"_id": fmt.Sprintf("%d", post.ID), "_id": fmt.Sprintf("%d", post.ID),
}, },
}, map[string]interface{}{ }, map[string]interface{}{
@ -560,13 +550,11 @@ func PushPostToSearch(post *model.Post) {
"modified_on": post.ModifiedOn, "modified_on": post.ModifiedOn,
}) })
ds.BulkPushDoc(data) ts.AddDocuments(data)
} }
func DeleteSearchPost(post *model.Post) error { func DeleteSearchPost(post *model.Post) error {
indexName := conf.ZincSetting.Index return ts.DeleteDocuments([]string{fmt.Sprintf("%d", post.ID)})
return ds.DelDoc(indexName, fmt.Sprintf("%d", post.ID))
} }
func PushPostsToSearch(c *gin.Context) { func PushPostsToSearch(c *gin.Context) {
@ -579,10 +567,6 @@ func PushPostsToSearch(c *gin.Context) {
pages := math.Ceil(float64(totalRows) / float64(splitNum)) pages := math.Ceil(float64(totalRows) / float64(splitNum))
nums := int(pages) nums := int(pages)
indexName := conf.ZincSetting.Index
// 创建索引
ds.CreateSearchIndex(indexName)
for i := 0; i < nums; i++ { for i := 0; i < nums; i++ {
data := []map[string]interface{}{} data := []map[string]interface{}{}
@ -605,7 +589,7 @@ func PushPostsToSearch(c *gin.Context) {
data = append(data, map[string]interface{}{ data = append(data, map[string]interface{}{
"index": map[string]interface{}{ "index": map[string]interface{}{
"_index": indexName, "_index": ts.IndexName(),
"_id": fmt.Sprintf("%d", post.ID), "_id": fmt.Sprintf("%d", post.ID),
}, },
}, map[string]interface{}{ }, map[string]interface{}{
@ -628,7 +612,7 @@ func PushPostsToSearch(c *gin.Context) {
} }
if len(data) > 0 { if len(data) > 0 {
ds.BulkPushDoc(data) ts.AddDocuments(data)
} }
} }

@ -4,19 +4,18 @@ import (
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/dao" "github.com/rocboss/paopao-ce/internal/dao"
"github.com/rocboss/paopao-ce/pkg/zinc"
) )
var ( var (
ds core.DataService ds core.DataService
ts core.TweetSearchService
attachmentChecker core.AttachmentCheckService attachmentChecker core.AttachmentCheckService
DisablePhoneVerify bool DisablePhoneVerify bool
) )
func Initialize() { func Initialize() {
zincClient := zinc.NewClient(conf.ZincSetting) ds = dao.NewDataService()
ds = dao.NewDataService(conf.DBEngine, zincClient) ts = dao.NewTweetSearchService()
attachmentChecker = dao.NewAttachmentCheckerService() attachmentChecker = dao.NewAttachmentCheckerService()
DisablePhoneVerify = !conf.CfgIf("Sms") DisablePhoneVerify = !conf.CfgIf("Sms")
} }

Loading…
Cancel
Save