diff --git a/go.mod b/go.mod index 8b9f0783..612bc589 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/afocus/captcha v0.0.0-20191010092841-4bd1f21c8868 github.com/alimy/cfg v0.4.0 github.com/alimy/mir/v4 v4.0.0 + github.com/alimy/tryst v0.1.0 github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible github.com/allegro/bigcache/v3 v3.1.0 github.com/bufbuild/connect-go v1.10.0 diff --git a/go.sum b/go.sum index 8252693e..517b9482 100644 --- a/go.sum +++ b/go.sum @@ -127,6 +127,8 @@ github.com/alimy/cfg v0.4.0 h1:SslKPndmxRViT1ePWLmNsEq7okYP0GVeuowQlRWZPkw= github.com/alimy/cfg v0.4.0/go.mod h1:rOxbasTH2srl6StAjNF5Vyi8bfrdkl3fLGmOYtSw81c= github.com/alimy/mir/v4 v4.0.0 h1:MzGfmoLjjvR69jbZEmpKJO3tUuqB0RGRv1UWPbtukBg= github.com/alimy/mir/v4 v4.0.0/go.mod h1:d58dBvw2KImcVbAUANrciEV/of0arMNsI9c/5UNCMMc= +github.com/alimy/tryst v0.1.0 h1:883i0eOLFmEn34S7vlUjICJLU0YFBToti+F7YRFcF1g= +github.com/alimy/tryst v0.1.0/go.mod h1:K//dPeoE/nnv2Jw8C3iPE7n8mO6LVqAxVmqbopM9nAk= github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible h1:6JF1bjhT0WN2srEmijfOFtVWwV91KZ6dJY1/JbdtGrI= github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk= diff --git a/internal/conf/logger_observe.go b/internal/conf/logger_observe.go index 6297fa82..dc83a83e 100644 --- a/internal/conf/logger_observe.go +++ b/internal/conf/logger_observe.go @@ -53,11 +53,11 @@ func newObserveLogHook() *observeLogHook { Secure: s.Secure, } acc := &hx.AsyncClientConf{ - MinWorker: s.MinWorker, - MaxRequestInCh: s.MaxLogBuffer, - MaxRequestInTempCh: 100, - MaxTickCount: 60, - TickWaitTime: time.Second, + MinWorker: s.MinWorker, + MaxRequestBuf: s.MaxLogBuffer, + MaxRequestTempBuf: 100, + MaxTickCount: 60, + TickWaitTime: time.Second, } return &observeLogHook{ client: obx.NewClient(obc, acc, func(req *http.Request, resp *http.Response, err error) { diff --git a/pkg/http/client.go b/pkg/http/client.go index 6af18377..3dd85efd 100644 --- a/pkg/http/client.go +++ b/pkg/http/client.go @@ -8,7 +8,7 @@ import ( "net/http" "time" - "github.com/sirupsen/logrus" + gp "github.com/alimy/tryst/pool" ) var ( @@ -16,13 +16,13 @@ var ( ) const ( - _minRequestInCh = 10 - _minRequestInTmpCh = 10 + _minRequestBuf = 10 + _minRequestTempBuf = 10 _minWorker = 5 ) // ResponseFn a function used handle the response of http.Client.Do -type ResponseFn = func(req *http.Request, resp *http.Response, err error) +type ResponseFn = gp.ResponseFn[*http.Request, *http.Response] // AsyncClient asynchronous client interface type AsyncClient interface { @@ -31,90 +31,44 @@ type AsyncClient interface { // AsyncClientConf client configure used to create an AsynClient instance type AsyncClientConf struct { - MinWorker int - MaxRequestInCh int - MaxRequestInTempCh int - MaxTickCount int - TickWaitTime time.Duration -} - -type requestItem struct { - request *http.Request - fn ResponseFn + MinWorker int + MaxRequestBuf int + MaxRequestTempBuf int + MaxTickCount int + TickWaitTime time.Duration } type wormClient struct { - client *http.Client - requestCh chan *requestItem // 正式工 缓存通道 - requestTempCh chan *requestItem // 临时工 缓存通道 - maxTickCount int - tickWaitTime time.Duration + pool gp.GoroutinePool[*http.Request, *http.Response] } func (s *wormClient) Do(req *http.Request, fn ResponseFn) { - item := &requestItem{req, fn} - select { - case s.requestCh <- item: - // send request item by requestCh chan - default: - select { - case s.requestTempCh <- item: - // send request item by requestTempCh chan" - default: - go func() { - s.do(item) - // watch requestTempCh to continue do work if needed. - // cancel loop if no item had watched in s.maxCyle * s.maxWaitTime. - for count := 0; count < s.maxTickCount; count++ { - select { - case item := <-s.requestTempCh: - // reset count to continue do work - count = 0 - s.do(item) - default: - // sleeping to wait request item pass over to do work - time.Sleep(s.tickWaitTime) - } - } - }() - } - } -} - -func (s *wormClient) starDotWork() { - for item := range s.requestCh { - s.do(item) - } -} - -func (s *wormClient) do(req *requestItem) { - resp, err := s.client.Do(req.request) - req.fn(req.request, resp, err) + s.pool.Do(req, fn) } // NewAsyncClient create an AsyncClient instance func NewAsyncClient(client *http.Client, conf *AsyncClientConf) AsyncClient { - maxRequestInCh := _minRequestInCh - maxRequestInTempCh := _minRequestInTmpCh - if conf.MaxRequestInCh > _minRequestInCh { - maxRequestInCh = conf.MaxRequestInCh - } - if conf.MaxRequestInTempCh > _minRequestInTmpCh { - maxRequestInTempCh = conf.MaxRequestInTempCh + minWorker := _minWorker + maxRequestBuf := _minRequestBuf + maxRequestTempBuf := _minRequestTempBuf + if conf.MaxRequestBuf > _minRequestBuf { + maxRequestBuf = conf.MaxRequestBuf } - wc := &wormClient{ - client: client, - requestCh: make(chan *requestItem, maxRequestInCh), - requestTempCh: make(chan *requestItem, maxRequestInTempCh), + if conf.MaxRequestTempBuf > _minRequestTempBuf { + maxRequestTempBuf = conf.MaxRequestTempBuf } - numWorker := conf.MinWorker - if numWorker < _minWorker { - numWorker = _minWorker + if conf.MinWorker > _minWorker { + minWorker = conf.MinWorker } - logrus.Debugf("use %d backend worker to do the http request", numWorker) - // 启动 do work 正式工 - for ; numWorker > 0; numWorker-- { - go wc.starDotWork() + return &wormClient{ + pool: gp.NewGoroutinePool(func(req *http.Request) (*http.Response, error) { + return client.Do(req) + }, + gp.MinWorkerOpt(minWorker), + gp.MaxRequestBuffer(maxRequestBuf), + gp.MaxRequestTempBuffer(maxRequestTempBuf), + gp.MaxTickCount(conf.MaxTickCount), + gp.TickWaitTime(conf.TickWaitTime), + ), } - return wc }