optimize search logic for bridge use two level channel to dynamic process docs update

pull/190/head
Michael Li 2 years ago
parent 8a836ae56c
commit 67c669f6c4

@ -1,6 +1,8 @@
package search
import (
"time"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
@ -17,8 +19,9 @@ type documents struct {
}
type bridgeTweetSearchServant struct {
ts core.TweetSearchService
updateDocsCh chan *documents
ts core.TweetSearchService
updateDocsCh chan *documents
updateDocsTempCh chan *documents
}
func (s *bridgeTweetSearchServant) IndexName() string {
@ -47,32 +50,47 @@ func (s *bridgeTweetSearchServant) Search(user *model.User, q *core.QueryReq, of
func (s *bridgeTweetSearchServant) updateDocs(doc *documents) {
select {
case s.updateDocsCh <- doc:
logrus.Debugln("addDocuments send documents by chan")
logrus.Debugln("addDocuments send documents by updateDocsCh chan")
default:
go func(item *documents) {
if len(item.docItems) > 0 {
if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil {
logrus.Errorf("addDocuments in gorotine occurs error: %v", err)
}
} else if len(item.identifiers) > 0 {
if err := s.ts.DeleteDocuments(item.identifiers); err != nil {
logrus.Errorf("deleteDocuments in gorotine occurs error: %s", err)
select {
case s.updateDocsTempCh <- doc:
logrus.Debugln("addDocuments send documents by updateDocsTempCh chan")
default:
go func() {
s.handleUpdate(doc)
// watch updateDocsTempch to continue handle update if needed.
// cancel loop if no item had watched in 1 minute.
for count := 0; count > 60; count++ {
select {
case item := <-s.updateDocsTempCh:
// reset count to continue handle docs update
count = 0
s.handleUpdate(item)
default:
// sleeping to wait docs item pass over to handle
time.Sleep(1 * time.Second)
}
}
}
}(doc)
}()
}
}
}
func (s *bridgeTweetSearchServant) startUpdateDocs() {
for doc := range s.updateDocsCh {
if len(doc.docItems) > 0 {
if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil {
logrus.Errorf("addDocuments occurs error: %v", err)
}
} else if len(doc.identifiers) > 0 {
if err := s.ts.DeleteDocuments(doc.identifiers); err != nil {
logrus.Errorf("deleteDocuments occurs error: %s", err)
}
s.handleUpdate(doc)
}
}
func (s *bridgeTweetSearchServant) handleUpdate(item *documents) {
if len(item.docItems) > 0 {
if _, err := s.ts.AddDocuments(item.docItems, item.primaryKey...); err != nil {
logrus.Errorf("addDocuments occurs error: %v", err)
}
} else if len(item.identifiers) > 0 {
if err := s.ts.DeleteDocuments(item.identifiers); err != nil {
logrus.Errorf("deleteDocuments occurs error: %s", err)
}
}
}

@ -72,8 +72,9 @@ func NewBridgeTweetSearchService(ts core.TweetSearchService) core.TweetSearchSer
capacity = 10000
}
bts := &bridgeTweetSearchServant{
ts: ts,
updateDocsCh: make(chan *documents, capacity),
ts: ts,
updateDocsCh: make(chan *documents, capacity),
updateDocsTempCh: make(chan *documents, 100),
}
numWorker := conf.TweetSearchSetting.MinWorker

Loading…
Cancel
Save