summaryrefslogtreecommitdiff
path: root/cmd/gotosocial/action/server/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/gotosocial/action/server/server.go')
-rw-r--r--cmd/gotosocial/action/server/server.go67
1 files changed, 45 insertions, 22 deletions
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go
index 42cbf318b..68b039d0c 100644
--- a/cmd/gotosocial/action/server/server.go
+++ b/cmd/gotosocial/action/server/server.go
@@ -87,9 +87,9 @@ var Start action.GTSAction = func(ctx context.Context) error {
// defer function for safe shutdown
// depending on what services were
// managed to be started.
-
- state = new(state.State)
- route *router.Router
+ state = new(state.State)
+ route *router.Router
+ process *processing.Processor
)
defer func() {
@@ -125,6 +125,23 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
}
+ if process != nil {
+ const timeout = time.Minute
+
+ // Use a new timeout context to ensure
+ // persisting queued tasks does not fail!
+ // The main ctx is very likely canceled.
+ ctx := context.WithoutCancel(ctx)
+ ctx, cncl := context.WithTimeout(ctx, timeout)
+ defer cncl()
+
+ // Now that all the "moving" components have been stopped,
+ // persist any remaining queued worker tasks to the database.
+ if err := process.Admin().PersistWorkerQueues(ctx); err != nil {
+ log.Errorf(ctx, "error persisting worker queues: %v", err)
+ }
+ }
+
if state.DB != nil {
// Lastly, if database service was started,
// ensure it gets closed now all else stopped.
@@ -270,7 +287,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
// Create the processor using all the
// other services we've created so far.
- processor := processing.NewProcessor(
+ process = processing.NewProcessor(
cleaner,
typeConverter,
federator,
@@ -286,14 +303,14 @@ var Start action.GTSAction = func(ctx context.Context) error {
state.Workers.Client.Init(messages.ClientMsgIndices())
state.Workers.Federator.Init(messages.FederatorMsgIndices())
state.Workers.Delivery.Init(client)
- state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI
- state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI
+ state.Workers.Client.Process = process.Workers().ProcessFromClientAPI
+ state.Workers.Federator.Process = process.Workers().ProcessFromFediAPI
// Now start workers!
state.Workers.Start()
// Schedule notif tasks for all existing poll expiries.
- if err := processor.Polls().ScheduleAll(ctx); err != nil {
+ if err := process.Polls().ScheduleAll(ctx); err != nil {
return fmt.Errorf("error scheduling poll expiries: %w", err)
}
@@ -303,7 +320,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
// Run advanced migrations.
- if err := processor.AdvancedMigrations().Migrate(ctx); err != nil {
+ if err := process.AdvancedMigrations().Migrate(ctx); err != nil {
return err
}
@@ -370,7 +387,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
// attach global no route / 404 handler to the router
route.AttachNoRouteHandler(func(c *gin.Context) {
- apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), processor.InstanceGetV1)
+ apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), process.InstanceGetV1)
})
// build router modules
@@ -393,15 +410,15 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
var (
- authModule = api.NewAuth(dbService, processor, idp, routerSession, sessionName) // auth/oauth paths
- clientModule = api.NewClient(state, processor) // api client endpoints
- metricsModule = api.NewMetrics() // Metrics endpoints
- healthModule = api.NewHealth(dbService.Ready) // Health check endpoints
- fileserverModule = api.NewFileserver(processor) // fileserver endpoints
- wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints
- nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint
- activityPubModule = api.NewActivityPub(dbService, processor) // ActivityPub endpoints
- webModule = web.New(dbService, processor) // web pages + user profiles + settings panels etc
+ authModule = api.NewAuth(dbService, process, idp, routerSession, sessionName) // auth/oauth paths
+ clientModule = api.NewClient(state, process) // api client endpoints
+ metricsModule = api.NewMetrics() // Metrics endpoints
+ healthModule = api.NewHealth(dbService.Ready) // Health check endpoints
+ fileserverModule = api.NewFileserver(process) // fileserver endpoints
+ wellKnownModule = api.NewWellKnown(process) // .well-known endpoints
+ nodeInfoModule = api.NewNodeInfo(process) // nodeinfo endpoint
+ activityPubModule = api.NewActivityPub(dbService, process) // ActivityPub endpoints
+ webModule = web.New(dbService, process) // web pages + user profiles + settings panels etc
)
// create required middleware
@@ -416,10 +433,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
// throttling
cpuMultiplier := config.GetAdvancedThrottlingMultiplier()
retryAfter := config.GetAdvancedThrottlingRetryAfter()
- clThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // client api
- s2sThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // server-to-server (AP)
- fsThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // fileserver / web templates / emojis
- pkThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // throttle public key endpoint separately
+ clThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // client api
+ s2sThrottle := middleware.Throttle(cpuMultiplier, retryAfter)
+ // server-to-server (AP)
+ fsThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // fileserver / web templates / emojis
+ pkThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // throttle public key endpoint separately
gzip := middleware.Gzip() // applied to all except fileserver
@@ -442,6 +460,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
return fmt.Errorf("error starting router: %w", err)
}
+ // Fill worker queues from persisted task data in database.
+ if err := process.Admin().FillWorkerQueues(ctx); err != nil {
+ return fmt.Errorf("error filling worker queues: %w", err)
+ }
+
// catch shutdown signals from the operating system
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)