diff options
author | 2022-01-03 17:37:38 +0100 | |
---|---|---|
committer | 2022-01-03 17:37:38 +0100 | |
commit | 8abfa7751ab4b80ced391f8a7bd16c5e6c432fee (patch) | |
tree | 4bade7f50049435fae024f646d009b663a80c3de /internal/media/manager.go | |
parent | add gruf worker pool (diff) | |
download | gotosocial-8abfa7751ab4b80ced391f8a7bd16c5e6c432fee.tar.xz |
return very partial image on first upload
Diffstat (limited to 'internal/media/manager.go')
-rw-r--r-- | internal/media/manager.go | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/internal/media/manager.go b/internal/media/manager.go index 16465bb67..54b964564 100644 --- a/internal/media/manager.go +++ b/internal/media/manager.go @@ -25,7 +25,9 @@ import ( "runtime" "strings" + "codeberg.org/gruf/go-runners" "codeberg.org/gruf/go-store/kv" + "github.com/sirupsen/logrus" "github.com/superseriousbusiness/gotosocial/internal/db" ) @@ -37,18 +39,27 @@ type Manager interface { type manager struct { db db.DB storage *kv.KVStore - pool *workerPool + pool runners.WorkerPool } // New returns a media manager with the given db and underlying storage. -func New(database db.DB, storage *kv.KVStore) Manager { +func New(database db.DB, storage *kv.KVStore) (Manager, error) { workers := runtime.NumCPU() / 2 + queue := workers * 10 + pool := runners.NewWorkerPool(workers, queue) - return &manager{ + if start := pool.Start(); !start { + return nil, errors.New("could not start worker pool") + } + logrus.Debugf("started media manager worker pool with %d workers and queue capacity of %d", workers, queue) + + m := &manager{ db: database, storage: storage, - pool: newWorkerPool(workers), + pool: pool, } + + return m, nil } /* @@ -77,9 +88,16 @@ func (m *manager) ProcessMedia(ctx context.Context, data []byte, accountID strin return nil, errors.New("image was of size 0") } - return m.pool.run(func(ctx context.Context, data []byte, contentType string, accountID string) { - m.processImage(ctx, data, contentType, accountID) + media, err := m.preProcessImage(ctx, data, contentType, accountID) + if err != nil { + return nil, err + } + + m.pool.Enqueue(func(innerCtx context.Context) { + }) + + return nil, nil default: return nil, fmt.Errorf("content type %s not (yet) supported", contentType) } |