diff options
Diffstat (limited to 'cmd/gotosocial/action/server/server.go')
-rw-r--r-- | cmd/gotosocial/action/server/server.go | 67 |
1 files changed, 45 insertions, 22 deletions
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go index 42cbf318b..68b039d0c 100644 --- a/cmd/gotosocial/action/server/server.go +++ b/cmd/gotosocial/action/server/server.go @@ -87,9 +87,9 @@ var Start action.GTSAction = func(ctx context.Context) error { // defer function for safe shutdown // depending on what services were // managed to be started. - - state = new(state.State) - route *router.Router + state = new(state.State) + route *router.Router + process *processing.Processor ) defer func() { @@ -125,6 +125,23 @@ var Start action.GTSAction = func(ctx context.Context) error { } } + if process != nil { + const timeout = time.Minute + + // Use a new timeout context to ensure + // persisting queued tasks does not fail! + // The main ctx is very likely canceled. + ctx := context.WithoutCancel(ctx) + ctx, cncl := context.WithTimeout(ctx, timeout) + defer cncl() + + // Now that all the "moving" components have been stopped, + // persist any remaining queued worker tasks to the database. + if err := process.Admin().PersistWorkerQueues(ctx); err != nil { + log.Errorf(ctx, "error persisting worker queues: %v", err) + } + } + if state.DB != nil { // Lastly, if database service was started, // ensure it gets closed now all else stopped. @@ -270,7 +287,7 @@ var Start action.GTSAction = func(ctx context.Context) error { // Create the processor using all the // other services we've created so far. - processor := processing.NewProcessor( + process = processing.NewProcessor( cleaner, typeConverter, federator, @@ -286,14 +303,14 @@ var Start action.GTSAction = func(ctx context.Context) error { state.Workers.Client.Init(messages.ClientMsgIndices()) state.Workers.Federator.Init(messages.FederatorMsgIndices()) state.Workers.Delivery.Init(client) - state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI - state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI + state.Workers.Client.Process = process.Workers().ProcessFromClientAPI + state.Workers.Federator.Process = process.Workers().ProcessFromFediAPI // Now start workers! state.Workers.Start() // Schedule notif tasks for all existing poll expiries. - if err := processor.Polls().ScheduleAll(ctx); err != nil { + if err := process.Polls().ScheduleAll(ctx); err != nil { return fmt.Errorf("error scheduling poll expiries: %w", err) } @@ -303,7 +320,7 @@ var Start action.GTSAction = func(ctx context.Context) error { } // Run advanced migrations. - if err := processor.AdvancedMigrations().Migrate(ctx); err != nil { + if err := process.AdvancedMigrations().Migrate(ctx); err != nil { return err } @@ -370,7 +387,7 @@ var Start action.GTSAction = func(ctx context.Context) error { // attach global no route / 404 handler to the router route.AttachNoRouteHandler(func(c *gin.Context) { - apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), processor.InstanceGetV1) + apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), process.InstanceGetV1) }) // build router modules @@ -393,15 +410,15 @@ var Start action.GTSAction = func(ctx context.Context) error { } var ( - authModule = api.NewAuth(dbService, processor, idp, routerSession, sessionName) // auth/oauth paths - clientModule = api.NewClient(state, processor) // api client endpoints - metricsModule = api.NewMetrics() // Metrics endpoints - healthModule = api.NewHealth(dbService.Ready) // Health check endpoints - fileserverModule = api.NewFileserver(processor) // fileserver endpoints - wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints - nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint - activityPubModule = api.NewActivityPub(dbService, processor) // ActivityPub endpoints - webModule = web.New(dbService, processor) // web pages + user profiles + settings panels etc + authModule = api.NewAuth(dbService, process, idp, routerSession, sessionName) // auth/oauth paths + clientModule = api.NewClient(state, process) // api client endpoints + metricsModule = api.NewMetrics() // Metrics endpoints + healthModule = api.NewHealth(dbService.Ready) // Health check endpoints + fileserverModule = api.NewFileserver(process) // fileserver endpoints + wellKnownModule = api.NewWellKnown(process) // .well-known endpoints + nodeInfoModule = api.NewNodeInfo(process) // nodeinfo endpoint + activityPubModule = api.NewActivityPub(dbService, process) // ActivityPub endpoints + webModule = web.New(dbService, process) // web pages + user profiles + settings panels etc ) // create required middleware @@ -416,10 +433,11 @@ var Start action.GTSAction = func(ctx context.Context) error { // throttling cpuMultiplier := config.GetAdvancedThrottlingMultiplier() retryAfter := config.GetAdvancedThrottlingRetryAfter() - clThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // client api - s2sThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // server-to-server (AP) - fsThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // fileserver / web templates / emojis - pkThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // throttle public key endpoint separately + clThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // client api + s2sThrottle := middleware.Throttle(cpuMultiplier, retryAfter) + // server-to-server (AP) + fsThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // fileserver / web templates / emojis + pkThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // throttle public key endpoint separately gzip := middleware.Gzip() // applied to all except fileserver @@ -442,6 +460,11 @@ var Start action.GTSAction = func(ctx context.Context) error { return fmt.Errorf("error starting router: %w", err) } + // Fill worker queues from persisted task data in database. + if err := process.Admin().FillWorkerQueues(ctx); err != nil { + return fmt.Errorf("error filling worker queues: %w", err) + } + // catch shutdown signals from the operating system sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) |