diff options
author | 2024-04-11 10:45:35 +0100 | |
---|---|---|
committer | 2024-04-11 11:45:35 +0200 | |
commit | a483bd9e38333b153df1d4df95276cca38f99ff5 (patch) | |
tree | b2fdb6f53ef248b31719a15adc93e767eba3d5c4 /internal/httpclient/client.go | |
parent | [chore]: Bump github.com/yuin/goldmark from 1.7.0 to 1.7.1 (#2819) (diff) | |
download | gotosocial-a483bd9e38333b153df1d4df95276cca38f99ff5.tar.xz |
[performance] massively improved ActivityPub delivery worker efficiency (#2812)
* add delivery worker type that pulls from queue to httpclient package
* finish up some code commenting, bodge a vendored activity library change, integrate the deliverypool changes into transportcontroller
* hook up queue deletion logic
* support deleting queued http requests by target ID
* don't index APRequest by hostname in the queue
* use gorun
* use the original context's values when wrapping msg type as delivery{}
* actually log in the AP delivery worker ...
* add uncommitted changes
* use errors.AsV2()
* use errorsv2.AsV2()
* finish adding some code comments, add bad host handling to delivery workers
* slightly tweak deliveryworkerpool API, use advanced sender multiplier
* remove PopCtx() method, let others instead rely on Wait()
* shuffle things around to move delivery stuff into transport/ subpkg
* remove dead code
* formatting
* validate request before queueing for delivery
* finish adding code comments, fix up backoff code
* finish adding more code comments
* clamp minimum no. senders to 1
* add start/stop logging to delivery worker, some slight changes
* remove double logging
* use worker ptrs
* expose the embedded log fields in httpclient.Request{}
* ensure request context values are preserved when updating ctx
* add delivery worker tests
* fix linter issues
* ensure delivery worker gets inited in testrig
* fix tests to delivering messages to check worker delivery queue
* update error type to use ptr instead of value receiver
* fix test calling Workers{}.Start() instead of testrig.StartWorkers()
* update docs for advanced-sender-multiplier
* update to the latest activity library version
* add comment about not using httptest.Server{}
Diffstat (limited to 'internal/httpclient/client.go')
-rw-r--r-- | internal/httpclient/client.go | 256 |
1 files changed, 143 insertions, 113 deletions
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 6c2427372..31c6df7d0 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -35,7 +35,6 @@ import ( "codeberg.org/gruf/go-cache/v3" errorsv2 "codeberg.org/gruf/go-errors/v2" "codeberg.org/gruf/go-iotools" - "codeberg.org/gruf/go-kv" "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -109,11 +108,13 @@ type Client struct { client http.Client badHosts cache.TTLCache[string, struct{}] bodyMax int64 + retries uint } // New returns a new instance of Client initialized using configuration. func New(cfg Config) *Client { var c Client + c.retries = 5 d := &net.Dialer{ Timeout: 15 * time.Second, @@ -177,7 +178,7 @@ func New(cfg Config) *Client { }} // Initiate outgoing bad hosts lookup cache. - c.badHosts = cache.NewTTL[string, struct{}](0, 1000, 0) + c.badHosts = cache.NewTTL[string, struct{}](0, 512, 0) c.badHosts.SetTTL(time.Hour, false) if !c.badHosts.Start(time.Minute) { log.Panic(nil, "failed to start transport controller cache") @@ -187,154 +188,184 @@ func New(cfg Config) *Client { } // Do will essentially perform http.Client{}.Do() with retry-backoff functionality. -func (c *Client) Do(r *http.Request) (*http.Response, error) { - return c.DoSigned(r, func(r *http.Request) error { - return nil // no request signing - }) -} - -// DoSigned will essentially perform http.Client{}.Do() with retry-backoff functionality and requesting signing.. -func (c *Client) DoSigned(r *http.Request, sign SignFunc) (rsp *http.Response, err error) { - const ( - // max no. attempts. - maxRetries = 5 - - // starting backoff duration. - baseBackoff = 2 * time.Second - ) +func (c *Client) Do(r *http.Request) (rsp *http.Response, err error) { // First validate incoming request. if err := ValidateRequest(r); err != nil { return nil, err } - // Get request hostname. - host := r.URL.Hostname() - - // Check whether request should fast fail. - fastFail := gtscontext.IsFastfail(r.Context()) - if !fastFail { - // Check if recently reached max retries for this host - // so we don't bother with a retry-backoff loop. The only - // errors that are retried upon are server failure, TLS - // and domain resolution type errors, so this cached result - // indicates this server is likely having issues. - fastFail = c.badHosts.Has(host) - defer func() { - if err != nil { - // On error return mark as bad-host. - c.badHosts.Set(host, struct{}{}) - } - }() + // Wrap in our own request + // type for retry-backoff. + req := WrapRequest(r) + + if gtscontext.IsFastfail(r.Context()) { + // If the fast-fail flag was set, just + // attempt a single iteration instead of + // following the below retry-backoff loop. + rsp, _, err = c.DoOnce(&req) + if err != nil { + return nil, fmt.Errorf("%w (fast fail)", err) + } + return rsp, nil } - // Start a log entry for this request - l := log.WithContext(r.Context()). - WithFields(kv.Fields{ - {"method", r.Method}, - {"url", r.URL.String()}, - }...) + for { + var retry bool - for i := 0; i < maxRetries; i++ { - var backoff time.Duration + // Perform the http request. + rsp, retry, err = c.DoOnce(&req) + if err == nil { + return rsp, nil + } - l.Info("performing request") + if !retry { + // reached max retries, don't further backoff + return nil, fmt.Errorf("%w (max retries)", err) + } - // Perform the request. - rsp, err = c.do(r) - if err == nil { //nolint:gocritic + // Start new backoff sleep timer. + backoff := time.NewTimer(req.BackOff()) - // TooManyRequest means we need to slow - // down and retry our request. Codes over - // 500 generally indicate temp. outages. - if code := rsp.StatusCode; code < 500 && - code != http.StatusTooManyRequests { - return rsp, nil - } + select { + // Request ctx cancelled. + case <-r.Context().Done(): + backoff.Stop() - // Create loggable error from response status code. - err = fmt.Errorf(`http response: %s`, rsp.Status) + // Return context error. + err = r.Context().Err() + return nil, err - // Search for a provided "Retry-After" header value. - if after := rsp.Header.Get("Retry-After"); after != "" { + // Backoff for time. + case <-backoff.C: + } + } +} - // Get current time. - now := time.Now() +// DoOnce wraps an underlying http.Client{}.Do() to perform our wrapped request type: +// rewinding response body to permit reuse, signing request data when SignFunc provided, +// marking erroring hosts, updating retry attempt counts and setting backoff from header. +func (c *Client) DoOnce(r *Request) (rsp *http.Response, retry bool, err error) { + if r.attempts > c.retries { + // Ensure request hasn't reached max number of attempts. + err = fmt.Errorf("httpclient: reached max retries (%d)", c.retries) + return + } - if u, _ := strconv.ParseUint(after, 10, 32); u != 0 { - // An integer number of backoff seconds was provided. - backoff = time.Duration(u) * time.Second - } else if at, _ := http.ParseTime(after); !at.Before(now) { - // An HTTP formatted future date-time was provided. - backoff = at.Sub(now) - } + // Update no. + // attempts. + r.attempts++ - // Don't let their provided backoff exceed our max. - if max := baseBackoff * maxRetries; backoff > max { - backoff = max - } - } + // Reset backoff. + r.backoff = 0 + + // Perform main routine. + rsp, retry, err = c.do(r) - // Close + unset rsp. - _ = rsp.Body.Close() - rsp = nil + if rsp != nil { + // Log successful rsp. + r.Entry.Info(rsp.Status) + return + } - } else if errorsv2.IsV2(err, + // Log any errors. + r.Entry.Error(err) + + switch { + case !retry: + // If they were told not to + // retry, also set number of + // attempts to prevent retry. + r.attempts = c.retries + 1 + + case r.attempts > c.retries: + // On max retries, mark this as + // a "badhost", i.e. is erroring. + c.badHosts.Set(r.Host, struct{}{}) + + // Ensure retry flag is unset + // when reached max attempts. + retry = false + + case c.badHosts.Has(r.Host): + // When retry is still permitted, + // check host hasn't been marked + // as a "badhost", i.e. erroring. + r.attempts = c.retries + 1 + retry = false + } + + return +} + +// do performs the "meat" of DoOnce(), but it's separated out to allow +// easier wrapping of the response, retry, error returns with further logic. +func (c *Client) do(r *Request) (rsp *http.Response, retry bool, err error) { + // Perform the HTTP request. + rsp, err = c.client.Do(r.Request) + if err != nil { + + if errorsv2.IsV2(err, context.DeadlineExceeded, context.Canceled, ErrBodyTooLarge, ErrReservedAddr, ) { // Non-retryable errors. - return nil, err - } else if errstr := err.Error(); // nocollapse + return nil, false, err + } + + if errstr := err.Error(); // strings.Contains(errstr, "stopped after 10 redirects") || strings.Contains(errstr, "tls: ") || strings.Contains(errstr, "x509: ") { // These error types aren't wrapped // so we have to check the error string. // All are unrecoverable! - return nil, err - } else if dnserr := (*net.DNSError)(nil); // nocollapse - errors.As(err, &dnserr) && dnserr.IsNotFound { - // DNS lookup failure, this domain does not exist - return nil, gtserror.SetNotFound(err) + return nil, false, err } - if fastFail { - // on fast-fail, don't bother backoff/retry - return nil, fmt.Errorf("%w (fast fail)", err) + if dnserr := errorsv2.AsV2[*net.DNSError](err); // + dnserr != nil && dnserr.IsNotFound { + // DNS lookup failure, this domain does not exist + return nil, false, gtserror.SetNotFound(err) } - if backoff == 0 { - // No retry-after found, set our predefined - // backoff according to a multiplier of 2^n. - backoff = baseBackoff * 1 << (i + 1) - } + // A retryable error. + return nil, true, err - l.Errorf("backing off for %s after http request error: %v", backoff, err) + } else if rsp.StatusCode > 500 || + rsp.StatusCode == http.StatusTooManyRequests { - select { - // Request ctx cancelled - case <-r.Context().Done(): - return nil, r.Context().Err() + // Codes over 500 (and 429: too many requests) + // are generally temporary errors. For these + // we replace the response with a loggable error. + err = fmt.Errorf(`http response: %s`, rsp.Status) - // Backoff for some time - case <-time.After(backoff): - } - } + // Search for a provided "Retry-After" header value. + if after := rsp.Header.Get("Retry-After"); after != "" { - // Set error return to trigger setting "bad host". - err = errors.New("transport reached max retries") - return -} + // Get cur time. + now := time.Now() -// do wraps http.Client{}.Do() to provide safely limited response bodies. -func (c *Client) do(req *http.Request) (*http.Response, error) { - // Perform the HTTP request. - rsp, err := c.client.Do(req) - if err != nil { - return nil, err + if u, _ := strconv.ParseUint(after, 10, 32); u != 0 { + // An integer no. of backoff seconds was provided. + r.backoff = time.Duration(u) * time.Second + } else if at, _ := http.ParseTime(after); !at.Before(now) { + // An HTTP formatted future date-time was provided. + r.backoff = at.Sub(now) + } + + // Don't let their provided backoff exceed our max. + if max := baseBackoff * time.Duration(c.retries); // + r.backoff > max { + r.backoff = max + } + } + + // Unset + close rsp. + _ = rsp.Body.Close() + return nil, true, err } // Seperate the body implementers. @@ -364,11 +395,10 @@ func (c *Client) do(req *http.Request) (*http.Response, error) { // Check response body not too large. if rsp.ContentLength > c.bodyMax { - _ = rsp.Body.Close() - return nil, ErrBodyTooLarge + return nil, false, ErrBodyTooLarge } - return rsp, nil + return rsp, true, nil } // cast discard writer to full interface it supports. |