diff options
author | 2024-08-24 11:49:37 +0200 | |
---|---|---|
committer | 2024-08-24 11:49:37 +0200 | |
commit | f23f04e0b1d117be714bf91d5266dab219ed741e (patch) | |
tree | 0b3ddd60d51c8729949c3669993910a7f8f32a7b /internal/processing | |
parent | [performance] ffmpeg ffprobe wrapper improvements (#3225) (diff) | |
download | gotosocial-f23f04e0b1d117be714bf91d5266dab219ed741e.tar.xz |
[feature] Interaction requests client api + settings panel (#3215)
* [feature] Interaction requests client api + settings panel
* test accept / reject
* fmt
* don't pin rejected interaction
* use single db model for interaction accept, reject, and request
* swaggor
* env sharting
* append errors
* remove ErrNoEntries checks
* change intReqID to reqID
* rename "pend" to "request"
* markIntsPending -> mark interactionsPending
* use log instead of returning error when rejecting interaction
* empty migration
* jolly renaming
* make interactionURI unique again
* swag grr
* remove unnecessary locks
* invalidate as last step
Diffstat (limited to 'internal/processing')
18 files changed, 1504 insertions, 404 deletions
diff --git a/internal/processing/fedi/accept.go b/internal/processing/fedi/accept.go index 72d810f94..fc699ee08 100644 --- a/internal/processing/fedi/accept.go +++ b/internal/processing/fedi/accept.go @@ -27,14 +27,14 @@ import ( ) // AcceptGet handles the getting of a fedi/activitypub -// representation of a local interaction approval. +// representation of a local interaction acceptance. // // It performs appropriate authentication before // returning a JSON serializable interface. func (p *Processor) AcceptGet( ctx context.Context, requestedUser string, - approvalID string, + reqID string, ) (interface{}, gtserror.WithCode) { // Authenticate incoming request, getting related accounts. auth, errWithCode := p.authenticate(ctx, requestedUser) @@ -52,25 +52,26 @@ func (p *Processor) AcceptGet( receivingAcct := auth.receivingAcct - approval, err := p.state.DB.GetInteractionApprovalByID(ctx, approvalID) + req, err := p.state.DB.GetInteractionRequestByID(ctx, reqID) if err != nil && !errors.Is(err, db.ErrNoEntries) { - err := gtserror.Newf("db error getting approval %s: %w", approvalID, err) + err := gtserror.Newf("db error getting interaction request %s: %w", reqID, err) return nil, gtserror.NewErrorInternalError(err) } - if approval.AccountID != receivingAcct.ID { - const text = "approval does not belong to receiving account" - return nil, gtserror.NewErrorNotFound(errors.New(text)) + if req == nil || !req.IsAccepted() { + // Request doesn't exist or hasn't been accepted. + err := gtserror.Newf("interaction request %s not found", reqID) + return nil, gtserror.NewErrorNotFound(err) } - if approval == nil { - err := gtserror.Newf("approval %s not found", approvalID) - return nil, gtserror.NewErrorNotFound(err) + if req.TargetAccountID != receivingAcct.ID { + const text = "interaction request does not belong to receiving account" + return nil, gtserror.NewErrorNotFound(errors.New(text)) } - accept, err := p.converter.InteractionApprovalToASAccept(ctx, approval) + accept, err := p.converter.InteractionReqToASAccept(ctx, req) if err != nil { - err := gtserror.Newf("error converting approval: %w", err) + err := gtserror.Newf("error converting accept: %w", err) return nil, gtserror.NewErrorInternalError(err) } diff --git a/internal/processing/interactionrequests/accept.go b/internal/processing/interactionrequests/accept.go new file mode 100644 index 000000000..ad86e50d1 --- /dev/null +++ b/internal/processing/interactionrequests/accept.go @@ -0,0 +1,239 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package interactionrequests + +import ( + "context" + "time" + + "github.com/superseriousbusiness/gotosocial/internal/ap" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/uris" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// Accept accepts an interaction request with the given ID, +// on behalf of the given account (whose post it must target). +func (p *Processor) Accept( + ctx context.Context, + acct *gtsmodel.Account, + reqID string, +) (*apimodel.InteractionRequest, gtserror.WithCode) { + req, err := p.state.DB.GetInteractionRequestByID(ctx, reqID) + if err != nil { + err := gtserror.Newf("db error getting interaction request: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + if req.TargetAccountID != acct.ID { + err := gtserror.Newf( + "interaction request %s does not belong to account %s", + reqID, acct.ID, + ) + return nil, gtserror.NewErrorNotFound(err) + } + + if !req.IsPending() { + err := gtserror.Newf( + "interaction request %s has already been handled", + reqID, + ) + return nil, gtserror.NewErrorNotFound(err) + } + + // Lock on the interaction req URI to + // ensure nobody else is modifying it rn. + unlock := p.state.ProcessingLocks.Lock(req.InteractionURI) + defer unlock() + + // Mark the request as accepted + // and generate a URI for it. + req.AcceptedAt = time.Now() + req.URI = uris.GenerateURIForAccept(acct.Username, req.ID) + if err := p.state.DB.UpdateInteractionRequest( + ctx, + req, + "accepted_at", + "uri", + ); err != nil { + err := gtserror.Newf("db error updating interaction request: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + switch req.InteractionType { + + case gtsmodel.InteractionLike: + if errWithCode := p.acceptLike(ctx, req); errWithCode != nil { + return nil, errWithCode + } + + case gtsmodel.InteractionReply: + if errWithCode := p.acceptReply(ctx, req); errWithCode != nil { + return nil, errWithCode + } + + case gtsmodel.InteractionAnnounce: + if errWithCode := p.acceptAnnounce(ctx, req); errWithCode != nil { + return nil, errWithCode + } + + default: + err := gtserror.Newf("unknown interaction type for interaction request %s", reqID) + return nil, gtserror.NewErrorInternalError(err) + } + + // Return the now-accepted req to the caller so + // they can do something with it if they need to. + apiReq, err := p.converter.InteractionReqToAPIInteractionReq( + ctx, + req, + acct, + ) + if err != nil { + err := gtserror.Newf("error converting interaction request: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + return apiReq, nil +} + +// Package-internal convenience +// function to accept a like. +func (p *Processor) acceptLike( + ctx context.Context, + req *gtsmodel.InteractionRequest, +) gtserror.WithCode { + // If the Like is missing, that means it's + // probably already been undone by someone, + // so there's nothing to actually accept. + if req.Like == nil { + err := gtserror.Newf("no Like found for interaction request %s", req.ID) + return gtserror.NewErrorNotFound(err) + } + + // Update the Like. + req.Like.PendingApproval = util.Ptr(false) + req.Like.PreApproved = false + req.Like.ApprovedByURI = req.URI + if err := p.state.DB.UpdateStatusFave( + ctx, + req.Like, + "pending_approval", + "approved_by_uri", + ); err != nil { + err := gtserror.Newf("db error updating status fave: %w", err) + return gtserror.NewErrorInternalError(err) + } + + // Send the accepted request off through the + // client API processor to handle side effects. + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ + APObjectType: ap.ActivityLike, + APActivityType: ap.ActivityAccept, + GTSModel: req, + Origin: req.TargetAccount, + Target: req.InteractingAccount, + }) + + return nil +} + +// Package-internal convenience +// function to accept a reply. +func (p *Processor) acceptReply( + ctx context.Context, + req *gtsmodel.InteractionRequest, +) gtserror.WithCode { + // If the Reply is missing, that means it's + // probably already been undone by someone, + // so there's nothing to actually accept. + if req.Reply == nil { + err := gtserror.Newf("no Reply found for interaction request %s", req.ID) + return gtserror.NewErrorNotFound(err) + } + + // Update the Reply. + req.Reply.PendingApproval = util.Ptr(false) + req.Reply.PreApproved = false + req.Reply.ApprovedByURI = req.URI + if err := p.state.DB.UpdateStatus( + ctx, + req.Reply, + "pending_approval", + "approved_by_uri", + ); err != nil { + err := gtserror.Newf("db error updating status reply: %w", err) + return gtserror.NewErrorInternalError(err) + } + + // Send the accepted request off through the + // client API processor to handle side effects. + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityAccept, + GTSModel: req, + Origin: req.TargetAccount, + Target: req.InteractingAccount, + }) + + return nil +} + +// Package-internal convenience +// function to accept an announce. +func (p *Processor) acceptAnnounce( + ctx context.Context, + req *gtsmodel.InteractionRequest, +) gtserror.WithCode { + // If the Announce is missing, that means it's + // probably already been undone by someone, + // so there's nothing to actually accept. + if req.Reply == nil { + err := gtserror.Newf("no Announce found for interaction request %s", req.ID) + return gtserror.NewErrorNotFound(err) + } + + // Update the Announce. + req.Announce.PendingApproval = util.Ptr(false) + req.Announce.PreApproved = false + req.Announce.ApprovedByURI = req.URI + if err := p.state.DB.UpdateStatus( + ctx, + req.Announce, + "pending_approval", + "approved_by_uri", + ); err != nil { + err := gtserror.Newf("db error updating status announce: %w", err) + return gtserror.NewErrorInternalError(err) + } + + // Send the accepted request off through the + // client API processor to handle side effects. + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ + APObjectType: ap.ActivityAnnounce, + APActivityType: ap.ActivityAccept, + GTSModel: req, + Origin: req.TargetAccount, + Target: req.InteractingAccount, + }) + + return nil +} diff --git a/internal/processing/interactionrequests/accept_test.go b/internal/processing/interactionrequests/accept_test.go new file mode 100644 index 000000000..75fb1e512 --- /dev/null +++ b/internal/processing/interactionrequests/accept_test.go @@ -0,0 +1,89 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package interactionrequests_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/processing/interactionrequests" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type AcceptTestSuite struct { + InteractionRequestsTestSuite +} + +func (suite *AcceptTestSuite) TestAccept() { + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) + + var ( + ctx = context.Background() + state = testStructs.State + acct = suite.testAccounts["local_account_2"] + intReq = suite.testInteractionRequests["admin_account_reply_turtle"] + ) + + // Create interaction reqs processor. + p := interactionrequests.New( + testStructs.Common, + testStructs.State, + testStructs.TypeConverter, + ) + + apiReq, errWithCode := p.Accept(ctx, acct, intReq.ID) + if errWithCode != nil { + suite.FailNow(errWithCode.Error()) + } + + // Get db interaction request. + dbReq, err := state.DB.GetInteractionRequestByID(ctx, apiReq.ID) + if err != nil { + suite.FailNow(err.Error()) + } + suite.True(dbReq.IsAccepted()) + + // Interacting status + // should now be approved. + dbStatus, err := state.DB.GetStatusByURI(ctx, dbReq.InteractionURI) + if err != nil { + suite.FailNow(err.Error()) + } + suite.False(*dbStatus.PendingApproval) + suite.Equal(dbReq.URI, dbStatus.ApprovedByURI) + + // Wait for a notification + // for interacting status. + testrig.WaitFor(func() bool { + notif, err := state.DB.GetNotification( + ctx, + gtsmodel.NotificationMention, + dbStatus.InReplyToAccountID, + dbStatus.AccountID, + dbStatus.ID, + ) + return notif != nil && err == nil + }) +} + +func TestAcceptTestSuite(t *testing.T) { + suite.Run(t, new(AcceptTestSuite)) +} diff --git a/internal/processing/interactionrequests/get.go b/internal/processing/interactionrequests/get.go new file mode 100644 index 000000000..8f8a7f35d --- /dev/null +++ b/internal/processing/interactionrequests/get.go @@ -0,0 +1,141 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package interactionrequests + +import ( + "context" + "errors" + "net/url" + "strconv" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + apiutil "github.com/superseriousbusiness/gotosocial/internal/api/util" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/paging" +) + +// GetPage returns a page of interaction requests targeting +// the requester and (optionally) the given status ID. +func (p *Processor) GetPage( + ctx context.Context, + requester *gtsmodel.Account, + statusID string, + likes bool, + replies bool, + boosts bool, + page *paging.Page, +) (*apimodel.PageableResponse, gtserror.WithCode) { + reqs, err := p.state.DB.GetInteractionsRequestsForAcct( + ctx, + requester.ID, + statusID, + likes, + replies, + boosts, + page, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("db error getting interaction requests: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + count := len(reqs) + if count == 0 { + return paging.EmptyResponse(), nil + } + + var ( + // Get the lowest and highest + // ID values, used for paging. + lo = reqs[count-1].ID + hi = reqs[0].ID + + // Best-guess items length. + items = make([]interface{}, 0, count) + ) + + for _, req := range reqs { + apiReq, err := p.converter.InteractionReqToAPIInteractionReq( + ctx, req, requester, + ) + if err != nil { + log.Errorf(ctx, "error converting interaction req to api req: %v", err) + continue + } + + // Append req to return items. + items = append(items, apiReq) + } + + // Build extra query params to return in Link header. + extraParams := make(url.Values, 4) + extraParams.Set(apiutil.InteractionFavouritesKey, strconv.FormatBool(likes)) + extraParams.Set(apiutil.InteractionRepliesKey, strconv.FormatBool(replies)) + extraParams.Set(apiutil.InteractionReblogsKey, strconv.FormatBool(boosts)) + if statusID != "" { + extraParams.Set(apiutil.InteractionStatusIDKey, statusID) + } + + return paging.PackageResponse(paging.ResponseParams{ + Items: items, + Path: "/api/v1/interaction_requests", + Next: page.Next(lo, hi), + Prev: page.Prev(lo, hi), + Query: extraParams, + }), nil +} + +// GetOne returns one interaction +// request with the given ID. +func (p *Processor) GetOne( + ctx context.Context, + requester *gtsmodel.Account, + id string, +) (*apimodel.InteractionRequest, gtserror.WithCode) { + req, err := p.state.DB.GetInteractionRequestByID(ctx, id) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err := gtserror.Newf("db error getting interaction request: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + if req == nil { + err := gtserror.New("interaction request not found") + return nil, gtserror.NewErrorNotFound(err) + } + + if req.TargetAccountID != requester.ID { + err := gtserror.Newf( + "interaction request %s does not target account %s", + req.ID, requester.ID, + ) + return nil, gtserror.NewErrorNotFound(err) + } + + apiReq, err := p.converter.InteractionReqToAPIInteractionReq( + ctx, req, requester, + ) + if err != nil { + err := gtserror.Newf("error converting interaction req to api req: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + return apiReq, nil +} diff --git a/internal/processing/interactionrequests/interactionrequests.go b/internal/processing/interactionrequests/interactionrequests.go new file mode 100644 index 000000000..d56636233 --- /dev/null +++ b/internal/processing/interactionrequests/interactionrequests.go @@ -0,0 +1,47 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package interactionrequests + +import ( + "github.com/superseriousbusiness/gotosocial/internal/processing/common" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +// Processor wraps functionality for getting, +// accepting, and rejecting interaction requests. +type Processor struct { + // common processor logic + c *common.Processor + + state *state.State + converter *typeutils.Converter +} + +// New returns a new interaction requests processor. +func New( + common *common.Processor, + state *state.State, + converter *typeutils.Converter, +) Processor { + return Processor{ + c: common, + state: state, + converter: converter, + } +} diff --git a/internal/processing/interactionrequests/interactionrequests_test.go b/internal/processing/interactionrequests/interactionrequests_test.go new file mode 100644 index 000000000..ce2aa351a --- /dev/null +++ b/internal/processing/interactionrequests/interactionrequests_test.go @@ -0,0 +1,45 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package interactionrequests_test + +import ( + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +const ( + rMediaPath = "../../../testrig/media" + rTemplatePath = "../../../web/template" +) + +type InteractionRequestsTestSuite struct { + suite.Suite + + testAccounts map[string]*gtsmodel.Account + testStatuses map[string]*gtsmodel.Status + testInteractionRequests map[string]*gtsmodel.InteractionRequest +} + +func (suite *InteractionRequestsTestSuite) SetupTest() { + testrig.InitTestConfig() + testrig.InitTestLog() + suite.testAccounts = testrig.NewTestAccounts() + suite.testStatuses = testrig.NewTestStatuses() + suite.testInteractionRequests = testrig.NewTestInteractionRequests() +} diff --git a/internal/processing/interactionrequests/reject.go b/internal/processing/interactionrequests/reject.go new file mode 100644 index 000000000..dcf07a03f --- /dev/null +++ b/internal/processing/interactionrequests/reject.go @@ -0,0 +1,133 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package interactionrequests + +import ( + "context" + "time" + + "github.com/superseriousbusiness/gotosocial/internal/ap" + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/uris" +) + +// Reject rejects an interaction request with the given ID, +// on behalf of the given account (whose post it must target). +func (p *Processor) Reject( + ctx context.Context, + acct *gtsmodel.Account, + reqID string, +) (*apimodel.InteractionRequest, gtserror.WithCode) { + req, err := p.state.DB.GetInteractionRequestByID(ctx, reqID) + if err != nil { + err := gtserror.Newf("db error getting interaction request: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + if req.TargetAccountID != acct.ID { + err := gtserror.Newf( + "interaction request %s does not belong to account %s", + reqID, acct.ID, + ) + return nil, gtserror.NewErrorNotFound(err) + } + + if !req.IsPending() { + err := gtserror.Newf( + "interaction request %s has already been handled", + reqID, + ) + return nil, gtserror.NewErrorNotFound(err) + } + + // Lock on the interaction req URI to + // ensure nobody else is modifying it rn. + unlock := p.state.ProcessingLocks.Lock(req.InteractionURI) + defer unlock() + + // Mark the request as rejected + // and generate a URI for it. + req.RejectedAt = time.Now() + req.URI = uris.GenerateURIForReject(acct.Username, req.ID) + if err := p.state.DB.UpdateInteractionRequest( + ctx, + req, + "rejected_at", + "uri", + ); err != nil { + err := gtserror.Newf("db error updating interaction request: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + switch req.InteractionType { + + case gtsmodel.InteractionLike: + // Send the rejected request off through the + // client API processor to handle side effects. + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ + APObjectType: ap.ActivityLike, + APActivityType: ap.ActivityReject, + GTSModel: req, + Origin: req.TargetAccount, + Target: req.InteractingAccount, + }) + + case gtsmodel.InteractionReply: + // Send the rejected request off through the + // client API processor to handle side effects. + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ + APObjectType: ap.ObjectNote, + APActivityType: ap.ActivityReject, + GTSModel: req, + Origin: req.TargetAccount, + Target: req.InteractingAccount, + }) + + case gtsmodel.InteractionAnnounce: + // Send the rejected request off through the + // client API processor to handle side effects. + p.state.Workers.Client.Queue.Push(&messages.FromClientAPI{ + APObjectType: ap.ActivityAnnounce, + APActivityType: ap.ActivityReject, + GTSModel: req, + Origin: req.TargetAccount, + Target: req.InteractingAccount, + }) + + default: + err := gtserror.Newf("unknown interaction type for interaction request %s", reqID) + return nil, gtserror.NewErrorInternalError(err) + } + + // Return the now-rejected req to the caller so + // they can do something with it if they need to. + apiReq, err := p.converter.InteractionReqToAPIInteractionReq( + ctx, + req, + acct, + ) + if err != nil { + err := gtserror.Newf("error converting interaction request: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + return apiReq, nil +} diff --git a/internal/processing/interactionrequests/reject_test.go b/internal/processing/interactionrequests/reject_test.go new file mode 100644 index 000000000..f1f6aed72 --- /dev/null +++ b/internal/processing/interactionrequests/reject_test.go @@ -0,0 +1,78 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package interactionrequests_test + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/suite" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/processing/interactionrequests" + "github.com/superseriousbusiness/gotosocial/testrig" +) + +type RejectTestSuite struct { + InteractionRequestsTestSuite +} + +func (suite *RejectTestSuite) TestReject() { + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) + + var ( + ctx = context.Background() + state = testStructs.State + acct = suite.testAccounts["local_account_2"] + intReq = suite.testInteractionRequests["admin_account_reply_turtle"] + ) + + // Create int reqs processor. + p := interactionrequests.New( + testStructs.Common, + testStructs.State, + testStructs.TypeConverter, + ) + + apiReq, errWithCode := p.Reject(ctx, acct, intReq.ID) + if errWithCode != nil { + suite.FailNow(errWithCode.Error()) + } + + // Get db interaction rejection. + dbReq, err := state.DB.GetInteractionRequestByID(ctx, apiReq.ID) + if err != nil { + suite.FailNow(err.Error()) + } + suite.True(dbReq.IsRejected()) + + // Wait for interacting status to be deleted. + testrig.WaitFor(func() bool { + status, err := state.DB.GetStatusByURI( + gtscontext.SetBarebones(ctx), + dbReq.InteractionURI, + ) + return status == nil && errors.Is(err, db.ErrNoEntries) + }) +} + +func TestRejectTestSuite(t *testing.T) { + suite.Run(t, new(RejectTestSuite)) +} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 6d39dc103..2ed13d396 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -34,6 +34,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/fedi" filtersv1 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v1" filtersv2 "github.com/superseriousbusiness/gotosocial/internal/processing/filters/v2" + "github.com/superseriousbusiness/gotosocial/internal/processing/interactionrequests" "github.com/superseriousbusiness/gotosocial/internal/processing/list" "github.com/superseriousbusiness/gotosocial/internal/processing/markers" "github.com/superseriousbusiness/gotosocial/internal/processing/media" @@ -74,25 +75,26 @@ type Processor struct { SUB-PROCESSORS */ - account account.Processor - admin admin.Processor - advancedmigrations advancedmigrations.Processor - conversations conversations.Processor - fedi fedi.Processor - filtersv1 filtersv1.Processor - filtersv2 filtersv2.Processor - list list.Processor - markers markers.Processor - media media.Processor - polls polls.Processor - report report.Processor - search search.Processor - status status.Processor - stream stream.Processor - tags tags.Processor - timeline timeline.Processor - user user.Processor - workers workers.Processor + account account.Processor + admin admin.Processor + advancedmigrations advancedmigrations.Processor + conversations conversations.Processor + fedi fedi.Processor + filtersv1 filtersv1.Processor + filtersv2 filtersv2.Processor + interactionRequests interactionrequests.Processor + list list.Processor + markers markers.Processor + media media.Processor + polls polls.Processor + report report.Processor + search search.Processor + status status.Processor + stream stream.Processor + tags tags.Processor + timeline timeline.Processor + user user.Processor + workers workers.Processor } func (p *Processor) Account() *account.Processor { @@ -123,6 +125,10 @@ func (p *Processor) FiltersV2() *filtersv2.Processor { return &p.filtersv2 } +func (p *Processor) InteractionRequests() *interactionrequests.Processor { + return &p.interactionRequests +} + func (p *Processor) List() *list.Processor { return &p.list } @@ -209,6 +215,7 @@ func NewProcessor( processor.fedi = fedi.New(state, &common, converter, federator, visFilter) processor.filtersv1 = filtersv1.New(state, converter, &processor.stream) processor.filtersv2 = filtersv2.New(state, converter, &processor.stream) + processor.interactionRequests = interactionrequests.New(&common, state, converter) processor.list = list.New(state, converter) processor.markers = markers.New(state, converter) processor.polls = polls.New(&common, state, converter) @@ -227,6 +234,7 @@ func NewProcessor( // and pass subset of sub processors it needs. processor.workers = workers.New( state, + &common, federator, converter, visFilter, diff --git a/internal/processing/workers/federate.go b/internal/processing/workers/federate.go index 6e8b558c8..d14f38902 100644 --- a/internal/processing/workers/federate.go +++ b/internal/processing/workers/federate.go @@ -1127,17 +1127,17 @@ func (f *federate) MoveAccount(ctx context.Context, account *gtsmodel.Account) e func (f *federate) AcceptInteraction( ctx context.Context, - approval *gtsmodel.InteractionApproval, + req *gtsmodel.InteractionRequest, ) error { // Populate model. - if err := f.state.DB.PopulateInteractionApproval(ctx, approval); err != nil { - return gtserror.Newf("error populating approval: %w", err) + if err := f.state.DB.PopulateInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("error populating request: %w", err) } // Bail if interacting account is ours: // we've already accepted internally and // shouldn't send an Accept to ourselves. - if approval.InteractingAccount.IsLocal() { + if req.InteractingAccount.IsLocal() { return nil } @@ -1145,27 +1145,27 @@ func (f *federate) AcceptInteraction( // we can't Accept on another // instance's behalf. (This // should never happen but...) - if approval.Account.IsRemote() { + if req.TargetAccount.IsRemote() { return nil } // Parse relevant URI(s). - outboxIRI, err := parseURI(approval.Account.OutboxURI) + outboxIRI, err := parseURI(req.TargetAccount.OutboxURI) if err != nil { return err } - acceptingAcctIRI, err := parseURI(approval.Account.URI) + acceptingAcctIRI, err := parseURI(req.TargetAccount.URI) if err != nil { return err } - interactingAcctURI, err := parseURI(approval.InteractingAccount.URI) + interactingAcctURI, err := parseURI(req.InteractingAccount.URI) if err != nil { return err } - interactionURI, err := parseURI(approval.InteractionURI) + interactionURI, err := parseURI(req.InteractionURI) if err != nil { return err } @@ -1190,7 +1190,79 @@ func (f *federate) AcceptInteraction( ); err != nil { return gtserror.Newf( "error sending activity %T for %v via outbox %s: %w", - accept, approval.InteractionType, outboxIRI, err, + accept, req.InteractionType, outboxIRI, err, + ) + } + + return nil +} + +func (f *federate) RejectInteraction( + ctx context.Context, + req *gtsmodel.InteractionRequest, +) error { + // Populate model. + if err := f.state.DB.PopulateInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("error populating request: %w", err) + } + + // Bail if interacting account is ours: + // we've already rejected internally and + // shouldn't send an Reject to ourselves. + if req.InteractingAccount.IsLocal() { + return nil + } + + // Bail if account isn't ours: + // we can't Reject on another + // instance's behalf. (This + // should never happen but...) + if req.TargetAccount.IsRemote() { + return nil + } + + // Parse relevant URI(s). + outboxIRI, err := parseURI(req.TargetAccount.OutboxURI) + if err != nil { + return err + } + + rejectingAcctIRI, err := parseURI(req.TargetAccount.URI) + if err != nil { + return err + } + + interactingAcctURI, err := parseURI(req.InteractingAccount.URI) + if err != nil { + return err + } + + interactionURI, err := parseURI(req.InteractionURI) + if err != nil { + return err + } + + // Create a new Reject. + reject := streams.NewActivityStreamsReject() + + // Set interacted-with account + // as Actor of the Reject. + ap.AppendActorIRIs(reject, rejectingAcctIRI) + + // Set the interacted-with object + // as Object of the Reject. + ap.AppendObjectIRIs(reject, interactionURI) + + // Address the Reject To the interacting acct. + ap.AppendTo(reject, interactingAcctURI) + + // Send the Reject via the Actor's outbox. + if _, err := f.FederatingActor().Send( + ctx, outboxIRI, reject, + ); err != nil { + return gtserror.Newf( + "error sending activity %T for %v via outbox %s: %w", + reject, req.InteractionType, outboxIRI, err, ) } diff --git a/internal/processing/workers/fromclientapi.go b/internal/processing/workers/fromclientapi.go index 1a37341f8..c723a6001 100644 --- a/internal/processing/workers/fromclientapi.go +++ b/internal/processing/workers/fromclientapi.go @@ -20,18 +20,23 @@ package workers import ( "context" "errors" + "time" "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/uris" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -44,6 +49,7 @@ type clientAPI struct { surface *Surface federate *federate account *account.Processor + common *common.Processor utils *utils } @@ -160,6 +166,18 @@ func (p *Processor) ProcessFromClientAPI(ctx context.Context, cMsg *messages.Fro // REJECT USER (ie., new user+account sign-up) case ap.ObjectProfile: return p.clientAPI.RejectUser(ctx, cMsg) + + // REJECT NOTE/STATUS (ie., reject a reply) + case ap.ObjectNote: + return p.clientAPI.RejectReply(ctx, cMsg) + + // REJECT LIKE + case ap.ActivityLike: + return p.clientAPI.RejectLike(ctx, cMsg) + + // REJECT BOOST + case ap.ActivityAnnounce: + return p.clientAPI.RejectAnnounce(ctx, cMsg) } // UNDO SOMETHING @@ -261,15 +279,13 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA // and/or notify the account that's being // interacted with (if it's local): they can // approve or deny the interaction later. - - // Notify *local* account of pending reply. - if err := p.surface.notifyPendingReply(ctx, status); err != nil { - log.Errorf(ctx, "error notifying pending reply: %v", err) + if err := p.utils.requestReply(ctx, status); err != nil { + return gtserror.Newf("error pending reply: %w", err) } // Send Create to *remote* account inbox ONLY. if err := p.federate.CreateStatus(ctx, status); err != nil { - log.Errorf(ctx, "error federating pending reply: %v", err) + return gtserror.Newf("error federating pending reply: %w", err) } // Return early. @@ -285,14 +301,38 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA // sending out the Create with the approval // URI attached. - // Put approval in the database and - // update the status with approvedBy URI. - approval, err := p.utils.approveReply(ctx, status) - if err != nil { - return gtserror.Newf("error pre-approving reply: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: status.InReplyToID, + TargetAccountID: status.InReplyToAccountID, + TargetAccount: status.InReplyToAccount, + InteractingAccountID: status.AccountID, + InteractingAccount: status.Account, + InteractionURI: status.URI, + InteractionType: gtsmodel.InteractionLike, + Reply: status, + URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id), + AcceptedAt: time.Now(), + } + if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil { + return gtserror.Newf("db error putting pre-approved interaction request: %w", err) + } + + // Mark the status as now approved. + status.PendingApproval = util.Ptr(false) + status.PreApproved = false + status.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + status, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } - // Send out the approval as Accept. if err := p.federate.AcceptInteraction(ctx, approval); err != nil { return gtserror.Newf("error federating pre-approval of reply: %w", err) } @@ -309,16 +349,16 @@ func (p *clientAPI) CreateStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error timelining and notifying status: %v", err) } + if err := p.federate.CreateStatus(ctx, status); err != nil { + log.Errorf(ctx, "error federating status: %v", err) + } + if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) } - if err := p.federate.CreateStatus(ctx, status); err != nil { - log.Errorf(ctx, "error federating status: %v", err) - } - return nil } @@ -344,9 +384,6 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien status := vote.Poll.Status status.Poll = vote.Poll - // Interaction counts changed on the source status, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) - if *status.Local { // These are poll votes in a local status, we only need to // federate the updated status model with latest vote counts. @@ -360,6 +397,9 @@ func (p *clientAPI) CreatePollVote(ctx context.Context, cMsg *messages.FromClien } } + // Interaction counts changed on the source status, uncache from timelines. + p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) + return nil } @@ -429,10 +469,7 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // If pending approval is true then fave must // target a status (either one of ours or a // remote) that requires approval for the fave. - pendingApproval := util.PtrOrValue( - fave.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(fave.PendingApproval) switch { case pendingApproval && !fave.PreApproved: @@ -442,15 +479,13 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // and/or notify the account that's being // interacted with (if it's local): they can // approve or deny the interaction later. - - // Notify *local* account of pending reply. - if err := p.surface.notifyPendingFave(ctx, fave); err != nil { - log.Errorf(ctx, "error notifying pending fave: %v", err) + if err := p.utils.requestFave(ctx, fave); err != nil { + return gtserror.Newf("error pending fave: %w", err) } // Send Like to *remote* account inbox ONLY. if err := p.federate.Like(ctx, fave); err != nil { - log.Errorf(ctx, "error federating pending Like: %v", err) + return gtserror.Newf("error federating pending Like: %v", err) } // Return early. @@ -466,14 +501,38 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI // sending out the Like with the approval // URI attached. - // Put approval in the database and - // update the fave with approvedBy URI. - approval, err := p.utils.approveFave(ctx, fave) - if err != nil { - return gtserror.Newf("error pre-approving fave: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: fave.StatusID, + TargetAccountID: fave.TargetAccountID, + TargetAccount: fave.TargetAccount, + InteractingAccountID: fave.AccountID, + InteractingAccount: fave.Account, + InteractionURI: fave.URI, + InteractionType: gtsmodel.InteractionLike, + Like: fave, + URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id), + AcceptedAt: time.Now(), + } + if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil { + return gtserror.Newf("db error putting pre-approved interaction request: %w", err) + } + + // Mark the fave itself as now approved. + fave.PendingApproval = util.Ptr(false) + fave.PreApproved = false + fave.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatusFave( + ctx, + fave, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status fave: %w", err) } - // Send out the approval as Accept. if err := p.federate.AcceptInteraction(ctx, approval); err != nil { return gtserror.Newf("error federating pre-approval of fave: %w", err) } @@ -485,14 +544,14 @@ func (p *clientAPI) CreateLike(ctx context.Context, cMsg *messages.FromClientAPI log.Errorf(ctx, "error notifying fave: %v", err) } - // Interaction counts changed on the faved status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) - if err := p.federate.Like(ctx, fave); err != nil { log.Errorf(ctx, "error federating like: %v", err) } + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, fave.StatusID) + return nil } @@ -505,10 +564,7 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // If pending approval is true then status must // boost a status (either one of ours or a // remote) that requires approval for the boost. - pendingApproval := util.PtrOrValue( - boost.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(boost.PendingApproval) switch { case pendingApproval && !boost.PreApproved: @@ -518,15 +574,13 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // and/or notify the account that's being // interacted with (if it's local): they can // approve or deny the interaction later. - - // Notify *local* account of pending announce. - if err := p.surface.notifyPendingAnnounce(ctx, boost); err != nil { - log.Errorf(ctx, "error notifying pending boost: %v", err) + if err := p.utils.requestAnnounce(ctx, boost); err != nil { + return gtserror.Newf("error pending boost: %w", err) } // Send Announce to *remote* account inbox ONLY. if err := p.federate.Announce(ctx, boost); err != nil { - log.Errorf(ctx, "error federating pending Announce: %v", err) + return gtserror.Newf("error federating pending Announce: %v", err) } // Return early. @@ -542,14 +596,38 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien // sending out the Create with the approval // URI attached. - // Put approval in the database and - // update the boost with approvedBy URI. - approval, err := p.utils.approveAnnounce(ctx, boost) - if err != nil { - return gtserror.Newf("error pre-approving boost: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: boost.BoostOfID, + TargetAccountID: boost.BoostOfAccountID, + TargetAccount: boost.BoostOfAccount, + InteractingAccountID: boost.AccountID, + InteractingAccount: boost.Account, + InteractionURI: boost.URI, + InteractionType: gtsmodel.InteractionLike, + Announce: boost, + URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id), + AcceptedAt: time.Now(), + } + if err := p.state.DB.PutInteractionRequest(ctx, approval); err != nil { + return gtserror.Newf("db error putting pre-approved interaction request: %w", err) + } + + // Mark the boost itself as now approved. + boost.PendingApproval = util.Ptr(false) + boost.PreApproved = false + boost.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + boost, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } - // Send out the approval as Accept. if err := p.federate.AcceptInteraction(ctx, approval); err != nil { return gtserror.Newf("error federating pre-approval of boost: %w", err) } @@ -572,14 +650,14 @@ func (p *clientAPI) CreateAnnounce(ctx context.Context, cMsg *messages.FromClien log.Errorf(ctx, "error notifying boost: %v", err) } - // Interaction counts changed on the boosted status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) - if err := p.federate.Announce(ctx, boost); err != nil { log.Errorf(ctx, "error federating announce: %v", err) } + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + return nil } @@ -629,9 +707,6 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error federating status update: %v", err) } - // Status representation has changed, invalidate from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) - if status.Poll != nil && status.Poll.Closing { // If the latest status has a newly closed poll, at least compared @@ -646,6 +721,9 @@ func (p *clientAPI) UpdateStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error streaming status edit: %v", err) } + // Status representation has changed, invalidate from timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.ID) + return nil } @@ -791,14 +869,14 @@ func (p *clientAPI) UndoFave(ctx context.Context, cMsg *messages.FromClientAPI) return gtserror.Newf("%T not parseable as *gtsmodel.StatusFave", cMsg.GTSModel) } - // Interaction counts changed on the faved status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) - if err := p.federate.UndoLike(ctx, statusFave); err != nil { log.Errorf(ctx, "error federating like undo: %v", err) } + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, statusFave.StatusID) + return nil } @@ -821,14 +899,14 @@ func (p *clientAPI) UndoAnnounce(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error removing timelined status: %v", err) } - // Interaction counts changed on the boosted status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) - if err := p.federate.UndoAnnounce(ctx, status); err != nil { log.Errorf(ctx, "error federating announce undo: %v", err) } + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.BoostOfID) + return nil } @@ -874,16 +952,16 @@ func (p *clientAPI) DeleteStatus(ctx context.Context, cMsg *messages.FromClientA log.Errorf(ctx, "error updating account stats: %v", err) } + if err := p.federate.DeleteStatus(ctx, status); err != nil { + log.Errorf(ctx, "error federating status delete: %v", err) + } + if status.InReplyToID != "" { // Interaction counts changed on the replied status; // uncache the prepared version from all timelines. p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) } - if err := p.federate.DeleteStatus(ctx, status); err != nil { - log.Errorf(ctx, "error federating status delete: %v", err) - } - return nil } @@ -1050,16 +1128,188 @@ func (p *clientAPI) RejectUser(ctx context.Context, cMsg *messages.FromClientAPI } func (p *clientAPI) AcceptLike(ctx context.Context, cMsg *messages.FromClientAPI) error { - // TODO + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // Notify the fave (distinct from the notif for the pending fave). + if err := p.surface.notifyFave(ctx, req.Like); err != nil { + log.Errorf(ctx, "error notifying fave: %v", err) + } + + // Send out the Accept. + if err := p.federate.AcceptInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating approval of like: %v", err) + } + + // Interaction counts changed on the faved status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, req.Like.StatusID) + return nil } func (p *clientAPI) AcceptReply(ctx context.Context, cMsg *messages.FromClientAPI) error { - // TODO + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + var ( + interactingAcct = req.InteractingAccount + reply = req.Reply + ) + + // Update stats for the reply author account. + if err := p.utils.incrementStatusesCount(ctx, interactingAcct, reply); err != nil { + log.Errorf(ctx, "error updating account stats: %v", err) + } + + // Timeline the reply + notify relevant accounts. + if err := p.surface.timelineAndNotifyStatus(ctx, reply); err != nil { + log.Errorf(ctx, "error timelining and notifying status reply: %v", err) + } + + // Send out the Accept. + if err := p.federate.AcceptInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating approval of reply: %v", err) + } + + // Interaction counts changed on the replied status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, reply.InReplyToID) + return nil } func (p *clientAPI) AcceptAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error { - // TODO + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + var ( + interactingAcct = req.InteractingAccount + boost = req.Announce + ) + + // Update stats for the boost author account. + if err := p.utils.incrementStatusesCount(ctx, interactingAcct, boost); err != nil { + log.Errorf(ctx, "error updating account stats: %v", err) + } + + // Timeline and notify the announce. + if err := p.surface.timelineAndNotifyStatus(ctx, boost); err != nil { + log.Errorf(ctx, "error timelining and notifying status: %v", err) + } + + // Notify the announce (distinct from the notif for the pending announce). + if err := p.surface.notifyAnnounce(ctx, boost); err != nil { + log.Errorf(ctx, "error notifying announce: %v", err) + } + + // Send out the Accept. + if err := p.federate.AcceptInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating approval of announce: %v", err) + } + + // Interaction counts changed on the original status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + + return nil +} + +func (p *clientAPI) RejectLike(ctx context.Context, cMsg *messages.FromClientAPI) error { + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // At this point the InteractionRequest should already + // be in the database, we just need to do side effects. + + // Send out the Reject. + if err := p.federate.RejectInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating rejection of like: %v", err) + } + + // Get the rejected fave. + fave, err := p.state.DB.GetStatusFaveByURI( + gtscontext.SetBarebones(ctx), + req.InteractionURI, + ) + if err != nil { + return gtserror.Newf("db error getting rejected fave: %w", err) + } + + // Delete the status fave. + if err := p.state.DB.DeleteStatusFaveByID(ctx, fave.ID); err != nil { + return gtserror.Newf("db error deleting status fave: %w", err) + } + + return nil +} + +func (p *clientAPI) RejectReply(ctx context.Context, cMsg *messages.FromClientAPI) error { + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // At this point the InteractionRequest should already + // be in the database, we just need to do side effects. + + // Send out the Reject. + if err := p.federate.RejectInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating rejection of reply: %v", err) + } + + // Get the rejected status. + status, err := p.state.DB.GetStatusByURI( + gtscontext.SetBarebones(ctx), + req.InteractionURI, + ) + if err != nil { + return gtserror.Newf("db error getting rejected reply: %w", err) + } + + // Totally wipe the status. + if err := p.utils.wipeStatus(ctx, status, true); err != nil { + return gtserror.Newf("error wiping status: %w", err) + } + + return nil +} + +func (p *clientAPI) RejectAnnounce(ctx context.Context, cMsg *messages.FromClientAPI) error { + req, ok := cMsg.GTSModel.(*gtsmodel.InteractionRequest) + if !ok { + return gtserror.Newf("%T not parseable as *gtsmodel.InteractionRequest", cMsg.GTSModel) + } + + // At this point the InteractionRequest should already + // be in the database, we just need to do side effects. + + // Send out the Reject. + if err := p.federate.RejectInteraction(ctx, req); err != nil { + log.Errorf(ctx, "error federating rejection of announce: %v", err) + } + + // Get the rejected boost. + boost, err := p.state.DB.GetStatusByURI( + gtscontext.SetBarebones(ctx), + req.InteractionURI, + ) + if err != nil { + return gtserror.Newf("db error getting rejected announce: %w", err) + } + + // Totally wipe the status. + if err := p.utils.wipeStatus(ctx, boost, true); err != nil { + return gtserror.Newf("error wiping status: %w", err) + } + return nil } diff --git a/internal/processing/workers/fromclientapi_test.go b/internal/processing/workers/fromclientapi_test.go index b4eae0be0..d330e4c2b 100644 --- a/internal/processing/workers/fromclientapi_test.go +++ b/internal/processing/workers/fromclientapi_test.go @@ -231,8 +231,8 @@ func (suite *FromClientAPITestSuite) conversationJSON( } func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -344,8 +344,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithNotification() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -412,8 +412,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReply() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -473,8 +473,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyMuted() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -534,8 +534,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostMuted() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyOK() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're modifying the test list so take a copy. testList := new(gtsmodel.List) @@ -610,8 +610,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis } func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyListOnlyNo() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're modifying the test list so take a copy. testList := new(gtsmodel.List) @@ -691,8 +691,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusListRepliesPolicyLis } func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPolicyNone() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're modifying the test list so take a copy. testList := new(gtsmodel.List) @@ -767,8 +767,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusReplyListRepliesPoli } func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -831,8 +831,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoost() { } func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -898,8 +898,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusBoostNoReblogs() { // A DM to a local user should create a conversation and accompanying notification. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversation() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -984,8 +984,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichBeginsConversat // A public message to a local user should not result in a conversation notification. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreateConversation() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1054,8 +1054,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWhichShouldNotCreate // A public status with a hashtag followed by a local user who does not otherwise follow the author // should end up in the tag-following user's home timeline. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtag() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1128,8 +1128,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtag( // should not end up in the tag-following user's home timeline // if the user has the author blocked. func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtagAndBlock() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1209,8 +1209,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateStatusWithFollowedHashtagA // who does not otherwise follow the author or booster // should end up in the tag-following user's home timeline as the original status. func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtag() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1312,8 +1312,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtag() // should not end up in the tag-following user's home timeline // if the user has the author blocked. func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAndBlock() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1422,8 +1422,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAn // should not end up in the tag-following user's home timeline // if the user has the booster blocked. func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAndBlockedBoost() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1530,8 +1530,8 @@ func (suite *FromClientAPITestSuite) TestProcessCreateBoostWithFollowedHashtagAn // Updating a public status with a hashtag followed by a local user who does not otherwise follow the author // should stream a status update to the tag-following user's home timeline. func (suite *FromClientAPITestSuite) TestProcessUpdateStatusWithFollowedHashtag() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() @@ -1601,8 +1601,8 @@ func (suite *FromClientAPITestSuite) TestProcessUpdateStatusWithFollowedHashtag( } func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) var ( ctx = context.Background() diff --git a/internal/processing/workers/fromfediapi.go b/internal/processing/workers/fromfediapi.go index ce7c53388..908369ca6 100644 --- a/internal/processing/workers/fromfediapi.go +++ b/internal/processing/workers/fromfediapi.go @@ -20,18 +20,22 @@ package workers import ( "context" "errors" + "time" "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-logger/v2/level" "github.com/superseriousbusiness/gotosocial/internal/ap" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/federation/dereferencing" + "github.com/superseriousbusiness/gotosocial/internal/id" + "github.com/superseriousbusiness/gotosocial/internal/uris" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -44,6 +48,7 @@ type fediAPI struct { surface *Surface federate *federate account *account.Processor + common *common.Processor utils *utils } @@ -231,10 +236,7 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // If pending approval is true then // status must reply to a LOCAL status // that requires approval for the reply. - pendingApproval := util.PtrOrValue( - status.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(status.PendingApproval) switch { case pendingApproval && !status.PreApproved: @@ -242,10 +244,8 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // preapproved, then just notify the account // that's being interacted with: they can // approve or deny the interaction later. - - // Notify *local* account of pending reply. - if err := p.surface.notifyPendingReply(ctx, status); err != nil { - log.Errorf(ctx, "error notifying pending reply: %v", err) + if err := p.utils.requestReply(ctx, status); err != nil { + return gtserror.Newf("error pending reply: %w", err) } // Return early. @@ -259,11 +259,33 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) // collection. Do the Accept immediately and // then process everything else as normal. - // Put approval in the database and - // update the status with approvedBy URI. - approval, err := p.utils.approveReply(ctx, status) - if err != nil { - return gtserror.Newf("error pre-approving reply: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: status.InReplyToID, + TargetAccountID: status.InReplyToAccountID, + TargetAccount: status.InReplyToAccount, + InteractingAccountID: status.AccountID, + InteractingAccount: status.Account, + InteractionURI: status.URI, + InteractionType: gtsmodel.InteractionLike, + Reply: status, + URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id), + AcceptedAt: time.Now(), + } + + // Mark the status as now approved. + status.PendingApproval = util.Ptr(false) + status.PreApproved = false + status.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + status, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } // Send out the approval as Accept. @@ -279,6 +301,10 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) log.Errorf(ctx, "error updating account stats: %v", err) } + if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { + log.Errorf(ctx, "error timelining and notifying status: %v", err) + } + if status.InReplyToID != "" { // Interaction counts changed on the replied status; uncache the // prepared version from all timelines. The status dereferencer @@ -286,10 +312,6 @@ func (p *fediAPI) CreateStatus(ctx context.Context, fMsg *messages.FromFediAPI) p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) } - if err := p.surface.timelineAndNotifyStatus(ctx, status); err != nil { - log.Errorf(ctx, "error timelining and notifying status: %v", err) - } - return nil } @@ -320,9 +342,6 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI status := vote.Poll.Status status.Poll = vote.Poll - // Interaction counts changed on the source status, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) - if *status.Local { // Before federating it, increment the // poll vote counts on our local copy. @@ -335,6 +354,9 @@ func (p *fediAPI) CreatePollVote(ctx context.Context, fMsg *messages.FromFediAPI } } + // Interaction counts changed on the source status, uncache from timelines. + p.surface.invalidateStatusFromTimelines(ctx, vote.Poll.StatusID) + return nil } @@ -409,10 +431,7 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // If pending approval is true then // fave must target a LOCAL status // that requires approval for the fave. - pendingApproval := util.PtrOrValue( - fave.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(fave.PendingApproval) switch { case pendingApproval && !fave.PreApproved: @@ -420,10 +439,8 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // preapproved, then just notify the account // that's being interacted with: they can // approve or deny the interaction later. - - // Notify *local* account of pending fave. - if err := p.surface.notifyPendingFave(ctx, fave); err != nil { - log.Errorf(ctx, "error notifying pending fave: %v", err) + if err := p.utils.requestFave(ctx, fave); err != nil { + return gtserror.Newf("error pending fave: %w", err) } // Return early. @@ -437,11 +454,33 @@ func (p *fediAPI) CreateLike(ctx context.Context, fMsg *messages.FromFediAPI) er // collection. Do the Accept immediately and // then process everything else as normal. - // Put approval in the database and - // update the fave with approvedBy URI. - approval, err := p.utils.approveFave(ctx, fave) - if err != nil { - return gtserror.Newf("error pre-approving fave: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: fave.StatusID, + TargetAccountID: fave.TargetAccountID, + TargetAccount: fave.TargetAccount, + InteractingAccountID: fave.AccountID, + InteractingAccount: fave.Account, + InteractionURI: fave.URI, + InteractionType: gtsmodel.InteractionLike, + Like: fave, + URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id), + AcceptedAt: time.Now(), + } + + // Mark the fave itself as now approved. + fave.PendingApproval = util.Ptr(false) + fave.PreApproved = false + fave.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatusFave( + ctx, + fave, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status fave: %w", err) } // Send out the approval as Accept. @@ -496,10 +535,7 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // If pending approval is true then // boost must target a LOCAL status // that requires approval for the boost. - pendingApproval := util.PtrOrValue( - boost.PendingApproval, - false, - ) + pendingApproval := util.PtrOrZero(boost.PendingApproval) switch { case pendingApproval && !boost.PreApproved: @@ -507,10 +543,8 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // preapproved, then just notify the account // that's being interacted with: they can // approve or deny the interaction later. - - // Notify *local* account of pending announce. - if err := p.surface.notifyPendingAnnounce(ctx, boost); err != nil { - log.Errorf(ctx, "error notifying pending boost: %v", err) + if err := p.utils.requestAnnounce(ctx, boost); err != nil { + return gtserror.Newf("error pending boost: %w", err) } // Return early. @@ -524,11 +558,33 @@ func (p *fediAPI) CreateAnnounce(ctx context.Context, fMsg *messages.FromFediAPI // collection. Do the Accept immediately and // then process everything else as normal. - // Put approval in the database and - // update the boost with approvedBy URI. - approval, err := p.utils.approveAnnounce(ctx, boost) - if err != nil { - return gtserror.Newf("error pre-approving boost: %w", err) + // Store an already-accepted interaction request. + id := id.NewULID() + approval := >smodel.InteractionRequest{ + ID: id, + StatusID: boost.BoostOfID, + TargetAccountID: boost.BoostOfAccountID, + TargetAccount: boost.BoostOfAccount, + InteractingAccountID: boost.AccountID, + InteractingAccount: boost.Account, + InteractionURI: boost.URI, + InteractionType: gtsmodel.InteractionLike, + Announce: boost, + URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id), + AcceptedAt: time.Now(), + } + + // Mark the boost itself as now approved. + boost.PendingApproval = util.Ptr(false) + boost.PreApproved = false + boost.ApprovedByURI = approval.URI + if err := p.state.DB.UpdateStatus( + ctx, + boost, + "pending_approval", + "approved_by_uri", + ); err != nil { + return gtserror.Newf("db error updating status: %w", err) } // Send out the approval as Accept. @@ -729,15 +785,15 @@ func (p *fediAPI) AcceptReply(ctx context.Context, fMsg *messages.FromFediAPI) e log.Errorf(ctx, "error timelining and notifying status: %v", err) } - // Interaction counts changed on the replied-to status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) - // Send out the reply again, fully this time. if err := p.federate.CreateStatus(ctx, status); err != nil { log.Errorf(ctx, "error federating announce: %v", err) } + // Interaction counts changed on the replied-to status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.InReplyToID) + return nil } @@ -757,15 +813,15 @@ func (p *fediAPI) AcceptAnnounce(ctx context.Context, fMsg *messages.FromFediAPI log.Errorf(ctx, "error timelining and notifying status: %v", err) } - // Interaction counts changed on the boosted status; - // uncache the prepared version from all timelines. - p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) - // Send out the boost again, fully this time. if err := p.federate.Announce(ctx, boost); err != nil { log.Errorf(ctx, "error federating announce: %v", err) } + // Interaction counts changed on the boosted status; + // uncache the prepared version from all timelines. + p.surface.invalidateStatusFromTimelines(ctx, boost.BoostOfID) + return nil } @@ -792,9 +848,6 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI) log.Errorf(ctx, "error refreshing status: %v", err) } - // Status representation was refetched, uncache from timelines. - p.surface.invalidateStatusFromTimelines(ctx, status.ID) - if status.Poll != nil && status.Poll.Closing { // If the latest status has a newly closed poll, at least compared @@ -809,6 +862,9 @@ func (p *fediAPI) UpdateStatus(ctx context.Context, fMsg *messages.FromFediAPI) log.Errorf(ctx, "error streaming status edit: %v", err) } + // Status representation was refetched, uncache from timelines. + p.surface.invalidateStatusFromTimelines(ctx, status.ID) + return nil } diff --git a/internal/processing/workers/fromfediapi_test.go b/internal/processing/workers/fromfediapi_test.go index f08f059ea..d7d7454e7 100644 --- a/internal/processing/workers/fromfediapi_test.go +++ b/internal/processing/workers/fromfediapi_test.go @@ -42,8 +42,8 @@ type FromFediAPITestSuite struct { // remote_account_1 boosts the first status of local_account_1 func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) boostedStatus := >smodel.Status{} *boostedStatus = *suite.testStatuses["local_account_1_status_1"] @@ -106,8 +106,8 @@ func (suite *FromFediAPITestSuite) TestProcessFederationAnnounce() { } func (suite *FromFediAPITestSuite) TestProcessReplyMention() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) repliedAccount := >smodel.Account{} *repliedAccount = *suite.testAccounts["local_account_1"] @@ -190,8 +190,8 @@ func (suite *FromFediAPITestSuite) TestProcessReplyMention() { } func (suite *FromFediAPITestSuite) TestProcessFave() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) favedAccount := suite.testAccounts["local_account_1"] favedStatus := suite.testStatuses["local_account_1_status_1"] @@ -262,8 +262,8 @@ func (suite *FromFediAPITestSuite) TestProcessFave() { // This tests for an issue we were seeing where Misskey sends out faves to inboxes of people that don't own // the fave, but just follow the actor who received the fave. func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) receivingAccount := suite.testAccounts["local_account_2"] favedAccount := suite.testAccounts["local_account_1"] @@ -327,8 +327,8 @@ func (suite *FromFediAPITestSuite) TestProcessFaveWithDifferentReceivingAccount( } func (suite *FromFediAPITestSuite) TestProcessAccountDelete() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -421,8 +421,8 @@ func (suite *FromFediAPITestSuite) TestProcessAccountDelete() { } func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -478,8 +478,8 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestLocked() { } func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -579,8 +579,8 @@ func (suite *FromFediAPITestSuite) TestProcessFollowRequestUnlocked() { // TestCreateStatusFromIRI checks if a forwarded status can be dereferenced by the processor. func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) ctx := context.Background() @@ -604,8 +604,8 @@ func (suite *FromFediAPITestSuite) TestCreateStatusFromIRI() { } func (suite *FromFediAPITestSuite) TestMoveAccount() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) // We're gonna migrate foss_satan to our local admin account. ctx := context.Background() diff --git a/internal/processing/workers/surfacenotify_test.go b/internal/processing/workers/surfacenotify_test.go index 876f69933..dc445d0ac 100644 --- a/internal/processing/workers/surfacenotify_test.go +++ b/internal/processing/workers/surfacenotify_test.go @@ -28,6 +28,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/processing/workers" + "github.com/superseriousbusiness/gotosocial/testrig" ) type SurfaceNotifyTestSuite struct { @@ -35,8 +36,8 @@ type SurfaceNotifyTestSuite struct { } func (suite *SurfaceNotifyTestSuite) TestSpamNotifs() { - testStructs := suite.SetupTestStructs() - defer suite.TearDownTestStructs(testStructs) + testStructs := testrig.SetupTestStructs(rMediaPath, rTemplatePath) + defer testrig.TearDownTestStructs(testStructs) surface := &workers.Surface{ State: testStructs.State, diff --git a/internal/processing/workers/util.go b/internal/processing/workers/util.go index 49c6183a4..bb7faffbf 100644 --- a/internal/processing/workers/util.go +++ b/internal/processing/workers/util.go @@ -26,12 +26,11 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtscontext" "github.com/superseriousbusiness/gotosocial/internal/gtserror" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/state" - "github.com/superseriousbusiness/gotosocial/internal/uris" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/util" ) @@ -488,128 +487,143 @@ func (u *utils) decrementFollowRequestsCount( return nil } -// approveFave stores + returns an -// interactionApproval for a fave. -func (u *utils) approveFave( +// requestFave stores an interaction request +// for the given fave, and notifies the interactee. +func (u *utils) requestFave( ctx context.Context, fave *gtsmodel.StatusFave, -) (*gtsmodel.InteractionApproval, error) { - id := id.NewULID() +) error { + // Only create interaction request + // if fave targets a local status. + if fave.Status == nil || + !fave.Status.IsLocal() { + return nil + } - approval := >smodel.InteractionApproval{ - ID: id, - AccountID: fave.TargetAccountID, - Account: fave.TargetAccount, - InteractingAccountID: fave.AccountID, - InteractingAccount: fave.Account, - InteractionURI: fave.URI, - InteractionType: gtsmodel.InteractionLike, - URI: uris.GenerateURIForAccept(fave.TargetAccount.Username, id), + // Lock on the interaction URI. + unlock := u.state.ProcessingLocks.Lock(fave.URI) + defer unlock() + + // Ensure no req with this URI exists already. + req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, fave.URI) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error checking for existing interaction request: %w", err) } - if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil { - err := gtserror.Newf("db error inserting interaction approval: %w", err) - return nil, err + if req != nil { + // Interaction req already exists, + // no need to do anything else. + return nil } - // Mark the fave itself as now approved. - fave.PendingApproval = util.Ptr(false) - fave.PreApproved = false - fave.ApprovedByURI = approval.URI + // Create + store new interaction request. + req, err = typeutils.StatusFaveToInteractionRequest(ctx, fave) + if err != nil { + return gtserror.Newf("error creating interaction request: %w", err) + } - if err := u.state.DB.UpdateStatusFave( - ctx, - fave, - "pending_approval", - "approved_by_uri", - ); err != nil { - err := gtserror.Newf("db error updating status fave: %w", err) - return nil, err + if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("db error storing interaction request: %w", err) } - return approval, nil + // Notify *local* account of pending announce. + if err := u.surface.notifyPendingFave(ctx, fave); err != nil { + return gtserror.Newf("error notifying pending fave: %w", err) + } + + return nil } -// approveReply stores + returns an -// interactionApproval for a reply. -func (u *utils) approveReply( +// requestReply stores an interaction request +// for the given reply, and notifies the interactee. +func (u *utils) requestReply( ctx context.Context, - status *gtsmodel.Status, -) (*gtsmodel.InteractionApproval, error) { - id := id.NewULID() + reply *gtsmodel.Status, +) error { + // Only create interaction request if + // status replies to a local status. + if reply.InReplyTo == nil || + !reply.InReplyTo.IsLocal() { + return nil + } - approval := >smodel.InteractionApproval{ - ID: id, - AccountID: status.InReplyToAccountID, - Account: status.InReplyToAccount, - InteractingAccountID: status.AccountID, - InteractingAccount: status.Account, - InteractionURI: status.URI, - InteractionType: gtsmodel.InteractionReply, - URI: uris.GenerateURIForAccept(status.InReplyToAccount.Username, id), + // Lock on the interaction URI. + unlock := u.state.ProcessingLocks.Lock(reply.URI) + defer unlock() + + // Ensure no req with this URI exists already. + req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, reply.URI) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error checking for existing interaction request: %w", err) } - if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil { - err := gtserror.Newf("db error inserting interaction approval: %w", err) - return nil, err + if req != nil { + // Interaction req already exists, + // no need to do anything else. + return nil } - // Mark the status itself as now approved. - status.PendingApproval = util.Ptr(false) - status.PreApproved = false - status.ApprovedByURI = approval.URI + // Create + store interaction request. + req, err = typeutils.StatusToInteractionRequest(ctx, reply) + if err != nil { + return gtserror.Newf("error creating interaction request: %w", err) + } - if err := u.state.DB.UpdateStatus( - ctx, - status, - "pending_approval", - "approved_by_uri", - ); err != nil { - err := gtserror.Newf("db error updating status: %w", err) - return nil, err + if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("db error storing interaction request: %w", err) + } + + // Notify *local* account of pending reply. + if err := u.surface.notifyPendingReply(ctx, reply); err != nil { + return gtserror.Newf("error notifying pending reply: %w", err) } - return approval, nil + return nil } -// approveAnnounce stores + returns an -// interactionApproval for an announce. -func (u *utils) approveAnnounce( +// requestAnnounce stores an interaction request +// for the given announce, and notifies the interactee. +func (u *utils) requestAnnounce( ctx context.Context, boost *gtsmodel.Status, -) (*gtsmodel.InteractionApproval, error) { - id := id.NewULID() +) error { + // Only create interaction request if + // status announces a local status. + if boost.BoostOf == nil || + !boost.BoostOf.IsLocal() { + return nil + } + + // Lock on the interaction URI. + unlock := u.state.ProcessingLocks.Lock(boost.URI) + defer unlock() - approval := >smodel.InteractionApproval{ - ID: id, - AccountID: boost.BoostOfAccountID, - Account: boost.BoostOfAccount, - InteractingAccountID: boost.AccountID, - InteractingAccount: boost.Account, - InteractionURI: boost.URI, - InteractionType: gtsmodel.InteractionReply, - URI: uris.GenerateURIForAccept(boost.BoostOfAccount.Username, id), + // Ensure no req with this URI exists already. + req, err := u.state.DB.GetInteractionRequestByInteractionURI(ctx, boost.URI) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return gtserror.Newf("db error checking for existing interaction request: %w", err) } - if err := u.state.DB.PutInteractionApproval(ctx, approval); err != nil { - err := gtserror.Newf("db error inserting interaction approval: %w", err) - return nil, err + if req != nil { + // Interaction req already exists, + // no need to do anything else. + return nil } - // Mark the status itself as now approved. - boost.PendingApproval = util.Ptr(false) - boost.PreApproved = false - boost.ApprovedByURI = approval.URI + // Create + store interaction request. + req, err = typeutils.StatusToInteractionRequest(ctx, boost) + if err != nil { + return gtserror.Newf("error creating interaction request: %w", err) + } - if err := u.state.DB.UpdateStatus( - ctx, - boost, - "pending_approval", - "approved_by_uri", - ); err != nil { - err := gtserror.Newf("db error updating boost wrapper status: %w", err) - return nil, err + if err := u.state.DB.PutInteractionRequest(ctx, req); err != nil { + return gtserror.Newf("db error storing interaction request: %w", err) + } + + // Notify *local* account of pending announce. + if err := u.surface.notifyPendingAnnounce(ctx, boost); err != nil { + return gtserror.Newf("error notifying pending announce: %w", err) } - return approval, nil + return nil } diff --git a/internal/processing/workers/workers.go b/internal/processing/workers/workers.go index 04010a92e..d4b525783 100644 --- a/internal/processing/workers/workers.go +++ b/internal/processing/workers/workers.go @@ -22,6 +22,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/federation" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/processing/account" + "github.com/superseriousbusiness/gotosocial/internal/processing/common" "github.com/superseriousbusiness/gotosocial/internal/processing/conversations" "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" @@ -38,6 +39,7 @@ type Processor struct { func New( state *state.State, + common *common.Processor, federator *federation.Federator, converter *typeutils.Converter, visFilter *visibility.Filter, @@ -82,6 +84,7 @@ func New( surface: surface, federate: federate, account: account, + common: common, utils: utils, }, fediAPI: fediAPI{ @@ -89,6 +92,7 @@ func New( surface: surface, federate: federate, account: account, + common: common, utils: utils, }, } diff --git a/internal/processing/workers/workers_test.go b/internal/processing/workers/workers_test.go index 65ed3f6b7..ffd40d8fb 100644 --- a/internal/processing/workers/workers_test.go +++ b/internal/processing/workers/workers_test.go @@ -21,19 +21,18 @@ import ( "context" "github.com/stretchr/testify/suite" - "github.com/superseriousbusiness/gotosocial/internal/cleaner" - "github.com/superseriousbusiness/gotosocial/internal/email" - "github.com/superseriousbusiness/gotosocial/internal/filter/interaction" - "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/oauth" "github.com/superseriousbusiness/gotosocial/internal/processing" - "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/stream" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/testrig" ) +const ( + rMediaPath = "../../../testrig/media" + rTemplatePath = "../../../web/template" +) + type WorkersTestSuite struct { // standard suite interfaces suite.Suite @@ -56,23 +55,6 @@ type WorkersTestSuite struct { testListEntries map[string]*gtsmodel.ListEntry } -// TestStructs encapsulates structs needed to -// run one test in this package. Each test should -// call SetupTestStructs to get a new TestStructs, -// and defer TearDownTestStructs to close it when -// the test is complete. The reason for doing things -// this way here is to prevent the tests in this -// package from overwriting one another's processors -// and worker queues, which was causing issues -// when running all tests at once. -type TestStructs struct { - State *state.State - Processor *processing.Processor - HTTPClient *testrig.MockHTTPClient - TypeConverter *typeutils.Converter - EmailSender email.Sender -} - func (suite *WorkersTestSuite) SetupSuite() { suite.testTokens = testrig.NewTestTokens() suite.testClients = testrig.NewTestClients() @@ -132,63 +114,3 @@ func (suite *WorkersTestSuite) openStreams(ctx context.Context, processor *proce return streams } - -func (suite *WorkersTestSuite) SetupTestStructs() *TestStructs { - state := state.State{} - - state.Caches.Init() - - db := testrig.NewTestDB(&state) - state.DB = db - - storage := testrig.NewInMemoryStorage() - state.Storage = storage - typeconverter := typeutils.NewConverter(&state) - - testrig.StartTimelines( - &state, - visibility.NewFilter(&state), - typeconverter, - ) - - httpClient := testrig.NewMockHTTPClient(nil, "../../../testrig/media") - httpClient.TestRemotePeople = testrig.NewTestFediPeople() - httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses() - - transportController := testrig.NewTestTransportController(&state, httpClient) - mediaManager := testrig.NewTestMediaManager(&state) - federator := testrig.NewTestFederator(&state, transportController, mediaManager) - oauthServer := testrig.NewTestOauthServer(db) - emailSender := testrig.NewEmailSender("../../../web/template/", nil) - - processor := processing.NewProcessor( - cleaner.New(&state), - typeconverter, - federator, - oauthServer, - mediaManager, - &state, - emailSender, - visibility.NewFilter(&state), - interaction.NewFilter(&state), - ) - - testrig.StartWorkers(&state, processor.Workers()) - - testrig.StandardDBSetup(db, suite.testAccounts) - testrig.StandardStorageSetup(storage, "../../../testrig/media") - - return &TestStructs{ - State: &state, - Processor: processor, - HTTPClient: httpClient, - TypeConverter: typeconverter, - EmailSender: emailSender, - } -} - -func (suite *WorkersTestSuite) TearDownTestStructs(testStructs *TestStructs) { - testrig.StandardDBTeardown(testStructs.State.DB) - testrig.StandardStorageTeardown(testStructs.State.Storage) - testrig.StopWorkers(testStructs.State) -} |