diff options
Diffstat (limited to 'internal/processing')
28 files changed, 1573 insertions, 546 deletions
diff --git a/internal/processing/account/account_test.go b/internal/processing/account/account_test.go index 5d48d1210..3fa8c8991 100644 --- a/internal/processing/account/account_test.go +++ b/internal/processing/account/account_test.go @@ -88,6 +88,13 @@ func (suite *AccountStandardTestSuite) SetupTest() { suite.db = testrig.NewTestDB(&suite.state) suite.state.DB = suite.db suite.tc = testrig.NewTestTypeConverter(suite.db) + + testrig.StartTimelines( + &suite.state, + visibility.NewFilter(&suite.state), + suite.tc, + ) + suite.storage = testrig.NewInMemoryStorage() suite.state.Storage = suite.storage suite.mediaManager = testrig.NewTestMediaManager(&suite.state) diff --git a/internal/processing/account/lists.go b/internal/processing/account/lists.go new file mode 100644 index 000000000..167ed3358 --- /dev/null +++ b/internal/processing/account/lists.go @@ -0,0 +1,107 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package account + +import ( + "context" + "errors" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" +) + +var noLists = make([]*apimodel.List, 0) + +// ListsGet returns all lists owned by requestingAccount, which contain a follow for targetAccountID. +func (p *Processor) ListsGet(ctx context.Context, requestingAccount *gtsmodel.Account, targetAccountID string) ([]*apimodel.List, gtserror.WithCode) { + targetAccount, err := p.state.DB.GetAccountByID(ctx, targetAccountID) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorNotFound(errors.New("account not found")) + } + return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err)) + } + + visible, err := p.filter.AccountVisible(ctx, requestingAccount, targetAccount) + if err != nil { + return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err)) + } + + if !visible { + return nil, gtserror.NewErrorNotFound(errors.New("account not found")) + } + + // Requester has to follow targetAccount + // for them to be in any of their lists. + follow, err := p.state.DB.GetFollow( + // Don't populate follow. + gtscontext.SetBarebones(ctx), + requestingAccount.ID, + targetAccountID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err)) + } + + if follow == nil { + return noLists, nil // by definition we know they're in no lists + } + + listEntries, err := p.state.DB.GetListEntriesForFollowID( + // Don't populate entries. + gtscontext.SetBarebones(ctx), + follow.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorInternalError(fmt.Errorf("db error: %w", err)) + } + + count := len(listEntries) + if count == 0 { + return noLists, nil + } + + apiLists := make([]*apimodel.List, 0, count) + for _, listEntry := range listEntries { + list, err := p.state.DB.GetListByID( + // Don't populate list. + gtscontext.SetBarebones(ctx), + listEntry.ListID, + ) + + if err != nil { + log.Debugf(ctx, "skipping list %s due to error %q", listEntry.ListID, err) + continue + } + + apiList, err := p.tc.ListToAPIList(ctx, list) + if err != nil { + log.Debugf(ctx, "skipping list %s due to error %q", listEntry.ListID, err) + continue + } + + apiLists = append(apiLists, apiList) + } + + return apiLists, nil +} diff --git a/internal/processing/fromclientapi.go b/internal/processing/fromclientapi.go index 082a5ba2e..41bf6ee40 100644 --- a/internal/processing/fromclientapi.go +++ b/internal/processing/fromclientapi.go @@ -217,10 +217,10 @@ func (p *Processor) processCreateBlockFromClientAPI(ctx context.Context, clientM } // remove any of the blocking account's statuses from the blocked account's timeline, and vice versa - if err := p.statusTimelines.WipeItemsFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil { + if err := p.state.Timelines.Home.WipeItemsFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil { return err } - if err := p.statusTimelines.WipeItemsFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil { + if err := p.state.Timelines.Home.WipeItemsFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil { return err } diff --git a/internal/processing/fromclientapi_test.go b/internal/processing/fromclientapi_test.go index 0b641c091..808f02cd6 100644 --- a/internal/processing/fromclientapi_test.go +++ b/internal/processing/fromclientapi_test.go @@ -20,6 +20,7 @@ package processing_test import ( "context" "encoding/json" + "errors" "testing" "github.com/stretchr/testify/suite" @@ -36,24 +37,21 @@ type FromClientAPITestSuite struct { ProcessingStandardTestSuite } +// This test ensures that when admin_account posts a new +// status, it ends up in the correct streaming timelines +// of local_account_1, which follows it. func (suite *FromClientAPITestSuite) TestProcessStreamNewStatus() { - ctx := context.Background() + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["admin_account"] + receivingAccount = suite.testAccounts["local_account_1"] + testList = suite.testLists["local_account_1_list_1"] + streams = suite.openStreams(ctx, receivingAccount, []string{testList.ID}) + homeStream = streams[stream.TimelineHome] + listStream = streams[stream.TimelineList+":"+testList.ID] + ) - // let's say that the admin account posts a new status: it should end up in the - // timeline of any account that follows it and has a stream open - postingAccount := suite.testAccounts["admin_account"] - receivingAccount := suite.testAccounts["local_account_1"] - - // open a home timeline stream for zork - wssStream, errWithCode := suite.processor.Stream().Open(ctx, receivingAccount, stream.TimelineHome) - suite.NoError(errWithCode) - - // open another stream for zork, but for a different timeline; - // this shouldn't get stuff streamed into it, since it's for the public timeline - irrelevantStream, errWithCode := suite.processor.Stream().Open(ctx, receivingAccount, stream.TimelinePublic) - suite.NoError(errWithCode) - - // make a new status from admin account + // Make a new status from admin account. newStatus := >smodel.Status{ ID: "01FN4B2F88TF9676DYNXWE1WSS", URI: "http://localhost:8080/users/admin/statuses/01FN4B2F88TF9676DYNXWE1WSS", @@ -82,87 +80,110 @@ func (suite *FromClientAPITestSuite) TestProcessStreamNewStatus() { ActivityStreamsType: ap.ObjectNote, } - // put the status in the db first, to mimic what would have already happened earlier up the flow - err := suite.db.PutStatus(ctx, newStatus) - suite.NoError(err) + // Put the status in the db first, to mimic what + // would have already happened earlier up the flow. + if err := suite.db.PutStatus(ctx, newStatus); err != nil { + suite.FailNow(err.Error()) + } - // process the new status - err = suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{ + // Process the new status. + if err := suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, GTSModel: newStatus, OriginAccount: postingAccount, - }) - suite.NoError(err) + }); err != nil { + suite.FailNow(err.Error()) + } - // zork's stream should have the newly created status in it now - msg := <-wssStream.Messages - suite.Equal(stream.EventTypeUpdate, msg.Event) - suite.NotEmpty(msg.Payload) - suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) - statusStreamed := &apimodel.Status{} - err = json.Unmarshal([]byte(msg.Payload), statusStreamed) - suite.NoError(err) - suite.Equal("01FN4B2F88TF9676DYNXWE1WSS", statusStreamed.ID) - suite.Equal("this status should stream :)", statusStreamed.Content) + // Check message in home stream. + homeMsg := <-homeStream.Messages + suite.Equal(stream.EventTypeUpdate, homeMsg.Event) + suite.EqualValues([]string{stream.TimelineHome}, homeMsg.Stream) + suite.Empty(homeStream.Messages) // Stream should now be empty. - // and stream should now be empty - suite.Empty(wssStream.Messages) + // Check status from home stream. + homeStreamStatus := &apimodel.Status{} + if err := json.Unmarshal([]byte(homeMsg.Payload), homeStreamStatus); err != nil { + suite.FailNow(err.Error()) + } + suite.Equal(newStatus.ID, homeStreamStatus.ID) + suite.Equal(newStatus.Content, homeStreamStatus.Content) + + // Check message in list stream. + listMsg := <-listStream.Messages + suite.Equal(stream.EventTypeUpdate, listMsg.Event) + suite.EqualValues([]string{stream.TimelineList + ":" + testList.ID}, listMsg.Stream) + suite.Empty(listStream.Messages) // Stream should now be empty. - // the irrelevant messages stream should also be empty - suite.Empty(irrelevantStream.Messages) + // Check status from list stream. + listStreamStatus := &apimodel.Status{} + if err := json.Unmarshal([]byte(listMsg.Payload), listStreamStatus); err != nil { + suite.FailNow(err.Error()) + } + suite.Equal(newStatus.ID, listStreamStatus.ID) + suite.Equal(newStatus.Content, listStreamStatus.Content) } func (suite *FromClientAPITestSuite) TestProcessStatusDelete() { - ctx := context.Background() + var ( + ctx = context.Background() + deletingAccount = suite.testAccounts["local_account_1"] + receivingAccount = suite.testAccounts["local_account_2"] + deletedStatus = suite.testStatuses["local_account_1_status_1"] + boostOfDeletedStatus = suite.testStatuses["admin_account_status_4"] + streams = suite.openStreams(ctx, receivingAccount, nil) + homeStream = streams[stream.TimelineHome] + ) - deletingAccount := suite.testAccounts["local_account_1"] - receivingAccount := suite.testAccounts["local_account_2"] - - deletedStatus := suite.testStatuses["local_account_1_status_1"] - boostOfDeletedStatus := suite.testStatuses["admin_account_status_4"] - - // open a home timeline stream for turtle, who follows zork - wssStream, errWithCode := suite.processor.Stream().Open(ctx, receivingAccount, stream.TimelineHome) - suite.NoError(errWithCode) - - // delete the status from the db first, to mimic what would have already happened earlier up the flow - err := suite.db.DeleteStatusByID(ctx, deletedStatus.ID) - suite.NoError(err) + // Delete the status from the db first, to mimic what + // would have already happened earlier up the flow + if err := suite.db.DeleteStatusByID(ctx, deletedStatus.ID); err != nil { + suite.FailNow(err.Error()) + } - // process the status delete - err = suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{ + // Process the status delete. + if err := suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityDelete, GTSModel: deletedStatus, OriginAccount: deletingAccount, - }) - suite.NoError(err) + }); err != nil { + suite.FailNow(err.Error()) + } - // turtle's stream should have the delete of admin's boost in it now - msg := <-wssStream.Messages + // Stream should have the delete of admin's boost in it now. + msg := <-homeStream.Messages suite.Equal(stream.EventTypeDelete, msg.Event) suite.Equal(boostOfDeletedStatus.ID, msg.Payload) suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) - // turtle's stream should also have the delete of the message itself in it - msg = <-wssStream.Messages + // Stream should also have the delete of the message itself in it. + msg = <-homeStream.Messages suite.Equal(stream.EventTypeDelete, msg.Event) suite.Equal(deletedStatus.ID, msg.Payload) suite.EqualValues([]string{stream.TimelineHome}, msg.Stream) - // stream should now be empty - suite.Empty(wssStream.Messages) + // Stream should now be empty. + suite.Empty(homeStream.Messages) - // the boost should no longer be in the database - _, err = suite.db.GetStatusByID(ctx, boostOfDeletedStatus.ID) - suite.ErrorIs(err, db.ErrNoEntries) + // Boost should no longer be in the database. + if !testrig.WaitFor(func() bool { + _, err := suite.db.GetStatusByID(ctx, boostOfDeletedStatus.ID) + return errors.Is(err, db.ErrNoEntries) + }) { + suite.FailNow("timed out waiting for status delete") + } } func (suite *FromClientAPITestSuite) TestProcessNewStatusWithNotification() { - ctx := context.Background() - postingAccount := suite.testAccounts["admin_account"] - receivingAccount := suite.testAccounts["local_account_1"] + var ( + ctx = context.Background() + postingAccount = suite.testAccounts["admin_account"] + receivingAccount = suite.testAccounts["local_account_1"] + streams = suite.openStreams(ctx, receivingAccount, nil) + notifStream = streams[stream.TimelineNotifications] + ) // Update the follow from receiving account -> posting account so // that receiving account wants notifs when posting account posts. @@ -204,8 +225,9 @@ func (suite *FromClientAPITestSuite) TestProcessNewStatusWithNotification() { // Put the status in the db first, to mimic what // would have already happened earlier up the flow. - err := suite.db.PutStatus(ctx, newStatus) - suite.NoError(err) + if err := suite.db.PutStatus(ctx, newStatus); err != nil { + suite.FailNow(err.Error()) + } // Process the new status. if err := suite.processor.ProcessFromClientAPI(ctx, messages.FromClientAPI{ @@ -230,6 +252,19 @@ func (suite *FromClientAPITestSuite) TestProcessNewStatusWithNotification() { }) { suite.FailNow("timed out waiting for new status notification") } + + // Check message in notification stream. + notifMsg := <-notifStream.Messages + suite.Equal(stream.EventTypeNotification, notifMsg.Event) + suite.EqualValues([]string{stream.TimelineNotifications}, notifMsg.Stream) + suite.Empty(notifStream.Messages) // Stream should now be empty. + + // Check notif. + notif := &apimodel.Notification{} + if err := json.Unmarshal([]byte(notifMsg.Payload), notif); err != nil { + suite.FailNow(err.Error()) + } + suite.Equal(newStatus.ID, notif.Status.ID) } func TestFromClientAPITestSuite(t *testing.T) { diff --git a/internal/processing/fromcommon.go b/internal/processing/fromcommon.go index a7ab0b330..0adb576bc 100644 --- a/internal/processing/fromcommon.go +++ b/internal/processing/fromcommon.go @@ -30,12 +30,14 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" "github.com/superseriousbusiness/gotosocial/internal/id" "github.com/superseriousbusiness/gotosocial/internal/stream" + "github.com/superseriousbusiness/gotosocial/internal/timeline" ) // timelineAndNotifyStatus processes the given new status and inserts it into -// the HOME timelines of accounts that follow the status author. It will also -// handle notifications for any mentions attached to the account, and also -// notifications for any local accounts that want a notif when this account posts. +// the HOME and LIST timelines of accounts that follow the status author. +// +// It will also handle notifications for any mentions attached to the account, and +// also notifications for any local accounts that want to know when this account posts. func (p *Processor) timelineAndNotifyStatus(ctx context.Context, status *gtsmodel.Status) error { // Ensure status fully populated; including account, mentions, etc. if err := p.state.DB.PopulateStatus(ctx, status); err != nil { @@ -89,10 +91,43 @@ func (p *Processor) timelineAndNotifyStatusForFollowers(ctx context.Context, sta continue } + // Add status to each list that this follow + // is included in, and stream it if applicable. + listEntries, err := p.state.DB.GetListEntriesForFollowID( + // We only need the list IDs. + gtscontext.SetBarebones(ctx), + follow.ID, + ) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + errs.Append(fmt.Errorf("timelineAndNotifyStatusForFollowers: error list timelining status: %w", err)) + continue + } + + for _, listEntry := range listEntries { + if _, err := p.timelineStatus( + ctx, + p.state.Timelines.List.IngestOne, + listEntry.ListID, // list timelines are keyed by list ID + follow.Account, + status, + stream.TimelineList+":"+listEntry.ListID, // key streamType to this specific list + ); err != nil { + errs.Append(fmt.Errorf("timelineAndNotifyStatusForFollowers: error list timelining status: %w", err)) + continue + } + } + // Add status to home timeline for this // follower, and stream it if applicable. - if timelined, err := p.timelineStatusForAccount(ctx, follow.Account, status); err != nil { - errs.Append(fmt.Errorf("timelineAndNotifyStatusForFollowers: error timelining status: %w", err)) + if timelined, err := p.timelineStatus( + ctx, + p.state.Timelines.Home.IngestOne, + follow.AccountID, // home timelines are keyed by account ID + follow.Account, + status, + stream.TimelineHome, + ); err != nil { + errs.Append(fmt.Errorf("timelineAndNotifyStatusForFollowers: error home timelining status: %w", err)) continue } else if !timelined { // Status wasn't added to home tomeline, @@ -133,13 +168,21 @@ func (p *Processor) timelineAndNotifyStatusForFollowers(ctx context.Context, sta return errs.Combine() } -// timelineStatusForAccount puts the given status in the HOME timeline -// of the account with given accountID, if it's HomeTimelineable. +// timelineStatus uses the provided ingest function to put the given +// status in a timeline with the given ID, if it's timelineable. // -// If the status was inserted into the home timeline of the given account, -// true will be returned + it will also be streamed via websockets to the user. -func (p *Processor) timelineStatusForAccount(ctx context.Context, account *gtsmodel.Account, status *gtsmodel.Status) (bool, error) { +// If the status was inserted into the timeline, true will be returned +// + it will also be streamed to the user using the given streamType. +func (p *Processor) timelineStatus( + ctx context.Context, + ingest func(context.Context, string, timeline.Timelineable) (bool, error), + timelineID string, + account *gtsmodel.Account, + status *gtsmodel.Status, + streamType string, +) (bool, error) { // Make sure the status is timelineable. + // This works for both home and list timelines. if timelineable, err := p.filter.StatusHomeTimelineable(ctx, account, status); err != nil { err = fmt.Errorf("timelineStatusForAccount: error getting timelineability for status for timeline with id %s: %w", account.ID, err) return false, err @@ -148,8 +191,8 @@ func (p *Processor) timelineStatusForAccount(ctx context.Context, account *gtsmo return false, nil } - // Insert status in the home timeline of account. - if inserted, err := p.statusTimelines.IngestOne(ctx, account.ID, status); err != nil { + // Ingest status into given timeline using provided function. + if inserted, err := ingest(ctx, timelineID, status); err != nil { err = fmt.Errorf("timelineStatusForAccount: error ingesting status %s: %w", status.ID, err) return false, err } else if !inserted { @@ -164,7 +207,7 @@ func (p *Processor) timelineStatusForAccount(ctx context.Context, account *gtsmo return true, err } - if err := p.stream.Update(apiStatus, account, stream.TimelineHome); err != nil { + if err := p.stream.Update(apiStatus, account, []string{streamType}); err != nil { err = fmt.Errorf("timelineStatusForAccount: error streaming update for status %s: %w", status.ID, err) return true, err } @@ -401,7 +444,7 @@ func (p *Processor) wipeStatus(ctx context.Context, statusToDelete *gtsmodel.Sta // deleteStatusFromTimelines completely removes the given status from all timelines. // It will also stream deletion of the status to all open streams. func (p *Processor) deleteStatusFromTimelines(ctx context.Context, status *gtsmodel.Status) error { - if err := p.statusTimelines.WipeItemFromAllTimelines(ctx, status.ID); err != nil { + if err := p.state.Timelines.Home.WipeItemFromAllTimelines(ctx, status.ID); err != nil { return err } diff --git a/internal/processing/fromfederator.go b/internal/processing/fromfederator.go index eccdbb894..ecb7934c9 100644 --- a/internal/processing/fromfederator.go +++ b/internal/processing/fromfederator.go @@ -342,10 +342,10 @@ func (p *Processor) processCreateBlockFromFederator(ctx context.Context, federat } // remove any of the blocking account's statuses from the blocked account's timeline, and vice versa - if err := p.statusTimelines.WipeItemsFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil { + if err := p.state.Timelines.Home.WipeItemsFromAccountID(ctx, block.AccountID, block.TargetAccountID); err != nil { return err } - if err := p.statusTimelines.WipeItemsFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil { + if err := p.state.Timelines.Home.WipeItemsFromAccountID(ctx, block.TargetAccountID, block.AccountID); err != nil { return err } // TODO: same with notifications diff --git a/internal/processing/list/create.go b/internal/processing/list/create.go new file mode 100644 index 000000000..10dec1050 --- /dev/null +++ b/internal/processing/list/create.go @@ -0,0 +1,50 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package list + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" +) + +// Create creates one a new list for the given account, using the provided parameters. +// These params should have already been validated by the time they reach this function. +func (p *Processor) Create(ctx context.Context, account *gtsmodel.Account, title string, repliesPolicy gtsmodel.RepliesPolicy) (*apimodel.List, gtserror.WithCode) { + list := >smodel.List{ + ID: id.NewULID(), + Title: title, + AccountID: account.ID, + RepliesPolicy: repliesPolicy, + } + + if err := p.state.DB.PutList(ctx, list); err != nil { + if errors.Is(err, db.ErrAlreadyExists) { + err = errors.New("you already have a list with this title") + return nil, gtserror.NewErrorConflict(err, err.Error()) + } + return nil, gtserror.NewErrorInternalError(err) + } + + return p.apiList(ctx, list) +} diff --git a/internal/processing/list/delete.go b/internal/processing/list/delete.go new file mode 100644 index 000000000..1c8ee5700 --- /dev/null +++ b/internal/processing/list/delete.go @@ -0,0 +1,46 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package list + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// Delete deletes one list for the given account. +func (p *Processor) Delete(ctx context.Context, account *gtsmodel.Account, id string) gtserror.WithCode { + list, errWithCode := p.getList( + // Use barebones ctx; no embedded + // structs necessary for this call. + gtscontext.SetBarebones(ctx), + account.ID, + id, + ) + if errWithCode != nil { + return errWithCode + } + + if err := p.state.DB.DeleteListByID(ctx, list.ID); err != nil { + return gtserror.NewErrorInternalError(err) + } + + return nil +} diff --git a/internal/processing/list/get.go b/internal/processing/list/get.go new file mode 100644 index 000000000..3f124fe7c --- /dev/null +++ b/internal/processing/list/get.go @@ -0,0 +1,155 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package list + +import ( + "context" + "errors" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +// Get returns the api model of one list with the given ID. +func (p *Processor) Get(ctx context.Context, account *gtsmodel.Account, id string) (*apimodel.List, gtserror.WithCode) { + list, errWithCode := p.getList( + // Use barebones ctx; no embedded + // structs necessary for this call. + gtscontext.SetBarebones(ctx), + account.ID, + id, + ) + if errWithCode != nil { + return nil, errWithCode + } + + return p.apiList(ctx, list) +} + +// GetMultiple returns multiple lists created by the given account, sorted by list ID DESC (newest first). +func (p *Processor) GetAll(ctx context.Context, account *gtsmodel.Account) ([]*apimodel.List, gtserror.WithCode) { + lists, err := p.state.DB.GetListsForAccountID( + // Use barebones ctx; no embedded + // structs necessary for simple GET. + gtscontext.SetBarebones(ctx), + account.ID, + ) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + return nil, nil + } + return nil, gtserror.NewErrorInternalError(err) + } + + apiLists := make([]*apimodel.List, 0, len(lists)) + for _, list := range lists { + apiList, errWithCode := p.apiList(ctx, list) + if errWithCode != nil { + return nil, errWithCode + } + + apiLists = append(apiLists, apiList) + } + + return apiLists, nil +} + +// GetListAccounts returns accounts that are in the given list, owned by the given account. +// The additional parameters can be used for paging. +func (p *Processor) GetListAccounts( + ctx context.Context, + account *gtsmodel.Account, + listID string, + maxID string, + sinceID string, + minID string, + limit int, +) (*apimodel.PageableResponse, gtserror.WithCode) { + // Ensure list exists + is owned by requesting account. + if _, errWithCode := p.getList(ctx, account.ID, listID); errWithCode != nil { + return nil, errWithCode + } + + // To know which accounts are in the list, + // we need to first get requested list entries. + listEntries, err := p.state.DB.GetListEntries(ctx, listID, maxID, sinceID, minID, limit) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err = fmt.Errorf("GetListAccounts: error getting list entries: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + count := len(listEntries) + if count == 0 { + // No list entries means no accounts. + return util.EmptyPageableResponse(), nil + } + + var ( + items = make([]interface{}, count) + nextMaxIDValue string + prevMinIDValue string + ) + + // For each list entry, we want the account it points to. + // To get this, we need to first get the follow that the + // list entry pertains to, then extract the target account + // from that follow. + // + // We do paging not by account ID, but by list entry ID. + for i, listEntry := range listEntries { + if i == count-1 { + nextMaxIDValue = listEntry.ID + } + + if i == 0 { + prevMinIDValue = listEntry.ID + } + + if err := p.state.DB.PopulateListEntry(ctx, listEntry); err != nil { + log.Debugf(ctx, "skipping list entry because of error populating it: %q", err) + continue + } + + if err := p.state.DB.PopulateFollow(ctx, listEntry.Follow); err != nil { + log.Debugf(ctx, "skipping list entry because of error populating follow: %q", err) + continue + } + + apiAccount, err := p.tc.AccountToAPIAccountPublic(ctx, listEntry.Follow.TargetAccount) + if err != nil { + log.Debugf(ctx, "skipping list entry because of error converting follow target account: %q", err) + continue + } + + items[i] = apiAccount + } + + return util.PackagePageableResponse(util.PageableResponseParams{ + Items: items, + Path: "api/v1/lists/" + listID + "/accounts", + NextMaxIDValue: nextMaxIDValue, + PrevMinIDValue: prevMinIDValue, + Limit: limit, + }) +} diff --git a/internal/processing/list/list.go b/internal/processing/list/list.go new file mode 100644 index 000000000..f192beb60 --- /dev/null +++ b/internal/processing/list/list.go @@ -0,0 +1,35 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package list + +import ( + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" +) + +type Processor struct { + state *state.State + tc typeutils.TypeConverter +} + +func New(state *state.State, tc typeutils.TypeConverter) Processor { + return Processor{ + state: state, + tc: tc, + } +} diff --git a/internal/processing/list/update.go b/internal/processing/list/update.go new file mode 100644 index 000000000..656af1f78 --- /dev/null +++ b/internal/processing/list/update.go @@ -0,0 +1,73 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package list + +import ( + "context" + "errors" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtscontext" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// Update updates one list for the given account, using the provided parameters. +// These params should have already been validated by the time they reach this function. +func (p *Processor) Update( + ctx context.Context, + account *gtsmodel.Account, + id string, + title *string, + repliesPolicy *gtsmodel.RepliesPolicy, +) (*apimodel.List, gtserror.WithCode) { + list, errWithCode := p.getList( + // Use barebones ctx; no embedded + // structs necessary for this call. + gtscontext.SetBarebones(ctx), + account.ID, + id, + ) + if errWithCode != nil { + return nil, errWithCode + } + + // Only update columns we're told to update. + columns := make([]string, 0, 2) + + if title != nil { + list.Title = *title + columns = append(columns, "title") + } + + if repliesPolicy != nil { + list.RepliesPolicy = *repliesPolicy + columns = append(columns, "replies_policy") + } + + if err := p.state.DB.UpdateList(ctx, list, columns...); err != nil { + if errors.Is(err, db.ErrAlreadyExists) { + err = errors.New("you already have a list with this title") + return nil, gtserror.NewErrorConflict(err, err.Error()) + } + return nil, gtserror.NewErrorInternalError(err) + } + + return p.apiList(ctx, list) +} diff --git a/internal/processing/list/updateentries.go b/internal/processing/list/updateentries.go new file mode 100644 index 000000000..6dcb951a7 --- /dev/null +++ b/internal/processing/list/updateentries.go @@ -0,0 +1,151 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package list + +import ( + "context" + "errors" + "fmt" + + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/id" +) + +// AddToList adds targetAccountIDs to the given list, if valid. +func (p *Processor) AddToList(ctx context.Context, account *gtsmodel.Account, listID string, targetAccountIDs []string) gtserror.WithCode { + // Ensure this list exists + account owns it. + list, errWithCode := p.getList(ctx, account.ID, listID) + if errWithCode != nil { + return errWithCode + } + + // Pre-assemble list of entries to add. We *could* add these + // one by one as we iterate through accountIDs, but according + // to the Mastodon API we should only add them all once we know + // they're all valid, no partial updates. + listEntries := make([]*gtsmodel.ListEntry, 0, len(targetAccountIDs)) + + // Check each targetAccountID is valid. + // - Follow must exist. + // - Follow must not already be in the given list. + for _, targetAccountID := range targetAccountIDs { + // Ensure follow exists. + follow, err := p.state.DB.GetFollow(ctx, account.ID, targetAccountID) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + err = fmt.Errorf("you do not follow account %s", targetAccountID) + return gtserror.NewErrorNotFound(err, err.Error()) + } + return gtserror.NewErrorInternalError(err) + } + + // Ensure followID not already in list. + // This particular call to isInList will + // never error, so just check entryID. + entryID, _ := isInList( + list, + follow.ID, + func(listEntry *gtsmodel.ListEntry) (string, error) { + // Looking for the listEntry follow ID. + return listEntry.FollowID, nil + }, + ) + + // Empty entryID means entry with given + // followID wasn't found in the list. + if entryID != "" { + err = fmt.Errorf("account with id %s is already in list %s with entryID %s", targetAccountID, listID, entryID) + return gtserror.NewErrorUnprocessableEntity(err, err.Error()) + } + + // Entry wasn't in the list, we can add it. + listEntries = append(listEntries, >smodel.ListEntry{ + ID: id.NewULID(), + ListID: listID, + FollowID: follow.ID, + }) + } + + // If we get to here we can assume all + // entries are valid, so try to add them. + if err := p.state.DB.PutListEntries(ctx, listEntries); err != nil { + if errors.Is(err, db.ErrAlreadyExists) { + err = fmt.Errorf("one or more errors inserting list entries: %w", err) + return gtserror.NewErrorUnprocessableEntity(err, err.Error()) + } + return gtserror.NewErrorInternalError(err) + } + + return nil +} + +// RemoveFromList removes targetAccountIDs from the given list, if valid. +func (p *Processor) RemoveFromList(ctx context.Context, account *gtsmodel.Account, listID string, targetAccountIDs []string) gtserror.WithCode { + // Ensure this list exists + account owns it. + list, errWithCode := p.getList(ctx, account.ID, listID) + if errWithCode != nil { + return errWithCode + } + + // For each targetAccountID, we want to check if + // a follow with that targetAccountID is in the + // given list. If it is in there, we want to remove + // it from the list. + for _, targetAccountID := range targetAccountIDs { + // Check if targetAccountID is + // on a follow in the list. + entryID, err := isInList( + list, + targetAccountID, + func(listEntry *gtsmodel.ListEntry) (string, error) { + // We need the follow so populate this + // entry, if it's not already populated. + if err := p.state.DB.PopulateListEntry(ctx, listEntry); err != nil { + return "", err + } + + // Looking for the list entry targetAccountID. + return listEntry.Follow.TargetAccountID, nil + }, + ) + + // Error may be returned here if there was an issue + // populating the list entry. We only return on proper + // DB errors, we can just skip no entry errors. + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err = fmt.Errorf("error checking if targetAccountID %s was in list %s: %w", targetAccountID, listID, err) + return gtserror.NewErrorInternalError(err) + } + + if entryID == "" { + // There was an errNoEntries or targetAccount + // wasn't in this list anyway, so we can skip it. + continue + } + + // TargetAccount was in the list, remove the entry. + if err := p.state.DB.DeleteListEntry(ctx, entryID); err != nil && !errors.Is(err, db.ErrNoEntries) { + err = fmt.Errorf("error removing list entry %s from list %s: %w", entryID, listID, err) + return gtserror.NewErrorInternalError(err) + } + } + + return nil +} diff --git a/internal/processing/list/util.go b/internal/processing/list/util.go new file mode 100644 index 000000000..6186f58c7 --- /dev/null +++ b/internal/processing/list/util.go @@ -0,0 +1,85 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package list + +import ( + "context" + "errors" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" +) + +// getList is a shortcut to get one list from the database and +// check that it's owned by the given accountID. Will return +// appropriate errors so caller doesn't need to bother. +func (p *Processor) getList(ctx context.Context, accountID string, listID string) (*gtsmodel.List, gtserror.WithCode) { + list, err := p.state.DB.GetListByID(ctx, listID) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + // List doesn't seem to exist. + return nil, gtserror.NewErrorNotFound(err) + } + // Real database error. + return nil, gtserror.NewErrorInternalError(err) + } + + if list.AccountID != accountID { + err = fmt.Errorf("list with id %s does not belong to account %s", list.ID, accountID) + return nil, gtserror.NewErrorNotFound(err) + } + + return list, nil +} + +// apiList is a shortcut to return the API version of the given +// list, or return an appropriate error if conversion fails. +func (p *Processor) apiList(ctx context.Context, list *gtsmodel.List) (*apimodel.List, gtserror.WithCode) { + apiList, err := p.tc.ListToAPIList(ctx, list) + if err != nil { + return nil, gtserror.NewErrorInternalError(fmt.Errorf("error converting list to api: %w", err)) + } + + return apiList, nil +} + +// isInList check if thisID is equal to the result of thatID +// for any entry in the given list. +// +// Will return the id of the listEntry if true, empty if false, +// or an error if the result of thatID returns an error. +func isInList( + list *gtsmodel.List, + thisID string, + getThatID func(listEntry *gtsmodel.ListEntry) (string, error), +) (string, error) { + for _, listEntry := range list.ListEntries { + thatID, err := getThatID(listEntry) + if err != nil { + return "", err + } + + if thisID == thatID { + return listEntry.ID, nil + } + } + return "", nil +} diff --git a/internal/processing/notification_test.go b/internal/processing/notification_test.go deleted file mode 100644 index bf69fc9bc..000000000 --- a/internal/processing/notification_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package processing_test - -import ( - "context" - "testing" - - "github.com/stretchr/testify/suite" - apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" -) - -type NotificationTestSuite struct { - ProcessingStandardTestSuite -} - -// get a notification where someone has liked our status -func (suite *NotificationTestSuite) TestGetNotifications() { - receivingAccount := suite.testAccounts["local_account_1"] - notifsResponse, err := suite.processor.NotificationsGet(context.Background(), suite.testAutheds["local_account_1"], "", "", "", 10, nil) - suite.NoError(err) - suite.Len(notifsResponse.Items, 1) - notif, ok := notifsResponse.Items[0].(*apimodel.Notification) - if !ok { - panic("notif in response wasn't *apimodel.Notification") - } - - suite.NotNil(notif.Status) - suite.NotNil(notif.Status) - suite.NotNil(notif.Status.Account) - suite.Equal(receivingAccount.ID, notif.Status.Account.ID) - suite.Equal(`<http://localhost:8080/api/v1/notifications?limit=10&max_id=01F8Q0ANPTWW10DAKTX7BRPBJP>; rel="next", <http://localhost:8080/api/v1/notifications?limit=10&min_id=01F8Q0ANPTWW10DAKTX7BRPBJP>; rel="prev"`, notifsResponse.LinkHeader) -} - -func TestNotificationTestSuite(t *testing.T) { - suite.Run(t, &NotificationTestSuite{}) -} diff --git a/internal/processing/processor.go b/internal/processing/processor.go index 749987d6a..d5f88bfb2 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -29,39 +29,41 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/account" "github.com/superseriousbusiness/gotosocial/internal/processing/admin" "github.com/superseriousbusiness/gotosocial/internal/processing/fedi" + "github.com/superseriousbusiness/gotosocial/internal/processing/list" "github.com/superseriousbusiness/gotosocial/internal/processing/media" "github.com/superseriousbusiness/gotosocial/internal/processing/report" "github.com/superseriousbusiness/gotosocial/internal/processing/status" "github.com/superseriousbusiness/gotosocial/internal/processing/stream" + "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" "github.com/superseriousbusiness/gotosocial/internal/processing/user" "github.com/superseriousbusiness/gotosocial/internal/state" - "github.com/superseriousbusiness/gotosocial/internal/timeline" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" ) type Processor struct { - federator federation.Federator - tc typeutils.TypeConverter - oauthServer oauth.Server - mediaManager mm.Manager - statusTimelines timeline.Manager - state *state.State - emailSender email.Sender - filter *visibility.Filter + federator federation.Federator + tc typeutils.TypeConverter + oauthServer oauth.Server + mediaManager mm.Manager + state *state.State + emailSender email.Sender + filter *visibility.Filter /* SUB-PROCESSORS */ - account account.Processor - admin admin.Processor - fedi fedi.Processor - media media.Processor - report report.Processor - status status.Processor - stream stream.Processor - user user.Processor + account account.Processor + admin admin.Processor + fedi fedi.Processor + list list.Processor + media media.Processor + report report.Processor + status status.Processor + stream stream.Processor + timeline timeline.Processor + user user.Processor } func (p *Processor) Account() *account.Processor { @@ -76,6 +78,10 @@ func (p *Processor) Fedi() *fedi.Processor { return &p.fedi } +func (p *Processor) List() *list.Processor { + return &p.list +} + func (p *Processor) Media() *media.Processor { return &p.media } @@ -92,6 +98,10 @@ func (p *Processor) Stream() *stream.Processor { return &p.stream } +func (p *Processor) Timeline() *timeline.Processor { + return &p.timeline +} + func (p *Processor) User() *user.Processor { return &p.user } @@ -114,23 +124,19 @@ func NewProcessor( tc: tc, oauthServer: oauthServer, mediaManager: mediaManager, - statusTimelines: timeline.NewManager( - StatusGrabFunction(state.DB), - StatusFilterFunction(state.DB, filter), - StatusPrepareFunction(state.DB, tc), - StatusSkipInsertFunction(), - ), - state: state, - filter: filter, - emailSender: emailSender, + state: state, + filter: filter, + emailSender: emailSender, } - // sub processors + // Instantiate sub processors. processor.account = account.New(state, tc, mediaManager, oauthServer, federator, filter, parseMentionFunc) processor.admin = admin.New(state, tc, mediaManager, federator.TransportController(), emailSender) processor.fedi = fedi.New(state, tc, federator, filter) + processor.list = list.New(state, tc) processor.media = media.New(state, tc, mediaManager, federator.TransportController()) processor.report = report.New(state, tc) + processor.timeline = timeline.New(state, tc, filter) processor.status = status.New(state, federator, tc, filter, parseMentionFunc) processor.stream = stream.New(state, oauthServer) processor.user = user.New(state, emailSender) @@ -161,13 +167,3 @@ func (p *Processor) EnqueueFederator(ctx context.Context, msgs ...messages.FromF } }) } - -// Start starts the Processor. -func (p *Processor) Start() error { - return p.statusTimelines.Start() -} - -// Stop stops the processor cleanly. -func (p *Processor) Stop() error { - return p.statusTimelines.Stop() -} diff --git a/internal/processing/processor_test.go b/internal/processing/processor_test.go index e572593d1..68c33aa04 100644 --- a/internal/processing/processor_test.go +++ b/internal/processing/processor_test.go @@ -18,6 +18,8 @@ package processing_test import ( + "context" + "github.com/stretchr/testify/suite" "github.com/superseriousbusiness/gotosocial/internal/db" "github.com/superseriousbusiness/gotosocial/internal/email" @@ -28,8 +30,10 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing" "github.com/superseriousbusiness/gotosocial/internal/state" "github.com/superseriousbusiness/gotosocial/internal/storage" + "github.com/superseriousbusiness/gotosocial/internal/stream" "github.com/superseriousbusiness/gotosocial/internal/transport" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" "github.com/superseriousbusiness/gotosocial/testrig" ) @@ -61,6 +65,7 @@ type ProcessingStandardTestSuite struct { testAutheds map[string]*oauth.Auth testBlocks map[string]*gtsmodel.Block testActivities map[string]testrig.ActivityWithSignature + testLists map[string]*gtsmodel.List processor *processing.Processor } @@ -84,6 +89,7 @@ func (suite *ProcessingStandardTestSuite) SetupSuite() { }, } suite.testBlocks = testrig.NewTestBlocks() + suite.testLists = testrig.NewTestLists() } func (suite *ProcessingStandardTestSuite) SetupTest() { @@ -99,6 +105,13 @@ func (suite *ProcessingStandardTestSuite) SetupTest() { suite.storage = testrig.NewInMemoryStorage() suite.state.Storage = suite.storage suite.typeconverter = testrig.NewTestTypeConverter(suite.db) + + testrig.StartTimelines( + &suite.state, + visibility.NewFilter(&suite.state), + suite.typeconverter, + ) + suite.httpClient = testrig.NewMockHTTPClient(nil, "../../testrig/media") suite.httpClient.TestRemotePeople = testrig.NewTestFediPeople() suite.httpClient.TestRemoteStatuses = testrig.NewTestFediStatuses() @@ -115,16 +128,40 @@ func (suite *ProcessingStandardTestSuite) SetupTest() { testrig.StandardDBSetup(suite.db, suite.testAccounts) testrig.StandardStorageSetup(suite.storage, "../../testrig/media") - if err := suite.processor.Start(); err != nil { - panic(err) - } } func (suite *ProcessingStandardTestSuite) TearDownTest() { testrig.StandardDBTeardown(suite.db) testrig.StandardStorageTeardown(suite.storage) - if err := suite.processor.Stop(); err != nil { - panic(err) - } testrig.StopWorkers(&suite.state) } + +func (suite *ProcessingStandardTestSuite) openStreams(ctx context.Context, account *gtsmodel.Account, listIDs []string) map[string]*stream.Stream { + streams := make(map[string]*stream.Stream) + + for _, streamType := range []string{ + stream.TimelineHome, + stream.TimelinePublic, + stream.TimelineNotifications, + } { + stream, err := suite.processor.Stream().Open(ctx, account, streamType) + if err != nil { + suite.FailNow(err.Error()) + } + + streams[streamType] = stream + } + + for _, listID := range listIDs { + streamType := stream.TimelineList + ":" + listID + + stream, err := suite.processor.Stream().Open(ctx, account, streamType) + if err != nil { + suite.FailNow(err.Error()) + } + + streams[streamType] = stream + } + + return streams +} diff --git a/internal/processing/status/status_test.go b/internal/processing/status/status_test.go index 0de56c30e..01d8d3acd 100644 --- a/internal/processing/status/status_test.go +++ b/internal/processing/status/status_test.go @@ -88,6 +88,12 @@ func (suite *StatusStandardTestSuite) SetupTest() { suite.federator = testrig.NewTestFederator(&suite.state, suite.tc, suite.mediaManager) filter := visibility.NewFilter(&suite.state) + testrig.StartTimelines( + &suite.state, + filter, + testrig.NewTestTypeConverter(suite.db), + ) + suite.status = status.New(&suite.state, suite.federator, suite.typeConverter, filter, processing.GetParseMentionFunc(suite.db, suite.federator)) testrig.StandardDBSetup(suite.db, suite.testAccounts) diff --git a/internal/processing/statustimeline.go b/internal/processing/statustimeline.go deleted file mode 100644 index 39c5272b6..000000000 --- a/internal/processing/statustimeline.go +++ /dev/null @@ -1,309 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package processing - -import ( - "context" - "errors" - "fmt" - - apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" - "github.com/superseriousbusiness/gotosocial/internal/db" - "github.com/superseriousbusiness/gotosocial/internal/gtserror" - "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" - "github.com/superseriousbusiness/gotosocial/internal/log" - "github.com/superseriousbusiness/gotosocial/internal/oauth" - "github.com/superseriousbusiness/gotosocial/internal/timeline" - "github.com/superseriousbusiness/gotosocial/internal/typeutils" - "github.com/superseriousbusiness/gotosocial/internal/util" - "github.com/superseriousbusiness/gotosocial/internal/visibility" -) - -const boostReinsertionDepth = 50 - -// StatusGrabFunction returns a function that satisfies the GrabFunction interface in internal/timeline. -func StatusGrabFunction(database db.DB) timeline.GrabFunction { - return func(ctx context.Context, timelineAccountID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { - statuses, err := database.GetHomeTimeline(ctx, timelineAccountID, maxID, sinceID, minID, limit, false) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - return nil, true, nil // we just don't have enough statuses left in the db so return stop = true - } - return nil, false, fmt.Errorf("statusGrabFunction: error getting statuses from db: %w", err) - } - - items := make([]timeline.Timelineable, len(statuses)) - for i, s := range statuses { - items[i] = s - } - - return items, false, nil - } -} - -// StatusFilterFunction returns a function that satisfies the FilterFunction interface in internal/timeline. -func StatusFilterFunction(database db.DB, filter *visibility.Filter) timeline.FilterFunction { - return func(ctx context.Context, timelineAccountID string, item timeline.Timelineable) (shouldIndex bool, err error) { - status, ok := item.(*gtsmodel.Status) - if !ok { - return false, errors.New("StatusFilterFunction: could not convert item to *gtsmodel.Status") - } - - requestingAccount, err := database.GetAccountByID(ctx, timelineAccountID) - if err != nil { - return false, fmt.Errorf("StatusFilterFunction: error getting account with id %s: %w", timelineAccountID, err) - } - - timelineable, err := filter.StatusHomeTimelineable(ctx, requestingAccount, status) - if err != nil { - return false, fmt.Errorf("StatusFilterFunction: error checking hometimelineability of status %s for account %s: %w", status.ID, timelineAccountID, err) - } - - return timelineable, nil - } -} - -// StatusPrepareFunction returns a function that satisfies the PrepareFunction interface in internal/timeline. -func StatusPrepareFunction(database db.DB, tc typeutils.TypeConverter) timeline.PrepareFunction { - return func(ctx context.Context, timelineAccountID string, itemID string) (timeline.Preparable, error) { - status, err := database.GetStatusByID(ctx, itemID) - if err != nil { - return nil, fmt.Errorf("StatusPrepareFunction: error getting status with id %s: %w", itemID, err) - } - - requestingAccount, err := database.GetAccountByID(ctx, timelineAccountID) - if err != nil { - return nil, fmt.Errorf("StatusPrepareFunction: error getting account with id %s: %w", timelineAccountID, err) - } - - return tc.StatusToAPIStatus(ctx, status, requestingAccount) - } -} - -// StatusSkipInsertFunction returns a function that satisifes the SkipInsertFunction interface in internal/timeline. -func StatusSkipInsertFunction() timeline.SkipInsertFunction { - return func( - ctx context.Context, - newItemID string, - newItemAccountID string, - newItemBoostOfID string, - newItemBoostOfAccountID string, - nextItemID string, - nextItemAccountID string, - nextItemBoostOfID string, - nextItemBoostOfAccountID string, - depth int, - ) (bool, error) { - // make sure we don't insert a duplicate - if newItemID == nextItemID { - return true, nil - } - - // check if it's a boost - if newItemBoostOfID != "" { - // skip if we've recently put another boost of this status in the timeline - if newItemBoostOfID == nextItemBoostOfID { - if depth < boostReinsertionDepth { - return true, nil - } - } - - // skip if we've recently put the original status in the timeline - if newItemBoostOfID == nextItemID { - if depth < boostReinsertionDepth { - return true, nil - } - } - } - - // insert the item - return false, nil - } -} - -func (p *Processor) HomeTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { - statuses, err := p.statusTimelines.GetTimeline(ctx, authed.Account.ID, maxID, sinceID, minID, limit, local) - if err != nil { - err = fmt.Errorf("HomeTimelineGet: error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } - - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } - - var ( - items = make([]interface{}, count) - nextMaxIDValue string - prevMinIDValue string - ) - - for i, item := range statuses { - if i == count-1 { - nextMaxIDValue = item.GetID() - } - - if i == 0 { - prevMinIDValue = item.GetID() - } - - items[i] = item - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "api/v1/timelines/home", - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) -} - -func (p *Processor) PublicTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { - statuses, err := p.state.DB.GetPublicTimeline(ctx, maxID, sinceID, minID, limit, local) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - // No statuses (left) in public timeline. - return util.EmptyPageableResponse(), nil - } - // An actual error has occurred. - err = fmt.Errorf("PublicTimelineGet: db error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } - - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } - - var ( - items = make([]interface{}, 0, count) - nextMaxIDValue string - prevMinIDValue string - ) - - for i, s := range statuses { - // Set next + prev values before filtering and API - // converting, so caller can still page properly. - if i == count-1 { - nextMaxIDValue = s.ID - } - - if i == 0 { - prevMinIDValue = s.ID - } - - timelineable, err := p.filter.StatusPublicTimelineable(ctx, authed.Account, s) - if err != nil { - log.Debugf(ctx, "skipping status %s because of an error checking StatusPublicTimelineable: %s", s.ID, err) - continue - } - - if !timelineable { - continue - } - - apiStatus, err := p.tc.StatusToAPIStatus(ctx, s, authed.Account) - if err != nil { - log.Debugf(ctx, "skipping status %s because it couldn't be converted to its api representation: %s", s.ID, err) - continue - } - - items = append(items, apiStatus) - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "api/v1/timelines/public", - NextMaxIDValue: nextMaxIDValue, - PrevMinIDValue: prevMinIDValue, - Limit: limit, - }) -} - -func (p *Processor) FavedTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, minID string, limit int) (*apimodel.PageableResponse, gtserror.WithCode) { - statuses, nextMaxID, prevMinID, err := p.state.DB.GetFavedTimeline(ctx, authed.Account.ID, maxID, minID, limit) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - // There are just no entries (left). - return util.EmptyPageableResponse(), nil - } - // An actual error has occurred. - err = fmt.Errorf("FavedTimelineGet: db error getting statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } - - count := len(statuses) - if count == 0 { - return util.EmptyPageableResponse(), nil - } - - filtered, err := p.filterFavedStatuses(ctx, authed, statuses) - if err != nil { - err = fmt.Errorf("FavedTimelineGet: error filtering statuses: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } - - items := make([]interface{}, len(filtered)) - for i, item := range filtered { - items[i] = item - } - - return util.PackagePageableResponse(util.PageableResponseParams{ - Items: items, - Path: "api/v1/favourites", - NextMaxIDValue: nextMaxID, - PrevMinIDValue: prevMinID, - Limit: limit, - }) -} - -func (p *Processor) filterFavedStatuses(ctx context.Context, authed *oauth.Auth, statuses []*gtsmodel.Status) ([]*apimodel.Status, error) { - apiStatuses := make([]*apimodel.Status, 0, len(statuses)) - - for _, s := range statuses { - if _, err := p.state.DB.GetAccountByID(ctx, s.AccountID); err != nil { - if errors.Is(err, db.ErrNoEntries) { - log.Debugf(ctx, "skipping status %s because account %s can't be found in the db", s.ID, s.AccountID) - continue - } - err = fmt.Errorf("filterFavedStatuses: db error getting status author: %w", err) - return nil, gtserror.NewErrorInternalError(err) - } - - timelineable, err := p.filter.StatusVisible(ctx, authed.Account, s) - if err != nil { - log.Debugf(ctx, "skipping status %s because of an error checking status visibility: %s", s.ID, err) - continue - } - if !timelineable { - continue - } - - apiStatus, err := p.tc.StatusToAPIStatus(ctx, s, authed.Account) - if err != nil { - log.Debugf(ctx, "skipping status %s because it couldn't be converted to its api representation: %s", s.ID, err) - continue - } - - apiStatuses = append(apiStatuses, apiStatus) - } - - return apiStatuses, nil -} diff --git a/internal/processing/stream/open.go b/internal/processing/stream/open.go index e43152b29..1c041309f 100644 --- a/internal/processing/stream/open.go +++ b/internal/processing/stream/open.go @@ -31,60 +31,65 @@ import ( ) // Open returns a new Stream for the given account, which will contain a channel for passing messages back to the caller. -func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamTimeline string) (*stream.Stream, gtserror.WithCode) { +func (p *Processor) Open(ctx context.Context, account *gtsmodel.Account, streamType string) (*stream.Stream, gtserror.WithCode) { l := log.WithContext(ctx).WithFields(kv.Fields{ {"account", account.ID}, - {"streamType", streamTimeline}, + {"streamType", streamType}, }...) l.Debug("received open stream request") - // each stream needs a unique ID so we know to close it - streamID, err := id.NewRandomULID() + var ( + streamID string + err error + ) + + // Each stream needs a unique ID so we know to close it. + streamID, err = id.NewRandomULID() if err != nil { - return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %s", err)) + return nil, gtserror.NewErrorInternalError(fmt.Errorf("error generating stream id: %w", err)) } - // Each stream can be subscibed to multiple timelines. + // Each stream can be subscibed to multiple types. // Record them in a set, and include the initial one - // if it was given to us - timelines := map[string]bool{} - if streamTimeline != "" { - timelines[streamTimeline] = true + // if it was given to us. + streamTypes := map[string]any{} + if streamType != "" { + streamTypes[streamType] = true } - thisStream := &stream.Stream{ - ID: streamID, - Timelines: timelines, - Messages: make(chan *stream.Message, 100), - Hangup: make(chan interface{}, 1), - Connected: true, + newStream := &stream.Stream{ + ID: streamID, + StreamTypes: streamTypes, + Messages: make(chan *stream.Message, 100), + Hangup: make(chan interface{}, 1), + Connected: true, } - go p.waitToCloseStream(account, thisStream) + go p.waitToCloseStream(account, newStream) v, ok := p.streamMap.Load(account.ID) - if !ok || v == nil { - // there is no entry in the streamMap for this account yet, so make one and store it - streamsForAccount := &stream.StreamsForAccount{ - Streams: []*stream.Stream{ - thisStream, - }, - } - p.streamMap.Store(account.ID, streamsForAccount) - } else { - // there is an entry in the streamMap for this account - // parse the interface as a streamsForAccount + if ok { + // There is an entry in the streamMap + // for this account. Parse it out. streamsForAccount, ok := v.(*stream.StreamsForAccount) if !ok { return nil, gtserror.NewErrorInternalError(errors.New("stream map error")) } - // append this stream to it + // Append new stream to existing entry. streamsForAccount.Lock() - streamsForAccount.Streams = append(streamsForAccount.Streams, thisStream) + streamsForAccount.Streams = append(streamsForAccount.Streams, newStream) streamsForAccount.Unlock() + } else { + // There is no entry in the streamMap for + // this account yet. Create one and store it. + p.streamMap.Store(account.ID, &stream.StreamsForAccount{ + Streams: []*stream.Stream{ + newStream, + }, + }) } - return thisStream, nil + return newStream, nil } // waitToCloseStream waits until the hangup channel is closed for the given stream. diff --git a/internal/processing/stream/stream.go b/internal/processing/stream/stream.go index 4a4c92a00..bd49a330c 100644 --- a/internal/processing/stream/stream.go +++ b/internal/processing/stream/stream.go @@ -18,7 +18,6 @@ package stream import ( - "errors" "sync" "github.com/superseriousbusiness/gotosocial/internal/oauth" @@ -40,37 +39,38 @@ func New(state *state.State, oauthServer oauth.Server) Processor { } // toAccount streams the given payload with the given event type to any streams currently open for the given account ID. -func (p *Processor) toAccount(payload string, event string, timelines []string, accountID string) error { +func (p *Processor) toAccount(payload string, event string, streamTypes []string, accountID string) error { + // Load all streams open for this account. v, ok := p.streamMap.Load(accountID) if !ok { - // no open connections so nothing to stream - return nil - } - - streamsForAccount, ok := v.(*stream.StreamsForAccount) - if !ok { - return errors.New("stream map error") + return nil // No entry = nothing to stream. } + streamsForAccount := v.(*stream.StreamsForAccount) //nolint:forcetypeassert streamsForAccount.Lock() defer streamsForAccount.Unlock() + for _, s := range streamsForAccount.Streams { s.Lock() defer s.Unlock() + if !s.Connected { continue } - for _, t := range timelines { - if _, found := s.Timelines[t]; found { + typeLoop: + for _, streamType := range streamTypes { + if _, found := s.StreamTypes[streamType]; found { s.Messages <- &stream.Message{ - Stream: []string{string(t)}, + Stream: []string{streamType}, Event: string(event), Payload: payload, } - // break out to the outer loop, to avoid sending duplicates - // of the same event to the same stream - break + + // Break out to the outer loop, + // to avoid sending duplicates of + // the same event to the same stream. + break typeLoop } } } diff --git a/internal/processing/stream/update.go b/internal/processing/stream/update.go index dc575c636..ee70bda11 100644 --- a/internal/processing/stream/update.go +++ b/internal/processing/stream/update.go @@ -27,11 +27,11 @@ import ( ) // Update streams the given update to any open, appropriate streams belonging to the given account. -func (p *Processor) Update(s *apimodel.Status, account *gtsmodel.Account, timeline string) error { +func (p *Processor) Update(s *apimodel.Status, account *gtsmodel.Account, streamTypes []string) error { bytes, err := json.Marshal(s) if err != nil { return fmt.Errorf("error marshalling status to json: %s", err) } - return p.toAccount(string(bytes), stream.EventTypeUpdate, []string{timeline}, account.ID) + return p.toAccount(string(bytes), stream.EventTypeUpdate, streamTypes, account.ID) } diff --git a/internal/processing/timeline/common.go b/internal/processing/timeline/common.go new file mode 100644 index 000000000..6d29d81d6 --- /dev/null +++ b/internal/processing/timeline/common.go @@ -0,0 +1,71 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "context" + + "github.com/superseriousbusiness/gotosocial/internal/timeline" +) + +// SkipInsert returns a function that satisifes SkipInsertFunction. +func SkipInsert() timeline.SkipInsertFunction { + // Gap to allow between a status or boost of status, + // and reinsertion of a new boost of that status. + // This is useful to avoid a heavily boosted status + // showing up way too often in a user's timeline. + const boostReinsertionDepth = 50 + + return func( + ctx context.Context, + newItemID string, + newItemAccountID string, + newItemBoostOfID string, + newItemBoostOfAccountID string, + nextItemID string, + nextItemAccountID string, + nextItemBoostOfID string, + nextItemBoostOfAccountID string, + depth int, + ) (bool, error) { + if newItemID == nextItemID { + // Don't insert duplicates. + return true, nil + } + + if newItemBoostOfID != "" { + if newItemBoostOfID == nextItemBoostOfID && + depth < boostReinsertionDepth { + // Don't insert boosts of items + // we've seen boosted recently. + return true, nil + } + + if newItemBoostOfID == nextItemID && + depth < boostReinsertionDepth { + // Don't insert boosts of items when + // we've seen the original recently. + return true, nil + } + } + + // Proceed with insertion + // (that's what she said!). + return false, nil + } +} diff --git a/internal/processing/timeline/faved.go b/internal/processing/timeline/faved.go new file mode 100644 index 000000000..0fc92d8fa --- /dev/null +++ b/internal/processing/timeline/faved.go @@ -0,0 +1,73 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "context" + "errors" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (p *Processor) FavedTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, minID string, limit int) (*apimodel.PageableResponse, gtserror.WithCode) { + statuses, nextMaxID, prevMinID, err := p.state.DB.GetFavedTimeline(ctx, authed.Account.ID, maxID, minID, limit) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err = fmt.Errorf("FavedTimelineGet: db error getting statuses: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + count := len(statuses) + if count == 0 { + return util.EmptyPageableResponse(), nil + } + + items := make([]interface{}, 0, count) + for _, s := range statuses { + visible, err := p.filter.StatusVisible(ctx, authed.Account, s) + if err != nil { + log.Debugf(ctx, "skipping status %s because of an error checking status visibility: %s", s.ID, err) + continue + } + + if !visible { + continue + } + + apiStatus, err := p.tc.StatusToAPIStatus(ctx, s, authed.Account) + if err != nil { + log.Debugf(ctx, "skipping status %s because it couldn't be converted to its api representation: %s", s.ID, err) + continue + } + + items = append(items, apiStatus) + } + + return util.PackagePageableResponse(util.PageableResponseParams{ + Items: items, + Path: "api/v1/favourites", + NextMaxIDValue: nextMaxID, + PrevMinIDValue: prevMinID, + Limit: limit, + }) +} diff --git a/internal/processing/timeline/home.go b/internal/processing/timeline/home.go new file mode 100644 index 000000000..e65f12e17 --- /dev/null +++ b/internal/processing/timeline/home.go @@ -0,0 +1,133 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "context" + "errors" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/timeline" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +// HomeTimelineGrab returns a function that satisfies GrabFunction for home timelines. +func HomeTimelineGrab(state *state.State) timeline.GrabFunction { + return func(ctx context.Context, accountID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { + statuses, err := state.DB.GetHomeTimeline(ctx, accountID, maxID, sinceID, minID, limit, false) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + return nil, true, nil // we just don't have enough statuses left in the db so return stop = true + } + return nil, false, fmt.Errorf("HomeTimelineGrab: error getting statuses from db: %w", err) + } + + items := make([]timeline.Timelineable, len(statuses)) + for i, s := range statuses { + items[i] = s + } + + return items, false, nil + } +} + +// HomeTimelineFilter returns a function that satisfies FilterFunction for home timelines. +func HomeTimelineFilter(state *state.State, filter *visibility.Filter) timeline.FilterFunction { + return func(ctx context.Context, accountID string, item timeline.Timelineable) (shouldIndex bool, err error) { + status, ok := item.(*gtsmodel.Status) + if !ok { + return false, errors.New("HomeTimelineFilter: could not convert item to *gtsmodel.Status") + } + + requestingAccount, err := state.DB.GetAccountByID(ctx, accountID) + if err != nil { + return false, fmt.Errorf("HomeTimelineFilter: error getting account with id %s: %w", accountID, err) + } + + timelineable, err := filter.StatusHomeTimelineable(ctx, requestingAccount, status) + if err != nil { + return false, fmt.Errorf("HomeTimelineFilter: error checking hometimelineability of status %s for account %s: %w", status.ID, accountID, err) + } + + return timelineable, nil + } +} + +// HomeTimelineStatusPrepare returns a function that satisfies PrepareFunction for home timelines. +func HomeTimelineStatusPrepare(state *state.State, tc typeutils.TypeConverter) timeline.PrepareFunction { + return func(ctx context.Context, accountID string, itemID string) (timeline.Preparable, error) { + status, err := state.DB.GetStatusByID(ctx, itemID) + if err != nil { + return nil, fmt.Errorf("StatusPrepare: error getting status with id %s: %w", itemID, err) + } + + requestingAccount, err := state.DB.GetAccountByID(ctx, accountID) + if err != nil { + return nil, fmt.Errorf("StatusPrepare: error getting account with id %s: %w", accountID, err) + } + + return tc.StatusToAPIStatus(ctx, status, requestingAccount) + } +} + +func (p *Processor) HomeTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { + statuses, err := p.state.Timelines.Home.GetTimeline(ctx, authed.Account.ID, maxID, sinceID, minID, limit, local) + if err != nil { + err = fmt.Errorf("HomeTimelineGet: error getting statuses: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + count := len(statuses) + if count == 0 { + return util.EmptyPageableResponse(), nil + } + + var ( + items = make([]interface{}, count) + nextMaxIDValue string + prevMinIDValue string + ) + + for i, item := range statuses { + if i == count-1 { + nextMaxIDValue = item.GetID() + } + + if i == 0 { + prevMinIDValue = item.GetID() + } + + items[i] = item + } + + return util.PackagePageableResponse(util.PageableResponseParams{ + Items: items, + Path: "api/v1/timelines/home", + NextMaxIDValue: nextMaxIDValue, + PrevMinIDValue: prevMinIDValue, + Limit: limit, + }) +} diff --git a/internal/processing/timeline/list.go b/internal/processing/timeline/list.go new file mode 100644 index 000000000..adad35197 --- /dev/null +++ b/internal/processing/timeline/list.go @@ -0,0 +1,157 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "context" + "errors" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/timeline" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/util" + "github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +// ListTimelineGrab returns a function that satisfies GrabFunction for list timelines. +func ListTimelineGrab(state *state.State) timeline.GrabFunction { + return func(ctx context.Context, listID string, maxID string, sinceID string, minID string, limit int) ([]timeline.Timelineable, bool, error) { + statuses, err := state.DB.GetListTimeline(ctx, listID, maxID, sinceID, minID, limit) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + return nil, true, nil // we just don't have enough statuses left in the db so return stop = true + } + return nil, false, fmt.Errorf("ListTimelineGrab: error getting statuses from db: %w", err) + } + + items := make([]timeline.Timelineable, len(statuses)) + for i, s := range statuses { + items[i] = s + } + + return items, false, nil + } +} + +// HomeTimelineFilter returns a function that satisfies FilterFunction for list timelines. +func ListTimelineFilter(state *state.State, filter *visibility.Filter) timeline.FilterFunction { + return func(ctx context.Context, listID string, item timeline.Timelineable) (shouldIndex bool, err error) { + status, ok := item.(*gtsmodel.Status) + if !ok { + return false, errors.New("ListTimelineFilter: could not convert item to *gtsmodel.Status") + } + + list, err := state.DB.GetListByID(ctx, listID) + if err != nil { + return false, fmt.Errorf("ListTimelineFilter: error getting list with id %s: %w", listID, err) + } + + requestingAccount, err := state.DB.GetAccountByID(ctx, list.AccountID) + if err != nil { + return false, fmt.Errorf("ListTimelineFilter: error getting account with id %s: %w", list.AccountID, err) + } + + timelineable, err := filter.StatusHomeTimelineable(ctx, requestingAccount, status) + if err != nil { + return false, fmt.Errorf("ListTimelineFilter: error checking hometimelineability of status %s for account %s: %w", status.ID, list.AccountID, err) + } + + return timelineable, nil + } +} + +// ListTimelineStatusPrepare returns a function that satisfies PrepareFunction for list timelines. +func ListTimelineStatusPrepare(state *state.State, tc typeutils.TypeConverter) timeline.PrepareFunction { + return func(ctx context.Context, listID string, itemID string) (timeline.Preparable, error) { + status, err := state.DB.GetStatusByID(ctx, itemID) + if err != nil { + return nil, fmt.Errorf("ListTimelineStatusPrepare: error getting status with id %s: %w", itemID, err) + } + + list, err := state.DB.GetListByID(ctx, listID) + if err != nil { + return nil, fmt.Errorf("ListTimelineStatusPrepare: error getting list with id %s: %w", listID, err) + } + + requestingAccount, err := state.DB.GetAccountByID(ctx, list.AccountID) + if err != nil { + return nil, fmt.Errorf("ListTimelineStatusPrepare: error getting account with id %s: %w", list.AccountID, err) + } + + return tc.StatusToAPIStatus(ctx, status, requestingAccount) + } +} + +func (p *Processor) ListTimelineGet(ctx context.Context, authed *oauth.Auth, listID string, maxID string, sinceID string, minID string, limit int) (*apimodel.PageableResponse, gtserror.WithCode) { + // Ensure list exists + is owned by this account. + list, err := p.state.DB.GetListByID(ctx, listID) + if err != nil { + if errors.Is(err, db.ErrNoEntries) { + return nil, gtserror.NewErrorNotFound(err) + } + return nil, gtserror.NewErrorInternalError(err) + } + + if list.AccountID != authed.Account.ID { + err = fmt.Errorf("list with id %s does not belong to account %s", list.ID, authed.Account.ID) + return nil, gtserror.NewErrorNotFound(err) + } + + statuses, err := p.state.Timelines.List.GetTimeline(ctx, listID, maxID, sinceID, minID, limit, false) + if err != nil { + err = fmt.Errorf("ListTimelineGet: error getting statuses: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + count := len(statuses) + if count == 0 { + return util.EmptyPageableResponse(), nil + } + + var ( + items = make([]interface{}, count) + nextMaxIDValue string + prevMinIDValue string + ) + + for i, item := range statuses { + if i == count-1 { + nextMaxIDValue = item.GetID() + } + + if i == 0 { + prevMinIDValue = item.GetID() + } + + items[i] = item + } + + return util.PackagePageableResponse(util.PageableResponseParams{ + Items: items, + Path: "api/v1/timelines/list/" + listID, + NextMaxIDValue: nextMaxIDValue, + PrevMinIDValue: prevMinIDValue, + Limit: limit, + }) +} diff --git a/internal/processing/notification.go b/internal/processing/timeline/notification.go index 2e4e1788f..4a79fb82a 100644 --- a/internal/processing/notification.go +++ b/internal/processing/timeline/notification.go @@ -15,7 +15,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see <http://www.gnu.org/licenses/>. -package processing +package timeline import ( "context" @@ -33,12 +33,7 @@ import ( func (p *Processor) NotificationsGet(ctx context.Context, authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, excludeTypes []string) (*apimodel.PageableResponse, gtserror.WithCode) { notifs, err := p.state.DB.GetAccountNotifications(ctx, authed.Account.ID, maxID, sinceID, minID, limit, excludeTypes) - if err != nil { - if errors.Is(err, db.ErrNoEntries) { - // No notifs (left). - return util.EmptyPageableResponse(), nil - } - // An actual error has occurred. + if err != nil && !errors.Is(err, db.ErrNoEntries) { err = fmt.Errorf("NotificationsGet: db error getting notifications: %w", err) return nil, gtserror.NewErrorInternalError(err) } @@ -73,6 +68,7 @@ func (p *Processor) NotificationsGet(ctx context.Context, authed *oauth.Auth, ma log.Debugf(ctx, "skipping notification %s because of an error checking notification visibility: %s", n.ID, err) continue } + if !visible { continue } @@ -85,6 +81,7 @@ func (p *Processor) NotificationsGet(ctx context.Context, authed *oauth.Auth, ma log.Debugf(ctx, "skipping notification %s because of an error checking notification visibility: %s", n.ID, err) continue } + if !visible { continue } diff --git a/internal/processing/timeline/public.go b/internal/processing/timeline/public.go new file mode 100644 index 000000000..67893ecfa --- /dev/null +++ b/internal/processing/timeline/public.go @@ -0,0 +1,88 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "context" + "errors" + "fmt" + + apimodel "github.com/superseriousbusiness/gotosocial/internal/api/model" + "github.com/superseriousbusiness/gotosocial/internal/db" + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/oauth" + "github.com/superseriousbusiness/gotosocial/internal/util" +) + +func (p *Processor) PublicTimelineGet(ctx context.Context, authed *oauth.Auth, maxID string, sinceID string, minID string, limit int, local bool) (*apimodel.PageableResponse, gtserror.WithCode) { + statuses, err := p.state.DB.GetPublicTimeline(ctx, maxID, sinceID, minID, limit, local) + if err != nil && !errors.Is(err, db.ErrNoEntries) { + err = fmt.Errorf("PublicTimelineGet: db error getting statuses: %w", err) + return nil, gtserror.NewErrorInternalError(err) + } + + count := len(statuses) + if count == 0 { + return util.EmptyPageableResponse(), nil + } + + var ( + items = make([]interface{}, 0, count) + nextMaxIDValue string + prevMinIDValue string + ) + + for i, s := range statuses { + // Set next + prev values before filtering and API + // converting, so caller can still page properly. + if i == count-1 { + nextMaxIDValue = s.ID + } + + if i == 0 { + prevMinIDValue = s.ID + } + + timelineable, err := p.filter.StatusPublicTimelineable(ctx, authed.Account, s) + if err != nil { + log.Debugf(ctx, "skipping status %s because of an error checking StatusPublicTimelineable: %s", s.ID, err) + continue + } + + if !timelineable { + continue + } + + apiStatus, err := p.tc.StatusToAPIStatus(ctx, s, authed.Account) + if err != nil { + log.Debugf(ctx, "skipping status %s because it couldn't be converted to its api representation: %s", s.ID, err) + continue + } + + items = append(items, apiStatus) + } + + return util.PackagePageableResponse(util.PageableResponseParams{ + Items: items, + Path: "api/v1/timelines/public", + NextMaxIDValue: nextMaxIDValue, + PrevMinIDValue: prevMinIDValue, + Limit: limit, + }) +} diff --git a/internal/processing/timeline/timeline.go b/internal/processing/timeline/timeline.go new file mode 100644 index 000000000..7a95f9a11 --- /dev/null +++ b/internal/processing/timeline/timeline.go @@ -0,0 +1,38 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package timeline + +import ( + "github.com/superseriousbusiness/gotosocial/internal/state" + "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/visibility" +) + +type Processor struct { + state *state.State + tc typeutils.TypeConverter + filter *visibility.Filter +} + +func New(state *state.State, tc typeutils.TypeConverter, filter *visibility.Filter) Processor { + return Processor{ + state: state, + tc: tc, + filter: filter, + } +} |