mirror of https://github.com/rocboss/paopao-ce
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
117 lines
3.3 KiB
117 lines
3.3 KiB
// Copyright 2022 ROC. All rights reserved.
|
|
// Use of this source code is governed by a MIT style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package cache
|
|
|
|
import (
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/Masterminds/semver/v3"
|
|
"github.com/rocboss/paopao-ce/internal/core"
|
|
"github.com/rocboss/paopao-ce/internal/core/cs"
|
|
"github.com/rocboss/paopao-ce/internal/core/ms"
|
|
"github.com/rocboss/paopao-ce/pkg/debug"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var (
|
|
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
|
|
_ core.VersionInfo = (*simpleCacheIndexServant)(nil)
|
|
)
|
|
|
|
type simpleCacheIndexServant struct {
|
|
ips core.IndexPostsService
|
|
|
|
indexActionCh chan core.IdxAct
|
|
indexPosts *ms.IndexTweetList
|
|
atomicIndex atomic.Value
|
|
maxIndexSize int
|
|
checkTick *time.Ticker
|
|
expireIndexTick *time.Ticker
|
|
}
|
|
|
|
func (s *simpleCacheIndexServant) IndexPosts(user *ms.User, offset int, limit int) (*ms.IndexTweetList, error) {
|
|
cacheResp := s.atomicIndex.Load().(*ms.IndexTweetList)
|
|
end := offset + limit
|
|
if cacheResp != nil {
|
|
size := len(cacheResp.Tweets)
|
|
logrus.Debugf("simpleCacheIndexServant.IndexPosts get index posts from cache posts: %d offset:%d limit:%d start:%d, end:%d", size, offset, limit, offset, end)
|
|
if size >= end {
|
|
return &ms.IndexTweetList{
|
|
Tweets: cacheResp.Tweets[offset:end],
|
|
Total: cacheResp.Total,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
logrus.Debugln("simpleCacheIndexServant.IndexPosts get index posts from database")
|
|
return s.ips.IndexPosts(user, offset, limit)
|
|
}
|
|
|
|
func (s *simpleCacheIndexServant) TweetTimeline(userId int64, offset int, limit int) (*cs.TweetBox, error) {
|
|
// TODO
|
|
return nil, debug.ErrNotImplemented
|
|
}
|
|
|
|
func (s *simpleCacheIndexServant) SendAction(act core.IdxAct, _post *ms.Post) {
|
|
select {
|
|
case s.indexActionCh <- act:
|
|
logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by chan: %s", act)
|
|
default:
|
|
go func(ch chan<- core.IdxAct, act core.IdxAct) {
|
|
logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by goroutine: %s", act)
|
|
ch <- act
|
|
}(s.indexActionCh, act)
|
|
}
|
|
}
|
|
|
|
func (s *simpleCacheIndexServant) startIndexPosts() {
|
|
var err error
|
|
for {
|
|
select {
|
|
case <-s.checkTick.C:
|
|
if s.indexPosts == nil {
|
|
logrus.Debugf("index posts by checkTick")
|
|
if s.indexPosts, err = s.ips.IndexPosts(nil, 0, s.maxIndexSize); err == nil {
|
|
s.atomicIndex.Store(s.indexPosts)
|
|
} else {
|
|
logrus.Errorf("get index posts err: %v", err)
|
|
}
|
|
}
|
|
case <-s.expireIndexTick.C:
|
|
logrus.Debugf("expire index posts by expireIndexTick")
|
|
if s.indexPosts != nil {
|
|
s.indexPosts = nil
|
|
s.atomicIndex.Store(s.indexPosts)
|
|
}
|
|
case action := <-s.indexActionCh:
|
|
switch action {
|
|
// TODO: 这里列出来是因为后续可能会精细化处理每种情况
|
|
case core.IdxActCreatePost,
|
|
core.IdxActUpdatePost,
|
|
core.IdxActDeletePost,
|
|
core.IdxActStickPost,
|
|
core.IdxActVisiblePost:
|
|
// prevent many update post in least time
|
|
if s.indexPosts != nil {
|
|
logrus.Debugf("remove index posts by action %s", action)
|
|
s.indexPosts = nil
|
|
s.atomicIndex.Store(s.indexPosts)
|
|
}
|
|
default:
|
|
// nop
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *simpleCacheIndexServant) Name() string {
|
|
return "SimpleCacheIndex"
|
|
}
|
|
|
|
func (s *simpleCacheIndexServant) Version() *semver.Version {
|
|
return semver.MustParse("v0.2.0")
|
|
}
|