summaryrefslogtreecommitdiff
path: root/internal/media/manager.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/media/manager.go')
-rw-r--r--internal/media/manager.go176
1 files changed, 176 insertions, 0 deletions
diff --git a/internal/media/manager.go b/internal/media/manager.go
new file mode 100644
index 000000000..7f626271a
--- /dev/null
+++ b/internal/media/manager.go
@@ -0,0 +1,176 @@
+/*
+ GoToSocial
+ Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Affero General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package media
+
+import (
+ "context"
+ "errors"
+ "runtime"
+
+ "codeberg.org/gruf/go-runners"
+ "codeberg.org/gruf/go-store/kv"
+ "github.com/sirupsen/logrus"
+ "github.com/superseriousbusiness/gotosocial/internal/db"
+)
+
+// Manager provides an interface for managing media: parsing, storing, and retrieving media objects like photos, videos, and gifs.
+type Manager interface {
+ // ProcessMedia begins the process of decoding and storing the given data as an attachment.
+ // It will return a pointer to a Media struct upon which further actions can be performed, such as getting
+ // the finished media, thumbnail, attachment, etc.
+ //
+ // data should be a function that the media manager can call to return raw bytes of a piece of media.
+ //
+ // accountID should be the account that the media belongs to.
+ //
+ // ai is optional and can be nil. Any additional information about the attachment provided will be put in the database.
+ ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error)
+ ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error)
+ // NumWorkers returns the total number of workers available to this manager.
+ NumWorkers() int
+ // QueueSize returns the total capacity of the queue.
+ QueueSize() int
+ // JobsQueued returns the number of jobs currently in the task queue.
+ JobsQueued() int
+ // ActiveWorkers returns the number of workers currently performing jobs.
+ ActiveWorkers() int
+ // Stop stops the underlying worker pool of the manager. It should be called
+ // when closing GoToSocial in order to cleanly finish any in-progress jobs.
+ // It will block until workers are finished processing.
+ Stop() error
+}
+
+type manager struct {
+ db db.DB
+ storage *kv.KVStore
+ pool runners.WorkerPool
+ numWorkers int
+ queueSize int
+}
+
+// NewManager returns a media manager with the given db and underlying storage.
+//
+// A worker pool will also be initialized for the manager, to ensure that only
+// a limited number of media will be processed in parallel.
+//
+// The number of workers will be the number of CPUs available to the Go runtime,
+// divided by 2 (rounding down, but always at least 1).
+//
+// The length of the queue will be the number of workers multiplied by 10.
+//
+// So for an 8 core machine, the media manager will get 4 workers, and a queue of length 40.
+// For a 4 core machine, this will be 2 workers, and a queue length of 20.
+// For a single or 2-core machine, the media manager will get 1 worker, and a queue of length 10.
+func NewManager(database db.DB, storage *kv.KVStore) (Manager, error) {
+ numWorkers := runtime.NumCPU() / 2
+ // make sure we always have at least 1 worker even on single-core machines
+ if numWorkers == 0 {
+ numWorkers = 1
+ }
+ queueSize := numWorkers * 10
+
+ m := &manager{
+ db: database,
+ storage: storage,
+ pool: runners.NewWorkerPool(numWorkers, queueSize),
+ numWorkers: numWorkers,
+ queueSize: queueSize,
+ }
+
+ if start := m.pool.Start(); !start {
+ return nil, errors.New("could not start worker pool")
+ }
+ logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", numWorkers, queueSize)
+
+ return m, nil
+}
+
+func (m *manager) ProcessMedia(ctx context.Context, data DataFunc, accountID string, ai *AdditionalMediaInfo) (*ProcessingMedia, error) {
+ processingMedia, err := m.preProcessMedia(ctx, data, accountID, ai)
+ if err != nil {
+ return nil, err
+ }
+
+ logrus.Tracef("ProcessMedia: about to enqueue media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue())
+ m.pool.Enqueue(func(innerCtx context.Context) {
+ select {
+ case <-innerCtx.Done():
+ // if the inner context is done that means the worker pool is closing, so we should just return
+ return
+ default:
+ // start loading the media already for the caller's convenience
+ if _, err := processingMedia.LoadAttachment(innerCtx); err != nil {
+ logrus.Errorf("ProcessMedia: error processing media with attachmentID %s: %s", processingMedia.AttachmentID(), err)
+ }
+ }
+ })
+ logrus.Tracef("ProcessMedia: succesfully queued media with attachmentID %s, queue length is %d", processingMedia.AttachmentID(), m.pool.Queue())
+
+ return processingMedia, nil
+}
+
+func (m *manager) ProcessEmoji(ctx context.Context, data DataFunc, shortcode string, id string, uri string, ai *AdditionalEmojiInfo) (*ProcessingEmoji, error) {
+ processingEmoji, err := m.preProcessEmoji(ctx, data, shortcode, id, uri, ai)
+ if err != nil {
+ return nil, err
+ }
+
+ logrus.Tracef("ProcessEmoji: about to enqueue emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue())
+ m.pool.Enqueue(func(innerCtx context.Context) {
+ select {
+ case <-innerCtx.Done():
+ // if the inner context is done that means the worker pool is closing, so we should just return
+ return
+ default:
+ // start loading the emoji already for the caller's convenience
+ if _, err := processingEmoji.LoadEmoji(innerCtx); err != nil {
+ logrus.Errorf("ProcessEmoji: error processing emoji with id %s: %s", processingEmoji.EmojiID(), err)
+ }
+ }
+ })
+ logrus.Tracef("ProcessEmoji: succesfully queued emoji with id %s, queue length is %d", processingEmoji.EmojiID(), m.pool.Queue())
+
+ return processingEmoji, nil
+}
+
+func (m *manager) NumWorkers() int {
+ return m.numWorkers
+}
+
+func (m *manager) QueueSize() int {
+ return m.queueSize
+}
+
+func (m *manager) JobsQueued() int {
+ return m.pool.Queue()
+}
+
+func (m *manager) ActiveWorkers() int {
+ return m.pool.Workers()
+}
+
+func (m *manager) Stop() error {
+ logrus.Info("stopping media manager worker pool")
+
+ stopped := m.pool.Stop()
+ if !stopped {
+ return errors.New("could not stop media manager worker pool")
+ }
+ return nil
+}