summaryrefslogtreecommitdiff
path: root/internal/httpclient
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2023-04-28 16:45:21 +0100
committerLibravatar GitHub <noreply@github.com>2023-04-28 17:45:21 +0200
commit6a29c5ffd40f1919cac40030c53160c19812bc8d (patch)
treef1faaa6504cdb798dfbfd1df20f1493bcdd99602 /internal/httpclient
parent[bugfix] Fix remaining mangled URI escaping issues in statuses + accounts (#1... (diff)
downloadgotosocial-6a29c5ffd40f1919cac40030c53160c19812bc8d.tar.xz
[performance] improved request batching (removes need for queueing) (#1687)
* revamp http client to not limit requests, instead use sender worker Signed-off-by: kim <grufwub@gmail.com> * remove separate sender worker pool, spawn 2*GOMAXPROCS batch senders each time, no need for transport cache sweeping Signed-off-by: kim <grufwub@gmail.com> * improve batch senders to keep popping recipients until remote URL found Signed-off-by: kim <grufwub@gmail.com> * fix recipient looping issue Signed-off-by: kim <grufwub@gmail.com> * fix missing mutex unlock Signed-off-by: kim <grufwub@gmail.com> * move request id ctx key to gtscontext, finish filling out more code comments, add basic support for not logging client IP Signed-off-by: kim <grufwub@gmail.com> * slight code reformatting Signed-off-by: kim <grufwub@gmail.com> * a whitespace Signed-off-by: kim <grufwub@gmail.com> * remove unused code Signed-off-by: kim <grufwub@gmail.com> * add missing license headers Signed-off-by: kim <grufwub@gmail.com> * fix request backoff calculation Signed-off-by: kim <grufwub@gmail.com> --------- Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'internal/httpclient')
-rw-r--r--internal/httpclient/client.go270
-rw-r--r--internal/httpclient/client_test.go8
-rw-r--r--internal/httpclient/request.go62
-rw-r--r--internal/httpclient/sign.go28
4 files changed, 205 insertions, 163 deletions
diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go
index 9562bdc48..67a1d0715 100644
--- a/internal/httpclient/client.go
+++ b/internal/httpclient/client.go
@@ -18,31 +18,39 @@
package httpclient
import (
+ "context"
+ "crypto/x509"
"errors"
+ "fmt"
"io"
"net"
"net/http"
"net/netip"
"runtime"
+ "strconv"
+ "strings"
"time"
"codeberg.org/gruf/go-bytesize"
+ "codeberg.org/gruf/go-byteutil"
+ "codeberg.org/gruf/go-cache/v3"
+ errorsv2 "codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-kv"
- "github.com/cornelk/hashmap"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
-// ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed.
-var ErrInvalidRequest = errors.New("invalid http request")
+var (
+ // ErrInvalidNetwork is returned if the request would not be performed over TCP
+ ErrInvalidNetwork = errors.New("invalid network type")
-// ErrInvalidNetwork is returned if the request would not be performed over TCP
-var ErrInvalidNetwork = errors.New("invalid network type")
+ // ErrReservedAddr is returned if a dialed address resolves to an IP within a blocked or reserved net.
+ ErrReservedAddr = errors.New("dial within blocked / reserved IP range")
-// ErrReservedAddr is returned if a dialed address resolves to an IP within a blocked or reserved net.
-var ErrReservedAddr = errors.New("dial within blocked / reserved IP range")
-
-// ErrBodyTooLarge is returned when a received response body is above predefined limit (default 40MB).
-var ErrBodyTooLarge = errors.New("body size too large")
+ // ErrBodyTooLarge is returned when a received response body is above predefined limit (default 40MB).
+ ErrBodyTooLarge = errors.New("body size too large")
+)
// Config provides configuration details for setting up a new
// instance of httpclient.Client{}. Within are a subset of the
@@ -83,13 +91,10 @@ type Config struct {
// cases to protect against forged / unknown content-lengths
// - protection from server side request forgery (SSRF) by only dialing
// out to known public IP prefixes, configurable with allows/blocks
-// - limit number of concurrent requests, else blocking until a slot
-// is available (context channels still respected)
type Client struct {
- client http.Client
- queue *hashmap.Map[string, chan struct{}]
- bmax int64 // max response body size
- cmax int // max open conns per host
+ client http.Client
+ badHosts cache.Cache[string, struct{}]
+ bodyMax int64
}
// New returns a new instance of Client initialized using configuration.
@@ -109,28 +114,26 @@ func New(cfg Config) *Client {
}
if cfg.MaxIdleConns <= 0 {
- // By default base this value on MaxOpenConns
+ // By default base this value on MaxOpenConns.
cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10
}
if cfg.MaxBodySize <= 0 {
- // By default set this to a reasonable 40MB
+ // By default set this to a reasonable 40MB.
cfg.MaxBodySize = int64(40 * bytesize.MiB)
}
- // Protect dialer with IP range sanitizer
+ // Protect dialer with IP range sanitizer.
d.Control = (&sanitizer{
allow: cfg.AllowRanges,
block: cfg.BlockRanges,
}).Sanitize
- // Prepare client fields
+ // Prepare client fields.
c.client.Timeout = cfg.Timeout
- c.cmax = cfg.MaxOpenConnsPerHost
- c.bmax = cfg.MaxBodySize
- c.queue = hashmap.New[string, chan struct{}]()
+ c.bodyMax = cfg.MaxBodySize
- // Set underlying HTTP client roundtripper
+ // Set underlying HTTP client roundtripper.
c.client.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
ForceAttemptHTTP2: true,
@@ -144,90 +147,185 @@ func New(cfg Config) *Client {
DisableCompression: cfg.DisableCompression,
}
+ // Initiate outgoing bad hosts lookup cache.
+ c.badHosts = cache.New[string, struct{}](0, 1000, 0)
+ c.badHosts.SetTTL(15*time.Minute, false)
+ if !c.badHosts.Start(time.Minute) {
+ log.Panic(nil, "failed to start transport controller cache")
+ }
+
return &c
}
-// Do will perform given request when an available slot in the queue is available,
-// and block until this time. For returned values, this follows the same semantics
-// as the standard http.Client{}.Do() implementation except that response body will
-// be wrapped by an io.LimitReader() to limit response body sizes.
-func (c *Client) Do(req *http.Request) (*http.Response, error) {
- // Ensure this is a valid request
- if err := ValidateRequest(req); err != nil {
- return nil, err
- }
+// Do ...
+func (c *Client) Do(r *http.Request) (*http.Response, error) {
+ return c.DoSigned(r, func(r *http.Request) error {
+ return nil // no request signing
+ })
+}
- // Get host's wait queue
- wait := c.wait(req.Host)
-
- var ok bool
-
- select {
- // Quickly try grab a spot
- case wait <- struct{}{}:
- // it's our turn!
- ok = true
-
- // NOTE:
- // Ideally here we would set the slot release to happen either
- // on error return, or via callback from the response body closer.
- // However when implementing this, there appear deadlocks between
- // the channel queue here and the media manager worker pool. So
- // currently we only place a limit on connections dialing out, but
- // there may still be more connections open than len(c.queue) given
- // that connections may not be closed until response body is closed.
- // The current implementation will reduce the viability of denial of
- // service attacks, but if there are future issues heed this advice :]
- defer func() { <-wait }()
- default:
+// DoSigned ...
+func (c *Client) DoSigned(r *http.Request, sign SignFunc) (*http.Response, error) {
+ const (
+ // max no. attempts.
+ maxRetries = 5
+
+ // starting backoff duration.
+ baseBackoff = 2 * time.Second
+ )
+
+ // Get request hostname.
+ host := r.URL.Hostname()
+
+ // Check whether request should fast fail.
+ fastFail := gtscontext.IsFastfail(r.Context())
+ if !fastFail {
+ // Check if recently reached max retries for this host
+ // so we don't bother with a retry-backoff loop. The only
+ // errors that are retried upon are server failure and
+ // domain resolution type errors, so this cached result
+ // indicates this server is likely having issues.
+ fastFail = c.badHosts.Has(host)
}
- if !ok {
- // No spot acquired, log warning
- log.WithContext(req.Context()).
- WithFields(kv.Fields{
- {K: "queue", V: len(wait)},
- {K: "method", V: req.Method},
- {K: "host", V: req.Host},
- {K: "uri", V: req.URL.RequestURI()},
- }...).Warn("full request queue")
+ // Start a log entry for this request
+ l := log.WithContext(r.Context()).
+ WithFields(kv.Fields{
+ {"method", r.Method},
+ {"url", r.URL.String()},
+ }...)
+
+ for i := 0; i < maxRetries; i++ {
+ var backoff time.Duration
+
+ // Reset signing header fields
+ now := time.Now().UTC()
+ r.Header.Set("Date", now.Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
+ r.Header.Del("Signature")
+ r.Header.Del("Digest")
+
+ // Rewind body reader and content-length if set.
+ if rc, ok := r.Body.(*byteutil.ReadNopCloser); ok {
+ r.ContentLength = int64(rc.Len())
+ rc.Rewind()
+ }
+
+ // Sign the outgoing request.
+ if err := sign(r); err != nil {
+ return nil, err
+ }
+
+ l.Infof("performing request")
+
+ // Perform the request.
+ rsp, err := c.do(r)
+ if err == nil { //nolint:gocritic
+
+ // TooManyRequest means we need to slow
+ // down and retry our request. Codes over
+ // 500 generally indicate temp. outages.
+ if code := rsp.StatusCode; code < 500 &&
+ code != http.StatusTooManyRequests {
+ return rsp, nil
+ }
+
+ // Generate error from status code for logging
+ err = errors.New(`http response "` + rsp.Status + `"`)
+
+ // Search for a provided "Retry-After" header value.
+ if after := rsp.Header.Get("Retry-After"); after != "" {
+
+ if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
+ // An integer number of backoff seconds was provided.
+ backoff = time.Duration(u) * time.Second
+ } else if at, _ := http.ParseTime(after); !at.Before(now) {
+ // An HTTP formatted future date-time was provided.
+ backoff = at.Sub(now)
+ }
+
+ // Don't let their provided backoff exceed our max.
+ if max := baseBackoff * maxRetries; backoff > max {
+ backoff = max
+ }
+ }
+
+ } else if errorsv2.Is(err,
+ context.DeadlineExceeded,
+ context.Canceled,
+ ErrBodyTooLarge,
+ ErrReservedAddr,
+ ) {
+ // Return on non-retryable errors
+ return nil, err
+ } else if strings.Contains(err.Error(), "stopped after 10 redirects") {
+ // Don't bother if net/http returned after too many redirects
+ return nil, err
+ } else if errors.As(err, &x509.UnknownAuthorityError{}) {
+ // Unknown authority errors we do NOT recover from
+ return nil, err
+ } else if dnserr := (*net.DNSError)(nil); // nocollapse
+ errors.As(err, &dnserr) && dnserr.IsNotFound {
+ // DNS lookup failure, this domain does not exist
+ return nil, gtserror.SetNotFound(err)
+ }
+
+ if fastFail {
+ // on fast-fail, don't bother backoff/retry
+ return nil, fmt.Errorf("%w (fast fail)", err)
+ }
+
+ if backoff == 0 {
+ // No retry-after found, set our predefined
+ // backoff according to a multiplier of 2^n.
+ backoff = baseBackoff * 1 << (i + 1)
+ }
+
+ l.Errorf("backing off for %s after http request error: %v", backoff, err)
select {
- case <-req.Context().Done():
- // the request was canceled before we
- // got to our turn: no need to release
- return nil, req.Context().Err()
- case wait <- struct{}{}:
- defer func() { <-wait }()
+ // Request ctx cancelled
+ case <-r.Context().Done():
+ return nil, r.Context().Err()
+
+ // Backoff for some time
+ case <-time.After(backoff):
}
}
- // Perform the HTTP request
+ // Add "bad" entry for this host.
+ c.badHosts.Set(host, struct{}{})
+
+ return nil, errors.New("transport reached max retries")
+}
+
+// do ...
+func (c *Client) do(req *http.Request) (*http.Response, error) {
+ // Perform the HTTP request.
rsp, err := c.client.Do(req)
if err != nil {
return nil, err
}
- // Check response body not too large
- if rsp.ContentLength > c.bmax {
+ // Check response body not too large.
+ if rsp.ContentLength > c.bodyMax {
return nil, ErrBodyTooLarge
}
- // Seperate the body implementers
+ // Seperate the body implementers.
rbody := (io.Reader)(rsp.Body)
cbody := (io.Closer)(rsp.Body)
var limit int64
if limit = rsp.ContentLength; limit < 0 {
- // If unknown, use max as reader limit
- limit = c.bmax
+ // If unknown, use max as reader limit.
+ limit = c.bodyMax
}
- // Don't trust them, limit body reads
+ // Don't trust them, limit body reads.
rbody = io.LimitReader(rbody, limit)
- // Wrap body with limit
+ // Wrap body with limit.
rsp.Body = &struct {
io.Reader
io.Closer
@@ -235,17 +333,3 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return rsp, nil
}
-
-// wait acquires the 'wait' queue for the given host string, or allocates new.
-func (c *Client) wait(host string) chan struct{} {
- // Look for an existing queue
- queue, ok := c.queue.Get(host)
- if ok {
- return queue
- }
-
- // Allocate a new host queue (or return a sneaky existing one).
- queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax))
-
- return queue
-}
diff --git a/internal/httpclient/client_test.go b/internal/httpclient/client_test.go
index 9eab0fed4..f0ec01ec3 100644
--- a/internal/httpclient/client_test.go
+++ b/internal/httpclient/client_test.go
@@ -48,14 +48,6 @@ var bodies = []string{
"body with\r\nnewlines",
}
-// Note:
-// There is no test for the .MaxOpenConns implementation
-// in the httpclient.Client{}, due to the difficult to test
-// this. The block is only held for the actual dial out to
-// the connection, so the usual test of blocking and holding
-// open this queue slot to check we can't open another isn't
-// an easy test here.
-
func TestHTTPClientSmallBody(t *testing.T) {
for _, body := range bodies {
_TestHTTPClientWithBody(t, []byte(body), int(^uint16(0)))
diff --git a/internal/httpclient/request.go b/internal/httpclient/request.go
deleted file mode 100644
index 881d3f699..000000000
--- a/internal/httpclient/request.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// GoToSocial
-// Copyright (C) GoToSocial Authors admin@gotosocial.org
-// SPDX-License-Identifier: AGPL-3.0-or-later
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package httpclient
-
-import (
- "fmt"
- "net/http"
- "strings"
-
- "golang.org/x/net/http/httpguts"
-)
-
-// ValidateRequest performs the same request validation logic found in the default
-// net/http.Transport{}.roundTrip() function, but pulls it out into this separate
-// function allowing validation errors to be wrapped under a single error type.
-func ValidateRequest(r *http.Request) error {
- switch {
- case r.URL == nil:
- return fmt.Errorf("%w: nil url", ErrInvalidRequest)
- case r.Header == nil:
- return fmt.Errorf("%w: nil header", ErrInvalidRequest)
- case r.URL.Host == "":
- return fmt.Errorf("%w: empty url host", ErrInvalidRequest)
- case r.URL.Scheme != "http" && r.URL.Scheme != "https":
- return fmt.Errorf("%w: unsupported protocol %q", ErrInvalidRequest, r.URL.Scheme)
- case strings.IndexFunc(r.Method, func(r rune) bool { return !httpguts.IsTokenRune(r) }) != -1:
- return fmt.Errorf("%w: invalid method %q", ErrInvalidRequest, r.Method)
- }
-
- for key, values := range r.Header {
- // Check field key name is valid
- if !httpguts.ValidHeaderFieldName(key) {
- return fmt.Errorf("%w: invalid header field name %q", ErrInvalidRequest, key)
- }
-
- // Check each field value is valid
- for i := 0; i < len(values); i++ {
- if !httpguts.ValidHeaderFieldValue(values[i]) {
- return fmt.Errorf("%w: invalid header field value %q", ErrInvalidRequest, values[i])
- }
- }
- }
-
- // ps. kim wrote this
-
- return nil
-}
diff --git a/internal/httpclient/sign.go b/internal/httpclient/sign.go
new file mode 100644
index 000000000..78046aa28
--- /dev/null
+++ b/internal/httpclient/sign.go
@@ -0,0 +1,28 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package httpclient
+
+import "net/http"
+
+// SignFunc is a function signature that provides request signing.
+type SignFunc func(r *http.Request) error
+
+type SigningClient interface {
+ Do(r *http.Request) (*http.Response, error)
+ DoSigned(r *http.Request, sign SignFunc) (*http.Response, error)
+}