optimize #118 add backend worker update documents to search engine

pull/122/head
alimy 3 years ago
parent 2fc1fd2f9b
commit 1facc41068

@ -53,6 +53,7 @@ JWT: # 鉴权加密
Expire: 86400 Expire: 86400
TweetSearch: # 推文关键字搜索相关配置 TweetSearch: # 推文关键字搜索相关配置
MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS设置范围[10, 10000], 默认100 MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS设置范围[10, 10000], 默认100
MinWorker: 10 # 最小后台更新工作者, 设置范围[5, 1000], 默认10
Zinc: # Zinc搜索配置 Zinc: # Zinc搜索配置
Host: http://zinc:4080 Host: http://zinc:4080
Index: paopao-data Index: paopao-data

@ -49,6 +49,7 @@ type AppSettingS struct {
type CacheIndexSettingS struct { type CacheIndexSettingS struct {
MaxUpdateQPS int MaxUpdateQPS int
MinWorker int
} }
type SimpleCacheIndexSettingS struct { type SimpleCacheIndexSettingS struct {
@ -82,6 +83,7 @@ type FeaturesSettingS struct {
type TweetSearchS struct { type TweetSearchS struct {
MaxUpdateQPS int MaxUpdateQPS int
MinWorker int
} }
type ZincSettingS struct { type ZincSettingS struct {

@ -47,8 +47,17 @@ func NewTweetSearchService() core.TweetSearchService {
} }
logrus.Infof("use %s as tweet search serice by version %s", bts.ts.Name(), bts.ts.Version()) logrus.Infof("use %s as tweet search serice by version %s", bts.ts.Name(), bts.ts.Version())
numWorker := conf.TweetSearchSetting.MinWorker
if numWorker < 5 {
numWorker = 5
} else if numWorker > 1000 {
numWorker = 1000
}
logrus.Debugf("use %d backend worker to update documents to search engine", numWorker)
// 启动文档更新器 // 启动文档更新器
for ; numWorker > 0; numWorker-- {
go bts.startUpdateDocs() go bts.startUpdateDocs()
}
return bts return bts
} }

@ -42,10 +42,17 @@ func (s *bridgeTweetSearchServant) updateDocs(doc *documents) {
case s.updateDocsCh <- doc: case s.updateDocsCh <- doc:
logrus.Debugln("addDocuments send documents by chan") logrus.Debugln("addDocuments send documents by chan")
default: default:
go func(ch chan<- *documents, item *documents) { go func(item *documents) {
s.updateDocsCh <- item if len(item.docItems) > 0 {
logrus.Debugln("addDocuments send documents by goroutine") if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil {
}(s.updateDocsCh, doc) logrus.Errorf("addDocuments in gorotine occurs error: %v", err)
}
} else if len(item.identifiers) > 0 {
if err := s.ts.DeleteDocuments(item.identifiers); err != nil {
logrus.Errorf("deleteDocuments in gorotine occurs error: %s", err)
}
}
}(doc)
} }
} }
@ -55,8 +62,7 @@ func (s *bridgeTweetSearchServant) startUpdateDocs() {
if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil { if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil {
logrus.Errorf("addDocuments occurs error: %v", err) logrus.Errorf("addDocuments occurs error: %v", err)
} }
} } else if len(doc.identifiers) > 0 {
if len(doc.identifiers) > 0 {
if err := s.ts.DeleteDocuments(doc.identifiers); err != nil { if err := s.ts.DeleteDocuments(doc.identifiers); err != nil {
logrus.Errorf("deleteDocuments occurs error: %s", err) logrus.Errorf("deleteDocuments occurs error: %s", err)
} }

Loading…
Cancel
Save