summaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorLibravatar kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>2024-04-26 13:50:46 +0100
committerLibravatar GitHub <noreply@github.com>2024-04-26 13:50:46 +0100
commitc9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 (patch)
treedbd3409070765d5ca81448a574ccd32b4da1ffe6 /cmd
parent[chore] update Docker container to use new go swagger hash (#2872) (diff)
downloadgotosocial-c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9.tar.xz
[performance] update remaining worker pools to use queues (#2865)
* start replacing client + federator + media workers with new worker + queue types * refactor federatingDB.Delete(), drop queued messages when deleting account / status * move all queue purging to the processor workers * undo toolchain updates * code comments, ensure dereferencer worker pool gets started * update gruf libraries in readme * start the job scheduler separately to the worker pools * reshuffle ordering or server.go + remove duplicate worker start / stop * update go-list version * fix vendoring * move queue invalidation to before wipeing / deletion, to ensure queued work not dropped * add logging to worker processing functions in testrig, don't start workers in unexpected places * update go-structr to add (+then rely on) QueueCtx{} type * ensure more worker pools get started properly in tests * fix remaining broken tests relying on worker queue logic * fix account test suite queue popping logic, ensure noop workers do not pull from queue * move back accidentally shuffled account deletion order * ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete() * silently drop deletes from accounts not permitted to * don't warn log on forwarded deletes * make if else clauses easier to parse * use getFederatorMsg() * improved code comment * improved code comment re: requesting account delete checks * remove boolean result from worker start / stop since false = already running or already stopped * remove optional passed-in http.client * remove worker starting from the admin CLI commands (we don't need to handle side-effects) * update prune cli to start scheduler but not all of the workers * fix rebase issues * remove redundant return statements * i'm sorry sir linter
Diffstat (limited to 'cmd')
-rw-r--r--cmd/gotosocial/action/admin/account/account.go1
-rw-r--r--cmd/gotosocial/action/admin/media/prune/common.go5
-rw-r--r--cmd/gotosocial/action/server/server.go56
3 files changed, 33 insertions, 29 deletions
diff --git a/cmd/gotosocial/action/admin/account/account.go b/cmd/gotosocial/action/admin/account/account.go
index 42b324107..0693dd04e 100644
--- a/cmd/gotosocial/action/admin/account/account.go
+++ b/cmd/gotosocial/action/admin/account/account.go
@@ -39,7 +39,6 @@ func initState(ctx context.Context) (*state.State, error) {
var state state.State
state.Caches.Init()
state.Caches.Start()
- state.Workers.Start()
// Set the state DB connection
dbConn, err := bundb.NewBunDBService(ctx, &state)
diff --git a/cmd/gotosocial/action/admin/media/prune/common.go b/cmd/gotosocial/action/admin/media/prune/common.go
index ed272984b..fcdb2bcf2 100644
--- a/cmd/gotosocial/action/admin/media/prune/common.go
+++ b/cmd/gotosocial/action/admin/media/prune/common.go
@@ -44,7 +44,10 @@ func setupPrune(ctx context.Context) (*prune, error) {
state.Caches.Init()
state.Caches.Start()
- state.Workers.Start()
+ // Scheduler is required for the
+ // claner, but no other workers
+ // are needed for this CLI action.
+ state.Workers.StartScheduler()
dbService, err := bundb.NewBunDBService(ctx, &state)
if err != nil {
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go
index 5aaccd1c4..22d8b3920 100644
--- a/cmd/gotosocial/action/server/server.go
+++ b/cmd/gotosocial/action/server/server.go
@@ -35,6 +35,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/filter/spam"
"github.com/superseriousbusiness/gotosocial/internal/filter/visibility"
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
"github.com/superseriousbusiness/gotosocial/internal/metrics"
"github.com/superseriousbusiness/gotosocial/internal/middleware"
tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline"
@@ -128,25 +129,6 @@ var Start action.GTSAction = func(ctx context.Context) error {
TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(),
})
- // Initialize delivery worker with http client.
- state.Workers.Delivery.Init(client)
-
- // Initialize workers.
- state.Workers.Start()
- defer state.Workers.Stop()
-
- // Add a task to the scheduler to sweep caches.
- // Frequency = 1 * minute
- // Threshold = 80% capacity
- _ = state.Workers.Scheduler.AddRecurring(
- "@cachesweep", // id
- time.Time{}, // start
- time.Minute, // freq
- func(context.Context, time.Time) {
- state.Caches.Sweep(60)
- },
- )
-
// Build handlers used in later initializations.
mediaManager := media.NewManager(&state)
oauthServer := oauth.New(ctx, dbService)
@@ -195,10 +177,27 @@ var Start action.GTSAction = func(ctx context.Context) error {
return fmt.Errorf("error starting list timeline: %s", err)
}
- // Create a media cleaner using the given state.
+ // Start the job scheduler
+ // (this is required for cleaner).
+ state.Workers.StartScheduler()
+
+ // Add a task to the scheduler to sweep caches.
+ // Frequency = 1 * minute
+ // Threshold = 60% capacity
+ _ = state.Workers.Scheduler.AddRecurring(
+ "@cachesweep", // id
+ time.Time{}, // start
+ time.Minute, // freq
+ func(context.Context, time.Time) {
+ state.Caches.Sweep(60)
+ },
+ )
+
+ // Create background cleaner.
cleaner := cleaner.New(&state)
- // Create the processor using all the other services we've created so far.
+ // Create the processor using all the
+ // other services we've created so far.
processor := processing.NewProcessor(
cleaner,
typeConverter,
@@ -209,13 +208,16 @@ var Start action.GTSAction = func(ctx context.Context) error {
emailSender,
)
- // Set state client / federator asynchronous worker enqueue functions
- state.Workers.EnqueueClientAPI = processor.Workers().EnqueueClientAPI
- state.Workers.EnqueueFediAPI = processor.Workers().EnqueueFediAPI
+ // Initialize the specialized workers.
+ state.Workers.Client.Init(messages.ClientMsgIndices())
+ state.Workers.Federator.Init(messages.FederatorMsgIndices())
+ state.Workers.Delivery.Init(client)
+ state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI
+ state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI
- // Set state client / federator synchronous processing functions.
- state.Workers.ProcessFromClientAPI = processor.Workers().ProcessFromClientAPI
- state.Workers.ProcessFromFediAPI = processor.Workers().ProcessFromFediAPI
+ // Initialize workers.
+ state.Workers.Start()
+ defer state.Workers.Stop()
// Schedule tasks for all existing poll expiries.
if err := processor.Polls().ScheduleAll(ctx); err != nil {