optimize search abstract service interface

pull/118/head
alimy 3 years ago
parent 0b5dd8d15e
commit edc52e71b1

@ -34,5 +34,6 @@ func (a IndexActionT) String() string {
type CacheIndexService interface {
VersionInfo
IndexPostsService
SendAction(active IndexActionT)
}

@ -7,7 +7,6 @@ import (
// DataService data service interface that process data related logic on database
type DataService interface {
WalletService
SearchService
IndexPostsService
GetComments(conditions *model.ConditionsT, offset, limit int) ([]*model.Comment, error)
@ -38,6 +37,7 @@ type DataService interface {
GetPostByID(id int64) (*model.Post, error)
GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error)
MergePosts(posts []*model.Post) ([]*model.PostFormated, error)
RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error)
GetPostCount(conditions *model.ConditionsT) (int64, error)
UpdatePost(post *model.Post) error
GetUserPostStar(postID, userID int64) (*model.PostStar, error)

@ -2,7 +2,6 @@ package core
import (
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/zinc"
)
const (
@ -12,18 +11,23 @@ const (
type SearchType string
type QueryT struct {
type QueryReq struct {
Query string
Visibility []model.PostVisibleT
Type SearchType
}
// SearchService search service interface that implement base zinc
type SearchService interface {
CreateSearchIndex(indexName string)
BulkPushDoc(data []map[string]interface{}) (bool, error)
DelDoc(indexName, id string) error
QueryAll(q *QueryT, indexName string, offset, limit int) (*zinc.QueryResultT, error)
QuerySearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error)
QueryTagSearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error)
type QueryResp struct {
Items []*model.PostFormated
Total int64
}
// TweetSearchService tweet search service interface
type TweetSearchService interface {
VersionInfo
IndexName() string
AddDocuments(documents []map[string]interface{}, primaryKey ...string) (bool, error)
DeleteDocuments(identifiers []string) error
Search(q *QueryReq, offset, limit int) (*QueryResp, error)
}

@ -6,6 +6,8 @@ import (
// ObjectStorageService storage service interface that implement base AliOSS、MINIO or other
type ObjectStorageService interface {
VersionInfo
PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error)
SignURL(objectKey string, expiredInSec int64) (string, error)
ObjectURL(objetKey string) string

@ -0,0 +1,40 @@
package dao
import (
"sync/atomic"
"time"
"github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
var (
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
_ core.CacheIndexService = (*bigCacheIndexServant)(nil)
)
type postsEntry struct {
key string
posts []*model.PostFormated
}
type indexPostsFunc func(int64, int, int) ([]*model.PostFormated, error)
type bigCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
}
type simpleCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
indexPosts []*model.PostFormated
atomicIndex atomic.Value
maxIndexSize int
checkTick *time.Ticker
expireIndexTick *time.Ticker
}

@ -1,15 +1,8 @@
package dao
import (
"sync/atomic"
"time"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/allegro/bigcache/v3"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
@ -17,13 +10,8 @@ import (
var (
_ core.DataService = (*dataServant)(nil)
_ core.ObjectStorageService = (*aliossServant)(nil)
_ core.ObjectStorageService = (*minioServant)(nil)
_ core.ObjectStorageService = (*s3Servant)(nil)
_ core.ObjectStorageService = (*localossServant)(nil)
_ core.AttachmentCheckService = (*attachmentCheckServant)(nil)
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
_ core.CacheIndexService = (*bigCacheIndexServant)(nil)
_ core.TweetSearchService = (*zincTweetSearchServant)(nil)
)
type dataServant struct {
@ -34,55 +22,15 @@ type dataServant struct {
zinc *zinc.ZincClient
}
type indexPostsFunc func(int64, int, int) ([]*model.PostFormated, error)
type simpleCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
indexPosts []*model.PostFormated
atomicIndex atomic.Value
maxIndexSize int
checkTick *time.Ticker
expireIndexTick *time.Ticker
}
type postsEntry struct {
key string
posts []*model.PostFormated
}
type bigCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
}
type localossServant struct {
savePath string
domain string
}
type aliossServant struct {
bucket *oss.Bucket
domain string
}
type minioServant struct {
client *minio.Client
bucket string
domain string
}
type s3Servant = minioServant
type attachmentCheckServant struct {
domain string
}
func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService {
func NewDataService() core.DataService {
client := zinc.NewClient(conf.ZincSetting)
ds := &dataServant{
engine: engine,
zinc: zinc,
engine: conf.DBEngine,
zinc: client,
}
// initialize CacheIndex if needed
@ -96,33 +44,12 @@ func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService {
}
if ds.useCacheIndex {
logrus.Infof("use cache index service by %s for version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version())
logrus.Infof("use %s as cache index service by version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version())
}
return ds
}
func NewObjectStorageService() (oss core.ObjectStorageService) {
if conf.CfgIf("AliOSS") {
oss = newAliossServent()
logrus.Infoln("use AliOSS as object storage")
} else if conf.CfgIf("MinIO") {
oss = newMinioServeant()
logrus.Infoln("use MinIO as object storage")
} else if conf.CfgIf("S3") {
oss = newS3Servent()
logrus.Infoln("use S3 as object storage")
} else if conf.CfgIf("LocalOSS") {
oss = newLocalossServent()
logrus.Infoln("use LocalOSS as object storage")
} else {
// default use AliOSS
oss = newAliossServent()
logrus.Infoln("use default AliOSS as object storage")
}
return
}
func NewAttachmentCheckerService() core.AttachmentCheckService {
return &attachmentCheckServant{
domain: getOssDomain(),

@ -0,0 +1,54 @@
package dao
import (
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
var (
_ core.ObjectStorageService = (*aliossServant)(nil)
_ core.ObjectStorageService = (*minioServant)(nil)
_ core.ObjectStorageService = (*s3Servant)(nil)
_ core.ObjectStorageService = (*localossServant)(nil)
)
type localossServant struct {
savePath string
domain string
}
type aliossServant struct {
bucket *oss.Bucket
domain string
}
type minioServant struct {
client *minio.Client
bucket string
domain string
}
type s3Servant = minioServant
func NewObjectStorageService() (oss core.ObjectStorageService) {
if conf.CfgIf("AliOSS") {
oss = newAliossServent()
} else if conf.CfgIf("MinIO") {
oss = newMinioServeant()
} else if conf.CfgIf("S3") {
oss = newS3Servent()
logrus.Infof("use S3 as object storage by version %s", oss.Version())
return
} else if conf.CfgIf("LocalOSS") {
oss = newLocalossServent()
} else {
// default use AliOSS as object storage service
oss = newAliossServent()
logrus.Infof("use default AliOSS as object storage by version %s", oss.Version())
}
logrus.Infof("use %s as object storage by version %s", oss.Name(), oss.Version())
return
}

@ -5,6 +5,7 @@ import (
"net/url"
"strings"
"github.com/Masterminds/semver/v3"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus"
@ -27,6 +28,14 @@ func newAliossServent() *aliossServant {
}
}
func (s *aliossServant) Name() string {
return "AliOSS"
}
func (s *aliossServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *aliossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) {
err := s.bucket.PutObject(objectKey, reader, oss.ContentLength(objectSize), oss.ContentType(contentType))
if err != nil {

@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus"
)
@ -25,6 +26,14 @@ func newLocalossServent() *localossServant {
}
}
func (s *localossServant) Name() string {
return "LocalOSS"
}
func (s *localossServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *localossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) {
saveDir := s.savePath + filepath.Dir(objectKey)
err := os.MkdirAll(saveDir, 0750)

@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/Masterminds/semver/v3"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/rocboss/paopao-ce/internal/conf"
@ -29,6 +30,14 @@ func newMinioServeant() *minioServant {
}
}
func (s *minioServant) Name() string {
return "MinIO"
}
func (s *minioServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *minioServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) {
uploadInfo, err := s.client.PutObject(context.Background(), s.bucket, objectKey, reader, objectSize, minio.PutObjectOptions{ContentType: contentType})
if err != nil {
@ -37,6 +46,7 @@ func (s *minioServant) PutObject(objectKey string, reader io.Reader, objectSize
logrus.Infoln("Successfully uploaded bytes: ", uploadInfo)
return s.domain + objectKey, nil
}
func (s *minioServant) SignURL(objectKey string, expiredInSec int64) (string, error) {
// TODO: Set request parameters for content-disposition.
reqParams := make(url.Values)

@ -207,3 +207,80 @@ func (d *dataServant) GetPostAttatchmentBill(postID, userID int64) (*model.PostA
return bill.Get(d.engine)
}
// MergePosts post数据整合
func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
postsFormated := make([]*model.PostFormated, 0, len(posts))
for _, post := range posts {
postFormated := post.Format()
postFormated.User = userMap[post.UserID]
postFormated.Contents = contentMap[post.ID]
postsFormated = append(postsFormated, postFormated)
}
return postsFormated, nil
}
// RevampPosts post数据整形修复
func (d *dataServant) RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
for _, post := range posts {
post.User = userMap[post.UserID]
post.Contents = contentMap[post.ID]
}
return posts, nil
}

@ -17,45 +17,6 @@ func (d *dataServant) IndexPosts(userId int64, offset int, limit int) ([]*model.
return d.getIndexPosts(userId, offset, limit)
}
func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
postsFormated := make([]*model.PostFormated, 0, len(posts))
for _, post := range posts {
postFormated := post.Format()
postFormated.User = userMap[post.UserID]
postFormated.Contents = contentMap[post.ID]
postsFormated = append(postsFormated, postFormated)
}
return postsFormated, nil
}
// getIndexPosts _userId保留未来使用
// TODO: 未来可能根据userId查询广场推文列表简单做到不同用户的主页都是不同的
func (d *dataServant) getIndexPosts(_userId int64, offset int, limit int) ([]*model.PostFormated, error) {

@ -1,165 +1,28 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus"
)
func (d *dataServant) CreateSearchIndex(indexName string) {
// 不存在则创建索引
d.zinc.CreateIndex(indexName, &zinc.ZincIndexProperty{
"id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
Sortable: true,
},
"user_id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
},
"comment_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"collection_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"upvote_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_top": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_essence": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"content": &zinc.ZincIndexPropertyT{
Type: "text",
Index: true,
Store: true,
Aggregatable: true,
Highlightable: true,
Analyzer: "gse_search",
SearchAnalyzer: "gse_standard",
},
"tags": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"ip_loc": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"latest_replied_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"attachment_price": &zinc.ZincIndexPropertyT{
Type: "numeric",
Sortable: true,
Store: true,
},
"created_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"modified_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
})
}
func (d *dataServant) BulkPushDoc(data []map[string]interface{}) (bool, error) {
return d.zinc.BulkPushDoc(data)
}
func (d *dataServant) DelDoc(indexName, id string) error {
return d.zinc.DelDoc(indexName, id)
}
func (d *dataServant) QueryAll(q *core.QueryT, indexName string, offset, limit int) (*zinc.QueryResultT, error) {
// 普通搜索
if q.Type == core.SearchTypeDefault && q.Query != "" {
return d.QuerySearch(indexName, q.Query, offset, limit)
}
// Tag分类
if q.Type == core.SearchTypeTag && q.Query != "" {
return d.QueryTagSearch(indexName, q.Query, offset, limit)
}
queryMap := map[string]interface{}{
"query": map[string]interface{}{
"match_all": map[string]string{},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
}
rsp, err := d.zinc.EsQuery(indexName, queryMap)
if err != nil {
return nil, err
}
return rsp, err
}
func (d *dataServant) QuerySearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) {
rsp, err := d.zinc.EsQuery(indexName, map[string]interface{}{
"query": map[string]interface{}{
"match_phrase": map[string]interface{}{
"content": query,
},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
})
if err != nil {
return nil, err
}
var (
_ core.TweetSearchService = (*zincTweetSearchServant)(nil)
)
return rsp, err
type zincTweetSearchServant struct {
indexName string
client *zinc.ZincClient
}
func (d *dataServant) QueryTagSearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) {
rsp, err := d.zinc.ApiQuery(indexName, map[string]interface{}{
"search_type": "querystring",
"query": map[string]interface{}{
"term": "tags." + query + ":1",
},
"sort_fields": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"max_results": limit,
})
if err != nil {
return nil, err
func NewTweetSearchService() (ts core.TweetSearchService) {
if conf.CfgIf("Zinc") {
ts = newZincTweetSearchServant()
} else {
// default use Zinc as tweet search service
ts = newZincTweetSearchServant()
}
return rsp, err
logrus.Infof("use %s as tweet search serice by version %s", ts.Name(), ts.Version())
return
}

@ -0,0 +1,219 @@
package dao
import (
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/json"
"github.com/rocboss/paopao-ce/pkg/zinc"
)
func newZincTweetSearchServant() *zincTweetSearchServant {
s := conf.ZincSetting
zts := &zincTweetSearchServant{
indexName: s.Index,
client: zinc.NewClient(s),
}
zts.createIndex()
return zts
}
func (s *zincTweetSearchServant) Name() string {
return "Zinc"
}
func (s *zincTweetSearchServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *zincTweetSearchServant) IndexName() string {
return s.indexName
}
func (s *zincTweetSearchServant) AddDocuments(data []map[string]interface{}, primaryKey ...string) (bool, error) {
return s.client.BulkPushDoc(data)
}
func (s *zincTweetSearchServant) DeleteDocuments(identifiers []string) error {
for _, id := range identifiers {
if err := s.client.DelDoc(s.indexName, id); err != nil {
return err
}
}
return nil
}
func (s *zincTweetSearchServant) Search(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
if q.Type == core.SearchTypeDefault && q.Query != "" {
return s.queryByContent(q, offset, limit)
} else if q.Type == core.SearchTypeTag && q.Query != "" {
return s.queryByTag(q, offset, limit)
}
return s.queryAny(offset, limit)
}
func (s *zincTweetSearchServant) queryByContent(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
resp, err := s.client.EsQuery(s.indexName, map[string]interface{}{
"query": map[string]interface{}{
"match_phrase": map[string]interface{}{
"content": q.Query,
},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
})
if err != nil {
return nil, err
}
return s.postsFrom(resp)
}
func (s *zincTweetSearchServant) queryByTag(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
resp, err := s.client.ApiQuery(s.indexName, map[string]interface{}{
"search_type": "querystring",
"query": map[string]interface{}{
"term": "tags." + q.Query + ":1",
},
"sort_fields": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"max_results": limit,
})
if err != nil {
return nil, err
}
return s.postsFrom(resp)
}
func (s *zincTweetSearchServant) queryAny(offset, limit int) (*core.QueryResp, error) {
queryMap := map[string]interface{}{
"query": map[string]interface{}{
"match_all": map[string]string{},
},
"sort": []string{"-is_top", "-latest_replied_on"},
"from": offset,
"size": limit,
}
resp, err := s.client.EsQuery(s.indexName, queryMap)
if err != nil {
return nil, err
}
return s.postsFrom(resp)
}
func (s *zincTweetSearchServant) postsFrom(resp *zinc.QueryResultT) (*core.QueryResp, error) {
posts := make([]*model.PostFormated, 0, len(resp.Hits.Hits))
for _, hit := range resp.Hits.Hits {
item := &model.PostFormated{}
raw, err := json.Marshal(hit.Source)
if err != nil {
return nil, err
}
if err = json.Unmarshal(raw, item); err != nil {
return nil, err
}
posts = append(posts, item)
}
return &core.QueryResp{
Items: posts,
Total: resp.Hits.Total.Value,
}, nil
}
func (s *zincTweetSearchServant) createIndex() {
// 不存在则创建索引
s.client.CreateIndex(s.indexName, &zinc.ZincIndexProperty{
"id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
Sortable: true,
},
"user_id": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Store: true,
},
"comment_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"collection_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"upvote_count": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"visibility": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_top": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"is_essence": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"content": &zinc.ZincIndexPropertyT{
Type: "text",
Index: true,
Store: true,
Aggregatable: true,
Highlightable: true,
Analyzer: "gse_search",
SearchAnalyzer: "gse_standard",
},
"tags": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"ip_loc": &zinc.ZincIndexPropertyT{
Type: "keyword",
Index: true,
Store: true,
},
"latest_replied_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"attachment_price": &zinc.ZincIndexPropertyT{
Type: "numeric",
Sortable: true,
Store: true,
},
"created_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
"modified_on": &zinc.ZincIndexPropertyT{
Type: "numeric",
Index: true,
Sortable: true,
Store: true,
},
})
}

@ -14,7 +14,7 @@ import (
func GetPostList(c *gin.Context) {
response := app.NewResponse(c)
q := &core.QueryT{
q := &core.QueryReq{
Query: c.Query("query"),
Type: "search",
}
@ -23,8 +23,8 @@ func GetPostList(c *gin.Context) {
}
userId, _ := userIdFrom(c)
offset, limit := app.GetPageOffset(c)
if q.Query == "" && q.Type == "search" {
offset, limit := app.GetPageOffset(c)
posts, err := service.GetIndexPosts(userId, offset, limit)
if err != nil {
logrus.Errorf("service.GetPostList err: %v\n", err)
@ -38,7 +38,7 @@ func GetPostList(c *gin.Context) {
response.ToResponseList(posts, totalRows)
} else {
posts, totalRows, err := service.GetPostListFromSearch(q, (app.GetPage(c)-1)*app.GetPageSize(c), app.GetPageSize(c))
posts, totalRows, err := service.GetPostListFromSearch(q, offset, limit)
if err != nil {
logrus.Errorf("service.GetPostListFromSearch err: %v\n", err)

@ -478,32 +478,24 @@ func GetPostCount(conditions *model.ConditionsT) (int64, error) {
return ds.GetPostCount(conditions)
}
func GetPostListFromSearch(q *core.QueryT, offset, limit int) ([]*model.PostFormated, int64, error) {
queryResult, err := ds.QueryAll(q, conf.ZincSetting.Index, offset, limit)
func GetPostListFromSearch(q *core.QueryReq, offset, limit int) ([]*model.PostFormated, int64, error) {
resp, err := ts.Search(q, offset, limit)
if err != nil {
return nil, 0, err
}
posts, err := FormatZincPost(queryResult)
posts, err := ds.RevampPosts(resp.Items)
if err != nil {
return nil, 0, err
}
return posts, queryResult.Hits.Total.Value, nil
return posts, resp.Total, nil
}
func GetPostListFromSearchByQuery(query string, offset, limit int) ([]*model.PostFormated, int64, error) {
queryResult, err := ds.QuerySearch(conf.ZincSetting.Index, query, offset, limit)
if err != nil {
return nil, 0, err
}
posts, err := FormatZincPost(queryResult)
if err != nil {
return nil, 0, err
q := &core.QueryReq{
Query: query,
Type: "search",
}
return posts, queryResult.Hits.Total.Value, nil
return GetPostListFromSearch(q, offset, limit)
}
func PushPostToSearch(post *model.Post) {
@ -512,8 +504,6 @@ func PushPostToSearch(post *model.Post) {
return
}
indexName := conf.ZincSetting.Index
postFormated := post.Format()
postFormated.User = &model.UserFormated{
ID: post.UserID,
@ -539,7 +529,7 @@ func PushPostToSearch(post *model.Post) {
data := []map[string]interface{}{}
data = append(data, map[string]interface{}{
"index": map[string]interface{}{
"_index": indexName,
"_index": ts.IndexName(),
"_id": fmt.Sprintf("%d", post.ID),
},
}, map[string]interface{}{
@ -560,13 +550,11 @@ func PushPostToSearch(post *model.Post) {
"modified_on": post.ModifiedOn,
})
ds.BulkPushDoc(data)
ts.AddDocuments(data)
}
func DeleteSearchPost(post *model.Post) error {
indexName := conf.ZincSetting.Index
return ds.DelDoc(indexName, fmt.Sprintf("%d", post.ID))
return ts.DeleteDocuments([]string{fmt.Sprintf("%d", post.ID)})
}
func PushPostsToSearch(c *gin.Context) {
@ -579,10 +567,6 @@ func PushPostsToSearch(c *gin.Context) {
pages := math.Ceil(float64(totalRows) / float64(splitNum))
nums := int(pages)
indexName := conf.ZincSetting.Index
// 创建索引
ds.CreateSearchIndex(indexName)
for i := 0; i < nums; i++ {
data := []map[string]interface{}{}
@ -605,7 +589,7 @@ func PushPostsToSearch(c *gin.Context) {
data = append(data, map[string]interface{}{
"index": map[string]interface{}{
"_index": indexName,
"_index": ts.IndexName(),
"_id": fmt.Sprintf("%d", post.ID),
},
}, map[string]interface{}{
@ -628,7 +612,7 @@ func PushPostsToSearch(c *gin.Context) {
}
if len(data) > 0 {
ds.BulkPushDoc(data)
ts.AddDocuments(data)
}
}

@ -4,19 +4,18 @@ import (
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/dao"
"github.com/rocboss/paopao-ce/pkg/zinc"
)
var (
ds core.DataService
ts core.TweetSearchService
attachmentChecker core.AttachmentCheckService
DisablePhoneVerify bool
)
func Initialize() {
zincClient := zinc.NewClient(conf.ZincSetting)
ds = dao.NewDataService(conf.DBEngine, zincClient)
ds = dao.NewDataService()
ts = dao.NewTweetSearchService()
attachmentChecker = dao.NewAttachmentCheckerService()
DisablePhoneVerify = !conf.CfgIf("Sms")
}

Loading…
Cancel
Save