diff options
Diffstat (limited to 'cmd/gotosocial/action/server/server.go')
-rw-r--r-- | cmd/gotosocial/action/server/server.go | 56 |
1 files changed, 29 insertions, 27 deletions
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 5aaccd1c4..22d8b3920 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/filter/spam" "github.com/superseriousbusiness/gotosocial/internal/filter/visibility" "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/messages" "github.com/superseriousbusiness/gotosocial/internal/metrics" "github.com/superseriousbusiness/gotosocial/internal/middleware" tlprocessor "github.com/superseriousbusiness/gotosocial/internal/processing/timeline" @@ -128,25 +129,6 @@ var Start action.GTSAction = func(ctx context.Context) error { TLSInsecureSkipVerify: config.GetHTTPClientTLSInsecureSkipVerify(), }) - // Initialize delivery worker with http client. - state.Workers.Delivery.Init(client) - - // Initialize workers. - state.Workers.Start() - defer state.Workers.Stop() - - // Add a task to the scheduler to sweep caches. - // Frequency = 1 * minute - // Threshold = 80% capacity - _ = state.Workers.Scheduler.AddRecurring( - "@cachesweep", // id - time.Time{}, // start - time.Minute, // freq - func(context.Context, time.Time) { - state.Caches.Sweep(60) - }, - ) - // Build handlers used in later initializations. mediaManager := media.NewManager(&state) oauthServer := oauth.New(ctx, dbService) @@ -195,10 +177,27 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error starting list timeline: %s", err) } - // Create a media cleaner using the given state. + // Start the job scheduler + // (this is required for cleaner). + state.Workers.StartScheduler() + + // Add a task to the scheduler to sweep caches. + // Frequency = 1 * minute + // Threshold = 60% capacity + _ = state.Workers.Scheduler.AddRecurring( + "@cachesweep", // id + time.Time{}, // start + time.Minute, // freq + func(context.Context, time.Time) { + state.Caches.Sweep(60) + }, + ) + + // Create background cleaner. cleaner := cleaner.New(&state) - // Create the processor using all the other services we've created so far. + // Create the processor using all the + // other services we've created so far. processor := processing.NewProcessor( cleaner, typeConverter, @@ -209,13 +208,16 @@ var Start action.GTSAction = func(ctx context.Context) error { emailSender, ) - // Set state client / federator asynchronous worker enqueue functions - state.Workers.EnqueueClientAPI = processor.Workers().EnqueueClientAPI - state.Workers.EnqueueFediAPI = processor.Workers().EnqueueFediAPI + // Initialize the specialized workers. + state.Workers.Client.Init(messages.ClientMsgIndices()) + state.Workers.Federator.Init(messages.FederatorMsgIndices()) + state.Workers.Delivery.Init(client) + state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI + state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI - // Set state client / federator synchronous processing functions. - state.Workers.ProcessFromClientAPI = processor.Workers().ProcessFromClientAPI - state.Workers.ProcessFromFediAPI = processor.Workers().ProcessFromFediAPI + // Initialize workers. + state.Workers.Start() + defer state.Workers.Stop() // Schedule tasks for all existing poll expiries. if err := processor.Polls().ScheduleAll(ctx); err != nil { |