diff options
Diffstat (limited to 'internal/processing/admin/workertask.go')
-rw-r--r-- | internal/processing/admin/workertask.go | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/internal/processing/admin/workertask.go b/internal/processing/admin/workertask.go new file mode 100644 index 000000000..6d7cc7b7a --- /dev/null +++ b/internal/processing/admin/workertask.go @@ -0,0 +1,426 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package admin + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/superseriousbusiness/gotosocial/internal/gtserror" + "github.com/superseriousbusiness/gotosocial/internal/gtsmodel" + "github.com/superseriousbusiness/gotosocial/internal/log" + "github.com/superseriousbusiness/gotosocial/internal/messages" + "github.com/superseriousbusiness/gotosocial/internal/transport" + "github.com/superseriousbusiness/gotosocial/internal/transport/delivery" +) + +// NOTE: +// Having these functions in the processor, which is +// usually the intermediary that performs *processing* +// between the HTTP route handlers and the underlying +// database / storage layers is a little odd, so this +// may be subject to change! +// +// For now at least, this is a useful place that has +// access to the underlying database, workers and +// causes no dependency cycles with this use case! + +// FillWorkerQueues recovers all serialized worker tasks from the database +// (if any!), and pushes them to each of their relevant worker queues. +func (p *Processor) FillWorkerQueues(ctx context.Context) error { + log.Info(ctx, "rehydrate!") + + // Get all persisted worker tasks from db. + // + // (database returns these as ASCENDING, i.e. + // returned in the order they were inserted). + tasks, err := p.state.DB.GetWorkerTasks(ctx) + if err != nil { + return gtserror.Newf("error fetching worker tasks from db: %w", err) + } + + var ( + // Counts of each task type + // successfully recovered. + delivery int + federator int + client int + + // Failed recoveries. + errors int + ) + +loop: + + // Handle each persisted task, removing + // all those we can't handle. Leaving us + // with a slice of tasks we can safely + // delete from being persisted in the DB. + for i := 0; i < len(tasks); { + var err error + + // Task at index. + task := tasks[i] + + // Appropriate task count + // pointer to increment. + var counter *int + + // Attempt to recovery persisted + // task depending on worker type. + switch task.WorkerType { + case gtsmodel.DeliveryWorker: + err = p.pushDelivery(ctx, task) + counter = &delivery + case gtsmodel.FederatorWorker: + err = p.pushFederator(ctx, task) + counter = &federator + case gtsmodel.ClientWorker: + err = p.pushClient(ctx, task) + counter = &client + default: + err = fmt.Errorf("invalid worker type %d", task.WorkerType) + } + + if err != nil { + log.Errorf(ctx, "error pushing task %d: %v", task.ID, err) + + // Drop error'd task from slice. + tasks = slices.Delete(tasks, i, i+1) + + // Incr errors. + errors++ + continue loop + } + + // Increment slice + // index & counter. + (*counter)++ + i++ + } + + // Tasks that worker successfully pushed + // to their appropriate workers, we can + // safely now remove from the database. + for _, task := range tasks { + if err := p.state.DB.DeleteWorkerTaskByID(ctx, task.ID); err != nil { + log.Errorf(ctx, "error deleting task from db: %v", err) + } + } + + // Log recovered tasks. + log.WithContext(ctx). + WithField("delivery", delivery). + WithField("federator", federator). + WithField("client", client). + WithField("errors", errors). + Info("recovered queued tasks") + + return nil +} + +// PersistWorkerQueues pops all queued worker tasks (that are themselves persistable, i.e. not +// dereference tasks which are just function ptrs), serializes and persists them to the database. +func (p *Processor) PersistWorkerQueues(ctx context.Context) error { + log.Info(ctx, "dehydrate!") + + var ( + // Counts of each task type + // successfully persisted. + delivery int + federator int + client int + + // Failed persists. + errors int + + // Serialized tasks to persist. + tasks []*gtsmodel.WorkerTask + ) + + for { + // Pop all queued deliveries. + task, err := p.popDelivery() + if err != nil { + log.Errorf(ctx, "error popping delivery: %v", err) + errors++ // incr error count. + continue + } + + if task == nil { + // No more queue + // tasks to pop! + break + } + + // Append serialized task. + tasks = append(tasks, task) + delivery++ // incr count + } + + for { + // Pop queued federator msgs. + task, err := p.popFederator() + if err != nil { + log.Errorf(ctx, "error popping federator message: %v", err) + errors++ // incr count + continue + } + + if task == nil { + // No more queue + // tasks to pop! + break + } + + // Append serialized task. + tasks = append(tasks, task) + federator++ // incr count + } + + for { + // Pop queued client msgs. + task, err := p.popClient() + if err != nil { + log.Errorf(ctx, "error popping client message: %v", err) + continue + } + + if task == nil { + // No more queue + // tasks to pop! + break + } + + // Append serialized task. + tasks = append(tasks, task) + client++ // incr count + } + + // Persist all serialized queued worker tasks to database. + if err := p.state.DB.PutWorkerTasks(ctx, tasks); err != nil { + return gtserror.Newf("error putting tasks in db: %w", err) + } + + // Log recovered tasks. + log.WithContext(ctx). + WithField("delivery", delivery). + WithField("federator", federator). + WithField("client", client). + WithField("errors", errors). + Info("persisted queued tasks") + + return nil +} + +// pushDelivery parses a valid delivery.Delivery{} from serialized task data and pushes to queue. +func (p *Processor) pushDelivery(ctx context.Context, task *gtsmodel.WorkerTask) error { + dlv := new(delivery.Delivery) + + // Deserialize the raw worker task data into delivery. + if err := dlv.Deserialize(task.TaskData); err != nil { + return gtserror.Newf("error deserializing delivery: %w", err) + } + + var tsport transport.Transport + + if uri := dlv.ActorID; uri != "" { + // Fetch the actor account by provided URI from db. + account, err := p.state.DB.GetAccountByURI(ctx, uri) + if err != nil { + return gtserror.Newf("error getting actor account %s from db: %w", uri, err) + } + + // Fetch a transport for request signing for actor's account username. + tsport, err = p.transport.NewTransportForUsername(ctx, account.Username) + if err != nil { + return gtserror.Newf("error getting transport for actor %s: %w", uri, err) + } + } else { + var err error + + // No actor was given, will be signed by instance account. + tsport, err = p.transport.NewTransportForUsername(ctx, "") + if err != nil { + return gtserror.Newf("error getting instance account transport: %w", err) + } + } + + // Using transport, add actor signature to delivery. + if err := tsport.SignDelivery(dlv); err != nil { + return gtserror.Newf("error signing delivery: %w", err) + } + + // Push deserialized task to delivery queue. + p.state.Workers.Delivery.Queue.Push(dlv) + + return nil +} + +// popDelivery pops delivery.Delivery{} from queue and serializes as valid task data. +func (p *Processor) popDelivery() (*gtsmodel.WorkerTask, error) { + + // Pop waiting delivery from the delivery worker. + delivery, ok := p.state.Workers.Delivery.Queue.Pop() + if !ok { + return nil, nil + } + + // Serialize the delivery task data. + data, err := delivery.Serialize() + if err != nil { + return nil, gtserror.Newf("error serializing delivery: %w", err) + } + + return >smodel.WorkerTask{ + // ID is autoincrement + WorkerType: gtsmodel.DeliveryWorker, + TaskData: data, + CreatedAt: time.Now(), + }, nil +} + +// pushClient parses a valid messages.FromFediAPI{} from serialized task data and pushes to queue. +func (p *Processor) pushFederator(ctx context.Context, task *gtsmodel.WorkerTask) error { + var msg messages.FromFediAPI + + // Deserialize the raw worker task data into message. + if err := msg.Deserialize(task.TaskData); err != nil { + return gtserror.Newf("error deserializing federator message: %w", err) + } + + if rcv := msg.Receiving; rcv != nil { + // Only a placeholder receiving account will be populated, + // fetch the actual model from database by persisted ID. + account, err := p.state.DB.GetAccountByID(ctx, rcv.ID) + if err != nil { + return gtserror.Newf("error fetching receiving account %s from db: %w", rcv.ID, err) + } + + // Set the now populated + // receiving account model. + msg.Receiving = account + } + + if req := msg.Requesting; req != nil { + // Only a placeholder requesting account will be populated, + // fetch the actual model from database by persisted ID. + account, err := p.state.DB.GetAccountByID(ctx, req.ID) + if err != nil { + return gtserror.Newf("error fetching requesting account %s from db: %w", req.ID, err) + } + + // Set the now populated + // requesting account model. + msg.Requesting = account + } + + // Push populated task to the federator queue. + p.state.Workers.Federator.Queue.Push(&msg) + + return nil +} + +// popFederator pops messages.FromFediAPI{} from queue and serializes as valid task data. +func (p *Processor) popFederator() (*gtsmodel.WorkerTask, error) { + + // Pop waiting message from the federator worker. + msg, ok := p.state.Workers.Federator.Queue.Pop() + if !ok { + return nil, nil + } + + // Serialize message task data. + data, err := msg.Serialize() + if err != nil { + return nil, gtserror.Newf("error serializing federator message: %w", err) + } + + return >smodel.WorkerTask{ + // ID is autoincrement + WorkerType: gtsmodel.FederatorWorker, + TaskData: data, + CreatedAt: time.Now(), + }, nil +} + +// pushClient parses a valid messages.FromClientAPI{} from serialized task data and pushes to queue. +func (p *Processor) pushClient(ctx context.Context, task *gtsmodel.WorkerTask) error { + var msg messages.FromClientAPI + + // Deserialize the raw worker task data into message. + if err := msg.Deserialize(task.TaskData); err != nil { + return gtserror.Newf("error deserializing client message: %w", err) + } + + if org := msg.Origin; org != nil { + // Only a placeholder origin account will be populated, + // fetch the actual model from database by persisted ID. + account, err := p.state.DB.GetAccountByID(ctx, org.ID) + if err != nil { + return gtserror.Newf("error fetching origin account %s from db: %w", org.ID, err) + } + + // Set the now populated + // origin account model. + msg.Origin = account + } + + if trg := msg.Target; trg != nil { + // Only a placeholder target account will be populated, + // fetch the actual model from database by persisted ID. + account, err := p.state.DB.GetAccountByID(ctx, trg.ID) + if err != nil { + return gtserror.Newf("error fetching target account %s from db: %w", trg.ID, err) + } + + // Set the now populated + // target account model. + msg.Target = account + } + + // Push populated task to the federator queue. + p.state.Workers.Client.Queue.Push(&msg) + + return nil +} + +// popClient pops messages.FromClientAPI{} from queue and serializes as valid task data. +func (p *Processor) popClient() (*gtsmodel.WorkerTask, error) { + + // Pop waiting message from the client worker. + msg, ok := p.state.Workers.Client.Queue.Pop() + if !ok { + return nil, nil + } + + // Serialize message task data. + data, err := msg.Serialize() + if err != nil { + return nil, gtserror.Newf("error serializing client message: %w", err) + } + + return >smodel.WorkerTask{ + // ID is autoincrement + WorkerType: gtsmodel.ClientWorker, + TaskData: data, + CreatedAt: time.Now(), + }, nil +} |