use github.com/alimy/tryst/pool.GorotinePool as goroutine pool to implement AsyncClient

pull/383/head
Michael Li 1 year ago
parent 2034eda2ad
commit c34e53e285
No known key found for this signature in database

@ -7,6 +7,7 @@ require (
github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868
github.com/alimy/cfg v0.4.0
github.com/alimy/mir/v4 v4.0.0
github.com/alimy/tryst v0.1.0
github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible
github.com/allegro/bigcache/v3 v3.1.0
github.com/bufbuild/connect-go v1.10.0

@ -127,6 +127,8 @@ github.com/alimy/cfg v0.4.0 h1:SslKPndmxRViT1ePWLmNsEq7okYP0GVeuowQlRWZPkw=
github.com/alimy/cfg v0.4.0/go.mod h1:rOxbasTH2srl6StAjNF5Vyi8bfrdkl3fLGmOYtSw81c=
github.com/alimy/mir/v4 v4.0.0 h1:MzGfmoLjjvR69jbZEmpKJO3tUuqB0RGRv1UWPbtukBg=
github.com/alimy/mir/v4 v4.0.0/go.mod h1:d58dBvw2KImcVbAUANrciEV/of0arMNsI9c/5UNCMMc=
github.com/alimy/tryst v0.1.0 h1:883i0eOLFmEn34S7vlUjICJLU0YFBToti+F7YRFcF1g=
github.com/alimy/tryst v0.1.0/go.mod h1:K//dPeoE/nnv2Jw8C3iPE7n8mO6LVqAxVmqbopM9nAk=
github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible h1:6JF1bjhT0WN2srEmijfOFtVWwV91KZ6dJY1/JbdtGrI=
github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk=

@ -54,8 +54,8 @@ func newObserveLogHook() *observeLogHook {
}
acc := &hx.AsyncClientConf{
MinWorker: s.MinWorker,
MaxRequestInCh: s.MaxLogBuffer,
MaxRequestInTempCh: 100,
MaxRequestBuf: s.MaxLogBuffer,
MaxRequestTempBuf: 100,
MaxTickCount: 60,
TickWaitTime: time.Second,
}

@ -8,7 +8,7 @@ import (
"net/http"
"time"
"github.com/sirupsen/logrus"
gp "github.com/alimy/tryst/pool"
)
var (
@ -16,13 +16,13 @@ var (
)
const (
_minRequestInCh = 10
_minRequestInTmpCh = 10
_minRequestBuf = 10
_minRequestTempBuf = 10
_minWorker = 5
)
// ResponseFn a function used handle the response of http.Client.Do
type ResponseFn = func(req *http.Request, resp *http.Response, err error)
type ResponseFn = gp.ResponseFn[*http.Request, *http.Response]
// AsyncClient asynchronous client interface
type AsyncClient interface {
@ -32,89 +32,43 @@ type AsyncClient interface {
// AsyncClientConf client configure used to create an AsynClient instance
type AsyncClientConf struct {
MinWorker int
MaxRequestInCh int
MaxRequestInTempCh int
MaxRequestBuf int
MaxRequestTempBuf int
MaxTickCount int
TickWaitTime time.Duration
}
type requestItem struct {
request *http.Request
fn ResponseFn
}
type wormClient struct {
client *http.Client
requestCh chan *requestItem // 正式工 缓存通道
requestTempCh chan *requestItem // 临时工 缓存通道
maxTickCount int
tickWaitTime time.Duration
pool gp.GoroutinePool[*http.Request, *http.Response]
}
func (s *wormClient) Do(req *http.Request, fn ResponseFn) {
item := &requestItem{req, fn}
select {
case s.requestCh <- item:
// send request item by requestCh chan
default:
select {
case s.requestTempCh <- item:
// send request item by requestTempCh chan"
default:
go func() {
s.do(item)
// watch requestTempCh to continue do work if needed.
// cancel loop if no item had watched in s.maxCyle * s.maxWaitTime.
for count := 0; count < s.maxTickCount; count++ {
select {
case item := <-s.requestTempCh:
// reset count to continue do work
count = 0
s.do(item)
default:
// sleeping to wait request item pass over to do work
time.Sleep(s.tickWaitTime)
}
}
}()
}
}
}
func (s *wormClient) starDotWork() {
for item := range s.requestCh {
s.do(item)
}
}
func (s *wormClient) do(req *requestItem) {
resp, err := s.client.Do(req.request)
req.fn(req.request, resp, err)
s.pool.Do(req, fn)
}
// NewAsyncClient create an AsyncClient instance
func NewAsyncClient(client *http.Client, conf *AsyncClientConf) AsyncClient {
maxRequestInCh := _minRequestInCh
maxRequestInTempCh := _minRequestInTmpCh
if conf.MaxRequestInCh > _minRequestInCh {
maxRequestInCh = conf.MaxRequestInCh
}
if conf.MaxRequestInTempCh > _minRequestInTmpCh {
maxRequestInTempCh = conf.MaxRequestInTempCh
minWorker := _minWorker
maxRequestBuf := _minRequestBuf
maxRequestTempBuf := _minRequestTempBuf
if conf.MaxRequestBuf > _minRequestBuf {
maxRequestBuf = conf.MaxRequestBuf
}
wc := &wormClient{
client: client,
requestCh: make(chan *requestItem, maxRequestInCh),
requestTempCh: make(chan *requestItem, maxRequestInTempCh),
if conf.MaxRequestTempBuf > _minRequestTempBuf {
maxRequestTempBuf = conf.MaxRequestTempBuf
}
numWorker := conf.MinWorker
if numWorker < _minWorker {
numWorker = _minWorker
if conf.MinWorker > _minWorker {
minWorker = conf.MinWorker
}
logrus.Debugf("use %d backend worker to do the http request", numWorker)
// 启动 do work 正式工
for ; numWorker > 0; numWorker-- {
go wc.starDotWork()
return &wormClient{
pool: gp.NewGoroutinePool(func(req *http.Request) (*http.Response, error) {
return client.Do(req)
},
gp.MinWorkerOpt(minWorker),
gp.MaxRequestBuffer(maxRequestBuf),
gp.MaxRequestTempBuffer(maxRequestTempBuf),
gp.MaxTickCount(conf.MaxTickCount),
gp.TickWaitTime(conf.TickWaitTime),
),
}
return wc
}

Loading…
Cancel
Save