optimize #118 wrap async interface for update documents to search engine

pull/121/head
alimy 3 years ago
parent 8edda3be81
commit 2fc1fd2f9b

@ -26,16 +26,16 @@ SmsJuhe:
Alipay:
AppID:
PrivateKey:
CacheIndex:
MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS, 设置范围[10, 10000], 默认100
SimpleCacheIndex: # 缓存泡泡广场消息流
MaxIndexSize: 200 # 最大缓存条数
CheckTickDuration: 60 # 循环自检查每多少秒一次
ExpireTickDuration: 300 # 每多少秒后强制过期缓存, 设置为0禁止强制使缓存过期
ActionQPS: 100 # 添加/删除/更新Post的QPS, 默认100范围设置[10, 10000]
BigCacheIndex: # 使用BigCache缓存泡泡广场消息流
MaxIndexPage: 1024 # 最大缓存页数必须是2^n, 代表最大同时缓存多少页数据
Verbose: False # 是否打印cache操作的log
ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存
UpdateQPS: 100 # 添加/删除/更新Post的QPS, 默认100
MaxIndexPage: 1024 # 最大缓存页数必须是2^n, 代表最大同时缓存多少页数据
Verbose: False # 是否打印cache操作的log
ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存
LoggerFile: # 使用File写日志
SavePath: data/paopao-ce/logs
FileName: app
@ -51,6 +51,8 @@ JWT: # 鉴权加密
Secret: 18a6413dc4fe394c66345ebe501b2f26
Issuer: paopao-api
Expire: 86400
TweetSearch: # 推文关键字搜索相关配置
MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS设置范围[10, 10000], 默认100
Zinc: # Zinc搜索配置
Host: http://zinc:4080
Index: paopao-data

@ -18,10 +18,12 @@ var (
ServerSetting *ServerSettingS
AppSetting *AppSettingS
CacheIndexSetting *CacheIndexSettingS
SimpleCacheIndexSetting *SimpleCacheIndexSettingS
BigCacheIndexSetting *BigCacheIndexSettingS
SmsJuheSetting *SmsJuheSettings
AlipaySetting *AlipaySettingS
TweetSearchSetting *TweetSearchS
ZincSetting *ZincSettingS
AliOSSSetting *AliOSSSettingS
MinIOSetting *MinIOSettingS
@ -47,6 +49,7 @@ func setupSetting(suite []string, noDefault bool) error {
objects := map[string]interface{}{
"App": &AppSetting,
"Server": &ServerSetting,
"CacheIndex": &CacheIndexSetting,
"SimpleCacheIndex": &SimpleCacheIndexSetting,
"BigCacheIndex": &BigCacheIndexSetting,
"Alipay": &AlipaySetting,
@ -57,6 +60,7 @@ func setupSetting(suite []string, noDefault bool) error {
"MySQL": &mysqlSetting,
"Postgres": &postgresSetting,
"Sqlite3": &sqlite3Setting,
"TweetSearch": &TweetSearchSetting,
"Zinc": &ZincSetting,
"Redis": &redisSetting,
"JWT": &JWTSetting,

@ -47,18 +47,20 @@ type AppSettingS struct {
TronApiKeys []string
}
type CacheIndexSettingS struct {
MaxUpdateQPS int
}
type SimpleCacheIndexSettingS struct {
MaxIndexSize int
CheckTickDuration time.Duration
ExpireTickDuration time.Duration
ActionQPS int
}
type BigCacheIndexSettingS struct {
MaxIndexPage int
ExpireInSecond time.Duration
Verbose bool
UpdateQPS int
}
type AlipaySettingS struct {
@ -78,6 +80,10 @@ type FeaturesSettingS struct {
features map[string]string
}
type TweetSearchS struct {
MaxUpdateQPS int
}
type ZincSettingS struct {
Host string
Index string

@ -22,12 +22,14 @@ type QueryResp struct {
Total int64
}
type DocItems []map[string]interface{}
// TweetSearchService tweet search service interface
type TweetSearchService interface {
VersionInfo
IndexName() string
AddDocuments(documents []map[string]interface{}, primaryKey ...string) (bool, error)
AddDocuments(documents DocItems, primaryKey ...string) (bool, error)
DeleteDocuments(identifiers []string) error
Search(q *QueryReq, offset, limit int) (*QueryResp, error)
}

@ -34,7 +34,7 @@ func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := s.UpdateQPS
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
@ -43,6 +43,7 @@ func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器
go cacheIndex.startIndexPosts()
return cacheIndex
@ -84,10 +85,10 @@ func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormate
entry := &postsEntry{key: key, posts: posts}
select {
case s.cachePostsCh <- entry:
logrus.Debugf("send indexAction by chan of key: %s", key)
logrus.Debugf("cachePosts by chan of key: %s", key)
default:
go func(ch chan<- *postsEntry, entry *postsEntry) {
logrus.Debugf("send indexAction by goroutine of key: %s", key)
logrus.Debugf("cachePosts indexAction by goroutine of key: %s", key)
ch <- entry
}(s.cachePostsCh, entry)
}

@ -34,7 +34,7 @@ func newSimpleCacheIndexServant(getIndexPosts indexPostsFunc) *simpleCacheIndexS
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := s.ActionQPS
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {

@ -50,7 +50,7 @@ func NewDataService() core.DataService {
return ds
}
func NewAttachmentCheckerService() core.AttachmentCheckService {
func NewAttachmentCheckService() core.AttachmentCheckService {
return &attachmentCheckServant{
domain: getOssDomain(),
}

@ -48,6 +48,7 @@ func NewObjectStorageService() (oss core.ObjectStorageService) {
// default use AliOSS as object storage service
oss = newAliossServent()
logrus.Infof("use default AliOSS as object storage by version %s", oss.Version())
return
}
logrus.Infof("use %s as object storage by version %s", oss.Name(), oss.Version())
return

@ -9,20 +9,46 @@ import (
var (
_ core.TweetSearchService = (*zincTweetSearchServant)(nil)
_ core.TweetSearchService = (*bridgeTweetSearchServant)(nil)
)
type documents struct {
primaryKey []string
docItems core.DocItems
identifiers []string
}
type bridgeTweetSearchServant struct {
ts core.TweetSearchService
updateDocsCh chan *documents
}
type zincTweetSearchServant struct {
indexName string
client *zinc.ZincClient
}
func NewTweetSearchService() (ts core.TweetSearchService) {
func NewTweetSearchService() core.TweetSearchService {
bts := &bridgeTweetSearchServant{}
capacity := conf.TweetSearchSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
bts.updateDocsCh = make(chan *documents, capacity)
if conf.CfgIf("Zinc") {
ts = newZincTweetSearchServant()
bts.ts = newZincTweetSearchServant()
} else {
// default use Zinc as tweet search service
ts = newZincTweetSearchServant()
bts.ts = newZincTweetSearchServant()
}
logrus.Infof("use %s as tweet search serice by version %s", ts.Name(), ts.Version())
return
logrus.Infof("use %s as tweet search serice by version %s", bts.ts.Name(), bts.ts.Version())
// 启动文档更新器
go bts.startUpdateDocs()
return bts
}

@ -0,0 +1,65 @@
package dao
import (
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
func (s *bridgeTweetSearchServant) Name() string {
return "BridgeTweetSearch"
}
func (s *bridgeTweetSearchServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}
func (s *bridgeTweetSearchServant) IndexName() string {
return s.ts.IndexName()
}
func (s *bridgeTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) {
s.updateDocs(&documents{
primaryKey: primaryKey,
docItems: data,
})
return true, nil
}
func (s *bridgeTweetSearchServant) DeleteDocuments(identifiers []string) error {
s.updateDocs(&documents{
identifiers: identifiers,
})
return nil
}
func (s *bridgeTweetSearchServant) Search(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
return s.ts.Search(q, offset, limit)
}
func (s *bridgeTweetSearchServant) updateDocs(doc *documents) {
select {
case s.updateDocsCh <- doc:
logrus.Debugln("addDocuments send documents by chan")
default:
go func(ch chan<- *documents, item *documents) {
s.updateDocsCh <- item
logrus.Debugln("addDocuments send documents by goroutine")
}(s.updateDocsCh, doc)
}
}
func (s *bridgeTweetSearchServant) startUpdateDocs() {
for doc := range s.updateDocsCh {
if len(doc.docItems) > 0 {
if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil {
logrus.Errorf("addDocuments occurs error: %v", err)
}
}
if len(doc.identifiers) > 0 {
if err := s.ts.DeleteDocuments(doc.identifiers); err != nil {
logrus.Errorf("deleteDocuments occurs error: %s", err)
}
}
}
}

@ -32,7 +32,7 @@ func (s *zincTweetSearchServant) IndexName() string {
return s.indexName
}
func (s *zincTweetSearchServant) AddDocuments(data []map[string]interface{}, primaryKey ...string) (bool, error) {
func (s *zincTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) {
return s.client.BulkPushDoc(data)
}

@ -12,5 +12,5 @@ var (
func Initialize() {
objectStorage = dao.NewObjectStorageService()
attachmentChecker = dao.NewAttachmentCheckerService()
attachmentChecker = dao.NewAttachmentCheckService()
}

@ -136,7 +136,7 @@ func CreatePostComment(ctx *gin.Context, userID int64, param CommentCreationReq)
ds.UpdatePost(post)
// 更新索引
go PushPostToSearch(post)
PushPostToSearch(post)
// 创建用户消息提醒
postMaster, err := ds.GetUserByID(post.UserID)
@ -251,7 +251,7 @@ func CreatePostCommentReply(ctx *gin.Context, commentID int64, content string, u
ds.UpdatePost(post)
// 更新索引
go PushPostToSearch(post)
PushPostToSearch(post)
// 创建用户消息提醒
commentMaster, err := ds.GetUserByID(comment.UserID)
@ -325,7 +325,7 @@ func DeletePostCommentReply(reply *model.CommentReply) error {
ds.UpdatePost(post)
// 更新索引
go PushPostToSearch(post)
PushPostToSearch(post)
return nil
}

@ -177,8 +177,7 @@ func CreatePost(c *gin.Context, userID int64, param PostCreationReq) (*model.Pos
})
}
// 推送Search
// TODO: 优化推送文章到搜索的处理机制最好使用通道channel传递文章可以省goroutine
go PushPostToSearch(post)
PushPostToSearch(post)
}
return post, nil
@ -204,7 +203,7 @@ func DeletePost(id int64) error {
}
// 删除索引
go DeleteSearchPost(post)
DeleteSearchPost(post)
return nil
}
@ -263,11 +262,11 @@ func VisiblePost(user *model.User, postId int64, visibility model.PostVisibleT)
if oldVisibility == model.PostVisitPrivate {
// 从私密转为非私密需要push
logrus.Debugf("visible post set to re-public to add search index: %d, visibility: %s", post.ID, visibility)
go PushPostToSearch(post)
PushPostToSearch(post)
} else if visibility == model.PostVisitPrivate {
// 从非私密转为私密需要删除索引
logrus.Debugf("visible post set to private to delete search index: %d, visibility: %s", post.ID, visibility)
go DeleteSearchPost(post)
DeleteSearchPost(post)
}
return nil
}
@ -298,7 +297,7 @@ func CreatePostStar(postID, userID int64) (*model.PostStar, error) {
ds.UpdatePost(post)
// 更新索引
go PushPostToSearch(post)
PushPostToSearch(post)
return star, nil
}
@ -324,7 +323,7 @@ func DeletePostStar(star *model.PostStar) error {
ds.UpdatePost(post)
// 更新索引
go PushPostToSearch(post)
PushPostToSearch(post)
return nil
}
@ -355,7 +354,7 @@ func CreatePostCollection(postID, userID int64) (*model.PostCollection, error) {
ds.UpdatePost(post)
// 更新索引
go PushPostToSearch(post)
PushPostToSearch(post)
return collection, nil
}
@ -381,7 +380,7 @@ func DeletePostCollection(collection *model.PostCollection) error {
ds.UpdatePost(post)
// 更新索引
go PushPostToSearch(post)
PushPostToSearch(post)
return nil
}
@ -526,7 +525,7 @@ func PushPostToSearch(post *model.Post) {
tagMaps[tag] = 1
}
data := []map[string]interface{}{}
data := core.DocItems{}
data = append(data, map[string]interface{}{
"index": map[string]interface{}{
"_index": ts.IndexName(),

@ -16,6 +16,6 @@ var (
func Initialize() {
ds = dao.NewDataService()
ts = dao.NewTweetSearchService()
attachmentChecker = dao.NewAttachmentCheckerService()
attachmentChecker = dao.NewAttachmentCheckService()
DisablePhoneVerify = !conf.CfgIf("Sms")
}

Loading…
Cancel
Save