author    kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>  2024-04-11 10:45:35 +0100
committer GitHub <noreply@github.com>  2024-04-11 11:45:35 +0200
commit    a483bd9e38333b153df1d4df95276cca38f99ff5 (patch)
tree      b2fdb6f53ef248b31719a15adc93e767eba3d5c4  /internal/httpclient/client.go
parent    [chore]: Bump github.com/yuin/goldmark from 1.7.0 to 1.7.1 (#2819) (diff)
download  gotosocial-a483bd9e38333b153df1d4df95276cca38f99ff5.tar.xz
[performance] massively improved ActivityPub delivery worker efficiency (#2812)
* add delivery worker type that pulls from queue to httpclient package
* finish up some code commenting, bodge a vendored activity library change, integrate the deliverypool changes into transportcontroller
* hook up queue deletion logic
* support deleting queued http requests by target ID
* don't index APRequest by hostname in the queue
* use gorun
* use the original context's values when wrapping msg type as delivery{}
* actually log in the AP delivery worker ...
* add uncommitted changes
* use errors.AsV2()
* use errorsv2.AsV2()
* finish adding some code comments, add bad host handling to delivery workers
* slightly tweak deliveryworkerpool API, use advanced sender multiplier
* remove PopCtx() method, let others instead rely on Wait()
* shuffle things around to move delivery stuff into transport/ subpkg
* remove dead code
* formatting
* validate request before queueing for delivery
* finish adding code comments, fix up backoff code
* finish adding more code comments
* clamp minimum no. senders to 1
* add start/stop logging to delivery worker, some slight changes
* remove double logging
* use worker ptrs
* expose the embedded log fields in httpclient.Request{}
* ensure request context values are preserved when updating ctx
* add delivery worker tests
* fix linter issues
* ensure delivery worker gets inited in testrig
* fix tests to delivering messages to check worker delivery queue
* update error type to use ptr instead of value receiver
* fix test calling Workers{}.Start() instead of testrig.StartWorkers()
* update docs for advanced-sender-multiplier
* update to the latest activity library version
* add comment about not using httptest.Server{}
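The headline change is moving ActivityPub delivery onto worker goroutines that pull queued requests and push them through the retrying client shown in the diff below. A minimal sketch of that pull-from-queue shape, assuming a hypothetical channel-backed queue and delivery type (not the actual gotosocial transport/delivery API):

// Illustrative sketch only: the queue, delivery type, and worker
// function are hypothetical stand-ins for the real delivery pool.
package main

import (
	"context"
	"log"
	"net/http"
	"sync"
)

// delivery is a hypothetical queued outgoing request plus its context.
type delivery struct {
	ctx context.Context
	req *http.Request
}

// worker drains the channel-backed queue until it is closed, handing
// each request to the (retrying) client and discarding the response.
func worker(client *http.Client, queue <-chan delivery, wg *sync.WaitGroup) {
	defer wg.Done()
	for d := range queue {
		rsp, err := client.Do(d.req.WithContext(d.ctx))
		if err != nil {
			log.Printf("delivery failed: %v", err)
			continue
		}
		_ = rsp.Body.Close()
	}
}

func main() {
	queue := make(chan delivery, 64)

	var wg sync.WaitGroup
	wg.Add(1)
	go worker(http.DefaultClient, queue, &wg)

	req, _ := http.NewRequest(http.MethodGet, "https://example.org/inbox", nil)
	queue <- delivery{ctx: context.Background(), req: req}

	close(queue)
	wg.Wait()
}

The real pool clamps the number of senders to at least 1 and sizes it from the advanced sender multiplier mentioned above; this sketch runs a single worker only to show the loop.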
Diffstat (limited to 'internal/httpclient/client.go')
-rw-r--r--  internal/httpclient/client.go  256
1 file changed, 143 insertions, 113 deletions
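In the diff below, Do() becomes a retry loop around DoOnce(), sleeping on req.BackOff() between attempts. The BackOff() implementation itself is not part of this file, so here is a minimal sketch of the behaviour it appears to provide, assuming the 2-second base and 2^n growth that the old Do() used; the constant and function names are illustrative, and the real Request.BackOff() may differ in detail.

package main

import (
	"fmt"
	"time"
)

// baseBackoff mirrors the starting backoff deleted from the old Do().
const baseBackoff = 2 * time.Second

// nextBackoff returns how long to sleep before retry number `attempts`.
// retryAfter is any server-provided Retry-After duration, already
// parsed and clamped; zero means "none provided".
func nextBackoff(attempts uint, retryAfter time.Duration) time.Duration {
	if retryAfter > 0 {
		return retryAfter
	}
	// 4s, 8s, 16s, ... for attempts 1, 2, 3, ...
	return baseBackoff * time.Duration(1<<attempts)
}

func main() {
	for i := uint(1); i <= 5; i++ {
		fmt.Println(i, nextBackoff(i, 0)) // 4s 8s 16s 32s 1m4s
	}
	fmt.Println(nextBackoff(1, 30*time.Second)) // Retry-After wins: 30s
}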
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go
index 6c2427372..31c6df7d0 100644
--- a/internal/httpclient/client.go
+++ b/internal/httpclient/client.go
@@ -35,7 +35,6 @@ import (
"codeberg.org/gruf/go-cache/v3"
errorsv2 "codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-iotools"
- "codeberg.org/gruf/go-kv"
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
@@ -109,11 +108,13 @@ type Client struct {
client http.Client
badHosts cache.TTLCache[string, struct{}]
bodyMax int64
+ retries uint
}
// New returns a new instance of Client initialized using configuration.
func New(cfg Config) *Client {
var c Client
+ c.retries = 5
d := &net.Dialer{
Timeout: 15 * time.Second,
@@ -177,7 +178,7 @@ func New(cfg Config) *Client {
}}
// Initiate outgoing bad hosts lookup cache.
- c.badHosts = cache.NewTTL[string, struct{}](0, 1000, 0)
+ c.badHosts = cache.NewTTL[string, struct{}](0, 512, 0)
c.badHosts.SetTTL(time.Hour, false)
if !c.badHosts.Start(time.Minute) {
log.Panic(nil, "failed to start transport controller cache")
@@ -187,154 +188,184 @@ func New(cfg Config) *Client {
}
// Do will essentially perform http.Client{}.Do() with retry-backoff functionality.
-func (c *Client) Do(r *http.Request) (*http.Response, error) {
- return c.DoSigned(r, func(r *http.Request) error {
- return nil // no request signing
- })
-}
-
-// DoSigned will essentially perform http.Client{}.Do() with retry-backoff functionality and requesting signing..
-func (c *Client) DoSigned(r *http.Request, sign SignFunc) (rsp *http.Response, err error) {
- const (
- // max no. attempts.
- maxRetries = 5
-
- // starting backoff duration.
- baseBackoff = 2 * time.Second
- )
+func (c *Client) Do(r *http.Request) (rsp *http.Response, err error) {
// First validate incoming request.
if err := ValidateRequest(r); err != nil {
return nil, err
}
- // Get request hostname.
- host := r.URL.Hostname()
-
- // Check whether request should fast fail.
- fastFail := gtscontext.IsFastfail(r.Context())
- if !fastFail {
- // Check if recently reached max retries for this host
- // so we don't bother with a retry-backoff loop. The only
- // errors that are retried upon are server failure, TLS
- // and domain resolution type errors, so this cached result
- // indicates this server is likely having issues.
- fastFail = c.badHosts.Has(host)
- defer func() {
- if err != nil {
- // On error return mark as bad-host.
- c.badHosts.Set(host, struct{}{})
- }
- }()
+ // Wrap in our own request
+ // type for retry-backoff.
+ req := WrapRequest(r)
+
+ if gtscontext.IsFastfail(r.Context()) {
+ // If the fast-fail flag was set, just
+ // attempt a single iteration instead of
+ // following the below retry-backoff loop.
+ rsp, _, err = c.DoOnce(&req)
+ if err != nil {
+ return nil, fmt.Errorf("%w (fast fail)", err)
+ }
+ return rsp, nil
}
- // Start a log entry for this request
- l := log.WithContext(r.Context()).
- WithFields(kv.Fields{
- {"method", r.Method},
- {"url", r.URL.String()},
- }...)
+ for {
+ var retry bool
- for i := 0; i < maxRetries; i++ {
- var backoff time.Duration
+ // Perform the http request.
+ rsp, retry, err = c.DoOnce(&req)
+ if err == nil {
+ return rsp, nil
+ }
- l.Info("performing request")
+ if !retry {
+ // reached max retries, don't further backoff
+ return nil, fmt.Errorf("%w (max retries)", err)
+ }
- // Perform the request.
- rsp, err = c.do(r)
- if err == nil { //nolint:gocritic
+ // Start new backoff sleep timer.
+ backoff := time.NewTimer(req.BackOff())
- // TooManyRequest means we need to slow
- // down and retry our request. Codes over
- // 500 generally indicate temp. outages.
- if code := rsp.StatusCode; code < 500 &&
- code != http.StatusTooManyRequests {
- return rsp, nil
- }
+ select {
+ // Request ctx cancelled.
+ case <-r.Context().Done():
+ backoff.Stop()
- // Create loggable error from response status code.
- err = fmt.Errorf(`http response: %s`, rsp.Status)
+ // Return context error.
+ err = r.Context().Err()
+ return nil, err
- // Search for a provided "Retry-After" header value.
- if after := rsp.Header.Get("Retry-After"); after != "" {
+ // Backoff for time.
+ case <-backoff.C:
+ }
+ }
+}
- // Get current time.
- now := time.Now()
+// DoOnce wraps an underlying http.Client{}.Do() to perform our wrapped request type:
+// rewinding response body to permit reuse, signing request data when SignFunc provided,
+// marking erroring hosts, updating retry attempt counts and setting backoff from header.
+func (c *Client) DoOnce(r *Request) (rsp *http.Response, retry bool, err error) {
+ if r.attempts > c.retries {
+ // Ensure request hasn't reached max number of attempts.
+ err = fmt.Errorf("httpclient: reached max retries (%d)", c.retries)
+ return
+ }
- if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
- // An integer number of backoff seconds was provided.
- backoff = time.Duration(u) * time.Second
- } else if at, _ := http.ParseTime(after); !at.Before(now) {
- // An HTTP formatted future date-time was provided.
- backoff = at.Sub(now)
- }
+ // Update no.
+ // attempts.
+ r.attempts++
- // Don't let their provided backoff exceed our max.
- if max := baseBackoff * maxRetries; backoff > max {
- backoff = max
- }
- }
+ // Reset backoff.
+ r.backoff = 0
+
+ // Perform main routine.
+ rsp, retry, err = c.do(r)
- // Close + unset rsp.
- _ = rsp.Body.Close()
- rsp = nil
+ if rsp != nil {
+ // Log successful rsp.
+ r.Entry.Info(rsp.Status)
+ return
+ }
- } else if errorsv2.IsV2(err,
+ // Log any errors.
+ r.Entry.Error(err)
+
+ switch {
+ case !retry:
+ // If they were told not to
+ // retry, also set number of
+ // attempts to prevent retry.
+ r.attempts = c.retries + 1
+
+ case r.attempts > c.retries:
+ // On max retries, mark this as
+ // a "badhost", i.e. is erroring.
+ c.badHosts.Set(r.Host, struct{}{})
+
+ // Ensure retry flag is unset
+ // when reached max attempts.
+ retry = false
+
+ case c.badHosts.Has(r.Host):
+ // When retry is still permitted,
+ // check host hasn't been marked
+ // as a "badhost", i.e. erroring.
+ r.attempts = c.retries + 1
+ retry = false
+ }
+
+ return
+}
+
+// do performs the "meat" of DoOnce(), but it's separated out to allow
+// easier wrapping of the response, retry, error returns with further logic.
+func (c *Client) do(r *Request) (rsp *http.Response, retry bool, err error) {
+ // Perform the HTTP request.
+ rsp, err = c.client.Do(r.Request)
+ if err != nil {
+
+ if errorsv2.IsV2(err,
context.DeadlineExceeded,
context.Canceled,
ErrBodyTooLarge,
ErrReservedAddr,
) {
// Non-retryable errors.
- return nil, err
- } else if errstr := err.Error(); // nocollapse
+ return nil, false, err
+ }
+
+ if errstr := err.Error(); //
strings.Contains(errstr, "stopped after 10 redirects") ||
strings.Contains(errstr, "tls: ") ||
strings.Contains(errstr, "x509: ") {
// These error types aren't wrapped
// so we have to check the error string.
// All are unrecoverable!
- return nil, err
- } else if dnserr := (*net.DNSError)(nil); // nocollapse
- errors.As(err, &dnserr) && dnserr.IsNotFound {
- // DNS lookup failure, this domain does not exist
- return nil, gtserror.SetNotFound(err)
+ return nil, false, err
}
- if fastFail {
- // on fast-fail, don't bother backoff/retry
- return nil, fmt.Errorf("%w (fast fail)", err)
+ if dnserr := errorsv2.AsV2[*net.DNSError](err); //
+ dnserr != nil && dnserr.IsNotFound {
+ // DNS lookup failure, this domain does not exist
+ return nil, false, gtserror.SetNotFound(err)
}
- if backoff == 0 {
- // No retry-after found, set our predefined
- // backoff according to a multiplier of 2^n.
- backoff = baseBackoff * 1 << (i + 1)
- }
+ // A retryable error.
+ return nil, true, err
- l.Errorf("backing off for %s after http request error: %v", backoff, err)
+ } else if rsp.StatusCode > 500 ||
+ rsp.StatusCode == http.StatusTooManyRequests {
- select {
- // Request ctx cancelled
- case <-r.Context().Done():
- return nil, r.Context().Err()
+ // Codes over 500 (and 429: too many requests)
+ // are generally temporary errors. For these
+ // we replace the response with a loggable error.
+ err = fmt.Errorf(`http response: %s`, rsp.Status)
- // Backoff for some time
- case <-time.After(backoff):
- }
- }
+ // Search for a provided "Retry-After" header value.
+ if after := rsp.Header.Get("Retry-After"); after != "" {
- // Set error return to trigger setting "bad host".
- err = errors.New("transport reached max retries")
- return
-}
+ // Get cur time.
+ now := time.Now()
-// do wraps http.Client{}.Do() to provide safely limited response bodies.
-func (c *Client) do(req *http.Request) (*http.Response, error) {
- // Perform the HTTP request.
- rsp, err := c.client.Do(req)
- if err != nil {
- return nil, err
+ if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
+ // An integer no. of backoff seconds was provided.
+ r.backoff = time.Duration(u) * time.Second
+ } else if at, _ := http.ParseTime(after); !at.Before(now) {
+ // An HTTP formatted future date-time was provided.
+ r.backoff = at.Sub(now)
+ }
+
+ // Don't let their provided backoff exceed our max.
+ if max := baseBackoff * time.Duration(c.retries); //
+ r.backoff > max {
+ r.backoff = max
+ }
+ }
+
+ // Unset + close rsp.
+ _ = rsp.Body.Close()
+ return nil, true, err
}
// Seperate the body implementers.
@@ -364,11 +395,10 @@ func (c *Client) do(req *http.Request) (*http.Response, error) {
// Check response body not too large.
if rsp.ContentLength > c.bodyMax {
- _ = rsp.Body.Close()
- return nil, ErrBodyTooLarge
+ return nil, false, ErrBodyTooLarge
}
- return rsp, nil
+ return rsp, true, nil
}
// cast discard writer to full interface it supports.
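For reference, the Retry-After handling that do() applies when it sees a 429 or 5xx response, restated as a standalone helper so the two accepted forms are easy to see and test; the parseRetryAfter name and signature are illustrative only, not part of the httpclient package.

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// parseRetryAfter converts a Retry-After header value into a backoff
// duration: either an integer number of seconds, or an HTTP-date in
// the future. Anything else yields zero, leaving the exponential
// backoff in charge.
func parseRetryAfter(after string, now time.Time) time.Duration {
	if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
		// An integer number of backoff seconds was provided.
		return time.Duration(u) * time.Second
	}
	if at, _ := http.ParseTime(after); !at.Before(now) {
		// An HTTP-formatted future date-time was provided.
		return at.Sub(now)
	}
	return 0
}

func main() {
	now := time.Now()
	fmt.Println(parseRetryAfter("120", now)) // 2m0s
	fmt.Println(parseRetryAfter(now.Add(30*time.Second).Format(http.TimeFormat), now)) // ~30s
	fmt.Println(parseRetryAfter("soon", now)) // 0s
}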