From edc52e71b15efdae293f94648cf09f57fca65376 Mon Sep 17 00:00:00 2001 From: alimy Date: Tue, 21 Jun 2022 03:45:11 +0800 Subject: [PATCH] optimize search abstract service interface --- internal/core/cache.go | 1 + internal/core/core.go | 2 +- internal/core/search.go | 24 ++-- internal/core/storage.go | 2 + internal/dao/cache.go | 40 +++++++ internal/dao/dao.go | 85 +------------- internal/dao/oss.go | 54 +++++++++ internal/dao/oss_alioss.go | 9 ++ internal/dao/oss_local.go | 9 ++ internal/dao/oss_minio.go | 10 ++ internal/dao/post.go | 77 ++++++++++++ internal/dao/post_index.go | 39 ------- internal/dao/search.go | 169 +++------------------------ internal/dao/search_zinc.go | 219 +++++++++++++++++++++++++++++++++++ internal/routers/api/post.go | 6 +- internal/service/post.go | 42 +++---- internal/service/service.go | 7 +- 17 files changed, 477 insertions(+), 318 deletions(-) create mode 100644 internal/dao/cache.go create mode 100644 internal/dao/oss.go create mode 100644 internal/dao/search_zinc.go diff --git a/internal/core/cache.go b/internal/core/cache.go index 383f760e..7ea40217 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -34,5 +34,6 @@ func (a IndexActionT) String() string { type CacheIndexService interface { VersionInfo IndexPostsService + SendAction(active IndexActionT) } diff --git a/internal/core/core.go b/internal/core/core.go index adcdf537..934a77e2 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -7,7 +7,6 @@ import ( // DataService data service interface that process data related logic on database type DataService interface { WalletService - SearchService IndexPostsService GetComments(conditions *model.ConditionsT, offset, limit int) ([]*model.Comment, error) @@ -38,6 +37,7 @@ type DataService interface { GetPostByID(id int64) (*model.Post, error) GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) + RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error) GetPostCount(conditions *model.ConditionsT) (int64, error) UpdatePost(post *model.Post) error GetUserPostStar(postID, userID int64) (*model.PostStar, error) diff --git a/internal/core/search.go b/internal/core/search.go index 823da0d3..8bfc7253 100644 --- a/internal/core/search.go +++ b/internal/core/search.go @@ -2,7 +2,6 @@ package core import ( "github.com/rocboss/paopao-ce/internal/model" - "github.com/rocboss/paopao-ce/pkg/zinc" ) const ( @@ -12,18 +11,23 @@ const ( type SearchType string -type QueryT struct { +type QueryReq struct { Query string Visibility []model.PostVisibleT Type SearchType } -// SearchService search service interface that implement base zinc -type SearchService interface { - CreateSearchIndex(indexName string) - BulkPushDoc(data []map[string]interface{}) (bool, error) - DelDoc(indexName, id string) error - QueryAll(q *QueryT, indexName string, offset, limit int) (*zinc.QueryResultT, error) - QuerySearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) - QueryTagSearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) +type QueryResp struct { + Items []*model.PostFormated + Total int64 +} + +// TweetSearchService tweet search service interface +type TweetSearchService interface { + VersionInfo + + IndexName() string + AddDocuments(documents []map[string]interface{}, primaryKey ...string) (bool, error) + DeleteDocuments(identifiers []string) error + Search(q *QueryReq, offset, limit int) (*QueryResp, error) } diff --git a/internal/core/storage.go b/internal/core/storage.go index f1aa7ce7..1be1d690 100644 --- a/internal/core/storage.go +++ b/internal/core/storage.go @@ -6,6 +6,8 @@ import ( // ObjectStorageService storage service interface that implement base AliOSS、MINIO or other type ObjectStorageService interface { + VersionInfo + PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) SignURL(objectKey string, expiredInSec int64) (string, error) ObjectURL(objetKey string) string diff --git a/internal/dao/cache.go b/internal/dao/cache.go new file mode 100644 index 00000000..143d0d27 --- /dev/null +++ b/internal/dao/cache.go @@ -0,0 +1,40 @@ +package dao + +import ( + "sync/atomic" + "time" + + "github.com/allegro/bigcache/v3" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" +) + +var ( + _ core.CacheIndexService = (*simpleCacheIndexServant)(nil) + _ core.CacheIndexService = (*bigCacheIndexServant)(nil) +) + +type postsEntry struct { + key string + posts []*model.PostFormated +} + +type indexPostsFunc func(int64, int, int) ([]*model.PostFormated, error) + +type bigCacheIndexServant struct { + getIndexPosts indexPostsFunc + indexActionCh chan core.IndexActionT + cachePostsCh chan *postsEntry + cache *bigcache.BigCache + lastCacheResetTime time.Time +} + +type simpleCacheIndexServant struct { + getIndexPosts indexPostsFunc + indexActionCh chan core.IndexActionT + indexPosts []*model.PostFormated + atomicIndex atomic.Value + maxIndexSize int + checkTick *time.Ticker + expireIndexTick *time.Ticker +} diff --git a/internal/dao/dao.go b/internal/dao/dao.go index 1c1f9886..15488018 100644 --- a/internal/dao/dao.go +++ b/internal/dao/dao.go @@ -1,15 +1,8 @@ package dao import ( - "sync/atomic" - "time" - - "github.com/aliyun/aliyun-oss-go-sdk/oss" - "github.com/allegro/bigcache/v3" - "github.com/minio/minio-go/v7" "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" - "github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/pkg/zinc" "github.com/sirupsen/logrus" "gorm.io/gorm" @@ -17,13 +10,8 @@ import ( var ( _ core.DataService = (*dataServant)(nil) - _ core.ObjectStorageService = (*aliossServant)(nil) - _ core.ObjectStorageService = (*minioServant)(nil) - _ core.ObjectStorageService = (*s3Servant)(nil) - _ core.ObjectStorageService = (*localossServant)(nil) _ core.AttachmentCheckService = (*attachmentCheckServant)(nil) - _ core.CacheIndexService = (*simpleCacheIndexServant)(nil) - _ core.CacheIndexService = (*bigCacheIndexServant)(nil) + _ core.TweetSearchService = (*zincTweetSearchServant)(nil) ) type dataServant struct { @@ -34,55 +22,15 @@ type dataServant struct { zinc *zinc.ZincClient } -type indexPostsFunc func(int64, int, int) ([]*model.PostFormated, error) -type simpleCacheIndexServant struct { - getIndexPosts indexPostsFunc - indexActionCh chan core.IndexActionT - indexPosts []*model.PostFormated - atomicIndex atomic.Value - maxIndexSize int - checkTick *time.Ticker - expireIndexTick *time.Ticker -} - -type postsEntry struct { - key string - posts []*model.PostFormated -} -type bigCacheIndexServant struct { - getIndexPosts indexPostsFunc - indexActionCh chan core.IndexActionT - cachePostsCh chan *postsEntry - cache *bigcache.BigCache - lastCacheResetTime time.Time -} - -type localossServant struct { - savePath string - domain string -} - -type aliossServant struct { - bucket *oss.Bucket - domain string -} - -type minioServant struct { - client *minio.Client - bucket string - domain string -} - -type s3Servant = minioServant - type attachmentCheckServant struct { domain string } -func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService { +func NewDataService() core.DataService { + client := zinc.NewClient(conf.ZincSetting) ds := &dataServant{ - engine: engine, - zinc: zinc, + engine: conf.DBEngine, + zinc: client, } // initialize CacheIndex if needed @@ -96,33 +44,12 @@ func NewDataService(engine *gorm.DB, zinc *zinc.ZincClient) core.DataService { } if ds.useCacheIndex { - logrus.Infof("use cache index service by %s for version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version()) + logrus.Infof("use %s as cache index service by version: %s", ds.cacheIndex.Name(), ds.cacheIndex.Version()) } return ds } -func NewObjectStorageService() (oss core.ObjectStorageService) { - if conf.CfgIf("AliOSS") { - oss = newAliossServent() - logrus.Infoln("use AliOSS as object storage") - } else if conf.CfgIf("MinIO") { - oss = newMinioServeant() - logrus.Infoln("use MinIO as object storage") - } else if conf.CfgIf("S3") { - oss = newS3Servent() - logrus.Infoln("use S3 as object storage") - } else if conf.CfgIf("LocalOSS") { - oss = newLocalossServent() - logrus.Infoln("use LocalOSS as object storage") - } else { - // default use AliOSS - oss = newAliossServent() - logrus.Infoln("use default AliOSS as object storage") - } - return -} - func NewAttachmentCheckerService() core.AttachmentCheckService { return &attachmentCheckServant{ domain: getOssDomain(), diff --git a/internal/dao/oss.go b/internal/dao/oss.go new file mode 100644 index 00000000..e02a7f79 --- /dev/null +++ b/internal/dao/oss.go @@ -0,0 +1,54 @@ +package dao + +import ( + "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/minio/minio-go/v7" + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/sirupsen/logrus" +) + +var ( + _ core.ObjectStorageService = (*aliossServant)(nil) + _ core.ObjectStorageService = (*minioServant)(nil) + _ core.ObjectStorageService = (*s3Servant)(nil) + _ core.ObjectStorageService = (*localossServant)(nil) +) + +type localossServant struct { + savePath string + domain string +} + +type aliossServant struct { + bucket *oss.Bucket + domain string +} + +type minioServant struct { + client *minio.Client + bucket string + domain string +} + +type s3Servant = minioServant + +func NewObjectStorageService() (oss core.ObjectStorageService) { + if conf.CfgIf("AliOSS") { + oss = newAliossServent() + } else if conf.CfgIf("MinIO") { + oss = newMinioServeant() + } else if conf.CfgIf("S3") { + oss = newS3Servent() + logrus.Infof("use S3 as object storage by version %s", oss.Version()) + return + } else if conf.CfgIf("LocalOSS") { + oss = newLocalossServent() + } else { + // default use AliOSS as object storage service + oss = newAliossServent() + logrus.Infof("use default AliOSS as object storage by version %s", oss.Version()) + } + logrus.Infof("use %s as object storage by version %s", oss.Name(), oss.Version()) + return +} diff --git a/internal/dao/oss_alioss.go b/internal/dao/oss_alioss.go index 21ae949d..641d4a54 100644 --- a/internal/dao/oss_alioss.go +++ b/internal/dao/oss_alioss.go @@ -5,6 +5,7 @@ import ( "net/url" "strings" + "github.com/Masterminds/semver/v3" "github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/rocboss/paopao-ce/internal/conf" "github.com/sirupsen/logrus" @@ -27,6 +28,14 @@ func newAliossServent() *aliossServant { } } +func (s *aliossServant) Name() string { + return "AliOSS" +} + +func (s *aliossServant) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} + func (s *aliossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) { err := s.bucket.PutObject(objectKey, reader, oss.ContentLength(objectSize), oss.ContentType(contentType)) if err != nil { diff --git a/internal/dao/oss_local.go b/internal/dao/oss_local.go index 00ed1b45..46983260 100644 --- a/internal/dao/oss_local.go +++ b/internal/dao/oss_local.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/Masterminds/semver/v3" "github.com/rocboss/paopao-ce/internal/conf" "github.com/sirupsen/logrus" ) @@ -25,6 +26,14 @@ func newLocalossServent() *localossServant { } } +func (s *localossServant) Name() string { + return "LocalOSS" +} + +func (s *localossServant) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} + func (s *localossServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) { saveDir := s.savePath + filepath.Dir(objectKey) err := os.MkdirAll(saveDir, 0750) diff --git a/internal/dao/oss_minio.go b/internal/dao/oss_minio.go index 053a6c10..470f03b1 100644 --- a/internal/dao/oss_minio.go +++ b/internal/dao/oss_minio.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/Masterminds/semver/v3" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/rocboss/paopao-ce/internal/conf" @@ -29,6 +30,14 @@ func newMinioServeant() *minioServant { } } +func (s *minioServant) Name() string { + return "MinIO" +} + +func (s *minioServant) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} + func (s *minioServant) PutObject(objectKey string, reader io.Reader, objectSize int64, contentType string) (string, error) { uploadInfo, err := s.client.PutObject(context.Background(), s.bucket, objectKey, reader, objectSize, minio.PutObjectOptions{ContentType: contentType}) if err != nil { @@ -37,6 +46,7 @@ func (s *minioServant) PutObject(objectKey string, reader io.Reader, objectSize logrus.Infoln("Successfully uploaded bytes: ", uploadInfo) return s.domain + objectKey, nil } + func (s *minioServant) SignURL(objectKey string, expiredInSec int64) (string, error) { // TODO: Set request parameters for content-disposition. reqParams := make(url.Values) diff --git a/internal/dao/post.go b/internal/dao/post.go index 328de819..028d0fbc 100644 --- a/internal/dao/post.go +++ b/internal/dao/post.go @@ -207,3 +207,80 @@ func (d *dataServant) GetPostAttatchmentBill(postID, userID int64) (*model.PostA return bill.Get(d.engine) } + +// MergePosts post数据整合 +func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) { + postIds := make([]int64, 0, len(posts)) + userIds := make([]int64, 0, len(posts)) + for _, post := range posts { + postIds = append(postIds, post.ID) + userIds = append(userIds, post.UserID) + } + + postContents, err := d.GetPostContentsByIDs(postIds) + if err != nil { + return nil, err + } + + users, err := d.GetUsersByIDs(userIds) + if err != nil { + return nil, err + } + + userMap := make(map[int64]*model.UserFormated, len(users)) + for _, user := range users { + userMap[user.ID] = user.Format() + } + + contentMap := make(map[int64][]*model.PostContentFormated, len(postContents)) + for _, content := range postContents { + contentMap[content.PostID] = append(contentMap[content.PostID], content.Format()) + } + + // 数据整合 + postsFormated := make([]*model.PostFormated, 0, len(posts)) + for _, post := range posts { + postFormated := post.Format() + postFormated.User = userMap[post.UserID] + postFormated.Contents = contentMap[post.ID] + postsFormated = append(postsFormated, postFormated) + } + return postsFormated, nil +} + +// RevampPosts post数据整形修复 +func (d *dataServant) RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error) { + postIds := make([]int64, 0, len(posts)) + userIds := make([]int64, 0, len(posts)) + for _, post := range posts { + postIds = append(postIds, post.ID) + userIds = append(userIds, post.UserID) + } + + postContents, err := d.GetPostContentsByIDs(postIds) + if err != nil { + return nil, err + } + + users, err := d.GetUsersByIDs(userIds) + if err != nil { + return nil, err + } + + userMap := make(map[int64]*model.UserFormated, len(users)) + for _, user := range users { + userMap[user.ID] = user.Format() + } + + contentMap := make(map[int64][]*model.PostContentFormated, len(postContents)) + for _, content := range postContents { + contentMap[content.PostID] = append(contentMap[content.PostID], content.Format()) + } + + // 数据整合 + for _, post := range posts { + post.User = userMap[post.UserID] + post.Contents = contentMap[post.ID] + } + return posts, nil +} diff --git a/internal/dao/post_index.go b/internal/dao/post_index.go index 4802f1c8..1f35e164 100644 --- a/internal/dao/post_index.go +++ b/internal/dao/post_index.go @@ -17,45 +17,6 @@ func (d *dataServant) IndexPosts(userId int64, offset int, limit int) ([]*model. return d.getIndexPosts(userId, offset, limit) } -func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) { - postIds := make([]int64, 0, len(posts)) - userIds := make([]int64, 0, len(posts)) - for _, post := range posts { - postIds = append(postIds, post.ID) - userIds = append(userIds, post.UserID) - } - - postContents, err := d.GetPostContentsByIDs(postIds) - if err != nil { - return nil, err - } - - users, err := d.GetUsersByIDs(userIds) - if err != nil { - return nil, err - } - - userMap := make(map[int64]*model.UserFormated, len(users)) - for _, user := range users { - userMap[user.ID] = user.Format() - } - - contentMap := make(map[int64][]*model.PostContentFormated, len(postContents)) - for _, content := range postContents { - contentMap[content.PostID] = append(contentMap[content.PostID], content.Format()) - } - - // 数据整合 - postsFormated := make([]*model.PostFormated, 0, len(posts)) - for _, post := range posts { - postFormated := post.Format() - postFormated.User = userMap[post.UserID] - postFormated.Contents = contentMap[post.ID] - postsFormated = append(postsFormated, postFormated) - } - return postsFormated, nil -} - // getIndexPosts _userId保留未来使用 // TODO: 未来可能根据userId查询广场推文列表,简单做到不同用户的主页都是不同的; func (d *dataServant) getIndexPosts(_userId int64, offset int, limit int) ([]*model.PostFormated, error) { diff --git a/internal/dao/search.go b/internal/dao/search.go index d30417db..992022d7 100644 --- a/internal/dao/search.go +++ b/internal/dao/search.go @@ -1,165 +1,28 @@ package dao import ( + "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/pkg/zinc" + "github.com/sirupsen/logrus" ) -func (d *dataServant) CreateSearchIndex(indexName string) { - // 不存在则创建索引 - d.zinc.CreateIndex(indexName, &zinc.ZincIndexProperty{ - "id": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Store: true, - Sortable: true, - }, - "user_id": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Store: true, - }, - "comment_count": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - "collection_count": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - "upvote_count": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - "is_top": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - "is_essence": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - "content": &zinc.ZincIndexPropertyT{ - Type: "text", - Index: true, - Store: true, - Aggregatable: true, - Highlightable: true, - Analyzer: "gse_search", - SearchAnalyzer: "gse_standard", - }, - "tags": &zinc.ZincIndexPropertyT{ - Type: "keyword", - Index: true, - Store: true, - }, - "ip_loc": &zinc.ZincIndexPropertyT{ - Type: "keyword", - Index: true, - Store: true, - }, - "latest_replied_on": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - "attachment_price": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Sortable: true, - Store: true, - }, - "created_on": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - "modified_on": &zinc.ZincIndexPropertyT{ - Type: "numeric", - Index: true, - Sortable: true, - Store: true, - }, - }) - -} - -func (d *dataServant) BulkPushDoc(data []map[string]interface{}) (bool, error) { - return d.zinc.BulkPushDoc(data) -} - -func (d *dataServant) DelDoc(indexName, id string) error { - return d.zinc.DelDoc(indexName, id) -} - -func (d *dataServant) QueryAll(q *core.QueryT, indexName string, offset, limit int) (*zinc.QueryResultT, error) { - // 普通搜索 - if q.Type == core.SearchTypeDefault && q.Query != "" { - return d.QuerySearch(indexName, q.Query, offset, limit) - } - // Tag分类 - if q.Type == core.SearchTypeTag && q.Query != "" { - return d.QueryTagSearch(indexName, q.Query, offset, limit) - } - - queryMap := map[string]interface{}{ - "query": map[string]interface{}{ - "match_all": map[string]string{}, - }, - "sort": []string{"-is_top", "-latest_replied_on"}, - "from": offset, - "size": limit, - } - rsp, err := d.zinc.EsQuery(indexName, queryMap) - if err != nil { - return nil, err - } - - return rsp, err -} - -func (d *dataServant) QuerySearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) { - rsp, err := d.zinc.EsQuery(indexName, map[string]interface{}{ - "query": map[string]interface{}{ - "match_phrase": map[string]interface{}{ - "content": query, - }, - }, - "sort": []string{"-is_top", "-latest_replied_on"}, - "from": offset, - "size": limit, - }) - if err != nil { - return nil, err - } +var ( + _ core.TweetSearchService = (*zincTweetSearchServant)(nil) +) - return rsp, err +type zincTweetSearchServant struct { + indexName string + client *zinc.ZincClient } -func (d *dataServant) QueryTagSearch(indexName, query string, offset, limit int) (*zinc.QueryResultT, error) { - rsp, err := d.zinc.ApiQuery(indexName, map[string]interface{}{ - "search_type": "querystring", - "query": map[string]interface{}{ - "term": "tags." + query + ":1", - }, - "sort_fields": []string{"-is_top", "-latest_replied_on"}, - "from": offset, - "max_results": limit, - }) - if err != nil { - return nil, err +func NewTweetSearchService() (ts core.TweetSearchService) { + if conf.CfgIf("Zinc") { + ts = newZincTweetSearchServant() + } else { + // default use Zinc as tweet search service + ts = newZincTweetSearchServant() } - - return rsp, err + logrus.Infof("use %s as tweet search serice by version %s", ts.Name(), ts.Version()) + return } diff --git a/internal/dao/search_zinc.go b/internal/dao/search_zinc.go new file mode 100644 index 00000000..3d52c2b6 --- /dev/null +++ b/internal/dao/search_zinc.go @@ -0,0 +1,219 @@ +package dao + +import ( + "github.com/Masterminds/semver/v3" + "github.com/rocboss/paopao-ce/internal/conf" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/rocboss/paopao-ce/internal/model" + "github.com/rocboss/paopao-ce/pkg/json" + "github.com/rocboss/paopao-ce/pkg/zinc" +) + +func newZincTweetSearchServant() *zincTweetSearchServant { + s := conf.ZincSetting + zts := &zincTweetSearchServant{ + indexName: s.Index, + client: zinc.NewClient(s), + } + zts.createIndex() + + return zts +} + +func (s *zincTweetSearchServant) Name() string { + return "Zinc" +} + +func (s *zincTweetSearchServant) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} + +func (s *zincTweetSearchServant) IndexName() string { + return s.indexName +} + +func (s *zincTweetSearchServant) AddDocuments(data []map[string]interface{}, primaryKey ...string) (bool, error) { + return s.client.BulkPushDoc(data) +} + +func (s *zincTweetSearchServant) DeleteDocuments(identifiers []string) error { + for _, id := range identifiers { + if err := s.client.DelDoc(s.indexName, id); err != nil { + return err + } + } + return nil +} + +func (s *zincTweetSearchServant) Search(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) { + if q.Type == core.SearchTypeDefault && q.Query != "" { + return s.queryByContent(q, offset, limit) + } else if q.Type == core.SearchTypeTag && q.Query != "" { + return s.queryByTag(q, offset, limit) + } + return s.queryAny(offset, limit) +} + +func (s *zincTweetSearchServant) queryByContent(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) { + resp, err := s.client.EsQuery(s.indexName, map[string]interface{}{ + "query": map[string]interface{}{ + "match_phrase": map[string]interface{}{ + "content": q.Query, + }, + }, + "sort": []string{"-is_top", "-latest_replied_on"}, + "from": offset, + "size": limit, + }) + if err != nil { + return nil, err + } + return s.postsFrom(resp) +} + +func (s *zincTweetSearchServant) queryByTag(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) { + resp, err := s.client.ApiQuery(s.indexName, map[string]interface{}{ + "search_type": "querystring", + "query": map[string]interface{}{ + "term": "tags." + q.Query + ":1", + }, + "sort_fields": []string{"-is_top", "-latest_replied_on"}, + "from": offset, + "max_results": limit, + }) + if err != nil { + return nil, err + } + return s.postsFrom(resp) +} + +func (s *zincTweetSearchServant) queryAny(offset, limit int) (*core.QueryResp, error) { + queryMap := map[string]interface{}{ + "query": map[string]interface{}{ + "match_all": map[string]string{}, + }, + "sort": []string{"-is_top", "-latest_replied_on"}, + "from": offset, + "size": limit, + } + resp, err := s.client.EsQuery(s.indexName, queryMap) + if err != nil { + return nil, err + } + return s.postsFrom(resp) +} + +func (s *zincTweetSearchServant) postsFrom(resp *zinc.QueryResultT) (*core.QueryResp, error) { + posts := make([]*model.PostFormated, 0, len(resp.Hits.Hits)) + for _, hit := range resp.Hits.Hits { + item := &model.PostFormated{} + raw, err := json.Marshal(hit.Source) + if err != nil { + return nil, err + } + if err = json.Unmarshal(raw, item); err != nil { + return nil, err + } + posts = append(posts, item) + } + + return &core.QueryResp{ + Items: posts, + Total: resp.Hits.Total.Value, + }, nil +} + +func (s *zincTweetSearchServant) createIndex() { + // 不存在则创建索引 + s.client.CreateIndex(s.indexName, &zinc.ZincIndexProperty{ + "id": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Store: true, + Sortable: true, + }, + "user_id": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Store: true, + }, + "comment_count": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "collection_count": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "upvote_count": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "visibility": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "is_top": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "is_essence": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "content": &zinc.ZincIndexPropertyT{ + Type: "text", + Index: true, + Store: true, + Aggregatable: true, + Highlightable: true, + Analyzer: "gse_search", + SearchAnalyzer: "gse_standard", + }, + "tags": &zinc.ZincIndexPropertyT{ + Type: "keyword", + Index: true, + Store: true, + }, + "ip_loc": &zinc.ZincIndexPropertyT{ + Type: "keyword", + Index: true, + Store: true, + }, + "latest_replied_on": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "attachment_price": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Sortable: true, + Store: true, + }, + "created_on": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + "modified_on": &zinc.ZincIndexPropertyT{ + Type: "numeric", + Index: true, + Sortable: true, + Store: true, + }, + }) +} diff --git a/internal/routers/api/post.go b/internal/routers/api/post.go index 0f6f62a8..a779532b 100644 --- a/internal/routers/api/post.go +++ b/internal/routers/api/post.go @@ -14,7 +14,7 @@ import ( func GetPostList(c *gin.Context) { response := app.NewResponse(c) - q := &core.QueryT{ + q := &core.QueryReq{ Query: c.Query("query"), Type: "search", } @@ -23,8 +23,8 @@ func GetPostList(c *gin.Context) { } userId, _ := userIdFrom(c) + offset, limit := app.GetPageOffset(c) if q.Query == "" && q.Type == "search" { - offset, limit := app.GetPageOffset(c) posts, err := service.GetIndexPosts(userId, offset, limit) if err != nil { logrus.Errorf("service.GetPostList err: %v\n", err) @@ -38,7 +38,7 @@ func GetPostList(c *gin.Context) { response.ToResponseList(posts, totalRows) } else { - posts, totalRows, err := service.GetPostListFromSearch(q, (app.GetPage(c)-1)*app.GetPageSize(c), app.GetPageSize(c)) + posts, totalRows, err := service.GetPostListFromSearch(q, offset, limit) if err != nil { logrus.Errorf("service.GetPostListFromSearch err: %v\n", err) diff --git a/internal/service/post.go b/internal/service/post.go index e5960074..5a4a4d92 100644 --- a/internal/service/post.go +++ b/internal/service/post.go @@ -478,32 +478,24 @@ func GetPostCount(conditions *model.ConditionsT) (int64, error) { return ds.GetPostCount(conditions) } -func GetPostListFromSearch(q *core.QueryT, offset, limit int) ([]*model.PostFormated, int64, error) { - queryResult, err := ds.QueryAll(q, conf.ZincSetting.Index, offset, limit) +func GetPostListFromSearch(q *core.QueryReq, offset, limit int) ([]*model.PostFormated, int64, error) { + resp, err := ts.Search(q, offset, limit) if err != nil { return nil, 0, err } - - posts, err := FormatZincPost(queryResult) + posts, err := ds.RevampPosts(resp.Items) if err != nil { return nil, 0, err } - - return posts, queryResult.Hits.Total.Value, nil + return posts, resp.Total, nil } func GetPostListFromSearchByQuery(query string, offset, limit int) ([]*model.PostFormated, int64, error) { - queryResult, err := ds.QuerySearch(conf.ZincSetting.Index, query, offset, limit) - if err != nil { - return nil, 0, err - } - - posts, err := FormatZincPost(queryResult) - if err != nil { - return nil, 0, err + q := &core.QueryReq{ + Query: query, + Type: "search", } - - return posts, queryResult.Hits.Total.Value, nil + return GetPostListFromSearch(q, offset, limit) } func PushPostToSearch(post *model.Post) { @@ -512,8 +504,6 @@ func PushPostToSearch(post *model.Post) { return } - indexName := conf.ZincSetting.Index - postFormated := post.Format() postFormated.User = &model.UserFormated{ ID: post.UserID, @@ -539,7 +529,7 @@ func PushPostToSearch(post *model.Post) { data := []map[string]interface{}{} data = append(data, map[string]interface{}{ "index": map[string]interface{}{ - "_index": indexName, + "_index": ts.IndexName(), "_id": fmt.Sprintf("%d", post.ID), }, }, map[string]interface{}{ @@ -560,13 +550,11 @@ func PushPostToSearch(post *model.Post) { "modified_on": post.ModifiedOn, }) - ds.BulkPushDoc(data) + ts.AddDocuments(data) } func DeleteSearchPost(post *model.Post) error { - indexName := conf.ZincSetting.Index - - return ds.DelDoc(indexName, fmt.Sprintf("%d", post.ID)) + return ts.DeleteDocuments([]string{fmt.Sprintf("%d", post.ID)}) } func PushPostsToSearch(c *gin.Context) { @@ -579,10 +567,6 @@ func PushPostsToSearch(c *gin.Context) { pages := math.Ceil(float64(totalRows) / float64(splitNum)) nums := int(pages) - indexName := conf.ZincSetting.Index - // 创建索引 - ds.CreateSearchIndex(indexName) - for i := 0; i < nums; i++ { data := []map[string]interface{}{} @@ -605,7 +589,7 @@ func PushPostsToSearch(c *gin.Context) { data = append(data, map[string]interface{}{ "index": map[string]interface{}{ - "_index": indexName, + "_index": ts.IndexName(), "_id": fmt.Sprintf("%d", post.ID), }, }, map[string]interface{}{ @@ -628,7 +612,7 @@ func PushPostsToSearch(c *gin.Context) { } if len(data) > 0 { - ds.BulkPushDoc(data) + ts.AddDocuments(data) } } diff --git a/internal/service/service.go b/internal/service/service.go index 26682173..3ed9d261 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -4,19 +4,18 @@ import ( "github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/dao" - "github.com/rocboss/paopao-ce/pkg/zinc" ) var ( ds core.DataService + ts core.TweetSearchService attachmentChecker core.AttachmentCheckService DisablePhoneVerify bool ) func Initialize() { - zincClient := zinc.NewClient(conf.ZincSetting) - ds = dao.NewDataService(conf.DBEngine, zincClient) - + ds = dao.NewDataService() + ts = dao.NewTweetSearchService() attachmentChecker = dao.NewAttachmentCheckerService() DisablePhoneVerify = !conf.CfgIf("Sms") }