diff --git a/config.yaml.sample b/config.yaml.sample index 38fa5a8b..b02b2ece 100644 --- a/config.yaml.sample +++ b/config.yaml.sample @@ -26,16 +26,16 @@ SmsJuhe: Alipay: AppID: PrivateKey: +CacheIndex: + MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS, 设置范围[10, 10000], 默认100 SimpleCacheIndex: # 缓存泡泡广场消息流 MaxIndexSize: 200 # 最大缓存条数 CheckTickDuration: 60 # 循环自检查每多少秒一次 ExpireTickDuration: 300 # 每多少秒后强制过期缓存, 设置为0禁止强制使缓存过期 - ActionQPS: 100 # 添加/删除/更新Post的QPS, 默认100,范围设置[10, 10000] BigCacheIndex: # 使用BigCache缓存泡泡广场消息流 - MaxIndexPage: 1024 # 最大缓存页数,必须是2^n, 代表最大同时缓存多少页数据 - Verbose: False # 是否打印cache操作的log - ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存 - UpdateQPS: 100 # 添加/删除/更新Post的QPS, 默认100 + MaxIndexPage: 1024 # 最大缓存页数,必须是2^n, 代表最大同时缓存多少页数据 + Verbose: False # 是否打印cache操作的log + ExpireInSecond: 300 # 多少秒(>0)后强制过期缓存 LoggerFile: # 使用File写日志 SavePath: data/paopao-ce/logs FileName: app @@ -51,6 +51,8 @@ JWT: # 鉴权加密 Secret: 18a6413dc4fe394c66345ebe501b2f26 Issuer: paopao-api Expire: 86400 +TweetSearch: # 推文关键字搜索相关配置 + MaxUpdateQPS: 100 # 最大添加/删除/更新Post的QPS,设置范围[10, 10000], 默认100 Zinc: # Zinc搜索配置 Host: http://zinc:4080 Index: paopao-data diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 2e44f662..6f22f46f 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -18,10 +18,12 @@ var ( ServerSetting *ServerSettingS AppSetting *AppSettingS + CacheIndexSetting *CacheIndexSettingS SimpleCacheIndexSetting *SimpleCacheIndexSettingS BigCacheIndexSetting *BigCacheIndexSettingS SmsJuheSetting *SmsJuheSettings AlipaySetting *AlipaySettingS + TweetSearchSetting *TweetSearchS ZincSetting *ZincSettingS AliOSSSetting *AliOSSSettingS MinIOSetting *MinIOSettingS @@ -47,6 +49,7 @@ func setupSetting(suite []string, noDefault bool) error { objects := map[string]interface{}{ "App": &AppSetting, "Server": &ServerSetting, + "CacheIndex": &CacheIndexSetting, "SimpleCacheIndex": &SimpleCacheIndexSetting, "BigCacheIndex": &BigCacheIndexSetting, "Alipay": &AlipaySetting, @@ -57,6 +60,7 @@ func setupSetting(suite []string, noDefault bool) error { "MySQL": &mysqlSetting, "Postgres": &postgresSetting, "Sqlite3": &sqlite3Setting, + "TweetSearch": &TweetSearchSetting, "Zinc": &ZincSetting, "Redis": &redisSetting, "JWT": &JWTSetting, diff --git a/internal/conf/settting.go b/internal/conf/settting.go index 182b43a5..cb03e715 100644 --- a/internal/conf/settting.go +++ b/internal/conf/settting.go @@ -47,18 +47,20 @@ type AppSettingS struct { TronApiKeys []string } +type CacheIndexSettingS struct { + MaxUpdateQPS int +} + type SimpleCacheIndexSettingS struct { MaxIndexSize int CheckTickDuration time.Duration ExpireTickDuration time.Duration - ActionQPS int } type BigCacheIndexSettingS struct { MaxIndexPage int ExpireInSecond time.Duration Verbose bool - UpdateQPS int } type AlipaySettingS struct { @@ -78,6 +80,10 @@ type FeaturesSettingS struct { features map[string]string } +type TweetSearchS struct { + MaxUpdateQPS int +} + type ZincSettingS struct { Host string Index string diff --git a/internal/core/search.go b/internal/core/search.go index 8bfc7253..36583f68 100644 --- a/internal/core/search.go +++ b/internal/core/search.go @@ -22,12 +22,14 @@ type QueryResp struct { Total int64 } +type DocItems []map[string]interface{} + // TweetSearchService tweet search service interface type TweetSearchService interface { VersionInfo IndexName() string - AddDocuments(documents []map[string]interface{}, primaryKey ...string) (bool, error) + AddDocuments(documents DocItems, primaryKey ...string) (bool, error) DeleteDocuments(identifiers []string) error Search(q *QueryReq, offset, limit int) (*QueryResp, error) } diff --git a/internal/dao/cache_index_big.go b/internal/dao/cache_index_big.go index 7b9615a9..d221bdb5 100644 --- a/internal/dao/cache_index_big.go +++ b/internal/dao/cache_index_big.go @@ -34,7 +34,7 @@ func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant // indexActionCh capacity custom configure by conf.yaml need in [10, 10000] // or re-compile source to adjust min/max capacity - capacity := s.UpdateQPS + capacity := conf.CacheIndexSetting.MaxUpdateQPS if capacity < 10 { capacity = 10 } else if capacity > 10000 { @@ -43,6 +43,7 @@ func newBigCacheIndexServant(getIndexPosts indexPostsFunc) *bigCacheIndexServant cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity) cacheIndex.cachePostsCh = make(chan *postsEntry, capacity) + // 启动索引更新器 go cacheIndex.startIndexPosts() return cacheIndex @@ -84,10 +85,10 @@ func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormate entry := &postsEntry{key: key, posts: posts} select { case s.cachePostsCh <- entry: - logrus.Debugf("send indexAction by chan of key: %s", key) + logrus.Debugf("cachePosts by chan of key: %s", key) default: go func(ch chan<- *postsEntry, entry *postsEntry) { - logrus.Debugf("send indexAction by goroutine of key: %s", key) + logrus.Debugf("cachePosts indexAction by goroutine of key: %s", key) ch <- entry }(s.cachePostsCh, entry) } diff --git a/internal/dao/cache_index_simple.go b/internal/dao/cache_index_simple.go index f6c44b64..a5014b7a 100644 --- a/internal/dao/cache_index_simple.go +++ b/internal/dao/cache_index_simple.go @@ -34,7 +34,7 @@ func newSimpleCacheIndexServant(getIndexPosts indexPostsFunc) *simpleCacheIndexS // indexActionCh capacity custom configure by conf.yaml need in [10, 10000] // or re-compile source to adjust min/max capacity - capacity := s.ActionQPS + capacity := conf.CacheIndexSetting.MaxUpdateQPS if capacity < 10 { capacity = 10 } else if capacity > 10000 { diff --git a/internal/dao/dao.go b/internal/dao/dao.go index 15488018..71826803 100644 --- a/internal/dao/dao.go +++ b/internal/dao/dao.go @@ -50,7 +50,7 @@ func NewDataService() core.DataService { return ds } -func NewAttachmentCheckerService() core.AttachmentCheckService { +func NewAttachmentCheckService() core.AttachmentCheckService { return &attachmentCheckServant{ domain: getOssDomain(), } diff --git a/internal/dao/oss.go b/internal/dao/oss.go index e02a7f79..a85bb8d9 100644 --- a/internal/dao/oss.go +++ b/internal/dao/oss.go @@ -48,6 +48,7 @@ func NewObjectStorageService() (oss core.ObjectStorageService) { // default use AliOSS as object storage service oss = newAliossServent() logrus.Infof("use default AliOSS as object storage by version %s", oss.Version()) + return } logrus.Infof("use %s as object storage by version %s", oss.Name(), oss.Version()) return diff --git a/internal/dao/search.go b/internal/dao/search.go index 992022d7..deda5969 100644 --- a/internal/dao/search.go +++ b/internal/dao/search.go @@ -9,20 +9,46 @@ import ( var ( _ core.TweetSearchService = (*zincTweetSearchServant)(nil) + _ core.TweetSearchService = (*bridgeTweetSearchServant)(nil) ) +type documents struct { + primaryKey []string + docItems core.DocItems + identifiers []string +} + +type bridgeTweetSearchServant struct { + ts core.TweetSearchService + updateDocsCh chan *documents +} + type zincTweetSearchServant struct { indexName string client *zinc.ZincClient } -func NewTweetSearchService() (ts core.TweetSearchService) { +func NewTweetSearchService() core.TweetSearchService { + bts := &bridgeTweetSearchServant{} + + capacity := conf.TweetSearchSetting.MaxUpdateQPS + if capacity < 10 { + capacity = 10 + } else if capacity > 10000 { + capacity = 10000 + } + bts.updateDocsCh = make(chan *documents, capacity) + if conf.CfgIf("Zinc") { - ts = newZincTweetSearchServant() + bts.ts = newZincTweetSearchServant() } else { // default use Zinc as tweet search service - ts = newZincTweetSearchServant() + bts.ts = newZincTweetSearchServant() } - logrus.Infof("use %s as tweet search serice by version %s", ts.Name(), ts.Version()) - return + logrus.Infof("use %s as tweet search serice by version %s", bts.ts.Name(), bts.ts.Version()) + + // 启动文档更新器 + go bts.startUpdateDocs() + + return bts } diff --git a/internal/dao/search_bridge.go b/internal/dao/search_bridge.go new file mode 100644 index 00000000..45766474 --- /dev/null +++ b/internal/dao/search_bridge.go @@ -0,0 +1,65 @@ +package dao + +import ( + "github.com/Masterminds/semver/v3" + "github.com/rocboss/paopao-ce/internal/core" + "github.com/sirupsen/logrus" +) + +func (s *bridgeTweetSearchServant) Name() string { + return "BridgeTweetSearch" +} + +func (s *bridgeTweetSearchServant) Version() *semver.Version { + return semver.MustParse("v0.1.0") +} + +func (s *bridgeTweetSearchServant) IndexName() string { + return s.ts.IndexName() +} + +func (s *bridgeTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) { + s.updateDocs(&documents{ + primaryKey: primaryKey, + docItems: data, + }) + return true, nil +} + +func (s *bridgeTweetSearchServant) DeleteDocuments(identifiers []string) error { + s.updateDocs(&documents{ + identifiers: identifiers, + }) + return nil +} + +func (s *bridgeTweetSearchServant) Search(q *core.QueryReq, offset, limit int) (*core.QueryResp, error) { + return s.ts.Search(q, offset, limit) +} + +func (s *bridgeTweetSearchServant) updateDocs(doc *documents) { + select { + case s.updateDocsCh <- doc: + logrus.Debugln("addDocuments send documents by chan") + default: + go func(ch chan<- *documents, item *documents) { + s.updateDocsCh <- item + logrus.Debugln("addDocuments send documents by goroutine") + }(s.updateDocsCh, doc) + } +} + +func (s *bridgeTweetSearchServant) startUpdateDocs() { + for doc := range s.updateDocsCh { + if len(doc.docItems) > 0 { + if _, err := s.ts.AddDocuments(doc.docItems, doc.primaryKey...); err != nil { + logrus.Errorf("addDocuments occurs error: %v", err) + } + } + if len(doc.identifiers) > 0 { + if err := s.ts.DeleteDocuments(doc.identifiers); err != nil { + logrus.Errorf("deleteDocuments occurs error: %s", err) + } + } + } +} diff --git a/internal/dao/search_zinc.go b/internal/dao/search_zinc.go index 3d52c2b6..6cda0503 100644 --- a/internal/dao/search_zinc.go +++ b/internal/dao/search_zinc.go @@ -32,7 +32,7 @@ func (s *zincTweetSearchServant) IndexName() string { return s.indexName } -func (s *zincTweetSearchServant) AddDocuments(data []map[string]interface{}, primaryKey ...string) (bool, error) { +func (s *zincTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) { return s.client.BulkPushDoc(data) } diff --git a/internal/routers/api/api.go b/internal/routers/api/api.go index 9049e110..26b65a4e 100644 --- a/internal/routers/api/api.go +++ b/internal/routers/api/api.go @@ -12,5 +12,5 @@ var ( func Initialize() { objectStorage = dao.NewObjectStorageService() - attachmentChecker = dao.NewAttachmentCheckerService() + attachmentChecker = dao.NewAttachmentCheckService() } diff --git a/internal/service/comment.go b/internal/service/comment.go index 4cd7fb24..8805c106 100644 --- a/internal/service/comment.go +++ b/internal/service/comment.go @@ -136,7 +136,7 @@ func CreatePostComment(ctx *gin.Context, userID int64, param CommentCreationReq) ds.UpdatePost(post) // 更新索引 - go PushPostToSearch(post) + PushPostToSearch(post) // 创建用户消息提醒 postMaster, err := ds.GetUserByID(post.UserID) @@ -251,7 +251,7 @@ func CreatePostCommentReply(ctx *gin.Context, commentID int64, content string, u ds.UpdatePost(post) // 更新索引 - go PushPostToSearch(post) + PushPostToSearch(post) // 创建用户消息提醒 commentMaster, err := ds.GetUserByID(comment.UserID) @@ -325,7 +325,7 @@ func DeletePostCommentReply(reply *model.CommentReply) error { ds.UpdatePost(post) // 更新索引 - go PushPostToSearch(post) + PushPostToSearch(post) return nil } diff --git a/internal/service/post.go b/internal/service/post.go index 5a4a4d92..2401f0ef 100644 --- a/internal/service/post.go +++ b/internal/service/post.go @@ -177,8 +177,7 @@ func CreatePost(c *gin.Context, userID int64, param PostCreationReq) (*model.Pos }) } // 推送Search - // TODO: 优化推送文章到搜索的处理机制,最好使用通道channel传递文章,可以省goroutine - go PushPostToSearch(post) + PushPostToSearch(post) } return post, nil @@ -204,7 +203,7 @@ func DeletePost(id int64) error { } // 删除索引 - go DeleteSearchPost(post) + DeleteSearchPost(post) return nil } @@ -263,11 +262,11 @@ func VisiblePost(user *model.User, postId int64, visibility model.PostVisibleT) if oldVisibility == model.PostVisitPrivate { // 从私密转为非私密需要push logrus.Debugf("visible post set to re-public to add search index: %d, visibility: %s", post.ID, visibility) - go PushPostToSearch(post) + PushPostToSearch(post) } else if visibility == model.PostVisitPrivate { // 从非私密转为私密需要删除索引 logrus.Debugf("visible post set to private to delete search index: %d, visibility: %s", post.ID, visibility) - go DeleteSearchPost(post) + DeleteSearchPost(post) } return nil } @@ -298,7 +297,7 @@ func CreatePostStar(postID, userID int64) (*model.PostStar, error) { ds.UpdatePost(post) // 更新索引 - go PushPostToSearch(post) + PushPostToSearch(post) return star, nil } @@ -324,7 +323,7 @@ func DeletePostStar(star *model.PostStar) error { ds.UpdatePost(post) // 更新索引 - go PushPostToSearch(post) + PushPostToSearch(post) return nil } @@ -355,7 +354,7 @@ func CreatePostCollection(postID, userID int64) (*model.PostCollection, error) { ds.UpdatePost(post) // 更新索引 - go PushPostToSearch(post) + PushPostToSearch(post) return collection, nil } @@ -381,7 +380,7 @@ func DeletePostCollection(collection *model.PostCollection) error { ds.UpdatePost(post) // 更新索引 - go PushPostToSearch(post) + PushPostToSearch(post) return nil } @@ -526,7 +525,7 @@ func PushPostToSearch(post *model.Post) { tagMaps[tag] = 1 } - data := []map[string]interface{}{} + data := core.DocItems{} data = append(data, map[string]interface{}{ "index": map[string]interface{}{ "_index": ts.IndexName(), diff --git a/internal/service/service.go b/internal/service/service.go index 3ed9d261..8593f754 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -16,6 +16,6 @@ var ( func Initialize() { ds = dao.NewDataService() ts = dao.NewTweetSearchService() - attachmentChecker = dao.NewAttachmentCheckerService() + attachmentChecker = dao.NewAttachmentCheckService() DisablePhoneVerify = !conf.CfgIf("Sms") }