diff --git a/internal/dao/search/bridge.go b/internal/dao/search/bridge.go index 84623f7c..20b9a73d 100644 --- a/internal/dao/search/bridge.go +++ b/internal/dao/search/bridge.go @@ -1,6 +1,8 @@ package search import ( + "time" + "github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/model" "github.com/sirupsen/logrus" @@ -17,8 +19,9 @@ type documents struct { } type bridgeTweetSearchServant struct { - ts core.TweetSearchService - updateDocsCh chan *documents + ts core.TweetSearchService + updateDocsCh chan *documents + updateDocsTempCh chan *documents } func (s *bridgeTweetSearchServant) IndexName() string { @@ -47,32 +50,47 @@ func (s *bridgeTweetSearchServant) Search(user *model.User, q *core.QueryReq, of func (s *bridgeTweetSearchServant) updateDocs(doc *documents) { select { case s.updateDocsCh <- doc: - logrus.Debugln("addDocuments send documents by chan") + logrus.Debugln("addDocuments send documents by updateDocsCh chan") default: - go func(item *documents) { - if len(item.docItems) > 0 { - if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil { - logrus.Errorf("addDocuments in gorotine occurs error: %v", err) - } - } else if len(item.identifiers) > 0 { - if err := s.ts.DeleteDocuments(item.identifiers); err != nil { - logrus.Errorf("deleteDocuments in gorotine occurs error: %s", err) + select { + case s.updateDocsTempCh <- doc: + logrus.Debugln("addDocuments send documents by updateDocsTempCh chan") + default: + go func() { + s.handleUpdate(doc) + + // watch updateDocsTempch to continue handle update if needed. + // cancel loop if no item had watched in 1 minute. + for count := 0; count > 60; count++ { + select { + case item := <-s.updateDocsTempCh: + // reset count to continue handle docs update + count = 0 + s.handleUpdate(item) + default: + // sleeping to wait docs item pass over to handle + time.Sleep(1 * time.Second) + } } - } - }(doc) + }() + } } } func (s *bridgeTweetSearchServant) startUpdateDocs() { for doc := range s.updateDocsCh { - if len(doc.docItems) > 0 { - if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil { - logrus.Errorf("addDocuments occurs error: %v", err) - } - } else if len(doc.identifiers) > 0 { - if err := s.ts.DeleteDocuments(doc.identifiers); err != nil { - logrus.Errorf("deleteDocuments occurs error: %s", err) - } + s.handleUpdate(doc) + } +} + +func (s *bridgeTweetSearchServant) handleUpdate(item *documents) { + if len(item.docItems) > 0 { + if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil { + logrus.Errorf("addDocuments occurs error: %v", err) + } + } else if len(item.identifiers) > 0 { + if err := s.ts.DeleteDocuments(item.identifiers); err != nil { + logrus.Errorf("deleteDocuments occurs error: %s", err) } } } diff --git a/internal/dao/search/search.go b/internal/dao/search/search.go index 517591ea..1dda80ef 100644 --- a/internal/dao/search/search.go +++ b/internal/dao/search/search.go @@ -72,8 +72,9 @@ func NewBridgeTweetSearchService(ts core.TweetSearchService) core.TweetSearchSer capacity = 10000 } bts := &bridgeTweetSearchServant{ - ts: ts, - updateDocsCh: make(chan *documents, capacity), + ts: ts, + updateDocsCh: make(chan *documents, capacity), + updateDocsTempCh: make(chan *documents, 100), } numWorker := conf.TweetSearchSetting.MinWorker