diff options
Diffstat (limited to 'internal/transport')
-rw-r--r-- | internal/transport/context.go | 42 | ||||
-rw-r--r-- | internal/transport/context_test.go | 33 | ||||
-rw-r--r-- | internal/transport/controller.go | 24 | ||||
-rw-r--r-- | internal/transport/deliver.go | 111 | ||||
-rw-r--r-- | internal/transport/transport.go | 187 |
5 files changed, 104 insertions, 293 deletions
diff --git a/internal/transport/context.go b/internal/transport/context.go deleted file mode 100644 index 96d3f23f7..000000000 --- a/internal/transport/context.go +++ /dev/null @@ -1,42 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package transport - -import "context" - -// ctxkey is our own unique context key type to prevent setting outside package. -type ctxkey string - -// fastfailkey is our unique context key to indicate fast-fail is enabled. -var fastfailkey = ctxkey("ff") - -// WithFastfail returns a Context which indicates that any http requests made -// with it should return after the first failed attempt, instead of retrying. -// -// This can be used to fail quickly when you're making an outgoing http request -// inside the context of an incoming http request, and you want to be able to -// provide a snappy response to the user, instead of retrying + backing off. -func WithFastfail(parent context.Context) context.Context { - return context.WithValue(parent, fastfailkey, struct{}{}) -} - -// IsFastfail returns true if the given context was created by WithFastfail. -func IsFastfail(ctx context.Context) bool { - _, ok := ctx.Value(fastfailkey).(struct{}) - return ok -} diff --git a/internal/transport/context_test.go b/internal/transport/context_test.go deleted file mode 100644 index e06e7c4d5..000000000 --- a/internal/transport/context_test.go +++ /dev/null @@ -1,33 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package transport_test - -import ( - "context" - "testing" - - "github.com/superseriousbusiness/gotosocial/internal/transport" -) - -func TestFastFailContext(t *testing.T) { - ctx := context.Background() - ctx = transport.WithFastfail(ctx) - if !transport.IsFastfail(ctx) { - t.Fatal("failed to set fast-fail context key") - } -} diff --git a/internal/transport/controller.go b/internal/transport/controller.go index 331659f64..e1271d202 100644 --- a/internal/transport/controller.go +++ b/internal/transport/controller.go @@ -24,7 +24,7 @@ import ( "encoding/json" "fmt" "net/url" - "time" + "runtime" "codeberg.org/gruf/go-byteutil" "codeberg.org/gruf/go-cache/v3" @@ -32,7 +32,7 @@ import ( "github.com/superseriousbusiness/activity/streams" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/federation/federatingdb" - "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/httpclient" "github.com/superseriousbusiness/gotosocial/internal/state" ) @@ -49,14 +49,14 @@ type controller struct { state *state.State fedDB federatingdb.DB clock pub.Clock - client pub.HttpClient + client httpclient.SigningClient trspCache cache.Cache[string, *transport] - badHosts cache.Cache[string, struct{}] userAgent string + senders int // no. concurrent batch delivery routines. } // NewController returns an implementation of the Controller interface for creating new transports -func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client pub.HttpClient) Controller { +func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client httpclient.SigningClient) Controller { applicationName := config.GetApplicationName() host := config.GetHost() proto := config.GetProtocol() @@ -68,20 +68,8 @@ func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.C clock: clock, client: client, trspCache: cache.New[string, *transport](0, 100, 0), - badHosts: cache.New[string, struct{}](0, 1000, 0), userAgent: fmt.Sprintf("%s (+%s://%s) gotosocial/%s", applicationName, proto, host, version), - } - - // Transport cache has TTL=1hr freq=1min - c.trspCache.SetTTL(time.Hour, false) - if !c.trspCache.Start(time.Minute) { - log.Panic(nil, "failed to start transport controller cache") - } - - // Bad hosts cache has TTL=15min freq=1min - c.badHosts.SetTTL(15*time.Minute, false) - if !c.badHosts.Start(time.Minute) { - log.Panic(nil, "failed to start transport controller cache") + senders: runtime.GOMAXPROCS(0), // on batch delivery, only ever send GOMAXPROCS at a time. } return c diff --git a/internal/transport/deliver.go b/internal/transport/deliver.go index 8ec939503..fff7dbcf4 100644 --- a/internal/transport/deliver.go +++ b/internal/transport/deliver.go @@ -22,7 +22,6 @@ import ( "fmt" "net/http" "net/url" - "strings" "sync" "codeberg.org/gruf/go-byteutil" @@ -32,54 +31,90 @@ import ( ) func (t *transport) BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error { - // concurrently deliver to recipients; for each delivery, buffer the error if it fails - wg := sync.WaitGroup{} - errCh := make(chan error, len(recipients)) - for _, recipient := range recipients { - wg.Add(1) - go func(r *url.URL) { - defer wg.Done() - if err := t.Deliver(ctx, b, r); err != nil { - errCh <- err + var ( + // errs accumulates errors received during + // attempted delivery by deliverer routines. + errs gtserror.MultiError + + // wait blocks until all sender + // routines have returned. + wait sync.WaitGroup + + // mutex protects 'recipients' and + // 'errs' for concurrent access. + mutex sync.Mutex + + // Get current instance host info. + domain = config.GetAccountDomain() + host = config.GetHost() + ) + + // Block on expect no. senders. + wait.Add(t.controller.senders) + + for i := 0; i < t.controller.senders; i++ { + go func() { + // Mark returned. + defer wait.Done() + + for { + // Acquire lock. + mutex.Lock() + + if len(recipients) == 0 { + // Reached end. + mutex.Unlock() + return + } + + // Pop next recipient. + i := len(recipients) - 1 + to := recipients[i] + recipients = recipients[:i] + + // Done with lock. + mutex.Unlock() + + // Skip delivery to recipient if it is "us". + if to.Host == host || to.Host == domain { + continue + } + + // Attempt to deliver data to recipient. + if err := t.deliver(ctx, b, to); err != nil { + mutex.Lock() // safely append err to accumulator. + errs.Appendf("error delivering to %s: %v", to, err) + mutex.Unlock() + } } - }(recipient) + }() } - // wait until all deliveries have succeeded or failed - wg.Wait() - - // receive any buffered errors - errs := make([]string, 0, len(errCh)) -outer: - for { - select { - case e := <-errCh: - errs = append(errs, e.Error()) - default: - break outer - } - } - - if len(errs) > 0 { - return fmt.Errorf("BatchDeliver: at least one failure: %s", strings.Join(errs, "; ")) - } + // Wait for finish. + wait.Wait() - return nil + // Return combined err. + return errs.Combine() } func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error { - // if the 'to' host is our own, just skip this delivery since we by definition already have the message! + // if 'to' host is our own, skip as we don't need to deliver to ourselves... if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() { return nil } - urlStr := to.String() + // Deliver data to recipient. + return t.deliver(ctx, b, to) +} + +func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error { + url := to.String() // Use rewindable bytes reader for body. var body byteutil.ReadNopCloser body.Reset(b) - req, err := http.NewRequestWithContext(ctx, "POST", urlStr, &body) + req, err := http.NewRequestWithContext(ctx, "POST", url, &body) if err != nil { return err } @@ -88,16 +123,16 @@ func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error { req.Header.Add("Accept-Charset", "utf-8") req.Header.Set("Host", to.Host) - resp, err := t.POST(req, b) + rsp, err := t.POST(req, b) if err != nil { return err } - defer resp.Body.Close() + defer rsp.Body.Close() - if code := resp.StatusCode; code != http.StatusOK && + if code := rsp.StatusCode; code != http.StatusOK && code != http.StatusCreated && code != http.StatusAccepted { - err := fmt.Errorf("POST request to %s failed: %s", urlStr, resp.Status) - return gtserror.WithStatusCode(err, resp.StatusCode) + err := fmt.Errorf("POST request to %s failed: %s", url, rsp.Status) + return gtserror.WithStatusCode(err, rsp.StatusCode) } return nil diff --git a/internal/transport/transport.go b/internal/transport/transport.go index e8f742f5b..0123b3ea8 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -20,26 +20,17 @@ package transport import ( "context" "crypto" - "crypto/x509" "errors" - "fmt" "io" - "net" "net/http" "net/url" - "strconv" - "strings" "sync" "time" - "codeberg.org/gruf/go-byteutil" - errorsv2 "codeberg.org/gruf/go-errors/v2" - "codeberg.org/gruf/go-kv" "github.com/go-fed/httpsig" - "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/httpclient" - "github.com/superseriousbusiness/gotosocial/internal/log" ) // Transport implements the pub.Transport interface with some additional functionality for fetching remote media. @@ -78,7 +69,7 @@ type Transport interface { Finger(ctx context.Context, targetUsername string, targetDomain string) ([]byte, error) } -// transport implements the Transport interface +// transport implements the Transport interface. type transport struct { controller *controller pubKeyID string @@ -95,9 +86,11 @@ func (t *transport) GET(r *http.Request) (*http.Response, error) { if r.Method != http.MethodGet { return nil, errors.New("must be GET request") } - return t.do(r, func(r *http.Request) error { - return t.signGET(r) - }) + ctx := r.Context() // extract, set pubkey ID. + ctx = gtscontext.SetPublicKeyID(ctx, t.pubKeyID) + r = r.WithContext(ctx) // replace request ctx. + r.Header.Set("User-Agent", t.controller.userAgent) + return t.controller.client.DoSigned(r, t.signGET()) } // POST will perform given http request using transport client, retrying on certain preset errors. @@ -105,161 +98,31 @@ func (t *transport) POST(r *http.Request, body []byte) (*http.Response, error) { if r.Method != http.MethodPost { return nil, errors.New("must be POST request") } - return t.do(r, func(r *http.Request) error { - return t.signPOST(r, body) - }) -} - -func (t *transport) do(r *http.Request, signer func(*http.Request) error) (*http.Response, error) { - const ( - // max no. attempts - maxRetries = 5 - - // starting backoff duration. - baseBackoff = 2 * time.Second - ) - - // Get request hostname - host := r.URL.Hostname() - - // Check whether request should fast fail, we check this - // before loop as each context.Value() requires mutex lock. - fastFail := IsFastfail(r.Context()) - if !fastFail { - // Check if recently reached max retries for this host - // so we don't bother with a retry-backoff loop. The only - // errors that are retried upon are server failure and - // domain resolution type errors, so this cached result - // indicates this server is likely having issues. - fastFail = t.controller.badHosts.Has(host) - } - - // Start a log entry for this request - l := log.WithContext(r.Context()). - WithFields(kv.Fields{ - {"pubKeyID", t.pubKeyID}, - {"method", r.Method}, - {"url", r.URL.String()}, - }...) - + ctx := r.Context() // extract, set pubkey ID. + ctx = gtscontext.SetPublicKeyID(ctx, t.pubKeyID) + r = r.WithContext(ctx) // replace request ctx. r.Header.Set("User-Agent", t.controller.userAgent) - - for i := 0; i < maxRetries; i++ { - var backoff time.Duration - - // Reset signing header fields - now := t.controller.clock.Now().UTC() - r.Header.Set("Date", now.Format("Mon, 02 Jan 2006 15:04:05")+" GMT") - r.Header.Del("Signature") - r.Header.Del("Digest") - - // Rewind body reader and content-length if set. - if rc, ok := r.Body.(*byteutil.ReadNopCloser); ok { - r.ContentLength = int64(rc.Len()) - rc.Rewind() - } - - // Perform request signing - if err := signer(r); err != nil { - return nil, err - } - - l.Infof("performing request") - - // Attempt to perform request - rsp, err := t.controller.client.Do(r) - if err == nil { //nolint:gocritic - // TooManyRequest means we need to slow - // down and retry our request. Codes over - // 500 generally indicate temp. outages. - if code := rsp.StatusCode; code < 500 && - code != http.StatusTooManyRequests { - return rsp, nil - } - - // Generate error from status code for logging - err = errors.New(`http response "` + rsp.Status + `"`) - - // Search for a provided "Retry-After" header value. - if after := rsp.Header.Get("Retry-After"); after != "" { - - if u, _ := strconv.ParseUint(after, 10, 32); u != 0 { - // An integer number of backoff seconds was provided. - backoff = time.Duration(u) * time.Second - } else if at, _ := http.ParseTime(after); !at.Before(now) { - // An HTTP formatted future date-time was provided. - backoff = at.Sub(now) - } - - // Don't let their provided backoff exceed our max. - if max := baseBackoff * maxRetries; backoff > max { - backoff = max - } - } - - } else if errorsv2.Is(err, - context.DeadlineExceeded, - context.Canceled, - httpclient.ErrInvalidRequest, - httpclient.ErrBodyTooLarge, - httpclient.ErrReservedAddr, - ) { - // Return on non-retryable errors - return nil, err - } else if strings.Contains(err.Error(), "stopped after 10 redirects") { - // Don't bother if net/http returned after too many redirects - return nil, err - } else if errors.As(err, &x509.UnknownAuthorityError{}) { - // Unknown authority errors we do NOT recover from - return nil, err - } else if dnserr := (*net.DNSError)(nil); // nocollapse - errors.As(err, &dnserr) && dnserr.IsNotFound { - // DNS lookup failure, this domain does not exist - return nil, gtserror.SetNotFound(err) - } - - if fastFail { - // on fast-fail, don't bother backoff/retry - return nil, fmt.Errorf("%w (fast fail)", err) - } - - if backoff == 0 { - // No retry-after found, set our predefined backoff. - backoff = time.Duration(i) * baseBackoff - } - - l.Errorf("backing off for %s after http request error: %v", backoff, err) - - select { - // Request ctx cancelled - case <-r.Context().Done(): - return nil, r.Context().Err() - - // Backoff for some time - case <-time.After(backoff): - } - } - - // Add "bad" entry for this host. - t.controller.badHosts.Set(host, struct{}{}) - - return nil, errors.New("transport reached max retries") + return t.controller.client.DoSigned(r, t.signPOST(body)) } // signGET will safely sign an HTTP GET request. -func (t *transport) signGET(r *http.Request) (err error) { - t.safesign(func() { - err = t.getSigner.SignRequest(t.privkey, t.pubKeyID, r, nil) - }) - return +func (t *transport) signGET() httpclient.SignFunc { + return func(r *http.Request) (err error) { + t.safesign(func() { + err = t.getSigner.SignRequest(t.privkey, t.pubKeyID, r, nil) + }) + return + } } // signPOST will safely sign an HTTP POST request for given body. -func (t *transport) signPOST(r *http.Request, body []byte) (err error) { - t.safesign(func() { - err = t.postSigner.SignRequest(t.privkey, t.pubKeyID, r, body) - }) - return +func (t *transport) signPOST(body []byte) httpclient.SignFunc { + return func(r *http.Request) (err error) { + t.safesign(func() { + err = t.postSigner.SignRequest(t.privkey, t.pubKeyID, r, body) + }) + return + } } // safesign will perform sign function within mutex protection, |