Merge pull request #148 from alimy/pr-optimize-core-service-logic

optimize core service implement logic
pull/149/head
Michael Li 2 years ago committed by GitHub
commit 65f8bd626e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -304,6 +304,8 @@ release/paopao-ce --no-default-features --features sqlite3,localoss,loggerfile,r
目前支持的功能集合: 目前支持的功能集合:
* 数据库: MySQL/Sqlite3/PostgreSQL * 数据库: MySQL/Sqlite3/PostgreSQL
`Gorm` + `MySQL`/`Sqlite3`/`PostgreSQL` 使用[gorm](https://github.com/go-gorm/gorm)作为数据库的ORM默认使用 `Grom` + `MySQL`组合(目前状态:稳定,默认,推荐使用)
`Sqlx` + `MySQL`/`PostgreSQL` 使用[sqlx](https://github.com/jmoiron/sqlx)作为数据库的ORM(目前状态WIP)
* 对象存储: AliOSS/MinIO/LocalOSS * 对象存储: AliOSS/MinIO/LocalOSS
`AliOSS` 阿里云对象存储服务; `AliOSS` 阿里云对象存储服务;
`MinIO` [MinIO](https://github.com/minio/minio)对象存储服务; `MinIO` [MinIO](https://github.com/minio/minio)对象存储服务;

@ -19,6 +19,7 @@ Features:
Option: ["SimpleCacheIndex"] Option: ["SimpleCacheIndex"]
Sms: "SmsJuhe" Sms: "SmsJuhe"
SmsJuhe: SmsJuhe:
Gateway: https://v.juhe.cn/sms/send
Key: Key:
TplID: TplID:
TplVal: "#code#=%d&#m#=%d" TplVal: "#code#=%d&#m#=%d"

@ -116,3 +116,27 @@ func Cfg(key string) (string, bool) {
func CfgIf(expression string) bool { func CfgIf(expression string) bool {
return features.CfgIf(expression) return features.CfgIf(expression)
} }
func GetOssDomain() string {
uri := "https://"
if CfgIf("AliOSS") {
return uri + AliOSSSetting.Domain + "/"
} else if CfgIf("MinIO") {
if !MinIOSetting.Secure {
uri = "http://"
}
return uri + MinIOSetting.Domain + "/" + MinIOSetting.Bucket + "/"
} else if CfgIf("S3") {
if !S3Setting.Secure {
uri = "http://"
}
// TODO: will not work well need test in real world
return uri + S3Setting.Domain + "/" + S3Setting.Bucket + "/"
} else if CfgIf("LocalOSS") {
if !LocalOSSSetting.Secure {
uri = "http://"
}
return uri + LocalOSSSetting.Domain + "/oss/" + LocalOSSSetting.Bucket + "/"
}
return uri + AliOSSSetting.Domain + "/"
}

@ -80,9 +80,10 @@ type AlipaySettingS struct {
} }
type SmsJuheSettings struct { type SmsJuheSettings struct {
Key string Gateway string
TplID string Key string
TplVal string TplID string
TplVal string
} }
type FeaturesSettingS struct { type FeaturesSettingS struct {

@ -45,8 +45,8 @@ type Action struct {
// AuthorizationManageService 授权管理服务 // AuthorizationManageService 授权管理服务
type AuthorizationManageService interface { type AuthorizationManageService interface {
IsAllow(user *model.User, action *Action) bool IsAllow(user *model.User, action *Action) bool
GetFriendFilter(userId int64) FriendFilter BeFriendFilter(userId int64) FriendFilter
GetFriendIds(userId int64) []int64 BeFriendIds(userId int64) ([]int64, error)
} }
func (f FriendFilter) IsFriend(userId int64) bool { func (f FriendFilter) IsFriend(userId int64) bool {

@ -2,6 +2,7 @@ package core
import ( import (
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
) )
// TweetService 推文检索服务 // TweetService 推文检索服务
@ -44,5 +45,5 @@ type TweetHelpService interface {
// IndexPostsService 广场首页推文列表服务 // IndexPostsService 广场首页推文列表服务
type IndexPostsService interface { type IndexPostsService interface {
IndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error)
} }

@ -0,0 +1,11 @@
package core
import (
"github.com/Masterminds/semver/v3"
)
// VersionInfo 版本信息
type VersionInfo interface {
Name() string
Version() *semver.Version
}

@ -1,19 +0,0 @@
package dao
import (
"fmt"
"strings"
"github.com/rocboss/paopao-ce/internal/model"
)
func (d *dataServant) CreateAttachment(attachment *model.Attachment) (*model.Attachment, error) {
return attachment.Create(d.engine)
}
func (s *attachmentCheckServant) CheckAttachment(uri string) error {
if strings.Index(uri, s.domain) != 0 {
return fmt.Errorf("附件非本站资源")
}
return nil
}

@ -1,37 +0,0 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
func newSimpleAuthorizationManageService() *simpleAuthorizationManageService {
return &simpleAuthorizationManageService{
db: conf.DBEngine,
}
}
func (s *simpleAuthorizationManageService) IsAllow(user *model.User, action *core.Action) bool {
// user is activation if had bind phone
isActivation := (len(user.Phone) != 0)
isFriend := s.isFriend(action.UserId)
// TODO: just use defaut act authorization chek rule now
return action.Act.IsAllow(user, action.UserId, isFriend, isActivation)
}
// GetFriendFilter _userId保留未来使用
func (s *simpleAuthorizationManageService) GetFriendFilter(_userId int64) core.FriendFilter {
// TODO: just return an empty friend fileter now
return core.FriendFilter{}
}
func (s *simpleAuthorizationManageService) GetFriendIds(_userId int64) []int64 {
// TODO: just retrun empty now
return nil
}
func (s *simpleAuthorizationManageService) isFriend(_userId int64) bool {
// friend with all world now
return true
}

@ -1,4 +1,4 @@
package dao package cache
import ( import (
"bytes" "bytes"
@ -8,48 +8,32 @@ import (
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
"github.com/allegro/bigcache/v3" "github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func newBigCacheIndexServant(getIndexPosts indexPostsFunc) (*bigCacheIndexServant, versionInfo) { var (
s := conf.BigCacheIndexSetting _ core.CacheIndexService = (*bigCacheIndexServant)(nil)
_ core.VersionInfo = (*bigCacheIndexServant)(nil)
config := bigcache.DefaultConfig(s.ExpireInSecond) )
config.Shards = s.MaxIndexPage
config.Verbose = s.Verbose
config.MaxEntrySize = 10000
config.Logger = logrus.StandardLogger()
cache, err := bigcache.NewBigCache(config)
if err != nil {
logrus.Fatalf("initial bigCahceIndex failure by err: %v", err)
}
cacheIndex := &bigCacheIndexServant{
getIndexPosts: getIndexPosts,
cache: cache,
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000] type postsEntry struct {
// or re-compile source to adjust min/max capacity key string
capacity := conf.CacheIndexSetting.MaxUpdateQPS tweets *rest.IndexTweetsResp
if capacity < 10 { }
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器 type bigCacheIndexServant struct {
go cacheIndex.startIndexPosts() ips core.IndexPostsService
return cacheIndex, cacheIndex indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
} }
func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error) { func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
key := s.keyFrom(user, offset, limit) key := s.keyFrom(user, offset, limit)
posts, err := s.getPosts(key) posts, err := s.getPosts(key)
if err == nil { if err == nil {
@ -57,7 +41,7 @@ func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit in
return posts, nil return posts, nil
} }
if posts, err = s.getIndexPosts(user, offset, limit); err != nil { if posts, err = s.ips.IndexPosts(user, offset, limit); err != nil {
return nil, err return nil, err
} }
logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from database by key: %s", key) logrus.Debugf("bigCacheIndexServant.IndexPosts get index posts from database by key: %s", key)
@ -65,7 +49,7 @@ func (s *bigCacheIndexServant) IndexPosts(user *model.User, offset int, limit in
return posts, nil return posts, nil
} }
func (s *bigCacheIndexServant) getPosts(key string) ([]*model.PostFormated, error) { func (s *bigCacheIndexServant) getPosts(key string) (*rest.IndexTweetsResp, error) {
data, err := s.cache.Get(key) data, err := s.cache.Get(key)
if err != nil { if err != nil {
logrus.Debugf("bigCacheIndexServant.getPosts get posts by key: %s from cache err: %v", key, err) logrus.Debugf("bigCacheIndexServant.getPosts get posts by key: %s from cache err: %v", key, err)
@ -73,16 +57,16 @@ func (s *bigCacheIndexServant) getPosts(key string) ([]*model.PostFormated, erro
} }
buf := bytes.NewBuffer(data) buf := bytes.NewBuffer(data)
dec := gob.NewDecoder(buf) dec := gob.NewDecoder(buf)
var posts []*model.PostFormated var resp rest.IndexTweetsResp
if err := dec.Decode(&posts); err != nil { if err := dec.Decode(&resp); err != nil {
logrus.Debugf("bigCacheIndexServant.getPosts get posts from cache in decode err: %v", err) logrus.Debugf("bigCacheIndexServant.getPosts get posts from cache in decode err: %v", err)
return nil, err return nil, err
} }
return posts, nil return &resp, nil
} }
func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormated) { func (s *bigCacheIndexServant) cachePosts(key string, tweets *rest.IndexTweetsResp) {
entry := &postsEntry{key: key, posts: posts} entry := &postsEntry{key: key, tweets: tweets}
select { select {
case s.cachePostsCh <- entry: case s.cachePostsCh <- entry:
logrus.Debugf("bigCacheIndexServant.cachePosts cachePosts by chan of key: %s", key) logrus.Debugf("bigCacheIndexServant.cachePosts cachePosts by chan of key: %s", key)
@ -97,7 +81,7 @@ func (s *bigCacheIndexServant) cachePosts(key string, posts []*model.PostFormate
func (s *bigCacheIndexServant) setPosts(entry *postsEntry) { func (s *bigCacheIndexServant) setPosts(entry *postsEntry) {
var buf bytes.Buffer var buf bytes.Buffer
enc := gob.NewEncoder(&buf) enc := gob.NewEncoder(&buf)
if err := enc.Encode(entry.posts); err != nil { if err := enc.Encode(entry.tweets); err != nil {
logrus.Debugf("bigCacheIndexServant.setPosts setPosts encode post entry err: %v", err) logrus.Debugf("bigCacheIndexServant.setPosts setPosts encode post entry err: %v", err)
return return
} }
@ -153,10 +137,10 @@ func (s *bigCacheIndexServant) startIndexPosts() {
} }
} }
func (s *bigCacheIndexServant) name() string { func (s *bigCacheIndexServant) Name() string {
return "BigCacheIndex" return "BigCacheIndex"
} }
func (s *bigCacheIndexServant) version() *semver.Version { func (s *bigCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0") return semver.MustParse("v0.1.0")
} }

@ -0,0 +1,86 @@
package cache
import (
"time"
"github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
func NewBigCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) {
s := conf.BigCacheIndexSetting
config := bigcache.DefaultConfig(s.ExpireInSecond)
config.Shards = s.MaxIndexPage
config.Verbose = s.Verbose
config.MaxEntrySize = 10000
config.Logger = logrus.StandardLogger()
cache, err := bigcache.NewBigCache(config)
if err != nil {
logrus.Fatalf("initial bigCahceIndex failure by err: %v", err)
}
cacheIndex := &bigCacheIndexServant{
ips: indexPosts,
cache: cache,
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
cacheIndex.cachePostsCh = make(chan *postsEntry, capacity)
// 启动索引更新器
go cacheIndex.startIndexPosts()
return cacheIndex, cacheIndex
}
func NewSimpleCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) {
s := conf.SimpleCacheIndexSetting
cacheIndex := &simpleCacheIndexServant{
ips: indexPosts,
maxIndexSize: s.MaxIndexSize,
indexPosts: nil,
checkTick: time.NewTicker(s.CheckTickDuration), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Second),
}
// force expire index every ExpireTickDuration second
if s.ExpireTickDuration != 0 {
cacheIndex.expireIndexTick.Reset(s.CheckTickDuration)
} else {
cacheIndex.expireIndexTick.Stop()
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
// start index posts
cacheIndex.atomicIndex.Store(cacheIndex.indexPosts)
go cacheIndex.startIndexPosts()
return cacheIndex, cacheIndex
}
func NewNoneCacheIndexService(indexPosts core.IndexPostsService) (core.CacheIndexService, core.VersionInfo) {
obj := &noneCacheIndexServant{
ips: indexPosts,
}
return obj, obj
}

@ -0,0 +1,33 @@
package cache
import (
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
)
var (
_ core.CacheIndexService = (*noneCacheIndexServant)(nil)
_ core.VersionInfo = (*noneCacheIndexServant)(nil)
)
type noneCacheIndexServant struct {
ips core.IndexPostsService
}
func (s *noneCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
return s.ips.IndexPosts(user, offset, limit)
}
func (s *noneCacheIndexServant) SendAction(act core.IndexActionT) {
// empty
}
func (s *noneCacheIndexServant) Name() string {
return "NoneCacheIndex"
}
func (s *noneCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}

@ -0,0 +1,106 @@
package cache
import (
"sync/atomic"
"time"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
"github.com/sirupsen/logrus"
)
var (
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
_ core.VersionInfo = (*simpleCacheIndexServant)(nil)
)
type simpleCacheIndexServant struct {
ips core.IndexPostsService
indexActionCh chan core.IndexActionT
indexPosts *rest.IndexTweetsResp
atomicIndex atomic.Value
maxIndexSize int
checkTick *time.Ticker
expireIndexTick *time.Ticker
}
func (s *simpleCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
cacheResp := s.atomicIndex.Load().(*rest.IndexTweetsResp)
end := offset + limit
if cacheResp != nil {
size := len(cacheResp.Tweets)
logrus.Debugf("simpleCacheIndexServant.IndexPosts get index posts from cache posts: %d offset:%d limit:%d start:%d, end:%d", size, offset, limit, offset, end)
if size >= end {
return &rest.IndexTweetsResp{
Tweets: cacheResp.Tweets[offset:end],
Total: cacheResp.Total,
}, nil
}
}
logrus.Debugln("simpleCacheIndexServant.IndexPosts get index posts from database")
return s.ips.IndexPosts(user, offset, limit)
}
func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) {
select {
case s.indexActionCh <- act:
logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by chan: %s", act)
default:
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
logrus.Debugf("simpleCacheIndexServant.SendAction send indexAction by goroutine: %s", act)
ch <- act
}(s.indexActionCh, act)
}
}
func (s *simpleCacheIndexServant) startIndexPosts() {
var err error
for {
select {
case <-s.checkTick.C:
if s.indexPosts == nil {
logrus.Debugf("index posts by checkTick")
if s.indexPosts, err = s.ips.IndexPosts(nil, 0, s.maxIndexSize); err == nil {
s.atomicIndex.Store(s.indexPosts)
} else {
logrus.Errorf("get index posts err: %v", err)
}
}
case <-s.expireIndexTick.C:
logrus.Debugf("expire index posts by expireIndexTick")
if s.indexPosts != nil {
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
case action := <-s.indexActionCh:
switch action {
// TODO: 这里列出来是因为后续可能会精细化处理每种情况
case core.IdxActCreatePost,
core.IdxActUpdatePost,
core.IdxActDeletePost,
core.IdxActStickPost,
core.IdxActVisiblePost:
// prevent many update post in least time
if s.indexPosts != nil {
logrus.Debugf("remove index posts by action %s", action)
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
default:
// nop
}
}
}
}
func (s *simpleCacheIndexServant) Name() string {
return "SimpleCacheIndex"
}
func (s *simpleCacheIndexServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}

@ -1,45 +0,0 @@
package dao
import (
"sync/atomic"
"time"
"github.com/allegro/bigcache/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
var (
_ core.CacheIndexService = (*simpleCacheIndexServant)(nil)
_ core.CacheIndexService = (*bigCacheIndexServant)(nil)
_ core.CacheIndexService = (*noneCacheIndexServant)(nil)
)
type postsEntry struct {
key string
posts []*model.PostFormated
}
type indexPostsFunc func(*model.User, int, int) ([]*model.PostFormated, error)
type bigCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
cachePostsCh chan *postsEntry
cache *bigcache.BigCache
lastCacheResetTime time.Time
}
type simpleCacheIndexServant struct {
getIndexPosts indexPostsFunc
indexActionCh chan core.IndexActionT
indexPosts []*model.PostFormated
atomicIndex atomic.Value
maxIndexSize int
checkTick *time.Ticker
expireIndexTick *time.Ticker
}
type noneCacheIndexServant struct {
getIndexPosts indexPostsFunc
}

@ -1,30 +0,0 @@
package dao
import (
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
func newNoneCacheIndexServant(getIndexPosts indexPostsFunc) (*noneCacheIndexServant, versionInfo) {
obj := &noneCacheIndexServant{
getIndexPosts: getIndexPosts,
}
return obj, obj
}
func (s *noneCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error) {
return s.getIndexPosts(user, offset, limit)
}
func (s *noneCacheIndexServant) SendAction(act core.IndexActionT) {
// empty
}
func (s *noneCacheIndexServant) name() string {
return "NoneCacheIndex"
}
func (s *noneCacheIndexServant) version() *semver.Version {
return semver.MustParse("v0.1.0")
}

@ -1,118 +0,0 @@
package dao
import (
"time"
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/sirupsen/logrus"
)
func newSimpleCacheIndexServant(getIndexPosts indexPostsFunc) (*simpleCacheIndexServant, versionInfo) {
s := conf.SimpleCacheIndexSetting
cacheIndex := &simpleCacheIndexServant{
getIndexPosts: getIndexPosts,
maxIndexSize: s.MaxIndexSize,
indexPosts: make([]*model.PostFormated, 0),
checkTick: time.NewTicker(s.CheckTickDuration), // check whether need update index every 1 minute
expireIndexTick: time.NewTicker(time.Second),
}
// force expire index every ExpireTickDuration second
if s.ExpireTickDuration != 0 {
cacheIndex.expireIndexTick.Reset(s.CheckTickDuration)
} else {
cacheIndex.expireIndexTick.Stop()
}
// indexActionCh capacity custom configure by conf.yaml need in [10, 10000]
// or re-compile source to adjust min/max capacity
capacity := conf.CacheIndexSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
cacheIndex.indexActionCh = make(chan core.IndexActionT, capacity)
// start index posts
cacheIndex.atomicIndex.Store(cacheIndex.indexPosts)
go cacheIndex.startIndexPosts()
return cacheIndex, cacheIndex
}
func (s *simpleCacheIndexServant) IndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error) {
posts := s.atomicIndex.Load().([]*model.PostFormated)
end := offset + limit
size := len(posts)
logrus.Debugf("get index posts from posts: %d offset:%d limit:%d start:%d, end:%d", size, offset, limit, offset, end)
if size >= end {
return posts[offset:end], nil
}
logrus.Debugln("simpleCacheIndexServant.IndexPosts get index posts from database")
return s.getIndexPosts(user, offset, limit)
}
func (s *simpleCacheIndexServant) SendAction(act core.IndexActionT) {
select {
case s.indexActionCh <- act:
logrus.Debugf("send indexAction by chan: %s", act)
default:
go func(ch chan<- core.IndexActionT, act core.IndexActionT) {
logrus.Debugf("send indexAction by goroutine: %s", act)
ch <- act
}(s.indexActionCh, act)
}
}
func (s *simpleCacheIndexServant) startIndexPosts() {
var err error
for {
select {
case <-s.checkTick.C:
if len(s.indexPosts) == 0 {
logrus.Debugf("index posts by checkTick")
if s.indexPosts, err = s.getIndexPosts(nil, 0, s.maxIndexSize); err == nil {
s.atomicIndex.Store(s.indexPosts)
} else {
logrus.Errorf("get index posts err: %v", err)
}
}
case <-s.expireIndexTick.C:
logrus.Debugf("expire index posts by expireIndexTick")
if len(s.indexPosts) != 0 {
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
case action := <-s.indexActionCh:
switch action {
// TODO: 这里列出来是因为后续可能会精细化处理每种情况
case core.IdxActCreatePost,
core.IdxActUpdatePost,
core.IdxActDeletePost,
core.IdxActStickPost,
core.IdxActVisiblePost:
// prevent many update post in least time
if len(s.indexPosts) != 0 {
logrus.Debugf("remove index posts by action %s", action)
s.indexPosts = nil
s.atomicIndex.Store(s.indexPosts)
}
default:
// nop
}
}
}
}
func (s *simpleCacheIndexServant) name() string {
return "SimpleCacheIndex"
}
func (s *simpleCacheIndexServant) version() *semver.Version {
return semver.MustParse("v0.1.0")
}

@ -1,45 +0,0 @@
package dao
import "github.com/rocboss/paopao-ce/internal/model"
func (d *dataServant) GetComments(conditions *model.ConditionsT, offset, limit int) ([]*model.Comment, error) {
return (&model.Comment{}).List(d.engine, conditions, offset, limit)
}
func (d *dataServant) GetCommentByID(id int64) (*model.Comment, error) {
comment := &model.Comment{
Model: &model.Model{
ID: id,
},
}
return comment.Get(d.engine)
}
func (d *dataServant) DeleteComment(comment *model.Comment) error {
return comment.Delete(d.engine)
}
func (d *dataServant) GetCommentCount(conditions *model.ConditionsT) (int64, error) {
return (&model.Comment{}).Count(d.engine, conditions)
}
func (d *dataServant) CreateComment(comment *model.Comment) (*model.Comment, error) {
return comment.Create(d.engine)
}
func (d *dataServant) CreateCommentReply(reply *model.CommentReply) (*model.CommentReply, error) {
return reply.Create(d.engine)
}
func (d *dataServant) GetCommentReplyByID(id int64) (*model.CommentReply, error) {
reply := &model.CommentReply{
Model: &model.Model{
ID: id,
},
}
return reply.Get(d.engine)
}
func (d *dataServant) DeleteCommentReply(reply *model.CommentReply) error {
return reply.Delete(d.engine)
}

@ -1,50 +0,0 @@
package dao
import "github.com/rocboss/paopao-ce/internal/model"
func (d *dataServant) GetCommentContentsByIDs(ids []int64) ([]*model.CommentContent, error) {
commentContent := &model.CommentContent{}
return commentContent.List(d.engine, &model.ConditionsT{
"comment_id IN ?": ids,
}, 0, 0)
}
func (d *dataServant) GetCommentRepliesByID(ids []int64) ([]*model.CommentReplyFormated, error) {
CommentReply := &model.CommentReply{}
replies, err := CommentReply.List(d.engine, &model.ConditionsT{
"comment_id IN ?": ids,
}, 0, 0)
if err != nil {
return nil, err
}
userIds := []int64{}
for _, reply := range replies {
userIds = append(userIds, reply.UserID, reply.AtUserID)
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
repliesFormated := []*model.CommentReplyFormated{}
for _, reply := range replies {
replyFormated := reply.Format()
for _, user := range users {
if reply.UserID == user.ID {
replyFormated.User = user.Format()
}
if reply.AtUserID == user.ID {
replyFormated.AtUser = user.Format()
}
}
repliesFormated = append(repliesFormated, replyFormated)
}
return repliesFormated, nil
}
func (d *dataServant) CreateCommentContent(content *model.CommentContent) (*model.CommentContent, error) {
return content.Create(d.engine)
}

@ -3,59 +3,80 @@ package dao
import ( import (
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/dao/jinzhu"
"github.com/rocboss/paopao-ce/internal/dao/sakila"
"github.com/rocboss/paopao-ce/internal/dao/search"
"github.com/rocboss/paopao-ce/internal/dao/slonik"
"github.com/rocboss/paopao-ce/internal/dao/storage"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"gorm.io/gorm"
) )
var ( func NewDataService() (s core.DataService) {
_ core.DataService = (*dataServant)(nil) var v core.VersionInfo
_ core.AuthorizationManageService = (*simpleAuthorizationManageService)(nil) if conf.CfgIf("Gorm") {
) s, v = jinzhu.NewDataService()
} else if conf.CfgIf("Sqlx") && conf.CfgIf("MySQL") {
type dataServant struct { s, v = sakila.NewDataService()
core.CacheIndexService } else if conf.CfgIf("Sqlx") && (conf.CfgIf("Postgres") || conf.CfgIf("PostgreSQL")) {
core.AttachmentCheckService s, v = slonik.NewDataService()
} else {
ams core.AuthorizationManageService // default use gorm as orm for sql database
engine *gorm.DB s, v = jinzhu.NewDataService()
} }
logrus.Infof("use %s as data service with version %s", v.Name(), v.Version())
type simpleAuthorizationManageService struct { return
db *gorm.DB
}
type attachmentCheckServant struct {
domain string
} }
func NewDataService() core.DataService { func NewObjectStorageService() (oss core.ObjectStorageService) {
ds := &dataServant{ var v core.VersionInfo
ams: newAuthorizationManageService(), if conf.CfgIf("AliOSS") {
AttachmentCheckService: NewAttachmentCheckService(), oss, v = storage.NewAliossService()
} else if conf.CfgIf("MinIO") {
engine: conf.DBEngine, oss, v = storage.NewMinioService()
} else if conf.CfgIf("S3") {
oss, v = storage.NewS3Service()
logrus.Infof("use S3 as object storage by version %s", v.Version())
return
} else if conf.CfgIf("LocalOSS") {
oss, v = storage.NewLocalossService()
} else {
// default use AliOSS as object storage service
oss, v = storage.NewAliossService()
logrus.Infof("use default AliOSS as object storage by version %s", v.Version())
return
} }
logrus.Infof("use %s as object storage by version %s", v.Name(), v.Version())
return
}
// initialize CacheIndex if needed func NewTweetSearchService() core.TweetSearchService {
var v versionInfo var (
if conf.CfgIf("SimpleCacheIndex") { ts core.TweetSearchService
ds.CacheIndexService, v = newSimpleCacheIndexServant(ds.simpleCacheIndexGetPosts) v core.VersionInfo
} else if conf.CfgIf("BigCacheIndex") { )
ds.CacheIndexService, v = newBigCacheIndexServant(ds.getIndexPosts) ams := newAuthorizationManageService()
if conf.CfgIf("Zinc") {
ts, v = search.NewZincTweetSearchService(ams)
} else if conf.CfgIf("Meili") {
ts, v = search.NewMeiliTweetSearchService(ams)
} else { } else {
ds.CacheIndexService, v = newNoneCacheIndexServant(ds.getIndexPosts) // default use Zinc as tweet search service
ts, v = search.NewZincTweetSearchService(ams)
} }
logrus.Infof("use %s as cache index service by version: %s", v.name(), v.version()) logrus.Infof("use %s as tweet search serice by version %s", v.Name(), v.Version())
return ds return search.NewBridgeTweetSearchService(ts)
} }
func NewAttachmentCheckService() core.AttachmentCheckService { func newAuthorizationManageService() (s core.AuthorizationManageService) {
return &attachmentCheckServant{ if conf.CfgIf("Gorm") {
domain: getOssDomain(), s = jinzhu.NewAuthorizationManageService()
} else if conf.CfgIf("Sqlx") && conf.CfgIf("MySQL") {
s = sakila.NewAuthorizationManageService()
} else if conf.CfgIf("Sqlx") && (conf.CfgIf("Postgres") || conf.CfgIf("PostgreSQL")) {
s = slonik.NewAuthorizationManageService()
} else {
s = jinzhu.NewAuthorizationManageService()
} }
} return
func newAuthorizationManageService() core.AuthorizationManageService {
return newSimpleAuthorizationManageService()
} }

@ -0,0 +1,38 @@
package jinzhu
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
var (
_ core.AuthorizationManageService = (*authorizationManageServant)(nil)
)
type authorizationManageServant struct {
db *gorm.DB
}
func (s *authorizationManageServant) IsAllow(user *model.User, action *core.Action) bool {
// user is activation if had bind phone
isActivation := (len(user.Phone) != 0)
isFriend := s.isFriend(user.ID, action.UserId)
// TODO: just use defaut act authorization chek rule now
return action.Act.IsAllow(user, action.UserId, isFriend, isActivation)
}
func (s *authorizationManageServant) BeFriendFilter(userId int64) core.FriendFilter {
// just empty now
return core.FriendFilter{}
}
func (s *authorizationManageServant) BeFriendIds(userId int64) ([]int64, error) {
// just empty now
return []int64{}, nil
}
func (s *authorizationManageServant) isFriend(userId int64, friendId int64) bool {
// just true now
return true
}

@ -0,0 +1,122 @@
package jinzhu
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
var (
_ core.CommentService = (*commentServant)(nil)
_ core.CommentManageService = (*commentManageServant)(nil)
)
type commentServant struct {
db *gorm.DB
}
type commentManageServant struct {
db *gorm.DB
}
func newCommentService(db *gorm.DB) core.CommentService {
return &commentServant{
db: db,
}
}
func newCommentManageService(db *gorm.DB) core.CommentManageService {
return &commentManageServant{
db: db,
}
}
func (s *commentServant) GetComments(conditions *model.ConditionsT, offset, limit int) ([]*model.Comment, error) {
return (&model.Comment{}).List(s.db, conditions, offset, limit)
}
func (s *commentServant) GetCommentByID(id int64) (*model.Comment, error) {
comment := &model.Comment{
Model: &model.Model{
ID: id,
},
}
return comment.Get(s.db)
}
func (s *commentServant) GetCommentReplyByID(id int64) (*model.CommentReply, error) {
reply := &model.CommentReply{
Model: &model.Model{
ID: id,
},
}
return reply.Get(s.db)
}
func (s *commentServant) GetCommentCount(conditions *model.ConditionsT) (int64, error) {
return (&model.Comment{}).Count(s.db, conditions)
}
func (s *commentServant) GetCommentContentsByIDs(ids []int64) ([]*model.CommentContent, error) {
commentContent := &model.CommentContent{}
return commentContent.List(s.db, &model.ConditionsT{
"comment_id IN ?": ids,
}, 0, 0)
}
func (s *commentServant) GetCommentRepliesByID(ids []int64) ([]*model.CommentReplyFormated, error) {
CommentReply := &model.CommentReply{}
replies, err := CommentReply.List(s.db, &model.ConditionsT{
"comment_id IN ?": ids,
}, 0, 0)
if err != nil {
return nil, err
}
userIds := []int64{}
for _, reply := range replies {
userIds = append(userIds, reply.UserID, reply.AtUserID)
}
users, err := getUsersByIDs(s.db, userIds)
if err != nil {
return nil, err
}
repliesFormated := []*model.CommentReplyFormated{}
for _, reply := range replies {
replyFormated := reply.Format()
for _, user := range users {
if reply.UserID == user.ID {
replyFormated.User = user.Format()
}
if reply.AtUserID == user.ID {
replyFormated.AtUser = user.Format()
}
}
repliesFormated = append(repliesFormated, replyFormated)
}
return repliesFormated, nil
}
func (s *commentManageServant) DeleteComment(comment *model.Comment) error {
return comment.Delete(s.db)
}
func (s *commentManageServant) CreateComment(comment *model.Comment) (*model.Comment, error) {
return comment.Create(s.db)
}
func (s *commentManageServant) CreateCommentReply(reply *model.CommentReply) (*model.CommentReply, error) {
return reply.Create(s.db)
}
func (s *commentManageServant) DeleteCommentReply(reply *model.CommentReply) error {
return reply.Delete(s.db)
}
func (s *commentManageServant) CreateCommentContent(content *model.CommentContent) (*model.CommentContent, error) {
return content.Create(s.db)
}

@ -0,0 +1,105 @@
package jinzhu
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
"github.com/rocboss/paopao-ce/pkg/types"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)
var (
_ core.IndexPostsService = (*indexPostsServant)(nil)
_ core.IndexPostsService = (*simpleIndexPostsServant)(nil)
)
type indexPostsServant struct {
ams core.AuthorizationManageService
ths core.TweetHelpService
db *gorm.DB
}
type simpleIndexPostsServant struct {
ths core.TweetHelpService
db *gorm.DB
}
func newIndexPostsService(db *gorm.DB) core.IndexPostsService {
return &indexPostsServant{
ams: NewAuthorizationManageService(),
ths: newTweetHelpService(db),
db: db,
}
}
func newSimpleIndexPostsService(db *gorm.DB) core.IndexPostsService {
return &simpleIndexPostsServant{
ths: newTweetHelpService(db),
db: db,
}
}
// IndexPosts 根据userId查询广场推文列表简单做到不同用户的主页都是不同的
func (s *indexPostsServant) IndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
predicates := model.Predicates{
"ORDER": types.AnySlice{"is_top DESC, latest_replied_on DESC"},
}
if user == nil {
predicates["visibility = ?"] = []types.Any{model.PostVisitPublic}
} else if !user.IsAdmin {
friendIds, _ := s.ams.BeFriendIds(user.ID)
friendIds = append(friendIds, user.ID)
args := types.AnySlice{model.PostVisitPublic, model.PostVisitPrivate, user.ID, model.PostVisitFriend, friendIds}
predicates["visibility = ? OR (visibility = ? AND user_id = ?) OR (visibility = ? AND user_id IN ?)"] = args
}
posts, err := (&model.Post{}).Fetch(s.db, predicates, offset, limit)
if err != nil {
logrus.Debugf("gormIndexPostsServant.IndexPosts err: %v", err)
return nil, err
}
formatPosts, err := s.ths.MergePosts(posts)
if err != nil {
return nil, err
}
total, err := (&model.Post{}).CountBy(s.db, predicates)
if err != nil {
return nil, err
}
return &rest.IndexTweetsResp{
Tweets: formatPosts,
Total: total,
}, nil
}
// simpleCacheIndexGetPosts simpleCacheIndex 专属获取广场推文列表函数
func (s *simpleIndexPostsServant) IndexPosts(_user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
predicates := model.Predicates{
"visibility = ?": []types.Any{model.PostVisitPublic},
"ORDER": []types.Any{"is_top DESC, latest_replied_on DESC"},
}
posts, err := (&model.Post{}).Fetch(s.db, predicates, offset, limit)
if err != nil {
logrus.Debugf("gormSimpleIndexPostsServant.IndexPosts err: %v", err)
return nil, err
}
formatPosts, err := s.ths.MergePosts(posts)
if err != nil {
return nil, err
}
total, err := (&model.Post{}).CountBy(s.db, predicates)
if err != nil {
return nil, err
}
return &rest.IndexTweetsResp{
Tweets: formatPosts,
Total: total,
}, nil
}

@ -0,0 +1,84 @@
// Core service implement base gorm+mysql/postgresql/sqlite3.
// Jinzhu is the primary developer of gorm so use his name as
// pakcage name as a saluter.
package jinzhu
import (
"github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/dao/cache"
"github.com/rocboss/paopao-ce/internal/dao/security"
"github.com/sirupsen/logrus"
)
var (
_ core.DataService = (*dataServant)(nil)
_ core.VersionInfo = (*dataServant)(nil)
)
type dataServant struct {
core.IndexPostsService
core.WalletService
core.MessageService
core.TopicService
core.TweetService
core.TweetManageService
core.TweetHelpService
core.CommentService
core.CommentManageService
core.UserManageService
core.SecurityService
core.AttachmentCheckService
}
func NewDataService() (core.DataService, core.VersionInfo) {
// initialize CacheIndex if needed
var (
c core.CacheIndexService
v core.VersionInfo
)
db := conf.DBEngine
i := newIndexPostsService(db)
if conf.CfgIf("SimpleCacheIndex") {
i = newSimpleIndexPostsService(db)
c, v = cache.NewSimpleCacheIndexService(i)
} else if conf.CfgIf("BigCacheIndex") {
c, v = cache.NewBigCacheIndexService(i)
} else {
c, v = cache.NewNoneCacheIndexService(i)
}
logrus.Infof("use %s as cache index service by version: %s", v.Name(), v.Version())
ds := &dataServant{
IndexPostsService: c,
WalletService: newWalletService(db),
MessageService: newMessageService(db),
TopicService: newTopicService(db),
TweetService: newTweetService(db),
TweetManageService: newTweetManageService(db, c),
TweetHelpService: newTweetHelpService(db),
CommentService: newCommentService(db),
CommentManageService: newCommentManageService(db),
UserManageService: newUserManageService(db),
SecurityService: newSecurityService(db),
AttachmentCheckService: security.NewAttachmentCheckService(),
}
return ds, ds
}
func NewAuthorizationManageService() core.AuthorizationManageService {
return &authorizationManageServant{
db: conf.DBEngine,
}
}
func (s *dataServant) Name() string {
return "Gorm"
}
func (s *dataServant) Version() *semver.Version {
return semver.MustParse("v0.1.0")
}

@ -0,0 +1,64 @@
package jinzhu
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
var (
_ core.MessageService = (*messageServant)(nil)
)
type messageServant struct {
db *gorm.DB
}
func newMessageService(db *gorm.DB) core.MessageService {
return &messageServant{
db: db,
}
}
func (d *messageServant) CreateMessage(msg *model.Message) (*model.Message, error) {
return msg.Create(d.db)
}
func (d *messageServant) GetUnreadCount(userID int64) (int64, error) {
return (&model.Message{}).Count(d.db, &model.ConditionsT{
"receiver_user_id": userID,
"is_read": model.MsgStatusUnread,
})
}
func (d *messageServant) GetMessageByID(id int64) (*model.Message, error) {
return (&model.Message{
Model: &model.Model{
ID: id,
},
}).Get(d.db)
}
func (d *messageServant) ReadMessage(message *model.Message) error {
message.IsRead = 1
return message.Update(d.db)
}
func (d *messageServant) GetMessages(conditions *model.ConditionsT, offset, limit int) ([]*model.MessageFormated, error) {
messages, err := (&model.Message{}).List(d.db, conditions, offset, limit)
if err != nil {
return nil, err
}
mfs := []*model.MessageFormated{}
for _, message := range messages {
mf := message.Format()
mfs = append(mfs, mf)
}
return mfs, nil
}
func (d *messageServant) GetMessageCount(conditions *model.ConditionsT) (int64, error) {
return (&model.Message{}).Count(d.db, conditions)
}

@ -0,0 +1,92 @@
package jinzhu
import (
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"time"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/json"
"gopkg.in/resty.v1"
"gorm.io/gorm"
)
var (
_ core.SecurityService = (*securityServant)(nil)
)
type securityServant struct {
db *gorm.DB
}
func newSecurityService(db *gorm.DB) core.SecurityService {
return &securityServant{
db: db,
}
}
type juhePhoneCaptchaRsp struct {
ErrorCode int `json:"error_code"`
Reason string `json:"reason"`
}
// 获取最新短信验证码
func (s *securityServant) GetLatestPhoneCaptcha(phone string) (*model.Captcha, error) {
return (&model.Captcha{
Phone: phone,
}).Get(s.db)
}
// 更新短信验证码
func (s *securityServant) UsePhoneCaptcha(captcha *model.Captcha) error {
captcha.UseTimes++
return captcha.Update(s.db)
}
// 发送短信验证码
func (s *securityServant) SendPhoneCaptcha(phone string) error {
rand.Seed(time.Now().UnixNano())
captcha := rand.Intn(900000) + 100000
m := 5
client := resty.New()
client.DisableWarn = true
resp, err := client.R().
SetFormData(map[string]string{
"mobile": phone,
"tpl_id": conf.SmsJuheSetting.TplID,
"tpl_value": fmt.Sprintf(conf.SmsJuheSetting.TplVal, captcha, m),
"key": conf.SmsJuheSetting.Key,
}).Post(conf.SmsJuheSetting.Gateway)
if err != nil {
return err
}
if resp.StatusCode() != http.StatusOK {
return errors.New(resp.Status())
}
result := &juhePhoneCaptchaRsp{}
err = json.Unmarshal(resp.Body(), result)
if err != nil {
return err
}
if result.ErrorCode != 0 {
return errors.New(result.Reason)
}
// 写入表
captchaModel := &model.Captcha{
Phone: phone,
Captcha: strconv.Itoa(captcha),
ExpiredOn: time.Now().Add(time.Minute * time.Duration(m)).Unix(),
}
captchaModel.Create(s.db)
return nil
}

@ -0,0 +1,51 @@
package jinzhu
import (
"strings"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
var (
_ core.TopicService = (*topicServant)(nil)
)
type topicServant struct {
db *gorm.DB
}
func newTopicService(db *gorm.DB) core.TopicService {
return &topicServant{
db: db,
}
}
func (s *topicServant) CreateTag(tag *model.Tag) (*model.Tag, error) {
return createTag(s.db, tag)
}
func (s *topicServant) DeleteTag(tag *model.Tag) error {
return deleteTag(s.db, tag)
}
func (s *topicServant) GetTags(conditions *model.ConditionsT, offset, limit int) ([]*model.Tag, error) {
return (&model.Tag{}).List(s.db, conditions, offset, limit)
}
func (s *topicServant) GetTagsByKeyword(keyword string) ([]*model.Tag, error) {
tag := &model.Tag{}
keyword = "%" + strings.Trim(keyword, " ") + "%"
if keyword == "%%" {
return tag.List(s.db, &model.ConditionsT{
"ORDER": "quote_num DESC",
}, 0, 6)
} else {
return tag.List(s.db, &model.ConditionsT{
"tag LIKE ?": keyword,
"ORDER": "quote_num DESC",
}, 0, 6)
}
}

@ -0,0 +1,361 @@
package jinzhu
import (
"strings"
"time"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
var (
_ core.TweetService = (*tweetServant)(nil)
_ core.TweetManageService = (*tweetManageServant)(nil)
_ core.TweetHelpService = (*tweetHelpServant)(nil)
)
type tweetServant struct {
db *gorm.DB
}
type tweetManageServant struct {
cacheIndex core.CacheIndexService
db *gorm.DB
}
type tweetHelpServant struct {
db *gorm.DB
}
func newTweetService(db *gorm.DB) core.TweetService {
return &tweetServant{
db: db,
}
}
func newTweetManageService(db *gorm.DB, cacheIndex core.CacheIndexService) core.TweetManageService {
return &tweetManageServant{
cacheIndex: cacheIndex,
db: db,
}
}
func newTweetHelpService(db *gorm.DB) core.TweetHelpService {
return &tweetHelpServant{
db: db,
}
}
// MergePosts post数据整合
func (s *tweetHelpServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := s.getPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := s.getUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
postsFormated := make([]*model.PostFormated, 0, len(posts))
for _, post := range posts {
postFormated := post.Format()
postFormated.User = userMap[post.UserID]
postFormated.Contents = contentMap[post.ID]
postsFormated = append(postsFormated, postFormated)
}
return postsFormated, nil
}
// RevampPosts post数据整形修复
func (s *tweetHelpServant) RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := s.getPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := s.getUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
for _, post := range posts {
post.User = userMap[post.UserID]
post.Contents = contentMap[post.ID]
}
return posts, nil
}
func (s *tweetHelpServant) getPostContentsByIDs(ids []int64) ([]*model.PostContent, error) {
return (&model.PostContent{}).List(s.db, &model.ConditionsT{
"post_id IN ?": ids,
"ORDER": "sort ASC",
}, 0, 0)
}
func (s *tweetHelpServant) getUsersByIDs(ids []int64) ([]*model.User, error) {
user := &model.User{}
return user.List(s.db, &model.ConditionsT{
"id IN ?": ids,
}, 0, 0)
}
func (s *tweetManageServant) CreatePostCollection(postID, userID int64) (*model.PostCollection, error) {
collection := &model.PostCollection{
PostID: postID,
UserID: userID,
}
return collection.Create(s.db)
}
func (s *tweetManageServant) DeletePostCollection(p *model.PostCollection) error {
return p.Delete(s.db)
}
func (s *tweetManageServant) CreatePostContent(content *model.PostContent) (*model.PostContent, error) {
return content.Create(s.db)
}
func (s *tweetManageServant) CreateAttachment(attachment *model.Attachment) (*model.Attachment, error) {
return attachment.Create(s.db)
}
func (s *tweetManageServant) CreatePost(post *model.Post) (*model.Post, error) {
post.LatestRepliedOn = time.Now().Unix()
p, err := post.Create(s.db)
if err != nil {
return nil, err
}
s.cacheIndex.SendAction(core.IdxActCreatePost)
return p, nil
}
func (s *tweetManageServant) DeletePost(post *model.Post) error {
if err := post.Delete(s.db); err != nil {
return err
}
s.cacheIndex.SendAction(core.IdxActDeletePost)
return nil
}
func (s *tweetManageServant) LockPost(post *model.Post) error {
post.IsLock = 1 - post.IsLock
return post.Update(s.db)
}
func (s *tweetManageServant) StickPost(post *model.Post) error {
post.IsTop = 1 - post.IsTop
if err := post.Update(s.db); err != nil {
return err
}
s.cacheIndex.SendAction(core.IdxActStickPost)
return nil
}
func (s *tweetManageServant) VisiblePost(post *model.Post, visibility model.PostVisibleT) error {
oldVisibility := post.Visibility
post.Visibility = visibility
// TODO: 这个判断是否可以不要呢
if oldVisibility == visibility {
return nil
}
// 私密推文 特殊处理
if visibility == model.PostVisitPrivate {
// 强制取消置顶
// TODO: 置顶推文用户是否有权设置成私密? 后续完善
post.IsTop = 0
}
db := s.db.Begin()
err := post.Update(db)
if err != nil {
db.Rollback()
return err
}
// tag处理
tags := strings.Split(post.Tags, ",")
for _, t := range tags {
tag := &model.Tag{
Tag: t,
}
// TODO: 暂时宽松不处理错误,这里或许可以有优化,后续完善
if oldVisibility == model.PostVisitPrivate {
// 从私密转为非私密才需要重新创建tag
createTag(db, tag)
} else if visibility == model.PostVisitPrivate {
// 从非私密转为私密才需要删除tag
deleteTag(db, tag)
}
}
db.Commit()
s.cacheIndex.SendAction(core.IdxActVisiblePost)
return nil
}
func (s *tweetManageServant) UpdatePost(post *model.Post) error {
if err := post.Update(s.db); err != nil {
return err
}
s.cacheIndex.SendAction(core.IdxActUpdatePost)
return nil
}
func (s *tweetManageServant) CreatePostStar(postID, userID int64) (*model.PostStar, error) {
star := &model.PostStar{
PostID: postID,
UserID: userID,
}
return star.Create(s.db)
}
func (s *tweetManageServant) DeletePostStar(p *model.PostStar) error {
return p.Delete(s.db)
}
func (s *tweetServant) GetPostByID(id int64) (*model.Post, error) {
post := &model.Post{
Model: &model.Model{
ID: id,
},
}
return post.Get(s.db)
}
func (s *tweetServant) GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error) {
return (&model.Post{}).List(s.db, conditions, offset, limit)
}
func (s *tweetServant) GetPostCount(conditions *model.ConditionsT) (int64, error) {
return (&model.Post{}).Count(s.db, conditions)
}
func (s *tweetServant) GetUserPostStar(postID, userID int64) (*model.PostStar, error) {
star := &model.PostStar{
PostID: postID,
UserID: userID,
}
return star.Get(s.db)
}
func (s *tweetServant) GetUserPostStars(userID int64, offset, limit int) ([]*model.PostStar, error) {
star := &model.PostStar{
UserID: userID,
}
return star.List(s.db, &model.ConditionsT{
"ORDER": s.db.NamingStrategy.TableName("PostStar") + ".id DESC",
}, offset, limit)
}
func (s *tweetServant) GetUserPostStarCount(userID int64) (int64, error) {
star := &model.PostStar{
UserID: userID,
}
return star.Count(s.db, &model.ConditionsT{})
}
func (s *tweetServant) GetUserPostCollection(postID, userID int64) (*model.PostCollection, error) {
star := &model.PostCollection{
PostID: postID,
UserID: userID,
}
return star.Get(s.db)
}
func (s *tweetServant) GetUserPostCollections(userID int64, offset, limit int) ([]*model.PostCollection, error) {
collection := &model.PostCollection{
UserID: userID,
}
return collection.List(s.db, &model.ConditionsT{
"ORDER": s.db.NamingStrategy.TableName("PostCollection") + ".id DESC",
}, offset, limit)
}
func (s *tweetServant) GetUserPostCollectionCount(userID int64) (int64, error) {
collection := &model.PostCollection{
UserID: userID,
}
return collection.Count(s.db, &model.ConditionsT{})
}
func (s *tweetServant) GetUserWalletBills(userID int64, offset, limit int) ([]*model.WalletStatement, error) {
statement := &model.WalletStatement{
UserID: userID,
}
return statement.List(s.db, &model.ConditionsT{
"ORDER": "id DESC",
}, offset, limit)
}
func (s *tweetServant) GetUserWalletBillCount(userID int64) (int64, error) {
statement := &model.WalletStatement{
UserID: userID,
}
return statement.Count(s.db, &model.ConditionsT{})
}
func (s *tweetServant) GetPostAttatchmentBill(postID, userID int64) (*model.PostAttachmentBill, error) {
bill := &model.PostAttachmentBill{
PostID: postID,
UserID: userID,
}
return bill.Get(s.db)
}
func (s *tweetServant) GetPostContentsByIDs(ids []int64) ([]*model.PostContent, error) {
return (&model.PostContent{}).List(s.db, &model.ConditionsT{
"post_id IN ?": ids,
"ORDER": "sort ASC",
}, 0, 0)
}
func (s *tweetServant) GetPostContentByID(id int64) (*model.PostContent, error) {
return (&model.PostContent{
Model: &model.Model{
ID: id,
},
}).Get(s.db)
}

@ -0,0 +1,95 @@
package jinzhu
import (
"strings"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
var (
_ core.UserManageService = (*userManageServant)(nil)
)
type userManageServant struct {
db *gorm.DB
}
func newUserManageService(db *gorm.DB) core.UserManageService {
return &userManageServant{
db: db,
}
}
func (s *userManageServant) GetUserByID(id int64) (*model.User, error) {
user := &model.User{
Model: &model.Model{
ID: id,
},
}
return user.Get(s.db)
}
func (s *userManageServant) GetUserByUsername(username string) (*model.User, error) {
user := &model.User{
Username: username,
}
return user.Get(s.db)
}
func (s *userManageServant) GetUserByPhone(phone string) (*model.User, error) {
user := &model.User{
Phone: phone,
}
return user.Get(s.db)
}
func (s *userManageServant) GetUsersByIDs(ids []int64) ([]*model.User, error) {
user := &model.User{}
return user.List(s.db, &model.ConditionsT{
"id IN ?": ids,
}, 0, 0)
}
func (s *userManageServant) GetUsersByKeyword(keyword string) ([]*model.User, error) {
user := &model.User{}
keyword = strings.Trim(keyword, " ") + "%"
if keyword == "%" {
return user.List(s.db, &model.ConditionsT{
"ORDER": "id ASC",
}, 0, 6)
} else {
return user.List(s.db, &model.ConditionsT{
"username LIKE ?": keyword,
}, 0, 6)
}
}
func (s *userManageServant) GetTagsByKeyword(keyword string) ([]*model.Tag, error) {
tag := &model.Tag{}
keyword = "%" + strings.Trim(keyword, " ") + "%"
if keyword == "%%" {
return tag.List(s.db, &model.ConditionsT{
"ORDER": "quote_num DESC",
}, 0, 6)
} else {
return tag.List(s.db, &model.ConditionsT{
"tag LIKE ?": keyword,
"ORDER": "quote_num DESC",
}, 0, 6)
}
}
func (s *userManageServant) CreateUser(user *model.User) (*model.User, error) {
return user.Create(s.db)
}
func (s *userManageServant) UpdateUser(user *model.User) error {
return user.Update(s.db)
}
func (s *userManageServant) IsFriend(userId int64, friendId int64) bool {
// just true now
return true
}

@ -0,0 +1,41 @@
package jinzhu
import (
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
func createTag(db *gorm.DB, tag *model.Tag) (*model.Tag, error) {
t, err := tag.Get(db)
if err != nil {
tag.QuoteNum = 1
return tag.Create(db)
}
// 更新
t.QuoteNum++
err = t.Update(db)
if err != nil {
return nil, err
}
return t, nil
}
func deleteTag(db *gorm.DB, tag *model.Tag) error {
tag, err := tag.Get(db)
if err != nil {
return err
}
tag.QuoteNum--
return tag.Update(db)
}
// 根据IDs获取用户列表
func getUsersByIDs(db *gorm.DB, ids []int64) ([]*model.User, error) {
user := &model.User{}
return user.List(db, &model.ConditionsT{
"id IN ?": ids,
}, 0, 0)
}

@ -1,37 +1,70 @@
package dao package jinzhu
import ( import (
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/types"
"gorm.io/gorm" "gorm.io/gorm"
) )
func (d *dataServant) GetRechargeByID(id int64) (*model.WalletRecharge, error) { var (
_ core.WalletService = (*walletServant)(nil)
)
type walletServant struct {
db *gorm.DB
}
func newWalletService(db *gorm.DB) core.WalletService {
return &walletServant{
db: db,
}
}
func (d *walletServant) GetRechargeByID(id int64) (*model.WalletRecharge, error) {
recharge := &model.WalletRecharge{ recharge := &model.WalletRecharge{
Model: &model.Model{ Model: &model.Model{
ID: id, ID: id,
}, },
} }
return recharge.Get(d.engine) return recharge.Get(d.db)
} }
func (d *dataServant) CreateRecharge(userId, amount int64) (*model.WalletRecharge, error) { func (d *walletServant) CreateRecharge(userId, amount int64) (*model.WalletRecharge, error) {
recharge := &model.WalletRecharge{ recharge := &model.WalletRecharge{
UserID: userId, UserID: userId,
Amount: amount, Amount: amount,
} }
return recharge.Create(d.engine) return recharge.Create(d.db)
}
func (d *walletServant) GetUserWalletBills(userID int64, offset, limit int) ([]*model.WalletStatement, error) {
statement := &model.WalletStatement{
UserID: userID,
}
return statement.List(d.db, &model.ConditionsT{
"ORDER": "id DESC",
}, offset, limit)
}
func (d *walletServant) GetUserWalletBillCount(userID int64) (int64, error) {
statement := &model.WalletStatement{
UserID: userID,
}
return statement.Count(d.db, &model.ConditionsT{})
} }
func (d *dataServant) HandleRechargeSuccess(recharge *model.WalletRecharge, tradeNo string) error { func (d *walletServant) HandleRechargeSuccess(recharge *model.WalletRecharge, tradeNo string) error {
user, _ := (&model.User{ user, _ := (&model.User{
Model: &model.Model{ Model: &model.Model{
ID: recharge.UserID, ID: recharge.UserID,
}, },
}).Get(d.engine) }).Get(d.db)
return d.engine.Transaction(func(tx *gorm.DB) error { return d.db.Transaction(func(tx *gorm.DB) error {
// 扣除金额 // 扣除金额
if err := tx.Model(user).Update("balance", gorm.Expr("balance + ?", recharge.Amount)).Error; err != nil { if err := tx.Model(user).Update("balance", gorm.Expr("balance + ?", recharge.Amount)).Error; err != nil {
// 返回任何错误都会回滚事务 // 返回任何错误都会回滚事务
@ -49,7 +82,7 @@ func (d *dataServant) HandleRechargeSuccess(recharge *model.WalletRecharge, trad
} }
// 标记为已付款 // 标记为已付款
if err := tx.Model(recharge).Updates(map[string]interface{}{ if err := tx.Model(recharge).Updates(map[string]types.Any{
"trade_no": tradeNo, "trade_no": tradeNo,
"trade_status": "TRADE_SUCCESS", "trade_status": "TRADE_SUCCESS",
}).Error; err != nil { }).Error; err != nil {
@ -61,8 +94,8 @@ func (d *dataServant) HandleRechargeSuccess(recharge *model.WalletRecharge, trad
}) })
} }
func (d *dataServant) HandlePostAttachmentBought(post *model.Post, user *model.User) error { func (d *walletServant) HandlePostAttachmentBought(post *model.Post, user *model.User) error {
return d.engine.Transaction(func(tx *gorm.DB) error { return d.db.Transaction(func(tx *gorm.DB) error {
// 扣除金额 // 扣除金额
if err := tx.Model(user).Update("balance", gorm.Expr("balance - ?", post.AttachmentPrice)).Error; err != nil { if err := tx.Model(user).Update("balance", gorm.Expr("balance - ?", post.AttachmentPrice)).Error; err != nil {
// 返回任何错误都会回滚事务 // 返回任何错误都会回滚事务
@ -97,7 +130,7 @@ func (d *dataServant) HandlePostAttachmentBought(post *model.Post, user *model.U
ID: post.UserID, ID: post.UserID,
}, },
} }
master, _ = master.Get(d.engine) master, _ = master.Get(d.db)
if err := tx.Model(master).Update("balance", gorm.Expr("balance + ?", income)).Error; err != nil { if err := tx.Model(master).Update("balance", gorm.Expr("balance + ?", income)).Error; err != nil {
// 返回任何错误都会回滚事务 // 返回任何错误都会回滚事务

@ -1,46 +0,0 @@
package dao
import "github.com/rocboss/paopao-ce/internal/model"
func (d *dataServant) CreateMessage(msg *model.Message) (*model.Message, error) {
return msg.Create(d.engine)
}
func (d *dataServant) GetUnreadCount(userID int64) (int64, error) {
return (&model.Message{}).Count(d.engine, &model.ConditionsT{
"receiver_user_id": userID,
"is_read": model.MSG_UNREAD,
})
}
func (d *dataServant) GetMessageByID(id int64) (*model.Message, error) {
return (&model.Message{
Model: &model.Model{
ID: id,
},
}).Get(d.engine)
}
func (d *dataServant) ReadMessage(message *model.Message) error {
message.IsRead = 1
return message.Update(d.engine)
}
func (d *dataServant) GetMessages(conditions *model.ConditionsT, offset, limit int) ([]*model.MessageFormated, error) {
messages, err := (&model.Message{}).List(d.engine, conditions, offset, limit)
if err != nil {
return nil, err
}
mfs := []*model.MessageFormated{}
for _, message := range messages {
mf := message.Format()
mfs = append(mfs, mf)
}
return mfs, nil
}
func (d *dataServant) GetMessageCount(conditions *model.ConditionsT) (int64, error) {
return (&model.Message{}).Count(d.engine, conditions)
}

@ -1,56 +0,0 @@
package dao
import (
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/minio/minio-go/v7"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
var (
_ core.ObjectStorageService = (*aliossServant)(nil)
_ core.ObjectStorageService = (*minioServant)(nil)
_ core.ObjectStorageService = (*s3Servant)(nil)
_ core.ObjectStorageService = (*localossServant)(nil)
)
type localossServant struct {
savePath string
domain string
}
type aliossServant struct {
bucket *oss.Bucket
domain string
}
type minioServant struct {
client *minio.Client
bucket string
domain string
}
type s3Servant = minioServant
func NewObjectStorageService() (oss core.ObjectStorageService) {
var v versionInfo
if conf.CfgIf("AliOSS") {
oss, v = newAliossServent()
} else if conf.CfgIf("MinIO") {
oss, v = newMinioServeant()
} else if conf.CfgIf("S3") {
oss, v = newS3Servent()
logrus.Infof("use S3 as object storage by version %s", v.version())
return
} else if conf.CfgIf("LocalOSS") {
oss, v = newLocalossServent()
} else {
// default use AliOSS as object storage service
oss, v = newAliossServent()
logrus.Infof("use default AliOSS as object storage by version %s", v.version())
return
}
logrus.Infof("use %s as object storage by version %s", v.name(), v.version())
return
}

@ -1,26 +0,0 @@
package dao
import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus"
)
func newS3Servent() (*s3Servant, versionInfo) {
// Initialize s3 client object use minio-go.
client, err := minio.New(conf.S3Setting.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(conf.S3Setting.AccessKey, conf.S3Setting.SecretKey, ""),
Secure: conf.S3Setting.Secure,
})
if err != nil {
logrus.Fatalf("s3.New err: %v", err)
}
obj := &s3Servant{
client: client,
bucket: conf.MinIOSetting.Bucket,
domain: getOssDomain(),
}
return obj, obj
}

@ -1,286 +0,0 @@
package dao
import (
"strings"
"time"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
)
func (d *dataServant) CreatePost(post *model.Post) (*model.Post, error) {
post.LatestRepliedOn = time.Now().Unix()
p, err := post.Create(d.engine)
if err != nil {
return nil, err
}
d.SendAction(core.IdxActCreatePost)
return p, nil
}
func (d *dataServant) DeletePost(post *model.Post) error {
if err := post.Delete(d.engine); err != nil {
return err
}
d.SendAction(core.IdxActDeletePost)
return nil
}
func (d *dataServant) LockPost(post *model.Post) error {
post.IsLock = 1 - post.IsLock
return post.Update(d.engine)
}
func (d *dataServant) StickPost(post *model.Post) error {
post.IsTop = 1 - post.IsTop
if err := post.Update(d.engine); err != nil {
return err
}
d.SendAction(core.IdxActStickPost)
return nil
}
func (d *dataServant) VisiblePost(post *model.Post, visibility model.PostVisibleT) error {
oldVisibility := post.Visibility
post.Visibility = visibility
// TODO: 这个判断是否可以不要呢
if oldVisibility == visibility {
return nil
}
// 私密推文 特殊处理
if visibility == model.PostVisitPrivate {
// 强制取消置顶
// TODO: 置顶推文用户是否有权设置成私密? 后续完善
post.IsTop = 0
}
db := d.engine.Begin()
err := post.Update(db)
if err != nil {
db.Rollback()
return err
}
// tag处理
tags := strings.Split(post.Tags, ",")
for _, t := range tags {
tag := &model.Tag{
Tag: t,
}
// TODO: 暂时宽松不处理错误,这里或许可以有优化,后续完善
if oldVisibility == model.PostVisitPrivate {
// 从私密转为非私密才需要重新创建tag
d.createTag(db, tag)
} else if visibility == model.PostVisitPrivate {
// 从非私密转为私密才需要删除tag
d.deleteTag(db, tag)
}
}
db.Commit()
d.SendAction(core.IdxActVisiblePost)
return nil
}
func (d *dataServant) GetPostByID(id int64) (*model.Post, error) {
post := &model.Post{
Model: &model.Model{
ID: id,
},
}
return post.Get(d.engine)
}
func (d *dataServant) GetPosts(conditions *model.ConditionsT, offset, limit int) ([]*model.Post, error) {
return (&model.Post{}).List(d.engine, conditions, offset, limit)
}
func (d *dataServant) GetPostCount(conditions *model.ConditionsT) (int64, error) {
return (&model.Post{}).Count(d.engine, conditions)
}
func (d *dataServant) UpdatePost(post *model.Post) error {
if err := post.Update(d.engine); err != nil {
return err
}
d.SendAction(core.IdxActUpdatePost)
return nil
}
func (d *dataServant) GetUserPostStar(postID, userID int64) (*model.PostStar, error) {
star := &model.PostStar{
PostID: postID,
UserID: userID,
}
return star.Get(d.engine)
}
func (d *dataServant) GetUserPostStars(userID int64, offset, limit int) ([]*model.PostStar, error) {
star := &model.PostStar{
UserID: userID,
}
return star.List(d.engine, &model.ConditionsT{
"ORDER": d.engine.NamingStrategy.TableName("PostStar") + ".id DESC",
}, offset, limit)
}
func (d *dataServant) GetUserPostStarCount(userID int64) (int64, error) {
star := &model.PostStar{
UserID: userID,
}
return star.Count(d.engine, &model.ConditionsT{})
}
func (d *dataServant) CreatePostStar(postID, userID int64) (*model.PostStar, error) {
star := &model.PostStar{
PostID: postID,
UserID: userID,
}
return star.Create(d.engine)
}
func (d *dataServant) DeletePostStar(p *model.PostStar) error {
return p.Delete(d.engine)
}
func (d *dataServant) GetUserPostCollection(postID, userID int64) (*model.PostCollection, error) {
star := &model.PostCollection{
PostID: postID,
UserID: userID,
}
return star.Get(d.engine)
}
func (d *dataServant) GetUserPostCollections(userID int64, offset, limit int) ([]*model.PostCollection, error) {
collection := &model.PostCollection{
UserID: userID,
}
return collection.List(d.engine, &model.ConditionsT{
"ORDER": d.engine.NamingStrategy.TableName("PostCollection") + ".id DESC",
}, offset, limit)
}
func (d *dataServant) GetUserPostCollectionCount(userID int64) (int64, error) {
collection := &model.PostCollection{
UserID: userID,
}
return collection.Count(d.engine, &model.ConditionsT{})
}
func (d *dataServant) GetUserWalletBills(userID int64, offset, limit int) ([]*model.WalletStatement, error) {
statement := &model.WalletStatement{
UserID: userID,
}
return statement.List(d.engine, &model.ConditionsT{
"ORDER": "id DESC",
}, offset, limit)
}
func (d *dataServant) GetUserWalletBillCount(userID int64) (int64, error) {
statement := &model.WalletStatement{
UserID: userID,
}
return statement.Count(d.engine, &model.ConditionsT{})
}
func (d *dataServant) CreatePostCollection(postID, userID int64) (*model.PostCollection, error) {
collection := &model.PostCollection{
PostID: postID,
UserID: userID,
}
return collection.Create(d.engine)
}
func (d *dataServant) DeletePostCollection(p *model.PostCollection) error {
return p.Delete(d.engine)
}
func (d *dataServant) GetPostAttatchmentBill(postID, userID int64) (*model.PostAttachmentBill, error) {
bill := &model.PostAttachmentBill{
PostID: postID,
UserID: userID,
}
return bill.Get(d.engine)
}
// MergePosts post数据整合
func (d *dataServant) MergePosts(posts []*model.Post) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
postsFormated := make([]*model.PostFormated, 0, len(posts))
for _, post := range posts {
postFormated := post.Format()
postFormated.User = userMap[post.UserID]
postFormated.Contents = contentMap[post.ID]
postsFormated = append(postsFormated, postFormated)
}
return postsFormated, nil
}
// RevampPosts post数据整形修复
func (d *dataServant) RevampPosts(posts []*model.PostFormated) ([]*model.PostFormated, error) {
postIds := make([]int64, 0, len(posts))
userIds := make([]int64, 0, len(posts))
for _, post := range posts {
postIds = append(postIds, post.ID)
userIds = append(userIds, post.UserID)
}
postContents, err := d.GetPostContentsByIDs(postIds)
if err != nil {
return nil, err
}
users, err := d.GetUsersByIDs(userIds)
if err != nil {
return nil, err
}
userMap := make(map[int64]*model.UserFormated, len(users))
for _, user := range users {
userMap[user.ID] = user.Format()
}
contentMap := make(map[int64][]*model.PostContentFormated, len(postContents))
for _, content := range postContents {
contentMap[content.PostID] = append(contentMap[content.PostID], content.Format())
}
// 数据整合
for _, post := range posts {
post.User = userMap[post.UserID]
post.Contents = contentMap[post.ID]
}
return posts, nil
}

@ -1,22 +0,0 @@
package dao
import "github.com/rocboss/paopao-ce/internal/model"
func (d *dataServant) CreatePostContent(content *model.PostContent) (*model.PostContent, error) {
return content.Create(d.engine)
}
func (d *dataServant) GetPostContentsByIDs(ids []int64) ([]*model.PostContent, error) {
return (&model.PostContent{}).List(d.engine, &model.ConditionsT{
"post_id IN ?": ids,
"ORDER": "sort ASC",
}, 0, 0)
}
func (d *dataServant) GetPostContentByID(id int64) (*model.PostContent, error) {
return (&model.PostContent{
Model: &model.Model{
ID: id,
},
}).Get(d.engine)
}

@ -1,51 +0,0 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/types"
"github.com/sirupsen/logrus"
)
// getIndexPosts TODO: 未来可能根据userId查询广场推文列表简单做到不同用户的主页都是不同的
func (d *dataServant) getIndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error) {
predicates := model.Predicates{
"ORDER": types.AnySlice{"is_top DESC, latest_replied_on DESC"},
}
if user == nil {
predicates["visibility = ?"] = types.AnySlice{model.PostVisitPublic}
} else if !user.IsAdmin {
frienddIds := d.ams.GetFriendIds(user.ID)
if len(frienddIds) > 0 {
args := types.AnySlice{model.PostVisitPublic, model.PostVisitPrivate, user.ID, model.PostVisitFriend, frienddIds}
predicates["visibility = ? OR (visibility = ? AND user_id = ?) OR (visibility = ? AND user_id IN ?"] = args
} else {
// TODO: 目前不处理没朋友情况默认用户与世界都是朋友但是目前无从知晓朋友id所以这里特殊处理后续完善
args := types.AnySlice{model.PostVisitPrivate, user.ID, model.PostVisitPublic, model.PostVisitFriend}
predicates["(visibility = ? AND user_id = ?) OR visibility = ? OR visibility = ?"] = args
}
}
posts, err := (&model.Post{}).Fetch(d.engine, predicates, offset, limit)
if err != nil {
logrus.Debugf("dataServant.getIndexPosts err: %v", err)
return nil, err
}
return d.MergePosts(posts)
}
// simpleCacheIndexGetPosts simpleCacheIndex 专属获取广场推文列表函数
func (d *dataServant) simpleCacheIndexGetPosts(_user *model.User, offset int, limit int) ([]*model.PostFormated, error) {
predicates := model.Predicates{
"visibility IN ?": types.AnySlice{[]model.PostVisibleT{model.PostVisitPublic, model.PostVisitPublic}},
"ORDER": types.AnySlice{"is_top DESC, latest_replied_on DESC"},
}
posts, err := (&model.Post{}).Fetch(d.engine, predicates, offset, limit)
if err != nil {
logrus.Debugf("dataServant.simpleCacheIndexGetPosts err: %v", err)
return nil, err
}
return d.MergePosts(posts)
}

@ -0,0 +1,19 @@
// Core service implement base sqlx+mysql. All sub-service
// will declare here and provide initial function.
package sakila
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
func NewDataService() (core.DataService, core.VersionInfo) {
logrus.Fatal("not support now")
return nil, nil
}
func NewAuthorizationManageService() core.AuthorizationManageService {
logrus.Fatal("not support now")
return nil
}

@ -1,86 +0,0 @@
package dao
import (
"github.com/meilisearch/meilisearch-go"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus"
)
var (
_ core.TweetSearchService = (*zincTweetSearchServant)(nil)
_ core.TweetSearchService = (*bridgeTweetSearchServant)(nil)
)
type documents struct {
primaryKey []string
docItems core.DocItems
identifiers []string
}
type bridgeTweetSearchServant struct {
ts core.TweetSearchService
updateDocsCh chan *documents
}
type tweetSearchFilter struct {
ams core.AuthorizationManageService
}
type zincTweetSearchServant struct {
tweetSearchFilter
indexName string
client *zinc.ZincClient
publicFilter string
privateFilter string
friendFilter string
}
type meiliTweetSearchServant struct {
tweetSearchFilter
client *meilisearch.Client
index *meilisearch.Index
publicFilter string
privateFilter string
friendFilter string
}
func NewTweetSearchService() core.TweetSearchService {
bts := &bridgeTweetSearchServant{}
capacity := conf.TweetSearchSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
bts.updateDocsCh = make(chan *documents, capacity)
var v versionInfo
if conf.CfgIf("Zinc") {
bts.ts, v = newZincTweetSearchServant()
} else if conf.CfgIf("Meili") {
bts.ts, v = newMeiliTweetSearchServant()
} else {
// default use Zinc as tweet search service
bts.ts, v = newZincTweetSearchServant()
}
logrus.Infof("use %s as tweet search serice by version %s", v.name(), v.version())
numWorker := conf.TweetSearchSetting.MinWorker
if numWorker < 5 {
numWorker = 5
} else if numWorker > 1000 {
numWorker = 1000
}
logrus.Debugf("use %d backend worker to update documents to search engine", numWorker)
// 启动文档更新器
for ; numWorker > 0; numWorker-- {
go bts.startUpdateDocs()
}
return bts
}

@ -1,4 +1,4 @@
package dao package search
import ( import (
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
@ -6,6 +6,21 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
var (
_ core.TweetSearchService = (*bridgeTweetSearchServant)(nil)
)
type documents struct {
primaryKey []string
docItems core.DocItems
identifiers []string
}
type bridgeTweetSearchServant struct {
ts core.TweetSearchService
updateDocsCh chan *documents
}
func (s *bridgeTweetSearchServant) IndexName() string { func (s *bridgeTweetSearchServant) IndexName() string {
return s.ts.IndexName() return s.ts.IndexName()
} }

@ -1,10 +1,15 @@
package dao package search
import ( import (
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/types"
) )
type tweetSearchFilter struct {
ams core.AuthorizationManageService
}
func (s *tweetSearchFilter) filterResp(user *model.User, resp *core.QueryResp) { func (s *tweetSearchFilter) filterResp(user *model.User, resp *core.QueryResp) {
// 管理员不过滤 // 管理员不过滤
if user != nil && user.IsAdmin { if user != nil && user.IsAdmin {
@ -27,7 +32,8 @@ func (s *tweetSearchFilter) filterResp(user *model.User, resp *core.QueryResp) {
} }
} else { } else {
var cutFriend, cutPrivate bool var cutFriend, cutPrivate bool
friendFilter := s.ams.GetFriendFilter(user.ID) friendFilter := s.ams.BeFriendFilter(user.ID)
friendFilter[user.ID] = types.Empty{}
for i := 0; i <= latestIndex; i++ { for i := 0; i <= latestIndex; i++ {
item = items[i] item = items[i]
cutFriend = (item.Visibility == model.PostVisitFriend && !friendFilter.IsFriend(item.UserID)) cutFriend = (item.Visibility == model.PostVisitFriend && !friendFilter.IsFriend(item.UserID))

@ -1,58 +1,36 @@
package dao package search
import ( import (
"fmt" "fmt"
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
"github.com/meilisearch/meilisearch-go" "github.com/meilisearch/meilisearch-go"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/json" "github.com/rocboss/paopao-ce/pkg/json"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func newMeiliTweetSearchServant() (*meiliTweetSearchServant, versionInfo) { var (
s := conf.MeiliSetting _ core.TweetSearchService = (*meiliTweetSearchServant)(nil)
client := meilisearch.NewClient(meilisearch.ClientConfig{ _ core.VersionInfo = (*meiliTweetSearchServant)(nil)
Host: s.Endpoint(), )
APIKey: s.ApiKey,
}) type meiliTweetSearchServant struct {
tweetSearchFilter
if _, err := client.Index(s.Index).FetchInfo(); err != nil {
logrus.Debugf("create index because fetch index info error: %v", err) client *meilisearch.Client
client.CreateIndex(&meilisearch.IndexConfig{ index *meilisearch.Index
Uid: s.Index, publicFilter string
PrimaryKey: "id", privateFilter string
}) friendFilter string
searchableAttributes := []string{"content", "tags"}
sortableAttributes := []string{"is_top", "latest_replied_on"}
filterableAttributes := []string{"tags", "visibility", "user_id"}
index := client.Index(s.Index)
index.UpdateSearchableAttributes(&searchableAttributes)
index.UpdateSortableAttributes(&sortableAttributes)
index.UpdateFilterableAttributes(&filterableAttributes)
}
obj := &meiliTweetSearchServant{
tweetSearchFilter: tweetSearchFilter{
ams: newAuthorizationManageService(),
},
client: client,
index: client.Index(s.Index),
publicFilter: fmt.Sprintf("visibility=%d", model.PostVisitPublic),
privateFilter: fmt.Sprintf("visibility=%d AND user_id=", model.PostVisitPrivate),
friendFilter: fmt.Sprintf("visibility=%d", model.PostVisitFriend),
}
return obj, obj
} }
func (s *meiliTweetSearchServant) name() string { func (s *meiliTweetSearchServant) Name() string {
return "Meili" return "Meili"
} }
func (s *meiliTweetSearchServant) version() *semver.Version { func (s *meiliTweetSearchServant) Version() *semver.Version {
return semver.MustParse("v0.2.0") return semver.MustParse("v0.2.0")
} }

@ -0,0 +1,92 @@
package search
import (
"fmt"
"github.com/meilisearch/meilisearch-go"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus"
)
func NewMeiliTweetSearchService(ams core.AuthorizationManageService) (core.TweetSearchService, core.VersionInfo) {
s := conf.MeiliSetting
client := meilisearch.NewClient(meilisearch.ClientConfig{
Host: s.Endpoint(),
APIKey: s.ApiKey,
})
if _, err := client.Index(s.Index).FetchInfo(); err != nil {
logrus.Debugf("create index because fetch index info error: %v", err)
client.CreateIndex(&meilisearch.IndexConfig{
Uid: s.Index,
PrimaryKey: "id",
})
searchableAttributes := []string{"content", "tags"}
sortableAttributes := []string{"is_top", "latest_replied_on"}
filterableAttributes := []string{"tags", "visibility", "user_id"}
index := client.Index(s.Index)
index.UpdateSearchableAttributes(&searchableAttributes)
index.UpdateSortableAttributes(&sortableAttributes)
index.UpdateFilterableAttributes(&filterableAttributes)
}
mts := &meiliTweetSearchServant{
tweetSearchFilter: tweetSearchFilter{
ams: ams,
},
client: client,
index: client.Index(s.Index),
publicFilter: fmt.Sprintf("visibility=%d", model.PostVisitPublic),
privateFilter: fmt.Sprintf("visibility=%d AND user_id=", model.PostVisitPrivate),
friendFilter: fmt.Sprintf("visibility=%d", model.PostVisitFriend),
}
return mts, mts
}
func NewZincTweetSearchService(ams core.AuthorizationManageService) (core.TweetSearchService, core.VersionInfo) {
s := conf.ZincSetting
zts := &zincTweetSearchServant{
tweetSearchFilter: tweetSearchFilter{
ams: ams,
},
indexName: s.Index,
client: zinc.NewClient(s),
publicFilter: fmt.Sprintf("visibility:%d", model.PostVisitPublic),
privateFilter: fmt.Sprintf("visibility:%d AND user_id:%%d", model.PostVisitPrivate),
friendFilter: fmt.Sprintf("visibility:%d", model.PostVisitFriend),
}
zts.createIndex()
return zts, zts
}
func NewBridgeTweetSearchService(ts core.TweetSearchService) core.TweetSearchService {
capacity := conf.TweetSearchSetting.MaxUpdateQPS
if capacity < 10 {
capacity = 10
} else if capacity > 10000 {
capacity = 10000
}
bts := &bridgeTweetSearchServant{
ts: ts,
updateDocsCh: make(chan *documents, capacity),
}
numWorker := conf.TweetSearchSetting.MinWorker
if numWorker < 5 {
numWorker = 5
} else if numWorker > 1000 {
numWorker = 1000
}
logrus.Debugf("use %d backend worker to update documents to search engine", numWorker)
// 启动文档更新器
for ; numWorker > 0; numWorker-- {
go bts.startUpdateDocs()
}
return bts
}

@ -1,39 +1,35 @@
package dao package search
import ( import (
"fmt"
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/json" "github.com/rocboss/paopao-ce/pkg/json"
"github.com/rocboss/paopao-ce/pkg/types"
"github.com/rocboss/paopao-ce/pkg/zinc" "github.com/rocboss/paopao-ce/pkg/zinc"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func newZincTweetSearchServant() (*zincTweetSearchServant, versionInfo) { var (
s := conf.ZincSetting _ core.TweetSearchService = (*zincTweetSearchServant)(nil)
zts := &zincTweetSearchServant{ _ core.VersionInfo = (*zincTweetSearchServant)(nil)
tweetSearchFilter: tweetSearchFilter{ )
ams: newAuthorizationManageService(),
}, type zincTweetSearchServant struct {
indexName: s.Index, tweetSearchFilter
client: zinc.NewClient(s),
publicFilter: fmt.Sprintf("visibility:%d", model.PostVisitPublic),
privateFilter: fmt.Sprintf("visibility:%d AND user_id:%%d", model.PostVisitPrivate),
friendFilter: fmt.Sprintf("visibility:%d", model.PostVisitFriend),
}
zts.createIndex()
return zts, zts indexName string
client *zinc.ZincClient
publicFilter string
privateFilter string
friendFilter string
} }
func (s *zincTweetSearchServant) name() string { func (s *zincTweetSearchServant) Name() string {
return "Zinc" return "Zinc"
} }
func (s *zincTweetSearchServant) version() *semver.Version { func (s *zincTweetSearchServant) Version() *semver.Version {
return semver.MustParse("v0.2.0") return semver.MustParse("v0.2.0")
} }
@ -44,15 +40,15 @@ func (s *zincTweetSearchServant) IndexName() string {
func (s *zincTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) { func (s *zincTweetSearchServant) AddDocuments(data core.DocItems, primaryKey ...string) (bool, error) {
buf := make(core.DocItems, 0, len(data)+1) buf := make(core.DocItems, 0, len(data)+1)
if len(primaryKey) > 0 { if len(primaryKey) > 0 {
buf = append(buf, map[string]interface{}{ buf = append(buf, map[string]types.Any{
"index": map[string]interface{}{ "index": map[string]types.Any{
"_index": s.indexName, "_index": s.indexName,
"_id": primaryKey[0], "_id": primaryKey[0],
}, },
}) })
} else { } else {
buf = append(buf, map[string]interface{}{ buf = append(buf, map[string]types.Any{
"index": map[string]interface{}{ "index": map[string]types.Any{
"_index": s.indexName, "_index": s.indexName,
}, },
}) })
@ -89,9 +85,9 @@ func (s *zincTweetSearchServant) Search(user *model.User, q *core.QueryReq, offs
} }
func (s *zincTweetSearchServant) queryByContent(user *model.User, q *core.QueryReq, offset, limit int) (*core.QueryResp, error) { func (s *zincTweetSearchServant) queryByContent(user *model.User, q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
resp, err := s.client.EsQuery(s.indexName, map[string]interface{}{ resp, err := s.client.EsQuery(s.indexName, map[string]types.Any{
"query": map[string]interface{}{ "query": map[string]types.Any{
"match_phrase": map[string]interface{}{ "match_phrase": map[string]types.Any{
"content": q.Query, "content": q.Query,
}, },
}, },
@ -106,9 +102,9 @@ func (s *zincTweetSearchServant) queryByContent(user *model.User, q *core.QueryR
} }
func (s *zincTweetSearchServant) queryByTag(user *model.User, q *core.QueryReq, offset, limit int) (*core.QueryResp, error) { func (s *zincTweetSearchServant) queryByTag(user *model.User, q *core.QueryReq, offset, limit int) (*core.QueryResp, error) {
resp, err := s.client.ApiQuery(s.indexName, map[string]interface{}{ resp, err := s.client.ApiQuery(s.indexName, map[string]types.Any{
"search_type": "querystring", "search_type": "querystring",
"query": map[string]interface{}{ "query": map[string]types.Any{
"term": "tags." + q.Query + ":1", "term": "tags." + q.Query + ":1",
}, },
"sort_fields": []string{"-is_top", "-latest_replied_on"}, "sort_fields": []string{"-is_top", "-latest_replied_on"},
@ -122,8 +118,8 @@ func (s *zincTweetSearchServant) queryByTag(user *model.User, q *core.QueryReq,
} }
func (s *zincTweetSearchServant) queryAny(user *model.User, offset, limit int) (*core.QueryResp, error) { func (s *zincTweetSearchServant) queryAny(user *model.User, offset, limit int) (*core.QueryResp, error) {
queryMap := map[string]interface{}{ queryMap := map[string]types.Any{
"query": map[string]interface{}{ "query": map[string]types.Any{
"match_all": map[string]string{}, "match_all": map[string]string{},
}, },
"sort": []string{"-is_top", "-latest_replied_on"}, "sort": []string{"-is_top", "-latest_replied_on"},

@ -0,0 +1,26 @@
package security
import (
"fmt"
"strings"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
)
type attachmentCheckServant struct {
domain string
}
func (s *attachmentCheckServant) CheckAttachment(uri string) error {
if strings.Index(uri, s.domain) != 0 {
return fmt.Errorf("附件非本站资源")
}
return nil
}
func NewAttachmentCheckService() core.AttachmentCheckService {
return &attachmentCheckServant{
domain: conf.GetOssDomain(),
}
}

@ -0,0 +1,19 @@
// Core service implement base sqlx+postgresql. All sub-service
// will declare here and provide initial function.
package slonik
import (
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
func NewDataService() (core.DataService, core.VersionInfo) {
logrus.Fatal("not support now")
return nil, nil
}
func NewAuthorizationManageService() core.AuthorizationManageService {
logrus.Fatal("not support now")
return nil
}

@ -1,4 +1,4 @@
package dao package storage
import ( import (
"io" "io"
@ -7,33 +7,25 @@ import (
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
"github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func newAliossServent() (*aliossServant, versionInfo) { var (
client, err := oss.New(conf.AliOSSSetting.Endpoint, conf.AliOSSSetting.AccessKeyID, conf.AliOSSSetting.AccessKeySecret) _ core.ObjectStorageService = (*aliossServant)(nil)
if err != nil { _ core.VersionInfo = (*aliossServant)(nil)
logrus.Fatalf("alioss.New err: %v", err) )
}
bucket, err := client.Bucket(conf.AliOSSSetting.Bucket)
if err != nil {
logrus.Fatalf("client.Bucket err: %v", err)
}
obj := &aliossServant{ type aliossServant struct {
bucket: bucket, bucket *oss.Bucket
domain: getOssDomain(), domain string
}
return obj, obj
} }
func (s *aliossServant) name() string { func (s *aliossServant) Name() string {
return "AliOSS" return "AliOSS"
} }
func (s *aliossServant) version() *semver.Version { func (s *aliossServant) Version() *semver.Version {
return semver.MustParse("v0.1.0") return semver.MustParse("v0.1.0")
} }

@ -1,4 +1,4 @@
package dao package storage
import ( import (
"errors" "errors"
@ -10,28 +10,24 @@ import (
"time" "time"
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
) )
func newLocalossServent() (*localossServant, versionInfo) { var (
savePath, err := filepath.Abs(conf.LocalOSSSetting.SavePath) _ core.ObjectStorageService = (*localossServant)(nil)
if err != nil { _ core.VersionInfo = (*localossServant)(nil)
logrus.Fatalf("get localOSS save path err: %v", err) )
}
obj := &localossServant{ type localossServant struct {
savePath: savePath + "/" + conf.LocalOSSSetting.Bucket + "/", savePath string
domain: getOssDomain(), domain string
}
return obj, obj
} }
func (s *localossServant) name() string { func (s *localossServant) Name() string {
return "LocalOSS" return "LocalOSS"
} }
func (s *localossServant) version() *semver.Version { func (s *localossServant) Version() *semver.Version {
return semver.MustParse("v0.1.0") return semver.MustParse("v0.1.0")
} }

@ -1,4 +1,4 @@
package dao package storage
import ( import (
"context" "context"
@ -9,34 +9,28 @@ import (
"github.com/Masterminds/semver/v3" "github.com/Masterminds/semver/v3"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func newMinioServeant() (*minioServant, versionInfo) { var (
// Initialize minio client object. _ core.ObjectStorageService = (*minioServant)(nil)
client, err := minio.New(conf.MinIOSetting.Endpoint, &minio.Options{ _ core.VersionInfo = (*minioServant)(nil)
Creds: credentials.NewStaticV4(conf.MinIOSetting.AccessKey, conf.MinIOSetting.SecretKey, ""), )
Secure: conf.MinIOSetting.Secure,
})
if err != nil {
logrus.Fatalf("minio.New err: %v", err)
}
obj := &minioServant{ type minioServant struct {
client: client, client *minio.Client
bucket: conf.MinIOSetting.Bucket, bucket string
domain: getOssDomain(), domain string
}
return obj, obj
} }
func (s *minioServant) name() string { type s3Servant = minioServant
func (s *minioServant) Name() string {
return "MinIO" return "MinIO"
} }
func (s *minioServant) version() *semver.Version { func (s *minioServant) Version() *semver.Version {
return semver.MustParse("v0.1.0") return semver.MustParse("v0.1.0")
} }

@ -0,0 +1,79 @@
package storage
import (
"path/filepath"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core"
"github.com/sirupsen/logrus"
)
func NewAliossService() (core.ObjectStorageService, core.VersionInfo) {
client, err := oss.New(conf.AliOSSSetting.Endpoint, conf.AliOSSSetting.AccessKeyID, conf.AliOSSSetting.AccessKeySecret)
if err != nil {
logrus.Fatalf("alioss.New err: %v", err)
}
bucket, err := client.Bucket(conf.AliOSSSetting.Bucket)
if err != nil {
logrus.Fatalf("client.Bucket err: %v", err)
}
obj := &aliossServant{
bucket: bucket,
domain: conf.GetOssDomain(),
}
return obj, obj
}
func NewLocalossService() (core.ObjectStorageService, core.VersionInfo) {
savePath, err := filepath.Abs(conf.LocalOSSSetting.SavePath)
if err != nil {
logrus.Fatalf("get localOSS save path err: %v", err)
}
obj := &localossServant{
savePath: savePath + "/" + conf.LocalOSSSetting.Bucket + "/",
domain: conf.GetOssDomain(),
}
return obj, obj
}
func NewMinioService() (core.ObjectStorageService, core.VersionInfo) {
// Initialize minio client object.
client, err := minio.New(conf.MinIOSetting.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(conf.MinIOSetting.AccessKey, conf.MinIOSetting.SecretKey, ""),
Secure: conf.MinIOSetting.Secure,
})
if err != nil {
logrus.Fatalf("minio.New err: %v", err)
}
ms := &minioServant{
client: client,
bucket: conf.MinIOSetting.Bucket,
domain: conf.GetOssDomain(),
}
return ms, ms
}
func NewS3Service() (core.ObjectStorageService, core.VersionInfo) {
// Initialize s3 client object use minio-go.
client, err := minio.New(conf.S3Setting.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(conf.S3Setting.AccessKey, conf.S3Setting.SecretKey, ""),
Secure: conf.S3Setting.Secure,
})
if err != nil {
logrus.Fatalf("s3.New err: %v", err)
}
obj := &s3Servant{
client: client,
bucket: conf.MinIOSetting.Bucket,
domain: conf.GetOssDomain(),
}
return obj, obj
}

@ -1,45 +0,0 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/model"
"gorm.io/gorm"
)
func (d *dataServant) CreateTag(tag *model.Tag) (*model.Tag, error) {
return d.createTag(d.engine, tag)
}
func (d *dataServant) DeleteTag(tag *model.Tag) error {
return d.deleteTag(d.engine, tag)
}
func (d *dataServant) GetTags(conditions *model.ConditionsT, offset, limit int) ([]*model.Tag, error) {
return (&model.Tag{}).List(d.engine, conditions, offset, limit)
}
func (d *dataServant) createTag(db *gorm.DB, tag *model.Tag) (*model.Tag, error) {
t, err := tag.Get(d.engine)
if err != nil {
tag.QuoteNum = 1
return tag.Create(db)
}
// 更新
t.QuoteNum++
err = t.Update(db)
if err != nil {
return nil, err
}
return t, nil
}
func (d *dataServant) deleteTag(db *gorm.DB, tag *model.Tag) error {
tag, err := tag.Get(db)
if err != nil {
return err
}
tag.QuoteNum--
return tag.Update(db)
}

@ -1,165 +0,0 @@
package dao
import (
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"time"
"github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/pkg/json"
"gopkg.in/resty.v1"
)
type JuhePhoneCaptchaRsp struct {
ErrorCode int `json:"error_code"`
Reason string `json:"reason"`
}
// 根据用户ID获取用户
func (d *dataServant) GetUserByID(id int64) (*model.User, error) {
user := &model.User{
Model: &model.Model{
ID: id,
},
}
return user.Get(d.engine)
}
// 根据用户名获取用户
func (d *dataServant) GetUserByUsername(username string) (*model.User, error) {
user := &model.User{
Username: username,
}
return user.Get(d.engine)
}
// 根据手机号获取用户
func (d *dataServant) GetUserByPhone(phone string) (*model.User, error) {
user := &model.User{
Phone: phone,
}
return user.Get(d.engine)
}
// 根据IDs获取用户列表
func (d *dataServant) GetUsersByIDs(ids []int64) ([]*model.User, error) {
user := &model.User{}
return user.List(d.engine, &model.ConditionsT{
"id IN ?": ids,
}, 0, 0)
}
// 根据关键词模糊获取用户列表
func (d *dataServant) GetUsersByKeyword(keyword string) ([]*model.User, error) {
user := &model.User{}
keyword = strings.Trim(keyword, " ") + "%"
if keyword == "%" {
return user.List(d.engine, &model.ConditionsT{
"ORDER": "id ASC",
}, 0, 6)
} else {
return user.List(d.engine, &model.ConditionsT{
"username LIKE ?": keyword,
}, 0, 6)
}
}
// 根据关键词模糊获取用户列表
func (d *dataServant) GetTagsByKeyword(keyword string) ([]*model.Tag, error) {
tag := &model.Tag{}
keyword = "%" + strings.Trim(keyword, " ") + "%"
if keyword == "%%" {
return tag.List(d.engine, &model.ConditionsT{
"ORDER": "quote_num DESC",
}, 0, 6)
} else {
return tag.List(d.engine, &model.ConditionsT{
"tag LIKE ?": keyword,
"ORDER": "quote_num DESC",
}, 0, 6)
}
}
// 创建用户
func (d *dataServant) CreateUser(user *model.User) (*model.User, error) {
return user.Create(d.engine)
}
// 更新用户
func (d *dataServant) UpdateUser(user *model.User) error {
return user.Update(d.engine)
}
// 获取最新短信验证码
func (d *dataServant) GetLatestPhoneCaptcha(phone string) (*model.Captcha, error) {
return (&model.Captcha{
Phone: phone,
}).Get(d.engine)
}
// 更新短信验证码
func (d *dataServant) UsePhoneCaptcha(captcha *model.Captcha) error {
captcha.UseTimes++
return captcha.Update(d.engine)
}
// 发送短信验证码
func (d *dataServant) SendPhoneCaptcha(phone string) error {
rand.Seed(time.Now().UnixNano())
captcha := rand.Intn(900000) + 100000
m := 5
gateway := "https://v.juhe.cn/sms/send"
client := resty.New()
client.DisableWarn = true
resp, err := client.R().
SetFormData(map[string]string{
"mobile": phone,
"tpl_id": conf.SmsJuheSetting.TplID,
"tpl_value": fmt.Sprintf(conf.SmsJuheSetting.TplVal, captcha, m),
"key": conf.SmsJuheSetting.Key,
}).Post(gateway)
if err != nil {
return err
}
if resp.StatusCode() != http.StatusOK {
return errors.New(resp.Status())
}
result := &JuhePhoneCaptchaRsp{}
err = json.Unmarshal(resp.Body(), result)
if err != nil {
return err
}
if result.ErrorCode != 0 {
return errors.New(result.Reason)
}
// 写入表
captchaModel := &model.Captcha{
Phone: phone,
Captcha: strconv.Itoa(captcha),
ExpiredOn: time.Now().Add(time.Minute * time.Duration(m)).Unix(),
}
captchaModel.Create(d.engine)
return nil
}
func (d *dataServant) IsFriend(_userID int64, _friendID int64) bool {
// TODO: you are friend in all now
return true
}

@ -1,29 +0,0 @@
package dao
import (
"github.com/rocboss/paopao-ce/internal/conf"
)
func getOssDomain() string {
uri := "https://"
if conf.CfgIf("AliOSS") {
return uri + conf.AliOSSSetting.Domain + "/"
} else if conf.CfgIf("MinIO") {
if !conf.MinIOSetting.Secure {
uri = "http://"
}
return uri + conf.MinIOSetting.Domain + "/" + conf.MinIOSetting.Bucket + "/"
} else if conf.CfgIf("S3") {
if !conf.S3Setting.Secure {
uri = "http://"
}
// TODO: will not work well need test in real world
return uri + conf.S3Setting.Domain + "/" + conf.S3Setting.Bucket + "/"
} else if conf.CfgIf("LocalOSS") {
if !conf.LocalOSSSetting.Secure {
uri = "http://"
}
return uri + conf.LocalOSSSetting.Domain + "/oss/" + conf.LocalOSSSetting.Bucket + "/"
}
return uri + conf.AliOSSSetting.Domain + "/"
}

@ -1,11 +0,0 @@
package dao
import (
"github.com/Masterminds/semver/v3"
)
// versionInfo 版本信息
type versionInfo interface {
name() string
version() *semver.Version
}

@ -5,15 +5,16 @@ import "gorm.io/gorm"
type MessageT int8 type MessageT int8
const ( const (
MESSAGE_POST MessageT = iota + 1 MsgTypePost MessageT = iota + 1
MESSAGE_COMMENT MsgtypeComment
MESSAGE_REPLY MsgTypeReply
MESSAGE_WHISPER MsgTypeWhisper
MsgTypeRequestingFriend
MsgTypeSystem MessageT = 99
MsgStatusUnread = 0
MsgStatusReaded = 1
) )
const MESSAGE_SYSTEM MessageT = 99
const MSG_UNREAD = 0
const MSG_READED = 1
type Message struct { type Message struct {
*Model *Model

@ -161,6 +161,16 @@ func (p *Post) Fetch(db *gorm.DB, predicates Predicates, offset, limit int) ([]*
return posts, nil return posts, nil
} }
func (p *Post) CountBy(db *gorm.DB, predicates Predicates) (count int64, err error) {
for query, args := range predicates {
if query != "ORDER" {
db = db.Where(query, args...)
}
}
err = db.Model(p).Count(&count).Error
return
}
func (p *Post) Count(db *gorm.DB, conditions *ConditionsT) (int64, error) { func (p *Post) Count(db *gorm.DB, conditions *ConditionsT) (int64, error) {
var count int64 var count int64
if p.UserID > 0 { if p.UserID > 0 {

@ -0,0 +1,8 @@
package rest
import "github.com/rocboss/paopao-ce/internal/model"
type IndexTweetsResp struct {
Tweets []*model.PostFormated
Total int64
}

@ -6,11 +6,9 @@ import (
) )
var ( var (
objectStorage core.ObjectStorageService objectStorage core.ObjectStorageService
attachmentChecker core.AttachmentCheckService
) )
func Initialize() { func Initialize() {
objectStorage = dao.NewObjectStorageService() objectStorage = dao.NewObjectStorageService()
attachmentChecker = dao.NewAttachmentCheckService()
} }

@ -81,7 +81,7 @@ func SendUserWhisper(c *gin.Context) {
_, err := service.CreateWhisper(c, &model.Message{ _, err := service.CreateWhisper(c, &model.Message{
SenderUserID: userID.(int64), SenderUserID: userID.(int64),
ReceiverUserID: param.UserID, ReceiverUserID: param.UserID,
Type: model.MESSAGE_WHISPER, Type: model.MsgTypeWhisper,
Brief: "给你发送新私信了", Brief: "给你发送新私信了",
Content: param.Content, Content: param.Content,
}) })

@ -25,18 +25,14 @@ func GetPostList(c *gin.Context) {
user, _ := userFrom(c) user, _ := userFrom(c)
offset, limit := app.GetPageOffset(c) offset, limit := app.GetPageOffset(c)
if q.Query == "" && q.Type == "search" { if q.Query == "" && q.Type == "search" {
posts, err := service.GetIndexPosts(user, offset, limit) resp, err := service.GetIndexPosts(user, offset, limit)
if err != nil { if err != nil {
logrus.Errorf("service.GetPostList err: %v\n", err) logrus.Errorf("service.GetPostList err: %v\n", err)
response.ToErrorResponse(errcode.GetPostsFailed) response.ToErrorResponse(errcode.GetPostsFailed)
return return
} }
totalRows, _ := service.GetPostCount(&model.ConditionsT{
"visibility IN ?": []model.PostVisibleT{model.PostVisitPublic, model.PostVisitFriend},
"ORDER": "latest_replied_on DESC",
})
response.ToResponseList(posts, totalRows) response.ToResponseList(resp.Tweets, resp.Total)
} else { } else {
posts, totalRows, err := service.GetPostListFromSearch(user, q, offset, limit) posts, totalRows, err := service.GetPostListFromSearch(user, q, offset, limit)

@ -199,20 +199,18 @@ func ChangeAvatar(c *gin.Context) {
return return
} }
user := &model.User{} user, exist := userFrom(c)
if u, exists := c.Get("USER"); exists { if !exist {
user = u.(*model.User) response.ToErrorResponse(errcode.UnauthorizedTokenError)
}
if err := attachmentChecker.CheckAttachment(param.Avatar); err != nil {
response.ToErrorResponse(errcode.InvalidParams)
return return
} }
// 执行绑定 // 执行绑定
user.Avatar = param.Avatar user.Avatar = param.Avatar
service.UpdateUserInfo(user) if err := service.UpdateUserInfo(user); err != nil {
response.ToErrorResponse(err)
return
}
response.ToResponse(nil) response.ToResponse(nil)
} }

@ -144,7 +144,7 @@ func CreatePostComment(ctx *gin.Context, userID int64, param CommentCreationReq)
go ds.CreateMessage(&model.Message{ go ds.CreateMessage(&model.Message{
SenderUserID: userID, SenderUserID: userID,
ReceiverUserID: postMaster.ID, ReceiverUserID: postMaster.ID,
Type: model.MESSAGE_COMMENT, Type: model.MsgtypeComment,
Brief: "在泡泡中评论了你", Brief: "在泡泡中评论了你",
PostID: post.ID, PostID: post.ID,
CommentID: comment.ID, CommentID: comment.ID,
@ -160,7 +160,7 @@ func CreatePostComment(ctx *gin.Context, userID int64, param CommentCreationReq)
go ds.CreateMessage(&model.Message{ go ds.CreateMessage(&model.Message{
SenderUserID: userID, SenderUserID: userID,
ReceiverUserID: user.ID, ReceiverUserID: user.ID,
Type: model.MESSAGE_COMMENT, Type: model.MsgtypeComment,
Brief: "在泡泡评论中@了你", Brief: "在泡泡评论中@了你",
PostID: post.ID, PostID: post.ID,
CommentID: comment.ID, CommentID: comment.ID,
@ -259,7 +259,7 @@ func CreatePostCommentReply(ctx *gin.Context, commentID int64, content string, u
go ds.CreateMessage(&model.Message{ go ds.CreateMessage(&model.Message{
SenderUserID: userID, SenderUserID: userID,
ReceiverUserID: commentMaster.ID, ReceiverUserID: commentMaster.ID,
Type: model.MESSAGE_REPLY, Type: model.MsgTypeReply,
Brief: "在泡泡评论下回复了你", Brief: "在泡泡评论下回复了你",
PostID: post.ID, PostID: post.ID,
CommentID: comment.ID, CommentID: comment.ID,
@ -271,7 +271,7 @@ func CreatePostCommentReply(ctx *gin.Context, commentID int64, content string, u
go ds.CreateMessage(&model.Message{ go ds.CreateMessage(&model.Message{
SenderUserID: userID, SenderUserID: userID,
ReceiverUserID: postMaster.ID, ReceiverUserID: postMaster.ID,
Type: model.MESSAGE_REPLY, Type: model.MsgTypeReply,
Brief: "在泡泡评论下发布了新回复", Brief: "在泡泡评论下发布了新回复",
PostID: post.ID, PostID: post.ID,
CommentID: comment.ID, CommentID: comment.ID,
@ -285,7 +285,7 @@ func CreatePostCommentReply(ctx *gin.Context, commentID int64, content string, u
go ds.CreateMessage(&model.Message{ go ds.CreateMessage(&model.Message{
SenderUserID: userID, SenderUserID: userID,
ReceiverUserID: user.ID, ReceiverUserID: user.ID,
Type: model.MESSAGE_REPLY, Type: model.MsgTypeReply,
Brief: "在泡泡评论的回复中@了你", Brief: "在泡泡评论的回复中@了你",
PostID: post.ID, PostID: post.ID,
CommentID: comment.ID, CommentID: comment.ID,

@ -11,6 +11,7 @@ import (
"github.com/rocboss/paopao-ce/internal/conf" "github.com/rocboss/paopao-ce/internal/conf"
"github.com/rocboss/paopao-ce/internal/core" "github.com/rocboss/paopao-ce/internal/core"
"github.com/rocboss/paopao-ce/internal/model" "github.com/rocboss/paopao-ce/internal/model"
"github.com/rocboss/paopao-ce/internal/model/rest"
"github.com/rocboss/paopao-ce/pkg/errcode" "github.com/rocboss/paopao-ce/pkg/errcode"
"github.com/rocboss/paopao-ce/pkg/util" "github.com/rocboss/paopao-ce/pkg/util"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -166,7 +167,7 @@ func CreatePost(c *gin.Context, userID int64, param PostCreationReq) (*model.Pos
go ds.CreateMessage(&model.Message{ go ds.CreateMessage(&model.Message{
SenderUserID: userID, SenderUserID: userID,
ReceiverUserID: user.ID, ReceiverUserID: user.ID,
Type: model.MESSAGE_POST, Type: model.MsgTypePost,
Brief: "在新发布的泡泡动态中@了你", Brief: "在新发布的泡泡动态中@了你",
PostID: post.ID, PostID: post.ID,
}) })
@ -402,7 +403,7 @@ func GetPostContentByID(id int64) (*model.PostContent, error) {
return ds.GetPostContentByID(id) return ds.GetPostContentByID(id)
} }
func GetIndexPosts(user *model.User, offset int, limit int) ([]*model.PostFormated, error) { func GetIndexPosts(user *model.User, offset int, limit int) (*rest.IndexTweetsResp, error) {
return ds.IndexPosts(user, offset, limit) return ds.IndexPosts(user, offset, limit)
} }

@ -260,8 +260,15 @@ func GetUserByUsername(username string) (*model.User, error) {
} }
// UpdateUserInfo 更新用户信息 // UpdateUserInfo 更新用户信息
func UpdateUserInfo(user *model.User) error { func UpdateUserInfo(user *model.User) *errcode.Error {
return ds.UpdateUser(user) err := ds.CheckAttachment(user.Avatar)
if err != nil {
return errcode.InvalidParams
}
if err = ds.UpdateUser(user); err != nil {
return errcode.ServerError
}
return nil
} }
// GetUserCollections 获取用户收藏列表 // GetUserCollections 获取用户收藏列表

Loading…
Cancel
Save