diff options
Diffstat (limited to 'internal/processing/workers/fromfediapi.go')
-rw-r--r-- | internal/processing/workers/fromfediapi.go | 137 |
1 files changed, 81 insertions, 56 deletions
diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index 0b1106a9e..fcd5b38f2 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -19,13 +19,14 @@ package workers import ( "context" + "errors" "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/gotosocial/internal/ap" + "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing" - "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" @@ -43,34 +44,20 @@ type fediAPI struct { surface *surface federate *federate account *account.Processor - utilF *utilF + utils *utils } -func (p *Processor) EnqueueFediAPI(cctx context.Context, msgs ...messages.FromFediAPI) { - _ = p.workers.Federator.MustEnqueueCtx(cctx, func(wctx context.Context) { - // Copy caller ctx values to worker's. - wctx = gtscontext.WithValues(wctx, cctx) - - // Process worker messages. - for _, msg := range msgs { - if err := p.ProcessFromFediAPI(wctx, msg); err != nil { - log.Errorf(wctx, "error processing fedi API message: %v", err) - } - } - }) -} - -func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg *messages.FromFediAPI) error { // Allocate new log fields slice fields := make([]kv.Field, 3, 5) fields[0] = kv.Field{"activityType", fMsg.APActivityType} fields[1] = kv.Field{"objectType", fMsg.APObjectType} - fields[2] = kv.Field{"toAccount", fMsg.ReceivingAccount.Username} + fields[2] = kv.Field{"toAccount", fMsg.Receiving.Username} - if fMsg.APIri != nil { + if fMsg.APIRI != nil { // An IRI was supplied, append to log fields = append(fields, kv.Field{ - "iri", fMsg.APIri, + "iri", fMsg.APIRI, }) } @@ -168,7 +155,7 @@ func (p *Processor) ProcessFromFediAPI(ctx context.Context, fMsg messages.FromFe return gtserror.Newf("unhandled: %s %s", fMsg.APActivityType, fMsg.APObjectType) } -func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) error { var ( status *gtsmodel.Status statusable ap.Statusable @@ -178,11 +165,11 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e var ok bool switch { - case fMsg.APObjectModel != nil: + case fMsg.APObject != nil: // A model was provided, extract this from message. - statusable, ok = fMsg.APObjectModel.(ap.Statusable) + statusable, ok = fMsg.APObject.(ap.Statusable) if !ok { - return gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObjectModel) + return gtserror.Newf("cannot cast %T -> ap.Statusable", fMsg.APObject) } // Create bare-bones model to pass @@ -196,7 +183,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e // statusable model, which it will use to further flesh out // the bare bones model and insert it into the database. status, statusable, err = p.federate.RefreshStatus(ctx, - fMsg.ReceivingAccount.Username, + fMsg.Receiving.Username, bareStatus, statusable, // Force refresh within 5min window. @@ -206,15 +193,15 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e return gtserror.Newf("error processing new status %s: %w", bareStatus.URI, err) } - case fMsg.APIri != nil: + case fMsg.APIRI != nil: // Model was not set, deref with IRI (this is a forward). // This will also cause the status to be inserted into the db. status, statusable, err = p.federate.GetStatusByURI(ctx, - fMsg.ReceivingAccount.Username, - fMsg.APIri, + fMsg.Receiving.Username, + fMsg.APIRI, ) if err != nil { - return gtserror.Newf("error dereferencing forwarded status %s: %w", fMsg.APIri, err) + return gtserror.Newf("error dereferencing forwarded status %s: %w", fMsg.APIRI, err) } default: @@ -230,7 +217,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e } // Update stats for the remote account. - if err := p.utilF.incrementStatusesCount(ctx, fMsg.RequestingAccount, status); err != nil { + if err := p.utils.incrementStatusesCount(ctx, fMsg.Requesting, status); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -248,7 +235,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg messages.FromFediAPI) e return nil } -func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI) error { // Cast poll vote type from the worker message. vote, ok := fMsg.GTSModel.(*gtsmodel.PollVote) if !ok { @@ -293,7 +280,7 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg messages.FromFediAPI) return nil } -func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg *messages.FromFediAPI) error { followRequest, ok := fMsg.GTSModel.(*gtsmodel.FollowRequest) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.FollowRequest", fMsg.GTSModel) @@ -310,7 +297,7 @@ func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI } // And update stats for the local account. - if err := p.utilF.incrementFollowRequestsCount(ctx, fMsg.ReceivingAccount); err != nil { + if err := p.utils.incrementFollowRequestsCount(ctx, fMsg.Receiving); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -330,12 +317,12 @@ func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI } // Update stats for the local account. - if err := p.utilF.incrementFollowersCount(ctx, fMsg.ReceivingAccount); err != nil { + if err := p.utils.incrementFollowersCount(ctx, fMsg.Receiving); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } // Update stats for the remote account. - if err := p.utilF.incrementFollowingCount(ctx, fMsg.RequestingAccount); err != nil { + if err := p.utils.incrementFollowingCount(ctx, fMsg.Requesting); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -350,7 +337,7 @@ func (p *fediAPI) CreateFollowReq(ctx context.Context, fMsg messages.FromFediAPI return nil } -func (p *fediAPI) CreateLike(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) error { fave, ok := fMsg.GTSModel.(*gtsmodel.StatusFave) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", fMsg.GTSModel) @@ -372,7 +359,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg messages.FromFediAPI) err return nil } -func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI) error { boost, ok := fMsg.GTSModel.(*gtsmodel.Status) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel) @@ -386,7 +373,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) boost, err = p.federate.EnrichAnnounce( ctx, boost, - fMsg.ReceivingAccount.Username, + fMsg.Receiving.Username, ) if err != nil { if gtserror.IsUnretrievable(err) { @@ -400,7 +387,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) } // Update stats for the remote account. - if err := p.utilF.incrementStatusesCount(ctx, fMsg.RequestingAccount, boost); err != nil { + if err := p.utils.incrementStatusesCount(ctx, fMsg.Requesting, boost); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -420,7 +407,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg messages.FromFediAPI) return nil } -func (p *fediAPI) CreateBlock(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) CreateBlock(ctx context.Context, fMsg *messages.FromFediAPI) error { block, ok := fMsg.GTSModel.(*gtsmodel.Block) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Block", fMsg.GTSModel) @@ -499,7 +486,7 @@ func (p *fediAPI) CreateBlock(ctx context.Context, fMsg messages.FromFediAPI) er return nil } -func (p *fediAPI) CreateFlag(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) CreateFlag(ctx context.Context, fMsg *messages.FromFediAPI) error { incomingReport, ok := fMsg.GTSModel.(*gtsmodel.Report) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Report", fMsg.GTSModel) @@ -515,7 +502,7 @@ func (p *fediAPI) CreateFlag(ctx context.Context, fMsg messages.FromFediAPI) err return nil } -func (p *fediAPI) UpdateAccount(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) UpdateAccount(ctx context.Context, fMsg *messages.FromFediAPI) error { // Parse the old/existing account model. account, ok := fMsg.GTSModel.(*gtsmodel.Account) if !ok { @@ -523,15 +510,15 @@ func (p *fediAPI) UpdateAccount(ctx context.Context, fMsg messages.FromFediAPI) } // Because this was an Update, the new Accountable should be set on the message. - apubAcc, ok := fMsg.APObjectModel.(ap.Accountable) + apubAcc, ok := fMsg.APObject.(ap.Accountable) if !ok { - return gtserror.Newf("cannot cast %T -> ap.Accountable", fMsg.APObjectModel) + return gtserror.Newf("cannot cast %T -> ap.Accountable", fMsg.APObject) } // Fetch up-to-date bio, avatar, header, etc. _, _, err := p.federate.RefreshAccount( ctx, - fMsg.ReceivingAccount.Username, + fMsg.Receiving.Username, account, apubAcc, // Force refresh within 5min window. @@ -544,25 +531,25 @@ func (p *fediAPI) UpdateAccount(ctx context.Context, fMsg messages.FromFediAPI) return nil } -func (p *fediAPI) AcceptFollow(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) AcceptFollow(ctx context.Context, fMsg *messages.FromFediAPI) error { // Update stats for the remote account. - if err := p.utilF.decrementFollowRequestsCount(ctx, fMsg.RequestingAccount); err != nil { + if err := p.utils.decrementFollowRequestsCount(ctx, fMsg.Requesting); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } - if err := p.utilF.incrementFollowersCount(ctx, fMsg.RequestingAccount); err != nil { + if err := p.utils.incrementFollowersCount(ctx, fMsg.Requesting); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } // Update stats for the local account. - if err := p.utilF.incrementFollowingCount(ctx, fMsg.ReceivingAccount); err != nil { + if err := p.utils.incrementFollowingCount(ctx, fMsg.Receiving); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } return nil } -func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI) error { // Cast the existing Status model attached to msg. existing, ok := fMsg.GTSModel.(*gtsmodel.Status) if !ok { @@ -570,12 +557,12 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e } // Cast the updated ActivityPub statusable object . - apStatus, _ := fMsg.APObjectModel.(ap.Statusable) + apStatus, _ := fMsg.APObject.(ap.Statusable) // Fetch up-to-date attach status attachments, etc. status, _, err := p.federate.RefreshStatus( ctx, - fMsg.ReceivingAccount.Username, + fMsg.Receiving.Username, existing, apStatus, // Force refresh within 5min window. @@ -605,7 +592,7 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg messages.FromFediAPI) e return nil } -func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg *messages.FromFediAPI) error { // Delete attachments from this status, since this request // comes from the federating API, and there's no way the // poster can do a delete + redraft for it on our instance. @@ -616,12 +603,34 @@ func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg messages.FromFediAPI) e return gtserror.Newf("%T not parseable as *gtsmodel.Status", fMsg.GTSModel) } - if err := p.utilF.wipeStatus(ctx, status, deleteAttachments); err != nil { + // Try to populate status structs if possible, + // in order to more thoroughly remove them. + if err := p.state.DB.PopulateStatus( + ctx, status, + ); err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error populating status: %w", err) + } + + // Drop any outgoing queued AP requests about / targeting + // this status, (stops queued likes, boosts, creates etc). + p.state.Workers.Delivery.Queue.Delete("ObjectID", status.URI) + p.state.Workers.Delivery.Queue.Delete("TargetID", status.URI) + + // Drop any incoming queued client messages about / targeting + // status, (stops processing of local origin data for status). + p.state.Workers.Client.Queue.Delete("TargetURI", status.URI) + + // Drop any incoming queued federator messages targeting status, + // (stops processing of remote origin data targeting this status). + p.state.Workers.Federator.Queue.Delete("TargetURI", status.URI) + + // First perform the actual status deletion. + if err := p.utils.wipeStatus(ctx, status, deleteAttachments); err != nil { log.Errorf(ctx, "error wiping status: %v", err) } // Update stats for the remote account. - if err := p.utilF.decrementStatusesCount(ctx, fMsg.RequestingAccount); err != nil { + if err := p.utils.decrementStatusesCount(ctx, fMsg.Requesting); err != nil { log.Errorf(ctx, "error updating account stats: %v", err) } @@ -634,12 +643,28 @@ func (p *fediAPI) DeleteStatus(ctx context.Context, fMsg messages.FromFediAPI) e return nil } -func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg messages.FromFediAPI) error { +func (p *fediAPI) DeleteAccount(ctx context.Context, fMsg *messages.FromFediAPI) error { account, ok := fMsg.GTSModel.(*gtsmodel.Account) if !ok { return gtserror.Newf("%T not parseable as *gtsmodel.Account", fMsg.GTSModel) } + // Drop any outgoing queued AP requests to / from / targeting + // this account, (stops queued likes, boosts, creates etc). + p.state.Workers.Delivery.Queue.Delete("ObjectID", account.URI) + p.state.Workers.Delivery.Queue.Delete("TargetID", account.URI) + + // Drop any incoming queued client messages to / from this + // account, (stops processing of local origin data for acccount). + p.state.Workers.Client.Queue.Delete("Target.ID", account.ID) + p.state.Workers.Client.Queue.Delete("TargetURI", account.URI) + + // Drop any incoming queued federator messages to this account, + // (stops processing of remote origin data targeting this account). + p.state.Workers.Federator.Queue.Delete("Requesting.ID", account.ID) + p.state.Workers.Federator.Queue.Delete("TargetURI", account.URI) + + // First perform the actual account deletion. if err := p.account.Delete(ctx, account, account.ID); err != nil { log.Errorf(ctx, "error deleting account: %v", err) } |