Merge pull request #190 from rocboss/jc/alimy

optimize search logic for bridgeTweetSearchServant
pull/191/head
Michael Li 2 years ago committed by GitHub
commit f2d674c62e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,6 +1,8 @@
package search package search
import ( import (
"time"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -19,6 +21,7 @@ type documents struct {
type bridgeTweetSearchServant struct { type bridgeTweetSearchServant struct {
ts core.TweetSearchService ts core.TweetSearchService
updateDocsCh chan *documents updateDocsCh chan *documents
updateDocsTempCh chan *documents
} }
func (s *bridgeTweetSearchServant) IndexName() string { func (s *bridgeTweetSearchServant) IndexName() string {
@ -47,32 +50,47 @@ func (s *bridgeTweetSearchServant) Search(user *model.User, q *core.QueryReq, of
func (s *bridgeTweetSearchServant) updateDocs(doc *documents) { func (s *bridgeTweetSearchServant) updateDocs(doc *documents) {
select { select {
case s.updateDocsCh <- doc: case s.updateDocsCh <- doc:
logrus.Debugln("addDocuments send documents by chan") logrus.Debugln("addDocuments send documents by updateDocsCh chan")
default: default:
go func(item *documents) { select {
if len(item.docItems) > 0 { case s.updateDocsTempCh <- doc:
if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil { logrus.Debugln("addDocuments send documents by updateDocsTempCh chan")
logrus.Errorf("addDocuments in gorotine occurs error: %v", err) default:
go func() {
s.handleUpdate(doc)
// watch updateDocsTempch to continue handle update if needed.
// cancel loop if no item had watched in 1 minute.
for count := 0; count > 60; count++ {
select {
case item := <-s.updateDocsTempCh:
// reset count to continue handle docs update
count = 0
s.handleUpdate(item)
default:
// sleeping to wait docs item pass over to handle
time.Sleep(1 * time.Second)
} }
} else if len(item.identifiers) > 0 {
if err := s.ts.DeleteDocuments(item.identifiers); err != nil {
logrus.Errorf("deleteDocuments in gorotine occurs error: %s", err)
} }
}()
} }
}(doc)
} }
} }
func (s *bridgeTweetSearchServant) startUpdateDocs() { func (s *bridgeTweetSearchServant) startUpdateDocs() {
for doc := range s.updateDocsCh { for doc := range s.updateDocsCh {
if len(doc.docItems) > 0 { s.handleUpdate(doc)
if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil { }
}
func (s *bridgeTweetSearchServant) handleUpdate(item *documents) {
if len(item.docItems) > 0 {
if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil {
logrus.Errorf("addDocuments occurs error: %v", err) logrus.Errorf("addDocuments occurs error: %v", err)
} }
} else if len(doc.identifiers) > 0 { } else if len(item.identifiers) > 0 {
if err := s.ts.DeleteDocuments(doc.identifiers); err != nil { if err := s.ts.DeleteDocuments(item.identifiers); err != nil {
logrus.Errorf("deleteDocuments occurs error: %s", err) logrus.Errorf("deleteDocuments occurs error: %s", err)
} }
} }
} }
}

@ -74,6 +74,7 @@ func NewBridgeTweetSearchService(ts core.TweetSearchService) core.TweetSearchSer
bts := &bridgeTweetSearchServant{ bts := &bridgeTweetSearchServant{
ts: ts, ts: ts,
updateDocsCh: make(chan *documents, capacity), updateDocsCh: make(chan *documents, capacity),
updateDocsTempCh: make(chan *documents, 100),
} }
numWorker := conf.TweetSearchSetting.MinWorker numWorker := conf.TweetSearchSetting.MinWorker

Loading…
Cancel
Save