diff options
| author | 2023-11-04 20:21:20 +0000 | |
|---|---|---|
| committer | 2023-11-04 20:21:20 +0000 | |
| commit | 41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8 (patch) | |
| tree | 987b5d7787b24f6f6e340bbcf7fd1b052fe40dfc /internal/processing | |
| parent | [docs/bugfix] fix link to swagger yaml (#2333) (diff) | |
| download | gotosocial-41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8.tar.xz | |
[feature] support canceling scheduled tasks, some federation API performance improvements (#2329)
Diffstat (limited to 'internal/processing')
| -rw-r--r-- | internal/processing/search/get.go | 1 | ||||
| -rw-r--r-- | internal/processing/workers/fromfediapi.go | 101 | ||||
| -rw-r--r-- | internal/processing/workers/fromfediapi_test.go | 50 | 
3 files changed, 58 insertions, 94 deletions
| diff --git a/internal/processing/search/get.go b/internal/processing/search/get.go index 30a2745af..4c09f05bb 100644 --- a/internal/processing/search/get.go +++ b/internal/processing/search/get.go @@ -603,7 +603,6 @@ func (p *Processor) statusByURI(  			requestingAccount.Username,  			uri,  		) -  		return status, err  	} diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index f57235bf1..1ce3b6076 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -32,6 +32,7 @@ import (  	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/processing/account"  	"github.com/superseriousbusiness/gotosocial/internal/state" +	"github.com/superseriousbusiness/gotosocial/internal/util"  )  // fediAPI wraps processing functions @@ -142,27 +143,22 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFe  		}  	} -	return nil +	return gtserror.Newf("unhandled: %s %s", fMsg.APActivityType, fMsg.APObjectType)  }  func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error {  	var (  		status *gtsmodel.Status  		err    error - -		// Check the federatorMsg for either an already dereferenced -		// and converted status pinned to the message, or a forwarded -		// AP IRI that we still need to deref. -		forwarded = (fMsg.GTSModel == nil)  	) -	if forwarded { -		// Model was not set, deref with IRI. +	if fMsg.APObjectModel == nil /* i.e. forwarded */ { +		// Model was not set, deref with IRI (this is a forward).  		// This will also cause the status to be inserted into the db.  		status, err = p.statusFromAPIRI(ctx, fMsg)  	} else {  		// Model is set, ensure we have the most up-to-date model. -		status, err = p.statusFromGTSModel(ctx, fMsg) +		status, err = p.statusFromAPModel(ctx, fMsg)  	}  	if err != nil { @@ -188,19 +184,10 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e  		}  	} -	// Ensure status ancestors dereferenced. We need at least the -	// immediate parent (if present) to ascertain timelineability. -	if err := p.federate.DereferenceStatusAncestors( -		ctx, -		fMsg.ReceivingAccount.Username, -		status, -	); err != nil { -		return err -	} -  	if status.InReplyToID != "" { -		// Interaction counts changed on the replied status; -		// uncache the prepared version from all timelines. +		// Interaction counts changed on the replied status; uncache the +		// prepared version from all timelines. The status dereferencer +		// functions will ensure necessary ancestors exist before this point.  		p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)  	} @@ -211,23 +198,31 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e  	return nil  } -func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { -	// There should be a status pinned to the message: -	// we've already checked to ensure this is not nil. -	status, ok := fMsg.GTSModel.(*gtsmodel.Status) +func (p *fediAPI) statusFromAPModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) { +	// AP statusable representation MUST have been set. +	statusable, ok := fMsg.APObjectModel.(ap.Statusable)  	if !ok { -		err := gtserror.New("Note was not parseable as *gtsmodel.Status") -		return nil, err +		return nil, gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel)  	} -	// AP statusable representation may have also -	// been set on message (no problem if not). -	statusable, _ := fMsg.APObjectModel.(ap.Statusable) +	// Status may have been set (no problem if not). +	status, _ := fMsg.GTSModel.(*gtsmodel.Status) + +	if status == nil { +		// No status was set, create a bare-bones +		// model for the deferencer to flesh-out, +		// this indicates it is a new (to us) status. +		status = >smodel.Status{ -	// Call refresh on status to update -	// it (deref remote) if necessary. -	var err error -	status, _, err = p.federate.RefreshStatus( +			// if coming in here status will ALWAYS be remote. +			Local: util.Ptr(false), +			URI:   ap.GetJSONLDId(statusable).String(), +		} +	} + +	// Call refresh on status to either update existing +	// model, or parse + insert status from statusable data. +	status, _, err := p.federate.RefreshStatus(  		ctx,  		fMsg.ReceivingAccount.Username,  		status, @@ -235,7 +230,7 @@ func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFedi  		false, // Don't force refresh.  	)  	if err != nil { -		return nil, gtserror.Newf("%w", err) +		return nil, gtserror.Newf("error refreshing status: %w", err)  	}  	return status, nil @@ -245,11 +240,8 @@ func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI  	// There should be a status IRI pinned to  	// the federatorMsg for us to dereference.  	if fMsg.APIri == nil { -		err := gtserror.New( -			"status was not pinned to federatorMsg, " + -				"and neither was an IRI for us to dereference", -		) -		return nil, err +		const text = "neither APObjectModel nor APIri set" +		return nil, gtserror.New(text)  	}  	// Get the status + ensure we have @@ -260,7 +252,7 @@ func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI  		fMsg.APIri,  	)  	if err != nil { -		return nil, gtserror.Newf("%w", err) +		return nil, gtserror.Newf("error getting status by uri %s: %w", fMsg.APIri, err)  	}  	return status, nil @@ -337,7 +329,9 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI)  		return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel)  	} -	// Dereference status that this status boosts. +	// Dereference status that this boosts, note +	// that this will handle dereferencing the status +	// ancestors / descendants where appropriate.  	if err := p.federate.DereferenceAnnounce(  		ctx,  		status, @@ -358,15 +352,6 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI)  		return gtserror.Newf("db error inserting status: %w", err)  	} -	// Ensure boosted status ancestors dereferenced. We need at least -	// the immediate parent (if present) to ascertain timelineability. -	if err := p.federate.DereferenceStatusAncestors(ctx, -		fMsg.ReceivingAccount.Username, -		status.BoostOf, -	); err != nil { -		return err -	} -  	// Timeline and notify the announce.  	if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil {  		return gtserror.Newf("error timelining status: %w", err) @@ -526,23 +511,25 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e  	}  	// Cast the updated ActivityPub statusable object . -	apStatus, ok := fMsg.APObjectModel.(ap.Statusable) -	if !ok { -		return gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel) -	} +	apStatus, _ := fMsg.APObjectModel.(ap.Statusable)  	// Fetch up-to-date attach status attachments, etc. -	_, _, err := p.federate.RefreshStatus( +	_, statusable, err := p.federate.RefreshStatus(  		ctx,  		fMsg.ReceivingAccount.Username,  		existing,  		apStatus, -		false, +		true,  	)  	if err != nil {  		return gtserror.Newf("error refreshing updated status: %w", err)  	} +	if statusable != nil { +		// Status representation was refetched, uncache from timelines. +		p.surface.invalidateStatusFromTimelines(ctx, existing.ID) +	} +  	return nil  } diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go index f8e3941fc..b8d86ac45 100644 --- a/internal/processing/workers/fromfediapi_test.go +++ b/internal/processing/workers/fromfediapi_test.go @@ -29,7 +29,6 @@ import (  	apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"  	"github.com/superseriousbusiness/gotosocial/internal/db"  	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel" -	"github.com/superseriousbusiness/gotosocial/internal/id"  	"github.com/superseriousbusiness/gotosocial/internal/messages"  	"github.com/superseriousbusiness/gotosocial/internal/stream"  	"github.com/superseriousbusiness/gotosocial/internal/util" @@ -92,54 +91,33 @@ func (suite *FromFediAPITestSuite) TestProcessReplyMention() {  	repliedStatus := suite.testStatuses["local_account_1_status_1"]  	replyingAccount := suite.testAccounts["remote_account_1"] -	replyingStatus := >smodel.Status{ -		CreatedAt: time.Now(), -		UpdatedAt: time.Now(), -		URI:       "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552", -		URL:       "http://fossbros-anonymous.io/@foss_satan/106221634728637552", -		Content:   `<p><span class="h-card"><a href="http://localhost:8080/@the_mighty_zork" class="u-url mention">@<span>the_mighty_zork</span></a></span> nice there it is:</p><p><a href="http://localhost:8080/users/the_mighty_zork/statuses/01F8MHAMCHF6Y650WCRSCP4WMY/activity" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://</span><span class="ellipsis">social.pixie.town/users/f0x/st</span><span class="invisible">atuses/106221628567855262/activity</span></a></p>`, -		Mentions: []*gtsmodel.Mention{ -			{ -				TargetAccountURI: repliedAccount.URI, -				NameString:       "@the_mighty_zork@localhost:8080", -			}, -		}, -		AccountID:           replyingAccount.ID, -		AccountURI:          replyingAccount.URI, -		InReplyToID:         repliedStatus.ID, -		InReplyToURI:        repliedStatus.URI, -		InReplyToAccountID:  repliedAccount.ID, -		Visibility:          gtsmodel.VisibilityUnlocked, -		ActivityStreamsType: ap.ObjectNote, -		Federated:           util.Ptr(true), -		Boostable:           util.Ptr(true), -		Replyable:           util.Ptr(true), -		Likeable:            util.Ptr(false), -	} +	// Set the replyingAccount's last fetched_at +	// date to something recent so no refresh is attempted. +	replyingAccount.FetchedAt = time.Now() +	err := suite.state.DB.UpdateAccount(context.Background(), replyingAccount, "fetched_at") +	suite.NoError(err) + +	// Get replying statusable to use from remote test statuses. +	const replyingURI = "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552" +	replyingStatusable := testrig.NewTestFediStatuses()[replyingURI] +	ap.AppendInReplyTo(replyingStatusable, testrig.URLMustParse(repliedStatus.URI)) +	// Open a websocket stream to later test the streamed status reply.  	wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), repliedAccount, stream.TimelineHome)  	suite.NoError(errWithCode) -	// id the status based on the time it was created -	statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt) -	suite.NoError(err) -	replyingStatus.ID = statusID - -	err = suite.db.PutStatus(context.Background(), replyingStatus) -	suite.NoError(err) - +	// Send the replied status off to the fedi worker to be further processed.  	err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{  		APObjectType:     ap.ObjectNote,  		APActivityType:   ap.ActivityCreate, -		GTSModel:         replyingStatus, +		APObjectModel:    replyingStatusable,  		ReceivingAccount: suite.testAccounts["local_account_1"],  	})  	suite.NoError(err)  	// side effects should be triggered  	// 1. status should be in the database -	suite.NotEmpty(replyingStatus.ID) -	_, err = suite.db.GetStatusByID(context.Background(), replyingStatus.ID) +	replyingStatus, err := suite.state.DB.GetStatusByURI(context.Background(), replyingURI)  	suite.NoError(err)  	// 2. a notification should exist for the mention | 
