diff options
Diffstat (limited to 'internal/federation')
-rw-r--r-- | internal/federation/authenticate.go | 16 | ||||
-rw-r--r-- | internal/federation/dereferencing/account.go | 144 | ||||
-rw-r--r-- | internal/federation/dereferencing/announce.go | 11 | ||||
-rw-r--r-- | internal/federation/dereferencing/collection.go (renamed from internal/federation/dereferencing/collectionpage.go) | 44 | ||||
-rw-r--r-- | internal/federation/dereferencing/status.go | 43 | ||||
-rw-r--r-- | internal/federation/dereferencing/thread.go | 92 |
6 files changed, 222 insertions, 128 deletions
diff --git a/internal/federation/authenticate.go b/internal/federation/authenticate.go index 59281fa65..596233b19 100644 --- a/internal/federation/authenticate.go +++ b/internal/federation/authenticate.go @@ -23,6 +23,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "net/url" "time" @@ -414,10 +415,19 @@ func (f *Federator) callForPubKey( // The actual http call to the remote server is // made right here by the Dereference function. - pubKeyBytes, err := trans.Dereference(ctx, pubKeyID) + rsp, err := trans.Dereference(ctx, pubKeyID) + if err == nil { - // No problem. - return pubKeyBytes, nil + // Read the response body data. + b, err := io.ReadAll(rsp.Body) + _ = rsp.Body.Close() // done + + if err != nil { + err := gtserror.Newf("error reading pubkey: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + return b, nil } if gtserror.StatusCode(err) == http.StatusGone { diff --git a/internal/federation/dereferencing/account.go b/internal/federation/dereferencing/account.go index c3ad6be5e..10d15bca6 100644 --- a/internal/federation/dereferencing/account.go +++ b/internal/federation/dereferencing/account.go @@ -19,14 +19,12 @@ package dereferencing import ( "context" - "encoding/json" "errors" "io" "net/url" "time" - "github.com/superseriousbusiness/activity/streams" - "github.com/superseriousbusiness/activity/streams/vocab" + "github.com/superseriousbusiness/activity/pub" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/config" "github.com/superseriousbusiness/gotosocial/internal/db" @@ -499,16 +497,17 @@ func (d *Dereferencer) enrichAccount( case err == nil && account.Domain != accDomain: // After webfinger, we now have correct account domain from which we can do a final DB check. - alreadyAcct, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain) + alreadyAcc, err := d.state.DB.GetAccountByUsernameDomain(ctx, account.Username, accDomain) if err != nil && !errors.Is(err, db.ErrNoEntries) { - return nil, nil, gtserror.Newf("db err looking for account again after webfinger: %w", err) + return nil, nil, gtserror.Newf("db error getting account after webfinger: %w", err) } - if alreadyAcct != nil { - // We had this account stored under - // the discovered accountDomain. + if alreadyAcc != nil { + // We had this account stored + // under discovered accountDomain. + // // Proceed with this account. - account = alreadyAcct + account = alreadyAcc } // Whether we had the account or not, we @@ -537,8 +536,9 @@ func (d *Dereferencer) enrichAccount( ) } + // Check URI scheme ahead of time for more useful errs. if uri.Scheme != "http" && uri.Scheme != "https" { - err = errors.New("account URI scheme must be http or https") + err := errors.New("account URI scheme must be http or https") return nil, nil, gtserror.Newf( "invalid uri %q: %w", account.URI, gtserror.SetUnretrievable(err), @@ -567,20 +567,52 @@ func (d *Dereferencer) enrichAccount( // We were not given any (partial) ActivityPub // version of this account as a parameter. // Dereference latest version of the account. - b, err := tsport.Dereference(ctx, uri) + rsp, err := tsport.Dereference(ctx, uri) if err != nil { err := gtserror.Newf("error dereferencing %s: %w", uri, err) return nil, nil, gtserror.SetUnretrievable(err) } - // Attempt to resolve ActivityPub acc from data. - apubAcc, err = ap.ResolveAccountable(ctx, b) + // Attempt to resolve ActivityPub acc from response. + apubAcc, err = ap.ResolveAccountable(ctx, rsp.Body) + + // Tidy up now done. + _ = rsp.Body.Close() + if err != nil { - // ResolveAccountable will set Unretrievable/WrongType + // ResolveAccountable will set gtserror.WrongType // on the returned error, so we don't need to do it here. - err = gtserror.Newf("error resolving accountable from data for account %s: %w", uri, err) + err = gtserror.Newf("error resolving accountable %s: %w", uri, err) return nil, nil, err } + + // Check whether input URI and final returned URI + // have changed (i.e. we followed some redirects). + if finalURIStr := rsp.Request.URL.String(); // + finalURIStr != uri.String() { + + // NOTE: this URI check + database call is performed + // AFTER reading and closing response body, for performance. + // + // Check whether we have this account stored under *final* URI. + alreadyAcc, err := d.state.DB.GetAccountByURI(ctx, finalURIStr) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, nil, gtserror.Newf("db error getting account after redirects: %w", err) + } + + if alreadyAcc != nil { + // We had this account stored + // under discovered final URI. + // + // Proceed with this account. + account = alreadyAcc + } + + // Update the input URI to + // the final determined URI + // for later URI checks. + uri = rsp.Request.URL + } } /* @@ -1014,76 +1046,33 @@ func (d *Dereferencer) dereferenceAccountFeatured(ctx context.Context, requestUs return err } - // Pre-fetch a transport for requesting username, used by later deref procedures. - tsport, err := d.transportController.NewTransportForUsername(ctx, requestUser) - if err != nil { - return gtserror.Newf("couldn't create transport: %w", err) - } - - b, err := tsport.Dereference(ctx, uri) + collect, err := d.dereferenceCollection(ctx, requestUser, uri) if err != nil { return err } - m := make(map[string]interface{}) - if err := json.Unmarshal(b, &m); err != nil { - return gtserror.Newf("error unmarshalling bytes into json: %w", err) - } - - t, err := streams.ToType(ctx, m) - if err != nil { - return gtserror.Newf("error resolving json into ap vocab type: %w", err) - } - - if t.GetTypeName() != ap.ObjectOrderedCollection { - return gtserror.Newf("%s was not an OrderedCollection", uri) - } - - collection, ok := t.(vocab.ActivityStreamsOrderedCollection) - if !ok { - return gtserror.New("couldn't coerce OrderedCollection") - } - - items := collection.GetActivityStreamsOrderedItems() - if items == nil { - return gtserror.New("nil orderedItems") - } - // Get previous pinned statuses (we'll need these later). wasPinned, err := d.state.DB.GetAccountPinnedStatuses(ctx, account.ID) if err != nil && !errors.Is(err, db.ErrNoEntries) { return gtserror.Newf("error getting account pinned statuses: %w", err) } - statusURIs := make([]*url.URL, 0, items.Len()) - for iter := items.Begin(); iter != items.End(); iter = iter.Next() { - var statusURI *url.URL + var statusURIs []*url.URL - switch { - case iter.IsActivityStreamsNote(): - // We got a whole Note. Extract the URI. - if note := iter.GetActivityStreamsNote(); note != nil { - if id := note.GetJSONLDId(); id != nil { - statusURI = id.GetIRI() - } - } - case iter.IsActivityStreamsArticle(): - // We got a whole Article. Extract the URI. - if article := iter.GetActivityStreamsArticle(); article != nil { - if id := article.GetJSONLDId(); id != nil { - statusURI = id.GetIRI() - } - } - default: - // Try to get just the URI. - statusURI = iter.GetIRI() + for { + // Get next collect item. + item := collect.NextItem() + if item == nil { + break } - if statusURI == nil { + // Check for available IRI. + itemIRI, _ := pub.ToId(item) + if itemIRI == nil { continue } - if statusURI.Host != uri.Host { + if itemIRI.Host != uri.Host { // If this status doesn't share a host with its featured // collection URI, we shouldn't trust it. Just move on. continue @@ -1093,13 +1082,13 @@ func (d *Dereferencer) dereferenceAccountFeatured(ctx context.Context, requestUs // We do this here so that even if we can't get // the status in the next part for some reason, // we still know it was *meant* to be pinned. - statusURIs = append(statusURIs, statusURI) + statusURIs = append(statusURIs, itemIRI) // Search for status by URI. Note this may return an existing model // we have stored with an error from attempted update, so check both. - status, _, _, err := d.getStatusByURI(ctx, requestUser, statusURI) + status, _, _, err := d.getStatusByURI(ctx, requestUser, itemIRI) if err != nil { - log.Errorf(ctx, "error getting status from featured collection %s: %v", statusURI, err) + log.Errorf(ctx, "error getting status from featured collection %s: %v", itemIRI, err) if status == nil { // This is only unactionable @@ -1108,20 +1097,23 @@ func (d *Dereferencer) dereferenceAccountFeatured(ctx context.Context, requestUs } } - // If the status was already pinned, we don't need to do anything. + // If the status was already pinned, + // we don't need to do anything. if !status.PinnedAt.IsZero() { continue } - if status.AccountID != account.ID { + if status.AccountURI != account.URI { // Someone's pinned a status that doesn't // belong to them, this doesn't work for us. continue } if status.BoostOfID != "" { - // Someone's pinned a boost. This also - // doesn't work for us. + // Someone's pinned a boost. This + // also doesn't work for us. (note + // we check using BoostOfID since + // BoostOfURI isn't *always* set). continue } diff --git a/internal/federation/dereferencing/announce.go b/internal/federation/dereferencing/announce.go index 8d082105b..02b1d5e5c 100644 --- a/internal/federation/dereferencing/announce.go +++ b/internal/federation/dereferencing/announce.go @@ -83,6 +83,10 @@ func (d *Dereferencer) EnrichAnnounce( return nil, gtserror.Newf("error generating id: %w", err) } + // Set boost_of_uri again in case the + // original URI was an indirect link. + boost.BoostOfURI = target.URI + // Populate remaining fields on // the boost wrapper using target. boost.Content = target.Content @@ -101,10 +105,10 @@ func (d *Dereferencer) EnrichAnnounce( boost.Replyable = target.Replyable boost.Likeable = target.Likeable - // Store the boost wrapper status. + // Store the boost wrapper status in database. switch err = d.state.DB.PutStatus(ctx, boost); { case err == nil: - // All good baby. + // all groovy. case errors.Is(err, db.ErrAlreadyExists): uri := boost.URI @@ -119,8 +123,7 @@ func (d *Dereferencer) EnrichAnnounce( ) } - default: - // Proper database error. + default: // Proper database error. return nil, gtserror.Newf("db error inserting status: %w", err) } diff --git a/internal/federation/dereferencing/collectionpage.go b/internal/federation/dereferencing/collection.go index dc5c68273..07f56c952 100644 --- a/internal/federation/dereferencing/collectionpage.go +++ b/internal/federation/dereferencing/collection.go @@ -19,18 +19,16 @@ package dereferencing import ( "context" - "encoding/json" "net/url" - "github.com/superseriousbusiness/activity/streams" "github.com/superseriousbusiness/activity/streams/vocab" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/log" ) -// dereferenceCollectionPage returns the activitystreams CollectionPage at the specified IRI, or an error if something goes wrong. -func (d *Dereferencer) dereferenceCollectionPage(ctx context.Context, username string, pageIRI *url.URL) (ap.CollectionPageIterator, error) { +// dereferenceCollectionPage returns the activitystreams Collection at the specified IRI, or an error if something goes wrong. +func (d *Dereferencer) dereferenceCollection(ctx context.Context, username string, pageIRI *url.URL) (ap.CollectionIterator, error) { if blocked, err := d.state.DB.IsDomainBlocked(ctx, pageIRI.Host); blocked || err != nil { return nil, gtserror.Newf("domain %s is blocked", pageIRI.Host) } @@ -40,24 +38,46 @@ func (d *Dereferencer) dereferenceCollectionPage(ctx context.Context, username s return nil, gtserror.Newf("error creating transport: %w", err) } - b, err := transport.Dereference(ctx, pageIRI) + rsp, err := transport.Dereference(ctx, pageIRI) if err != nil { return nil, gtserror.Newf("error deferencing %s: %w", pageIRI.String(), err) } - m := make(map[string]interface{}) - if err := json.Unmarshal(b, &m); err != nil { - return nil, gtserror.Newf("error unmarshalling bytes into json: %w", err) + collect, err := ap.ResolveCollection(ctx, rsp.Body) + + // Tidy up rsp body. + _ = rsp.Body.Close() + + if err != nil { + return nil, gtserror.Newf("error resolving collection %s: %w", pageIRI.String(), err) } - t, err := streams.ToType(ctx, m) + return collect, nil +} + +// dereferenceCollectionPage returns the activitystreams CollectionPage at the specified IRI, or an error if something goes wrong. +func (d *Dereferencer) dereferenceCollectionPage(ctx context.Context, username string, pageIRI *url.URL) (ap.CollectionPageIterator, error) { + if blocked, err := d.state.DB.IsDomainBlocked(ctx, pageIRI.Host); blocked || err != nil { + return nil, gtserror.Newf("domain %s is blocked", pageIRI.Host) + } + + transport, err := d.transportController.NewTransportForUsername(ctx, username) + if err != nil { + return nil, gtserror.Newf("error creating transport: %w", err) + } + + rsp, err := transport.Dereference(ctx, pageIRI) if err != nil { - return nil, gtserror.Newf("error resolving json into ap vocab type: %w", err) + return nil, gtserror.Newf("error deferencing %s: %w", pageIRI.String(), err) } - page, err := ap.ToCollectionPageIterator(t) + page, err := ap.ResolveCollectionPage(ctx, rsp.Body) + + // Tidy up rsp body. + _ = rsp.Body.Close() + if err != nil { - return nil, gtserror.Newf("error resolving vocab type as page: %w", err) + return nil, gtserror.Newf("error resolving collection page %s: %w", pageIRI.String(), err) } return page, nil diff --git a/internal/federation/dereferencing/status.go b/internal/federation/dereferencing/status.go index e3f97553d..397d2aa28 100644 --- a/internal/federation/dereferencing/status.go +++ b/internal/federation/dereferencing/status.go @@ -393,16 +393,51 @@ func (d *Dereferencer) enrichStatus( if apubStatus == nil { // Dereference latest version of the status. - b, err := tsport.Dereference(ctx, uri) + rsp, err := tsport.Dereference(ctx, uri) if err != nil { err := gtserror.Newf("error dereferencing %s: %w", uri, err) return nil, nil, gtserror.SetUnretrievable(err) } - // Attempt to resolve ActivityPub status from data. - apubStatus, err = ap.ResolveStatusable(ctx, b) + // Attempt to resolve ActivityPub status from response. + apubStatus, err = ap.ResolveStatusable(ctx, rsp.Body) + + // Tidy up now done. + _ = rsp.Body.Close() + if err != nil { - return nil, nil, gtserror.Newf("error resolving statusable from data for account %s: %w", uri, err) + // ResolveStatusable will set gtserror.WrongType + // on the returned error, so we don't need to do it here. + err = gtserror.Newf("error resolving statusable %s: %w", uri, err) + return nil, nil, err + } + + // Check whether input URI and final returned URI + // have changed (i.e. we followed some redirects). + if finalURIStr := rsp.Request.URL.String(); // + finalURIStr != uri.String() { + + // NOTE: this URI check + database call is performed + // AFTER reading and closing response body, for performance. + // + // Check whether we have this status stored under *final* URI. + alreadyStatus, err := d.state.DB.GetStatusByURI(ctx, finalURIStr) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, nil, gtserror.Newf("db error getting status after redirects: %w", err) + } + + if alreadyStatus != nil { + // We had this status stored + // under discovered final URI. + // + // Proceed with this status. + status = alreadyStatus + } + + // Update the input URI to + // the final determined URI + // for later URI checks. + uri = rsp.Request.URL } } diff --git a/internal/federation/dereferencing/thread.go b/internal/federation/dereferencing/thread.go index 28f7ffa8a..e528581c9 100644 --- a/internal/federation/dereferencing/thread.go +++ b/internal/federation/dereferencing/thread.go @@ -33,7 +33,7 @@ import ( // maxIter defines how many iterations of descendants or // ancesters we are willing to follow before returning error. -const maxIter = 1000 +const maxIter = 512 // dereferenceThread handles dereferencing status thread after // fetch. Passing off appropriate parts to be enqueued for async @@ -98,16 +98,10 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username return nil } + // Apparent current parent URI to log fields. l = l.WithField("parent", current.InReplyToURI) l.Trace("following status ancestor") - // Parse status parent URI for later use. - uri, err := url.Parse(current.InReplyToURI) - if err != nil { - l.Warnf("invalid uri: %v", err) - return nil - } - // Check whether this parent has already been deref'd. if _, ok := derefdStatuses[current.InReplyToURI]; ok { l.Warn("self referencing status ancestor") @@ -117,6 +111,13 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username // Add this status's parent URI to map of deref'd. derefdStatuses[current.InReplyToURI] = struct{}{} + // Parse status parent URI for later use. + uri, err := url.Parse(current.InReplyToURI) + if err != nil { + l.Warnf("invalid uri: %v", err) + return nil + } + // Fetch parent status by current's reply URI, this handles // case of existing (updating if necessary) or a new status. parent, _, _, err := d.getStatusByURI(ctx, username, uri) @@ -129,6 +130,7 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username // view the status (it's followers-only and // we don't follow, for example). case code == http.StatusNotFound: + // If this reply is followers-only or stricter, // we can safely assume the status it replies // to is also followers only or stricter. @@ -153,31 +155,43 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username // the now-gone parent. case code == http.StatusGone: l.Trace("status orphaned") - current.InReplyToID = "" - current.InReplyToURI = "" - current.InReplyToAccountID = "" current.InReplyTo = nil current.InReplyToAccount = nil - if err := d.state.DB.UpdateStatus(ctx, + return d.updateStatusParent(ctx, current, - "in_reply_to_id", - "in_reply_to_uri", - "in_reply_to_account_id", - ); err != nil { - return gtserror.Newf("db error updating status %s: %w", current.ID, err) - } - return nil + "", // status ID + "", // status URI + "", // account ID + ) // An error was returned for a status during // an attempted NEW dereference, return here. - case err != nil && current.InReplyToID == "": + // + // NOTE: this will catch all cases of a nil + // parent, all cases below can safely assume + // a non-nil parent in their code logic. + case err != nil && parent == nil: return gtserror.Newf("error dereferencing new %s: %w", current.InReplyToURI, err) // An error was returned for an existing parent, // we simply treat this as a temporary situation. - // (we fallback to using existing parent status). case err != nil: l.Errorf("error getting parent: %v", err) + } + + // Start a new switch case + // as the following scenarios + // are possible with / without + // any returned error. + switch { + + // The current status is using an indirect URL + // in order to reference the parent. This is just + // weird and broken... Leave the URI in place but + // don't link the statuses via database IDs as it + // could cause all sorts of unexpected situations. + case current.InReplyToURI != parent.URI: + l.Errorf("indirect in_reply_to_uri => %s", parent.URI) // The ID has changed for currently stored parent ID // (which may be empty, if new!) and fetched version. @@ -185,17 +199,14 @@ func (d *Dereferencer) DereferenceStatusAncestors(ctx context.Context, username // Update the current's inReplyTo fields to parent. case current.InReplyToID != parent.ID: l.Tracef("parent changed %s => %s", current.InReplyToID, parent.ID) - current.InReplyToAccountID = parent.AccountID current.InReplyToAccount = parent.Account - current.InReplyToURI = parent.URI - current.InReplyToID = parent.ID - if err := d.state.DB.UpdateStatus(ctx, + if err := d.updateStatusParent(ctx, current, - "in_reply_to_id", - "in_reply_to_uri", - "in_reply_to_account_id", + parent.ID, + parent.URI, + parent.AccountID, ); err != nil { - return gtserror.Newf("db error updating status %s: %w", current.ID, err) + return err } } @@ -384,3 +395,26 @@ stackLoop: return gtserror.Newf("reached %d descendant iterations for %q", maxIter, statusIRIStr) } + +// updateStatusParent updates the given status' parent +// status URI, ID and account ID to given values in DB. +func (d *Dereferencer) updateStatusParent( + ctx context.Context, + status *gtsmodel.Status, + parentStatusID string, + parentStatusURI string, + parentAccountID string, +) error { + status.InReplyToAccountID = parentAccountID + status.InReplyToURI = parentStatusURI + status.InReplyToID = parentStatusID + if err := d.state.DB.UpdateStatus(ctx, + status, + "in_reply_to_id", + "in_reply_to_uri", + "in_reply_to_account_id", + ); err != nil { + return gtserror.Newf("error updating status %s: %w", status.URI, err) + } + return nil +} |