Merge pull request #121 from alimy/pr-search

optimize #118: wrap an async interface for updating documents in the search engine
pull/125/head
Michael Li 2 years ago committed by GitHub
commit 5e8ae40807
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -26,16 +26,16 @@ SmsJuhe:
Alipay: Alipay:
AppID: AppID:
PrivateKey: PrivateKey:
CacheIndex:
MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS, 设置范围[10, 10000], 默认100
SimpleCacheIndex: # 缓存泡泡广场消息流 SimpleCacheIndex: # 缓存泡泡广场消息流
MaxIndexSize: 200 # 最大缓存条数 MaxIndexSize: 200 # 最大缓存条数
CheckTickDuration: 60 # 循环自检查每多少秒一次 CheckTickDuration: 60 # 循环自检查每多少秒一次
ExpireTickDuration: 300 # 每多少秒后强制过期缓存, 设置为0禁止强制使缓存过期 ExpireTickDuration: 300 # 每多少秒后强制过期缓存, 设置为0禁止强制使缓存过期
ActionQPS: 100 # 添加/删除/更新Post的QPS, 默认100范围设置[10, 10000]
BigCacheIndex: # 使用BigCache缓存泡泡广场消息流 BigCacheIndex: # 使用BigCache缓存泡泡广场消息流
MaxIndexPage: 1024 # 最大缓存页数必须是2^n, 代表最大同时缓存多少页数据 MaxIndexPage: 1024 # 最大缓存页数必须是2^n, 代表最大同时缓存多少页数据
Verbose: False # 是否打印cache操作的log Verbose: False # 是否打印cache操作的log
ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存 ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存
UpdateQPS: 100 # 添加/删除/更新Post的QPS, 默认100
LoggerFile: # 使用File写日志 LoggerFile: # 使用File写日志
SavePath: data/paopao-ce/logs SavePath: data/paopao-ce/logs
FileName: app FileName: app
@ -51,6 +51,8 @@ JWT: # 鉴权加密
Secret: 18a6413dc4fe394c66345ebe501b2f26 Secret: 18a6413dc4fe394c66345ebe501b2f26
Issuer: paopao-api Issuer: paopao-api
Expire: 86400 Expire: 86400
TweetSearch: # 推文关键字搜索相关配置
MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS, 设置范围[10, 10000], 默认100
Zinc: # Zinc搜索配置 Zinc: # Zinc搜索配置
Host: http://zinc:4080 Host: http://zinc:4080
Index: paopao-data Index: paopao-data

@ -18,10 +18,12 @@ var (
ServerSetting *ServerSettingS ServerSetting *ServerSettingS
AppSetting *AppSettingS AppSetting *AppSettingS
CacheIndexSetting *CacheIndexSettingS
SimpleCacheIndexSetting *SimpleCacheIndexSettingS SimpleCacheIndexSetting *SimpleCacheIndexSettingS
BigCacheIndexSetting *BigCacheIndexSettingS BigCacheIndexSetting *BigCacheIndexSettingS
SmsJuheSetting *SmsJuheSettings SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS AlipaySetting *AlipaySettingS
TweetSearchSetting *TweetSearchS
ZincSetting *ZincSettingS ZincSetting *ZincSettingS
AliOSSSetting *AliOSSSettingS AliOSSSetting *AliOSSSettingS
MinIOSetting *MinIOSettingS MinIOSetting *MinIOSettingS
@ -47,6 +49,7 @@ func setupSetting(suite []string, noDefault bool) error {
objects := map[string]interface{}{ objects := map[string]interface{}{
"App": &AppSetting, "App": &AppSetting,
"Server": &ServerSetting, "Server": &ServerSetting,
"CacheIndex": &CacheIndexSetting,
"SimpleCacheIndex": &SimpleCacheIndexSetting, "SimpleCacheIndex": &SimpleCacheIndexSetting,
"BigCacheIndex": &BigCacheIndexSetting, "BigCacheIndex": &BigCacheIndexSetting,
"Alipay": &AlipaySetting, "Alipay": &AlipaySetting,
@ -57,6 +60,7 @@ func setupSetting(suite []string, noDefault bool) error {
"MySQL": &mysqlSetting, "MySQL": &mysqlSetting,
"Postgres": &postgresSetting, "Postgres": &postgresSetting,
"Sqlite3": &sqlite3Setting, "Sqlite3": &sqlite3Setting,
"TweetSearch": &TweetSearchSetting,
"Zinc": &ZincSetting, "Zinc": &ZincSetting,
"Redis": &redisSetting, "Redis": &redisSetting,
"JWT": &JWTSetting, "JWT": &JWTSetting,

@ -47,18 +47,20 @@ type AppSettingS struct {
TronApiKeys []string TronApiKeys []string
} }
type CacheIndexSettingS struct {
MaxUpdateQPS int
}
type SimpleCacheIndexSettingS struct { type SimpleCacheIndexSettingS struct {
MaxIndexSize int MaxIndexSize int
CheckTickDuration time.Duration CheckTickDuration time.Duration
ExpireTickDuration time.Duration ExpireTickDuration time.Duration
ActionQPS int
} }
type BigCacheIndexSettingS struct { type BigCacheIndexSettingS struct {
MaxIndexPage int MaxIndexPage int
ExpireInSecond time.Duration ExpireInSecond time.Duration
Verbose bool Verbose bool
UpdateQPS int
} }
type AlipaySettingS struct { type AlipaySettingS struct {
@ -78,6 +80,10 @@ type FeaturesSettingS struct {
features map[string]string features map[string]string
} }
type TweetSearchS struct {
MaxUpdateQPS int
}
type ZincSettingS struct { type ZincSettingS struct {
Host string Host string
Index string Index string

@ -22,12 +22,14 @@ type QueryResp struct {
Total int64 Total int64
} }
type DocItems []map[string]interface{}
// TweetSearchService tweet search service interface // TweetSearchService tweet search service interface
type TweetSearchService interface { type TweetSearchService interface {
VersionInfo VersionInfo
IndexName() string IndexName() string
AddDocuments(documents []map[string]interface{}, primaryKey ...string) (bool, error) AddDocuments(documents DocItems, primaryKey ...string) (bool, error)
DeleteDocuments(identifiers []string) error DeleteDocuments(identifiers []string) error
Search(q *QueryReq, offset, limit int) (*QueryResp, error) Search(q *QueryReq, offset, limit int) (*QueryResp, error)
} }

@ -34,7 +34,7 @@ func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000] // indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity // or re-compile source to adjust min/max capacity
capacity := s.UpdateQPS capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 { if capacity < 10 {
capacity = 10 capacity = 10
} else if capacity > 10000 { } else if capacity > 10000 {
@ -43,6 +43,7 @@ func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity) cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity) cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器
go cacheIndex.startIndexPosts() go cacheIndex.startIndexPosts()
return cacheIndex return cacheIndex
@ -84,10 +85,10 @@ func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormate
entry := &postsEntry{key: key, posts: posts} entry := &postsEntry{key: key, posts: posts}
select { select {
case s.cachePostsCh <- entry: case s.cachePostsCh <- entry:
logrus.Debugf("send indexAction by chan of key: %s", key) logrus.Debugf("cachePosts by chan of key: %s", key)
default: default:
go func(ch chan<- *postsEntry, entry *postsEntry) { go func(ch chan<- *postsEntry, entry *postsEntry) {
logrus.Debugf("send indexAction by goroutine of key: %s", key) logrus.Debugf("cachePosts indexAction by goroutine of key: %s", key)
ch <- entry ch <- entry
}(s.cachePostsCh, entry) }(s.cachePostsCh, entry)
} }

@ -34,7 +34,7 @@ func newSimpleCacheIndexServant(getIndexPosts indexPostsFunc) *simpleCacheIndexS
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000] // indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity // or re-compile source to adjust min/max capacity
capacity := s.ActionQPS capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 { if capacity < 10 {
capacity = 10 capacity = 10
} else if capacity > 10000 { } else if capacity > 10000 {

@ -50,7 +50,7 @@ func NewDataService() core.DataService {
return ds return ds
} }
func NewAttachmentCheckerService() core.AttachmentCheckService { func NewAttachmentCheckService() core.AttachmentCheckService {
return &attachmentCheckServant{ return &attachmentCheckServant{
domain: getOssDomain(), domain: getOssDomain(),
} }

@ -48,6 +48,7 @@ func NewObjectStorageService() (oss core.ObjectStorageService) {
// default use AliOSS as object storage service // default use AliOSS as object storage service
oss = newAliossServent() oss = newAliossServent()
logrus.Infof("use default AliOSS as object storage by version %s", oss.Version()) logrus.Infof("use default AliOSS as object storage by version %s", oss.Version())
return
} }
logrus.Infof("use %s as object storage by version %s", oss.Name(), oss.Version()) logrus.Infof("use %s as object storage by version %s", oss.Name(), oss.Version())
return return

@ -9,20 +9,46 @@ import (
var ( var (
_ core.TweetSearchService = (*zincTweetSearchServant)(nil) _ core.TweetSearchService = (*zincTweetSearchServant)(nil)
_ core.TweetSearchService = (*bridgeTweetSearchServant)(nil)
) )
type documents struct {
primaryKey []string
docItems core.DocItems
identifiers []string
}
type bridgeTweetSearchServant struct {
ts core.TweetSearchService
updateDocsCh chan *documents
}
type zincTweetSearchServant struct { type zincTweetSearchServant struct {
indexName string indexName string
client *zinc.ZincClient client *zinc.ZincClient
} }
func NewTweetSearchService() (ts core.TweetSearchService) { func NewTweetSearchService() core.TweetSearchService {
bts := &bridgeTweetSearchServant{}
capacity := conf.TweetSearchSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
bts.updateDocsCh = make(chan *documents, capacity)
if conf.CfgIf("Zinc") { if conf.CfgIf("Zinc") {
ts = newZincTweetSearchServant() bts.ts = newZincTweetSearchServant()
} else { } else {
// default use Zinc as tweet search service // default use Zinc as tweet search service
ts = newZincTweetSearchServant() bts.ts = newZincTweetSearchServant()
} }
logrus.Infof("use %s as tweet search serice by version %s", ts.Name(), ts.Version()) logrus.Infof("use %s as tweet search serice by version %s", bts.ts.Name(), bts.ts.Version())
return
// 启动文档更新器
go bts.startUpdateDocs()
return bts
} }

@ -0,0 +1,65 @@
package dao
import (
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
// Name reports the fixed identifier of the bridge tweet search servant.
func (s *bridgeTweetSearchServant) Name() string {
	const servantName = "BridgeTweetSearch"
	return servantName
}
// Version reports the bridge servant's own semantic version, v0.1.0.
// It is parsed from a literal on every call.
func (s *bridgeTweetSearchServant) Version() *semver.Version {
	ver := semver.MustParse("v0.1.0")
	return ver
}
// IndexName returns the index name reported by the wrapped search service.
func (s *bridgeTweetSearchServant) IndexName() string {
	backend := s.ts
	return backend.IndexName()
}
// AddDocuments queues data (and the optional primaryKey values) for the
// background updater instead of calling the wrapped search service directly.
//
// The call is fire-and-forget: it always returns (true, nil). A failure of the
// wrapped service's AddDocuments is only logged by startUpdateDocs and is never
// reported back to the caller.
func (s *bridgeTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) {
	pending := new(documents)
	pending.docItems = data
	pending.primaryKey = primaryKey
	s.updateDocs(pending)
	return true, nil
}
// DeleteDocuments queues the given identifiers for removal by the background
// updater instead of calling the wrapped search service directly.
//
// The call is fire-and-forget: it always returns nil. A failure of the wrapped
// service's DeleteDocuments is only logged by startUpdateDocs.
func (s *bridgeTweetSearchServant) DeleteDocuments(identifiers []string) error {
	pending := new(documents)
	pending.identifiers = identifiers
	s.updateDocs(pending)
	return nil
}
// Search forwards the query to the wrapped search service and returns its
// result unchanged. Unlike AddDocuments and DeleteDocuments, it does not go
// through the update channel.
func (s *bridgeTweetSearchServant) Search(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
	resp, err := s.ts.Search(q, offset, limit)
	return resp, err
}
// updateDocs hands doc to the background updater without blocking the caller.
//
// A direct send on updateDocsCh is attempted first. If that send cannot
// proceed immediately, a goroutine is started to perform the blocking send, so
// the caller returns right away. Requests delivered through the fallback
// goroutine may reach the updater after requests that were submitted later.
//
// doc may carry documents to add, identifiers to delete, or both.
func (s *bridgeTweetSearchServant) updateDocs(doc *documents) {
	select {
	case s.updateDocsCh <- doc:
		// This path serves both add and delete requests, so the message names
		// updateDocs rather than addDocuments.
		logrus.Debugln("updateDocs send documents by chan")
	default:
		go func(ch chan<- *documents, item *documents) {
			// Send on the channel passed in, rather than reaching back through
			// the captured receiver and leaving the parameter unused.
			ch <- item
			logrus.Debugln("updateDocs send documents by goroutine")
		}(s.updateDocsCh, doc)
	}
}
// startUpdateDocs is the consumer loop for updateDocsCh; NewTweetSearchService
// launches it in its own goroutine.
//
// For every queued request it first pushes any docItems to the wrapped search
// service and then deletes any identifiers. Errors from the wrapped service are
// logged and otherwise ignored. The loop ends when the channel is closed.
func (s *bridgeTweetSearchServant) startUpdateDocs() {
	queue := s.updateDocsCh
	for {
		pending, open := <-queue
		if !open {
			return
		}
		if len(pending.docItems) != 0 {
			_, err := s.ts.AddDocuments(pending.docItems, pending.primaryKey...)
			if err != nil {
				logrus.Errorf("addDocuments occurs error: %v", err)
			}
		}
		if len(pending.identifiers) != 0 {
			err := s.ts.DeleteDocuments(pending.identifiers)
			if err != nil {
				logrus.Errorf("deleteDocuments occurs error: %s", err)
			}
		}
	}
}

@ -32,7 +32,7 @@ func (s *zincTweetSearchServant) IndexName() string {
return s.indexName return s.indexName
} }
func (s *zincTweetSearchServant) AddDocuments(data []map[string]interface{}, primaryKey ...string) (bool, error) { func (s *zincTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) {
return s.client.BulkPushDoc(data) return s.client.BulkPushDoc(data)
} }

@ -12,5 +12,5 @@ var (
func Initialize() { func Initialize() {
objectStorage = dao.NewObjectStorageService() objectStorage = dao.NewObjectStorageService()
attachmentChecker = dao.NewAttachmentCheckerService() attachmentChecker = dao.NewAttachmentCheckService()
} }

@ -136,7 +136,7 @@ func CreatePostComment(ctx *gin.Context, userID int64, param CommentCreationReq)
ds.UpdatePost(post) ds.UpdatePost(post)
// 更新索引 // 更新索引
go PushPostToSearch(post) PushPostToSearch(post)
// 创建用户消息提醒 // 创建用户消息提醒
postMaster, err := ds.GetUserByID(post.UserID) postMaster, err := ds.GetUserByID(post.UserID)
@ -251,7 +251,7 @@ func CreatePostCommentReply(ctx *gin.Context, commentID int64, content string, u
ds.UpdatePost(post) ds.UpdatePost(post)
// 更新索引 // 更新索引
go PushPostToSearch(post) PushPostToSearch(post)
// 创建用户消息提醒 // 创建用户消息提醒
commentMaster, err := ds.GetUserByID(comment.UserID) commentMaster, err := ds.GetUserByID(comment.UserID)
@ -325,7 +325,7 @@ func DeletePostCommentReply(reply *model.CommentReply) error {
ds.UpdatePost(post) ds.UpdatePost(post)
// 更新索引 // 更新索引
go PushPostToSearch(post) PushPostToSearch(post)
return nil return nil
} }

@ -177,8 +177,7 @@ func CreatePost(c *gin.Context, userID int64, param PostCreationReq) (*model.Pos
}) })
} }
// 推送Search // 推送Search
// TODO: 优化推送文章到搜索的处理机制最好使用通道channel传递文章可以省goroutine PushPostToSearch(post)
go PushPostToSearch(post)
} }
return post, nil return post, nil
@ -204,7 +203,7 @@ func DeletePost(id int64) error {
} }
// 删除索引 // 删除索引
go DeleteSearchPost(post) DeleteSearchPost(post)
return nil return nil
} }
@ -263,11 +262,11 @@ func VisiblePost(user *model.User, postId int64, visibility model.PostVisibleT)
if oldVisibility == model.PostVisitPrivate { if oldVisibility == model.PostVisitPrivate {
// 从私密转为非私密需要push // 从私密转为非私密需要push
logrus.Debugf("visible post set to re-public to add search index: %d, visibility: %s", post.ID, visibility) logrus.Debugf("visible post set to re-public to add search index: %d, visibility: %s", post.ID, visibility)
go PushPostToSearch(post) PushPostToSearch(post)
} else if visibility == model.PostVisitPrivate { } else if visibility == model.PostVisitPrivate {
// 从非私密转为私密需要删除索引 // 从非私密转为私密需要删除索引
logrus.Debugf("visible post set to private to delete search index: %d, visibility: %s", post.ID, visibility) logrus.Debugf("visible post set to private to delete search index: %d, visibility: %s", post.ID, visibility)
go DeleteSearchPost(post) DeleteSearchPost(post)
} }
return nil return nil
} }
@ -298,7 +297,7 @@ func CreatePostStar(postID, userID int64) (*model.PostStar, error) {
ds.UpdatePost(post) ds.UpdatePost(post)
// 更新索引 // 更新索引
go PushPostToSearch(post) PushPostToSearch(post)
return star, nil return star, nil
} }
@ -324,7 +323,7 @@ func DeletePostStar(star *model.PostStar) error {
ds.UpdatePost(post) ds.UpdatePost(post)
// 更新索引 // 更新索引
go PushPostToSearch(post) PushPostToSearch(post)
return nil return nil
} }
@ -355,7 +354,7 @@ func CreatePostCollection(postID, userID int64) (*model.PostCollection, error) {
ds.UpdatePost(post) ds.UpdatePost(post)
// 更新索引 // 更新索引
go PushPostToSearch(post) PushPostToSearch(post)
return collection, nil return collection, nil
} }
@ -381,7 +380,7 @@ func DeletePostCollection(collection *model.PostCollection) error {
ds.UpdatePost(post) ds.UpdatePost(post)
// 更新索引 // 更新索引
go PushPostToSearch(post) PushPostToSearch(post)
return nil return nil
} }
@ -526,7 +525,7 @@ func PushPostToSearch(post *model.Post) {
tagMaps[tag] = 1 tagMaps[tag] = 1
} }
data := []map[string]interface{}{} data := core.DocItems{}
data = append(data, map[string]interface{}{ data = append(data, map[string]interface{}{
"index": map[string]interface{}{ "index": map[string]interface{}{
"_index": ts.IndexName(), "_index": ts.IndexName(),

@ -16,6 +16,6 @@ var (
func Initialize() { func Initialize() {
ds = dao.NewDataService() ds = dao.NewDataService()
ts = dao.NewTweetSearchService() ts = dao.NewTweetSearchService()
attachmentChecker = dao.NewAttachmentCheckerService() attachmentChecker = dao.NewAttachmentCheckService()
DisablePhoneVerify = !conf.CfgIf("Sms") DisablePhoneVerify = !conf.CfgIf("Sms")
} }

Loading…
Cancel
Save