1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package transport
import (
"context"
"crypto"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"codeberg.org/gruf/go-byteutil"
errorsv2 "codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-kv"
"github.com/go-fed/httpsig"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
// Transport implements the pub.Transport interface with some additional functionality for fetching remote media.
//
// Since the transport has the concept of 'shortcuts' for fetching data locally rather than remotely, it is
// not *always* the case that calling a Transport function does an http call, but it usually will for remote
// hosts or resources for which a shortcut isn't provided by the transport controller (also in this package).
//
// For any of the transport functions, if a Fastfail context is passed in as the first parameter, the function
// will return after the first transport failure, instead of retrying + backing off.
type Transport interface {
/*
POST functions
*/
// Deliver sends an ActivityStreams object.
Deliver(ctx context.Context, b []byte, to *url.URL) error
// BatchDeliver sends an ActivityStreams object to multiple recipients.
BatchDeliver(ctx context.Context, b []byte, recipients []*url.URL) error
/*
GET functions
*/
// Dereference fetches the ActivityStreams object located at this IRI with a GET request.
Dereference(ctx context.Context, iri *url.URL) ([]byte, error)
// DereferenceMedia fetches the given media attachment IRI, returning the reader and filesize.
DereferenceMedia(ctx context.Context, iri *url.URL) (io.ReadCloser, int64, error)
// DereferenceInstance dereferences remote instance information, first by checking /api/v1/instance, and then by checking /.well-known/nodeinfo.
DereferenceInstance(ctx context.Context, iri *url.URL) (*gtsmodel.Instance, error)
// Finger performs a webfinger request with the given username and domain, and returns the bytes from the response body.
Finger(ctx context.Context, targetUsername string, targetDomain string) ([]byte, error)
}
// transport implements the Transport interface
type transport struct {
controller *controller
pubKeyID string
privkey crypto.PrivateKey
signerExp time.Time
getSigner httpsig.Signer
postSigner httpsig.Signer
signerMu sync.Mutex
}
// GET will perform given http request using transport client, retrying on certain preset errors.
func (t *transport) GET(r *http.Request) (*http.Response, error) {
if r.Method != http.MethodGet {
return nil, errors.New("must be GET request")
}
return t.do(r, func(r *http.Request) error {
return t.signGET(r)
})
}
// POST will perform given http request using transport client, retrying on certain preset errors.
func (t *transport) POST(r *http.Request, body []byte) (*http.Response, error) {
if r.Method != http.MethodPost {
return nil, errors.New("must be POST request")
}
return t.do(r, func(r *http.Request) error {
return t.signPOST(r, body)
})
}
func (t *transport) do(r *http.Request, signer func(*http.Request) error) (*http.Response, error) {
const (
// max no. attempts
maxRetries = 5
// starting backoff duration.
baseBackoff = 2 * time.Second
)
// Get request hostname
host := r.URL.Hostname()
// Check whether request should fast fail, we check this
// before loop as each context.Value() requires mutex lock.
fastFail := IsFastfail(r.Context())
if !fastFail {
// Check if recently reached max retries for this host
// so we don't bother with a retry-backoff loop. The only
// errors that are retried upon are server failure and
// domain resolution type errors, so this cached result
// indicates this server is likely having issues.
fastFail = t.controller.badHosts.Has(host)
}
// Start a log entry for this request
l := log.WithContext(r.Context()).
WithFields(kv.Fields{
{"pubKeyID", t.pubKeyID},
{"method", r.Method},
{"url", r.URL.String()},
}...)
r.Header.Set("User-Agent", t.controller.userAgent)
for i := 0; i < maxRetries; i++ {
var backoff time.Duration
// Reset signing header fields
now := t.controller.clock.Now().UTC()
r.Header.Set("Date", now.Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
r.Header.Del("Signature")
r.Header.Del("Digest")
// Rewind body reader and content-length if set.
if rc, ok := r.Body.(*byteutil.ReadNopCloser); ok {
r.ContentLength = int64(rc.Len())
rc.Rewind()
}
// Perform request signing
if err := signer(r); err != nil {
return nil, err
}
l.Infof("performing request")
// Attempt to perform request
rsp, err := t.controller.client.Do(r)
if err == nil { //nolint:gocritic
// TooManyRequest means we need to slow
// down and retry our request. Codes over
// 500 generally indicate temp. outages.
if code := rsp.StatusCode; code < 500 &&
code != http.StatusTooManyRequests {
return rsp, nil
}
// Generate error from status code for logging
err = errors.New(`http response "` + rsp.Status + `"`)
// Search for a provided "Retry-After" header value.
if after := rsp.Header.Get("Retry-After"); after != "" {
if u, _ := strconv.ParseUint(after, 10, 32); u != 0 {
// An integer number of backoff seconds was provided.
backoff = time.Duration(u) * time.Second
} else if at, _ := http.ParseTime(after); !at.Before(now) {
// An HTTP formatted future date-time was provided.
backoff = at.Sub(now)
}
// Don't let their provided backoff exceed our max.
if max := baseBackoff * maxRetries; backoff > max {
backoff = max
}
}
} else if errorsv2.Is(err,
context.DeadlineExceeded,
context.Canceled,
httpclient.ErrInvalidRequest,
httpclient.ErrBodyTooLarge,
httpclient.ErrReservedAddr,
) {
// Return on non-retryable errors
return nil, err
} else if strings.Contains(err.Error(), "stopped after 10 redirects") {
// Don't bother if net/http returned after too many redirects
return nil, err
} else if errors.As(err, &x509.UnknownAuthorityError{}) {
// Unknown authority errors we do NOT recover from
return nil, err
} else if dnserr := (*net.DNSError)(nil); // nocollapse
errors.As(err, &dnserr) && dnserr.IsNotFound {
// DNS lookup failure, this domain does not exist
return nil, gtserror.SetNotFound(err)
}
if fastFail {
// on fast-fail, don't bother backoff/retry
return nil, fmt.Errorf("%w (fast fail)", err)
}
if backoff == 0 {
// No retry-after found, set our predefined backoff.
backoff = time.Duration(i) * baseBackoff
}
l.Errorf("backing off for %s after http request error: %v", backoff, err)
select {
// Request ctx cancelled
case <-r.Context().Done():
return nil, r.Context().Err()
// Backoff for some time
case <-time.After(backoff):
}
}
// Add "bad" entry for this host.
t.controller.badHosts.Set(host, struct{}{})
return nil, errors.New("transport reached max retries")
}
// signGET will safely sign an HTTP GET request.
func (t *transport) signGET(r *http.Request) (err error) {
t.safesign(func() {
err = t.getSigner.SignRequest(t.privkey, t.pubKeyID, r, nil)
})
return
}
// signPOST will safely sign an HTTP POST request for given body.
func (t *transport) signPOST(r *http.Request, body []byte) (err error) {
t.safesign(func() {
err = t.postSigner.SignRequest(t.privkey, t.pubKeyID, r, body)
})
return
}
// safesign will perform sign function within mutex protection,
// and ensured that httpsig.Signers are up-to-date.
func (t *transport) safesign(sign func()) {
// Perform within mu safety
t.signerMu.Lock()
defer t.signerMu.Unlock()
if now := time.Now(); now.After(t.signerExp) {
const expiry = 120
// Signers have expired and require renewal
t.getSigner, _ = NewGETSigner(expiry)
t.postSigner, _ = NewPOSTSigner(expiry)
t.signerExp = now.Add(time.Second * expiry)
}
// Perform signing
sign()
}
|