Merge branch 'x/tryst-gorotine-pool' into dev

pull/383/head
Michael Li 1 year ago
commit 07ff699857
No known key found for this signature in database

@ -53,11 +53,11 @@ func newObserveLogHook() *observeLogHook {
Secure: s.Secure, Secure: s.Secure,
} }
acc := &hx.AsyncClientConf{ acc := &hx.AsyncClientConf{
MinWorker: s.MinWorker, MinWorker: s.MinWorker,
MaxRequestInCh: s.MaxLogBuffer, MaxRequestBuf: s.MaxLogBuffer,
MaxRequestInTempCh: 100, MaxRequestTempBuf: 100,
MaxTickCount: 60, MaxTickCount: 60,
TickWaitTime: time.Second, TickWaitTime: time.Second,
} }
return &observeLogHook{ return &observeLogHook{
client: obx.NewClient(obc, acc, func(req *http.Request, resp *http.Response, err error) { client: obx.NewClient(obc, acc, func(req *http.Request, resp *http.Response, err error) {

@ -8,7 +8,7 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/sirupsen/logrus" gp "github.com/alimy/tryst/pool"
) )
var ( var (
@ -16,13 +16,13 @@ var (
) )
const ( const (
_minRequestInCh = 10 _minRequestBuf = 10
_minRequestInTmpCh = 10 _minRequestTempBuf = 10
_minWorker = 5 _minWorker = 5
) )
// ResponseFn a function used handle the response of http.Client.Do // ResponseFn a function used handle the response of http.Client.Do
type ResponseFn = func(req *http.Request, resp *http.Response, err error) type ResponseFn = gp.ResponseFn[*http.Request, *http.Response]
// AsyncClient asynchronous client interface // AsyncClient asynchronous client interface
type AsyncClient interface { type AsyncClient interface {
@ -31,90 +31,44 @@ type AsyncClient interface {
// AsyncClientConf client configure used to create an AsynClient instance // AsyncClientConf client configure used to create an AsynClient instance
type AsyncClientConf struct { type AsyncClientConf struct {
MinWorker int MinWorker int
MaxRequestInCh int MaxRequestBuf int
MaxRequestInTempCh int MaxRequestTempBuf int
MaxTickCount int MaxTickCount int
TickWaitTime time.Duration TickWaitTime time.Duration
}
type requestItem struct {
request *http.Request
fn ResponseFn
} }
type wormClient struct { type wormClient struct {
client *http.Client pool gp.GoroutinePool[*http.Request, *http.Response]
requestCh chan *requestItem // 正式工 缓存通道
requestTempCh chan *requestItem // 临时工 缓存通道
maxTickCount int
tickWaitTime time.Duration
} }
func (s *wormClient) Do(req *http.Request, fn ResponseFn) { func (s *wormClient) Do(req *http.Request, fn ResponseFn) {
item := &requestItem{req, fn} s.pool.Do(req, fn)
select {
case s.requestCh <- item:
// send request item by requestCh chan
default:
select {
case s.requestTempCh <- item:
// send request item by requestTempCh chan"
default:
go func() {
s.do(item)
// watch requestTempCh to continue do work if needed.
// cancel loop if no item had watched in s.maxCyle * s.maxWaitTime.
for count := 0; count < s.maxTickCount; count++ {
select {
case item := <-s.requestTempCh:
// reset count to continue do work
count = 0
s.do(item)
default:
// sleeping to wait request item pass over to do work
time.Sleep(s.tickWaitTime)
}
}
}()
}
}
}
func (s *wormClient) starDotWork() {
for item := range s.requestCh {
s.do(item)
}
}
func (s *wormClient) do(req *requestItem) {
resp, err := s.client.Do(req.request)
req.fn(req.request, resp, err)
} }
// NewAsyncClient create an AsyncClient instance // NewAsyncClient create an AsyncClient instance
func NewAsyncClient(client *http.Client, conf *AsyncClientConf) AsyncClient { func NewAsyncClient(client *http.Client, conf *AsyncClientConf) AsyncClient {
maxRequestInCh := _minRequestInCh minWorker := _minWorker
maxRequestInTempCh := _minRequestInTmpCh maxRequestBuf := _minRequestBuf
if conf.MaxRequestInCh > _minRequestInCh { maxRequestTempBuf := _minRequestTempBuf
maxRequestInCh = conf.MaxRequestInCh if conf.MaxRequestBuf > _minRequestBuf {
} maxRequestBuf = conf.MaxRequestBuf
if conf.MaxRequestInTempCh > _minRequestInTmpCh {
maxRequestInTempCh = conf.MaxRequestInTempCh
} }
wc := &wormClient{ if conf.MaxRequestTempBuf > _minRequestTempBuf {
client: client, maxRequestTempBuf = conf.MaxRequestTempBuf
requestCh: make(chan *requestItem, maxRequestInCh),
requestTempCh: make(chan *requestItem, maxRequestInTempCh),
} }
numWorker := conf.MinWorker if conf.MinWorker > _minWorker {
if numWorker < _minWorker { minWorker = conf.MinWorker
numWorker = _minWorker
} }
logrus.Debugf("use %d backend worker to do the http request", numWorker) return &wormClient{
// 启动 do work 正式工 pool: gp.NewGoroutinePool(func(req *http.Request) (*http.Response, error) {
for ; numWorker > 0; numWorker-- { return client.Do(req)
go wc.starDotWork() },
gp.MinWorkerOpt(minWorker),
gp.MaxRequestBufOpt(maxRequestBuf),
gp.MaxRequestTempBufOpt(maxRequestTempBuf),
gp.MaxTickCountOpt(conf.MaxTickCount),
gp.TickWaitTimeOpt(conf.TickWaitTime),
),
} }
return wc
} }

Loading…
Cancel
Save