diff options
Diffstat (limited to 'vendor/codeberg.org')
24 files changed, 1490 insertions, 1709 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/v2/LICENSE b/vendor/codeberg.org/gruf/go-storage/LICENSE index e4163ae35..e4163ae35 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/LICENSE +++ b/vendor/codeberg.org/gruf/go-storage/LICENSE diff --git a/vendor/codeberg.org/gruf/go-storage/README.md b/vendor/codeberg.org/gruf/go-storage/README.md new file mode 100644 index 000000000..430b43467 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/README.md @@ -0,0 +1,5 @@ +# go-storage + +A simple library providing various storage implementations with a simple read-write-stat interface. + +Supports: on-disk, memory, S3.
\ No newline at end of file diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block.archived b/vendor/codeberg.org/gruf/go-storage/block.archived index 11a757211..11a757211 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/block.archived +++ b/vendor/codeberg.org/gruf/go-storage/block.archived diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block_test.archived b/vendor/codeberg.org/gruf/go-storage/block_test.archived index 8436f067f..8436f067f 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/block_test.archived +++ b/vendor/codeberg.org/gruf/go-storage/block_test.archived diff --git a/vendor/codeberg.org/gruf/go-storage/disk/disk.go b/vendor/codeberg.org/gruf/go-storage/disk/disk.go new file mode 100644 index 000000000..b11346503 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/disk/disk.go @@ -0,0 +1,467 @@ +package disk + +import ( + "bytes" + "context" + "errors" + "io" + "io/fs" + "os" + "path" + "strings" + "syscall" + + "codeberg.org/gruf/go-fastcopy" + "codeberg.org/gruf/go-fastpath/v2" + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" +) + +// ensure DiskStorage conforms to storage.Storage. +var _ storage.Storage = (*DiskStorage)(nil) + +// DefaultConfig returns the default DiskStorage configuration. +func DefaultConfig() Config { + return defaultConfig +} + +// immutable default configuration. +var defaultConfig = Config{ + OpenRead: OpenArgs{syscall.O_RDONLY, 0o644}, + OpenWrite: OpenArgs{syscall.O_CREAT | syscall.O_WRONLY, 0o644}, + MkdirPerms: 0o755, + WriteBufSize: 4096, +} + +// OpenArgs defines args passed +// in a syscall.Open() operation. +type OpenArgs struct { + Flags int + Perms uint32 +} + +// Config defines options to be +// used when opening a DiskStorage. +type Config struct { + + // OpenRead are the arguments passed + // to syscall.Open() when opening a + // file for read operations. + OpenRead OpenArgs + + // OpenWrite are the arguments passed + // to syscall.Open() when opening a + // file for write operations. + OpenWrite OpenArgs + + // MkdirPerms are the permissions used + // when creating necessary sub-dirs in + // a storage key with slashes. + MkdirPerms uint32 + + // WriteBufSize is the buffer size + // to use when writing file streams. + WriteBufSize int +} + +// getDiskConfig returns valid (and owned!) Config for given ptr. +func getDiskConfig(cfg *Config) Config { + if cfg == nil { + // use defaults. + return defaultConfig + } + + // Ensure non-zero syscall args. + if cfg.OpenRead.Flags == 0 { + cfg.OpenRead.Flags = defaultConfig.OpenRead.Flags + } + if cfg.OpenRead.Perms == 0 { + cfg.OpenRead.Perms = defaultConfig.OpenRead.Perms + } + if cfg.OpenWrite.Flags == 0 { + cfg.OpenWrite.Flags = defaultConfig.OpenWrite.Flags + } + if cfg.OpenWrite.Perms == 0 { + cfg.OpenWrite.Perms = defaultConfig.OpenWrite.Perms + } + if cfg.MkdirPerms == 0 { + cfg.MkdirPerms = defaultConfig.MkdirPerms + } + + // Ensure valid write buf. + if cfg.WriteBufSize <= 0 { + cfg.WriteBufSize = defaultConfig.WriteBufSize + } + + return Config{ + OpenRead: cfg.OpenRead, + OpenWrite: cfg.OpenWrite, + MkdirPerms: cfg.MkdirPerms, + WriteBufSize: cfg.WriteBufSize, + } +} + +// DiskStorage is a Storage implementation +// that stores directly to a filesystem. +type DiskStorage struct { + path string // path is the root path of this store + pool fastcopy.CopyPool // pool is the prepared io copier with buffer pool + cfg Config // cfg is the supplied configuration for this store +} + +// Open opens a DiskStorage instance for given folder path and configuration. +func Open(path string, cfg *Config) (*DiskStorage, error) { + // Check + set config defaults. + config := getDiskConfig(cfg) + + // Clean provided storage path, ensure + // final '/' to help with path trimming. + pb := internal.GetPathBuilder() + path = pb.Clean(path) + "/" + internal.PutPathBuilder(pb) + + // Ensure directories up-to path exist. + perms := fs.FileMode(config.MkdirPerms) + err := os.MkdirAll(path, perms) + if err != nil { + return nil, err + } + + // Prepare DiskStorage. + st := &DiskStorage{ + path: path, + cfg: config, + } + + // Set fastcopy pool buffer size. + st.pool.Buffer(config.WriteBufSize) + + return st, nil +} + +// Clean: implements Storage.Clean(). +func (st *DiskStorage) Clean(ctx context.Context) error { + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + // Clean unused directories. + return cleanDirs(st.path, OpenArgs{ + Flags: syscall.O_RDONLY, + }) +} + +// ReadBytes: implements Storage.ReadBytes(). +func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + + // Read all data to memory. + data, err := io.ReadAll(rc) + if err != nil { + _ = rc.Close() + return nil, err + } + + // Close storage stream reader. + if err := rc.Close(); err != nil { + return nil, err + } + + return data, nil +} + +// ReadStream: implements Storage.ReadStream(). +func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Generate file path for key. + kpath, err := st.Filepath(key) + if err != nil { + return nil, err + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return nil, err + } + + // Attempt to open file with read args. + file, err := open(kpath, st.cfg.OpenRead) + if err != nil { + + if err == syscall.ENOENT { + // Translate not-found errors and wrap with key. + err = internal.ErrWithKey(storage.ErrNotFound, key) + } + + return nil, err + } + + return file, nil +} + +// WriteBytes: implements Storage.WriteBytes(). +func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err +} + +// WriteStream: implements Storage.WriteStream(). +func (st *DiskStorage) WriteStream(ctx context.Context, key string, stream io.Reader) (int64, error) { + // Acquire path builder buffer. + pb := internal.GetPathBuilder() + + // Generate the file path for given key. + kpath, subdir, err := st.filepath(pb, key) + if err != nil { + return 0, err + } + + // Done with path buffer. + internal.PutPathBuilder(pb) + + // Check context still valid. + if err := ctx.Err(); err != nil { + return 0, err + } + + if subdir { + // Get dir of key path. + dir := path.Dir(kpath) + + // Note that subdir will only be set if + // the transformed key (without base path) + // contains any slashes. This is not a + // definitive check, but it allows us to + // skip a syscall if mkdirall not needed! + perms := fs.FileMode(st.cfg.MkdirPerms) + err = os.MkdirAll(dir, perms) + if err != nil { + return 0, err + } + } + + // Attempt to open file with write args. + file, err := open(kpath, st.cfg.OpenWrite) + if err != nil { + + if st.cfg.OpenWrite.Flags&syscall.O_EXCL != 0 && + err == syscall.EEXIST { + // Translate already exists errors and wrap with key. + err = internal.ErrWithKey(storage.ErrAlreadyExists, key) + } + + return 0, err + } + + // Copy provided stream to file interface. + n, err := st.pool.Copy(file, stream) + if err != nil { + _ = file.Close() + return n, err + } + + // Finally, close file. + return n, file.Close() +} + +// Stat implements Storage.Stat(). +func (st *DiskStorage) Stat(ctx context.Context, key string) (*storage.Entry, error) { + // Generate file path for key. + kpath, err := st.Filepath(key) + if err != nil { + return nil, err + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return nil, err + } + + // Stat file on disk. + stat, err := stat(kpath) + if stat == nil { + return nil, err + } + + return &storage.Entry{ + Key: key, + Size: stat.Size, + }, nil +} + +// Remove implements Storage.Remove(). +func (st *DiskStorage) Remove(ctx context.Context, key string) error { + // Generate file path for key. + kpath, err := st.Filepath(key) + if err != nil { + return err + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + // Stat file on disk. + stat, err := stat(kpath) + if err != nil { + return err + } + + // Not-found (or handled + // as) error situations. + if stat == nil { + return internal.ErrWithKey(storage.ErrNotFound, key) + } else if stat.Mode&syscall.S_IFREG == 0 { + err := errors.New("storage/disk: not a regular file") + return internal.ErrWithKey(err, key) + } + + // Remove at path (we know this is file). + if err := unlink(kpath); err != nil { + + if err == syscall.ENOENT { + // Translate not-found errors and wrap with key. + err = internal.ErrWithKey(storage.ErrNotFound, key) + } + + return err + } + + return nil +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *DiskStorage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error { + if opts.Step == nil { + panic("nil step fn") + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + // Acquire path builder for walk. + pb := internal.GetPathBuilder() + defer internal.PutPathBuilder(pb) + + // Dir to walk. + dir := st.path + + if opts.Prefix != "" { + // Convert key prefix to one of our storage filepaths. + pathprefix, subdir, err := st.filepath(pb, opts.Prefix) + if err != nil { + return internal.ErrWithMsg(err, "prefix error") + } + + if subdir { + // Note that subdir will only be set if + // the transformed key (without base path) + // contains any slashes. This is not a + // definitive check, but it allows us to + // update the directory we walk in case + // it might narrow search parameters! + dir = path.Dir(pathprefix) + } + + // Set updated storage + // path prefix in opts. + opts.Prefix = pathprefix + } + + // Only need to open dirs as read-only. + args := OpenArgs{Flags: syscall.O_RDONLY} + + return walkDir(pb, dir, args, func(kpath string, fsentry fs.DirEntry) error { + if !fsentry.Type().IsRegular() { + // Ignore anything but + // regular file types. + return nil + } + + // Get full item path (without root). + kpath = pb.Join(kpath, fsentry.Name()) + + // Perform a fast filter check against storage path prefix (if set). + if opts.Prefix != "" && !strings.HasPrefix(kpath, opts.Prefix) { + return nil // ignore + } + + // Storage key without base. + key := kpath[len(st.path):] + + // Ignore filtered keys. + if opts.Filter != nil && + !opts.Filter(key) { + return nil // ignore + } + + // Load file info. This should already + // be loaded due to the underlying call + // to os.File{}.ReadDir() populating them. + info, err := fsentry.Info() + if err != nil { + return err + } + + // Perform provided walk function + return opts.Step(storage.Entry{ + Key: key, + Size: info.Size(), + }) + }) +} + +// Filepath checks and returns a formatted Filepath for given key. +func (st *DiskStorage) Filepath(key string) (path string, err error) { + pb := internal.GetPathBuilder() + path, _, err = st.filepath(pb, key) + internal.PutPathBuilder(pb) + return +} + +// filepath performs the "meat" of Filepath(), returning also if path *may* be a subdir of base. +func (st *DiskStorage) filepath(pb *fastpath.Builder, key string) (path string, subdir bool, err error) { + // Fast check for whether this may be a + // sub-directory. This is not a definitive + // check, it's only for a fastpath check. + subdir = strings.ContainsRune(key, '/') + + // Build from base. + pb.Append(st.path) + pb.Append(key) + + // Take COPY of bytes. + path = string(pb.B) + + // Check for dir traversal outside base. + if isDirTraversal(st.path, path) { + err = internal.ErrWithKey(storage.ErrInvalidKey, key) + } + + return +} + +// isDirTraversal will check if rootPlusPath is a dir traversal outside of root, +// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath). +func isDirTraversal(root, rootPlusPath string) bool { + switch { + // Root is $PWD, check for traversal out of + case root == ".": + return strings.HasPrefix(rootPlusPath, "../") + + // The path MUST be prefixed by root + case !strings.HasPrefix(rootPlusPath, root): + return true + + // In all other cases, check not equal + default: + return len(root) == len(rootPlusPath) + } +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-storage/disk/fs.go index be86ac127..606d8fb0f 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go +++ b/vendor/codeberg.org/gruf/go-storage/disk/fs.go @@ -1,24 +1,14 @@ -package storage +package disk import ( + "errors" "fmt" "io/fs" "os" "syscall" "codeberg.org/gruf/go-fastpath/v2" - "codeberg.org/gruf/go-store/v2/util" -) - -const ( - // default file permission bits - defaultDirPerms = 0o755 - defaultFilePerms = 0o644 - - // default file open flags - defaultFileROFlags = syscall.O_RDONLY - defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR - defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT + "codeberg.org/gruf/go-storage/internal" ) // NOTE: @@ -26,9 +16,9 @@ const ( // not necessarily for e.g. initial setup (OpenFile) // walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry -func walkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry) error) error { - // Read directory entries - entries, err := readDir(path) +func walkDir(pb *fastpath.Builder, path string, args OpenArgs, walkFn func(string, fs.DirEntry) error) error { + // Read directory entries at path. + entries, err := readDir(path, args) if err != nil { return err } @@ -85,7 +75,7 @@ outer: path = pb.Join(path, entry.Name()) // Read next directory entries - next, err := readDir(path) + next, err := readDir(path, args) if err != nil { return err } @@ -102,16 +92,17 @@ outer: } // cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children -func cleanDirs(path string) error { - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - return cleanDir(pb, path, true) +func cleanDirs(path string, args OpenArgs) error { + pb := internal.GetPathBuilder() + err := cleanDir(pb, path, args, true) + internal.PutPathBuilder(pb) + return err } // cleanDir performs the actual dir cleaning logic for the above top-level version. -func cleanDir(pb *fastpath.Builder, path string, top bool) error { - // Get dir entries at path. - entries, err := readDir(path) +func cleanDir(pb *fastpath.Builder, path string, args OpenArgs, top bool) error { + // Get directory entries at path. + entries, err := readDir(path, args) if err != nil { return err } @@ -121,30 +112,36 @@ func cleanDir(pb *fastpath.Builder, path string, top bool) error { return rmdir(path) } + var errs []error + + // Iterate all directory entries. for _, entry := range entries { + if entry.IsDir() { // Calculate directory path. - dirPath := pb.Join(path, entry.Name()) + dir := pb.Join(path, entry.Name()) - // Recursively clean sub-directory entries. - if err := cleanDir(pb, dirPath, false); err != nil { - fmt.Fprintf(os.Stderr, "[go-store/storage] error cleaning %s: %v", dirPath, err) + // Recursively clean sub-directory entries, adding errs. + if err := cleanDir(pb, dir, args, false); err != nil { + err = fmt.Errorf("error(s) cleaning subdir %s: %w", dir, err) + errs = append(errs, err) } } } - return nil + // Return combined errors. + return errors.Join(errs...) } // readDir will open file at path, read the unsorted list of entries, then close. -func readDir(path string) ([]fs.DirEntry, error) { - // Open file at path - file, err := open(path, defaultFileROFlags) +func readDir(path string, args OpenArgs) ([]fs.DirEntry, error) { + // Open directory at path. + file, err := open(path, args) if err != nil { return nil, err } - // Read directory entries + // Read ALL directory entries. entries, err := file.ReadDir(-1) // Done with file @@ -153,11 +150,11 @@ func readDir(path string) ([]fs.DirEntry, error) { return entries, err } -// open will open a file at the given path with flags and default file perms. -func open(path string, flags int) (*os.File, error) { +// open is a simple wrapper around syscall.Open(). +func open(path string, args OpenArgs) (*os.File, error) { var fd int err := retryOnEINTR(func() (err error) { - fd, err = syscall.Open(path, flags, defaultFilePerms) + fd, err = syscall.Open(path, args.Flags, args.Perms) return }) if err != nil { @@ -166,8 +163,8 @@ func open(path string, flags int) (*os.File, error) { return os.NewFile(uintptr(fd), path), nil } -// stat checks for a file on disk. -func stat(path string) (bool, error) { +// stat is a simple wrapper around syscall.Stat(). +func stat(path string) (*syscall.Stat_t, error) { var stat syscall.Stat_t err := retryOnEINTR(func() error { return syscall.Stat(path, &stat) @@ -177,26 +174,27 @@ func stat(path string) (bool, error) { // not-found is no error err = nil } - return false, err + return nil, err } - return true, nil + return &stat, nil } -// unlink removes a file (not dir!) on disk. +// unlink is a simple wrapper around syscall.Unlink(). func unlink(path string) error { return retryOnEINTR(func() error { return syscall.Unlink(path) }) } -// rmdir removes a dir (not file!) on disk. +// rmdir is a simple wrapper around syscall.Rmdir(). func rmdir(path string) error { return retryOnEINTR(func() error { return syscall.Rmdir(path) }) } -// retryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received. +// retryOnEINTR is a low-level filesystem function +// for retrying syscalls on O_EINTR received. func retryOnEINTR(do func() error) error { for { err := do() diff --git a/vendor/codeberg.org/gruf/go-storage/errors.go b/vendor/codeberg.org/gruf/go-storage/errors.go new file mode 100644 index 000000000..1dd847011 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/errors.go @@ -0,0 +1,16 @@ +package storage + +import ( + "errors" +) + +var ( + // ErrNotFound is the error returned when a key cannot be found in storage + ErrNotFound = errors.New("storage: key not found") + + // ErrAlreadyExist is the error returned when a key already exists in storage + ErrAlreadyExists = errors.New("storage: key already exists") + + // ErrInvalidkey is the error returned when an invalid key is passed to storage + ErrInvalidKey = errors.New("storage: invalid key") +) diff --git a/vendor/codeberg.org/gruf/go-storage/internal/errors.go b/vendor/codeberg.org/gruf/go-storage/internal/errors.go new file mode 100644 index 000000000..6b10a8c90 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/internal/errors.go @@ -0,0 +1,56 @@ +package internal + +func ErrWithKey(err error, key string) error { + return &errorWithKey{key: key, err: err} +} + +type errorWithKey struct { + key string + err error +} + +func (err *errorWithKey) Error() string { + return err.err.Error() + ": " + err.key +} + +func (err *errorWithKey) Unwrap() error { + return err.err +} + +func ErrWithMsg(err error, msg string) error { + return &errorWithMsg{msg: msg, err: err} +} + +type errorWithMsg struct { + msg string + err error +} + +func (err *errorWithMsg) Error() string { + return err.msg + ": " + err.err.Error() +} + +func (err *errorWithMsg) Unwrap() error { + return err.err +} + +func WrapErr(inner, outer error) error { + return &wrappedError{inner: inner, outer: outer} +} + +type wrappedError struct { + inner error + outer error +} + +func (err *wrappedError) Is(other error) bool { + return err.inner == other || err.outer == other +} + +func (err *wrappedError) Error() string { + return err.inner.Error() + ": " + err.outer.Error() +} + +func (err *wrappedError) Unwrap() []error { + return []error{err.inner, err.outer} +} diff --git a/vendor/codeberg.org/gruf/go-storage/internal/path.go b/vendor/codeberg.org/gruf/go-storage/internal/path.go new file mode 100644 index 000000000..cd1c219bf --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/internal/path.go @@ -0,0 +1,24 @@ +package internal + +import ( + "sync" + + "codeberg.org/gruf/go-fastpath/v2" +) + +var pathBuilderPool sync.Pool + +func GetPathBuilder() *fastpath.Builder { + v := pathBuilderPool.Get() + if v == nil { + pb := new(fastpath.Builder) + pb.B = make([]byte, 0, 512) + v = pb + } + return v.(*fastpath.Builder) +} + +func PutPathBuilder(pb *fastpath.Builder) { + pb.Reset() + pathBuilderPool.Put(pb) +} diff --git a/vendor/codeberg.org/gruf/go-storage/memory/memory.go b/vendor/codeberg.org/gruf/go-storage/memory/memory.go new file mode 100644 index 000000000..55728b827 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/memory/memory.go @@ -0,0 +1,253 @@ +package memory + +import ( + "bytes" + "context" + "io" + "strings" + "sync" + + "codeberg.org/gruf/go-iotools" + "codeberg.org/gruf/go-storage" + + "codeberg.org/gruf/go-storage/internal" +) + +// ensure MemoryStorage conforms to storage.Storage. +var _ storage.Storage = (*MemoryStorage)(nil) + +// MemoryStorage is a storage implementation that simply stores key-value +// pairs in a Go map in-memory. The map is protected by a mutex. +type MemoryStorage struct { + ow bool // overwrites + fs map[string][]byte + mu sync.Mutex +} + +// Open opens a new MemoryStorage instance with internal map starting size. +func Open(size int, overwrites bool) *MemoryStorage { + return &MemoryStorage{ + ow: overwrites, + fs: make(map[string][]byte, size), + } +} + +// Clean: implements Storage.Clean(). +func (st *MemoryStorage) Clean(ctx context.Context) error { + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Lock map. + st.mu.Lock() + + // Resize map to only necessary size in-mem. + fs := make(map[string][]byte, len(st.fs)) + for key, val := range st.fs { + fs[key] = val + } + st.fs = fs + + // Done with lock. + st.mu.Unlock() + + return nil +} + +// ReadBytes: implements Storage.ReadBytes(). +func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Check context still valid. + if err := ctx.Err(); err != nil { + return nil, err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + b, ok := st.fs[key] + if ok { + + // COPY bytes. + b = copyb(b) + } + + // Done with lock. + st.mu.Unlock() + + if !ok { + return nil, internal.ErrWithKey(storage.ErrNotFound, key) + } + + return b, nil +} + +// ReadStream: implements Storage.ReadStream(). +func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Read value data from store. + b, err := st.ReadBytes(ctx, key) + if err != nil { + return nil, err + } + + // Wrap in readcloser. + r := bytes.NewReader(b) + return iotools.NopReadCloser(r), nil +} + +// WriteBytes: implements Storage.WriteBytes(). +func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) { + // Check context still valid + if err := ctx.Err(); err != nil { + return 0, err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + _, ok := st.fs[key] + + if ok && !st.ow { + // Done with lock. + st.mu.Unlock() + + // Overwrites are disabled, return existing key error. + return 0, internal.ErrWithKey(storage.ErrAlreadyExists, key) + } + + // Write copy to store. + st.fs[key] = copyb(b) + + // Done with lock. + st.mu.Unlock() + + return len(b), nil +} + +// WriteStream: implements Storage.WriteStream(). +func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { + // Read all from reader. + b, err := io.ReadAll(r) + if err != nil { + return 0, err + } + + // Write in-memory data to store. + n, err := st.WriteBytes(ctx, key, b) + return int64(n), err +} + +// Stat: implements Storage.Stat(). +func (st *MemoryStorage) Stat(ctx context.Context, key string) (*storage.Entry, error) { + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + b, ok := st.fs[key] + + // Get entry size. + sz := int64(len(b)) + + // Done with lock. + st.mu.Unlock() + + if !ok { + return nil, nil + } + + return &storage.Entry{ + Key: key, + Size: sz, + }, nil +} + +// Remove: implements Storage.Remove(). +func (st *MemoryStorage) Remove(ctx context.Context, key string) error { + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + _, ok := st.fs[key] + + if ok { + // Delete store key. + delete(st.fs, key) + } + + // Done with lock. + st.mu.Unlock() + + if !ok { + return internal.ErrWithKey(storage.ErrNotFound, key) + } + + return nil +} + +// WalkKeys: implements Storage.WalkKeys(). +func (st *MemoryStorage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error { + if opts.Step == nil { + panic("nil step fn") + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + var err error + + // Lock map. + st.mu.Lock() + + // Ensure unlocked. + defer st.mu.Unlock() + + // Range all key-vals in hash map. + for key, val := range st.fs { + // Check for filtered prefix. + if opts.Prefix != "" && + !strings.HasPrefix(key, opts.Prefix) { + continue // ignore + } + + // Check for filtered key. + if opts.Filter != nil && + !opts.Filter(key) { + continue // ignore + } + + // Pass to provided step func. + err = opts.Step(storage.Entry{ + Key: key, + Size: int64(len(val)), + }) + if err != nil { + return err + } + } + + return err +} + +// copyb returns a copy of byte-slice b. +func copyb(b []byte) []byte { + if b == nil { + return nil + } + p := make([]byte, len(b)) + _ = copy(p, b) + return p +} diff --git a/vendor/codeberg.org/gruf/go-storage/s3/errors.go b/vendor/codeberg.org/gruf/go-storage/s3/errors.go new file mode 100644 index 000000000..2cbdd2e9d --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/s3/errors.go @@ -0,0 +1,47 @@ +package s3 + +import ( + "strings" + + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" + "github.com/minio/minio-go/v7" +) + +// transformS3Error transforms an error returned from S3Storage underlying +// minio.Core client, by wrapping where necessary with our own error types. +func transformS3Error(err error) error { + // Cast this to a minio error response + ersp, ok := err.(minio.ErrorResponse) + if ok { + switch ersp.Code { + case "NoSuchKey": + return internal.WrapErr(err, storage.ErrNotFound) + case "Conflict": + return internal.WrapErr(err, storage.ErrAlreadyExists) + default: + return err + } + } + + // Check if error has an invalid object name prefix + if strings.HasPrefix(err.Error(), "Object name ") { + return internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err +} + +func isNotFoundError(err error) bool { + errRsp, ok := err.(minio.ErrorResponse) + return ok && errRsp.Code == "NoSuchKey" +} + +func isConflictError(err error) bool { + errRsp, ok := err.(minio.ErrorResponse) + return ok && errRsp.Code == "Conflict" +} + +func isObjectNameError(err error) bool { + return strings.HasPrefix(err.Error(), "Object name ") +} diff --git a/vendor/codeberg.org/gruf/go-storage/s3/s3.go b/vendor/codeberg.org/gruf/go-storage/s3/s3.go new file mode 100644 index 000000000..0067d3e19 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/s3/s3.go @@ -0,0 +1,479 @@ +package s3 + +import ( + "bytes" + "context" + "errors" + "io" + + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" + "github.com/minio/minio-go/v7" +) + +// ensure S3Storage conforms to storage.Storage. +var _ storage.Storage = (*S3Storage)(nil) + +// ensure bytes.Reader conforms to ReaderSize. +var _ ReaderSize = (*bytes.Reader)(nil) + +// ReaderSize is an extension of the io.Reader interface +// that may be implemented by callers of WriteStream() in +// order to improve performance. When the size is known it +// is passed onto the underlying minio S3 library. +type ReaderSize interface { + io.Reader + Size() int64 +} + +// DefaultConfig returns the default S3Storage configuration. +func DefaultConfig() Config { + return defaultConfig +} + +// immutable default configuration. +var defaultConfig = Config{ + CoreOpts: minio.Options{}, + GetOpts: minio.GetObjectOptions{}, + PutOpts: minio.PutObjectOptions{}, + PutChunkOpts: minio.PutObjectPartOptions{}, + PutChunkSize: 4 * 1024 * 1024, // 4MiB + StatOpts: minio.StatObjectOptions{}, + RemoveOpts: minio.RemoveObjectOptions{}, + ListSize: 200, +} + +// Config defines options to be used when opening an S3Storage, +// mostly options for underlying S3 client library. +type Config struct { + // CoreOpts are S3 client options + // passed during initialization. + CoreOpts minio.Options + + // GetOpts are S3 client options + // passed during .Read___() calls. + GetOpts minio.GetObjectOptions + + // PutOpts are S3 client options + // passed during .Write___() calls. + PutOpts minio.PutObjectOptions + + // PutChunkSize is the chunk size (in bytes) + // to use when sending a byte stream reader + // of unknown size as a multi-part object. + PutChunkSize int64 + + // PutChunkOpts are S3 client options + // passed during chunked .Write___() calls. + PutChunkOpts minio.PutObjectPartOptions + + // StatOpts are S3 client options + // passed during .Stat() calls. + StatOpts minio.StatObjectOptions + + // RemoveOpts are S3 client options + // passed during .Remove() calls. + RemoveOpts minio.RemoveObjectOptions + + // ListSize determines how many items + // to include in each list request, made + // during calls to .WalkKeys(). + ListSize int +} + +// getS3Config returns valid (and owned!) Config for given ptr. +func getS3Config(cfg *Config) Config { + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + const minChunkSz = 5 * 1024 * 1024 + + if cfg == nil { + // use defaults. + return defaultConfig + } + + // Ensure a minimum compat chunk size. + if cfg.PutChunkSize <= minChunkSz { + cfg.PutChunkSize = minChunkSz + } + + // Ensure valid list size. + if cfg.ListSize <= 0 { + cfg.ListSize = 200 + } + + return Config{ + CoreOpts: cfg.CoreOpts, + GetOpts: cfg.GetOpts, + PutOpts: cfg.PutOpts, + PutChunkSize: cfg.PutChunkSize, + ListSize: cfg.ListSize, + StatOpts: cfg.StatOpts, + RemoveOpts: cfg.RemoveOpts, + } +} + +// S3Storage is a storage implementation that stores key-value +// pairs in an S3 instance at given endpoint with bucket name. +type S3Storage struct { + client *minio.Core + bucket string + config Config +} + +// Open opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration. +func Open(endpoint string, bucket string, cfg *Config) (*S3Storage, error) { + // Check + set config defaults. + config := getS3Config(cfg) + + // Create new S3 client connection to given endpoint. + client, err := minio.NewCore(endpoint, &config.CoreOpts) + if err != nil { + return nil, err + } + + ctx := context.Background() + + // Check that provided bucket actually exists. + exists, err := client.BucketExists(ctx, bucket) + if err != nil { + return nil, err + } else if !exists { + return nil, errors.New("storage/s3: bucket does not exist") + } + + return &S3Storage{ + client: client, + bucket: bucket, + config: config, + }, nil +} + +// Client: returns access to the underlying S3 client. +func (st *S3Storage) Client() *minio.Core { + return st.client +} + +// Clean: implements Storage.Clean(). +func (st *S3Storage) Clean(ctx context.Context) error { + return nil // nothing to do for S3 +} + +// ReadBytes: implements Storage.ReadBytes(). +func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + + // Read all data to memory. + data, err := io.ReadAll(rc) + if err != nil { + _ = rc.Close() + return nil, err + } + + // Close storage stream reader. + if err := rc.Close(); err != nil { + return nil, err + } + + return data, nil +} + +// ReadStream: implements Storage.ReadStream(). +func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Fetch object reader from S3 bucket + rc, _, _, err := st.client.GetObject( + ctx, + st.bucket, + key, + st.config.GetOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return nil, transformS3Error(err) + } + return rc, nil +} + +// WriteBytes: implements Storage.WriteBytes(). +func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err +} + +// WriteStream: implements Storage.WriteStream(). +func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { + if rs, ok := r.(ReaderSize); ok { + // This reader supports providing us the size of + // the encompassed data, allowing us to perform + // a singular .PutObject() call with length. + info, err := st.client.PutObject( + ctx, + st.bucket, + key, + r, + rs.Size(), + "", + "", + st.config.PutOpts, + ) + if err != nil { + + if isConflictError(err) { + // Wrap conflict errors as our already exists type. + err = internal.WrapErr(err, storage.ErrAlreadyExists) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return 0, err + } + + return info.Size, nil + } + + // Start a new multipart upload to get ID. + uploadID, err := st.client.NewMultipartUpload( + ctx, + st.bucket, + key, + st.config.PutOpts, + ) + if err != nil { + + if isConflictError(err) { + // Wrap conflict errors as our already exists type. + err = internal.WrapErr(err, storage.ErrAlreadyExists) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return 0, transformS3Error(err) + } + + var ( + index = int(1) // parts index + total = int64(0) + parts []minio.CompletePart + chunk = make([]byte, st.config.PutChunkSize) + rbuf = bytes.NewReader(nil) + ) + + // Note that we do not perform any kind of + // memory pooling of the chunk buffers here. + // Optimal chunking sizes for S3 writes are in + // the orders of megabytes, so letting the GC + // collect these ASAP is much preferred. + +loop: + for done := false; !done; { + // Read next chunk into byte buffer. + n, err := io.ReadFull(r, chunk) + + switch err { + // Successful read. + case nil: + + // Reached end, buffer empty. + case io.EOF: + break loop + + // Reached end, but buffer not empty. + case io.ErrUnexpectedEOF: + done = true + + // All other errors. + default: + return 0, err + } + + // Reset byte reader. + rbuf.Reset(chunk[:n]) + + // Put this object chunk in S3 store. + pt, err := st.client.PutObjectPart( + ctx, + st.bucket, + key, + uploadID, + index, + rbuf, + int64(n), + st.config.PutChunkOpts, + ) + if err != nil { + return 0, err + } + + // Append completed part to slice. + parts = append(parts, minio.CompletePart{ + PartNumber: pt.PartNumber, + ETag: pt.ETag, + ChecksumCRC32: pt.ChecksumCRC32, + ChecksumCRC32C: pt.ChecksumCRC32C, + ChecksumSHA1: pt.ChecksumSHA1, + ChecksumSHA256: pt.ChecksumSHA256, + }) + + // Iterate. + index++ + + // Update total size. + total += pt.Size + } + + // Complete this multi-part upload operation + _, err = st.client.CompleteMultipartUpload( + ctx, + st.bucket, + key, + uploadID, + parts, + st.config.PutOpts, + ) + if err != nil { + return 0, err + } + + return total, nil +} + +// Stat: implements Storage.Stat(). +func (st *S3Storage) Stat(ctx context.Context, key string) (*storage.Entry, error) { + // Query object in S3 bucket. + stat, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Ignore err return + // for not-found. + err = nil + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return nil, err + } + + return &storage.Entry{ + Key: key, + Size: stat.Size, + }, nil +} + +// Remove: implements Storage.Remove(). +func (st *S3Storage) Remove(ctx context.Context, key string) error { + // Query object in S3 bucket. + _, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err + } + + // Remove object from S3 bucket + err = st.client.RemoveObject( + ctx, + st.bucket, + key, + st.config.RemoveOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err + } + + return nil +} + +// WalkKeys: implements Storage.WalkKeys(). +func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error { + if opts.Step == nil { + panic("nil step fn") + } + + var ( + prev string + token string + ) + + for { + // List objects in bucket starting at marker. + result, err := st.client.ListObjectsV2( + st.bucket, + opts.Prefix, + prev, + token, + "", + st.config.ListSize, + ) + if err != nil { + return err + } + + // Iterate through list result contents. + for _, obj := range result.Contents { + + // Skip filtered obj keys. + if opts.Filter != nil && + opts.Filter(obj.Key) { + continue + } + + // Pass each obj through step func. + if err := opts.Step(storage.Entry{ + Key: obj.Key, + Size: obj.Size, + }); err != nil { + return err + } + } + + // No token means we reached end of bucket. + if result.NextContinuationToken == "" { + return nil + } + + // Set continue token and prev mark + token = result.NextContinuationToken + prev = result.StartAfter + } +} diff --git a/vendor/codeberg.org/gruf/go-storage/storage.go b/vendor/codeberg.org/gruf/go-storage/storage.go new file mode 100644 index 000000000..b13f2d387 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/storage.go @@ -0,0 +1,73 @@ +package storage + +import ( + "context" + "io" +) + +// Storage defines a means of accessing and storing +// data to some abstracted underlying mechanism. Whether +// that be in-memory, an on-disk filesystem or S3 bucket. +type Storage interface { + + // ReadBytes returns the data located at key (e.g. filepath) in storage. + ReadBytes(ctx context.Context, key string) ([]byte, error) + + // ReadStream returns an io.ReadCloser for the data at key (e.g. filepath) in storage. + ReadStream(ctx context.Context, key string) (io.ReadCloser, error) + + // WriteBytes writes the supplied data at key (e.g. filepath) in storage. + WriteBytes(ctx context.Context, key string, data []byte) (int, error) + + // WriteStream writes the supplied data stream at key (e.g. filepath) in storage. + WriteStream(ctx context.Context, key string, stream io.Reader) (int64, error) + + // Stat returns details about key (e.g. filepath) in storage, nil indicates not found. + Stat(ctx context.Context, key string) (*Entry, error) + + // Remove will remove data at key from storage. + Remove(ctx context.Context, key string) error + + // Clean in simple terms performs a clean of underlying + // storage mechanism. For memory implementations this may + // compact the underlying hashmap, for disk filesystems + // this may remove now-unused directories. + Clean(ctx context.Context) error + + // WalkKeys walks available keys using opts in storage. + WalkKeys(ctx context.Context, opts WalkKeysOpts) error +} + +// Entry represents a key in a Storage{} implementation, +// with any associated metadata that may have been set. +type Entry struct { + + // Key is this entry's + // unique storage key. + Key string + + // Size is the size of + // this entry in storage. + Size int64 +} + +// WalkKeysOpts are arguments provided +// to a storage WalkKeys() implementation. +type WalkKeysOpts struct { + + // Prefix can be used to filter entries + // by the given key prefix, for example + // only those under a subdirectory. This + // is preferred over Filter() function. + Prefix string + + // Filter can be used to filter entries + // by any custom metric before before they + // are passed to Step() function. E.g. + // filter storage entries by regexp. + Filter func(string) bool + + // Step is called for each entry during + // WalkKeys, error triggers early return. + Step func(Entry) error +} diff --git a/vendor/codeberg.org/gruf/go-storage/test.sh b/vendor/codeberg.org/gruf/go-storage/test.sh new file mode 100644 index 000000000..91286b5c8 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/test.sh @@ -0,0 +1,29 @@ +#!/bin/sh + +export \ + MINIO_ADDR='127.0.0.1:8080' \ + MINIO_BUCKET='test' \ + MINIO_ROOT_USER='root' \ + MINIO_ROOT_PASSWORD='password' \ + MINIO_PID=0 \ + S3_DIR=$(mktemp -d) + +# Drop the test S3 bucket and kill minio on exit +trap 'rm -rf "$S3_DIR"; [ $MINIO_PID -ne 0 ] && kill -9 $MINIO_PID' \ + HUP INT QUIT ABRT KILL TERM EXIT + +# Create required S3 bucket dir +mkdir -p "${S3_DIR}/${MINIO_BUCKET}" + +# Start the minio test S3 server instance +minio server --address "$MINIO_ADDR" "$S3_DIR" & > /dev/null 2>&1 +MINIO_PID=$!; [ $? -ne 0 ] && { + echo 'failed to start minio' + exit 1 +} + +# Let server startup +sleep 1 + +# Run go-store tests +go test ./... -v
\ No newline at end of file diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go deleted file mode 100644 index bbe02f22d..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go +++ /dev/null @@ -1,303 +0,0 @@ -package storage - -import ( - "bytes" - "io" - "sync" - - "codeberg.org/gruf/go-iotools" - - "github.com/klauspost/compress/gzip" - "github.com/klauspost/compress/snappy" - "github.com/klauspost/compress/zlib" -) - -// Compressor defines a means of compressing/decompressing values going into a key-value store -type Compressor interface { - // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader - Reader(io.ReadCloser) (io.ReadCloser, error) - - // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer - Writer(io.WriteCloser) (io.WriteCloser, error) -} - -type gzipCompressor struct { - rpool sync.Pool - wpool sync.Pool -} - -// GZipCompressor returns a new Compressor that implements GZip at default compression level -func GZipCompressor() Compressor { - return GZipCompressorLevel(gzip.DefaultCompression) -} - -// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level -func GZipCompressorLevel(level int) Compressor { - // GZip readers immediately check for valid - // header data on allocation / reset, so we - // need a set of valid header data so we can - // iniitialize reader instances in mempool. - hdr := bytes.NewBuffer(nil) - - // Init writer to ensure valid level provided - gw, err := gzip.NewWriterLevel(hdr, level) - if err != nil { - panic(err) - } - - // Write empty data to ensure gzip - // header data is in byte buffer. - _, _ = gw.Write([]byte{}) - _ = gw.Close() - - return &gzipCompressor{ - rpool: sync.Pool{ - New: func() any { - hdr := bytes.NewReader(hdr.Bytes()) - gr, _ := gzip.NewReader(hdr) - return gr - }, - }, - wpool: sync.Pool{ - New: func() any { - gw, _ := gzip.NewWriterLevel(nil, level) - return gw - }, - }, - } -} - -func (c *gzipCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - var released bool - - // Acquire from pool. - gr := c.rpool.Get().(*gzip.Reader) - if err := gr.Reset(rc); err != nil { - c.rpool.Put(gr) - return nil, err - } - - return iotools.ReadCloser(gr, iotools.CloserFunc(func() error { - if !released { - released = true - defer c.rpool.Put(gr) - } - - // Close compressor - err1 := gr.Close() - - // Close original stream. - err2 := rc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -func (c *gzipCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - var released bool - - // Acquire from pool. - gw := c.wpool.Get().(*gzip.Writer) - gw.Reset(wc) - - return iotools.WriteCloser(gw, iotools.CloserFunc(func() error { - if !released { - released = true - c.wpool.Put(gw) - } - - // Close compressor - err1 := gw.Close() - - // Close original stream. - err2 := wc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -type zlibCompressor struct { - rpool sync.Pool - wpool sync.Pool - dict []byte -} - -// ZLibCompressor returns a new Compressor that implements ZLib at default compression level -func ZLibCompressor() Compressor { - return ZLibCompressorLevelDict(zlib.DefaultCompression, nil) -} - -// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level -func ZLibCompressorLevel(level int) Compressor { - return ZLibCompressorLevelDict(level, nil) -} - -// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict -func ZLibCompressorLevelDict(level int, dict []byte) Compressor { - // ZLib readers immediately check for valid - // header data on allocation / reset, so we - // need a set of valid header data so we can - // iniitialize reader instances in mempool. - hdr := bytes.NewBuffer(nil) - - // Init writer to ensure valid level + dict provided - zw, err := zlib.NewWriterLevelDict(hdr, level, dict) - if err != nil { - panic(err) - } - - // Write empty data to ensure zlib - // header data is in byte buffer. - zw.Write([]byte{}) - zw.Close() - - return &zlibCompressor{ - rpool: sync.Pool{ - New: func() any { - hdr := bytes.NewReader(hdr.Bytes()) - zr, _ := zlib.NewReaderDict(hdr, dict) - return zr - }, - }, - wpool: sync.Pool{ - New: func() any { - zw, _ := zlib.NewWriterLevelDict(nil, level, dict) - return zw - }, - }, - dict: dict, - } -} - -func (c *zlibCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - var released bool - zr := c.rpool.Get().(interface { - io.ReadCloser - zlib.Resetter - }) - if err := zr.Reset(rc, c.dict); err != nil { - c.rpool.Put(zr) - return nil, err - } - return iotools.ReadCloser(zr, iotools.CloserFunc(func() error { - if !released { - released = true - defer c.rpool.Put(zr) - } - - // Close compressor - err1 := zr.Close() - - // Close original stream. - err2 := rc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -func (c *zlibCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - var released bool - - // Acquire from pool. - zw := c.wpool.Get().(*zlib.Writer) - zw.Reset(wc) - - return iotools.WriteCloser(zw, iotools.CloserFunc(func() error { - if !released { - released = true - c.wpool.Put(zw) - } - - // Close compressor - err1 := zw.Close() - - // Close original stream. - err2 := wc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -type snappyCompressor struct { - rpool sync.Pool - wpool sync.Pool -} - -// SnappyCompressor returns a new Compressor that implements Snappy. -func SnappyCompressor() Compressor { - return &snappyCompressor{ - rpool: sync.Pool{ - New: func() any { return snappy.NewReader(nil) }, - }, - wpool: sync.Pool{ - New: func() any { return snappy.NewWriter(nil) }, - }, - } -} - -func (c *snappyCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - var released bool - - // Acquire from pool. - sr := c.rpool.Get().(*snappy.Reader) - sr.Reset(rc) - - return iotools.ReadCloser(sr, iotools.CloserFunc(func() error { - if !released { - released = true - defer c.rpool.Put(sr) - } - - // Close original stream. - return rc.Close() - })), nil -} - -func (c *snappyCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - var released bool - - // Acquire from pool. - sw := c.wpool.Get().(*snappy.Writer) - sw.Reset(wc) - - return iotools.WriteCloser(sw, iotools.CloserFunc(func() error { - if !released { - released = true - c.wpool.Put(sw) - } - - // Close original stream. - return wc.Close() - })), nil -} - -type nopCompressor struct{} - -// NoCompression is a Compressor that simply does nothing. -func NoCompression() Compressor { - return &nopCompressor{} -} - -func (c *nopCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - return rc, nil -} - -func (c *nopCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - return wc, nil -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go deleted file mode 100644 index 3104400f3..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go +++ /dev/null @@ -1,424 +0,0 @@ -package storage - -import ( - "context" - "errors" - "io" - "io/fs" - "os" - "path" - _path "path" - "strings" - "syscall" - - "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-fastcopy" - "codeberg.org/gruf/go-store/v2/util" -) - -// DefaultDiskConfig is the default DiskStorage configuration. -var DefaultDiskConfig = &DiskConfig{ - Overwrite: true, - WriteBufSize: 4096, - Transform: NopTransform(), - Compression: NoCompression(), -} - -// DiskConfig defines options to be used when opening a DiskStorage. -type DiskConfig struct { - // Transform is the supplied key <--> path KeyTransform. - Transform KeyTransform - - // WriteBufSize is the buffer size to use when writing file streams. - WriteBufSize int - - // Overwrite allows overwriting values of stored keys in the storage. - Overwrite bool - - // LockFile allows specifying the filesystem path to use for the lockfile, - // providing only a filename it will store the lockfile within provided store - // path and nest the store under `path/store` to prevent access to lockfile. - LockFile string - - // Compression is the Compressor to use when reading / writing files, - // default is no compression. - Compression Compressor -} - -// getDiskConfig returns a valid DiskConfig for supplied ptr. -func getDiskConfig(cfg *DiskConfig) DiskConfig { - // If nil, use default - if cfg == nil { - cfg = DefaultDiskConfig - } - - // Assume nil transform == none - if cfg.Transform == nil { - cfg.Transform = NopTransform() - } - - // Assume nil compress == none - if cfg.Compression == nil { - cfg.Compression = NoCompression() - } - - // Assume 0 buf size == use default - if cfg.WriteBufSize <= 0 { - cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize - } - - // Assume empty lockfile path == use default - if len(cfg.LockFile) == 0 { - cfg.LockFile = LockFile - } - - // Return owned config copy - return DiskConfig{ - Transform: cfg.Transform, - WriteBufSize: cfg.WriteBufSize, - Overwrite: cfg.Overwrite, - LockFile: cfg.LockFile, - Compression: cfg.Compression, - } -} - -// DiskStorage is a Storage implementation that stores directly to a filesystem. -type DiskStorage struct { - path string // path is the root path of this store - cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool - config DiskConfig // cfg is the supplied configuration for this store - lock *Lock // lock is the opened lockfile for this storage instance -} - -// OpenDisk opens a DiskStorage instance for given folder path and configuration. -func OpenDisk(path string, cfg *DiskConfig) (*DiskStorage, error) { - // Get checked config - config := getDiskConfig(cfg) - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Clean provided store path, ensure - // ends in '/' to help later path trimming - storePath := pb.Clean(path) + "/" - - // Clean provided lockfile path - lockfile := pb.Clean(config.LockFile) - - // Check if lockfile is an *actual* path or just filename - if lockDir, _ := _path.Split(lockfile); lockDir == "" { - // Lockfile is a filename, store must be nested under - // $storePath/store to prevent access to the lockfile - storePath += "store/" - lockfile = pb.Join(path, lockfile) - } - - // Attempt to open dir path - file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) - if err != nil { - // If not a not-exist error, return - if !os.IsNotExist(err) { - return nil, err - } - - // Attempt to make store path dirs - err = os.MkdirAll(storePath, defaultDirPerms) - if err != nil { - return nil, err - } - - // Reopen dir now it's been created - file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) - if err != nil { - return nil, err - } - } - defer file.Close() - - // Double check this is a dir (NOT a file!) - stat, err := file.Stat() - if err != nil { - return nil, err - } else if !stat.IsDir() { - return nil, errors.New("store/storage: path is file") - } - - // Open and acquire storage lock for path - lock, err := OpenLock(lockfile) - if err != nil { - return nil, err - } - - // Prepare DiskStorage - st := &DiskStorage{ - path: storePath, - config: config, - lock: lock, - } - - // Set copypool buffer size - st.cppool.Buffer(config.WriteBufSize) - - return st, nil -} - -// Clean implements Storage.Clean(). -func (st *DiskStorage) Clean(ctx context.Context) error { - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Clean-out unused directories - return cleanDirs(st.path) -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { - // Get stream reader for key - rc, err := st.ReadStream(ctx, key) - if err != nil { - return nil, err - } - defer rc.Close() - - // Read all bytes and return - return io.ReadAll(rc) -} - -// ReadStream implements Storage.ReadStream(). -func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return nil, err - } - - // Check if open - if st.lock.Closed() { - return nil, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return nil, err - } - - // Attempt to open file (replace ENOENT with our own) - file, err := open(kpath, defaultFileROFlags) - if err != nil { - return nil, errSwapNotFound(err) - } - - // Wrap the file in a compressor - cFile, err := st.config.Compression.Reader(file) - if err != nil { - _ = file.Close() - return nil, err - } - - return cFile, nil -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { - n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) - return int(n), err -} - -// WriteStream implements Storage.WriteStream(). -func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return 0, err - } - - // Check if open - if st.lock.Closed() { - return 0, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return 0, err - } - - // Ensure dirs leading up to file exist - err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) - if err != nil { - return 0, err - } - - // Prepare to swap error if need-be - errSwap := errSwapNoop - - // Build file RW flags - flags := defaultFileRWFlags - if !st.config.Overwrite { - flags |= syscall.O_EXCL - - // Catch + replace err exist - errSwap = errSwapExist - } - - // Attempt to open file - file, err := open(kpath, flags) - if err != nil { - return 0, errSwap(err) - } - - // Wrap the file in a compressor - cFile, err := st.config.Compression.Writer(file) - if err != nil { - _ = file.Close() - return 0, err - } - - // Wraps file.Close(). - defer cFile.Close() - - // Copy provided reader to file - return st.cppool.Copy(cFile, r) -} - -// Stat implements Storage.Stat(). -func (st *DiskStorage) Stat(ctx context.Context, key string) (bool, error) { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return false, err - } - - // Check if open - if st.lock.Closed() { - return false, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return false, err - } - - // Check for file on disk - return stat(kpath) -} - -// Remove implements Storage.Remove(). -func (st *DiskStorage) Remove(ctx context.Context, key string) error { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return err - } - - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Remove at path (we know this is file) - if err := unlink(kpath); err != nil { - return errSwapNotFound(err) - } - - return nil -} - -// Close implements Storage.Close(). -func (st *DiskStorage) Close() error { - return st.lock.Close() -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *DiskStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Walk dir for entries - return walkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) error { - if !fsentry.Type().IsRegular() { - // Only deal with regular files - return nil - } - - // Get full item path (without root) - kpath = pb.Join(kpath, fsentry.Name()) - kpath = kpath[len(st.path):] - - // Load file info. This should already - // be loaded due to the underlying call - // to os.File{}.ReadDir() populating them - info, err := fsentry.Info() - if err != nil { - return err - } - - // Perform provided walk function - return opts.WalkFn(ctx, Entry{ - Key: st.config.Transform.PathToKey(kpath), - Size: info.Size(), - }) - }) -} - -// Filepath checks and returns a formatted Filepath for given key. -func (st *DiskStorage) Filepath(key string) (string, error) { - // Calculate transformed key path - key = st.config.Transform.KeyToPath(key) - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Generate key path - pb.Append(st.path) - pb.Append(key) - - // Check for dir traversal outside of root - if isDirTraversal(st.path, pb.String()) { - return "", ErrInvalidKey - } - - return string(pb.B), nil -} - -// isDirTraversal will check if rootPlusPath is a dir traversal outside of root, -// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath). -func isDirTraversal(root, rootPlusPath string) bool { - switch { - // Root is $PWD, check for traversal out of - case root == ".": - return strings.HasPrefix(rootPlusPath, "../") - - // The path MUST be prefixed by root - case !strings.HasPrefix(rootPlusPath, root): - return true - - // In all other cases, check not equal - default: - return len(root) == len(rootPlusPath) - } -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go deleted file mode 100644 index 4ae7e4be5..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go +++ /dev/null @@ -1,110 +0,0 @@ -package storage - -import ( - "errors" - "strings" - "syscall" - - "github.com/minio/minio-go/v7" -) - -var ( - // ErrClosed is returned on operations on a closed storage - ErrClosed = new_error("closed") - - // ErrNotFound is the error returned when a key cannot be found in storage - ErrNotFound = new_error("key not found") - - // ErrAlreadyExist is the error returned when a key already exists in storage - ErrAlreadyExists = new_error("key already exists") - - // ErrInvalidkey is the error returned when an invalid key is passed to storage - ErrInvalidKey = new_error("invalid key") - - // ErrAlreadyLocked is returned on fail opening a storage lockfile - ErrAlreadyLocked = new_error("storage lock already open") -) - -// new_error returns a new error instance prefixed by package prefix. -func new_error(msg string) error { - return errors.New("store/storage: " + msg) -} - -// wrappedError allows wrapping together an inner with outer error. -type wrappedError struct { - inner error - outer error -} - -// wrap will return a new wrapped error from given inner and outer errors. -func wrap(outer, inner error) *wrappedError { - return &wrappedError{ - inner: inner, - outer: outer, - } -} - -func (e *wrappedError) Is(target error) bool { - return e.outer == target || e.inner == target -} - -func (e *wrappedError) Error() string { - return e.outer.Error() + ": " + e.inner.Error() -} - -func (e *wrappedError) Unwrap() error { - return e.inner -} - -// errSwapNoop performs no error swaps -func errSwapNoop(err error) error { - return err -} - -// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound -func errSwapNotFound(err error) error { - if err == syscall.ENOENT { - return ErrNotFound - } - return err -} - -// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists -func errSwapExist(err error) error { - if err == syscall.EEXIST { - return ErrAlreadyExists - } - return err -} - -// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked -func errSwapUnavailable(err error) error { - if err == syscall.EAGAIN { - return ErrAlreadyLocked - } - return err -} - -// transformS3Error transforms an error returned from S3Storage underlying -// minio.Core client, by wrapping where necessary with our own error types. -func transformS3Error(err error) error { - // Cast this to a minio error response - ersp, ok := err.(minio.ErrorResponse) - if ok { - switch ersp.Code { - case "NoSuchKey": - return wrap(ErrNotFound, err) - case "Conflict": - return wrap(ErrAlreadyExists, err) - default: - return err - } - } - - // Check if error has an invalid object name prefix - if strings.HasPrefix(err.Error(), "Object name ") { - return wrap(ErrInvalidKey, err) - } - - return err -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go deleted file mode 100644 index 25ecefe52..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go +++ /dev/null @@ -1,59 +0,0 @@ -package storage - -import ( - "sync/atomic" - "syscall" -) - -// LockFile is our standard lockfile name. -const LockFile = "store.lock" - -// Lock represents a filesystem lock to ensure only one storage instance open per path. -type Lock struct { - fd int - st uint32 -} - -// OpenLock opens a lockfile at path. -func OpenLock(path string) (*Lock, error) { - var fd int - - // Open the file descriptor at path - err := retryOnEINTR(func() (err error) { - fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms) - return - }) - if err != nil { - return nil, err - } - - // Get a flock on the file descriptor - err = retryOnEINTR(func() error { - return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB) - }) - if err != nil { - return nil, errSwapUnavailable(err) - } - - return &Lock{fd: fd}, nil -} - -// Close will attempt to close the lockfile and file descriptor. -func (f *Lock) Close() error { - var err error - if atomic.CompareAndSwapUint32(&f.st, 0, 1) { - // Ensure gets closed - defer syscall.Close(f.fd) - - // Call funlock on the file descriptor - err = retryOnEINTR(func() error { - return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB) - }) - } - return err -} - -// Closed will return whether this lockfile has been closed (and unlocked). -func (f *Lock) Closed() bool { - return (atomic.LoadUint32(&f.st) == 1) -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go deleted file mode 100644 index d42274e39..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go +++ /dev/null @@ -1,228 +0,0 @@ -package storage - -import ( - "context" - "io" - "sync/atomic" - - "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-iotools" - "github.com/cornelk/hashmap" -) - -// MemoryStorage is a storage implementation that simply stores key-value -// pairs in a Go map in-memory. The map is protected by a mutex. -type MemoryStorage struct { - ow bool // overwrites - fs *hashmap.Map[string, []byte] - st uint32 -} - -// OpenMemory opens a new MemoryStorage instance with internal map starting size. -func OpenMemory(size int, overwrites bool) *MemoryStorage { - if size <= 0 { - size = 8 - } - return &MemoryStorage{ - fs: hashmap.NewSized[string, []byte](uintptr(size)), - ow: overwrites, - } -} - -// Clean implements Storage.Clean(). -func (st *MemoryStorage) Clean(ctx context.Context) error { - // Check store open - if st.closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - return nil -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { - // Check store open - if st.closed() { - return nil, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return nil, err - } - - // Check for key in store - b, ok := st.fs.Get(key) - if !ok { - return nil, ErrNotFound - } - - // Create return copy - return copyb(b), nil -} - -// ReadStream implements Storage.ReadStream(). -func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { - // Check store open - if st.closed() { - return nil, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return nil, err - } - - // Check for key in store - b, ok := st.fs.Get(key) - if !ok { - return nil, ErrNotFound - } - - // Create io.ReadCloser from 'b' copy - r := bytes.NewReader(copyb(b)) - return iotools.NopReadCloser(r), nil -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) { - // Check store open - if st.closed() { - return 0, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return 0, err - } - - // Check for key that already exists - if _, ok := st.fs.Get(key); ok && !st.ow { - return 0, ErrAlreadyExists - } - - // Write key copy to store - st.fs.Set(key, copyb(b)) - return len(b), nil -} - -// WriteStream implements Storage.WriteStream(). -func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { - // Check store open - if st.closed() { - return 0, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return 0, err - } - - // Check for key that already exists - if _, ok := st.fs.Get(key); ok && !st.ow { - return 0, ErrAlreadyExists - } - - // Read all from reader - b, err := io.ReadAll(r) - if err != nil { - return 0, err - } - - // Write key to store - st.fs.Set(key, b) - return int64(len(b)), nil -} - -// Stat implements Storage.Stat(). -func (st *MemoryStorage) Stat(ctx context.Context, key string) (bool, error) { - // Check store open - if st.closed() { - return false, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return false, err - } - - // Check for key in store - _, ok := st.fs.Get(key) - return ok, nil -} - -// Remove implements Storage.Remove(). -func (st *MemoryStorage) Remove(ctx context.Context, key string) error { - // Check store open - if st.closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Attempt to delete key - ok := st.fs.Del(key) - if !ok { - return ErrNotFound - } - - return nil -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *MemoryStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { - // Check store open - if st.closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - var err error - - // Nil check func - _ = opts.WalkFn - - // Pass each key in map to walk function - st.fs.Range(func(key string, val []byte) bool { - err = opts.WalkFn(ctx, Entry{ - Key: key, - Size: int64(len(val)), - }) - return (err == nil) - }) - - return err -} - -// Close implements Storage.Close(). -func (st *MemoryStorage) Close() error { - atomic.StoreUint32(&st.st, 1) - return nil -} - -// closed returns whether MemoryStorage is closed. -func (st *MemoryStorage) closed() bool { - return (atomic.LoadUint32(&st.st) == 1) -} - -// copyb returns a copy of byte-slice b. -func copyb(b []byte) []byte { - if b == nil { - return nil - } - p := make([]byte, len(b)) - _ = copy(p, b) - return p -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go deleted file mode 100644 index 965fe0d4f..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go +++ /dev/null @@ -1,397 +0,0 @@ -package storage - -import ( - "bytes" - "context" - "io" - "sync/atomic" - - "codeberg.org/gruf/go-store/v2/util" - "github.com/minio/minio-go/v7" -) - -// DefaultS3Config is the default S3Storage configuration. -var DefaultS3Config = &S3Config{ - CoreOpts: minio.Options{}, - GetOpts: minio.GetObjectOptions{}, - PutOpts: minio.PutObjectOptions{}, - PutChunkOpts: minio.PutObjectPartOptions{}, - PutChunkSize: 4 * 1024 * 1024, // 4MiB - StatOpts: minio.StatObjectOptions{}, - RemoveOpts: minio.RemoveObjectOptions{}, - ListSize: 200, -} - -// S3Config defines options to be used when opening an S3Storage, -// mostly options for underlying S3 client library. -type S3Config struct { - // CoreOpts are S3 client options passed during initialization. - CoreOpts minio.Options - - // GetOpts are S3 client options passed during .Read___() calls. - GetOpts minio.GetObjectOptions - - // PutOpts are S3 client options passed during .Write___() calls. - PutOpts minio.PutObjectOptions - - // PutChunkSize is the chunk size (in bytes) to use when sending - // a byte stream reader of unknown size as a multi-part object. - PutChunkSize int64 - - // PutChunkOpts are S3 client options passed during chunked .Write___() calls. - PutChunkOpts minio.PutObjectPartOptions - - // StatOpts are S3 client options passed during .Stat() calls. - StatOpts minio.StatObjectOptions - - // RemoveOpts are S3 client options passed during .Remove() calls. - RemoveOpts minio.RemoveObjectOptions - - // ListSize determines how many items to include in each - // list request, made during calls to .WalkKeys(). - ListSize int -} - -// getS3Config returns a valid S3Config for supplied ptr. -func getS3Config(cfg *S3Config) S3Config { - const minChunkSz = 5 * 1024 * 1024 - - // If nil, use default - if cfg == nil { - cfg = DefaultS3Config - } - - // Ensure a minimum compatible chunk size - if cfg.PutChunkSize <= minChunkSz { - // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - cfg.PutChunkSize = minChunkSz - } - - // Assume 0 list size == use default - if cfg.ListSize <= 0 { - cfg.ListSize = 200 - } - - // Return owned config copy - return S3Config{ - CoreOpts: cfg.CoreOpts, - GetOpts: cfg.GetOpts, - PutOpts: cfg.PutOpts, - PutChunkSize: cfg.PutChunkSize, - ListSize: cfg.ListSize, - StatOpts: cfg.StatOpts, - RemoveOpts: cfg.RemoveOpts, - } -} - -// S3Storage is a storage implementation that stores key-value -// pairs in an S3 instance at given endpoint with bucket name. -type S3Storage struct { - client *minio.Core - bucket string - config S3Config - state uint32 -} - -// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration. -func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) { - // Get checked config - config := getS3Config(cfg) - - // Create new S3 client connection - client, err := minio.NewCore(endpoint, &config.CoreOpts) - if err != nil { - return nil, err - } - - // Check that provided bucket actually exists - exists, err := client.BucketExists(context.Background(), bucket) - if err != nil { - return nil, err - } else if !exists { - return nil, new_error("bucket does not exist") - } - - return &S3Storage{ - client: client, - bucket: bucket, - config: config, - }, nil -} - -// Client returns access to the underlying S3 client. -func (st *S3Storage) Client() *minio.Core { - return st.client -} - -// Clean implements Storage.Clean(). -func (st *S3Storage) Clean(ctx context.Context) error { - return nil // nothing to do for S3 -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) { - // Fetch object reader from S3 bucket - rc, err := st.ReadStream(ctx, key) - if err != nil { - return nil, err - } - defer rc.Close() - - // Read all bytes and return - return io.ReadAll(rc) -} - -// ReadStream implements Storage.ReadStream(). -func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { - // Check storage open - if st.closed() { - return nil, ErrClosed - } - - // Fetch object reader from S3 bucket - rc, _, _, err := st.client.GetObject( - ctx, - st.bucket, - key, - st.config.GetOpts, - ) - if err != nil { - return nil, transformS3Error(err) - } - - return rc, nil -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { - n, err := st.WriteStream(ctx, key, util.NewByteReaderSize(value)) - return int(n), err -} - -// WriteStream implements Storage.WriteStream(). -func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { - // Check storage open - if st.closed() { - return 0, ErrClosed - } - - if rs, ok := r.(util.ReaderSize); ok { - // This reader supports providing us the size of - // the encompassed data, allowing us to perform - // a singular .PutObject() call with length. - info, err := st.client.PutObject( - ctx, - st.bucket, - key, - r, - rs.Size(), - "", - "", - st.config.PutOpts, - ) - if err != nil { - err = transformS3Error(err) - } - return info.Size, err - } - - // Start a new multipart upload to get ID - uploadID, err := st.client.NewMultipartUpload( - ctx, - st.bucket, - key, - st.config.PutOpts, - ) - if err != nil { - return 0, transformS3Error(err) - } - - var ( - index = int(1) // parts index - total = int64(0) - parts []minio.CompletePart - chunk = make([]byte, st.config.PutChunkSize) - rbuf = bytes.NewReader(nil) - ) - - // Note that we do not perform any kind of - // memory pooling of the chunk buffers here. - // Optimal chunking sizes for S3 writes are in - // the orders of megabytes, so letting the GC - // collect these ASAP is much preferred. - -loop: - for done := false; !done; { - // Read next chunk into byte buffer - n, err := io.ReadFull(r, chunk) - - switch err { - // Successful read - case nil: - - // Reached end, buffer empty - case io.EOF: - break loop - - // Reached end, but buffer not empty - case io.ErrUnexpectedEOF: - done = true - - // All other errors - default: - return 0, err - } - - // Reset byte reader - rbuf.Reset(chunk[:n]) - - // Put this object chunk in S3 store - pt, err := st.client.PutObjectPart( - ctx, - st.bucket, - key, - uploadID, - index, - rbuf, - int64(n), - st.config.PutChunkOpts, - ) - if err != nil { - return 0, err - } - - // Append completed part to slice - parts = append(parts, minio.CompletePart{ - PartNumber: pt.PartNumber, - ETag: pt.ETag, - ChecksumCRC32: pt.ChecksumCRC32, - ChecksumCRC32C: pt.ChecksumCRC32C, - ChecksumSHA1: pt.ChecksumSHA1, - ChecksumSHA256: pt.ChecksumSHA256, - }) - - // Iterate idx - index++ - - // Update total size - total += pt.Size - } - - // Complete this multi-part upload operation - _, err = st.client.CompleteMultipartUpload( - ctx, - st.bucket, - key, - uploadID, - parts, - st.config.PutOpts, - ) - if err != nil { - return 0, err - } - - return total, nil -} - -// Stat implements Storage.Stat(). -func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) { - // Check storage open - if st.closed() { - return false, ErrClosed - } - - // Query object in S3 bucket - _, err := st.client.StatObject( - ctx, - st.bucket, - key, - st.config.StatOpts, - ) - if err != nil { - return false, transformS3Error(err) - } - - return true, nil -} - -// Remove implements Storage.Remove(). -func (st *S3Storage) Remove(ctx context.Context, key string) error { - // Check storage open - if st.closed() { - return ErrClosed - } - - // S3 returns no error on remove for non-existent keys - if ok, err := st.Stat(ctx, key); err != nil { - return err - } else if !ok { - return ErrNotFound - } - - // Remove object from S3 bucket - err := st.client.RemoveObject( - ctx, - st.bucket, - key, - st.config.RemoveOpts, - ) - if err != nil { - return transformS3Error(err) - } - - return nil -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { - var ( - prev string - token string - ) - - for { - // List the objects in bucket starting at marker - result, err := st.client.ListObjectsV2( - st.bucket, - "", - prev, - token, - "", - st.config.ListSize, - ) - if err != nil { - return err - } - - // Pass each object through walk func - for _, obj := range result.Contents { - if err := opts.WalkFn(ctx, Entry{ - Key: obj.Key, - Size: obj.Size, - }); err != nil { - return err - } - } - - // No token means we reached end of bucket - if result.NextContinuationToken == "" { - return nil - } - - // Set continue token and prev mark - token = result.NextContinuationToken - prev = result.StartAfter - } -} - -// Close implements Storage.Close(). -func (st *S3Storage) Close() error { - atomic.StoreUint32(&st.state, 1) - return nil -} - -// closed returns whether S3Storage is closed. -func (st *S3Storage) closed() bool { - return (atomic.LoadUint32(&st.state) == 1) -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go deleted file mode 100644 index a60ea93ad..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go +++ /dev/null @@ -1,53 +0,0 @@ -package storage - -import ( - "context" - "io" -) - -// Storage defines a means of storing and accessing key value pairs -type Storage interface { - // ReadBytes returns the byte value for key in storage - ReadBytes(ctx context.Context, key string) ([]byte, error) - - // ReadStream returns an io.ReadCloser for the value bytes at key in the storage - ReadStream(ctx context.Context, key string) (io.ReadCloser, error) - - // WriteBytes writes the supplied value bytes at key in the storage - WriteBytes(ctx context.Context, key string, value []byte) (int, error) - - // WriteStream writes the bytes from supplied reader at key in the storage - WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) - - // Stat checks if the supplied key is in the storage - Stat(ctx context.Context, key string) (bool, error) - - // Remove attempts to remove the supplied key-value pair from storage - Remove(ctx context.Context, key string) error - - // Close will close the storage, releasing any file locks - Close() error - - // Clean removes unused values and unclutters the storage (e.g. removing empty folders) - Clean(ctx context.Context) error - - // WalkKeys walks the keys in the storage - WalkKeys(ctx context.Context, opts WalkKeysOptions) error -} - -// Entry represents a key in a Storage{} implementation, -// with any associated metadata that may have been set. -type Entry struct { - // Key is this entry's unique storage key. - Key string - - // Size is the size of this entry in storage. - // Note that size < 0 indicates unknown. - Size int64 -} - -// WalkKeysOptions defines how to walk the keys in a storage implementation -type WalkKeysOptions struct { - // WalkFn is the function to apply on each StorageEntry - WalkFn func(context.Context, Entry) error -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go deleted file mode 100644 index 3863dd774..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go +++ /dev/null @@ -1,25 +0,0 @@ -package storage - -// KeyTransform defines a method of converting store keys to storage paths (and vice-versa) -type KeyTransform interface { - // KeyToPath converts a supplied key to storage path - KeyToPath(string) string - - // PathToKey converts a supplied storage path to key - PathToKey(string) string -} - -type nopKeyTransform struct{} - -// NopTransform returns a nop key transform (i.e. key = path) -func NopTransform() KeyTransform { - return &nopKeyTransform{} -} - -func (t *nopKeyTransform) KeyToPath(key string) string { - return key -} - -func (t *nopKeyTransform) PathToKey(path string) string { - return path -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/io.go b/vendor/codeberg.org/gruf/go-store/v2/util/io.go deleted file mode 100644 index c5135084a..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/util/io.go +++ /dev/null @@ -1,41 +0,0 @@ -package util - -import ( - "bytes" - "io" -) - -// ReaderSize defines a reader of known size in bytes. -type ReaderSize interface { - io.Reader - Size() int64 -} - -// ByteReaderSize implements ReaderSize for an in-memory byte-slice. -type ByteReaderSize struct { - br bytes.Reader - sz int64 -} - -// NewByteReaderSize returns a new ByteReaderSize instance reset to slice b. -func NewByteReaderSize(b []byte) *ByteReaderSize { - rs := new(ByteReaderSize) - rs.Reset(b) - return rs -} - -// Read implements io.Reader. -func (rs *ByteReaderSize) Read(b []byte) (int, error) { - return rs.br.Read(b) -} - -// Size implements ReaderSize. -func (rs *ByteReaderSize) Size() int64 { - return rs.sz -} - -// Reset resets the ReaderSize to be reading from b. -func (rs *ByteReaderSize) Reset(b []byte) { - rs.br.Reset(b) - rs.sz = int64(len(b)) -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/pool.go b/vendor/codeberg.org/gruf/go-store/v2/util/pool.go deleted file mode 100644 index ec5b501fe..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/util/pool.go +++ /dev/null @@ -1,26 +0,0 @@ -package util - -import ( - "sync" - - "codeberg.org/gruf/go-fastpath/v2" -) - -// pathBuilderPool is the global fastpath.Builder pool. -var pathBuilderPool = sync.Pool{ - New: func() any { - return &fastpath.Builder{B: make([]byte, 0, 512)} - }, -} - -// GetPathBuilder fetches a fastpath.Builder object from the pool. -func GetPathBuilder() *fastpath.Builder { - pb, _ := pathBuilderPool.Get().(*fastpath.Builder) - return pb -} - -// PutPathBuilder places supplied fastpath.Builder back in the pool. -func PutPathBuilder(pb *fastpath.Builder) { - pb.Reset() - pathBuilderPool.Put(pb) -} |