summaryrefslogtreecommitdiff
path: root/internal/transport
diff options
context:
space:
mode:
Diffstat (limited to 'internal/transport')
-rw-r--r--internal/transport/context.go42
-rw-r--r--internal/transport/context_test.go33
-rw-r--r--internal/transport/controller.go24
-rw-r--r--internal/transport/deliver.go111
-rw-r--r--internal/transport/transport.go187
5 files changed, 104 insertions, 293 deletions
diff --git a/internal/transport/context.go b/internal/transport/context.go
deleted file mode 100644
index 96d3f23f7..000000000
--- a/internal/transport/context.go
+++ /dev/null
@@ -1,42 +0,0 @@
-// GoToSocial
-// Copyright (C) GoToSocial Authors admin@gotosocial.org
-// SPDX-License-Identifier: AGPL-3.0-or-later
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package transport
-
-import "context"
-
-// ctxkey is our own unique context key type to prevent setting outside package.
-type ctxkey string
-
-// fastfailkey is our unique context key to indicate fast-fail is enabled.
-var fastfailkey = ctxkey("ff")
-
-// WithFastfail returns a Context which indicates that any http requests made
-// with it should return after the first failed attempt, instead of retrying.
-//
-// This can be used to fail quickly when you're making an outgoing http request
-// inside the context of an incoming http request, and you want to be able to
-// provide a snappy response to the user, instead of retrying + backing off.
-func WithFastfail(parent context.Context) context.Context {
- return context.WithValue(parent, fastfailkey, struct{}{})
-}
-
-// IsFastfail returns true if the given context was created by WithFastfail.
-func IsFastfail(ctx context.Context) bool {
- _, ok := ctx.Value(fastfailkey).(struct{})
- return ok
-}
diff --git a/internal/transport/context_test.go b/internal/transport/context_test.go
deleted file mode 100644
index e06e7c4d5..000000000
--- a/internal/transport/context_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-// GoToSocial
-// Copyright (C) GoToSocial Authors admin@gotosocial.org
-// SPDX-License-Identifier: AGPL-3.0-or-later
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package transport_test
-
-import (
- "context"
- "testing"
-
- "github.com/superseriousbusiness/gotosocial/internal/transport"
-)
-
-func TestFastFailContext(t *testing.T) {
- ctx := context.Background()
- ctx = transport.WithFastfail(ctx)
- if !transport.IsFastfail(ctx) {
- t.Fatal("failed to set fast-fail context key")
- }
-}
diff --git a/internal/transport/controller.go b/internal/transport/controller.go
index 331659f64..e1271d202 100644
--- a/internal/transport/controller.go
+++ b/internal/transport/controller.go
@@ -24,7 +24,7 @@ import (
"encoding/json"
"fmt"
"net/url"
- "time"
+ "runtime"
"codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-cache/v3"
@@ -32,7 +32,7 @@ import (
"github.com/superseriousbusiness/activity/streams"
"github.com/superseriousbusiness/gotosocial/internal/config"
"github.com/superseriousbusiness/gotosocial/internal/federation/federatingdb"
- "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/state"
)
@@ -49,14 +49,14 @@ type controller struct {
state *state.State
fedDB federatingdb.DB
clock pub.Clock
- client pub.HttpClient
+ client httpclient.SigningClient
trspCache cache.Cache[string, *transport]
- badHosts cache.Cache[string, struct{}]
userAgent string
+ senders int // no. concurrent batch delivery routines.
}
// NewController returns an implementation of the Controller interface for creating new transports
-func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client pub.HttpClient) Controller {
+func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.Clock, client httpclient.SigningClient) Controller {
applicationName := config.GetApplicationName()
host := config.GetHost()
proto := config.GetProtocol()
@@ -68,20 +68,8 @@ func NewController(state *state.State, federatingDB federatingdb.DB, clock pub.C
clock: clock,
client: client,
trspCache: cache.New[string, *transport](0, 100, 0),
- badHosts: cache.New[string, struct{}](0, 1000, 0),
userAgent: fmt.Sprintf("%s (+%s://%s) gotosocial/%s", applicationName, proto, host, version),
- }
-
- // Transport cache has TTL=1hr freq=1min
- c.trspCache.SetTTL(time.Hour, false)
- if !c.trspCache.Start(time.Minute) {
- log.Panic(nil, "failed to start transport controller cache")
- }
-
- // Bad hosts cache has TTL=15min freq=1min
- c.badHosts.SetTTL(15*time.Minute, false)
- if !c.badHosts.Start(time.Minute) {
- log.Panic(nil, "failed to start transport controller cache")
+ senders: runtime.GOMAXPROCS(0), // on batch delivery, only ever send GOMAXPROCS at a time.
}
return c
diff --git a/internal/transport/deliver.go b/internal/transport/deliver.go
index 8ec939503..fff7dbcf4 100644
--- a/internal/transport/deliver.go
+++ b/internal/transport/deliver.go
@@ -22,7 +22,6 @@ import (
"fmt"
"net/http"
"net/url"
- "strings"
"sync"
"codeberg.org/gruf/go-byteutil"
@@ -32,54 +31,90 @@ import (
)
func (t *transport) BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error {
- // concurrently deliver to recipients; for each delivery, buffer the error if it fails
- wg := sync.WaitGroup{}
- errCh := make(chan error, len(recipients))
- for _, recipient := range recipients {
- wg.Add(1)
- go func(r *url.URL) {
- defer wg.Done()
- if err := t.Deliver(ctx, b, r); err != nil {
- errCh <- err
+ var (
+ // errs accumulates errors received during
+ // attempted delivery by deliverer routines.
+ errs gtserror.MultiError
+
+ // wait blocks until all sender
+ // routines have returned.
+ wait sync.WaitGroup
+
+ // mutex protects 'recipients' and
+ // 'errs' for concurrent access.
+ mutex sync.Mutex
+
+ // Get current instance host info.
+ domain = config.GetAccountDomain()
+ host = config.GetHost()
+ )
+
+ // Block on expect no. senders.
+ wait.Add(t.controller.senders)
+
+ for i := 0; i < t.controller.senders; i++ {
+ go func() {
+ // Mark returned.
+ defer wait.Done()
+
+ for {
+ // Acquire lock.
+ mutex.Lock()
+
+ if len(recipients) == 0 {
+ // Reached end.
+ mutex.Unlock()
+ return
+ }
+
+ // Pop next recipient.
+ i := len(recipients) - 1
+ to := recipients[i]
+ recipients = recipients[:i]
+
+ // Done with lock.
+ mutex.Unlock()
+
+ // Skip delivery to recipient if it is "us".
+ if to.Host == host || to.Host == domain {
+ continue
+ }
+
+ // Attempt to deliver data to recipient.
+ if err := t.deliver(ctx, b, to); err != nil {
+ mutex.Lock() // safely append err to accumulator.
+ errs.Appendf("error delivering to %s: %v", to, err)
+ mutex.Unlock()
+ }
}
- }(recipient)
+ }()
}
- // wait until all deliveries have succeeded or failed
- wg.Wait()
-
- // receive any buffered errors
- errs := make([]string, 0, len(errCh))
-outer:
- for {
- select {
- case e := <-errCh:
- errs = append(errs, e.Error())
- default:
- break outer
- }
- }
-
- if len(errs) > 0 {
- return fmt.Errorf("BatchDeliver: at least one failure: %s", strings.Join(errs, "; "))
- }
+ // Wait for finish.
+ wait.Wait()
- return nil
+ // Return combined err.
+ return errs.Combine()
}
func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error {
- // if the 'to' host is our own, just skip this delivery since we by definition already have the message!
+ // if 'to' host is our own, skip as we don't need to deliver to ourselves...
if to.Host == config.GetHost() || to.Host == config.GetAccountDomain() {
return nil
}
- urlStr := to.String()
+ // Deliver data to recipient.
+ return t.deliver(ctx, b, to)
+}
+
+func (t *transport) deliver(ctx context.Context, b []byte, to *url.URL) error {
+ url := to.String()
// Use rewindable bytes reader for body.
var body byteutil.ReadNopCloser
body.Reset(b)
- req, err := http.NewRequestWithContext(ctx, "POST", urlStr, &body)
+ req, err := http.NewRequestWithContext(ctx, "POST", url, &body)
if err != nil {
return err
}
@@ -88,16 +123,16 @@ func (t *transport) Deliver(ctx context.Context, b []byte, to *url.URL) error {
req.Header.Add("Accept-Charset", "utf-8")
req.Header.Set("Host", to.Host)
- resp, err := t.POST(req, b)
+ rsp, err := t.POST(req, b)
if err != nil {
return err
}
- defer resp.Body.Close()
+ defer rsp.Body.Close()
- if code := resp.StatusCode; code != http.StatusOK &&
+ if code := rsp.StatusCode; code != http.StatusOK &&
code != http.StatusCreated && code != http.StatusAccepted {
- err := fmt.Errorf("POST request to %s failed: %s", urlStr, resp.Status)
- return gtserror.WithStatusCode(err, resp.StatusCode)
+ err := fmt.Errorf("POST request to %s failed: %s", url, rsp.Status)
+ return gtserror.WithStatusCode(err, rsp.StatusCode)
}
return nil
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
index e8f742f5b..0123b3ea8 100644
--- a/internal/transport/transport.go
+++ b/internal/transport/transport.go
@@ -20,26 +20,17 @@ package transport
import (
"context"
"crypto"
- "crypto/x509"
"errors"
- "fmt"
"io"
- "net"
"net/http"
"net/url"
- "strconv"
- "strings"
"sync"
"time"
- "codeberg.org/gruf/go-byteutil"
- errorsv2 "codeberg.org/gruf/go-errors/v2"
- "codeberg.org/gruf/go-kv"
"github.com/go-fed/httpsig"
- "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
- "github.com/superseriousbusiness/gotosocial/internal/log"
)
// Transport implements the pub.Transport interface with some additional functionality for fetching remote media.
@@ -78,7 +69,7 @@ type Transport interface {
Finger(ctx context.Context, targetUsername string, targetDomain string) ([]byte, error)
}
-// transport implements the Transport interface
+// transport implements the Transport interface.
type transport struct {
controller *controller
pubKeyID string
@@ -95,9 +86,11 @@ func (t *transport) GET(r *http.Request) (*http.Response, error) {
if r.Method != http.MethodGet {
return nil, errors.New("must be GET request")
}
- return t.do(r, func(r *http.Request) error {
- return t.signGET(r)
- })
+ ctx := r.Context() // extract, set pubkey ID.
+ ctx = gtscontext.SetPublicKeyID(ctx, t.pubKeyID)
+ r = r.WithContext(ctx) // replace request ctx.
+ r.Header.Set("User-Agent", t.controller.userAgent)
+ return t.controller.client.DoSigned(r, t.signGET())
}
// POST will perform given http request using transport client, retrying on certain preset errors.
@@ -105,161 +98,31 @@ func (t *transport) POST(r *http.Request, body []byte) (*http.Response, error) {
if r.Method != http.MethodPost {
return nil, errors.New("must be POST request")
}
- return t.do(r, func(r *http.Request) error {
- return t.signPOST(r, body)
- })
-}
-
-func (t *transport) do(r *http.Request, signer func(*http.Request) error) (*http.Response, error) {
- const (
- // max no. attempts
- maxRetries = 5
-
- // starting backoff duration.
- baseBackoff = 2 * time.Second
- )
-
- // Get request hostname
- host := r.URL.Hostname()
-
- // Check whether request should fast fail, we check this
- // before loop as each context.Value() requires mutex lock.
- fastFail := IsFastfail(r.Context())
- if !fastFail {
- // Check if recently reached max retries for this host
- // so we don't bother with a retry-backoff loop. The only
- // errors that are retried upon are server failure and
- // domain resolution type errors, so this cached result
- // indicates this server is likely having issues.
- fastFail = t.controller.badHosts.Has(host)
- }
-
- // Start a log entry for this request
- l := log.WithContext(r.Context()).
- WithFields(kv.Fields{
- {"pubKeyID", t.pubKeyID},
- {"method", r.Method},
- {"url", r.URL.String()},
- }...)
-
+ ctx := r.Context() // extract, set pubkey ID.
+ ctx = gtscontext.SetPublicKeyID(ctx, t.pubKeyID)
+ r = r.WithContext(ctx) // replace request ctx.
r.Header.Set("User-Agent", t.controller.userAgent)
-
- for i := 0; i < maxRetries; i++ {
- var backoff time.Duration
-
- // Reset signing header fields
- now := t.controller.clock.Now().UTC()
- r.Header.Set("Date", now.Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
- r.Header.Del("Signature")
- r.Header.Del("Digest")
-
- // Rewind body reader and content-length if set.
- if rc, ok := r.Body.(*byteutil.ReadNopCloser); ok {
- r.ContentLength = int64(rc.Len())
- rc.Rewind()
- }
-
- // Perform request signing
- if err := signer(r); err != nil {
- return nil, err
- }
-
- l.Infof("performing request")
-
- // Attempt to perform request
- rsp, err := t.controller.client.Do(r)
- if err == nil { //nolint:gocritic
- // TooManyRequest means we need to slow
- // down and retry our request. Codes over
- // 500 generally indicate temp. outages.
- if code := rsp.StatusCode; code < 500 &&
- code != http.StatusTooManyRequests {
- return rsp, nil
- }
-
- // Generate error from status code for logging
- err = errors.New(`http response "` + rsp.Status + `"`)
-
- // Search for a provided "Retry-After" header value.
- if after := rsp.Header.Get("Retry-After"); after != "" {
-
- if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
- // An integer number of backoff seconds was provided.
- backoff = time.Duration(u) * time.Second
- } else if at, _ := http.ParseTime(after); !at.Before(now) {
- // An HTTP formatted future date-time was provided.
- backoff = at.Sub(now)
- }
-
- // Don't let their provided backoff exceed our max.
- if max := baseBackoff * maxRetries; backoff > max {
- backoff = max
- }
- }
-
- } else if errorsv2.Is(err,
- context.DeadlineExceeded,
- context.Canceled,
- httpclient.ErrInvalidRequest,
- httpclient.ErrBodyTooLarge,
- httpclient.ErrReservedAddr,
- ) {
- // Return on non-retryable errors
- return nil, err
- } else if strings.Contains(err.Error(), "stopped after 10 redirects") {
- // Don't bother if net/http returned after too many redirects
- return nil, err
- } else if errors.As(err, &x509.UnknownAuthorityError{}) {
- // Unknown authority errors we do NOT recover from
- return nil, err
- } else if dnserr := (*net.DNSError)(nil); // nocollapse
- errors.As(err, &dnserr) && dnserr.IsNotFound {
- // DNS lookup failure, this domain does not exist
- return nil, gtserror.SetNotFound(err)
- }
-
- if fastFail {
- // on fast-fail, don't bother backoff/retry
- return nil, fmt.Errorf("%w (fast fail)", err)
- }
-
- if backoff == 0 {
- // No retry-after found, set our predefined backoff.
- backoff = time.Duration(i) * baseBackoff
- }
-
- l.Errorf("backing off for %s after http request error: %v", backoff, err)
-
- select {
- // Request ctx cancelled
- case <-r.Context().Done():
- return nil, r.Context().Err()
-
- // Backoff for some time
- case <-time.After(backoff):
- }
- }
-
- // Add "bad" entry for this host.
- t.controller.badHosts.Set(host, struct{}{})
-
- return nil, errors.New("transport reached max retries")
+ return t.controller.client.DoSigned(r, t.signPOST(body))
}
// signGET will safely sign an HTTP GET request.
-func (t *transport) signGET(r *http.Request) (err error) {
- t.safesign(func() {
- err = t.getSigner.SignRequest(t.privkey, t.pubKeyID, r, nil)
- })
- return
+func (t *transport) signGET() httpclient.SignFunc {
+ return func(r *http.Request) (err error) {
+ t.safesign(func() {
+ err = t.getSigner.SignRequest(t.privkey, t.pubKeyID, r, nil)
+ })
+ return
+ }
}
// signPOST will safely sign an HTTP POST request for given body.
-func (t *transport) signPOST(r *http.Request, body []byte) (err error) {
- t.safesign(func() {
- err = t.postSigner.SignRequest(t.privkey, t.pubKeyID, r, body)
- })
- return
+func (t *transport) signPOST(body []byte) httpclient.SignFunc {
+ return func(r *http.Request) (err error) {
+ t.safesign(func() {
+ err = t.postSigner.SignRequest(t.privkey, t.pubKeyID, r, body)
+ })
+ return
+ }
}
// safesign will perform sign function within mutex protection,