summaryrefslogtreecommitdiff
path: root/internal/processing/admin/workertask.go
blob: 6d7cc7b7ad1b966c9dc97edeebdc02d1e1056e15 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
// GoToSocial
// Copyright (C) GoToSocial Authors admin@gotosocial.org
// SPDX-License-Identifier: AGPL-3.0-or-later
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package admin

import (
	"context"
	"fmt"
	"slices"
	"time"

	"github.com/superseriousbusiness/gotosocial/internal/gtserror"
	"github.com/superseriousbusiness/gotosocial/internal/gtsmodel"
	"github.com/superseriousbusiness/gotosocial/internal/log"
	"github.com/superseriousbusiness/gotosocial/internal/messages"
	"github.com/superseriousbusiness/gotosocial/internal/transport"
	"github.com/superseriousbusiness/gotosocial/internal/transport/delivery"
)

// NOTE:
// Having these functions in the processor, which is
// usually the intermediary that performs *processing*
// between the HTTP route handlers and the underlying
// database / storage layers is a little odd, so this
// may be subject to change!
//
// For now at least, this is a useful place that has
// access to the underlying database, workers and
// causes no dependency cycles with this use case!

// FillWorkerQueues recovers all serialized worker tasks from the database
// (if any!), and pushes them to each of their relevant worker queues.
func (p *Processor) FillWorkerQueues(ctx context.Context) error {
	log.Info(ctx, "rehydrate!")

	// Get all persisted worker tasks from db.
	//
	// (database returns these as ASCENDING, i.e.
	// returned in the order they were inserted).
	tasks, err := p.state.DB.GetWorkerTasks(ctx)
	if err != nil {
		return gtserror.Newf("error fetching worker tasks from db: %w", err)
	}

	var (
		// Counts of each task type successfully
		// recovered. ("deliveries" rather than
		// "delivery" so as not to shadow the
		// imported delivery package).
		deliveries int
		federator  int
		client     int

		// Failed recoveries.
		errors int
	)

	// Handle each persisted task, removing
	// all those we can't handle. Leaving us
	// with a slice of tasks we can safely
	// delete from being persisted in the DB.
	for i := 0; i < len(tasks); {
		var err error

		// Task at index.
		task := tasks[i]

		// Appropriate task count
		// pointer to increment.
		var counter *int

		// Attempt to recover persisted
		// task depending on worker type.
		switch task.WorkerType {
		case gtsmodel.DeliveryWorker:
			err = p.pushDelivery(ctx, task)
			counter = &deliveries
		case gtsmodel.FederatorWorker:
			err = p.pushFederator(ctx, task)
			counter = &federator
		case gtsmodel.ClientWorker:
			err = p.pushClient(ctx, task)
			counter = &client
		default:
			err = fmt.Errorf("invalid worker type %d", task.WorkerType)
		}

		if err != nil {
			log.Errorf(ctx, "error pushing task %d: %v", task.ID, err)

			// Drop errored task from slice so it
			// is NOT deleted from the db below,
			// and remains available for a retry
			// on a future startup.
			tasks = slices.Delete(tasks, i, i+1)

			// Incr errors.
			errors++
			continue
		}

		// Increment slice
		// index & counter.
		(*counter)++
		i++
	}

	// Tasks that we successfully pushed
	// to their appropriate workers, we can
	// safely now remove from the database.
	for _, task := range tasks {
		if err := p.state.DB.DeleteWorkerTaskByID(ctx, task.ID); err != nil {
			log.Errorf(ctx, "error deleting task from db: %v", err)
		}
	}

	// Log recovered tasks.
	log.WithContext(ctx).
		WithField("delivery", deliveries).
		WithField("federator", federator).
		WithField("client", client).
		WithField("errors", errors).
		Info("recovered queued tasks")

	return nil
}

// PersistWorkerQueues pops all queued worker tasks (that are themselves persistable, i.e. not
// dereference tasks which are just function ptrs), serializes and persists them to the database.
func (p *Processor) PersistWorkerQueues(ctx context.Context) error {
	log.Info(ctx, "dehydrate!")

	var (
		// Counts of each task type successfully
		// persisted. ("deliveries" rather than
		// "delivery" so as not to shadow the
		// imported delivery package).
		deliveries int
		federator  int
		client     int

		// Failed persists.
		errors int

		// Serialized tasks to persist.
		tasks []*gtsmodel.WorkerTask
	)

	for {
		// Pop all queued deliveries.
		task, err := p.popDelivery()
		if err != nil {
			log.Errorf(ctx, "error popping delivery: %v", err)
			errors++ // incr error count.
			continue
		}

		if task == nil {
			// No more queue
			// tasks to pop!
			break
		}

		// Append serialized task.
		tasks = append(tasks, task)
		deliveries++ // incr count
	}

	for {
		// Pop queued federator msgs.
		task, err := p.popFederator()
		if err != nil {
			log.Errorf(ctx, "error popping federator message: %v", err)
			errors++ // incr error count.
			continue
		}

		if task == nil {
			// No more queue
			// tasks to pop!
			break
		}

		// Append serialized task.
		tasks = append(tasks, task)
		federator++ // incr count
	}

	for {
		// Pop queued client msgs.
		task, err := p.popClient()
		if err != nil {
			log.Errorf(ctx, "error popping client message: %v", err)
			errors++ // incr error count (was previously not counted here).
			continue
		}

		if task == nil {
			// No more queue
			// tasks to pop!
			break
		}

		// Append serialized task.
		tasks = append(tasks, task)
		client++ // incr count
	}

	// Persist all serialized queued worker tasks to database.
	if err := p.state.DB.PutWorkerTasks(ctx, tasks); err != nil {
		return gtserror.Newf("error putting tasks in db: %w", err)
	}

	// Log persisted tasks.
	log.WithContext(ctx).
		WithField("delivery", deliveries).
		WithField("federator", federator).
		WithField("client", client).
		WithField("errors", errors).
		Info("persisted queued tasks")

	return nil
}

// pushDelivery parses a valid delivery.Delivery{} from serialized task data and pushes to queue.
func (p *Processor) pushDelivery(ctx context.Context, task *gtsmodel.WorkerTask) error {
	// Deserialize the raw worker task data into a delivery.
	dlv := new(delivery.Delivery)
	if err := dlv.Deserialize(task.TaskData); err != nil {
		return gtserror.Newf("error deserializing delivery: %w", err)
	}

	// Transport used to re-sign
	// the recovered delivery.
	var tr transport.Transport

	uri := dlv.ActorID
	if uri == "" {
		var err error

		// No actor was given, will be signed by instance account.
		tr, err = p.transport.NewTransportForUsername(ctx, "")
		if err != nil {
			return gtserror.Newf("error getting instance account transport: %w", err)
		}
	} else {
		// Fetch the actor account by provided URI from db.
		account, err := p.state.DB.GetAccountByURI(ctx, uri)
		if err != nil {
			return gtserror.Newf("error getting actor account %s from db: %w", uri, err)
		}

		// Fetch a transport for request signing for actor's account username.
		tr, err = p.transport.NewTransportForUsername(ctx, account.Username)
		if err != nil {
			return gtserror.Newf("error getting transport for actor %s: %w", uri, err)
		}
	}

	// Using transport, add actor signature to delivery.
	if err := tr.SignDelivery(dlv); err != nil {
		return gtserror.Newf("error signing delivery: %w", err)
	}

	// Push deserialized task to delivery queue.
	p.state.Workers.Delivery.Queue.Push(dlv)

	return nil
}

// popDelivery pops delivery.Delivery{} from queue and serializes as valid task data.
//
// A nil task with nil error indicates the queue is empty.
func (p *Processor) popDelivery() (*gtsmodel.WorkerTask, error) {

	// Pop waiting delivery from the delivery worker.
	// (named "dlv" so as not to shadow the imported
	// delivery package).
	dlv, ok := p.state.Workers.Delivery.Queue.Pop()
	if !ok {
		return nil, nil
	}

	// Serialize the delivery task data.
	data, err := dlv.Serialize()
	if err != nil {
		return nil, gtserror.Newf("error serializing delivery: %w", err)
	}

	return &gtsmodel.WorkerTask{
		// ID is autoincrement
		WorkerType: gtsmodel.DeliveryWorker,
		TaskData:   data,
		CreatedAt:  time.Now(),
	}, nil
}

// pushFederator parses a valid messages.FromFediAPI{} from serialized task data and pushes to queue.
func (p *Processor) pushFederator(ctx context.Context, task *gtsmodel.WorkerTask) error {
	var msg messages.FromFediAPI

	// Deserialize the raw worker task data into message.
	if err := msg.Deserialize(task.TaskData); err != nil {
		return gtserror.Newf("error deserializing federator message: %w", err)
	}

	if rcv := msg.Receiving; rcv != nil {
		// Only a placeholder receiving account will be populated,
		// fetch the actual model from database by persisted ID.
		account, err := p.state.DB.GetAccountByID(ctx, rcv.ID)
		if err != nil {
			return gtserror.Newf("error fetching receiving account %s from db: %w", rcv.ID, err)
		}

		// Set the now populated
		// receiving account model.
		msg.Receiving = account
	}

	if req := msg.Requesting; req != nil {
		// Only a placeholder requesting account will be populated,
		// fetch the actual model from database by persisted ID.
		account, err := p.state.DB.GetAccountByID(ctx, req.ID)
		if err != nil {
			return gtserror.Newf("error fetching requesting account %s from db: %w", req.ID, err)
		}

		// Set the now populated
		// requesting account model.
		msg.Requesting = account
	}

	// Push populated task to the federator queue.
	p.state.Workers.Federator.Queue.Push(&msg)

	return nil
}

// popFederator pops messages.FromFediAPI{} from queue and serializes as valid task data.
//
// A nil task with nil error indicates the queue is empty.
func (p *Processor) popFederator() (*gtsmodel.WorkerTask, error) {
	// Pop the next waiting message from the federator worker.
	next, ok := p.state.Workers.Federator.Queue.Pop()
	if !ok {
		// Queue drained; nothing
		// further to serialize.
		return nil, nil
	}

	// Serialize message task data.
	raw, err := next.Serialize()
	if err != nil {
		return nil, gtserror.Newf("error serializing federator message: %w", err)
	}

	task := &gtsmodel.WorkerTask{
		// ID is autoincrement
		WorkerType: gtsmodel.FederatorWorker,
		TaskData:   raw,
		CreatedAt:  time.Now(),
	}
	return task, nil
}

// pushClient parses a valid messages.FromClientAPI{} from serialized task data and pushes to queue.
func (p *Processor) pushClient(ctx context.Context, task *gtsmodel.WorkerTask) error {
	var msg messages.FromClientAPI

	// Deserialize the raw worker task data into message.
	if err := msg.Deserialize(task.TaskData); err != nil {
		return gtserror.Newf("error deserializing client message: %w", err)
	}

	if org := msg.Origin; org != nil {
		// Only a placeholder origin account will be populated,
		// fetch the actual model from database by persisted ID.
		account, err := p.state.DB.GetAccountByID(ctx, org.ID)
		if err != nil {
			return gtserror.Newf("error fetching origin account %s from db: %w", org.ID, err)
		}

		// Set the now populated
		// origin account model.
		msg.Origin = account
	}

	if trg := msg.Target; trg != nil {
		// Only a placeholder target account will be populated,
		// fetch the actual model from database by persisted ID.
		account, err := p.state.DB.GetAccountByID(ctx, trg.ID)
		if err != nil {
			return gtserror.Newf("error fetching target account %s from db: %w", trg.ID, err)
		}

		// Set the now populated
		// target account model.
		msg.Target = account
	}

	// Push populated task to the client queue.
	p.state.Workers.Client.Queue.Push(&msg)

	return nil
}

// popClient pops messages.FromClientAPI{} from queue and serializes as valid task data.
//
// A nil task with nil error indicates the queue is empty.
func (p *Processor) popClient() (*gtsmodel.WorkerTask, error) {
	// Pop the next waiting message from the client worker.
	next, ok := p.state.Workers.Client.Queue.Pop()
	if !ok {
		// Queue drained; nothing
		// further to serialize.
		return nil, nil
	}

	// Serialize message task data.
	raw, err := next.Serialize()
	if err != nil {
		return nil, gtserror.Newf("error serializing client message: %w", err)
	}

	task := &gtsmodel.WorkerTask{
		// ID is autoincrement
		WorkerType: gtsmodel.ClientWorker,
		TaskData:   raw,
		CreatedAt:  time.Now(),
	}
	return task, nil
}