From 1facc41068f281ee14124c439df2e8c63ccc37a1 Mon Sep 17 00:00:00 2001 From: alimy Date: Tue, 21 Jun 2022 15:30:55 +0800 Subject: [PATCH] optimize #118 add backend worker update documents to search engine --- config.yaml.sample | 1 + internal/conf/settting.go | 2 ++ internal/dao/search.go | 11 ++++++++++- internal/dao/search_bridge.go | 18 ++++++++++++------ 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/config.yaml.sample b/config.yaml.sample index b02b2ece..dc4a81ac 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -53,6 +53,7 @@ JWT: # 鉴权加密 Expire: 86400 TweetSearch: # 推文关键字搜索相关配置 MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS,设置范围[10, 10000], 默认100 + MinWorker: 10 # 最小后台更新工作者, 设置范围[5, 1000], 默认10 Zinc: # Zinc搜索配置 Host: http://zinc:4080 Index: paopao-data diff --git a/internal/conf/settting.go b/internal/conf/settting.go index cb03e715..9bceceb9 100644 --- a/internal/conf/settting.go +++ b/internal/conf/settting.go @@ -49,6 +49,7 @@ type AppSettingS struct { type CacheIndexSettingS struct { MaxUpdateQPS int + MinWorker int } type SimpleCacheIndexSettingS struct { @@ -82,6 +83,7 @@ type FeaturesSettingS struct { type TweetSearchS struct { MaxUpdateQPS int + MinWorker int } type ZincSettingS struct { diff --git a/internal/dao/search.go b/internal/dao/search.go index deda5969..0b25f4ca 100644 --- a/internal/dao/search.go +++ b/internal/dao/search.go @@ -47,8 +47,17 @@ func NewTweetSearchService() core.TweetSearchService { } logrus.Infof("use %s as tweet search serice by version %s", bts.ts.Name(), bts.ts.Version()) + numWorker := conf.TweetSearchSetting.MinWorker + if numWorker < 5 { + numWorker = 5 + } else if numWorker > 1000 { + numWorker = 1000 + } + logrus.Debugf("use %d backend worker to update documents to search engine", numWorker) // 启动文档更新器 - go bts.startUpdateDocs() + for ; numWorker > 0; numWorker-- { + go bts.startUpdateDocs() + } return bts } diff --git a/internal/dao/search_bridge.go b/internal/dao/search_bridge.go index 45766474..26b96f92 100644 --- a/internal/dao/search_bridge.go +++ b/internal/dao/search_bridge.go @@ -42,10 +42,17 @@ func (s *bridgeTweetSearchServant) updateDocs(doc *documents) { case s.updateDocsCh <- doc: logrus.Debugln("addDocuments send documents by chan") default: - go func(ch chan<- *documents, item *documents) { - s.updateDocsCh <- item - logrus.Debugln("addDocuments send documents by goroutine") - }(s.updateDocsCh, doc) + go func(item *documents) { + if len(item.docItems) > 0 { + if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil { + logrus.Errorf("addDocuments in gorotine occurs error: %v", err) + } + } else if len(item.identifiers) > 0 { + if err := s.ts.DeleteDocuments(item.identifiers); err != nil { + logrus.Errorf("deleteDocuments in gorotine occurs error: %s", err) + } + } + }(doc) } } @@ -55,8 +62,7 @@ func (s *bridgeTweetSearchServant) startUpdateDocs() { if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil { logrus.Errorf("addDocuments occurs error: %v", err) } - } - if len(doc.identifiers) > 0 { + } else if len(doc.identifiers) > 0 { if err := s.ts.DeleteDocuments(doc.identifiers); err != nil { logrus.Errorf("deleteDocuments occurs error: %s", err) }