diff options
author | 2023-04-28 16:45:21 +0100 | |
---|---|---|
committer | 2023-04-28 17:45:21 +0200 | |
commit | 6a29c5ffd40f1919cac40030c53160c19812bc8d (patch) | |
tree | f1faaa6504cdb798dfbfd1df20f1493bcdd99602 /internal/httpclient/client.go | |
parent | [bugfix] Fix remaining mangled URI escaping issues in statuses + accounts (#1... (diff) | |
download | gotosocial-6a29c5ffd40f1919cac40030c53160c19812bc8d.tar.xz |
[performance] improved request batching (removes need for queueing) (#1687)
* revamp http client to not limit requests, instead use sender worker
Signed-off-by: kim <grufwub@gmail.com>
* remove separate sender worker pool, spawn 2*GOMAXPROCS batch senders each time, no need for transport cache sweeping
Signed-off-by: kim <grufwub@gmail.com>
* improve batch senders to keep popping recipients until remote URL found
Signed-off-by: kim <grufwub@gmail.com>
* fix recipient looping issue
Signed-off-by: kim <grufwub@gmail.com>
* fix missing mutex unlock
Signed-off-by: kim <grufwub@gmail.com>
* move request id ctx key to gtscontext, finish filling out more code comments, add basic support for not logging client IP
Signed-off-by: kim <grufwub@gmail.com>
* slight code reformatting
Signed-off-by: kim <grufwub@gmail.com>
* a whitespace
Signed-off-by: kim <grufwub@gmail.com>
* remove unused code
Signed-off-by: kim <grufwub@gmail.com>
* add missing license headers
Signed-off-by: kim <grufwub@gmail.com>
* fix request backoff calculation
Signed-off-by: kim <grufwub@gmail.com>
---------
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/httpclient/client.go')
-rw-r--r-- | internal/httpclient/client.go | 270 |
1 files changed, 177 insertions, 93 deletions
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 9562bdc48..67a1d0715 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -18,31 +18,39 @@ package httpclient import ( + "context" + "crypto/x509" "errors" + "fmt" "io" "net" "net/http" "net/netip" "runtime" + "strconv" + "strings" "time" "codeberg.org/gruf/go-bytesize" + "codeberg.org/gruf/go-byteutil" + "codeberg.org/gruf/go-cache/v3" + errorsv2 "codeberg.org/gruf/go-errors/v2" "codeberg.org/gruf/go-kv" - "github.com/cornelk/hashmap" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" ) -// ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed. -var ErrInvalidRequest = errors.New("invalid http request") +var ( + // ErrInvalidNetwork is returned if the request would not be performed over TCP + ErrInvalidNetwork = errors.New("invalid network type") -// ErrInvalidNetwork is returned if the request would not be performed over TCP -var ErrInvalidNetwork = errors.New("invalid network type") + // ErrReservedAddr is returned if a dialed address resolves to an IP within a blocked or reserved net. + ErrReservedAddr = errors.New("dial within blocked / reserved IP range") -// ErrReservedAddr is returned if a dialed address resolves to an IP within a blocked or reserved net. -var ErrReservedAddr = errors.New("dial within blocked / reserved IP range") - -// ErrBodyTooLarge is returned when a received response body is above predefined limit (default 40MB). -var ErrBodyTooLarge = errors.New("body size too large") + // ErrBodyTooLarge is returned when a received response body is above predefined limit (default 40MB). + ErrBodyTooLarge = errors.New("body size too large") +) // Config provides configuration details for setting up a new // instance of httpclient.Client{}. Within are a subset of the @@ -83,13 +91,10 @@ type Config struct { // cases to protect against forged / unknown content-lengths // - protection from server side request forgery (SSRF) by only dialing // out to known public IP prefixes, configurable with allows/blocks -// - limit number of concurrent requests, else blocking until a slot -// is available (context channels still respected) type Client struct { - client http.Client - queue *hashmap.Map[string, chan struct{}] - bmax int64 // max response body size - cmax int // max open conns per host + client http.Client + badHosts cache.Cache[string, struct{}] + bodyMax int64 } // New returns a new instance of Client initialized using configuration. @@ -109,28 +114,26 @@ func New(cfg Config) *Client { } if cfg.MaxIdleConns <= 0 { - // By default base this value on MaxOpenConns + // By default base this value on MaxOpenConns. cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10 } if cfg.MaxBodySize <= 0 { - // By default set this to a reasonable 40MB + // By default set this to a reasonable 40MB. cfg.MaxBodySize = int64(40 * bytesize.MiB) } - // Protect dialer with IP range sanitizer + // Protect dialer with IP range sanitizer. d.Control = (&sanitizer{ allow: cfg.AllowRanges, block: cfg.BlockRanges, }).Sanitize - // Prepare client fields + // Prepare client fields. c.client.Timeout = cfg.Timeout - c.cmax = cfg.MaxOpenConnsPerHost - c.bmax = cfg.MaxBodySize - c.queue = hashmap.New[string, chan struct{}]() + c.bodyMax = cfg.MaxBodySize - // Set underlying HTTP client roundtripper + // Set underlying HTTP client roundtripper. c.client.Transport = &http.Transport{ Proxy: http.ProxyFromEnvironment, ForceAttemptHTTP2: true, @@ -144,90 +147,185 @@ func New(cfg Config) *Client { DisableCompression: cfg.DisableCompression, } + // Initiate outgoing bad hosts lookup cache. + c.badHosts = cache.New[string, struct{}](0, 1000, 0) + c.badHosts.SetTTL(15*time.Minute, false) + if !c.badHosts.Start(time.Minute) { + log.Panic(nil, "failed to start transport controller cache") + } + return &c } -// Do will perform given request when an available slot in the queue is available, -// and block until this time. For returned values, this follows the same semantics -// as the standard http.Client{}.Do() implementation except that response body will -// be wrapped by an io.LimitReader() to limit response body sizes. -func (c *Client) Do(req *http.Request) (*http.Response, error) { - // Ensure this is a valid request - if err := ValidateRequest(req); err != nil { - return nil, err - } +// Do ... +func (c *Client) Do(r *http.Request) (*http.Response, error) { + return c.DoSigned(r, func(r *http.Request) error { + return nil // no request signing + }) +} - // Get host's wait queue - wait := c.wait(req.Host) - - var ok bool - - select { - // Quickly try grab a spot - case wait <- struct{}{}: - // it's our turn! - ok = true - - // NOTE: - // Ideally here we would set the slot release to happen either - // on error return, or via callback from the response body closer. - // However when implementing this, there appear deadlocks between - // the channel queue here and the media manager worker pool. So - // currently we only place a limit on connections dialing out, but - // there may still be more connections open than len(c.queue) given - // that connections may not be closed until response body is closed. - // The current implementation will reduce the viability of denial of - // service attacks, but if there are future issues heed this advice :] - defer func() { <-wait }() - default: +// DoSigned ... +func (c *Client) DoSigned(r *http.Request, sign SignFunc) (*http.Response, error) { + const ( + // max no. attempts. + maxRetries = 5 + + // starting backoff duration. + baseBackoff = 2 * time.Second + ) + + // Get request hostname. + host := r.URL.Hostname() + + // Check whether request should fast fail. + fastFail := gtscontext.IsFastfail(r.Context()) + if !fastFail { + // Check if recently reached max retries for this host + // so we don't bother with a retry-backoff loop. The only + // errors that are retried upon are server failure and + // domain resolution type errors, so this cached result + // indicates this server is likely having issues. + fastFail = c.badHosts.Has(host) } - if !ok { - // No spot acquired, log warning - log.WithContext(req.Context()). - WithFields(kv.Fields{ - {K: "queue", V: len(wait)}, - {K: "method", V: req.Method}, - {K: "host", V: req.Host}, - {K: "uri", V: req.URL.RequestURI()}, - }...).Warn("full request queue") + // Start a log entry for this request + l := log.WithContext(r.Context()). + WithFields(kv.Fields{ + {"method", r.Method}, + {"url", r.URL.String()}, + }...) + + for i := 0; i < maxRetries; i++ { + var backoff time.Duration + + // Reset signing header fields + now := time.Now().UTC() + r.Header.Set("Date", now.Format("Mon, 02 Jan 2006 15:04:05")+" GMT") + r.Header.Del("Signature") + r.Header.Del("Digest") + + // Rewind body reader and content-length if set. + if rc, ok := r.Body.(*byteutil.ReadNopCloser); ok { + r.ContentLength = int64(rc.Len()) + rc.Rewind() + } + + // Sign the outgoing request. + if err := sign(r); err != nil { + return nil, err + } + + l.Infof("performing request") + + // Perform the request. + rsp, err := c.do(r) + if err == nil { //nolint:gocritic + + // TooManyRequest means we need to slow + // down and retry our request. Codes over + // 500 generally indicate temp. outages. + if code := rsp.StatusCode; code < 500 && + code != http.StatusTooManyRequests { + return rsp, nil + } + + // Generate error from status code for logging + err = errors.New(`http response "` + rsp.Status + `"`) + + // Search for a provided "Retry-After" header value. + if after := rsp.Header.Get("Retry-After"); after != "" { + + if u, _ := strconv.ParseUint(after, 10, 32); u != 0 { + // An integer number of backoff seconds was provided. + backoff = time.Duration(u) * time.Second + } else if at, _ := http.ParseTime(after); !at.Before(now) { + // An HTTP formatted future date-time was provided. + backoff = at.Sub(now) + } + + // Don't let their provided backoff exceed our max. + if max := baseBackoff * maxRetries; backoff > max { + backoff = max + } + } + + } else if errorsv2.Is(err, + context.DeadlineExceeded, + context.Canceled, + ErrBodyTooLarge, + ErrReservedAddr, + ) { + // Return on non-retryable errors + return nil, err + } else if strings.Contains(err.Error(), "stopped after 10 redirects") { + // Don't bother if net/http returned after too many redirects + return nil, err + } else if errors.As(err, &x509.UnknownAuthorityError{}) { + // Unknown authority errors we do NOT recover from + return nil, err + } else if dnserr := (*net.DNSError)(nil); // nocollapse + errors.As(err, &dnserr) && dnserr.IsNotFound { + // DNS lookup failure, this domain does not exist + return nil, gtserror.SetNotFound(err) + } + + if fastFail { + // on fast-fail, don't bother backoff/retry + return nil, fmt.Errorf("%w (fast fail)", err) + } + + if backoff == 0 { + // No retry-after found, set our predefined + // backoff according to a multiplier of 2^n. + backoff = baseBackoff * 1 << (i + 1) + } + + l.Errorf("backing off for %s after http request error: %v", backoff, err) select { - case <-req.Context().Done(): - // the request was canceled before we - // got to our turn: no need to release - return nil, req.Context().Err() - case wait <- struct{}{}: - defer func() { <-wait }() + // Request ctx cancelled + case <-r.Context().Done(): + return nil, r.Context().Err() + + // Backoff for some time + case <-time.After(backoff): } } - // Perform the HTTP request + // Add "bad" entry for this host. + c.badHosts.Set(host, struct{}{}) + + return nil, errors.New("transport reached max retries") +} + +// do ... +func (c *Client) do(req *http.Request) (*http.Response, error) { + // Perform the HTTP request. rsp, err := c.client.Do(req) if err != nil { return nil, err } - // Check response body not too large - if rsp.ContentLength > c.bmax { + // Check response body not too large. + if rsp.ContentLength > c.bodyMax { return nil, ErrBodyTooLarge } - // Seperate the body implementers + // Seperate the body implementers. rbody := (io.Reader)(rsp.Body) cbody := (io.Closer)(rsp.Body) var limit int64 if limit = rsp.ContentLength; limit < 0 { - // If unknown, use max as reader limit - limit = c.bmax + // If unknown, use max as reader limit. + limit = c.bodyMax } - // Don't trust them, limit body reads + // Don't trust them, limit body reads. rbody = io.LimitReader(rbody, limit) - // Wrap body with limit + // Wrap body with limit. rsp.Body = &struct { io.Reader io.Closer @@ -235,17 +333,3 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { return rsp, nil } - -// wait acquires the 'wait' queue for the given host string, or allocates new. -func (c *Client) wait(host string) chan struct{} { - // Look for an existing queue - queue, ok := c.queue.Get(host) - if ok { - return queue - } - - // Allocate a new host queue (or return a sneaky existing one). - queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax)) - - return queue -} |