diff options
author | 2024-08-23 15:15:35 +0000 | |
---|---|---|
committer | 2024-08-23 17:15:35 +0200 | |
commit | 8e5a72ac5c37f26b49f5fdd02726ea13e3aac6b0 (patch) | |
tree | 041139f1883541411c9bb4b6738dee64b51b197a /internal | |
parent | [feature] Use `local_only` field, deprecate `federated` field (#3222) (diff) | |
download | gotosocial-8e5a72ac5c37f26b49f5fdd02726ea13e3aac6b0.tar.xz |
[performance] ffmpeg ffprobe wrapper improvements (#3225)
* use a single instance of wazero runtime and compiled modules
* remove test output :facepalm:
* undo process-{media,emoji} changes
* update test runner to include wazero compilation cache
* sign drone.yml
---------
Co-authored-by: tobi <tobi.smethurst@protonmail.com>
Diffstat (limited to 'internal')
-rw-r--r-- | internal/media/ffmpeg.go | 5 | ||||
-rw-r--r-- | internal/media/ffmpeg/cache.go | 46 | ||||
-rw-r--r-- | internal/media/ffmpeg/ffmpeg.go | 67 | ||||
-rw-r--r-- | internal/media/ffmpeg/ffprobe.go | 67 | ||||
-rw-r--r-- | internal/media/ffmpeg/pool.go | 94 | ||||
-rw-r--r-- | internal/media/ffmpeg/runner.go | 70 | ||||
-rw-r--r-- | internal/media/ffmpeg/wasm.go | 201 |
7 files changed, 297 insertions, 253 deletions
diff --git a/internal/media/ffmpeg.go b/internal/media/ffmpeg.go index f88908196..0443f95b8 100644 --- a/internal/media/ffmpeg.go +++ b/internal/media/ffmpeg.go @@ -27,7 +27,6 @@ import ( "codeberg.org/gruf/go-byteutil" - "codeberg.org/gruf/go-ffmpreg/wasm" _ffmpeg "github.com/superseriousbusiness/gotosocial/internal/media/ffmpeg" "github.com/superseriousbusiness/gotosocial/internal/gtserror" @@ -161,7 +160,7 @@ func ffmpegGenerateStatic(ctx context.Context, filepath string) (string, error) // ffmpeg calls `ffmpeg [args...]` (WASM) with directory path mounted in runtime. func ffmpeg(ctx context.Context, dirpath string, args ...string) error { var stderr byteutil.Buffer - rc, err := _ffmpeg.Ffmpeg(ctx, wasm.Args{ + rc, err := _ffmpeg.Ffmpeg(ctx, _ffmpeg.Args{ Stderr: &stderr, Args: args, Config: func(modcfg wazero.ModuleConfig) wazero.ModuleConfig { @@ -188,7 +187,7 @@ func ffprobe(ctx context.Context, filepath string) (*result, error) { dirpath := path.Dir(filepath) // Run ffprobe on our given file at path. - _, err := _ffmpeg.Ffprobe(ctx, wasm.Args{ + _, err := _ffmpeg.Ffprobe(ctx, _ffmpeg.Args{ Stdout: &stdout, Args: []string{ diff --git a/internal/media/ffmpeg/cache.go b/internal/media/ffmpeg/cache.go deleted file mode 100644 index 371d409dc..000000000 --- a/internal/media/ffmpeg/cache.go +++ /dev/null @@ -1,46 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package ffmpeg - -import ( - "os" - - "github.com/tetratelabs/wazero" -) - -// shared WASM compilation cache. -var cache wazero.CompilationCache - -func initCache() { - if cache != nil { - return - } - - if dir := os.Getenv("WAZERO_COMPILATION_CACHE"); dir != "" { - var err error - - // Use on-filesystem compilation cache given by env. - cache, err = wazero.NewCompilationCacheWithDir(dir) - if err != nil { - panic(err) - } - } else { - // Use in-memory compilation cache. - cache = wazero.NewCompilationCache() - } -} diff --git a/internal/media/ffmpeg/ffmpeg.go b/internal/media/ffmpeg/ffmpeg.go index 253323989..d33fef34e 100644 --- a/internal/media/ffmpeg/ffmpeg.go +++ b/internal/media/ffmpeg/ffmpeg.go @@ -19,65 +19,22 @@ package ffmpeg import ( "context" - - ffmpeglib "codeberg.org/gruf/go-ffmpreg/embed/ffmpeg" - "codeberg.org/gruf/go-ffmpreg/wasm" - - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) -// InitFfmpeg initializes the ffmpeg WebAssembly instance pool, -// with given maximum limiting the number of concurrent instances. +// ffmpegRunner limits the number of +// ffmpeg WebAssembly instances that +// may be concurrently running, in +// order to reduce memory usage. +var ffmpegRunner runner + +// InitFfmpeg precompiles the ffmpeg WebAssembly source into memory and +// prepares the runner to only allow max given concurrent running instances. func InitFfmpeg(ctx context.Context, max int) error { - initCache() // ensure compilation cache initialized - return ffmpegPool.Init(ctx, max) + ffmpegRunner.Init(max) + return compileFfmpeg(ctx) } // Ffmpeg runs the given arguments with an instance of ffmpeg. -func Ffmpeg(ctx context.Context, args wasm.Args) (uint32, error) { - return ffmpegPool.Run(ctx, args) -} - -var ffmpegPool = wasmInstancePool{ - inst: wasm.Instantiator{ - - // WASM module name. - Module: "ffmpeg", - - // Per-instance WebAssembly runtime (with shared cache). - Runtime: func(ctx context.Context) wazero.Runtime { - - // Prepare config with cache. - cfg := wazero.NewRuntimeConfig() - cfg = cfg.WithCoreFeatures(ffmpeglib.CoreFeatures) - cfg = cfg.WithCompilationCache(cache) - - // Instantiate runtime with our config. - rt := wazero.NewRuntimeWithConfig(ctx, cfg) - - // Prepare default "env" host module. - env := rt.NewHostModuleBuilder("env") - - // Instantiate "env" module in our runtime. - _, err := env.Instantiate(context.Background()) - if err != nil { - panic(err) - } - - // Instantiate the wasi snapshot preview 1 in runtime. - _, err = wasi_snapshot_preview1.Instantiate(ctx, rt) - if err != nil { - panic(err) - } - - return rt - }, - - // Per-run module configuration. - Config: wazero.NewModuleConfig, - - // Embedded WASM. - Source: ffmpeglib.B, - }, +func Ffmpeg(ctx context.Context, args Args) (uint32, error) { + return ffmpegRunner.Run(ctx, ffmpeg, args) } diff --git a/internal/media/ffmpeg/ffprobe.go b/internal/media/ffmpeg/ffprobe.go index 19582450f..eca819b09 100644 --- a/internal/media/ffmpeg/ffprobe.go +++ b/internal/media/ffmpeg/ffprobe.go @@ -19,65 +19,22 @@ package ffmpeg import ( "context" - - ffprobelib "codeberg.org/gruf/go-ffmpreg/embed/ffprobe" - "codeberg.org/gruf/go-ffmpreg/wasm" - - "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" ) -// InitFfprobe initializes the ffprobe WebAssembly instance pool, -// with given maximum limiting the number of concurrent instances. +// ffprobeRunner limits the number of +// ffprobe WebAssembly instances that +// may be concurrently running, in +// order to reduce memory usage. +var ffprobeRunner runner + +// InitFfprobe precompiles the ffprobe WebAssembly source into memory and +// prepares the runner to only allow max given concurrent running instances. func InitFfprobe(ctx context.Context, max int) error { - initCache() // ensure compilation cache initialized - return ffprobePool.Init(ctx, max) + ffprobeRunner.Init(max) + return compileFfprobe(ctx) } // Ffprobe runs the given arguments with an instance of ffprobe. -func Ffprobe(ctx context.Context, args wasm.Args) (uint32, error) { - return ffprobePool.Run(ctx, args) -} - -var ffprobePool = wasmInstancePool{ - inst: wasm.Instantiator{ - - // WASM module name. - Module: "ffprobe", - - // Per-instance WebAssembly runtime (with shared cache). - Runtime: func(ctx context.Context) wazero.Runtime { - - // Prepare config with cache. - cfg := wazero.NewRuntimeConfig() - cfg = cfg.WithCoreFeatures(ffprobelib.CoreFeatures) - cfg = cfg.WithCompilationCache(cache) - - // Instantiate runtime with our config. - rt := wazero.NewRuntimeWithConfig(ctx, cfg) - - // Prepare default "env" host module. - env := rt.NewHostModuleBuilder("env") - - // Instantiate "env" module in our runtime. - _, err := env.Instantiate(context.Background()) - if err != nil { - panic(err) - } - - // Instantiate the wasi snapshot preview 1 in runtime. - _, err = wasi_snapshot_preview1.Instantiate(ctx, rt) - if err != nil { - panic(err) - } - - return rt - }, - - // Per-run module configuration. - Config: wazero.NewModuleConfig, - - // Embedded WASM. - Source: ffprobelib.B, - }, +func Ffprobe(ctx context.Context, args Args) (uint32, error) { + return ffprobeRunner.Run(ctx, ffprobe, args) } diff --git a/internal/media/ffmpeg/pool.go b/internal/media/ffmpeg/pool.go deleted file mode 100644 index e63b10e69..000000000 --- a/internal/media/ffmpeg/pool.go +++ /dev/null @@ -1,94 +0,0 @@ -// GoToSocial -// Copyright (C) GoToSocial Authors admin@gotosocial.org -// SPDX-License-Identifier: AGPL-3.0-or-later -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. - -package ffmpeg - -import ( - "context" - - "codeberg.org/gruf/go-ffmpreg/wasm" -) - -// wasmInstancePool wraps a wasm.Instantiator{} and a -// channel of wasm.Instance{}s to provide a concurrency -// safe pool of WebAssembly module instances capable of -// compiling new instances on-the-fly, with a predetermined -// maximum number of concurrent instances at any one time. -type wasmInstancePool struct { - inst wasm.Instantiator - pool chan *wasm.Instance -} - -func (p *wasmInstancePool) Init(ctx context.Context, sz int) error { - // Initialize for first time - // to preload module into the - // wazero compilation cache. - inst, err := p.inst.New(ctx) - if err != nil { - return err - } - - // Clamp to 1. - if sz <= 0 { - sz = 1 - } - - // Allocate new pool instance channel. - p.pool = make(chan *wasm.Instance, sz) - - // Store only one - // open instance - // at init time. - p.pool <- inst - - // Fill reminaing with closed - // instances for later opening. - for i := 0; i < sz-1; i++ { - p.pool <- new(wasm.Instance) - } - - return nil -} - -func (p *wasmInstancePool) Run(ctx context.Context, args wasm.Args) (uint32, error) { - var inst *wasm.Instance - - select { - // Context canceled. - case <-ctx.Done(): - return 0, ctx.Err() - - // Acquire instance. - case inst = <-p.pool: - - // Ensure instance is - // ready for running. - if inst.IsClosed() { - var err error - inst, err = p.inst.New(ctx) - if err != nil { - return 0, err - } - } - } - - // Release instance to pool on end. - defer func() { p.pool <- inst }() - - // Pass args to instance. - return inst.Run(ctx, args) -} diff --git a/internal/media/ffmpeg/runner.go b/internal/media/ffmpeg/runner.go new file mode 100644 index 000000000..403131ff7 --- /dev/null +++ b/internal/media/ffmpeg/runner.go @@ -0,0 +1,70 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package ffmpeg + +import ( + "context" + + "github.com/tetratelabs/wazero" +) + +// runner simply abstracts away the complexities +// of limiting the number of concurrent running +// instances of a particular WebAssembly module. +type runner struct{ pool chan struct{} } + +// Init initializes the runner to +// only allow 'n' concurrently running +// instances. Special cases include 0 +// which clamps to 1, and < 0 which +// disables the limit alltogether. +func (r *runner) Init(n int) { + + // Reset pool. + r.pool = nil + + // Clamp to 1. + if n <= 0 { + n = 1 + } + + // Allocate new pool channel. + r.pool = make(chan struct{}, n) + for i := 0; i < n; i++ { + r.pool <- struct{}{} + } +} + +// Run will attempt to pass the given compiled WebAssembly module with args to run(), waiting on +// the receiving runner until a free slot is available to run an instance, (if a limit is enabled). +func (r *runner) Run(ctx context.Context, cmod wazero.CompiledModule, args Args) (uint32, error) { + select { + // Context canceled. + case <-ctx.Done(): + return 0, ctx.Err() + + // Slot acquired. + case <-r.pool: + } + + // Release slot back to pool on end. + defer func() { r.pool <- struct{}{} }() + + // Pass to main module runner. + return run(ctx, cmod, args) +} diff --git a/internal/media/ffmpeg/wasm.go b/internal/media/ffmpeg/wasm.go new file mode 100644 index 000000000..ff5506f2a --- /dev/null +++ b/internal/media/ffmpeg/wasm.go @@ -0,0 +1,201 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package ffmpeg + +import ( + "context" + "io" + "os" + + ffmpeglib "codeberg.org/gruf/go-ffmpreg/embed/ffmpeg" + ffprobelib "codeberg.org/gruf/go-ffmpreg/embed/ffprobe" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "github.com/tetratelabs/wazero/sys" +) + +// Use all core features required by ffmpeg / ffprobe +// (these should be the same but we OR just in case). +const corefeatures = ffprobelib.CoreFeatures | + ffmpeglib.CoreFeatures + +var ( + // shared WASM runtime instance. + runtime wazero.Runtime + + // ffmpeg / ffprobe compiled WASM. + ffmpeg wazero.CompiledModule + ffprobe wazero.CompiledModule +) + +// Args encapsulates the passing of common +// configuration options to run an instance +// of a compiled WebAssembly module that is +// run in a typical CLI manner. +type Args struct { + + // Optional further module configuration function. + // (e.g. to mount filesystem dir, set env vars, etc). + Config func(wazero.ModuleConfig) wazero.ModuleConfig + + // Standard FDs. + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + + // CLI args. + Args []string +} + +// run will run the given compiled +// WebAssembly module using given args, +// using the global wazero runtime. +func run( + ctx context.Context, + cmod wazero.CompiledModule, + args Args, +) ( + uint32, // exit code + error, +) { + // Prefix module name as argv0 to args. + cargs := make([]string, len(args.Args)+1) + copy(cargs[1:], args.Args) + cargs[0] = cmod.Name() + + // Create base module config. + modcfg := wazero.NewModuleConfig() + modcfg = modcfg.WithArgs(cargs...) + modcfg = modcfg.WithStdin(args.Stdin) + modcfg = modcfg.WithStdout(args.Stdout) + modcfg = modcfg.WithStderr(args.Stderr) + + if args.Config != nil { + // Pass through config fn. + modcfg = args.Config(modcfg) + } + + // Instantiate the module from precompiled wasm module data. + mod, err := runtime.InstantiateModule(ctx, cmod, modcfg) + + if mod != nil { + // Ensure closed. + _ = mod.Close(ctx) + } + + // Try extract exit code. + switch err := err.(type) { + case *sys.ExitError: + return err.ExitCode(), nil + default: + return 0, err + } +} + +// compileFfmpeg ensures the ffmpeg WebAssembly has been +// pre-compiled into memory. If already compiled is a no-op. +func compileFfmpeg(ctx context.Context) error { + if ffmpeg != nil { + return nil + } + + // Ensure runtime already initialized. + if err := initRuntime(ctx); err != nil { + return err + } + + // Compile the ffmpeg WebAssembly module into memory. + cmod, err := runtime.CompileModule(ctx, ffmpeglib.B) + if err != nil { + return err + } + + // Set module. + ffmpeg = cmod + return nil +} + +// compileFfprobe ensures the ffprobe WebAssembly has been +// pre-compiled into memory. If already compiled is a no-op. +func compileFfprobe(ctx context.Context) error { + if ffprobe != nil { + return nil + } + + // Ensure runtime already initialized. + if err := initRuntime(ctx); err != nil { + return err + } + + // Compile the ffprobe WebAssembly module into memory. + cmod, err := runtime.CompileModule(ctx, ffprobelib.B) + if err != nil { + return err + } + + // Set module. + ffprobe = cmod + return nil +} + +// initRuntime initializes the global wazero.Runtime, +// if already initialized this function is a no-op. +func initRuntime(ctx context.Context) error { + if runtime != nil { + return nil + } + + var cache wazero.CompilationCache + + if dir := os.Getenv("GTS_WAZERO_COMPILATION_CACHE"); dir != "" { + var err error + + // Use on-filesystem compilation cache given by env. + cache, err = wazero.NewCompilationCacheWithDir(dir) + if err != nil { + return err + } + } + + // Prepare config with cache. + cfg := wazero.NewRuntimeConfig() + cfg = cfg.WithCoreFeatures(corefeatures) + cfg = cfg.WithCompilationCache(cache) + + // Instantiate runtime with prepared config. + rt := wazero.NewRuntimeWithConfig(ctx, cfg) + + // Prepare default "env" host module. + env := rt.NewHostModuleBuilder("env") + + // Instantiate host "env" module. + _, err := env.Instantiate(ctx) + if err != nil { + return err + } + + // Instantiate wasi snapshot preview features in runtime. + _, err = wasi_snapshot_preview1.Instantiate(ctx, rt) + if err != nil { + return err + } + + // Set runtime. + runtime = rt + return nil +} |