From 4bacfa8e5e26e143a32e8f64eb0ffb15dee7bd78 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Fri, 25 Aug 2023 21:02:33 +0800 Subject: [PATCH] add LoggerOpenObserve feature --- CHANGELOG.md | 22 +++++- README.md | 1 + features-status.md | 4 ++ internal/conf/conf.go | 14 ++-- internal/conf/config.yaml | 9 +++ internal/conf/logger.go | 7 +- internal/conf/logger_observe.go | 71 +++++++++++++++++++ internal/conf/setting.go | 11 +++ internal/dao/search/bridge.go | 2 +- pkg/http/client.go | 120 ++++++++++++++++++++++++++++++++ pkg/http/http.go | 2 + pkg/obx/obx.go | 80 +++++++++++++++++++++ 12 files changed, 334 insertions(+), 9 deletions(-) create mode 100644 internal/conf/logger_observe.go create mode 100644 pkg/http/client.go create mode 100644 pkg/obx/obx.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eeb5499e..07ace981 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,26 @@ All notable changes to paopao-ce are documented in this file. ## 0.5.0+dev ([`dev`](https://github.com/rocboss/paopao-ce/tree/dev)) +### Added +- add `LoggerOpenObserve` feature use OpenObserve to collect log.[#370](https://github.com/rocboss/paopao-ce/pull/370) + add `LoggerOpenObserve` to `conf.yaml` 's `Features` section to enable this feature like below: + ```yaml + # file config.yaml + ... + Features: + Default: ["Base", "Postgres", "Meili", "LocalOSS", "LoggerOpenObserve", "BigCacheIndex", "web"] + LoggerOpenObserve: # 使用OpenObserve写日志 + Host: 127.0.0.1:5080 + Organization: paopao-ce + Stream: default + User: root@paopao.info + Password: tiFEI8UeJWuYA7kN + Secure: False + MinWorker: 5 # 最小后台工作者, 设置范围[5, 100], 默认5 + MaxLogBuffer: 100 # 最大log缓存条数, 设置范围[10, 10000], 默认100 + ... + ``` + ## 0.4.1 ### Changed - infinite scrolling instead of pagination for Home/User/Profile page @@ -19,7 +39,7 @@ All notable changes to paopao-ce are documented in this file. ``` - add user highlight tweet support include custom tweet set to highlight and list in user/profile page. - add cli subcommand to start paopao-ce serve or other task. [#354](https://github.com/rocboss/paopao-ce/pull/354) -- add `Friendship` feature . [#355](https://github.com/rocboss/paopao-ce/pull/355) +- add `Followship` feature . [#355](https://github.com/rocboss/paopao-ce/pull/355) migration database first(sql ddl file in `scripts/migration/**/*_user_following.up.sql`): ```sql DROP TABLE IF EXISTS p_following; diff --git a/README.md b/README.md index 8b6832cc..8d3be694 100644 --- a/README.md +++ b/README.md @@ -373,6 +373,7 @@ release/paopao serve --no-default-features --features sqlite3,localoss,loggerfil |`LoggerFile` | 日志 | 稳定 | 使用文件写日志 | |`LoggerZinc` | 日志 | 稳定(推荐) | 使用[Zinc](https://github.com/zinclabs/zinc)写日志 | |`LoggerMeili` | 日志 | 内测 | 使用[Meilisearch](https://github.com/meilisearch/meilisearch)写日志 | +|`LoggerOpenObserve` | 日志 | 内测 | 使用[OpenObserve](https://github.com/openobserve/openobserve)写日志 | |[`Friendship`](docs/proposal/22110410-关于Friendship功能项的设计.md) | 关系模式 | 内置 Builtin | 弱关系好友模式,类似微信朋友圈 | |[`Followship`](docs/proposal/22110409-关于Followship功能项的设计.md) | 关系模式 | 内置 Builtin | 关注者模式,类似Twitter的Follow模式 | |[`Lightship`](docs/proposal/22121409-关于Lightship功能项的设计.md) | 关系模式 | 弃用 Deprecated | 开放模式,所有推文都公开可见 | diff --git a/features-status.md b/features-status.md index e630c77d..b8b0dafc 100644 --- a/features-status.md +++ b/features-status.md @@ -137,6 +137,10 @@ * [ ] 提按文档 * [x] 接口定义 * [x] 业务逻辑实现 +* `LoggerOpenObserve` 使用[OpenObserve](https://github.com/openobserve/openobserve)写日志(目前状态: 内测阶段); + * [ ] 提按文档 + * [x] 接口定义 + * [x] 业务逻辑实现 #### 监控: * `Sentry` 使用Sentry进行错误跟踪与性能监控(目前状态: 内测); diff --git a/internal/conf/conf.go b/internal/conf/conf.go index a0dc88df..645189d4 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -12,12 +12,13 @@ import ( ) var ( - loggerSetting *loggerConf - loggerFileSetting *loggerFileConf - loggerZincSetting *loggerZincConf - loggerMeiliSetting *loggerMeiliConf - sentrySetting *sentryConf - redisSetting *redisConf + loggerSetting *loggerConf + loggerFileSetting *loggerFileConf + loggerZincSetting *loggerZincConf + loggerMeiliSetting *loggerMeiliConf + loggerOpenObserveSetting *loggerOpenObserveConf + sentrySetting *sentryConf + redisSetting *redisConf PyroscopeSetting *pyroscopeConf DatabaseSetting *databaseConf @@ -89,6 +90,7 @@ func setupSetting(suite []string, noDefault bool) error { "LoggerFile": &loggerFileSetting, "LoggerZinc": &loggerZincSetting, "LoggerMeili": &loggerMeiliSetting, + "LoggerOpenObserve": &loggerOpenObserveSetting, "Database": &DatabaseSetting, "MySQL": &MysqlSetting, "Postgres": &PostgresSetting, diff --git a/internal/conf/config.yaml b/internal/conf/config.yaml index fc86dfd4..a4deda16 100644 --- a/internal/conf/config.yaml +++ b/internal/conf/config.yaml @@ -114,6 +114,15 @@ LoggerMeili: # 使用Meili写日志 Secure: False MinWorker: 5 # 最小后台工作者, 设置范围[5, 100], 默认5 MaxLogBuffer: 100 # 最大log缓存条数, 设置范围[10, 10000], 默认100 +LoggerOpenObserve: # 使用OpenObserve写日志 + Host: 127.0.0.1:5080 + Organization: paopao-ce + Stream: default + User: root@paopao.info + Password: tiFEI8UeJWuYA7kN + Secure: False + MinWorker: 5 # 最小后台工作者, 设置范围[5, 100], 默认5 + MaxLogBuffer: 100 # 最大log缓存条数, 设置范围[10, 10000], 默认100 JWT: # 鉴权加密 Secret: 18a6413dc4fe394c66345ebe501b2f26 Issuer: paopao-api diff --git a/internal/conf/logger.go b/internal/conf/logger.go index f6d08723..dac2e580 100644 --- a/internal/conf/logger.go +++ b/internal/conf/logger.go @@ -28,7 +28,7 @@ func setupLogger() { logrus.SetFormatter(&logrus.JSONFormatter{}) logrus.SetLevel(loggerSetting.logLevel()) - cfg.In(cfg.Actions{ + cfg.On(cfg.Actions{ "LoggerFile": func() { out := newFileLogger() logrus.SetOutput(out) @@ -43,6 +43,11 @@ func setupLogger() { logrus.SetOutput(io.Discard) logrus.AddHook(hook) }, + "LoggerOpenObserve": func() { + hook := newObserveLogHook() + logrus.SetOutput(io.Discard) + logrus.AddHook(hook) + }, }) } diff --git a/internal/conf/logger_observe.go b/internal/conf/logger_observe.go new file mode 100644 index 00000000..6297fa82 --- /dev/null +++ b/internal/conf/logger_observe.go @@ -0,0 +1,71 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +package conf + +import ( + "log" + "net/http" + "time" + + hx "github.com/rocboss/paopao-ce/pkg/http" + "github.com/rocboss/paopao-ce/pkg/json" + "github.com/rocboss/paopao-ce/pkg/obx" + "github.com/sirupsen/logrus" +) + +type observeLogData struct { + Time time.Time `json:"time"` + Level logrus.Level `json:"level"` + Message string `json:"message"` + Data logrus.Fields `json:"data"` +} + +type observeLogHook struct { + client obx.OpenObserveClient +} + +func (h *observeLogHook) Fire(entry *logrus.Entry) error { + info := []observeLogData{{ + Time: entry.Time, + Level: entry.Level, + Message: entry.Message, + Data: entry.Data, + }} + data, _ := json.Marshal(info) + h.client.LogJson(data) + return nil +} + +func (h *observeLogHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +func newObserveLogHook() *observeLogHook { + s := loggerOpenObserveSetting + obc := &obx.Config{ + Host: s.Host, + User: s.User, + Password: s.Password, + Organization: s.Organization, + Stream: s.Stream, + Secure: s.Secure, + } + acc := &hx.AsyncClientConf{ + MinWorker: s.MinWorker, + MaxRequestInCh: s.MaxLogBuffer, + MaxRequestInTempCh: 100, + MaxTickCount: 60, + TickWaitTime: time.Second, + } + return &observeLogHook{ + client: obx.NewClient(obc, acc, func(req *http.Request, resp *http.Response, err error) { + if err == nil && resp != nil && resp.Body != nil { + resp.Body.Close() + } else if err != nil { + log.Printf("logrus use observe do LogJson error: %s", err) + } + }), + } +} diff --git a/internal/conf/setting.go b/internal/conf/setting.go index ccb05cbf..a210cbb1 100644 --- a/internal/conf/setting.go +++ b/internal/conf/setting.go @@ -63,6 +63,17 @@ type loggerMeiliConf struct { MinWorker int } +type loggerOpenObserveConf struct { + Host string + Organization string + Stream string + User string + Password string + Secure bool + MaxLogBuffer int + MinWorker int +} + type httpServerConf struct { RunMode string HttpIp string diff --git a/internal/dao/search/bridge.go b/internal/dao/search/bridge.go index bc4eb039..6c9ed164 100644 --- a/internal/dao/search/bridge.go +++ b/internal/dao/search/bridge.go @@ -65,7 +65,7 @@ func (s *bridgeTweetSearchServant) updateDocs(doc *documents) { // watch updateDocsTempch to continue handle update if needed. // cancel loop if no item had watched in 1 minute. - for count := 0; count > 60; count++ { + for count := 0; count < 60; count++ { select { case item := <-s.updateDocsTempCh: // reset count to continue handle docs update diff --git a/pkg/http/client.go b/pkg/http/client.go new file mode 100644 index 00000000..d65bd116 --- /dev/null +++ b/pkg/http/client.go @@ -0,0 +1,120 @@ +// Copyright 2023 Michael Li . All rights reserved. +// Use of this source code is governed by Apache License 2.0 that +// can be found in the LICENSE file. + +package http + +import ( + "net/http" + "time" + + "github.com/sirupsen/logrus" +) + +var ( + _ AsyncClient = (*wormClient)(nil) +) + +const ( + _minRequestInCh = 10 + _minRequestInTmpCh = 10 + _minWorker = 5 +) + +// ResponseFn a function used handle the response of http.Client.Do +type ResponseFn = func(req *http.Request, resp *http.Response, err error) + +// AsyncClient asynchronous client interface +type AsyncClient interface { + Do(req *http.Request, fn ResponseFn) +} + +// AsyncClientConf client configure used to create an AsynClient instance +type AsyncClientConf struct { + MinWorker int + MaxRequestInCh int + MaxRequestInTempCh int + MaxTickCount int + TickWaitTime time.Duration +} + +type requestItem struct { + request *http.Request + fn ResponseFn +} + +type wormClient struct { + client *http.Client + requestCh chan *requestItem // 正式工 缓存通道 + requestTempCh chan *requestItem // 临时工 缓存通道 + maxTickCount int + tickWaitTime time.Duration +} + +func (s *wormClient) Do(req *http.Request, fn ResponseFn) { + item := &requestItem{req, fn} + select { + case s.requestCh <- item: + logrus.Debugln("simepleClient.Do send request item by requestCh chan") + default: + select { + case s.requestTempCh <- item: + logrus.Debugln("simepleClient.Do send request item by requestTempCh chan") + default: + go func() { + s.do(item) + // watch requestTempCh to continue do work if needed. + // cancel loop if no item had watched in s.maxCyle * s.maxWaitTime. + for count := 0; count < s.maxTickCount; count++ { + select { + case item := <-s.requestTempCh: + // reset count to continue do work + count = 0 + s.do(item) + default: + // sleeping to wait request item pass over to do work + time.Sleep(s.tickWaitTime) + } + } + }() + } + } +} + +func (s *wormClient) starDotWork() { + for item := range s.requestCh { + s.do(item) + } +} + +func (s *wormClient) do(req *requestItem) { + resp, err := s.client.Do(req.request) + req.fn(req.request, resp, err) +} + +// NewAsyncClient create an AsyncClient instance +func NewAsyncClient(client *http.Client, conf *AsyncClientConf) AsyncClient { + maxRequestInCh := _minRequestInCh + maxRequestInTempCh := _minRequestInTmpCh + if conf.MaxRequestInCh > _minRequestInCh { + maxRequestInCh = conf.MaxRequestInCh + } + if conf.MaxRequestInTempCh > _minRequestInTmpCh { + maxRequestInTempCh = conf.MaxRequestInTempCh + } + wc := &wormClient{ + client: client, + requestCh: make(chan *requestItem, maxRequestInCh), + requestTempCh: make(chan *requestItem, maxRequestInTempCh), + } + numWorker := conf.MinWorker + if numWorker < _minWorker { + numWorker = _minWorker + } + logrus.Debugf("use %d backend worker to do the http request", numWorker) + // 启动 do work 正式工 + for ; numWorker > 0; numWorker-- { + go wc.starDotWork() + } + return wc +} diff --git a/pkg/http/http.go b/pkg/http/http.go index efb096d7..08876448 100644 --- a/pkg/http/http.go +++ b/pkg/http/http.go @@ -2,4 +2,6 @@ // Use of this source code is governed by Apache License 2.0 that // can be found in the LICENSE file. +// package http contain some custom help function for std http library. + package http diff --git a/pkg/obx/obx.go b/pkg/obx/obx.go new file mode 100644 index 00000000..7ae8277e --- /dev/null +++ b/pkg/obx/obx.go @@ -0,0 +1,80 @@ +// Copyright 2023 ROC. All rights reserved. +// Use of this source code is governed by a MIT style +// license that can be found in the LICENSE file. + +// package obx contain some help function for OpenObserve. +package obx + +import ( + "bytes" + "net/http" + + hx "github.com/rocboss/paopao-ce/pkg/http" +) + +var ( + _ OpenObserveClient = (*obxClient)(nil) +) + +const ( + _userAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36" +) + +// OpenObserveClient OpenObserve client interface +type OpenObserveClient interface { + LogJson(data []byte) +} + +// Config confiugre used for create a OpenObserveClient instance +type Config struct { + Host string + User string + Password string + Organization string + Stream string + UserAgent string + Secure bool +} + +type obxClient struct { + endpoint string + user string + password string + userAgent string + respFn hx.ResponseFn + client hx.AsyncClient +} + +func (c *Config) Endpoint() string { + schema := "http" + if c.Secure { + schema = "https" + } + return schema + "://" + c.Host + "/api/" + c.Organization + "/" + c.Stream + "/_json" +} + +func (s *obxClient) LogJson(data []byte) { + req, err := http.NewRequest("POST", s.endpoint, bytes.NewReader(data)) + if err != nil { + s.respFn(nil, nil, err) + } + req.SetBasicAuth(s.user, s.password) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", s.userAgent) + s.client.Do(req, s.respFn) +} + +// NewClient create OpenObserve client instance +func NewClient(conf *Config, acc *hx.AsyncClientConf, fn hx.ResponseFn) OpenObserveClient { + userAgent := _userAgent + if conf.UserAgent != "" { + userAgent = conf.UserAgent + } + return &obxClient{ + endpoint: conf.Endpoint(), + user: conf.User, + password: conf.Password, + userAgent: userAgent, + client: hx.NewAsyncClient(http.DefaultClient, acc), + } +}