diff options
author | 2022-11-08 09:35:24 +0000 | |
---|---|---|
committer | 2022-11-08 10:35:24 +0100 | |
commit | 0e572460830ce7767562f08034f96d51e41b6349 (patch) | |
tree | 2ffad44c4d6f07b9d4c3bd494d6d1a3d3918692f /internal/httpclient/client.go | |
parent | [chore] update gruf libraries (#996) (diff) | |
download | gotosocial-0e572460830ce7767562f08034f96d51e41b6349.tar.xz |
[feature] various worker / request queue improvements (#995)
* greatly simplify httpclient request queuing
Signed-off-by: kim <grufwub@gmail.com>
* improved request queue mutex logic
Signed-off-by: kim <grufwub@gmail.com>
* use improved hashmap library
Signed-off-by: kim <grufwub@gmail.com>
* add warn logging when request queues are full
Signed-off-by: kim <grufwub@gmail.com>
* improve worker pool prefix var naming
Signed-off-by: kim <grufwub@gmail.com>
* improved worker pool error logging
Signed-off-by: kim <grufwub@gmail.com>
* move error message into separate field
Signed-off-by: kim <grufwub@gmail.com>
* remove old log statement
Signed-off-by: kim <grufwub@gmail.com>
* don't export worker message, it gets very spammy :')
Signed-off-by: kim <grufwub@gmail.com>
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/httpclient/client.go')
-rw-r--r-- | internal/httpclient/client.go | 78 |
1 files changed, 58 insertions, 20 deletions
```diff
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go
index 7aa0cd8ea..8792e5b82 100644
--- a/internal/httpclient/client.go
+++ b/internal/httpclient/client.go
@@ -26,6 +26,11 @@ import (
 	"net/netip"
 	"runtime"
 	"time"
+
+	"codeberg.org/gruf/go-bytesize"
+	"codeberg.org/gruf/go-kv"
+	"github.com/cornelk/hashmap"
+	"github.com/superseriousbusiness/gotosocial/internal/log"
 )
 
 // ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed.
@@ -42,8 +47,8 @@ var ErrBodyTooLarge = errors.New("body size too large")
 // configuration values passed to initialized http.Transport{}
 // and http.Client{}, along with httpclient.Client{} specific.
 type Config struct {
-	// MaxOpenConns limits the max number of concurrent open connections.
-	MaxOpenConns int
+	// MaxOpenConnsPerHost limits the max number of open connections to a host.
+	MaxOpenConnsPerHost int
 
 	// MaxIdleConns: see http.Transport{}.MaxIdleConns.
 	MaxIdleConns int
@@ -80,8 +85,9 @@ type Config struct {
 // is available (context channels still respected)
 type Client struct {
 	client http.Client
-	rc     *requestQueue
-	bmax   int64
+	queue  *hashmap.Map[string, chan struct{}]
+	bmax   int64 // max response body size
+	cmax   int   // max open conns per host
 }
 
 // New returns a new instance of Client initialized using configuration.
@@ -94,20 +100,20 @@ func New(cfg Config) *Client {
 		Resolver: &net.Resolver{},
 	}
 
-	if cfg.MaxOpenConns <= 0 {
+	if cfg.MaxOpenConnsPerHost <= 0 {
 		// By default base this value on GOMAXPROCS.
 		maxprocs := runtime.GOMAXPROCS(0)
-		cfg.MaxOpenConns = maxprocs * 10
+		cfg.MaxOpenConnsPerHost = maxprocs * 20
 	}
 
 	if cfg.MaxIdleConns <= 0 {
 		// By default base this value on MaxOpenConns
-		cfg.MaxIdleConns = cfg.MaxOpenConns * 10
+		cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10
 	}
 
 	if cfg.MaxBodySize <= 0 {
 		// By default set this to a reasonable 40MB
-		cfg.MaxBodySize = 40 * 1024 * 1024
+		cfg.MaxBodySize = int64(40 * bytesize.MiB)
 	}
 
 	// Protect dialer with IP range sanitizer
@@ -117,11 +123,10 @@ func New(cfg Config) *Client {
 	}).Sanitize
 
 	// Prepare client fields
-	c.bmax = cfg.MaxBodySize
-	c.rc = &requestQueue{
-		maxOpenConns: cfg.MaxOpenConns,
-	}
 	c.client.Timeout = cfg.Timeout
+	c.cmax = cfg.MaxOpenConnsPerHost
+	c.bmax = cfg.MaxBodySize
+	c.queue = hashmap.New[string, chan struct{}]()
 
 	// Set underlying HTTP client roundtripper
 	c.client.Transport = &http.Transport{
@@ -145,17 +150,16 @@ func New(cfg Config) *Client {
 // as the standard http.Client{}.Do() implementation except that response body will
 // be wrapped by an io.LimitReader() to limit response body sizes.
 func (c *Client) Do(req *http.Request) (*http.Response, error) {
-	// request a spot in the wait queue...
-	wait, release := c.rc.getWaitSpot(req.Host, req.Method)
+	// Get host's wait queue
+	wait := c.wait(req.Host)
+
+	var ok bool
 
-	// ... and wait our turn
 	select {
-	case <-req.Context().Done():
-		// the request was canceled before we
-		// got to our turn: no need to release
-		return nil, req.Context().Err()
+	// Quickly try grab a spot
 	case wait <- struct{}{}:
 		// it's our turn!
+		ok = true
 
 		// NOTE:
 		// Ideally here we would set the slot release to happen either
@@ -167,7 +171,27 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 		// that connections may not be closed until response body is closed.
 		// The current implementation will reduce the viability of denial of
 		// service attacks, but if there are future issues heed this advice :]
-		defer release()
+		defer func() { <-wait }()
+	default:
+	}
+
+	if !ok {
+		// No spot acquired, log warning
+		log.WithFields(kv.Fields{
+			{K: "queue", V: len(wait)},
+			{K: "method", V: req.Method},
+			{K: "host", V: req.Host},
+			{K: "uri", V: req.URL.RequestURI()},
+		}...).Warn("full request queue")
+
+		select {
+		case <-req.Context().Done():
+			// the request was canceled before we
+			// got to our turn: no need to release
+			return nil, req.Context().Err()
+		case wait <- struct{}{}:
+			defer func() { <-wait }()
+		}
 	}
 
 	// Firstly, ensure this is a valid request
@@ -208,3 +232,17 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 
 	return rsp, nil
 }
+
+// wait acquires the 'wait' queue for the given host string, or allocates new.
+func (c *Client) wait(host string) chan struct{} {
+	// Look for an existing queue
+	queue, ok := c.queue.Get(host)
+	if ok {
+		return queue
+	}
+
+	// Allocate a new host queue (or return a sneaky existing one).
+	queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax))
+
+	return queue
+}
```