mirror of https://github.com/rocboss/paopao-ce
parent
affeeeb22c
commit
4bacfa8e5e
@ -0,0 +1,71 @@
|
||||
// Copyright 2023 ROC. All rights reserved.
|
||||
// Use of this source code is governed by a MIT style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package conf
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
hx "github.com/rocboss/paopao-ce/pkg/http"
|
||||
"github.com/rocboss/paopao-ce/pkg/json"
|
||||
"github.com/rocboss/paopao-ce/pkg/obx"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type observeLogData struct {
|
||||
Time time.Time `json:"time"`
|
||||
Level logrus.Level `json:"level"`
|
||||
Message string `json:"message"`
|
||||
Data logrus.Fields `json:"data"`
|
||||
}
|
||||
|
||||
type observeLogHook struct {
|
||||
client obx.OpenObserveClient
|
||||
}
|
||||
|
||||
func (h *observeLogHook) Fire(entry *logrus.Entry) error {
|
||||
info := []observeLogData{{
|
||||
Time: entry.Time,
|
||||
Level: entry.Level,
|
||||
Message: entry.Message,
|
||||
Data: entry.Data,
|
||||
}}
|
||||
data, _ := json.Marshal(info)
|
||||
h.client.LogJson(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *observeLogHook) Levels() []logrus.Level {
|
||||
return logrus.AllLevels
|
||||
}
|
||||
|
||||
func newObserveLogHook() *observeLogHook {
|
||||
s := loggerOpenObserveSetting
|
||||
obc := &obx.Config{
|
||||
Host: s.Host,
|
||||
User: s.User,
|
||||
Password: s.Password,
|
||||
Organization: s.Organization,
|
||||
Stream: s.Stream,
|
||||
Secure: s.Secure,
|
||||
}
|
||||
acc := &hx.AsyncClientConf{
|
||||
MinWorker: s.MinWorker,
|
||||
MaxRequestInCh: s.MaxLogBuffer,
|
||||
MaxRequestInTempCh: 100,
|
||||
MaxTickCount: 60,
|
||||
TickWaitTime: time.Second,
|
||||
}
|
||||
return &observeLogHook{
|
||||
client: obx.NewClient(obc, acc, func(req *http.Request, resp *http.Response, err error) {
|
||||
if err == nil && resp != nil && resp.Body != nil {
|
||||
resp.Body.Close()
|
||||
} else if err != nil {
|
||||
log.Printf("logrus use observe do LogJson error: %s", err)
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
@ -0,0 +1,120 @@
|
||||
// Copyright 2023 Michael Li <alimy@gility.net>. All rights reserved.
|
||||
// Use of this source code is governed by Apache License 2.0 that
|
||||
// can be found in the LICENSE file.
|
||||
|
||||
package http
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
_ AsyncClient = (*wormClient)(nil)
|
||||
)
|
||||
|
||||
const (
|
||||
_minRequestInCh = 10
|
||||
_minRequestInTmpCh = 10
|
||||
_minWorker = 5
|
||||
)
|
||||
|
||||
// ResponseFn a function used handle the response of http.Client.Do
|
||||
type ResponseFn = func(req *http.Request, resp *http.Response, err error)
|
||||
|
||||
// AsyncClient asynchronous client interface
|
||||
type AsyncClient interface {
|
||||
Do(req *http.Request, fn ResponseFn)
|
||||
}
|
||||
|
||||
// AsyncClientConf client configure used to create an AsynClient instance
|
||||
type AsyncClientConf struct {
|
||||
MinWorker int
|
||||
MaxRequestInCh int
|
||||
MaxRequestInTempCh int
|
||||
MaxTickCount int
|
||||
TickWaitTime time.Duration
|
||||
}
|
||||
|
||||
type requestItem struct {
|
||||
request *http.Request
|
||||
fn ResponseFn
|
||||
}
|
||||
|
||||
type wormClient struct {
|
||||
client *http.Client
|
||||
requestCh chan *requestItem // 正式工 缓存通道
|
||||
requestTempCh chan *requestItem // 临时工 缓存通道
|
||||
maxTickCount int
|
||||
tickWaitTime time.Duration
|
||||
}
|
||||
|
||||
func (s *wormClient) Do(req *http.Request, fn ResponseFn) {
|
||||
item := &requestItem{req, fn}
|
||||
select {
|
||||
case s.requestCh <- item:
|
||||
logrus.Debugln("simepleClient.Do send request item by requestCh chan")
|
||||
default:
|
||||
select {
|
||||
case s.requestTempCh <- item:
|
||||
logrus.Debugln("simepleClient.Do send request item by requestTempCh chan")
|
||||
default:
|
||||
go func() {
|
||||
s.do(item)
|
||||
// watch requestTempCh to continue do work if needed.
|
||||
// cancel loop if no item had watched in s.maxCyle * s.maxWaitTime.
|
||||
for count := 0; count < s.maxTickCount; count++ {
|
||||
select {
|
||||
case item := <-s.requestTempCh:
|
||||
// reset count to continue do work
|
||||
count = 0
|
||||
s.do(item)
|
||||
default:
|
||||
// sleeping to wait request item pass over to do work
|
||||
time.Sleep(s.tickWaitTime)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *wormClient) starDotWork() {
|
||||
for item := range s.requestCh {
|
||||
s.do(item)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *wormClient) do(req *requestItem) {
|
||||
resp, err := s.client.Do(req.request)
|
||||
req.fn(req.request, resp, err)
|
||||
}
|
||||
|
||||
// NewAsyncClient create an AsyncClient instance
|
||||
func NewAsyncClient(client *http.Client, conf *AsyncClientConf) AsyncClient {
|
||||
maxRequestInCh := _minRequestInCh
|
||||
maxRequestInTempCh := _minRequestInTmpCh
|
||||
if conf.MaxRequestInCh > _minRequestInCh {
|
||||
maxRequestInCh = conf.MaxRequestInCh
|
||||
}
|
||||
if conf.MaxRequestInTempCh > _minRequestInTmpCh {
|
||||
maxRequestInTempCh = conf.MaxRequestInTempCh
|
||||
}
|
||||
wc := &wormClient{
|
||||
client: client,
|
||||
requestCh: make(chan *requestItem, maxRequestInCh),
|
||||
requestTempCh: make(chan *requestItem, maxRequestInTempCh),
|
||||
}
|
||||
numWorker := conf.MinWorker
|
||||
if numWorker < _minWorker {
|
||||
numWorker = _minWorker
|
||||
}
|
||||
logrus.Debugf("use %d backend worker to do the http request", numWorker)
|
||||
// 启动 do work 正式工
|
||||
for ; numWorker > 0; numWorker-- {
|
||||
go wc.starDotWork()
|
||||
}
|
||||
return wc
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
// Copyright 2023 ROC. All rights reserved.
|
||||
// Use of this source code is governed by a MIT style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// package obx contain some help function for OpenObserve.
|
||||
package obx
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net/http"
|
||||
|
||||
hx "github.com/rocboss/paopao-ce/pkg/http"
|
||||
)
|
||||
|
||||
var (
|
||||
_ OpenObserveClient = (*obxClient)(nil)
|
||||
)
|
||||
|
||||
const (
|
||||
_userAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
|
||||
)
|
||||
|
||||
// OpenObserveClient OpenObserve client interface
|
||||
type OpenObserveClient interface {
|
||||
LogJson(data []byte)
|
||||
}
|
||||
|
||||
// Config confiugre used for create a OpenObserveClient instance
|
||||
type Config struct {
|
||||
Host string
|
||||
User string
|
||||
Password string
|
||||
Organization string
|
||||
Stream string
|
||||
UserAgent string
|
||||
Secure bool
|
||||
}
|
||||
|
||||
type obxClient struct {
|
||||
endpoint string
|
||||
user string
|
||||
password string
|
||||
userAgent string
|
||||
respFn hx.ResponseFn
|
||||
client hx.AsyncClient
|
||||
}
|
||||
|
||||
func (c *Config) Endpoint() string {
|
||||
schema := "http"
|
||||
if c.Secure {
|
||||
schema = "https"
|
||||
}
|
||||
return schema + "://" + c.Host + "/api/" + c.Organization + "/" + c.Stream + "/_json"
|
||||
}
|
||||
|
||||
func (s *obxClient) LogJson(data []byte) {
|
||||
req, err := http.NewRequest("POST", s.endpoint, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
s.respFn(nil, nil, err)
|
||||
}
|
||||
req.SetBasicAuth(s.user, s.password)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("User-Agent", s.userAgent)
|
||||
s.client.Do(req, s.respFn)
|
||||
}
|
||||
|
||||
// NewClient create OpenObserve client instance
|
||||
func NewClient(conf *Config, acc *hx.AsyncClientConf, fn hx.ResponseFn) OpenObserveClient {
|
||||
userAgent := _userAgent
|
||||
if conf.UserAgent != "" {
|
||||
userAgent = conf.UserAgent
|
||||
}
|
||||
return &obxClient{
|
||||
endpoint: conf.Endpoint(),
|
||||
user: conf.User,
|
||||
password: conf.Password,
|
||||
userAgent: userAgent,
|
||||
client: hx.NewAsyncClient(http.DefaultClient, acc),
|
||||
}
|
||||
}
|
Loading…
Reference in new issue