path: root/internal/processing/admin/workertask.go
author    kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>  2024-07-30 11:58:31 +0000
committer GitHub <noreply@github.com>  2024-07-30 13:58:31 +0200
commit    87cff71af95d2cef095a5feea40e48b40576b3d0 (patch)
tree      9725ac3ab67d050e78016a2246d2b020635edcb7  /internal/processing/admin/workertask.go
parent    [chore] replace UniqueStrings with Deduplicate (#3154) (diff)
download  gotosocial-87cff71af95d2cef095a5feea40e48b40576b3d0.tar.xz
[feature] persist worker queues to db (#3042)
* persist queued worker tasks to database on shutdown, fill worker queues from database on startup
* ensure the tasks are sorted by creation time before pushing them
* add migration to insert WorkerTask{} into database, add test for worker task persistence
* add test for recovering worker queues from database
* quick tweak
* whoops we ended up with double cleaner job scheduling
* insert each task separately, because bun is throwing some reflection error??
* add specific checking of cancelled worker contexts
* add http request signing to deliveries recovered from database
* add test for outgoing public key ID being correctly set on delivery
* replace select with Queue.PopCtx()
* get rid of loop now we don't use it
* remove field now we don't use it
* ensure that signing func is set
* header values weren't being copied over :facepalm:
* use ptr for httpclient.Request in delivery
* move worker queue filling to later in server init process
* fix rebase issues
* make logging less shouty
* use slices.Delete() instead of copying / reslicing
* have database return tasks in ascending order instead of sorting them
* add a 1 minute timeout to persisting worker queues
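The lifecycle wiring described above (fill the queues late in server init, persist them on shutdown with a 1 minute timeout) might look roughly like the following sketch. The package, function names, and the processor accessor used here are illustrative assumptions for readability, not the actual server wiring added by this PR.

package server

import (
	"context"
	"time"

	"github.com/superseriousbusiness/gotosocial/internal/log"
	"github.com/superseriousbusiness/gotosocial/internal/processing"
)

// fillQueuesOnStartup sketches the startup side: late in server init,
// once workers are running, recover tasks persisted on last shutdown.
func fillQueuesOnStartup(ctx context.Context, processor *processing.Processor) {
	if err := processor.Admin().FillWorkerQueues(ctx); err != nil {
		log.Errorf(ctx, "error filling worker queues: %v", err)
	}
}

// persistQueuesOnShutdown sketches the shutdown side, bounded by the
// 1 minute timeout mentioned in the commit message.
func persistQueuesOnShutdown(processor *processing.Processor) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	if err := processor.Admin().PersistWorkerQueues(ctx); err != nil {
		log.Errorf(ctx, "error persisting worker queues: %v", err)
	}
}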
Diffstat (limited to 'internal/processing/admin/workertask.go')
-rw-r--r--  internal/processing/admin/workertask.go  426
1 file changed, 426 insertions(+), 0 deletions(-)
diff --git a/internal/processing/admin/workertask.go b/internal/processing/admin/workertask.go
new file mode 100644
index 000000000..6d7cc7b7a
--- /dev/null
+++ b/internal/processing/admin/workertask.go
@@ -0,0 +1,426 @@
+// GoToSocial
+// Copyright (C) GoToSocial Authors admin@gotosocial.org
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package admin
+
+import (
+ "context"
+ "fmt"
+ "slices"
+ "time"
+
+ "github.com/superseriousbusiness/gotosocial/internal/gtserror"
+ "github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
+ "github.com/superseriousbusiness/gotosocial/internal/log"
+ "github.com/superseriousbusiness/gotosocial/internal/messages"
+ "github.com/superseriousbusiness/gotosocial/internal/transport"
+ "github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
+)
+
+// NOTE:
+// Having these functions in the processor, which is
+// usually the intermediary that performs *processing*
+// between the HTTP route handlers and the underlying
+// database / storage layers is a little odd, so this
+// may be subject to change!
+//
+// For now at least, this is a useful place that has
+// access to the underlying database, workers and
+// causes no dependency cycles with this use case!
+
+// FillWorkerQueues recovers all serialized worker tasks from the database
+// (if any!), and pushes them to each of their relevant worker queues.
+func (p *Processor) FillWorkerQueues(ctx context.Context) error {
+ log.Info(ctx, "rehydrate!")
+
+ // Get all persisted worker tasks from db.
+ //
+ // (database returns these as ASCENDING, i.e.
+ // returned in the order they were inserted).
+ tasks, err := p.state.DB.GetWorkerTasks(ctx)
+ if err != nil {
+ return gtserror.Newf("error fetching worker tasks from db: %w", err)
+ }
+
+ var (
+ // Counts of each task type
+ // successfully recovered.
+ delivery int
+ federator int
+ client int
+
+ // Failed recoveries.
+ errors int
+ )
+
+loop:
+
+ // Handle each persisted task, removing
+ // all those we can't handle, leaving us
+ // with a slice of tasks we can safely
+ // delete from the DB afterwards.
+ for i := 0; i < len(tasks); {
+ var err error
+
+ // Task at index.
+ task := tasks[i]
+
+ // Appropriate task count
+ // pointer to increment.
+ var counter *int
+
+ // Attempt to recover persisted
+ // task depending on worker type.
+ switch task.WorkerType {
+ case gtsmodel.DeliveryWorker:
+ err = p.pushDelivery(ctx, task)
+ counter = &delivery
+ case gtsmodel.FederatorWorker:
+ err = p.pushFederator(ctx, task)
+ counter = &federator
+ case gtsmodel.ClientWorker:
+ err = p.pushClient(ctx, task)
+ counter = &client
+ default:
+ err = fmt.Errorf("invalid worker type %d", task.WorkerType)
+ }
+
+ if err != nil {
+ log.Errorf(ctx, "error pushing task %d: %v", task.ID, err)
+
+ // Drop error'd task from slice.
+ tasks = slices.Delete(tasks, i, i+1)
+
+ // Incr errors.
+ errors++
+ continue loop
+ }
+
+ // Increment slice
+ // index & counter.
+ (*counter)++
+ i++
+ }
+
+ // Tasks that were successfully pushed
+ // to their appropriate workers can
+ // now safely be removed from the database.
+ for _, task := range tasks {
+ if err := p.state.DB.DeleteWorkerTaskByID(ctx, task.ID); err != nil {
+ log.Errorf(ctx, "error deleting task from db: %v", err)
+ }
+ }
+
+ // Log recovered tasks.
+ log.WithContext(ctx).
+ WithField("delivery", delivery).
+ WithField("federator", federator).
+ WithField("client", client).
+ WithField("errors", errors).
+ Info("recovered queued tasks")
+
+ return nil
+}
+
+// PersistWorkerQueues pops all queued worker tasks (that are themselves persistable, i.e. not
+// dereference tasks which are just function ptrs), serializes and persists them to the database.
+func (p *Processor) PersistWorkerQueues(ctx context.Context) error {
+ log.Info(ctx, "dehydrate!")
+
+ var (
+ // Counts of each task type
+ // successfully persisted.
+ delivery int
+ federator int
+ client int
+
+ // Failed persists.
+ errors int
+
+ // Serialized tasks to persist.
+ tasks []*gtsmodel.WorkerTask
+ )
+
+ for {
+ // Pop all queued deliveries.
+ task, err := p.popDelivery()
+ if err != nil {
+ log.Errorf(ctx, "error popping delivery: %v", err)
+ errors++ // incr error count.
+ continue
+ }
+
+ if task == nil {
+ // No more queue
+ // tasks to pop!
+ break
+ }
+
+ // Append serialized task.
+ tasks = append(tasks, task)
+ delivery++ // incr count
+ }
+
+ for {
+ // Pop queued federator msgs.
+ task, err := p.popFederator()
+ if err != nil {
+ log.Errorf(ctx, "error popping federator message: %v", err)
+ errors++ // incr count
+ continue
+ }
+
+ if task == nil {
+ // No more queue
+ // tasks to pop!
+ break
+ }
+
+ // Append serialized task.
+ tasks = append(tasks, task)
+ federator++ // incr count
+ }
+
+ for {
+ // Pop queued client msgs.
+ task, err := p.popClient()
+ if err != nil {
+ log.Errorf(ctx, "error popping client message: %v", err)
+ continue
+ }
+
+ if task == nil {
+ // No more queue
+ // tasks to pop!
+ break
+ }
+
+ // Append serialized task.
+ tasks = append(tasks, task)
+ client++ // incr count
+ }
+
+ // Persist all serialized queued worker tasks to database.
+ if err := p.state.DB.PutWorkerTasks(ctx, tasks); err != nil {
+ return gtserror.Newf("error putting tasks in db: %w", err)
+ }
+
+ // Log persisted tasks.
+ log.WithContext(ctx).
+ WithField("delivery", delivery).
+ WithField("federator", federator).
+ WithField("client", client).
+ WithField("errors", errors).
+ Info("persisted queued tasks")
+
+ return nil
+}
+
+// pushDelivery parses a valid delivery.Delivery{} from serialized task data and pushes to queue.
+func (p *Processor) pushDelivery(ctx context.Context, task *gtsmodel.WorkerTask) error {
+ dlv := new(delivery.Delivery)
+
+ // Deserialize the raw worker task data into delivery.
+ if err := dlv.Deserialize(task.TaskData); err != nil {
+ return gtserror.Newf("error deserializing delivery: %w", err)
+ }
+
+ var tsport transport.Transport
+
+ if uri := dlv.ActorID; uri != "" {
+ // Fetch the actor account by provided URI from db.
+ account, err := p.state.DB.GetAccountByURI(ctx, uri)
+ if err != nil {
+ return gtserror.Newf("error getting actor account %s from db: %w", uri, err)
+ }
+
+ // Fetch a transport for request signing for actor's account username.
+ tsport, err = p.transport.NewTransportForUsername(ctx, account.Username)
+ if err != nil {
+ return gtserror.Newf("error getting transport for actor %s: %w", uri, err)
+ }
+ } else {
+ var err error
+
+ // No actor was given, will be signed by instance account.
+ tsport, err = p.transport.NewTransportForUsername(ctx, "")
+ if err != nil {
+ return gtserror.Newf("error getting instance account transport: %w", err)
+ }
+ }
+
+ // Using transport, add actor signature to delivery.
+ if err := tsport.SignDelivery(dlv); err != nil {
+ return gtserror.Newf("error signing delivery: %w", err)
+ }
+
+ // Push deserialized task to delivery queue.
+ p.state.Workers.Delivery.Queue.Push(dlv)
+
+ return nil
+}
+
+// popDelivery pops delivery.Delivery{} from queue and serializes as valid task data.
+func (p *Processor) popDelivery() (*gtsmodel.WorkerTask, error) {
+
+ // Pop waiting delivery from the delivery worker.
+ delivery, ok := p.state.Workers.Delivery.Queue.Pop()
+ if !ok {
+ return nil, nil
+ }
+
+ // Serialize the delivery task data.
+ data, err := delivery.Serialize()
+ if err != nil {
+ return nil, gtserror.Newf("error serializing delivery: %w", err)
+ }
+
+ return &gtsmodel.WorkerTask{
+ // ID is autoincrement
+ WorkerType: gtsmodel.DeliveryWorker,
+ TaskData: data,
+ CreatedAt: time.Now(),
+ }, nil
+}
+
+// pushFederator parses a valid messages.FromFediAPI{} from serialized task data and pushes to queue.
+func (p *Processor) pushFederator(ctx context.Context, task *gtsmodel.WorkerTask) error {
+ var msg messages.FromFediAPI
+
+ // Deserialize the raw worker task data into message.
+ if err := msg.Deserialize(task.TaskData); err != nil {
+ return gtserror.Newf("error deserializing federator message: %w", err)
+ }
+
+ if rcv := msg.Receiving; rcv != nil {
+ // Only a placeholder receiving account will be populated,
+ // fetch the actual model from database by persisted ID.
+ account, err := p.state.DB.GetAccountByID(ctx, rcv.ID)
+ if err != nil {
+ return gtserror.Newf("error fetching receiving account %s from db: %w", rcv.ID, err)
+ }
+
+ // Set the now populated
+ // receiving account model.
+ msg.Receiving = account
+ }
+
+ if req := msg.Requesting; req != nil {
+ // Only a placeholder requesting account will be populated,
+ // fetch the actual model from database by persisted ID.
+ account, err := p.state.DB.GetAccountByID(ctx, req.ID)
+ if err != nil {
+ return gtserror.Newf("error fetching requesting account %s from db: %w", req.ID, err)
+ }
+
+ // Set the now populated
+ // requesting account model.
+ msg.Requesting = account
+ }
+
+ // Push populated task to the federator queue.
+ p.state.Workers.Federator.Queue.Push(&msg)
+
+ return nil
+}
+
+// popFederator pops messages.FromFediAPI{} from queue and serializes as valid task data.
+func (p *Processor) popFederator() (*gtsmodel.WorkerTask, error) {
+
+ // Pop waiting message from the federator worker.
+ msg, ok := p.state.Workers.Federator.Queue.Pop()
+ if !ok {
+ return nil, nil
+ }
+
+ // Serialize message task data.
+ data, err := msg.Serialize()
+ if err != nil {
+ return nil, gtserror.Newf("error serializing federator message: %w", err)
+ }
+
+ return &gtsmodel.WorkerTask{
+ // ID is autoincrement
+ WorkerType: gtsmodel.FederatorWorker,
+ TaskData: data,
+ CreatedAt: time.Now(),
+ }, nil
+}
+
+// pushClient parses a valid messages.FromClientAPI{} from serialized task data and pushes to queue.
+func (p *Processor) pushClient(ctx context.Context, task *gtsmodel.WorkerTask) error {
+ var msg messages.FromClientAPI
+
+ // Deserialize the raw worker task data into message.
+ if err := msg.Deserialize(task.TaskData); err != nil {
+ return gtserror.Newf("error deserializing client message: %w", err)
+ }
+
+ if org := msg.Origin; org != nil {
+ // Only a placeholder origin account will be populated,
+ // fetch the actual model from database by persisted ID.
+ account, err := p.state.DB.GetAccountByID(ctx, org.ID)
+ if err != nil {
+ return gtserror.Newf("error fetching origin account %s from db: %w", org.ID, err)
+ }
+
+ // Set the now populated
+ // origin account model.
+ msg.Origin = account
+ }
+
+ if trg := msg.Target; trg != nil {
+ // Only a placeholder target account will be populated,
+ // fetch the actual model from database by persisted ID.
+ account, err := p.state.DB.GetAccountByID(ctx, trg.ID)
+ if err != nil {
+ return gtserror.Newf("error fetching target account %s from db: %w", trg.ID, err)
+ }
+
+ // Set the now populated
+ // target account model.
+ msg.Target = account
+ }
+
+ // Push populated task to the client queue.
+ p.state.Workers.Client.Queue.Push(&msg)
+
+ return nil
+}
+
+// popClient pops messages.FromClientAPI{} from queue and serializes as valid task data.
+func (p *Processor) popClient() (*gtsmodel.WorkerTask, error) {
+
+ // Pop waiting message from the client worker.
+ msg, ok := p.state.Workers.Client.Queue.Pop()
+ if !ok {
+ return nil, nil
+ }
+
+ // Serialize message task data.
+ data, err := msg.Serialize()
+ if err != nil {
+ return nil, gtserror.Newf("error serializing client message: %w", err)
+ }
+
+ return &gtsmodel.WorkerTask{
+ // ID is autoincrement
+ WorkerType: gtsmodel.ClientWorker,
+ TaskData: data,
+ CreatedAt: time.Now(),
+ }, nil
+}
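For reference, the gtsmodel.WorkerTask{} model that these functions read and write is defined elsewhere (in internal/gtsmodel, with a migration added by this PR). Based purely on its usage above, it plausibly looks something like the sketch below; the field types and the constant values are assumptions, not the actual definition.

package gtsmodel

import "time"

// WorkerTask sketches the persisted task model as implied by its usage in
// workertask.go; the real definition and its DB tags live in internal/gtsmodel.
type WorkerTask struct {
	ID         uint       // autoincrementing database primary key
	WorkerType WorkerType // which worker queue this task belongs to
	TaskData   []byte     // serialized delivery / federator msg / client msg
	CreatedAt  time.Time  // when the task was originally queued
}

// WorkerType discriminates which queue a recovered task is pushed back onto.
type WorkerType uint8

const (
	DeliveryWorker  WorkerType = iota + 1 // illustrative values only
	FederatorWorker
	ClientWorker
)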