summaryrefslogtreecommitdiff
path: root/internal/processing/workers
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2024-08-24 11:49:37 +0200
committerLibravatar GitHub <noreply@github.com>2024-08-24 11:49:37 +0200
commitf23f04e0b1d117be714bf91d5266dab219ed741e (patch)
tree0b3ddd60d51c8729949c3669993910a7f8f32a7b /internal/processing/workers
parent[performance] ffmpeg ffprobe wrapper improvements (#3225) (diff)
downloadgotosocial-f23f04e0b1d117be714bf91d5266dab219ed741e.tar.xz
[feature] Interaction requests client api + settings panel (#3215)
* [feature] Interaction requests client api + settings panel * test accept / reject * fmt * don't pin rejected interaction * use single db model for interaction accept, reject, and request * swaggor * env sharting * append errors * remove ErrNoEntries checks * change intReqID to reqID * rename "pend" to "request" * markIntsPending -> mark interactionsPending * use log instead of returning error when rejecting interaction * empty migration * jolly renaming * make interactionURI unique again * swag grr * remove unnecessary locks * invalidate as last step
Diffstat (limited to 'internal/processing/workers')
-rw-r--r--internal/processing/workers/federate.go92
-rw-r--r--internal/processing/workers/fromclientapi.go398
-rw-r--r--internal/processing/workers/fromclientapi_test.go72
-rw-r--r--internal/processing/workers/fromfediapi.go170
-rw-r--r--internal/processing/workers/fromfediapi_test.go36
-rw-r--r--internal/processing/workers/surfacenotify_test.go5
-rw-r--r--internal/processing/workers/util.go200
-rw-r--r--internal/processing/workers/workers.go4
-rw-r--r--internal/processing/workers/workers_test.go88
9 files changed, 692 insertions, 373 deletions
diff --git a/internal/processing/workers/federate.go b/internal/processing/workers/federate.go
index 6e8b558c8..d14f38902 100644
--- a/internal/processing/workers/federate.go
+++ b/internal/processing/workers/federate.go
@@ -1127,17 +1127,17 @@ func (f *federate) MoveAccount(ctx context.Context, account *gtsmodel.Account) e
func (f *federate) AcceptInteraction(
ctx context.Context,
- approval *gtsmodel.InteractionApproval,
+ req *gtsmodel.InteractionRequest,
) error {
// Populate model.
- if err := f.state.DB.PopulateInteractionApproval(ctx, approval); err != nil {
- return gtserror.Newf("error populating approval: %w", err)
+ if err := f.state.DB.PopulateInteractionRequest(ctx, req); err != nil {
+ return gtserror.Newf("error populating request: %w", err)
}
// Bail if interacting account is ours:
// we've already accepted internally and
// shouldn't send an Accept to ourselves.
- if approval.InteractingAccount.IsLocal() {
+ if req.InteractingAccount.IsLocal() {
return nil
}
@@ -1145,27 +1145,27 @@ func (f *federate) AcceptInteraction(
// we can't Accept on another
// instance's behalf. (This
// should never happen but...)
- if approval.Account.IsRemote() {
+ if req.TargetAccount.IsRemote() {
return nil
}
// Parse relevant URI(s).
- outboxIRI, err := parseURI(approval.Account.OutboxURI)
+ outboxIRI, err := parseURI(req.TargetAccount.OutboxURI)
if err != nil {
return err
}
- acceptingAcctIRI, err := parseURI(approval.Account.URI)
+ acceptingAcctIRI, err := parseURI(req.TargetAccount.URI)
if err != nil {
return err
}
- interactingAcctURI, err := parseURI(approval.InteractingAccount.URI)
+ interactingAcctURI, err := parseURI(req.InteractingAccount.URI)
if err != nil {
return err
}
- interactionURI, err := parseURI(approval.InteractionURI)
+ interactionURI, err := parseURI(req.InteractionURI)
if err != nil {
return err
}
@@ -1190,7 +1190,79 @@ func (f *federate) AcceptInteraction(
); err != nil {
return gtserror.Newf(
"error sending activity %T for %v via outbox %s: %w",
- accept, approval.InteractionType, outboxIRI, err,
+ accept, req.InteractionType, outboxIRI, err,
+ )
+ }
+
+ return nil
+}
+
+func (f *federate) RejectInteraction(
+ ctx context.Context,
+ req *gtsmodel.InteractionRequest,
+) error {
+ // Populate model.
+ if err := f.state.DB.PopulateInteractionRequest(ctx, req); err != nil {
+ return gtserror.Newf("error populating request: %w", err)
+ }
+
+ // Bail if interacting account is ours:
+ // we've already rejected internally and
+ // shouldn't send an Reject to ourselves.
+ if req.InteractingAccount.IsLocal() {
+ return nil
+ }
+
+ // Bail if account isn't ours:
+ // we can't Reject on another
+ // instance's behalf. (This
+ // should never happen but...)
+ if req.TargetAccount.IsRemote() {
+ return nil
+ }
+
+ // Parse relevant URI(s).
+ outboxIRI, err := parseURI(req.TargetAccount.OutboxURI)
+ if err != nil {
+ return err
+ }
+
+ rejectingAcctIRI, err := parseURI(req.TargetAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ interactingAcctURI, err := parseURI(req.InteractingAccount.URI)
+ if err != nil {
+ return err
+ }
+
+ interactionURI, err := parseURI(req.InteractionURI)
+ if err != nil {
+ return err
+ }
+
+ // Create a new Reject.
+ reject := streams.NewActivityStreamsReject()
+
+ // Set interacted-with account
+ // as Actor of the Reject.
+ ap.AppendActorIRIs(reject, rejectingAcctIRI)
+
+ // Set the interacted-with object
+ // as Object of the Reject.
+ ap.AppendObjectIRIs(reject, interactionURI)
+
+ // Address the Reject To the interacting acct.
+ ap.AppendTo(reject, interactingAcctURI)
+
+ // Send the Reject via the Actor's outbox.
+ if _, err := f.FederatingActor().Send(
+ ctx, outboxIRI, reject,
+ ); err != nil {
+ return gtserror.Newf(
+ "error sending activity %T for %v via outbox %s: %w",
+ reject, req.InteractionType, outboxIRI, err,
)
}
diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go
index 1a37341f8..c723a6001 100644
--- a/internal/processing/workers/fromclientapi.go
+++ b/internal/processing/workers/fromclientapi.go
@@ -20,18 +20,23 @@ package workers
import (
"context"
"errors"
+ "time"
"codeberg.org/gruf/go-kv"
"codeberg.org/gruf/go-logger/v2/level"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/db"
+ "github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/common"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/typeutils"
+ "github.com/superseriousbusiness/gotosocial/internal/uris"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
@@ -44,6 +49,7 @@ type clientAPI struct {
surface *Surface
federate *federate
account *account.Processor
+ common *common.Processor
utils *utils
}
@@ -160,6 +166,18 @@ func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg *messages.Fro
// REJECT USER (ie., new user+account sign-up)
case ap.ObjectProfile:
return p.clientAPI.RejectUser(ctx, cMsg)
+
+ // REJECT NOTE/STATUS (ie., reject a reply)
+ case ap.ObjectNote:
+ return p.clientAPI.RejectReply(ctx, cMsg)
+
+ // REJECT LIKE
+ case ap.ActivityLike:
+ return p.clientAPI.RejectLike(ctx, cMsg)
+
+ // REJECT BOOST
+ case ap.ActivityAnnounce:
+ return p.clientAPI.RejectAnnounce(ctx, cMsg)
}
// UNDO SOMETHING
@@ -261,15 +279,13 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA
// and/or notify the account that's being
// interacted with (if it's local): they can
// approve or deny the interaction later.
-
- // Notify *local* account of pending reply.
- if err := p.surface.notifyPendingReply(ctx, status); err != nil {
- log.Errorf(ctx, "error notifying pending reply: %v", err)
+ if err := p.utils.requestReply(ctx, status); err != nil {
+ return gtserror.Newf("error pending reply: %w", err)
}
// Send Create to *remote* account inbox ONLY.
if err := p.federate.CreateStatus(ctx, status); err != nil {
- log.Errorf(ctx, "error federating pending reply: %v", err)
+ return gtserror.Newf("error federating pending reply: %w", err)
}
// Return early.
@@ -285,14 +301,38 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA
// sending out the Create with the approval
// URI attached.
- // Put approval in the database and
- // update the status with approvedBy URI.
- approval, err := p.utils.approveReply(ctx, status)
- if err != nil {
- return gtserror.Newf("error pre-approving reply: %w", err)
+ // Store an already-accepted interaction request.
+ id := id.NewULID()
+ approval := &gtsmodel.InteractionRequest{
+ ID: id,
+ StatusID: status.InReplyToID,
+ TargetAccountID: status.InReplyToAccountID,
+ TargetAccount: status.InReplyToAccount,
+ InteractingAccountID: status.AccountID,
+ InteractingAccount: status.Account,
+ InteractionURI: status.URI,
+ InteractionType: gtsmodel.InteractionLike,
+ Reply: status,
+ URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id),
+ AcceptedAt: time.Now(),
+ }
+ if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil {
+ return gtserror.Newf("db error putting pre-approved interaction request: %w", err)
+ }
+
+ // Mark the status as now approved.
+ status.PendingApproval = util.Ptr(false)
+ status.PreApproved = false
+ status.ApprovedByURI = approval.URI
+ if err := p.state.DB.UpdateStatus(
+ ctx,
+ status,
+ "pending_approval",
+ "approved_by_uri",
+ ); err != nil {
+ return gtserror.Newf("db error updating status: %w", err)
}
- // Send out the approval as Accept.
if err := p.federate.AcceptInteraction(ctx, approval); err != nil {
return gtserror.Newf("error federating pre-approval of reply: %w", err)
}
@@ -309,16 +349,16 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA
log.Errorf(ctx, "error timelining and notifying status: %v", err)
}
+ if err := p.federate.CreateStatus(ctx, status); err != nil {
+ log.Errorf(ctx, "error federating status: %v", err)
+ }
+
if status.InReplyToID != "" {
// Interaction counts changed on the replied status;
// uncache the prepared version from all timelines.
p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
}
- if err := p.federate.CreateStatus(ctx, status); err != nil {
- log.Errorf(ctx, "error federating status: %v", err)
- }
-
return nil
}
@@ -344,9 +384,6 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien
status := vote.Poll.Status
status.Poll = vote.Poll
- // Interaction counts changed on the source status, uncache from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID)
-
if *status.Local {
// These are poll votes in a local status, we only need to
// federate the updated status model with latest vote counts.
@@ -360,6 +397,9 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien
}
}
+ // Interaction counts changed on the source status, uncache from timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID)
+
return nil
}
@@ -429,10 +469,7 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI
// If pending approval is true then fave must
// target a status (either one of ours or a
// remote) that requires approval for the fave.
- pendingApproval := util.PtrOrValue(
- fave.PendingApproval,
- false,
- )
+ pendingApproval := util.PtrOrZero(fave.PendingApproval)
switch {
case pendingApproval && !fave.PreApproved:
@@ -442,15 +479,13 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI
// and/or notify the account that's being
// interacted with (if it's local): they can
// approve or deny the interaction later.
-
- // Notify *local* account of pending reply.
- if err := p.surface.notifyPendingFave(ctx, fave); err != nil {
- log.Errorf(ctx, "error notifying pending fave: %v", err)
+ if err := p.utils.requestFave(ctx, fave); err != nil {
+ return gtserror.Newf("error pending fave: %w", err)
}
// Send Like to *remote* account inbox ONLY.
if err := p.federate.Like(ctx, fave); err != nil {
- log.Errorf(ctx, "error federating pending Like: %v", err)
+ return gtserror.Newf("error federating pending Like: %v", err)
}
// Return early.
@@ -466,14 +501,38 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI
// sending out the Like with the approval
// URI attached.
- // Put approval in the database and
- // update the fave with approvedBy URI.
- approval, err := p.utils.approveFave(ctx, fave)
- if err != nil {
- return gtserror.Newf("error pre-approving fave: %w", err)
+ // Store an already-accepted interaction request.
+ id := id.NewULID()
+ approval := &gtsmodel.InteractionRequest{
+ ID: id,
+ StatusID: fave.StatusID,
+ TargetAccountID: fave.TargetAccountID,
+ TargetAccount: fave.TargetAccount,
+ InteractingAccountID: fave.AccountID,
+ InteractingAccount: fave.Account,
+ InteractionURI: fave.URI,
+ InteractionType: gtsmodel.InteractionLike,
+ Like: fave,
+ URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id),
+ AcceptedAt: time.Now(),
+ }
+ if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil {
+ return gtserror.Newf("db error putting pre-approved interaction request: %w", err)
+ }
+
+ // Mark the fave itself as now approved.
+ fave.PendingApproval = util.Ptr(false)
+ fave.PreApproved = false
+ fave.ApprovedByURI = approval.URI
+ if err := p.state.DB.UpdateStatusFave(
+ ctx,
+ fave,
+ "pending_approval",
+ "approved_by_uri",
+ ); err != nil {
+ return gtserror.Newf("db error updating status fave: %w", err)
}
- // Send out the approval as Accept.
if err := p.federate.AcceptInteraction(ctx, approval); err != nil {
return gtserror.Newf("error federating pre-approval of fave: %w", err)
}
@@ -485,14 +544,14 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI
log.Errorf(ctx, "error notifying fave: %v", err)
}
- // Interaction counts changed on the faved status;
- // uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID)
-
if err := p.federate.Like(ctx, fave); err != nil {
log.Errorf(ctx, "error federating like: %v", err)
}
+ // Interaction counts changed on the faved status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID)
+
return nil
}
@@ -505,10 +564,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien
// If pending approval is true then status must
// boost a status (either one of ours or a
// remote) that requires approval for the boost.
- pendingApproval := util.PtrOrValue(
- boost.PendingApproval,
- false,
- )
+ pendingApproval := util.PtrOrZero(boost.PendingApproval)
switch {
case pendingApproval && !boost.PreApproved:
@@ -518,15 +574,13 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien
// and/or notify the account that's being
// interacted with (if it's local): they can
// approve or deny the interaction later.
-
- // Notify *local* account of pending announce.
- if err := p.surface.notifyPendingAnnounce(ctx, boost); err != nil {
- log.Errorf(ctx, "error notifying pending boost: %v", err)
+ if err := p.utils.requestAnnounce(ctx, boost); err != nil {
+ return gtserror.Newf("error pending boost: %w", err)
}
// Send Announce to *remote* account inbox ONLY.
if err := p.federate.Announce(ctx, boost); err != nil {
- log.Errorf(ctx, "error federating pending Announce: %v", err)
+ return gtserror.Newf("error federating pending Announce: %v", err)
}
// Return early.
@@ -542,14 +596,38 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien
// sending out the Create with the approval
// URI attached.
- // Put approval in the database and
- // update the boost with approvedBy URI.
- approval, err := p.utils.approveAnnounce(ctx, boost)
- if err != nil {
- return gtserror.Newf("error pre-approving boost: %w", err)
+ // Store an already-accepted interaction request.
+ id := id.NewULID()
+ approval := &gtsmodel.InteractionRequest{
+ ID: id,
+ StatusID: boost.BoostOfID,
+ TargetAccountID: boost.BoostOfAccountID,
+ TargetAccount: boost.BoostOfAccount,
+ InteractingAccountID: boost.AccountID,
+ InteractingAccount: boost.Account,
+ InteractionURI: boost.URI,
+ InteractionType: gtsmodel.InteractionLike,
+ Announce: boost,
+ URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id),
+ AcceptedAt: time.Now(),
+ }
+ if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil {
+ return gtserror.Newf("db error putting pre-approved interaction request: %w", err)
+ }
+
+ // Mark the boost itself as now approved.
+ boost.PendingApproval = util.Ptr(false)
+ boost.PreApproved = false
+ boost.ApprovedByURI = approval.URI
+ if err := p.state.DB.UpdateStatus(
+ ctx,
+ boost,
+ "pending_approval",
+ "approved_by_uri",
+ ); err != nil {
+ return gtserror.Newf("db error updating status: %w", err)
}
- // Send out the approval as Accept.
if err := p.federate.AcceptInteraction(ctx, approval); err != nil {
return gtserror.Newf("error federating pre-approval of boost: %w", err)
}
@@ -572,14 +650,14 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien
log.Errorf(ctx, "error notifying boost: %v", err)
}
- // Interaction counts changed on the boosted status;
- // uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
-
if err := p.federate.Announce(ctx, boost); err != nil {
log.Errorf(ctx, "error federating announce: %v", err)
}
+ // Interaction counts changed on the boosted status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+
return nil
}
@@ -629,9 +707,6 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA
log.Errorf(ctx, "error federating status update: %v", err)
}
- // Status representation has changed, invalidate from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.ID)
-
if status.Poll != nil && status.Poll.Closing {
// If the latest status has a newly closed poll, at least compared
@@ -646,6 +721,9 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA
log.Errorf(ctx, "error streaming status edit: %v", err)
}
+ // Status representation has changed, invalidate from timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.ID)
+
return nil
}
@@ -791,14 +869,14 @@ func (p *clientAPI) UndoFave(ctx context.Context, cMsg *messages.FromClientAPI)
return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel)
}
- // Interaction counts changed on the faved status;
- // uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID)
-
if err := p.federate.UndoLike(ctx, statusFave); err != nil {
log.Errorf(ctx, "error federating like undo: %v", err)
}
+ // Interaction counts changed on the faved status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID)
+
return nil
}
@@ -821,14 +899,14 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA
log.Errorf(ctx, "error removing timelined status: %v", err)
}
- // Interaction counts changed on the boosted status;
- // uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID)
-
if err := p.federate.UndoAnnounce(ctx, status); err != nil {
log.Errorf(ctx, "error federating announce undo: %v", err)
}
+ // Interaction counts changed on the boosted status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID)
+
return nil
}
@@ -874,16 +952,16 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg *messages.FromClientA
log.Errorf(ctx, "error updating account stats: %v", err)
}
+ if err := p.federate.DeleteStatus(ctx, status); err != nil {
+ log.Errorf(ctx, "error federating status delete: %v", err)
+ }
+
if status.InReplyToID != "" {
// Interaction counts changed on the replied status;
// uncache the prepared version from all timelines.
p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
}
- if err := p.federate.DeleteStatus(ctx, status); err != nil {
- log.Errorf(ctx, "error federating status delete: %v", err)
- }
-
return nil
}
@@ -1050,16 +1128,188 @@ func (p *clientAPI) RejectUser(ctx context.Context, cMsg *messages.FromClientAPI
}
func (p *clientAPI) AcceptLike(ctx context.Context, cMsg *messages.FromClientAPI) error {
- // TODO
+ req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
+ }
+
+ // Notify the fave (distinct from the notif for the pending fave).
+ if err := p.surface.notifyFave(ctx, req.Like); err != nil {
+ log.Errorf(ctx, "error notifying fave: %v", err)
+ }
+
+ // Send out the Accept.
+ if err := p.federate.AcceptInteraction(ctx, req); err != nil {
+ log.Errorf(ctx, "error federating approval of like: %v", err)
+ }
+
+ // Interaction counts changed on the faved status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, req.Like.StatusID)
+
return nil
}
func (p *clientAPI) AcceptReply(ctx context.Context, cMsg *messages.FromClientAPI) error {
- // TODO
+ req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
+ }
+
+ var (
+ interactingAcct = req.InteractingAccount
+ reply = req.Reply
+ )
+
+ // Update stats for the reply author account.
+ if err := p.utils.incrementStatusesCount(ctx, interactingAcct, reply); err != nil {
+ log.Errorf(ctx, "error updating account stats: %v", err)
+ }
+
+ // Timeline the reply + notify relevant accounts.
+ if err := p.surface.timelineAndNotifyStatus(ctx, reply); err != nil {
+ log.Errorf(ctx, "error timelining and notifying status reply: %v", err)
+ }
+
+ // Send out the Accept.
+ if err := p.federate.AcceptInteraction(ctx, req); err != nil {
+ log.Errorf(ctx, "error federating approval of reply: %v", err)
+ }
+
+ // Interaction counts changed on the replied status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, reply.InReplyToID)
+
return nil
}
func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error {
- // TODO
+ req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
+ }
+
+ var (
+ interactingAcct = req.InteractingAccount
+ boost = req.Announce
+ )
+
+ // Update stats for the boost author account.
+ if err := p.utils.incrementStatusesCount(ctx, interactingAcct, boost); err != nil {
+ log.Errorf(ctx, "error updating account stats: %v", err)
+ }
+
+ // Timeline and notify the announce.
+ if err := p.surface.timelineAndNotifyStatus(ctx, boost); err != nil {
+ log.Errorf(ctx, "error timelining and notifying status: %v", err)
+ }
+
+ // Notify the announce (distinct from the notif for the pending announce).
+ if err := p.surface.notifyAnnounce(ctx, boost); err != nil {
+ log.Errorf(ctx, "error notifying announce: %v", err)
+ }
+
+ // Send out the Accept.
+ if err := p.federate.AcceptInteraction(ctx, req); err != nil {
+ log.Errorf(ctx, "error federating approval of announce: %v", err)
+ }
+
+ // Interaction counts changed on the original status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+
+ return nil
+}
+
+func (p *clientAPI) RejectLike(ctx context.Context, cMsg *messages.FromClientAPI) error {
+ req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
+ }
+
+ // At this point the InteractionRequest should already
+ // be in the database, we just need to do side effects.
+
+ // Send out the Reject.
+ if err := p.federate.RejectInteraction(ctx, req); err != nil {
+ log.Errorf(ctx, "error federating rejection of like: %v", err)
+ }
+
+ // Get the rejected fave.
+ fave, err := p.state.DB.GetStatusFaveByURI(
+ gtscontext.SetBarebones(ctx),
+ req.InteractionURI,
+ )
+ if err != nil {
+ return gtserror.Newf("db error getting rejected fave: %w", err)
+ }
+
+ // Delete the status fave.
+ if err := p.state.DB.DeleteStatusFaveByID(ctx, fave.ID); err != nil {
+ return gtserror.Newf("db error deleting status fave: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) RejectReply(ctx context.Context, cMsg *messages.FromClientAPI) error {
+ req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
+ }
+
+ // At this point the InteractionRequest should already
+ // be in the database, we just need to do side effects.
+
+ // Send out the Reject.
+ if err := p.federate.RejectInteraction(ctx, req); err != nil {
+ log.Errorf(ctx, "error federating rejection of reply: %v", err)
+ }
+
+ // Get the rejected status.
+ status, err := p.state.DB.GetStatusByURI(
+ gtscontext.SetBarebones(ctx),
+ req.InteractionURI,
+ )
+ if err != nil {
+ return gtserror.Newf("db error getting rejected reply: %w", err)
+ }
+
+ // Totally wipe the status.
+ if err := p.utils.wipeStatus(ctx, status, true); err != nil {
+ return gtserror.Newf("error wiping status: %w", err)
+ }
+
+ return nil
+}
+
+func (p *clientAPI) RejectAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error {
+ req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest)
+ if !ok {
+ return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel)
+ }
+
+ // At this point the InteractionRequest should already
+ // be in the database, we just need to do side effects.
+
+ // Send out the Reject.
+ if err := p.federate.RejectInteraction(ctx, req); err != nil {
+ log.Errorf(ctx, "error federating rejection of announce: %v", err)
+ }
+
+ // Get the rejected boost.
+ boost, err := p.state.DB.GetStatusByURI(
+ gtscontext.SetBarebones(ctx),
+ req.InteractionURI,
+ )
+ if err != nil {
+ return gtserror.Newf("db error getting rejected announce: %w", err)
+ }
+
+ // Totally wipe the status.
+ if err := p.utils.wipeStatus(ctx, boost, true); err != nil {
+ return gtserror.Newf("error wiping status: %w", err)
+ }
+
return nil
}
diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go
index b4eae0be0..d330e4c2b 100644
--- a/internal/processing/workers/fromclientapi_test.go
+++ b/internal/processing/workers/fromclientapi_test.go
@@ -231,8 +231,8 @@ func (suite *FromClientAPITestSuite) conversationJSON(
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -344,8 +344,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() {
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -412,8 +412,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() {
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -473,8 +473,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() {
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -534,8 +534,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() {
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyOK() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
// We're modifying the test list so take a copy.
testList := new(gtsmodel.List)
@@ -610,8 +610,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyNo() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
// We're modifying the test list so take a copy.
testList := new(gtsmodel.List)
@@ -691,8 +691,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPolicyNone() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
// We're modifying the test list so take a copy.
testList := new(gtsmodel.List)
@@ -767,8 +767,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -831,8 +831,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() {
}
func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -898,8 +898,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() {
// A DM to a local user should create a conversation and accompanying notification.
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -984,8 +984,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversat
// A public message to a local user should not result in a conversation notification.
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -1054,8 +1054,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreate
// A public status with a hashtag followed by a local user who does not otherwise follow the author
// should end up in the tag-following user's home timeline.
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtag() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -1128,8 +1128,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtag(
// should not end up in the tag-following user's home timeline
// if the user has the author blocked.
func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtagAndBlock() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -1209,8 +1209,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtagA
// who does not otherwise follow the author or booster
// should end up in the tag-following user's home timeline as the original status.
func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtag() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -1312,8 +1312,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtag()
// should not end up in the tag-following user's home timeline
// if the user has the author blocked.
func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAndBlock() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -1422,8 +1422,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAn
// should not end up in the tag-following user's home timeline
// if the user has the booster blocked.
func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAndBlockedBoost() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -1530,8 +1530,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAn
// Updating a public status with a hashtag followed by a local user who does not otherwise follow the author
// should stream a status update to the tag-following user's home timeline.
func (suite *FromClientAPITestSuite) TestProcessUpdateStatusWithFollowedHashtag() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
@@ -1601,8 +1601,8 @@ func (suite *FromClientAPITestSuite) TestProcessUpdateStatusWithFollowedHashtag(
}
func (suite *FromClientAPITestSuite) TestProcessStatusDelete() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
var (
ctx = context.Background()
diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go
index ce7c53388..908369ca6 100644
--- a/internal/processing/workers/fromfediapi.go
+++ b/internal/processing/workers/fromfediapi.go
@@ -20,18 +20,22 @@ package workers
import (
"context"
"errors"
+ "time"
"codeberg.org/gruf/go-kv"
"codeberg.org/gruf/go-logger/v2/level"
"github.com/superseriousbusiness/gotosocial/internal/ap"
"github.com/superseriousbusiness/gotosocial/internal/db"
"github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing"
+ "github.com/superseriousbusiness/gotosocial/internal/id"
+ "github.com/superseriousbusiness/gotosocial/internal/uris"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/common"
"github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
@@ -44,6 +48,7 @@ type fediAPI struct {
surface *Surface
federate *federate
account *account.Processor
+ common *common.Processor
utils *utils
}
@@ -231,10 +236,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
// If pending approval is true then
// status must reply to a LOCAL status
// that requires approval for the reply.
- pendingApproval := util.PtrOrValue(
- status.PendingApproval,
- false,
- )
+ pendingApproval := util.PtrOrZero(status.PendingApproval)
switch {
case pendingApproval && !status.PreApproved:
@@ -242,10 +244,8 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
// preapproved, then just notify the account
// that's being interacted with: they can
// approve or deny the interaction later.
-
- // Notify *local* account of pending reply.
- if err := p.surface.notifyPendingReply(ctx, status); err != nil {
- log.Errorf(ctx, "error notifying pending reply: %v", err)
+ if err := p.utils.requestReply(ctx, status); err != nil {
+ return gtserror.Newf("error pending reply: %w", err)
}
// Return early.
@@ -259,11 +259,33 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
// collection. Do the Accept immediately and
// then process everything else as normal.
- // Put approval in the database and
- // update the status with approvedBy URI.
- approval, err := p.utils.approveReply(ctx, status)
- if err != nil {
- return gtserror.Newf("error pre-approving reply: %w", err)
+ // Store an already-accepted interaction request.
+ id := id.NewULID()
+ approval := &gtsmodel.InteractionRequest{
+ ID: id,
+ StatusID: status.InReplyToID,
+ TargetAccountID: status.InReplyToAccountID,
+ TargetAccount: status.InReplyToAccount,
+ InteractingAccountID: status.AccountID,
+ InteractingAccount: status.Account,
+ InteractionURI: status.URI,
+ InteractionType: gtsmodel.InteractionLike,
+ Reply: status,
+ URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id),
+ AcceptedAt: time.Now(),
+ }
+
+ // Mark the status as now approved.
+ status.PendingApproval = util.Ptr(false)
+ status.PreApproved = false
+ status.ApprovedByURI = approval.URI
+ if err := p.state.DB.UpdateStatus(
+ ctx,
+ status,
+ "pending_approval",
+ "approved_by_uri",
+ ); err != nil {
+ return gtserror.Newf("db error updating status: %w", err)
}
// Send out the approval as Accept.
@@ -279,6 +301,10 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
log.Errorf(ctx, "error updating account stats: %v", err)
}
+ if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil {
+ log.Errorf(ctx, "error timelining and notifying status: %v", err)
+ }
+
if status.InReplyToID != "" {
// Interaction counts changed on the replied status; uncache the
// prepared version from all timelines. The status dereferencer
@@ -286,10 +312,6 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
}
- if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil {
- log.Errorf(ctx, "error timelining and notifying status: %v", err)
- }
-
return nil
}
@@ -320,9 +342,6 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI
status := vote.Poll.Status
status.Poll = vote.Poll
- // Interaction counts changed on the source status, uncache from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID)
-
if *status.Local {
// Before federating it, increment the
// poll vote counts on our local copy.
@@ -335,6 +354,9 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI
}
}
+ // Interaction counts changed on the source status, uncache from timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID)
+
return nil
}
@@ -409,10 +431,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er
// If pending approval is true then
// fave must target a LOCAL status
// that requires approval for the fave.
- pendingApproval := util.PtrOrValue(
- fave.PendingApproval,
- false,
- )
+ pendingApproval := util.PtrOrZero(fave.PendingApproval)
switch {
case pendingApproval && !fave.PreApproved:
@@ -420,10 +439,8 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er
// preapproved, then just notify the account
// that's being interacted with: they can
// approve or deny the interaction later.
-
- // Notify *local* account of pending fave.
- if err := p.surface.notifyPendingFave(ctx, fave); err != nil {
- log.Errorf(ctx, "error notifying pending fave: %v", err)
+ if err := p.utils.requestFave(ctx, fave); err != nil {
+ return gtserror.Newf("error pending fave: %w", err)
}
// Return early.
@@ -437,11 +454,33 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er
// collection. Do the Accept immediately and
// then process everything else as normal.
- // Put approval in the database and
- // update the fave with approvedBy URI.
- approval, err := p.utils.approveFave(ctx, fave)
- if err != nil {
- return gtserror.Newf("error pre-approving fave: %w", err)
+ // Store an already-accepted interaction request.
+ id := id.NewULID()
+ approval := &gtsmodel.InteractionRequest{
+ ID: id,
+ StatusID: fave.StatusID,
+ TargetAccountID: fave.TargetAccountID,
+ TargetAccount: fave.TargetAccount,
+ InteractingAccountID: fave.AccountID,
+ InteractingAccount: fave.Account,
+ InteractionURI: fave.URI,
+ InteractionType: gtsmodel.InteractionLike,
+ Like: fave,
+ URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id),
+ AcceptedAt: time.Now(),
+ }
+
+ // Mark the fave itself as now approved.
+ fave.PendingApproval = util.Ptr(false)
+ fave.PreApproved = false
+ fave.ApprovedByURI = approval.URI
+ if err := p.state.DB.UpdateStatusFave(
+ ctx,
+ fave,
+ "pending_approval",
+ "approved_by_uri",
+ ); err != nil {
+ return gtserror.Newf("db error updating status fave: %w", err)
}
// Send out the approval as Accept.
@@ -496,10 +535,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
// If pending approval is true then
// boost must target a LOCAL status
// that requires approval for the boost.
- pendingApproval := util.PtrOrValue(
- boost.PendingApproval,
- false,
- )
+ pendingApproval := util.PtrOrZero(boost.PendingApproval)
switch {
case pendingApproval && !boost.PreApproved:
@@ -507,10 +543,8 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
// preapproved, then just notify the account
// that's being interacted with: they can
// approve or deny the interaction later.
-
- // Notify *local* account of pending announce.
- if err := p.surface.notifyPendingAnnounce(ctx, boost); err != nil {
- log.Errorf(ctx, "error notifying pending boost: %v", err)
+ if err := p.utils.requestAnnounce(ctx, boost); err != nil {
+ return gtserror.Newf("error pending boost: %w", err)
}
// Return early.
@@ -524,11 +558,33 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
// collection. Do the Accept immediately and
// then process everything else as normal.
- // Put approval in the database and
- // update the boost with approvedBy URI.
- approval, err := p.utils.approveAnnounce(ctx, boost)
- if err != nil {
- return gtserror.Newf("error pre-approving boost: %w", err)
+ // Store an already-accepted interaction request.
+ id := id.NewULID()
+ approval := &gtsmodel.InteractionRequest{
+ ID: id,
+ StatusID: boost.BoostOfID,
+ TargetAccountID: boost.BoostOfAccountID,
+ TargetAccount: boost.BoostOfAccount,
+ InteractingAccountID: boost.AccountID,
+ InteractingAccount: boost.Account,
+ InteractionURI: boost.URI,
+ InteractionType: gtsmodel.InteractionLike,
+ Announce: boost,
+ URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id),
+ AcceptedAt: time.Now(),
+ }
+
+ // Mark the boost itself as now approved.
+ boost.PendingApproval = util.Ptr(false)
+ boost.PreApproved = false
+ boost.ApprovedByURI = approval.URI
+ if err := p.state.DB.UpdateStatus(
+ ctx,
+ boost,
+ "pending_approval",
+ "approved_by_uri",
+ ); err != nil {
+ return gtserror.Newf("db error updating status: %w", err)
}
// Send out the approval as Accept.
@@ -729,15 +785,15 @@ func (p *fediAPI) AcceptReply(ctx context.Context, fMsg *messages.FromFediAPI) e
log.Errorf(ctx, "error timelining and notifying status: %v", err)
}
- // Interaction counts changed on the replied-to status;
- // uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
-
// Send out the reply again, fully this time.
if err := p.federate.CreateStatus(ctx, status); err != nil {
log.Errorf(ctx, "error federating announce: %v", err)
}
+ // Interaction counts changed on the replied-to status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID)
+
return nil
}
@@ -757,15 +813,15 @@ func (p *fediAPI) AcceptAnnounce(ctx context.Context, fMsg *messages.FromFediAPI
log.Errorf(ctx, "error timelining and notifying status: %v", err)
}
- // Interaction counts changed on the boosted status;
- // uncache the prepared version from all timelines.
- p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
-
// Send out the boost again, fully this time.
if err := p.federate.Announce(ctx, boost); err != nil {
log.Errorf(ctx, "error federating announce: %v", err)
}
+ // Interaction counts changed on the boosted status;
+ // uncache the prepared version from all timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID)
+
return nil
}
@@ -792,9 +848,6 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
log.Errorf(ctx, "error refreshing status: %v", err)
}
- // Status representation was refetched, uncache from timelines.
- p.surface.invalidateStatusFromTimelines(ctx, status.ID)
-
if status.Poll != nil && status.Poll.Closing {
// If the latest status has a newly closed poll, at least compared
@@ -809,6 +862,9 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI)
log.Errorf(ctx, "error streaming status edit: %v", err)
}
+ // Status representation was refetched, uncache from timelines.
+ p.surface.invalidateStatusFromTimelines(ctx, status.ID)
+
return nil
}
diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go
index f08f059ea..d7d7454e7 100644
--- a/internal/processing/workers/fromfediapi_test.go
+++ b/internal/processing/workers/fromfediapi_test.go
@@ -42,8 +42,8 @@ type FromFediAPITestSuite struct {
// remote_account_1 boosts the first status of local_account_1
func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
boostedStatus := &gtsmodel.Status{}
*boostedStatus = *suite.testStatuses["local_account_1_status_1"]
@@ -106,8 +106,8 @@ func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() {
}
func (suite *FromFediAPITestSuite) TestProcessReplyMention() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
repliedAccount := &gtsmodel.Account{}
*repliedAccount = *suite.testAccounts["local_account_1"]
@@ -190,8 +190,8 @@ func (suite *FromFediAPITestSuite) TestProcessReplyMention() {
}
func (suite *FromFediAPITestSuite) TestProcessFave() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
favedAccount := suite.testAccounts["local_account_1"]
favedStatus := suite.testStatuses["local_account_1_status_1"]
@@ -262,8 +262,8 @@ func (suite *FromFediAPITestSuite) TestProcessFave() {
// This tests for an issue we were seeing where Misskey sends out faves to inboxes of people that don't own
// the fave, but just follow the actor who received the fave.
func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
receivingAccount := suite.testAccounts["local_account_2"]
favedAccount := suite.testAccounts["local_account_1"]
@@ -327,8 +327,8 @@ func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount(
}
func (suite *FromFediAPITestSuite) TestProcessAccountDelete() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
ctx := context.Background()
@@ -421,8 +421,8 @@ func (suite *FromFediAPITestSuite) TestProcessAccountDelete() {
}
func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
ctx := context.Background()
@@ -478,8 +478,8 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() {
}
func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
ctx := context.Background()
@@ -579,8 +579,8 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() {
// TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor.
func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
ctx := context.Background()
@@ -604,8 +604,8 @@ func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() {
}
func (suite *FromFediAPITestSuite) TestMoveAccount() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
// We're gonna migrate foss_satan to our local admin account.
ctx := context.Background()
diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go
index 876f69933..dc445d0ac 100644
--- a/internal/processing/workers/surfacenotify_test.go
+++ b/internal/processing/workers/surfacenotify_test.go
@@ -28,6 +28,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/processing/workers"
+ "github.com/superseriousbusiness/gotosocial/testrig"
)
type SurfaceNotifyTestSuite struct {
@@ -35,8 +36,8 @@ type SurfaceNotifyTestSuite struct {
}
func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() {
- testStructs := suite.SetupTestStructs()
- defer suite.TearDownTestStructs(testStructs)
+ testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath)
+ defer testrig.TearDownTestStructs(testStructs)
surface := &workers.Surface{
State: testStructs.State,
diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go
index 49c6183a4..bb7faffbf 100644
--- a/internal/processing/workers/util.go
+++ b/internal/processing/workers/util.go
@@ -26,12 +26,11 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/gtscontext"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
- "github.com/superseriousbusiness/gotosocial/internal/id"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
"github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/state"
- "github.com/superseriousbusiness/gotosocial/internal/uris"
+ "github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/internal/util"
)
@@ -488,128 +487,143 @@ func (u *utils) decrementFollowRequestsCount(
return nil
}
-// approveFave stores + returns an
-// interactionApproval for a fave.
-func (u *utils) approveFave(
+// requestFave stores an interaction request
+// for the given fave, and notifies the interactee.
+func (u *utils) requestFave(
ctx context.Context,
fave *gtsmodel.StatusFave,
-) (*gtsmodel.InteractionApproval, error) {
- id := id.NewULID()
+) error {
+ // Only create interaction request
+ // if fave targets a local status.
+ if fave.Status == nil ||
+ !fave.Status.IsLocal() {
+ return nil
+ }
- approval := &gtsmodel.InteractionApproval{
- ID: id,
- AccountID: fave.TargetAccountID,
- Account: fave.TargetAccount,
- InteractingAccountID: fave.AccountID,
- InteractingAccount: fave.Account,
- InteractionURI: fave.URI,
- InteractionType: gtsmodel.InteractionLike,
- URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id),
+ // Lock on the interaction URI.
+ unlock := u.state.ProcessingLocks.Lock(fave.URI)
+ defer unlock()
+
+ // Ensure no req with this URI exists already.
+ req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, fave.URI)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("db error checking for existing interaction request: %w", err)
}
- if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil {
- err := gtserror.Newf("db error inserting interaction approval: %w", err)
- return nil, err
+ if req != nil {
+ // Interaction req already exists,
+ // no need to do anything else.
+ return nil
}
- // Mark the fave itself as now approved.
- fave.PendingApproval = util.Ptr(false)
- fave.PreApproved = false
- fave.ApprovedByURI = approval.URI
+ // Create + store new interaction request.
+ req, err = typeutils.StatusFaveToInteractionRequest(ctx, fave)
+ if err != nil {
+ return gtserror.Newf("error creating interaction request: %w", err)
+ }
- if err := u.state.DB.UpdateStatusFave(
- ctx,
- fave,
- "pending_approval",
- "approved_by_uri",
- ); err != nil {
- err := gtserror.Newf("db error updating status fave: %w", err)
- return nil, err
+ if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil {
+ return gtserror.Newf("db error storing interaction request: %w", err)
}
- return approval, nil
+ // Notify *local* account of pending announce.
+ if err := u.surface.notifyPendingFave(ctx, fave); err != nil {
+ return gtserror.Newf("error notifying pending fave: %w", err)
+ }
+
+ return nil
}
-// approveReply stores + returns an
-// interactionApproval for a reply.
-func (u *utils) approveReply(
+// requestReply stores an interaction request
+// for the given reply, and notifies the interactee.
+func (u *utils) requestReply(
ctx context.Context,
- status *gtsmodel.Status,
-) (*gtsmodel.InteractionApproval, error) {
- id := id.NewULID()
+ reply *gtsmodel.Status,
+) error {
+ // Only create interaction request if
+ // status replies to a local status.
+ if reply.InReplyTo == nil ||
+ !reply.InReplyTo.IsLocal() {
+ return nil
+ }
- approval := &gtsmodel.InteractionApproval{
- ID: id,
- AccountID: status.InReplyToAccountID,
- Account: status.InReplyToAccount,
- InteractingAccountID: status.AccountID,
- InteractingAccount: status.Account,
- InteractionURI: status.URI,
- InteractionType: gtsmodel.InteractionReply,
- URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id),
+ // Lock on the interaction URI.
+ unlock := u.state.ProcessingLocks.Lock(reply.URI)
+ defer unlock()
+
+ // Ensure no req with this URI exists already.
+ req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, reply.URI)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("db error checking for existing interaction request: %w", err)
}
- if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil {
- err := gtserror.Newf("db error inserting interaction approval: %w", err)
- return nil, err
+ if req != nil {
+ // Interaction req already exists,
+ // no need to do anything else.
+ return nil
}
- // Mark the status itself as now approved.
- status.PendingApproval = util.Ptr(false)
- status.PreApproved = false
- status.ApprovedByURI = approval.URI
+ // Create + store interaction request.
+ req, err = typeutils.StatusToInteractionRequest(ctx, reply)
+ if err != nil {
+ return gtserror.Newf("error creating interaction request: %w", err)
+ }
- if err := u.state.DB.UpdateStatus(
- ctx,
- status,
- "pending_approval",
- "approved_by_uri",
- ); err != nil {
- err := gtserror.Newf("db error updating status: %w", err)
- return nil, err
+ if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil {
+ return gtserror.Newf("db error storing interaction request: %w", err)
+ }
+
+ // Notify *local* account of pending reply.
+ if err := u.surface.notifyPendingReply(ctx, reply); err != nil {
+ return gtserror.Newf("error notifying pending reply: %w", err)
}
- return approval, nil
+ return nil
}
-// approveAnnounce stores + returns an
-// interactionApproval for an announce.
-func (u *utils) approveAnnounce(
+// requestAnnounce stores an interaction request
+// for the given announce, and notifies the interactee.
+func (u *utils) requestAnnounce(
ctx context.Context,
boost *gtsmodel.Status,
-) (*gtsmodel.InteractionApproval, error) {
- id := id.NewULID()
+) error {
+ // Only create interaction request if
+ // status announces a local status.
+ if boost.BoostOf == nil ||
+ !boost.BoostOf.IsLocal() {
+ return nil
+ }
+
+ // Lock on the interaction URI.
+ unlock := u.state.ProcessingLocks.Lock(boost.URI)
+ defer unlock()
- approval := &gtsmodel.InteractionApproval{
- ID: id,
- AccountID: boost.BoostOfAccountID,
- Account: boost.BoostOfAccount,
- InteractingAccountID: boost.AccountID,
- InteractingAccount: boost.Account,
- InteractionURI: boost.URI,
- InteractionType: gtsmodel.InteractionReply,
- URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id),
+ // Ensure no req with this URI exists already.
+ req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, boost.URI)
+ if err != nil && !errors.Is(err, db.ErrNoEntries) {
+ return gtserror.Newf("db error checking for existing interaction request: %w", err)
}
- if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil {
- err := gtserror.Newf("db error inserting interaction approval: %w", err)
- return nil, err
+ if req != nil {
+ // Interaction req already exists,
+ // no need to do anything else.
+ return nil
}
- // Mark the status itself as now approved.
- boost.PendingApproval = util.Ptr(false)
- boost.PreApproved = false
- boost.ApprovedByURI = approval.URI
+ // Create + store interaction request.
+ req, err = typeutils.StatusToInteractionRequest(ctx, boost)
+ if err != nil {
+ return gtserror.Newf("error creating interaction request: %w", err)
+ }
- if err := u.state.DB.UpdateStatus(
- ctx,
- boost,
- "pending_approval",
- "approved_by_uri",
- ); err != nil {
- err := gtserror.Newf("db error updating boost wrapper status: %w", err)
- return nil, err
+ if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil {
+ return gtserror.Newf("db error storing interaction request: %w", err)
+ }
+
+ // Notify *local* account of pending announce.
+ if err := u.surface.notifyPendingAnnounce(ctx, boost); err != nil {
+ return gtserror.Newf("error notifying pending announce: %w", err)
}
- return approval, nil
+ return nil
}
diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go
index 04010a92e..d4b525783 100644
--- a/internal/processing/workers/workers.go
+++ b/internal/processing/workers/workers.go
@@ -22,6 +22,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/federation"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/processing/account"
+ "github.com/superseriousbusiness/gotosocial/internal/processing/common"
"github.com/superseriousbusiness/gotosocial/internal/processing/conversations"
"github.com/superseriousbusiness/gotosocial/internal/processing/media"
"github.com/superseriousbusiness/gotosocial/internal/processing/stream"
@@ -38,6 +39,7 @@ type Processor struct {
func New(
state *state.State,
+ common *common.Processor,
federator *federation.Federator,
converter *typeutils.Converter,
visFilter *visibility.Filter,
@@ -82,6 +84,7 @@ func New(
surface: surface,
federate: federate,
account: account,
+ common: common,
utils: utils,
},
fediAPI: fediAPI{
@@ -89,6 +92,7 @@ func New(
surface: surface,
federate: federate,
account: account,
+ common: common,
utils: utils,
},
}
diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go
index 65ed3f6b7..ffd40d8fb 100644
--- a/internal/processing/workers/workers_test.go
+++ b/internal/processing/workers/workers_test.go
@@ -21,19 +21,18 @@ import (
"context"
"github.com/stretchr/testify/suite"
- "github.com/superseriousbusiness/gotosocial/internal/cleaner"
- "github.com/superseriousbusiness/gotosocial/internal/email"
- "github.com/superseriousbusiness/gotosocial/internal/filter/interaction"
- "github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
"github.com/superseriousbusiness/gotosocial/internal/oauth"
"github.com/superseriousbusiness/gotosocial/internal/processing"
- "github.com/superseriousbusiness/gotosocial/internal/state"
"github.com/superseriousbusiness/gotosocial/internal/stream"
- "github.com/superseriousbusiness/gotosocial/internal/typeutils"
"github.com/superseriousbusiness/gotosocial/testrig"
)
+const (
+ rMediaPath = "../../../testrig/media"
+ rTemplatePath = "../../../web/template"
+)
+
type WorkersTestSuite struct {
// standard suite interfaces
suite.Suite
@@ -56,23 +55,6 @@ type WorkersTestSuite struct {
testListEntries map[string]*gtsmodel.ListEntry
}
-// TestStructs encapsulates structs needed to
-// run one test in this package. Each test should
-// call SetupTestStructs to get a new TestStructs,
-// and defer TearDownTestStructs to close it when
-// the test is complete. The reason for doing things
-// this way here is to prevent the tests in this
-// package from overwriting one another's processors
-// and worker queues, which was causing issues
-// when running all tests at once.
-type TestStructs struct {
- State *state.State
- Processor *processing.Processor
- HTTPClient *testrig.MockHTTPClient
- TypeConverter *typeutils.Converter
- EmailSender email.Sender
-}
-
func (suite *WorkersTestSuite) SetupSuite() {
suite.testTokens = testrig.NewTestTokens()
suite.testClients = testrig.NewTestClients()
@@ -132,63 +114,3 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce
return streams
}
-
-func (suite *WorkersTestSuite) SetupTestStructs() *TestStructs {
- state := state.State{}
-
- state.Caches.Init()
-
- db := testrig.NewTestDB(&state)
- state.DB = db
-
- storage := testrig.NewInMemoryStorage()
- state.Storage = storage
- typeconverter := typeutils.NewConverter(&state)
-
- testrig.StartTimelines(
- &state,
- visibility.NewFilter(&state),
- typeconverter,
- )
-
- httpClient := testrig.NewMockHTTPClient(nil, "../../../testrig/media")
- httpClient.TestRemotePeople = testrig.NewTestFediPeople()
- httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses()
-
- transportController := testrig.NewTestTransportController(&state, httpClient)
- mediaManager := testrig.NewTestMediaManager(&state)
- federator := testrig.NewTestFederator(&state, transportController, mediaManager)
- oauthServer := testrig.NewTestOauthServer(db)
- emailSender := testrig.NewEmailSender("../../../web/template/", nil)
-
- processor := processing.NewProcessor(
- cleaner.New(&state),
- typeconverter,
- federator,
- oauthServer,
- mediaManager,
- &state,
- emailSender,
- visibility.NewFilter(&state),
- interaction.NewFilter(&state),
- )
-
- testrig.StartWorkers(&state, processor.Workers())
-
- testrig.StandardDBSetup(db, suite.testAccounts)
- testrig.StandardStorageSetup(storage, "../../../testrig/media")
-
- return &TestStructs{
- State: &state,
- Processor: processor,
- HTTPClient: httpClient,
- TypeConverter: typeconverter,
- EmailSender: emailSender,
- }
-}
-
-func (suite *WorkersTestSuite) TearDownTestStructs(testStructs *TestStructs) {
- testrig.StandardDBTeardown(testStructs.State.DB)
- testrig.StandardStorageTeardown(testStructs.State.Storage)
- testrig.StopWorkers(testStructs.State)
-}