diff options
author | 2024-08-24 11:49:37 +0200 | |
---|---|---|
committer | 2024-08-24 11:49:37 +0200 | |
commit | f23f04e0b1d117be714bf91d5266dab219ed741e (patch) | |
tree | 0b3ddd60d51c8729949c3669993910a7f8f32a7b /internal/processing/workers | |
parent | [performance] ffmpeg ffprobe wrapper improvements (#3225) (diff) | |
download | gotosocial-f23f04e0b1d117be714bf91d5266dab219ed741e.tar.xz |
[feature] Interaction requests client api + settings panel (#3215)
* [feature] Interaction requests client api + settings panel
* test accept / reject
* fmt
* don't pin rejected interaction
* use single db model for interaction accept, reject, and request
* swaggor
* env sharting
* append errors
* remove ErrNoEntries checks
* change intReqID to reqID
* rename "pend" to "request"
* markIntsPending -> mark interactionsPending
* use log instead of returning error when rejecting interaction
* empty migration
* jolly renaming
* make interactionURI unique again
* swag grr
* remove unnecessary locks
* invalidate as last step
Diffstat (limited to 'internal/processing/workers')
-rw-r--r-- | internal/processing/workers/federate.go | 92 | ||||
-rw-r--r-- | internal/processing/workers/fromclientapi.go | 398 | ||||
-rw-r--r-- | internal/processing/workers/fromclientapi_test.go | 72 | ||||
-rw-r--r-- | internal/processing/workers/fromfediapi.go | 170 | ||||
-rw-r--r-- | internal/processing/workers/fromfediapi_test.go | 36 | ||||
-rw-r--r-- | internal/processing/workers/surfacenotify_test.go | 5 | ||||
-rw-r--r-- | internal/processing/workers/util.go | 200 | ||||
-rw-r--r-- | internal/processing/workers/workers.go | 4 | ||||
-rw-r--r-- | internal/processing/workers/workers_test.go | 88 |
9 files changed, 692 insertions, 373 deletions
diff --git a/internal/processing/workers/federate.go b/internal/processing/workers/federate.go index 6e8b558c8..d14f38902 100644 --- a/internal/processing/workers/federate.go +++ b/internal/processing/workers/federate.go @@ -1127,17 +1127,17 @@ func (f *federate) MoveAccount(ctx context.Context, account *gtsmodel.Account) e func (f *federate) AcceptInteraction( ctx context.Context, - approval *gtsmodel.InteractionApproval, + req *gtsmodel.InteractionRequest, ) error { // Populate model. - if err := f.state.DB.PopulateInteractionApproval(ctx, approval); err != nil { - return gtserror.Newf("error populating approval: %w", err) + if err := f.state.DB.PopulateInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("error populating request: %w", err) } // Bail if interacting account is ours: // we've already accepted internally and // shouldn't send an Accept to ourselves. - if approval.InteractingAccount.IsLocal() { + if req.InteractingAccount.IsLocal() { return nil } @@ -1145,27 +1145,27 @@ func (f *federate) AcceptInteraction( // we can't Accept on another // instance's behalf. (This // should never happen but...) - if approval.Account.IsRemote() { + if req.TargetAccount.IsRemote() { return nil } // Parse relevant URI(s). - outboxIRI, err := parseURI(approval.Account.OutboxURI) + outboxIRI, err := parseURI(req.TargetAccount.OutboxURI) if err != nil { return err } - acceptingAcctIRI, err := parseURI(approval.Account.URI) + acceptingAcctIRI, err := parseURI(req.TargetAccount.URI) if err != nil { return err } - interactingAcctURI, err := parseURI(approval.InteractingAccount.URI) + interactingAcctURI, err := parseURI(req.InteractingAccount.URI) if err != nil { return err } - interactionURI, err := parseURI(approval.InteractionURI) + interactionURI, err := parseURI(req.InteractionURI) if err != nil { return err } @@ -1190,7 +1190,79 @@ func (f *federate) AcceptInteraction( ); err != nil { return gtserror.Newf( "error sending activity %T for %v via outbox %s: %w", - accept, approval.InteractionType, outboxIRI, err, + accept, req.InteractionType, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) RejectInteraction( + ctx context.Context, + req *gtsmodel.InteractionRequest, +) error { + // Populate model. + if err := f.state.DB.PopulateInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("error populating request: %w", err) + } + + // Bail if interacting account is ours: + // we've already rejected internally and + // shouldn't send an Reject to ourselves. + if req.InteractingAccount.IsLocal() { + return nil + } + + // Bail if account isn't ours: + // we can't Reject on another + // instance's behalf. (This + // should never happen but...) + if req.TargetAccount.IsRemote() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(req.TargetAccount.OutboxURI) + if err != nil { + return err + } + + rejectingAcctIRI, err := parseURI(req.TargetAccount.URI) + if err != nil { + return err + } + + interactingAcctURI, err := parseURI(req.InteractingAccount.URI) + if err != nil { + return err + } + + interactionURI, err := parseURI(req.InteractionURI) + if err != nil { + return err + } + + // Create a new Reject. + reject := streams.NewActivityStreamsReject() + + // Set interacted-with account + // as Actor of the Reject. + ap.AppendActorIRIs(reject, rejectingAcctIRI) + + // Set the interacted-with object + // as Object of the Reject. + ap.AppendObjectIRIs(reject, interactionURI) + + // Address the Reject To the interacting acct. + ap.AppendTo(reject, interactingAcctURI) + + // Send the Reject via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, reject, + ); err != nil { + return gtserror.Newf( + "error sending activity %T for %v via outbox %s: %w", + reject, req.InteractionType, outboxIRI, err, ) } diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index 1a37341f8..c723a6001 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -20,18 +20,23 @@ package workers import ( "context" "errors" + "time" "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/uris" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -44,6 +49,7 @@ type clientAPI struct { surface *Surface federate *federate account *account.Processor + common *common.Processor utils *utils } @@ -160,6 +166,18 @@ func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg *messages.Fro // REJECT USER (ie., new user+account sign-up) case ap.ObjectProfile: return p.clientAPI.RejectUser(ctx, cMsg) + + // REJECT NOTE/STATUS (ie., reject a reply) + case ap.ObjectNote: + return p.clientAPI.RejectReply(ctx, cMsg) + + // REJECT LIKE + case ap.ActivityLike: + return p.clientAPI.RejectLike(ctx, cMsg) + + // REJECT BOOST + case ap.ActivityAnnounce: + return p.clientAPI.RejectAnnounce(ctx, cMsg) } // UNDO SOMETHING @@ -261,15 +279,13 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA // and/or notify the account that's being // interacted with (if it's local): they can // approve or deny the interaction later. - - // Notify *local* account of pending reply. - if err := p.surface.notifyPendingReply(ctx, status); err != nil { - log.Errorf(ctx, "error notifying pending reply: %v", err) + if err := p.utils.requestReply(ctx, status); err != nil { + return gtserror.Newf("error pending reply: %w", err) } // Send Create to *remote* account inbox ONLY. if err := p.federate.CreateStatus(ctx, status); err != nil { - log.Errorf(ctx, "error federating pending reply: %v", err) + return gtserror.Newf("error federating pending reply: %w", err) } // Return early. @@ -285,14 +301,38 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA // sending out the Create with the approval // URI attached. - // Put approval in the database and - // update the status with approvedBy URI. - approval, err := p.utils.approveReply(ctx, status) - if err != nil { - return gtserror.Newf("error pre-approving reply: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: status.InReplyToID, + TargetAccountID: status.InReplyToAccountID, + TargetAccount: status.InReplyToAccount, + InteractingAccountID: status.AccountID, + InteractingAccount: status.Account, + InteractionURI: status.URI, + InteractionType: gtsmodel.InteractionLike, + Reply: status, + URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id), + AcceptedAt: time.Now(), + } + if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil { + return gtserror.Newf("db error putting pre-approved interaction request: %w", err) + } + + // Mark the status as now approved. + status.PendingApproval = util.Ptr(false) + status.PreApproved = false + status.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + status, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } - // Send out the approval as Accept. if err := p.federate.AcceptInteraction(ctx, approval); err != nil { return gtserror.Newf("error federating pre-approval of reply: %w", err) } @@ -309,16 +349,16 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error timelining and notifying status: %v", err) } + if err := p.federate.CreateStatus(ctx, status); err != nil { + log.Errorf(ctx, "error federating status: %v", err) + } + if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) } - if err := p.federate.CreateStatus(ctx, status); err != nil { - log.Errorf(ctx, "error federating status: %v", err) - } - return nil } @@ -344,9 +384,6 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien status := vote.Poll.Status status.Poll = vote.Poll - // Interaction counts changed on the source status, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) - if *status.Local { // These are poll votes in a local status, we only need to // federate the updated status model with latest vote counts. @@ -360,6 +397,9 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien } } + // Interaction counts changed on the source status, uncache from timelines. + p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) + return nil } @@ -429,10 +469,7 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // If pending approval is true then fave must // target a status (either one of ours or a // remote) that requires approval for the fave. - pendingApproval := util.PtrOrValue( - fave.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(fave.PendingApproval) switch { case pendingApproval && !fave.PreApproved: @@ -442,15 +479,13 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // and/or notify the account that's being // interacted with (if it's local): they can // approve or deny the interaction later. - - // Notify *local* account of pending reply. - if err := p.surface.notifyPendingFave(ctx, fave); err != nil { - log.Errorf(ctx, "error notifying pending fave: %v", err) + if err := p.utils.requestFave(ctx, fave); err != nil { + return gtserror.Newf("error pending fave: %w", err) } // Send Like to *remote* account inbox ONLY. if err := p.federate.Like(ctx, fave); err != nil { - log.Errorf(ctx, "error federating pending Like: %v", err) + return gtserror.Newf("error federating pending Like: %v", err) } // Return early. @@ -466,14 +501,38 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // sending out the Like with the approval // URI attached. - // Put approval in the database and - // update the fave with approvedBy URI. - approval, err := p.utils.approveFave(ctx, fave) - if err != nil { - return gtserror.Newf("error pre-approving fave: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: fave.StatusID, + TargetAccountID: fave.TargetAccountID, + TargetAccount: fave.TargetAccount, + InteractingAccountID: fave.AccountID, + InteractingAccount: fave.Account, + InteractionURI: fave.URI, + InteractionType: gtsmodel.InteractionLike, + Like: fave, + URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id), + AcceptedAt: time.Now(), + } + if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil { + return gtserror.Newf("db error putting pre-approved interaction request: %w", err) + } + + // Mark the fave itself as now approved. + fave.PendingApproval = util.Ptr(false) + fave.PreApproved = false + fave.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatusFave( + ctx, + fave, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status fave: %w", err) } - // Send out the approval as Accept. if err := p.federate.AcceptInteraction(ctx, approval); err != nil { return gtserror.Newf("error federating pre-approval of fave: %w", err) } @@ -485,14 +544,14 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI log.Errorf(ctx, "error notifying fave: %v", err) } - // Interaction counts changed on the faved status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) - if err := p.federate.Like(ctx, fave); err != nil { log.Errorf(ctx, "error federating like: %v", err) } + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + return nil } @@ -505,10 +564,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // If pending approval is true then status must // boost a status (either one of ours or a // remote) that requires approval for the boost. - pendingApproval := util.PtrOrValue( - boost.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(boost.PendingApproval) switch { case pendingApproval && !boost.PreApproved: @@ -518,15 +574,13 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // and/or notify the account that's being // interacted with (if it's local): they can // approve or deny the interaction later. - - // Notify *local* account of pending announce. - if err := p.surface.notifyPendingAnnounce(ctx, boost); err != nil { - log.Errorf(ctx, "error notifying pending boost: %v", err) + if err := p.utils.requestAnnounce(ctx, boost); err != nil { + return gtserror.Newf("error pending boost: %w", err) } // Send Announce to *remote* account inbox ONLY. if err := p.federate.Announce(ctx, boost); err != nil { - log.Errorf(ctx, "error federating pending Announce: %v", err) + return gtserror.Newf("error federating pending Announce: %v", err) } // Return early. @@ -542,14 +596,38 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // sending out the Create with the approval // URI attached. - // Put approval in the database and - // update the boost with approvedBy URI. - approval, err := p.utils.approveAnnounce(ctx, boost) - if err != nil { - return gtserror.Newf("error pre-approving boost: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: boost.BoostOfID, + TargetAccountID: boost.BoostOfAccountID, + TargetAccount: boost.BoostOfAccount, + InteractingAccountID: boost.AccountID, + InteractingAccount: boost.Account, + InteractionURI: boost.URI, + InteractionType: gtsmodel.InteractionLike, + Announce: boost, + URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id), + AcceptedAt: time.Now(), + } + if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil { + return gtserror.Newf("db error putting pre-approved interaction request: %w", err) + } + + // Mark the boost itself as now approved. + boost.PendingApproval = util.Ptr(false) + boost.PreApproved = false + boost.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + boost, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } - // Send out the approval as Accept. if err := p.federate.AcceptInteraction(ctx, approval); err != nil { return gtserror.Newf("error federating pre-approval of boost: %w", err) } @@ -572,14 +650,14 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien log.Errorf(ctx, "error notifying boost: %v", err) } - // Interaction counts changed on the boosted status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) - if err := p.federate.Announce(ctx, boost); err != nil { log.Errorf(ctx, "error federating announce: %v", err) } + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + return nil } @@ -629,9 +707,6 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error federating status update: %v", err) } - // Status representation has changed, invalidate from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) - if status.Poll != nil && status.Poll.Closing { // If the latest status has a newly closed poll, at least compared @@ -646,6 +721,9 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error streaming status edit: %v", err) } + // Status representation has changed, invalidate from timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.ID) + return nil } @@ -791,14 +869,14 @@ func (p *clientAPI) UndoFave(ctx context.Context, cMsg *messages.FromClientAPI) return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) } - // Interaction counts changed on the faved status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) - if err := p.federate.UndoLike(ctx, statusFave); err != nil { log.Errorf(ctx, "error federating like undo: %v", err) } + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + return nil } @@ -821,14 +899,14 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error removing timelined status: %v", err) } - // Interaction counts changed on the boosted status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) - if err := p.federate.UndoAnnounce(ctx, status); err != nil { log.Errorf(ctx, "error federating announce undo: %v", err) } + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + return nil } @@ -874,16 +952,16 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error updating account stats: %v", err) } + if err := p.federate.DeleteStatus(ctx, status); err != nil { + log.Errorf(ctx, "error federating status delete: %v", err) + } + if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) } - if err := p.federate.DeleteStatus(ctx, status); err != nil { - log.Errorf(ctx, "error federating status delete: %v", err) - } - return nil } @@ -1050,16 +1128,188 @@ func (p *clientAPI) RejectUser(ctx context.Context, cMsg *messages.FromClientAPI } func (p *clientAPI) AcceptLike(ctx context.Context, cMsg *messages.FromClientAPI) error { - // TODO + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // Notify the fave (distinct from the notif for the pending fave). + if err := p.surface.notifyFave(ctx, req.Like); err != nil { + log.Errorf(ctx, "error notifying fave: %v", err) + } + + // Send out the Accept. + if err := p.federate.AcceptInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating approval of like: %v", err) + } + + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, req.Like.StatusID) + return nil } func (p *clientAPI) AcceptReply(ctx context.Context, cMsg *messages.FromClientAPI) error { - // TODO + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + var ( + interactingAcct = req.InteractingAccount + reply = req.Reply + ) + + // Update stats for the reply author account. + if err := p.utils.incrementStatusesCount(ctx, interactingAcct, reply); err != nil { + log.Errorf(ctx, "error updating account stats: %v", err) + } + + // Timeline the reply + notify relevant accounts. + if err := p.surface.timelineAndNotifyStatus(ctx, reply); err != nil { + log.Errorf(ctx, "error timelining and notifying status reply: %v", err) + } + + // Send out the Accept. + if err := p.federate.AcceptInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating approval of reply: %v", err) + } + + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, reply.InReplyToID) + return nil } func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error { - // TODO + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + var ( + interactingAcct = req.InteractingAccount + boost = req.Announce + ) + + // Update stats for the boost author account. + if err := p.utils.incrementStatusesCount(ctx, interactingAcct, boost); err != nil { + log.Errorf(ctx, "error updating account stats: %v", err) + } + + // Timeline and notify the announce. + if err := p.surface.timelineAndNotifyStatus(ctx, boost); err != nil { + log.Errorf(ctx, "error timelining and notifying status: %v", err) + } + + // Notify the announce (distinct from the notif for the pending announce). + if err := p.surface.notifyAnnounce(ctx, boost); err != nil { + log.Errorf(ctx, "error notifying announce: %v", err) + } + + // Send out the Accept. + if err := p.federate.AcceptInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating approval of announce: %v", err) + } + + // Interaction counts changed on the original status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + + return nil +} + +func (p *clientAPI) RejectLike(ctx context.Context, cMsg *messages.FromClientAPI) error { + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // At this point the InteractionRequest should already + // be in the database, we just need to do side effects. + + // Send out the Reject. + if err := p.federate.RejectInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating rejection of like: %v", err) + } + + // Get the rejected fave. + fave, err := p.state.DB.GetStatusFaveByURI( + gtscontext.SetBarebones(ctx), + req.InteractionURI, + ) + if err != nil { + return gtserror.Newf("db error getting rejected fave: %w", err) + } + + // Delete the status fave. + if err := p.state.DB.DeleteStatusFaveByID(ctx, fave.ID); err != nil { + return gtserror.Newf("db error deleting status fave: %w", err) + } + + return nil +} + +func (p *clientAPI) RejectReply(ctx context.Context, cMsg *messages.FromClientAPI) error { + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // At this point the InteractionRequest should already + // be in the database, we just need to do side effects. + + // Send out the Reject. + if err := p.federate.RejectInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating rejection of reply: %v", err) + } + + // Get the rejected status. + status, err := p.state.DB.GetStatusByURI( + gtscontext.SetBarebones(ctx), + req.InteractionURI, + ) + if err != nil { + return gtserror.Newf("db error getting rejected reply: %w", err) + } + + // Totally wipe the status. + if err := p.utils.wipeStatus(ctx, status, true); err != nil { + return gtserror.Newf("error wiping status: %w", err) + } + + return nil +} + +func (p *clientAPI) RejectAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error { + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // At this point the InteractionRequest should already + // be in the database, we just need to do side effects. + + // Send out the Reject. + if err := p.federate.RejectInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating rejection of announce: %v", err) + } + + // Get the rejected boost. + boost, err := p.state.DB.GetStatusByURI( + gtscontext.SetBarebones(ctx), + req.InteractionURI, + ) + if err != nil { + return gtserror.Newf("db error getting rejected announce: %w", err) + } + + // Totally wipe the status. + if err := p.utils.wipeStatus(ctx, boost, true); err != nil { + return gtserror.Newf("error wiping status: %w", err) + } + return nil } diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go index b4eae0be0..d330e4c2b 100644 --- a/internal/processing/workers/fromclientapi_test.go +++ b/internal/processing/workers/fromclientapi_test.go @@ -231,8 +231,8 @@ func (suite *FromClientAPITestSuite) conversationJSON( } func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -344,8 +344,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -412,8 +412,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -473,8 +473,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -534,8 +534,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyOK() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're modifying the test list so take a copy. testList := new(gtsmodel.List) @@ -610,8 +610,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis } func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyNo() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're modifying the test list so take a copy. testList := new(gtsmodel.List) @@ -691,8 +691,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis } func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPolicyNone() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're modifying the test list so take a copy. testList := new(gtsmodel.List) @@ -767,8 +767,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli } func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -831,8 +831,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -898,8 +898,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { // A DM to a local user should create a conversation and accompanying notification. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -984,8 +984,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversat // A public message to a local user should not result in a conversation notification. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1054,8 +1054,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreate // A public status with a hashtag followed by a local user who does not otherwise follow the author // should end up in the tag-following user's home timeline. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtag() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1128,8 +1128,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtag( // should not end up in the tag-following user's home timeline // if the user has the author blocked. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtagAndBlock() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1209,8 +1209,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtagA // who does not otherwise follow the author or booster // should end up in the tag-following user's home timeline as the original status. func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtag() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1312,8 +1312,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtag() // should not end up in the tag-following user's home timeline // if the user has the author blocked. func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAndBlock() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1422,8 +1422,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAn // should not end up in the tag-following user's home timeline // if the user has the booster blocked. func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAndBlockedBoost() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1530,8 +1530,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAn // Updating a public status with a hashtag followed by a local user who does not otherwise follow the author // should stream a status update to the tag-following user's home timeline. func (suite *FromClientAPITestSuite) TestProcessUpdateStatusWithFollowedHashtag() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1601,8 +1601,8 @@ func (suite *FromClientAPITestSuite) TestProcessUpdateStatusWithFollowedHashtag( } func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index ce7c53388..908369ca6 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -20,18 +20,22 @@ package workers import ( "context" "errors" + "time" "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/uris" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -44,6 +48,7 @@ type fediAPI struct { surface *Surface federate *federate account *account.Processor + common *common.Processor utils *utils } @@ -231,10 +236,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // If pending approval is true then // status must reply to a LOCAL status // that requires approval for the reply. - pendingApproval := util.PtrOrValue( - status.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(status.PendingApproval) switch { case pendingApproval && !status.PreApproved: @@ -242,10 +244,8 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // preapproved, then just notify the account // that's being interacted with: they can // approve or deny the interaction later. - - // Notify *local* account of pending reply. - if err := p.surface.notifyPendingReply(ctx, status); err != nil { - log.Errorf(ctx, "error notifying pending reply: %v", err) + if err := p.utils.requestReply(ctx, status); err != nil { + return gtserror.Newf("error pending reply: %w", err) } // Return early. @@ -259,11 +259,33 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // collection. Do the Accept immediately and // then process everything else as normal. - // Put approval in the database and - // update the status with approvedBy URI. - approval, err := p.utils.approveReply(ctx, status) - if err != nil { - return gtserror.Newf("error pre-approving reply: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: status.InReplyToID, + TargetAccountID: status.InReplyToAccountID, + TargetAccount: status.InReplyToAccount, + InteractingAccountID: status.AccountID, + InteractingAccount: status.Account, + InteractionURI: status.URI, + InteractionType: gtsmodel.InteractionLike, + Reply: status, + URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id), + AcceptedAt: time.Now(), + } + + // Mark the status as now approved. + status.PendingApproval = util.Ptr(false) + status.PreApproved = false + status.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + status, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } // Send out the approval as Accept. @@ -279,6 +301,10 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) log.Errorf(ctx, "error updating account stats: %v", err) } + if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { + log.Errorf(ctx, "error timelining and notifying status: %v", err) + } + if status.InReplyToID != "" { // Interaction counts changed on the replied status; uncache the // prepared version from all timelines. The status dereferencer @@ -286,10 +312,6 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) } - if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { - log.Errorf(ctx, "error timelining and notifying status: %v", err) - } - return nil } @@ -320,9 +342,6 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI status := vote.Poll.Status status.Poll = vote.Poll - // Interaction counts changed on the source status, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) - if *status.Local { // Before federating it, increment the // poll vote counts on our local copy. @@ -335,6 +354,9 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI } } + // Interaction counts changed on the source status, uncache from timelines. + p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) + return nil } @@ -409,10 +431,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // If pending approval is true then // fave must target a LOCAL status // that requires approval for the fave. - pendingApproval := util.PtrOrValue( - fave.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(fave.PendingApproval) switch { case pendingApproval && !fave.PreApproved: @@ -420,10 +439,8 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // preapproved, then just notify the account // that's being interacted with: they can // approve or deny the interaction later. - - // Notify *local* account of pending fave. - if err := p.surface.notifyPendingFave(ctx, fave); err != nil { - log.Errorf(ctx, "error notifying pending fave: %v", err) + if err := p.utils.requestFave(ctx, fave); err != nil { + return gtserror.Newf("error pending fave: %w", err) } // Return early. @@ -437,11 +454,33 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // collection. Do the Accept immediately and // then process everything else as normal. - // Put approval in the database and - // update the fave with approvedBy URI. - approval, err := p.utils.approveFave(ctx, fave) - if err != nil { - return gtserror.Newf("error pre-approving fave: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: fave.StatusID, + TargetAccountID: fave.TargetAccountID, + TargetAccount: fave.TargetAccount, + InteractingAccountID: fave.AccountID, + InteractingAccount: fave.Account, + InteractionURI: fave.URI, + InteractionType: gtsmodel.InteractionLike, + Like: fave, + URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id), + AcceptedAt: time.Now(), + } + + // Mark the fave itself as now approved. + fave.PendingApproval = util.Ptr(false) + fave.PreApproved = false + fave.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatusFave( + ctx, + fave, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status fave: %w", err) } // Send out the approval as Accept. @@ -496,10 +535,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // If pending approval is true then // boost must target a LOCAL status // that requires approval for the boost. - pendingApproval := util.PtrOrValue( - boost.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(boost.PendingApproval) switch { case pendingApproval && !boost.PreApproved: @@ -507,10 +543,8 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // preapproved, then just notify the account // that's being interacted with: they can // approve or deny the interaction later. - - // Notify *local* account of pending announce. - if err := p.surface.notifyPendingAnnounce(ctx, boost); err != nil { - log.Errorf(ctx, "error notifying pending boost: %v", err) + if err := p.utils.requestAnnounce(ctx, boost); err != nil { + return gtserror.Newf("error pending boost: %w", err) } // Return early. @@ -524,11 +558,33 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // collection. Do the Accept immediately and // then process everything else as normal. - // Put approval in the database and - // update the boost with approvedBy URI. - approval, err := p.utils.approveAnnounce(ctx, boost) - if err != nil { - return gtserror.Newf("error pre-approving boost: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: boost.BoostOfID, + TargetAccountID: boost.BoostOfAccountID, + TargetAccount: boost.BoostOfAccount, + InteractingAccountID: boost.AccountID, + InteractingAccount: boost.Account, + InteractionURI: boost.URI, + InteractionType: gtsmodel.InteractionLike, + Announce: boost, + URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id), + AcceptedAt: time.Now(), + } + + // Mark the boost itself as now approved. + boost.PendingApproval = util.Ptr(false) + boost.PreApproved = false + boost.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + boost, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } // Send out the approval as Accept. @@ -729,15 +785,15 @@ func (p *fediAPI) AcceptReply(ctx context.Context, fMsg *messages.FromFediAPI) e log.Errorf(ctx, "error timelining and notifying status: %v", err) } - // Interaction counts changed on the replied-to status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) - // Send out the reply again, fully this time. if err := p.federate.CreateStatus(ctx, status); err != nil { log.Errorf(ctx, "error federating announce: %v", err) } + // Interaction counts changed on the replied-to status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + return nil } @@ -757,15 +813,15 @@ func (p *fediAPI) AcceptAnnounce(ctx context.Context, fMsg *messages.FromFediAPI log.Errorf(ctx, "error timelining and notifying status: %v", err) } - // Interaction counts changed on the boosted status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) - // Send out the boost again, fully this time. if err := p.federate.Announce(ctx, boost); err != nil { log.Errorf(ctx, "error federating announce: %v", err) } + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + return nil } @@ -792,9 +848,6 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI) log.Errorf(ctx, "error refreshing status: %v", err) } - // Status representation was refetched, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) - if status.Poll != nil && status.Poll.Closing { // If the latest status has a newly closed poll, at least compared @@ -809,6 +862,9 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI) log.Errorf(ctx, "error streaming status edit: %v", err) } + // Status representation was refetched, uncache from timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.ID) + return nil } diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go index f08f059ea..d7d7454e7 100644 --- a/internal/processing/workers/fromfediapi_test.go +++ b/internal/processing/workers/fromfediapi_test.go @@ -42,8 +42,8 @@ type FromFediAPITestSuite struct { // remote_account_1 boosts the first status of local_account_1 func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) boostedStatus := >smodel.Status{} *boostedStatus = *suite.testStatuses["local_account_1_status_1"] @@ -106,8 +106,8 @@ func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() { } func (suite *FromFediAPITestSuite) TestProcessReplyMention() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) repliedAccount := >smodel.Account{} *repliedAccount = *suite.testAccounts["local_account_1"] @@ -190,8 +190,8 @@ func (suite *FromFediAPITestSuite) TestProcessReplyMention() { } func (suite *FromFediAPITestSuite) TestProcessFave() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) favedAccount := suite.testAccounts["local_account_1"] favedStatus := suite.testStatuses["local_account_1_status_1"] @@ -262,8 +262,8 @@ func (suite *FromFediAPITestSuite) TestProcessFave() { // This tests for an issue we were seeing where Misskey sends out faves to inboxes of people that don't own // the fave, but just follow the actor who received the fave. func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) receivingAccount := suite.testAccounts["local_account_2"] favedAccount := suite.testAccounts["local_account_1"] @@ -327,8 +327,8 @@ func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount( } func (suite *FromFediAPITestSuite) TestProcessAccountDelete() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -421,8 +421,8 @@ func (suite *FromFediAPITestSuite) TestProcessAccountDelete() { } func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -478,8 +478,8 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() { } func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -579,8 +579,8 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() { // TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor. func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -604,8 +604,8 @@ func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() { } func (suite *FromFediAPITestSuite) TestMoveAccount() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're gonna migrate foss_satan to our local admin account. ctx := context.Background() diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go index 876f69933..dc445d0ac 100644 --- a/internal/processing/workers/surfacenotify_test.go +++ b/internal/processing/workers/surfacenotify_test.go @@ -28,6 +28,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/processing/workers" + "github.com/superseriousbusiness/gotosocial/testrig" ) type SurfaceNotifyTestSuite struct { @@ -35,8 +36,8 @@ type SurfaceNotifyTestSuite struct { } func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) surface := &workers.Surface{ State: testStructs.State, diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go index 49c6183a4..bb7faffbf 100644 --- a/internal/processing/workers/util.go +++ b/internal/processing/workers/util.go @@ -26,12 +26,11 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/state" - "github.com/superseriousbusiness/gotosocial/internal/uris" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -488,128 +487,143 @@ func (u *utils) decrementFollowRequestsCount( return nil } -// approveFave stores + returns an -// interactionApproval for a fave. -func (u *utils) approveFave( +// requestFave stores an interaction request +// for the given fave, and notifies the interactee. +func (u *utils) requestFave( ctx context.Context, fave *gtsmodel.StatusFave, -) (*gtsmodel.InteractionApproval, error) { - id := id.NewULID() +) error { + // Only create interaction request + // if fave targets a local status. + if fave.Status == nil || + !fave.Status.IsLocal() { + return nil + } - approval := >smodel.InteractionApproval{ - ID: id, - AccountID: fave.TargetAccountID, - Account: fave.TargetAccount, - InteractingAccountID: fave.AccountID, - InteractingAccount: fave.Account, - InteractionURI: fave.URI, - InteractionType: gtsmodel.InteractionLike, - URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id), + // Lock on the interaction URI. + unlock := u.state.ProcessingLocks.Lock(fave.URI) + defer unlock() + + // Ensure no req with this URI exists already. + req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, fave.URI) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error checking for existing interaction request: %w", err) } - if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil { - err := gtserror.Newf("db error inserting interaction approval: %w", err) - return nil, err + if req != nil { + // Interaction req already exists, + // no need to do anything else. + return nil } - // Mark the fave itself as now approved. - fave.PendingApproval = util.Ptr(false) - fave.PreApproved = false - fave.ApprovedByURI = approval.URI + // Create + store new interaction request. + req, err = typeutils.StatusFaveToInteractionRequest(ctx, fave) + if err != nil { + return gtserror.Newf("error creating interaction request: %w", err) + } - if err := u.state.DB.UpdateStatusFave( - ctx, - fave, - "pending_approval", - "approved_by_uri", - ); err != nil { - err := gtserror.Newf("db error updating status fave: %w", err) - return nil, err + if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("db error storing interaction request: %w", err) } - return approval, nil + // Notify *local* account of pending announce. + if err := u.surface.notifyPendingFave(ctx, fave); err != nil { + return gtserror.Newf("error notifying pending fave: %w", err) + } + + return nil } -// approveReply stores + returns an -// interactionApproval for a reply. -func (u *utils) approveReply( +// requestReply stores an interaction request +// for the given reply, and notifies the interactee. +func (u *utils) requestReply( ctx context.Context, - status *gtsmodel.Status, -) (*gtsmodel.InteractionApproval, error) { - id := id.NewULID() + reply *gtsmodel.Status, +) error { + // Only create interaction request if + // status replies to a local status. + if reply.InReplyTo == nil || + !reply.InReplyTo.IsLocal() { + return nil + } - approval := >smodel.InteractionApproval{ - ID: id, - AccountID: status.InReplyToAccountID, - Account: status.InReplyToAccount, - InteractingAccountID: status.AccountID, - InteractingAccount: status.Account, - InteractionURI: status.URI, - InteractionType: gtsmodel.InteractionReply, - URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id), + // Lock on the interaction URI. + unlock := u.state.ProcessingLocks.Lock(reply.URI) + defer unlock() + + // Ensure no req with this URI exists already. + req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, reply.URI) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error checking for existing interaction request: %w", err) } - if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil { - err := gtserror.Newf("db error inserting interaction approval: %w", err) - return nil, err + if req != nil { + // Interaction req already exists, + // no need to do anything else. + return nil } - // Mark the status itself as now approved. - status.PendingApproval = util.Ptr(false) - status.PreApproved = false - status.ApprovedByURI = approval.URI + // Create + store interaction request. + req, err = typeutils.StatusToInteractionRequest(ctx, reply) + if err != nil { + return gtserror.Newf("error creating interaction request: %w", err) + } - if err := u.state.DB.UpdateStatus( - ctx, - status, - "pending_approval", - "approved_by_uri", - ); err != nil { - err := gtserror.Newf("db error updating status: %w", err) - return nil, err + if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("db error storing interaction request: %w", err) + } + + // Notify *local* account of pending reply. + if err := u.surface.notifyPendingReply(ctx, reply); err != nil { + return gtserror.Newf("error notifying pending reply: %w", err) } - return approval, nil + return nil } -// approveAnnounce stores + returns an -// interactionApproval for an announce. -func (u *utils) approveAnnounce( +// requestAnnounce stores an interaction request +// for the given announce, and notifies the interactee. +func (u *utils) requestAnnounce( ctx context.Context, boost *gtsmodel.Status, -) (*gtsmodel.InteractionApproval, error) { - id := id.NewULID() +) error { + // Only create interaction request if + // status announces a local status. + if boost.BoostOf == nil || + !boost.BoostOf.IsLocal() { + return nil + } + + // Lock on the interaction URI. + unlock := u.state.ProcessingLocks.Lock(boost.URI) + defer unlock() - approval := >smodel.InteractionApproval{ - ID: id, - AccountID: boost.BoostOfAccountID, - Account: boost.BoostOfAccount, - InteractingAccountID: boost.AccountID, - InteractingAccount: boost.Account, - InteractionURI: boost.URI, - InteractionType: gtsmodel.InteractionReply, - URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id), + // Ensure no req with this URI exists already. + req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, boost.URI) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error checking for existing interaction request: %w", err) } - if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil { - err := gtserror.Newf("db error inserting interaction approval: %w", err) - return nil, err + if req != nil { + // Interaction req already exists, + // no need to do anything else. + return nil } - // Mark the status itself as now approved. - boost.PendingApproval = util.Ptr(false) - boost.PreApproved = false - boost.ApprovedByURI = approval.URI + // Create + store interaction request. + req, err = typeutils.StatusToInteractionRequest(ctx, boost) + if err != nil { + return gtserror.Newf("error creating interaction request: %w", err) + } - if err := u.state.DB.UpdateStatus( - ctx, - boost, - "pending_approval", - "approved_by_uri", - ); err != nil { - err := gtserror.Newf("db error updating boost wrapper status: %w", err) - return nil, err + if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("db error storing interaction request: %w", err) + } + + // Notify *local* account of pending announce. + if err := u.surface.notifyPendingAnnounce(ctx, boost); err != nil { + return gtserror.Newf("error notifying pending announce: %w", err) } - return approval, nil + return nil } diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go index 04010a92e..d4b525783 100644 --- a/internal/processing/workers/workers.go +++ b/internal/processing/workers/workers.go @@ -22,6 +22,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/federation" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" @@ -38,6 +39,7 @@ type Processor struct { func New( state *state.State, + common *common.Processor, federator *federation.Federator, converter *typeutils.Converter, visFilter *visibility.Filter, @@ -82,6 +84,7 @@ func New( surface: surface, federate: federate, account: account, + common: common, utils: utils, }, fediAPI: fediAPI{ @@ -89,6 +92,7 @@ func New( surface: surface, federate: federate, account: account, + common: common, utils: utils, }, } diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go index 65ed3f6b7..ffd40d8fb 100644 --- a/internal/processing/workers/workers_test.go +++ b/internal/processing/workers/workers_test.go @@ -21,19 +21,18 @@ import ( "context" "github.com/stretchr/testify/suite" - "github.com/superseriousbusiness/gotosocial/internal/cleaner" - "github.com/superseriousbusiness/gotosocial/internal/email" - "github.com/superseriousbusiness/gotosocial/internal/filter/interaction" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/processing" - "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/stream" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/testrig" ) +const ( + rMediaPath = "../../../testrig/media" + rTemplatePath = "../../../web/template" +) + type WorkersTestSuite struct { // standard suite interfaces suite.Suite @@ -56,23 +55,6 @@ type WorkersTestSuite struct { testListEntries map[string]*gtsmodel.ListEntry } -// TestStructs encapsulates structs needed to -// run one test in this package. Each test should -// call SetupTestStructs to get a new TestStructs, -// and defer TearDownTestStructs to close it when -// the test is complete. The reason for doing things -// this way here is to prevent the tests in this -// package from overwriting one another's processors -// and worker queues, which was causing issues -// when running all tests at once. -type TestStructs struct { - State *state.State - Processor *processing.Processor - HTTPClient *testrig.MockHTTPClient - TypeConverter *typeutils.Converter - EmailSender email.Sender -} - func (suite *WorkersTestSuite) SetupSuite() { suite.testTokens = testrig.NewTestTokens() suite.testClients = testrig.NewTestClients() @@ -132,63 +114,3 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce return streams } - -func (suite *WorkersTestSuite) SetupTestStructs() *TestStructs { - state := state.State{} - - state.Caches.Init() - - db := testrig.NewTestDB(&state) - state.DB = db - - storage := testrig.NewInMemoryStorage() - state.Storage = storage - typeconverter := typeutils.NewConverter(&state) - - testrig.StartTimelines( - &state, - visibility.NewFilter(&state), - typeconverter, - ) - - httpClient := testrig.NewMockHTTPClient(nil, "../../../testrig/media") - httpClient.TestRemotePeople = testrig.NewTestFediPeople() - httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses() - - transportController := testrig.NewTestTransportController(&state, httpClient) - mediaManager := testrig.NewTestMediaManager(&state) - federator := testrig.NewTestFederator(&state, transportController, mediaManager) - oauthServer := testrig.NewTestOauthServer(db) - emailSender := testrig.NewEmailSender("../../../web/template/", nil) - - processor := processing.NewProcessor( - cleaner.New(&state), - typeconverter, - federator, - oauthServer, - mediaManager, - &state, - emailSender, - visibility.NewFilter(&state), - interaction.NewFilter(&state), - ) - - testrig.StartWorkers(&state, processor.Workers()) - - testrig.StandardDBSetup(db, suite.testAccounts) - testrig.StandardStorageSetup(storage, "../../../testrig/media") - - return &TestStructs{ - State: &state, - Processor: processor, - HTTPClient: httpClient, - TypeConverter: typeconverter, - EmailSender: emailSender, - } -} - -func (suite *WorkersTestSuite) TearDownTestStructs(testStructs *TestStructs) { - testrig.StandardDBTeardown(testStructs.State.DB) - testrig.StandardStorageTeardown(testStructs.State.Storage) - testrig.StopWorkers(testStructs.State) -} |