summaryrefslogtreecommitdiff
path: root/internal/processing
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2023-11-04 20:21:20 +0000
committerLibravatar GitHub <noreply@github.com>2023-11-04 20:21:20 +0000
commit41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8 (patch)
tree987b5d7787b24f6f6e340bbcf7fd1b052fe40dfc /internal/processing
parent[docs/bugfix] fix link to swagger yaml (#2333) (diff)
downloadgotosocial-41435a6c4ee0a5b52528890edf3fbf5a9dc0a6c8.tar.xz
[feature] support canceling scheduled tasks, some federation API performance improvements (#2329)
Diffstat (limited to 'internal/processing')
-rw-r--r--internal/processing/search/get.go1
-rw-r--r--internal/processing/workers/fromfediapi.go101
-rw-r--r--internal/processing/workers/fromfediapi_test.go50
3 files changed, 58 insertions, 94 deletions
diff --git a/internal/processing/search/get.go b/internal/processing/search/get.go
index 30a2745af..4c09f05bb 100644
--- a/internal/processing/search/get.go
+++ b/internal/processing/search/get.go
@@ -603,7 +603,6 @@ func (p *Processor) statusByURI(
requestingAccount.Username,
uri,
)
-
return status, err
}
diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go
index f57235bf1..1ce3b6076 100644
--- a/internal/processing/workers/fromfediapi.go
+++ b/internal/processing/workers/fromfediapi.go
@@ -32,6 +32,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/state"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
)
// fediAPI wraps processing functions
@@ -142,27 +143,22 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFe
}
}
- return nil
+ return gtserror.Newf("unhandled: %s %s", fMsg.APActivityType, fMsg.APObjectType)
}
func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error {
var (
status *gtsmodel.Status
err error
-
- // Check the federatorMsg for either an already dereferenced
- // and converted status pinned to the message, or a forwarded
- // AP IRI that we still need to deref.
- forwarded = (fMsg.GTSModel == nil)
)
- if forwarded {
- // Model was not set, deref with IRI.
+ if fMsg.APObjectModel == nil /* i.e. forwarded */ {
+ // Model was not set, deref with IRI (this is a forward).
// This will also cause the status to be inserted into the db.
status, err = p.statusFromAPIRI(ctx, fMsg)
} else {
// Model is set, ensure we have the most up-to-date model.
- status, err = p.statusFromGTSModel(ctx, fMsg)
+ status, err = p.statusFromAPModel(ctx, fMsg)
}
if err != nil {
@@ -188,19 +184,10 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e
}
}
- // Ensure status ancestors dereferenced. We need at least the
- // immediate parent (if present) to ascertain timelineability.
- if err := p.federate.DereferenceStatusAncestors(
- ctx,
- fMsg.ReceivingAccount.Username,
- status,
- ); err != nil {
- return err
- }
-
if status.InReplyToID != "" {
- // Interaction counts changed on the replied status;
- // uncache the prepared version from all timelines.
+ // Interaction counts changed on the replied status; uncache the
+ // prepared version from all timelines. The status dereferencer
+ // functions will ensure necessary ancestors exist before this point.
p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
}
@@ -211,23 +198,31 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e
return nil
}
-func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) {
- // There should be a status pinned to the message:
- // we've already checked to ensure this is not nil.
- status, ok := fMsg.GTSModel.(*gtsmodel.Status)
+func (p *fediAPI) statusFromAPModel(ctx context.Context, fMsg messages.FromFediAPI) (*gtsmodel.Status, error) {
+ // AP statusable representation MUST have been set.
+ statusable, ok := fMsg.APObjectModel.(ap.Statusable)
if !ok {
- err := gtserror.New("Note was not parseable as *gtsmodel.Status")
- return nil, err
+ return nil, gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel)
}
- // AP statusable representation may have also
- // been set on message (no problem if not).
- statusable, _ := fMsg.APObjectModel.(ap.Statusable)
+ // Status may have been set (no problem if not).
+ status, _ := fMsg.GTSModel.(*gtsmodel.Status)
+
+ if status == nil {
+ // No status was set, create a bare-bones
+ // model for the deferencer to flesh-out,
+ // this indicates it is a new (to us) status.
+ status = &gtsmodel.Status{
- // Call refresh on status to update
- // it (deref remote) if necessary.
- var err error
- status, _, err = p.federate.RefreshStatus(
+ // if coming in here status will ALWAYS be remote.
+ Local: util.Ptr(false),
+ URI: ap.GetJSONLDId(statusable).String(),
+ }
+ }
+
+ // Call refresh on status to either update existing
+ // model, or parse + insert status from statusable data.
+ status, _, err := p.federate.RefreshStatus(
ctx,
fMsg.ReceivingAccount.Username,
status,
@@ -235,7 +230,7 @@ func (p *fediAPI) statusFromGTSModel(ctx context.Context, fMsg messages.FromFedi
false, // Don't force refresh.
)
if err != nil {
- return nil, gtserror.Newf("%w", err)
+ return nil, gtserror.Newf("error refreshing status: %w", err)
}
return status, nil
@@ -245,11 +240,8 @@ func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI
// There should be a status IRI pinned to
// the federatorMsg for us to dereference.
if fMsg.APIri == nil {
- err := gtserror.New(
- "status was not pinned to federatorMsg, " +
- "and neither was an IRI for us to dereference",
- )
- return nil, err
+ const text = "neither APObjectModel nor APIri set"
+ return nil, gtserror.New(text)
}
// Get the status + ensure we have
@@ -260,7 +252,7 @@ func (p *fediAPI) statusFromAPIRI(ctx context.Context, fMsg messages.FromFediAPI
fMsg.APIri,
)
if err != nil {
- return nil, gtserror.Newf("%w", err)
+ return nil, gtserror.Newf("error getting status by uri %s: %w", fMsg.APIri, err)
}
return status, nil
@@ -337,7 +329,9 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI)
return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel)
}
- // Dereference status that this status boosts.
+ // Dereference status that this boosts, note
+ // that this will handle dereferencing the status
+ // ancestors / descendants where appropriate.
if err := p.federate.DereferenceAnnounce(
ctx,
status,
@@ -358,15 +352,6 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI)
return gtserror.Newf("db error inserting status: %w", err)
}
- // Ensure boosted status ancestors dereferenced. We need at least
- // the immediate parent (if present) to ascertain timelineability.
- if err := p.federate.DereferenceStatusAncestors(ctx,
- fMsg.ReceivingAccount.Username,
- status.BoostOf,
- ); err != nil {
- return err
- }
-
// Timeline and notify the announce.
if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil {
return gtserror.Newf("error timelining status: %w", err)
@@ -526,23 +511,25 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e
}
// Cast the updated ActivityPub statusable object .
- apStatus, ok := fMsg.APObjectModel.(ap.Statusable)
- if !ok {
- return gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel)
- }
+ apStatus, _ := fMsg.APObjectModel.(ap.Statusable)
// Fetch up-to-date attach status attachments, etc.
- _, _, err := p.federate.RefreshStatus(
+ _, statusable, err := p.federate.RefreshStatus(
ctx,
fMsg.ReceivingAccount.Username,
existing,
apStatus,
- false,
+ true,
)
if err != nil {
return gtserror.Newf("error refreshing updated status: %w", err)
}
+ if statusable != nil {
+ // Status representation was refetched, uncache from timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, existing.ID)
+ }
+
return nil
}
diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go
index f8e3941fc..b8d86ac45 100644
--- a/internal/processing/workers/fromfediapi_test.go
+++ b/internal/processing/workers/fromfediapi_test.go
@@ -29,7 +29,6 @@ import (
apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/stream"
"github.com/superseriousbusiness/gotosocial/internal/util"
@@ -92,54 +91,33 @@ func (suite *FromFediAPITestSuite) TestProcessReplyMention() {
repliedStatus := suite.testStatuses["local_account_1_status_1"]
replyingAccount := suite.testAccounts["remote_account_1"]
- replyingStatus := &gtsmodel.Status{
- CreatedAt: time.Now(),
- UpdatedAt: time.Now(),
- URI: "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552",
- URL: "http://fossbros-anonymous.io/@foss_satan/106221634728637552",
- Content: `<p><span class="h-card"><a href="http://localhost:8080/@the_mighty_zork" class="u-url mention">@<span>the_mighty_zork</span></a></span> nice there it is:</p><p><a href="http://localhost:8080/users/the_mighty_zork/statuses/01F8MHAMCHF6Y650WCRSCP4WMY/activity" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://</span><span class="ellipsis">social.pixie.town/users/f0x/st</span><span class="invisible">atuses/106221628567855262/activity</span></a></p>`,
- Mentions: []*gtsmodel.Mention{
- {
- TargetAccountURI: repliedAccount.URI,
- NameString: "@the_mighty_zork@localhost:8080",
- },
- },
- AccountID: replyingAccount.ID,
- AccountURI: replyingAccount.URI,
- InReplyToID: repliedStatus.ID,
- InReplyToURI: repliedStatus.URI,
- InReplyToAccountID: repliedAccount.ID,
- Visibility: gtsmodel.VisibilityUnlocked,
- ActivityStreamsType: ap.ObjectNote,
- Federated: util.Ptr(true),
- Boostable: util.Ptr(true),
- Replyable: util.Ptr(true),
- Likeable: util.Ptr(false),
- }
+ // Set the replyingAccount's last fetched_at
+ // date to something recent so no refresh is attempted.
+ replyingAccount.FetchedAt = time.Now()
+ err := suite.state.DB.UpdateAccount(context.Background(), replyingAccount, "fetched_at")
+ suite.NoError(err)
+
+ // Get replying statusable to use from remote test statuses.
+ const replyingURI = "http://fossbros-anonymous.io/users/foss_satan/statuses/106221634728637552"
+ replyingStatusable := testrig.NewTestFediStatuses()[replyingURI]
+ ap.AppendInReplyTo(replyingStatusable, testrig.URLMustParse(repliedStatus.URI))
+ // Open a websocket stream to later test the streamed status reply.
wssStream, errWithCode := suite.processor.Stream().Open(context.Background(), repliedAccount, stream.TimelineHome)
suite.NoError(errWithCode)
- // id the status based on the time it was created
- statusID, err := id.NewULIDFromTime(replyingStatus.CreatedAt)
- suite.NoError(err)
- replyingStatus.ID = statusID
-
- err = suite.db.PutStatus(context.Background(), replyingStatus)
- suite.NoError(err)
-
+ // Send the replied status off to the fedi worker to be further processed.
err = suite.processor.Workers().ProcessFromFediAPI(context.Background(), messages.FromFediAPI{
APObjectType: ap.ObjectNote,
APActivityType: ap.ActivityCreate,
- GTSModel: replyingStatus,
+ APObjectModel: replyingStatusable,
ReceivingAccount: suite.testAccounts["local_account_1"],
})
suite.NoError(err)
// side effects should be triggered
// 1. status should be in the database
- suite.NotEmpty(replyingStatus.ID)
- _, err = suite.db.GetStatusByID(context.Background(), replyingStatus.ID)
+ replyingStatus, err := suite.state.DB.GetStatusByURI(context.Background(), replyingURI)
suite.NoError(err)
// 2. a notification should exist for the mention