author kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> 2024-07-30 11:58:31 +0000
committer GitHub <noreply@github.com> 2024-07-30 13:58:31 +0200
commit 87cff71af95d2cef095a5feea40e48b40576b3d0 (patch)
tree 9725ac3ab67d050e78016a2246d2b020635edcb7 /cmd
parent [chore] replace UniqueStrings with Deduplicate (#3154)
[feature] persist worker queues to db (#3042)
* persist queued worker tasks to database on shutdown, fill worker queues from database on startup
* ensure the tasks are sorted by creation time before pushing them
* add migration to insert WorkerTask{} into database, add test for worker task persistence
* add test for recovering worker queues from database
* quick tweak
* whoops we ended up with double cleaner job scheduling
* insert each task separately, because bun is throwing some reflection error??
* add specific checking of cancelled worker contexts
* add http request signing to deliveries recovered from database
* add test for outgoing public key ID being correctly set on delivery
* replace select with Queue.PopCtx()
* get rid of loop now we don't use it
* remove field now we don't use it
* ensure that signing func is set
* header values weren't being copied over :facepalm:
* use ptr for httpclient.Request in delivery
* move worker queue filling to later in server init process
* fix rebase issues
* make logging less shouty
* use slices.Delete() instead of copying / reslicing
* have database return tasks in ascending order instead of sorting them
* add a 1 minute timeout to persisting worker queues
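
The subtle part of the shutdown half is the context handling visible in the diff below: by the time the deferred cleanup runs, the main ctx has almost certainly been canceled, so the persistence call detaches from that cancellation with context.WithoutCancel (Go 1.21+) while still bounding the work with a fresh one-minute timeout. A minimal, self-contained sketch of that pattern follows; persistWorkerQueues here is a hypothetical stub standing in for the real process.Admin().PersistWorkerQueues():

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// persistWorkerQueues is a hypothetical stub standing in for
// process.Admin().PersistWorkerQueues(); it exists only so this
// sketch compiles and runs on its own.
func persistWorkerQueues(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(10 * time.Millisecond): // pretend to write queued tasks to the db
		return nil
	}
}

func main() {
	// Simulate the main server context, already canceled by
	// the time the shutdown defer runs.
	mainCtx, cancel := context.WithCancel(context.Background())
	cancel()

	const timeout = time.Minute

	// Detach from the (likely canceled) main context but keep
	// its values, then bound the persistence work with a fresh
	// timeout so it cannot hang shutdown forever.
	ctx := context.WithoutCancel(mainCtx)
	ctx, cncl := context.WithTimeout(ctx, timeout)
	defer cncl()

	if err := persistWorkerQueues(ctx); err != nil {
		fmt.Printf("error persisting worker queues: %v\n", err)
		return
	}
	fmt.Println("worker queues persisted despite canceled parent context")
}
```

Without WithoutCancel, the derived timeout context would inherit the parent's cancellation and the database writes would fail immediately.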
Diffstat (limited to 'cmd')
-rw-r--r--  cmd/gotosocial/action/server/server.go | 67
1 file changed, 45 insertions(+), 22 deletions(-)
diff --git a/cmd/gotosocial/action/server/server.go b/cmd/gotosocial/action/server/server.go
index 42cbf318b..68b039d0c 100644
--- a/cmd/gotosocial/action/server/server.go
+++ b/cmd/gotosocial/action/server/server.go
@@ -87,9 +87,9 @@ var Start action.GTSAction = func(ctx context.Context) error {
// defer function for safe shutdown
// depending on what services were
// managed to be started.
-
- state = new(state.State)
- route *router.Router
+ state   = new(state.State)
+ route   *router.Router
+ process *processing.Processor
)
defer func() {
@@ -125,6 +125,23 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
}
+ if process != nil {
+ const timeout = time.Minute
+
+ // Use a new timeout context to ensure
+ // persisting queued tasks does not fail!
+ // The main ctx is very likely canceled.
+ ctx := context.WithoutCancel(ctx)
+ ctx, cncl := context.WithTimeout(ctx, timeout)
+ defer cncl()
+
+ // Now that all the "moving" components have been stopped,
+ // persist any remaining queued worker tasks to the database.
+ if err := process.Admin().PersistWorkerQueues(ctx); err != nil {
+ log.Errorf(ctx, "error persisting worker queues: %v", err)
+ }
+ }
+
if state.DB != nil {
// Lastly, if database service was started,
// ensure it gets closed now all else stopped.
@@ -270,7 +287,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
// Create the processor using all the
// other services we've created so far.
- processor := processing.NewProcessor(
+ process = processing.NewProcessor(
cleaner,
typeConverter,
federator,
@@ -286,14 +303,14 @@ var Start action.GTSAction = func(ctx context.Context) error {
state.Workers.Client.Init(messages.ClientMsgIndices())
state.Workers.Federator.Init(messages.FederatorMsgIndices())
state.Workers.Delivery.Init(client)
- state.Workers.Client.Process = processor.Workers().ProcessFromClientAPI
- state.Workers.Federator.Process = processor.Workers().ProcessFromFediAPI
+ state.Workers.Client.Process = process.Workers().ProcessFromClientAPI
+ state.Workers.Federator.Process = process.Workers().ProcessFromFediAPI
// Now start workers!
state.Workers.Start()
// Schedule notif tasks for all existing poll expiries.
- if err := processor.Polls().ScheduleAll(ctx); err != nil {
+ if err := process.Polls().ScheduleAll(ctx); err != nil {
return fmt.Errorf("error scheduling poll expiries: %w", err)
}
@@ -303,7 +320,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
// Run advanced migrations.
- if err := processor.AdvancedMigrations().Migrate(ctx); err != nil {
+ if err := process.AdvancedMigrations().Migrate(ctx); err != nil {
return err
}
@@ -370,7 +387,7 @@ var Start action.GTSAction = func(ctx context.Context) error {
// attach global no route / 404 handler to the router
route.AttachNoRouteHandler(func(c *gin.Context) {
- apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), processor.InstanceGetV1)
+ apiutil.ErrorHandler(c, gtserror.NewErrorNotFound(errors.New(http.StatusText(http.StatusNotFound))), process.InstanceGetV1)
})
// build router modules
@@ -393,15 +410,15 @@ var Start action.GTSAction = func(ctx context.Context) error {
}
var (
- authModule = api.NewAuth(dbService, processor, idp, routerSession, sessionName) // auth/oauth paths
- clientModule = api.NewClient(state, processor) // api client endpoints
- metricsModule = api.NewMetrics() // Metrics endpoints
- healthModule = api.NewHealth(dbService.Ready) // Health check endpoints
- fileserverModule = api.NewFileserver(processor) // fileserver endpoints
- wellKnownModule = api.NewWellKnown(processor) // .well-known endpoints
- nodeInfoModule = api.NewNodeInfo(processor) // nodeinfo endpoint
- activityPubModule = api.NewActivityPub(dbService, processor) // ActivityPub endpoints
- webModule = web.New(dbService, processor) // web pages + user profiles + settings panels etc
+ authModule = api.NewAuth(dbService, process, idp, routerSession, sessionName) // auth/oauth paths
+ clientModule = api.NewClient(state, process) // api client endpoints
+ metricsModule = api.NewMetrics() // Metrics endpoints
+ healthModule = api.NewHealth(dbService.Ready) // Health check endpoints
+ fileserverModule = api.NewFileserver(process) // fileserver endpoints
+ wellKnownModule = api.NewWellKnown(process) // .well-known endpoints
+ nodeInfoModule = api.NewNodeInfo(process) // nodeinfo endpoint
+ activityPubModule = api.NewActivityPub(dbService, process) // ActivityPub endpoints
+ webModule = web.New(dbService, process) // web pages + user profiles + settings panels etc
)
// create required middleware
@@ -416,10 +433,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
// throttling
cpuMultiplier := config.GetAdvancedThrottlingMultiplier()
retryAfter := config.GetAdvancedThrottlingRetryAfter()
- clThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // client api
- s2sThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // server-to-server (AP)
- fsThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // fileserver / web templates / emojis
- pkThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // throttle public key endpoint separately
+ clThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // client api
+ s2sThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // server-to-server (AP)
+ fsThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // fileserver / web templates / emojis
+ pkThrottle := middleware.Throttle(cpuMultiplier, retryAfter) // throttle public key endpoint separately
gzip := middleware.Gzip() // applied to all except fileserver
@@ -442,6 +460,11 @@ var Start action.GTSAction = func(ctx context.Context) error {
return fmt.Errorf("error starting router: %w", err)
}
+ // Fill worker queues from persisted task data in database.
+ if err := process.Admin().FillWorkerQueues(ctx); err != nil {
+ return fmt.Errorf("error filling worker queues: %w", err)
+ }
+
// catch shutdown signals from the operating system
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
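
Taken together, the two halves of the change form a drain/refill lifecycle: FillWorkerQueues runs late in server init, once the router is up (so recovered deliveries can be signed and sent straight away), and PersistWorkerQueues runs from the shutdown defer after the "moving" components have stopped. A rough sketch of that ordering, with hypothetical fill/persist stubs in place of the real processor methods:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// fill and persist are hypothetical stand-ins for the real
// process.Admin().FillWorkerQueues() / PersistWorkerQueues().
func fill(ctx context.Context) error { fmt.Println("refilling queues from db"); return nil }

func persist(ctx context.Context) error { fmt.Println("draining queues to db"); return nil }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Runs on the way out (after the signal below cancels ctx):
	// drain remaining queued tasks to the database, using a
	// context detached from the by-now-canceled main ctx.
	defer func() {
		pctx, cncl := context.WithTimeout(context.WithoutCancel(ctx), time.Minute)
		defer cncl()
		if err := persist(pctx); err != nil {
			fmt.Printf("error persisting worker queues: %v\n", err)
		}
	}()

	// Startup: the router is assumed to be serving at this point,
	// so recovered deliveries can go straight out.
	if err := fill(ctx); err != nil {
		fmt.Printf("error filling worker queues: %v\n", err)
		return
	}

	// Block until the operating system asks us to shut down.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	cancel()
}
```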