diff options
author | 2022-04-28 13:23:11 +0100 | |
---|---|---|
committer | 2022-04-28 13:23:11 +0100 | |
commit | 420e2fb22bc7aa4967ddadb11e444079efdf5117 (patch) | |
tree | 413842c5df646c30a8079671ade5e677e3825fb8 /internal/processing/status | |
parent | [bugfix] Fix possible race condition in federatingdb (#490) (diff) | |
download | gotosocial-420e2fb22bc7aa4967ddadb11e444079efdf5117.tar.xz |
replace async client API / federator msg processing with worker pools (#497)
* replace async client API / federator msg processing with worker pools
* appease our lord-and-saviour, the linter
Diffstat (limited to 'internal/processing/status')
-rw-r--r-- | internal/processing/status/boost.go | 4 | ||||
-rw-r--r-- | internal/processing/status/create.go | 4 | ||||
-rw-r--r-- | internal/processing/status/delete.go | 4 | ||||
-rw-r--r-- | internal/processing/status/fave.go | 4 | ||||
-rw-r--r-- | internal/processing/status/status.go | 27 | ||||
-rw-r--r-- | internal/processing/status/status_test.go | 25 | ||||
-rw-r--r-- | internal/processing/status/unboost.go | 4 | ||||
-rw-r--r-- | internal/processing/status/unfave.go | 4 |
8 files changed, 40 insertions, 36 deletions
diff --git a/internal/processing/status/boost.go b/internal/processing/status/boost.go index 44296eec9..b222544ca 100644 --- a/internal/processing/status/boost.go +++ b/internal/processing/status/boost.go @@ -65,13 +65,13 @@ func (p *processor) Boost(ctx context.Context, requestingAccount *gtsmodel.Accou } // send it back to the processor for async processing - p.fromClientAPI <- messages.FromClientAPI{ + p.clientWorker.Queue(messages.FromClientAPI{ APObjectType: ap.ActivityAnnounce, APActivityType: ap.ActivityCreate, GTSModel: boostWrapperStatus, OriginAccount: requestingAccount, TargetAccount: targetStatus.Account, - } + }) // return the frontend representation of the new status to the submitter apiStatus, err := p.tc.StatusToAPIStatus(ctx, boostWrapperStatus, requestingAccount) diff --git a/internal/processing/status/create.go b/internal/processing/status/create.go index 1a832d5c4..1e93af162 100644 --- a/internal/processing/status/create.go +++ b/internal/processing/status/create.go @@ -97,12 +97,12 @@ func (p *processor) Create(ctx context.Context, account *gtsmodel.Account, appli } // send it back to the processor for async processing - p.fromClientAPI <- messages.FromClientAPI{ + p.clientWorker.Queue(messages.FromClientAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityCreate, GTSModel: newStatus, OriginAccount: account, - } + }) // return the frontend representation of the new status to the submitter apiStatus, err := p.tc.StatusToAPIStatus(ctx, newStatus, account) diff --git a/internal/processing/status/delete.go b/internal/processing/status/delete.go index 8d02d37b1..6db0d9890 100644 --- a/internal/processing/status/delete.go +++ b/internal/processing/status/delete.go @@ -53,13 +53,13 @@ func (p *processor) Delete(ctx context.Context, requestingAccount *gtsmodel.Acco } // send it back to the processor for async processing - p.fromClientAPI <- messages.FromClientAPI{ + p.clientWorker.Queue(messages.FromClientAPI{ APObjectType: ap.ObjectNote, APActivityType: ap.ActivityDelete, GTSModel: targetStatus, OriginAccount: requestingAccount, TargetAccount: requestingAccount, - } + }) return apiStatus, nil } diff --git a/internal/processing/status/fave.go b/internal/processing/status/fave.go index 42fcb5220..1b40d9da1 100644 --- a/internal/processing/status/fave.go +++ b/internal/processing/status/fave.go @@ -84,13 +84,13 @@ func (p *processor) Fave(ctx context.Context, requestingAccount *gtsmodel.Accoun } // send it back to the processor for async processing - p.fromClientAPI <- messages.FromClientAPI{ + p.clientWorker.Queue(messages.FromClientAPI{ APObjectType: ap.ActivityLike, APActivityType: ap.ActivityCreate, GTSModel: gtsFave, OriginAccount: requestingAccount, TargetAccount: targetStatus.Account, - } + }) } // return the apidon representation of the target status diff --git a/internal/processing/status/status.go b/internal/processing/status/status.go index 421ab5bbe..207bffb30 100644 --- a/internal/processing/status/status.go +++ b/internal/processing/status/status.go @@ -29,6 +29,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/text" "github.com/superseriousbusiness/gotosocial/internal/typeutils" "github.com/superseriousbusiness/gotosocial/internal/visibility" + "github.com/superseriousbusiness/gotosocial/internal/worker" ) // Processor wraps a bunch of functions for processing statuses. @@ -69,22 +70,22 @@ type Processor interface { } type processor struct { - tc typeutils.TypeConverter - db db.DB - filter visibility.Filter - formatter text.Formatter - fromClientAPI chan messages.FromClientAPI - parseMention gtsmodel.ParseMentionFunc + tc typeutils.TypeConverter + db db.DB + filter visibility.Filter + formatter text.Formatter + clientWorker *worker.Worker[messages.FromClientAPI] + parseMention gtsmodel.ParseMentionFunc } // New returns a new status processor. -func New(db db.DB, tc typeutils.TypeConverter, fromClientAPI chan messages.FromClientAPI, parseMention gtsmodel.ParseMentionFunc) Processor { +func New(db db.DB, tc typeutils.TypeConverter, clientWorker *worker.Worker[messages.FromClientAPI], parseMention gtsmodel.ParseMentionFunc) Processor { return &processor{ - tc: tc, - db: db, - filter: visibility.NewFilter(db), - formatter: text.NewFormatter(db), - fromClientAPI: fromClientAPI, - parseMention: parseMention, + tc: tc, + db: db, + filter: visibility.NewFilter(db), + formatter: text.NewFormatter(db), + clientWorker: clientWorker, + parseMention: parseMention, } } diff --git a/internal/processing/status/status_test.go b/internal/processing/status/status_test.go index 25794070e..d2126f03d 100644 --- a/internal/processing/status/status_test.go +++ b/internal/processing/status/status_test.go @@ -30,18 +30,19 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/processing/status" "github.com/superseriousbusiness/gotosocial/internal/transport" "github.com/superseriousbusiness/gotosocial/internal/typeutils" + "github.com/superseriousbusiness/gotosocial/internal/worker" "github.com/superseriousbusiness/gotosocial/testrig" ) type StatusStandardTestSuite struct { suite.Suite - db db.DB - typeConverter typeutils.TypeConverter - tc transport.Controller - storage *kv.KVStore - mediaManager media.Manager - federator federation.Federator - fromClientAPIChan chan messages.FromClientAPI + db db.DB + typeConverter typeutils.TypeConverter + tc transport.Controller + storage *kv.KVStore + mediaManager media.Manager + federator federation.Federator + clientWorker *worker.Worker[messages.FromClientAPI] // standard suite models testTokens map[string]*gtsmodel.Token @@ -74,14 +75,16 @@ func (suite *StatusStandardTestSuite) SetupTest() { testrig.InitTestConfig() testrig.InitTestLog() + fedWorker := worker.New[messages.FromFederator](-1, -1) + suite.db = testrig.NewTestDB() suite.typeConverter = testrig.NewTestTypeConverter(suite.db) - suite.fromClientAPIChan = make(chan messages.FromClientAPI, 100) - suite.tc = testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db) + suite.clientWorker = worker.New[messages.FromClientAPI](-1, -1) + suite.tc = testrig.NewTestTransportController(testrig.NewMockHTTPClient(nil), suite.db, fedWorker) suite.storage = testrig.NewTestStorage() suite.mediaManager = testrig.NewTestMediaManager(suite.db, suite.storage) - suite.federator = testrig.NewTestFederator(suite.db, suite.tc, suite.storage, suite.mediaManager) - suite.status = status.New(suite.db, suite.typeConverter, suite.fromClientAPIChan, processing.GetParseMentionFunc(suite.db, suite.federator)) + suite.federator = testrig.NewTestFederator(suite.db, suite.tc, suite.storage, suite.mediaManager, fedWorker) + suite.status = status.New(suite.db, suite.typeConverter, suite.clientWorker, processing.GetParseMentionFunc(suite.db, suite.federator)) testrig.StandardDBSetup(suite.db, suite.testAccounts) testrig.StandardStorageSetup(suite.storage, "../../../testrig/media") diff --git a/internal/processing/status/unboost.go b/internal/processing/status/unboost.go index 520704209..75158fd40 100644 --- a/internal/processing/status/unboost.go +++ b/internal/processing/status/unboost.go @@ -91,13 +91,13 @@ func (p *processor) Unboost(ctx context.Context, requestingAccount *gtsmodel.Acc gtsBoost.BoostOf.Account = targetStatus.Account // send it back to the processor for async processing - p.fromClientAPI <- messages.FromClientAPI{ + p.clientWorker.Queue(messages.FromClientAPI{ APObjectType: ap.ActivityAnnounce, APActivityType: ap.ActivityUndo, GTSModel: gtsBoost, OriginAccount: requestingAccount, TargetAccount: targetStatus.Account, - } + }) } apiStatus, err := p.tc.StatusToAPIStatus(ctx, targetStatus, requestingAccount) diff --git a/internal/processing/status/unfave.go b/internal/processing/status/unfave.go index c5784a74b..1ccc80ebf 100644 --- a/internal/processing/status/unfave.go +++ b/internal/processing/status/unfave.go @@ -73,13 +73,13 @@ func (p *processor) Unfave(ctx context.Context, requestingAccount *gtsmodel.Acco } // send it back to the processor for async processing - p.fromClientAPI <- messages.FromClientAPI{ + p.clientWorker.Queue(messages.FromClientAPI{ APObjectType: ap.ActivityLike, APActivityType: ap.ActivityUndo, GTSModel: gtsFave, OriginAccount: requestingAccount, TargetAccount: targetStatus.Account, - } + }) } apiStatus, err := p.tc.StatusToAPIStatus(ctx, targetStatus, requestingAccount) |