diff options
author | 2023-11-04 20:21:20 +0000 | |
---|---|---|
committer | 2023-11-04 20:21:20 +0000 | |
commit | 41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8 (patch) | |
tree | 987b5d7787b24f6f6e340bbcf7fd1b052fe40dfc /internal/processing | |
parent | [docs/bugfix] fix link to swagger yaml (#2333) (diff) | |
download | gotosocial-41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8.tar.xz |
[feature] support canceling scheduled tasks, some federation API performance improvements (#2329)
Diffstat (limited to 'internal/processing')
-rw-r--r-- | internal/processing/search/get.go | 1 | ||||
-rw-r--r-- | internal/processing/workers/fromfediapi.go | 101 | ||||
-rw-r--r-- | internal/processing/workers/fromfediapi_test.go | 50 |
3 files changed, 58 insertions, 94 deletions
diff --git a/internal/processing/search/get.go b/internal/processing/search/get.go index 30a2745af..4c09f05bb 100644 --- a/internal/processing/search/get.go +++ b/internal/processing/search/get.go @@ -603,7 +603,6 @@ func (p *Processor) statusByURI( requestingAccount.Username, uri, ) - return status, err } diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index f57235bf1..1ce3b6076 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -32,6 +32,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/util" ) // fediAPI wraps processing functions @@ -142,27 +143,22 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFe } } - return nil + return gtserror.Newf("unhandled: %s %s", fMsg.APActivityType, fMsg.APObjectType) } func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error { var ( status *gtsmodel.Status err error - - // Check the federatorMsg for either an already dereferenced - // and converted status pinned to the message, or a forwarded - // AP IRI that we still need to deref. - forwarded = (fMsg.GTSModel == nil) ) - if forwarded { - // Model was not set, deref with IRI. + if fMsg.APObjectModel == nil /* i.e. forwarded */ { + // Model was not set, deref with IRI (this is a forward). // This will also cause the status to be inserted into the db. status, err = p.statusFromAPIRI(ctx, fMsg) } else { // Model is set, ensure we have the most up-to-date model. - status, err = p.statusFromGTSModel(ctx, fMsg) + status, err = p.statusFromAPModel(ctx, fMsg) } if err != nil { @@ -188,19 +184,10 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e } } - // Ensure status ancestors dereferenced. We need at least the - // immediate parent (if present) to ascertain timelineability. - if err := p.federate.DereferenceStatusAncestors( - ctx, - fMsg.ReceivingAccount.Username, - status, - ); err != nil { - return err - } - if status.InReplyToID != "" { - // Interaction counts changed on the replied status; - // uncache the prepared version from all timelines. + // Interaction counts changed on the replied status; uncache the + // prepared version from all timelines. The status dereferencer + // functions will ensure necessary ancestors exist before this point. p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) } @@ -211,23 +198,31 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e return nil } -func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { - // There should be a status pinned to the message: - // we've already checked to ensure this is not nil. - status, ok := fMsg.GTSModel.(*gtsmodel.Status) +func (p *fediAPI) statusFromAPModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { + // AP statusable representation MUST have been set. + statusable, ok := fMsg.APObjectModel.(ap.Statusable) if !ok { - err := gtserror.New("Note was not parseable as *gtsmodel.Status") - return nil, err + return nil, gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel) } - // AP statusable representation may have also - // been set on message (no problem if not). - statusable, _ := fMsg.APObjectModel.(ap.Statusable) + // Status may have been set (no problem if not). + status, _ := fMsg.GTSModel.(*gtsmodel.Status) + + if status == nil { + // No status was set, create a bare-bones + // model for the deferencer to flesh-out, + // this indicates it is a new (to us) status. + status = >smodel.Status{ - // Call refresh on status to update - // it (deref remote) if necessary. - var err error - status, _, err = p.federate.RefreshStatus( + // if coming in here status will ALWAYS be remote. + Local: util.Ptr(false), + URI: ap.GetJSONLDId(statusable).String(), + } + } + + // Call refresh on status to either update existing + // model, or parse + insert status from statusable data. + status, _, err := p.federate.RefreshStatus( ctx, fMsg.ReceivingAccount.Username, status, @@ -235,7 +230,7 @@ func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFedi false, // Don't force refresh. ) if err != nil { - return nil, gtserror.Newf("%w", err) + return nil, gtserror.Newf("error refreshing status: %w", err) } return status, nil @@ -245,11 +240,8 @@ func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI // There should be a status IRI pinned to // the federatorMsg for us to dereference. if fMsg.APIri == nil { - err := gtserror.New( - "status was not pinned to federatorMsg, " + - "and neither was an IRI for us to dereference", - ) - return nil, err + const text = "neither APObjectModel nor APIri set" + return nil, gtserror.New(text) } // Get the status + ensure we have @@ -260,7 +252,7 @@ func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI fMsg.APIri, ) if err != nil { - return nil, gtserror.Newf("%w", err) + return nil, gtserror.Newf("error getting status by uri %s: %w", fMsg.APIri, err) } return status, nil @@ -337,7 +329,9 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel) } - // Dereference status that this status boosts. + // Dereference status that this boosts, note + // that this will handle dereferencing the status + // ancestors / descendants where appropriate. if err := p.federate.DereferenceAnnounce( ctx, status, @@ -358,15 +352,6 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) return gtserror.Newf("db error inserting status: %w", err) } - // Ensure boosted status ancestors dereferenced. We need at least - // the immediate parent (if present) to ascertain timelineability. - if err := p.federate.DereferenceStatusAncestors(ctx, - fMsg.ReceivingAccount.Username, - status.BoostOf, - ); err != nil { - return err - } - // Timeline and notify the announce. if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { return gtserror.Newf("error timelining status: %w", err) @@ -526,23 +511,25 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e } // Cast the updated ActivityPub statusable object . - apStatus, ok := fMsg.APObjectModel.(ap.Statusable) - if !ok { - return gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel) - } + apStatus, _ := fMsg.APObjectModel.(ap.Statusable) // Fetch up-to-date attach status attachments, etc. - _, _, err := p.federate.RefreshStatus( + _, statusable, err := p.federate.RefreshStatus( ctx, fMsg.ReceivingAccount.Username, existing, apStatus, - false, + true, ) if err != nil { return gtserror.Newf("error refreshing updated status: %w", err) } + if statusable != nil { + // Status representation was refetched, uncache from timelines. + p.surface.invalidateStatusFromTimelines(ctx, existing.ID) + } + return nil } diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go index f8e3941fc..b8d86ac45 100644 --- a/internal/processing/workers/fromfediapi_test.go +++ b/internal/processing/workers/fromfediapi_test.go @@ -29,7 +29,6 @@ import ( apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/stream" "github.com/superseriousbusiness/gotosocial/internal/util" @@ -92,54 +91,33 @@ func (suite *FromFediAPITestSuite) TestProcessReplyMention() { repliedStatus := suite.testStatuses["local_account_1_status_1"] replyingAccount := suite.testAccounts["remote_account_1"] - replyingStatus := >smodel.Status{ - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - URI: "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552", - URL: "http://fossbros-anonymous.io/@foss_satan/106221634728637552", - Content: `<p><span class="h-card"><a href="http://localhost:8080/@the_mighty_zork" class="u-url mention">@<span>the_mighty_zork</span></a></span> nice there it is:</p><p><a href="http://localhost:8080/users/the_mighty_zork/statuses/01F8MHAMCHF6Y650WCRSCP4WMY/activity" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://</span><span class="ellipsis">social.pixie.town/users/f0x/st</span><span class="invisible">atuses/106221628567855262/activity</span></a></p>`, - Mentions: []*gtsmodel.Mention{ - { - TargetAccountURI: repliedAccount.URI, - NameString: "@the_mighty_zork@localhost:8080", - }, - }, - AccountID: replyingAccount.ID, - AccountURI: replyingAccount.URI, - InReplyToID: repliedStatus.ID, - InReplyToURI: repliedStatus.URI, - InReplyToAccountID: repliedAccount.ID, - Visibility: gtsmodel.VisibilityUnlocked, - ActivityStreamsType: ap.ObjectNote, - Federated: util.Ptr(true), - Boostable: util.Ptr(true), - Replyable: util.Ptr(true), - Likeable: util.Ptr(false), - } + // Set the replyingAccount's last fetched_at + // date to something recent so no refresh is attempted. + replyingAccount.FetchedAt = time.Now() + err := suite.state.DB.UpdateAccount(context.Background(), replyingAccount, "fetched_at") + suite.NoError(err) + + // Get replying statusable to use from remote test statuses. + const replyingURI = "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552" + replyingStatusable := testrig.NewTestFediStatuses()[replyingURI] + ap.AppendInReplyTo(replyingStatusable, testrig.URLMustParse(repliedStatus.URI)) + // Open a websocket stream to later test the streamed status reply. wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), repliedAccount, stream.TimelineHome) suite.NoError(errWithCode) - // id the status based on the time it was created - statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt) - suite.NoError(err) - replyingStatus.ID = statusID - - err = suite.db.PutStatus(context.Background(), replyingStatus) - suite.NoError(err) - + // Send the replied status off to the fedi worker to be further processed. err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, - GTSModel: replyingStatus, + APObjectModel: replyingStatusable, ReceivingAccount: suite.testAccounts["local_account_1"], }) suite.NoError(err) // side effects should be triggered // 1. status should be in the database - suite.NotEmpty(replyingStatus.ID) - _, err = suite.db.GetStatusByID(context.Background(), replyingStatus.ID) + replyingStatus, err := suite.state.DB.GetStatusByURI(context.Background(), replyingURI) suite.NoError(err) // 2. a notification should exist for the mention |