summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store/v2/storage
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2022-11-05 12:10:19 +0100
committerLibravatar GitHub <noreply@github.com>2022-11-05 11:10:19 +0000
commitbcb80d3ff4a669d52d63950c8830427646c05884 (patch)
tree4aa95a83545b3f87a80fe4b625cb6f2ad9c4427f /vendor/codeberg.org/gruf/go-store/v2/storage
parent[bugfix] Increase field size limits when registering apps (#958) (diff)
downloadgotosocial-bcb80d3ff4a669d52d63950c8830427646c05884.tar.xz
[chore] bump gruf/go-store to v2 (#953)
* [chore] bump gruf/go-store to v2 * no more boobs
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/v2/storage')
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/block.go885
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go212
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/disk.go425
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/errors.go110
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/fs.go221
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/lock.go59
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/memory.go228
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/s3.go385
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/storage.go53
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/transform.go25
10 files changed, 2603 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go
new file mode 100644
index 000000000..b1081cb1c
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go
@@ -0,0 +1,885 @@
+package storage
+
+import (
+ "bytes"
+ "context"
+ "crypto/sha256"
+ "fmt"
+ "io"
+ "io/fs"
+ "os"
+ "strings"
+ "sync"
+ "syscall"
+
+ "codeberg.org/gruf/go-byteutil"
+ "codeberg.org/gruf/go-errors/v2"
+ "codeberg.org/gruf/go-fastcopy"
+ "codeberg.org/gruf/go-hashenc"
+ "codeberg.org/gruf/go-pools"
+ "codeberg.org/gruf/go-store/v2/util"
+)
+
+var (
+ nodePathPrefix = "node/"
+ blockPathPrefix = "block/"
+)
+
+// DefaultBlockConfig is the default BlockStorage configuration.
+var DefaultBlockConfig = &BlockConfig{
+ BlockSize: 1024 * 16,
+ WriteBufSize: 4096,
+ Overwrite: false,
+ Compression: NoCompression(),
+}
+
+// BlockConfig defines options to be used when opening a BlockStorage.
+type BlockConfig struct {
+ // BlockSize is the chunking size to use when splitting and storing blocks of data.
+ BlockSize int
+
+ // ReadBufSize is the buffer size to use when reading node files.
+ ReadBufSize int
+
+ // WriteBufSize is the buffer size to use when writing file streams.
+ WriteBufSize int
+
+ // Overwrite allows overwriting values of stored keys in the storage.
+ Overwrite bool
+
+ // Compression is the Compressor to use when reading / writing files,
+ // default is no compression.
+ Compression Compressor
+}
+
+// getBlockConfig returns a valid BlockConfig for supplied ptr.
+func getBlockConfig(cfg *BlockConfig) BlockConfig {
+ // If nil, use default
+ if cfg == nil {
+ cfg = DefaultBlockConfig
+ }
+
+ // Assume nil compress == none
+ if cfg.Compression == nil {
+ cfg.Compression = NoCompression()
+ }
+
+ // Assume 0 chunk size == use default
+ if cfg.BlockSize <= 0 {
+ cfg.BlockSize = DefaultBlockConfig.BlockSize
+ }
+
+ // Assume 0 buf size == use default
+ if cfg.WriteBufSize <= 0 {
+ cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
+ }
+
+ // Return owned config copy
+ return BlockConfig{
+ BlockSize: cfg.BlockSize,
+ WriteBufSize: cfg.WriteBufSize,
+ Overwrite: cfg.Overwrite,
+ Compression: cfg.Compression,
+ }
+}
+
+// BlockStorage is a Storage implementation that stores input data as chunks on
+// a filesystem. Each value is chunked into blocks of configured size and these
+// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
+// "node" file is finally created containing an array of hashes contained within
+// this value.
+type BlockStorage struct {
+ path string // path is the root path of this store
+ blockPath string // blockPath is the joined root path + block path prefix
+ nodePath string // nodePath is the joined root path + node path prefix
+ config BlockConfig // cfg is the supplied configuration for this store
+ hashPool sync.Pool // hashPool is this store's hashEncoder pool
+ bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
+ cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
+ lock *Lock // lock is the opened lockfile for this storage instance
+
+ // NOTE:
+ // BlockStorage does not need to lock each of the underlying block files
+ // as the filename itself directly relates to the contents. If there happens
+ // to be an overwrite, it will just be of the same data since the filename is
+ // the hash of the data.
+}
+
+// OpenBlock opens a BlockStorage instance for given folder path and configuration.
+func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ // Clean provided path, ensure ends in '/' (should
+ // be dir, this helps with file path trimming later)
+ path = pb.Clean(path) + "/"
+
+ // Get checked config
+ config := getBlockConfig(cfg)
+
+ // Attempt to open path
+ file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ // If not a not-exist error, return
+ if !os.IsNotExist(err) {
+ return nil, err
+ }
+
+ // Attempt to make store path dirs
+ err = os.MkdirAll(path, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+
+ // Reopen dir now it's been created
+ file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+ }
+ defer file.Close()
+
+ // Double check this is a dir (NOT a file!)
+ stat, err := file.Stat()
+ if err != nil {
+ return nil, err
+ } else if !stat.IsDir() {
+ return nil, new_error("path is file")
+ }
+
+ // Open and acquire storage lock for path
+ lock, err := OpenLock(pb.Join(path, LockFile))
+ if err != nil {
+ return nil, err
+ }
+
+ // Figure out the largest size for bufpool slices
+ bufSz := encodedHashLen
+ if bufSz < config.BlockSize {
+ bufSz = config.BlockSize
+ }
+ if bufSz < config.WriteBufSize {
+ bufSz = config.WriteBufSize
+ }
+
+ // Prepare BlockStorage
+ st := &BlockStorage{
+ path: path,
+ blockPath: pb.Join(path, blockPathPrefix),
+ nodePath: pb.Join(path, nodePathPrefix),
+ config: config,
+ hashPool: sync.Pool{
+ New: func() interface{} {
+ return newHashEncoder()
+ },
+ },
+ bufpool: pools.NewBufferPool(bufSz),
+ lock: lock,
+ }
+
+ // Set copypool buffer size
+ st.cppool.Buffer(config.ReadBufSize)
+
+ return st, nil
+}
+
+// Clean implements storage.Clean().
+func (st *BlockStorage) Clean(ctx context.Context) error {
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ nodes := map[string]*node{}
+
+ // Walk nodes dir for entries
+ err := walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error {
+ // Only deal with regular files
+ if !fsentry.Type().IsRegular() {
+ return nil
+ }
+
+ // Get joined node path name
+ npath = pb.Join(npath, fsentry.Name())
+
+ // Attempt to open RO file
+ file, err := open(npath, defaultFileROFlags)
+ if err != nil {
+ return err
+ }
+ defer file.Close()
+
+ // Alloc new Node + acquire hash buffer for writes
+ hbuf := st.bufpool.Get()
+ defer st.bufpool.Put(hbuf)
+ hbuf.Guarantee(encodedHashLen)
+ node := node{}
+
+ // Write file contents to node
+ _, err = io.CopyBuffer(
+ &nodeWriter{
+ node: &node,
+ buf: hbuf,
+ },
+ file,
+ nil,
+ )
+ if err != nil {
+ return err
+ }
+
+ // Append to nodes slice
+ nodes[fsentry.Name()] = &node
+ return nil
+ })
+
+ // Handle errors (though nodePath may not have been created yet)
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ }
+
+ // Walk blocks dir for entries
+ err = walkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) error {
+ // Only deal with regular files
+ if !fsentry.Type().IsRegular() {
+ return nil
+ }
+
+ inUse := false
+ for key, node := range nodes {
+ if node.removeHash(fsentry.Name()) {
+ if len(node.hashes) < 1 {
+ // This node contained hash, and after removal is now empty.
+ // Remove this node from our tracked nodes slice
+ delete(nodes, key)
+ }
+ inUse = true
+ }
+ }
+
+ // Block hash is used by node
+ if inUse {
+ return nil
+ }
+
+ // Get joined block path name
+ bpath = pb.Join(bpath, fsentry.Name())
+
+ // Remove this unused block path
+ return os.Remove(bpath)
+ })
+
+ // Handle errors (though blockPath may not have been created yet)
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ }
+
+ // If there are nodes left at this point, they are corrupt
+ // (i.e. they're referencing block hashes that don't exist)
+ if len(nodes) > 0 {
+ nodeKeys := []string{}
+ for key := range nodes {
+ nodeKeys = append(nodeKeys, key)
+ }
+ return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys)
+ }
+
+ return nil
+}
+
+// ReadBytes implements Storage.ReadBytes().
+func (st *BlockStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
+ // Get stream reader for key
+ rc, err := st.ReadStream(ctx, key)
+ if err != nil {
+ return nil, err
+ }
+ defer rc.Close()
+
+ // Read all bytes and return
+ return io.ReadAll(rc)
+}
+
+// ReadStream implements Storage.ReadStream().
+func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
+ // Get node file path for key
+ npath, err := st.nodePathForKey(key)
+ if err != nil {
+ return nil, err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return nil, ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // Attempt to open RO file
+ file, err := open(npath, defaultFileROFlags)
+ if err != nil {
+ return nil, errSwapNotFound(err)
+ }
+ defer file.Close()
+
+ // Acquire hash buffer for writes
+ hbuf := st.bufpool.Get()
+ defer st.bufpool.Put(hbuf)
+
+ var node node
+
+ // Write file contents to node
+ _, err = st.cppool.Copy(
+ &nodeWriter{
+ node: &node,
+ buf: hbuf,
+ },
+ file,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Prepare block reader and return
+ return util.NopReadCloser(&blockReader{
+ storage: st,
+ node: &node,
+ }), nil
+}
+
+// readBlock reads the block with hash (key) from the filesystem.
+func (st *BlockStorage) readBlock(key string) ([]byte, error) {
+ // Get block file path for key
+ bpath := st.blockPathForKey(key)
+
+ // Attempt to open RO file
+ file, err := open(bpath, defaultFileROFlags)
+ if err != nil {
+ return nil, wrap(new_error("corrupted node"), err)
+ }
+ defer file.Close()
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Reader(file)
+ if err != nil {
+ return nil, wrap(new_error("corrupted node"), err)
+ }
+ defer cFile.Close()
+
+ // Read the entire file
+ return io.ReadAll(cFile)
+}
+
+// WriteBytes implements Storage.WriteBytes().
+func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) error {
+ return st.WriteStream(ctx, key, bytes.NewReader(value))
+}
+
+// WriteStream implements Storage.WriteStream().
+func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+ // Get node file path for key
+ npath, err := st.nodePathForKey(key)
+ if err != nil {
+ return err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Check if this exists
+ ok, err := stat(key)
+ if err != nil {
+ return err
+ }
+
+ // Check if we allow overwrites
+ if ok && !st.config.Overwrite {
+ return ErrAlreadyExists
+ }
+
+ // Ensure nodes dir (and any leading up to) exists
+ err = os.MkdirAll(st.nodePath, defaultDirPerms)
+ if err != nil {
+ return err
+ }
+
+ // Ensure blocks dir (and any leading up to) exists
+ err = os.MkdirAll(st.blockPath, defaultDirPerms)
+ if err != nil {
+ return err
+ }
+
+ var node node
+
+ // Acquire HashEncoder
+ hc := st.hashPool.Get().(*hashEncoder)
+ defer st.hashPool.Put(hc)
+
+ // Create new waitgroup and OnceError for
+ // goroutine error tracking and propagating
+ wg := sync.WaitGroup{}
+ onceErr := errors.OnceError{}
+
+loop:
+ for !onceErr.IsSet() {
+ // Fetch new buffer for this loop
+ buf := st.bufpool.Get()
+ buf.Grow(st.config.BlockSize)
+
+ // Read next chunk
+ n, err := io.ReadFull(r, buf.B)
+ switch err {
+ case nil, io.ErrUnexpectedEOF:
+ // do nothing
+ case io.EOF:
+ st.bufpool.Put(buf)
+ break loop
+ default:
+ st.bufpool.Put(buf)
+ return err
+ }
+
+ // Hash the encoded data
+ sum := hc.EncodeSum(buf.B)
+
+ // Append to the node's hashes
+ node.hashes = append(node.hashes, sum)
+
+ // If already on disk, skip
+ has, err := st.statBlock(sum)
+ if err != nil {
+ st.bufpool.Put(buf)
+ return err
+ } else if has {
+ st.bufpool.Put(buf)
+ continue loop
+ }
+
+ // Check if reached EOF
+ atEOF := (n < buf.Len())
+
+ wg.Add(1)
+ go func() {
+ // Perform writes in goroutine
+
+ defer func() {
+ // Defer release +
+ // signal we're done
+ st.bufpool.Put(buf)
+ wg.Done()
+ }()
+
+ // Write block to store at hash
+ err = st.writeBlock(sum, buf.B[:n])
+ if err != nil {
+ onceErr.Store(err)
+ return
+ }
+ }()
+
+ // Break at end
+ if atEOF {
+ break loop
+ }
+ }
+
+ // Wait, check errors
+ wg.Wait()
+ if onceErr.IsSet() {
+ return onceErr.Load()
+ }
+
+ // If no hashes created, return
+ if len(node.hashes) < 1 {
+ return new_error("no hashes written")
+ }
+
+ // Prepare to swap error if need-be
+ errSwap := errSwapNoop
+
+ // Build file RW flags
+ // NOTE: we performed an initial check for
+ // this before writing blocks, but if
+ // the utilizer of this storage didn't
+ // correctly mutex protect this key then
+ // someone may have beaten us to the
+ // punch at writing the node file.
+ flags := defaultFileRWFlags
+ if !st.config.Overwrite {
+ flags |= syscall.O_EXCL
+
+ // Catch + replace err exist
+ errSwap = errSwapExist
+ }
+
+ // Attempt to open RW file
+ file, err := open(npath, flags)
+ if err != nil {
+ return errSwap(err)
+ }
+ defer file.Close()
+
+ // Acquire write buffer
+ buf := st.bufpool.Get()
+ defer st.bufpool.Put(buf)
+ buf.Grow(st.config.WriteBufSize)
+
+ // Finally, write data to file
+ _, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B)
+ return err
+}
+
+// writeBlock writes the block with hash and supplied value to the filesystem.
+func (st *BlockStorage) writeBlock(hash string, value []byte) error {
+ // Get block file path for key
+ bpath := st.blockPathForKey(hash)
+
+ // Attempt to open RW file
+ file, err := open(bpath, defaultFileRWFlags)
+ if err != nil {
+ if err == syscall.EEXIST {
+ err = nil /* race issue describe in struct NOTE */
+ }
+ return err
+ }
+ defer file.Close()
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Writer(file)
+ if err != nil {
+ return err
+ }
+ defer cFile.Close()
+
+ // Write value to file
+ _, err = cFile.Write(value)
+ return err
+}
+
+// statBlock checks for existence of supplied block hash.
+func (st *BlockStorage) statBlock(hash string) (bool, error) {
+ return stat(st.blockPathForKey(hash))
+}
+
+// Stat implements Storage.Stat()
+func (st *BlockStorage) Stat(ctx context.Context, key string) (bool, error) {
+ // Get node file path for key
+ kpath, err := st.nodePathForKey(key)
+ if err != nil {
+ return false, err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return false, err
+ }
+
+ // Check for file on disk
+ return stat(kpath)
+}
+
+// Remove implements Storage.Remove().
+func (st *BlockStorage) Remove(ctx context.Context, key string) error {
+ // Get node file path for key
+ kpath, err := st.nodePathForKey(key)
+ if err != nil {
+ return err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Remove at path (we know this is file)
+ if err := unlink(kpath); err != nil {
+ return errSwapNotFound(err)
+ }
+
+ return nil
+}
+
+// Close implements Storage.Close().
+func (st *BlockStorage) Close() error {
+ return st.lock.Close()
+}
+
+// WalkKeys implements Storage.WalkKeys().
+func (st *BlockStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ // Walk dir for entries
+ return walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error {
+ if !fsentry.Type().IsRegular() {
+ // Only deal with regular files
+ return nil
+ }
+
+ // Perform provided walk function
+ return opts.WalkFn(ctx, Entry{
+ Key: fsentry.Name(),
+ Size: -1,
+ })
+ })
+}
+
+// nodePathForKey calculates the node file path for supplied key.
+func (st *BlockStorage) nodePathForKey(key string) (string, error) {
+ // Path separators are illegal, as directory paths
+ if strings.Contains(key, "/") || key == "." || key == ".." {
+ return "", ErrInvalidKey
+ }
+
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ // Append the nodepath to key
+ pb.AppendString(st.nodePath)
+ pb.AppendString(key)
+
+ // Return joined + cleaned node-path
+ return pb.Join(st.nodePath, key), nil
+}
+
+// blockPathForKey calculates the block file path for supplied hash.
+func (st *BlockStorage) blockPathForKey(hash string) string {
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+ return pb.Join(st.blockPath, hash)
+}
+
+// hashSeparator is the separating byte between block hashes.
+const hashSeparator = byte('\n')
+
+// node represents the contents of a node file in storage.
+type node struct {
+ hashes []string
+}
+
+// removeHash attempts to remove supplied block hash from the node's hash array.
+func (n *node) removeHash(hash string) bool {
+ for i := 0; i < len(n.hashes); {
+ if n.hashes[i] == hash {
+ // Drop this hash from slice
+ n.hashes = append(n.hashes[:i], n.hashes[i+1:]...)
+ return true
+ }
+
+ // Continue iter
+ i++
+ }
+ return false
+}
+
+// nodeReader is an io.Reader implementation for the node file representation,
+// which is useful when calculated node file is being written to the store.
+type nodeReader struct {
+ node node
+ idx int
+ last int
+}
+
+func (r *nodeReader) Read(b []byte) (int, error) {
+ n := 0
+
+ // '-1' means we missed writing
+ // hash separator on last iteration
+ if r.last == -1 {
+ b[n] = hashSeparator
+ n++
+ r.last = 0
+ }
+
+ for r.idx < len(r.node.hashes) {
+ hash := r.node.hashes[r.idx]
+
+ // Copy into buffer + update read count
+ m := copy(b[n:], hash[r.last:])
+ n += m
+
+ // If incomplete copy, return here
+ if m < len(hash)-r.last {
+ r.last = m
+ return n, nil
+ }
+
+ // Check we can write last separator
+ if n == len(b) {
+ r.last = -1
+ return n, nil
+ }
+
+ // Write separator, iter, reset
+ b[n] = hashSeparator
+ n++
+ r.idx++
+ r.last = 0
+ }
+
+ // We reached end of hashes
+ return n, io.EOF
+}
+
+// nodeWriter is an io.Writer implementation for the node file representation,
+// which is useful when calculated node file is being read from the store.
+type nodeWriter struct {
+ node *node
+ buf *byteutil.Buffer
+}
+
+func (w *nodeWriter) Write(b []byte) (int, error) {
+ n := 0
+
+ for {
+ // Find next hash separator position
+ idx := bytes.IndexByte(b[n:], hashSeparator)
+ if idx == -1 {
+ // Check we shouldn't be expecting it
+ if w.buf.Len() > encodedHashLen {
+ return n, new_error("invalid node")
+ }
+
+ // Write all contents to buffer
+ w.buf.Write(b[n:])
+ return len(b), nil
+ }
+
+ // Found hash separator, write
+ // current buf contents to Node hashes
+ w.buf.Write(b[n : n+idx])
+ n += idx + 1
+ if w.buf.Len() != encodedHashLen {
+ return n, new_error("invalid node")
+ }
+
+ // Append to hashes & reset
+ w.node.hashes = append(w.node.hashes, w.buf.String())
+ w.buf.Reset()
+ }
+}
+
+// blockReader is an io.Reader implementation for the combined, linked block
+// data contained with a node file. Basically, this allows reading value data
+// from the store for a given node file.
+type blockReader struct {
+ storage *BlockStorage
+ node *node
+ buf []byte
+ prev int
+}
+
+func (r *blockReader) Read(b []byte) (int, error) {
+ n := 0
+
+ // Data left in buf, copy as much as we
+ // can into supplied read buffer
+ if r.prev < len(r.buf)-1 {
+ n += copy(b, r.buf[r.prev:])
+ r.prev += n
+ if n >= len(b) {
+ return n, nil
+ }
+ }
+
+ for {
+ // Check we have any hashes left
+ if len(r.node.hashes) < 1 {
+ return n, io.EOF
+ }
+
+ // Get next key from slice
+ key := r.node.hashes[0]
+ r.node.hashes = r.node.hashes[1:]
+
+ // Attempt to fetch next batch of data
+ var err error
+ r.buf, err = r.storage.readBlock(key)
+ if err != nil {
+ return n, err
+ }
+ r.prev = 0
+
+ // Copy as much as can from new buffer
+ m := copy(b[n:], r.buf)
+ r.prev += m
+ n += m
+
+ // If we hit end of supplied buf, return
+ if n >= len(b) {
+ return n, nil
+ }
+ }
+}
+
+var (
+ // base64Encoding is our base64 encoding object.
+ base64Encoding = hashenc.Base64()
+
+ // encodedHashLen is the once-calculated encoded hash-sum length
+ encodedHashLen = base64Encoding.EncodedLen(
+ sha256.New().Size(),
+ )
+)
+
+// hashEncoder is a HashEncoder with built-in encode buffer.
+type hashEncoder struct {
+ henc hashenc.HashEncoder
+ ebuf []byte
+}
+
+// newHashEncoder returns a new hashEncoder instance.
+func newHashEncoder() *hashEncoder {
+ return &hashEncoder{
+ henc: hashenc.New(sha256.New(), base64Encoding),
+ ebuf: make([]byte, encodedHashLen),
+ }
+}
+
+// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum().
+func (henc *hashEncoder) EncodeSum(src []byte) string {
+ henc.henc.EncodeSum(henc.ebuf, src)
+ return string(henc.ebuf)
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go
new file mode 100644
index 000000000..6eeb3a78d
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go
@@ -0,0 +1,212 @@
+package storage
+
+import (
+ "bytes"
+ "io"
+ "sync"
+
+ "codeberg.org/gruf/go-store/v2/util"
+
+ "github.com/klauspost/compress/gzip"
+ "github.com/klauspost/compress/snappy"
+ "github.com/klauspost/compress/zlib"
+)
+
+// Compressor defines a means of compressing/decompressing values going into a key-value store
+type Compressor interface {
+ // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
+ Reader(io.Reader) (io.ReadCloser, error)
+
+ // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
+ Writer(io.Writer) (io.WriteCloser, error)
+}
+
+type gzipCompressor struct {
+ rpool sync.Pool
+ wpool sync.Pool
+}
+
+// GZipCompressor returns a new Compressor that implements GZip at default compression level
+func GZipCompressor() Compressor {
+ return GZipCompressorLevel(gzip.DefaultCompression)
+}
+
+// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level
+func GZipCompressorLevel(level int) Compressor {
+ // GZip readers immediately check for valid
+ // header data on allocation / reset, so we
+ // need a set of valid header data so we can
+ // iniitialize reader instances in mempool.
+ hdr := bytes.NewBuffer(nil)
+
+ // Init writer to ensure valid level provided
+ gw, err := gzip.NewWriterLevel(hdr, level)
+ if err != nil {
+ panic(err)
+ }
+
+ // Write empty data to ensure gzip
+ // header data is in byte buffer.
+ gw.Write([]byte{})
+ gw.Close()
+
+ return &gzipCompressor{
+ rpool: sync.Pool{
+ New: func() any {
+ hdr := bytes.NewReader(hdr.Bytes())
+ gr, _ := gzip.NewReader(hdr)
+ return gr
+ },
+ },
+ wpool: sync.Pool{
+ New: func() any {
+ gw, _ := gzip.NewWriterLevel(nil, level)
+ return gw
+ },
+ },
+ }
+}
+
+func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ gr := c.rpool.Get().(*gzip.Reader)
+ if err := gr.Reset(r); err != nil {
+ c.rpool.Put(gr)
+ return nil, err
+ }
+ return util.ReadCloserWithCallback(gr, func() {
+ c.rpool.Put(gr)
+ }), nil
+}
+
+func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ gw := c.wpool.Get().(*gzip.Writer)
+ gw.Reset(w)
+ return util.WriteCloserWithCallback(gw, func() {
+ c.wpool.Put(gw)
+ }), nil
+}
+
+type zlibCompressor struct {
+ rpool sync.Pool
+ wpool sync.Pool
+ dict []byte
+}
+
+// ZLibCompressor returns a new Compressor that implements ZLib at default compression level
+func ZLibCompressor() Compressor {
+ return ZLibCompressorLevelDict(zlib.DefaultCompression, nil)
+}
+
+// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level
+func ZLibCompressorLevel(level int) Compressor {
+ return ZLibCompressorLevelDict(level, nil)
+}
+
+// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict
+func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
+ // ZLib readers immediately check for valid
+ // header data on allocation / reset, so we
+ // need a set of valid header data so we can
+ // iniitialize reader instances in mempool.
+ hdr := bytes.NewBuffer(nil)
+
+ // Init writer to ensure valid level + dict provided
+ zw, err := zlib.NewWriterLevelDict(hdr, level, dict)
+ if err != nil {
+ panic(err)
+ }
+
+ // Write empty data to ensure zlib
+ // header data is in byte buffer.
+ zw.Write([]byte{})
+ zw.Close()
+
+ return &zlibCompressor{
+ rpool: sync.Pool{
+ New: func() any {
+ hdr := bytes.NewReader(hdr.Bytes())
+ zr, _ := zlib.NewReaderDict(hdr, dict)
+ return zr
+ },
+ },
+ wpool: sync.Pool{
+ New: func() any {
+ zw, _ := zlib.NewWriterLevelDict(nil, level, dict)
+ return zw
+ },
+ },
+ dict: dict,
+ }
+}
+
+func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ zr := c.rpool.Get().(interface {
+ io.ReadCloser
+ zlib.Resetter
+ })
+ if err := zr.Reset(r, c.dict); err != nil {
+ c.rpool.Put(zr)
+ return nil, err
+ }
+ return util.ReadCloserWithCallback(zr, func() {
+ c.rpool.Put(zr)
+ }), nil
+}
+
+func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ zw := c.wpool.Get().(*zlib.Writer)
+ zw.Reset(w)
+ return util.WriteCloserWithCallback(zw, func() {
+ c.wpool.Put(zw)
+ }), nil
+}
+
+type snappyCompressor struct {
+ rpool sync.Pool
+ wpool sync.Pool
+}
+
+// SnappyCompressor returns a new Compressor that implements Snappy.
+func SnappyCompressor() Compressor {
+ return &snappyCompressor{
+ rpool: sync.Pool{
+ New: func() any { return snappy.NewReader(nil) },
+ },
+ wpool: sync.Pool{
+ New: func() any { return snappy.NewWriter(nil) },
+ },
+ }
+}
+
+func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ sr := c.rpool.Get().(*snappy.Reader)
+ sr.Reset(r)
+ return util.ReadCloserWithCallback(
+ util.NopReadCloser(sr),
+ func() { c.rpool.Put(sr) },
+ ), nil
+}
+
+func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ sw := c.wpool.Get().(*snappy.Writer)
+ sw.Reset(w)
+ return util.WriteCloserWithCallback(
+ util.NopWriteCloser(sw),
+ func() { c.wpool.Put(sw) },
+ ), nil
+}
+
+type nopCompressor struct{}
+
+// NoCompression is a Compressor that simply does nothing.
+func NoCompression() Compressor {
+ return &nopCompressor{}
+}
+
+func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ return util.NopReadCloser(r), nil
+}
+
+func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ return util.NopWriteCloser(w), nil
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go
new file mode 100644
index 000000000..dab1d6128
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go
@@ -0,0 +1,425 @@
+package storage
+
+import (
+ "context"
+ "errors"
+ "io"
+ "io/fs"
+ "os"
+ "path"
+ _path "path"
+ "strings"
+ "syscall"
+
+ "codeberg.org/gruf/go-bytes"
+ "codeberg.org/gruf/go-fastcopy"
+ "codeberg.org/gruf/go-store/v2/util"
+)
+
+// DefaultDiskConfig is the default DiskStorage configuration.
+var DefaultDiskConfig = &DiskConfig{
+ Overwrite: true,
+ WriteBufSize: 4096,
+ Transform: NopTransform(),
+ Compression: NoCompression(),
+}
+
+// DiskConfig defines options to be used when opening a DiskStorage.
+type DiskConfig struct {
+ // Transform is the supplied key <--> path KeyTransform.
+ Transform KeyTransform
+
+ // WriteBufSize is the buffer size to use when writing file streams.
+ WriteBufSize int
+
+ // Overwrite allows overwriting values of stored keys in the storage.
+ Overwrite bool
+
+ // LockFile allows specifying the filesystem path to use for the lockfile,
+ // providing only a filename it will store the lockfile within provided store
+ // path and nest the store under `path/store` to prevent access to lockfile.
+ LockFile string
+
+ // Compression is the Compressor to use when reading / writing files,
+ // default is no compression.
+ Compression Compressor
+}
+
+// getDiskConfig returns a valid DiskConfig for supplied ptr.
+func getDiskConfig(cfg *DiskConfig) DiskConfig {
+ // If nil, use default
+ if cfg == nil {
+ cfg = DefaultDiskConfig
+ }
+
+ // Assume nil transform == none
+ if cfg.Transform == nil {
+ cfg.Transform = NopTransform()
+ }
+
+ // Assume nil compress == none
+ if cfg.Compression == nil {
+ cfg.Compression = NoCompression()
+ }
+
+ // Assume 0 buf size == use default
+ if cfg.WriteBufSize <= 0 {
+ cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
+ }
+
+ // Assume empty lockfile path == use default
+ if len(cfg.LockFile) == 0 {
+ cfg.LockFile = LockFile
+ }
+
+ // Return owned config copy
+ return DiskConfig{
+ Transform: cfg.Transform,
+ WriteBufSize: cfg.WriteBufSize,
+ Overwrite: cfg.Overwrite,
+ LockFile: cfg.LockFile,
+ Compression: cfg.Compression,
+ }
+}
+
+// DiskStorage is a Storage implementation that stores directly to a filesystem.
+type DiskStorage struct {
+ path string // path is the root path of this store
+ cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
+ config DiskConfig // cfg is the supplied configuration for this store
+ lock *Lock // lock is the opened lockfile for this storage instance
+}
+
+// OpenDisk opens a DiskStorage instance for given folder path and configuration.
+func OpenDisk(path string, cfg *DiskConfig) (*DiskStorage, error) {
+ // Get checked config
+ config := getDiskConfig(cfg)
+
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ // Clean provided store path, ensure
+ // ends in '/' to help later path trimming
+ storePath := pb.Clean(path) + "/"
+
+ // Clean provided lockfile path
+ lockfile := pb.Clean(config.LockFile)
+
+ // Check if lockfile is an *actual* path or just filename
+ if lockDir, _ := _path.Split(lockfile); lockDir == "" {
+ // Lockfile is a filename, store must be nested under
+ // $storePath/store to prevent access to the lockfile
+ storePath += "store/"
+ lockfile = pb.Join(path, lockfile)
+ }
+
+ // Attempt to open dir path
+ file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ // If not a not-exist error, return
+ if !os.IsNotExist(err) {
+ return nil, err
+ }
+
+ // Attempt to make store path dirs
+ err = os.MkdirAll(storePath, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+
+ // Reopen dir now it's been created
+ file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+ }
+ defer file.Close()
+
+ // Double check this is a dir (NOT a file!)
+ stat, err := file.Stat()
+ if err != nil {
+ return nil, err
+ } else if !stat.IsDir() {
+ return nil, errors.New("store/storage: path is file")
+ }
+
+ // Open and acquire storage lock for path
+ lock, err := OpenLock(lockfile)
+ if err != nil {
+ return nil, err
+ }
+
+ // Prepare DiskStorage
+ st := &DiskStorage{
+ path: storePath,
+ config: config,
+ lock: lock,
+ }
+
+ // Set copypool buffer size
+ st.cppool.Buffer(config.WriteBufSize)
+
+ return st, nil
+}
+
+// Clean implements Storage.Clean().
+func (st *DiskStorage) Clean(ctx context.Context) error {
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Clean-out unused directories
+ return cleanDirs(st.path)
+}
+
+// ReadBytes implements Storage.ReadBytes().
+func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
+ // Get stream reader for key
+ rc, err := st.ReadStream(ctx, key)
+ if err != nil {
+ return nil, err
+ }
+ defer rc.Close()
+
+ // Read all bytes and return
+ return io.ReadAll(rc)
+}
+
+// ReadStream implements Storage.ReadStream().
+func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return nil, err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return nil, ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // Attempt to open file (replace ENOENT with our own)
+ file, err := open(kpath, defaultFileROFlags)
+ if err != nil {
+ return nil, errSwapNotFound(err)
+ }
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Reader(file)
+ if err != nil {
+ file.Close() // close this here, ignore error
+ return nil, err
+ }
+
+ // Wrap compressor to ensure file close
+ return util.ReadCloserWithCallback(cFile, func() {
+ file.Close()
+ }), nil
+}
+
+// WriteBytes implements Storage.WriteBytes().
+func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) error {
+ return st.WriteStream(ctx, key, bytes.NewReader(value))
+}
+
+// WriteStream implements Storage.WriteStream().
+func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Ensure dirs leading up to file exist
+ err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
+ if err != nil {
+ return err
+ }
+
+ // Prepare to swap error if need-be
+ errSwap := errSwapNoop
+
+ // Build file RW flags
+ flags := defaultFileRWFlags
+ if !st.config.Overwrite {
+ flags |= syscall.O_EXCL
+
+ // Catch + replace err exist
+ errSwap = errSwapExist
+ }
+
+ // Attempt to open file
+ file, err := open(kpath, flags)
+ if err != nil {
+ return errSwap(err)
+ }
+ defer file.Close()
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Writer(file)
+ if err != nil {
+ return err
+ }
+ defer cFile.Close()
+
+ // Copy provided reader to file
+ _, err = st.cppool.Copy(cFile, r)
+ return err
+}
+
+// Stat implements Storage.Stat().
+func (st *DiskStorage) Stat(ctx context.Context, key string) (bool, error) {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return false, err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return false, err
+ }
+
+ // Check for file on disk
+ return stat(kpath)
+}
+
+// Remove implements Storage.Remove().
+func (st *DiskStorage) Remove(ctx context.Context, key string) error {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return err
+ }
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Remove at path (we know this is file)
+ if err := unlink(kpath); err != nil {
+ return errSwapNotFound(err)
+ }
+
+ return nil
+}
+
+// Close implements Storage.Close().
+func (st *DiskStorage) Close() error {
+ return st.lock.Close()
+}
+
+// WalkKeys implements Storage.WalkKeys().
+func (st *DiskStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ // Walk dir for entries
+ return walkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) error {
+ if !fsentry.Type().IsRegular() {
+ // Only deal with regular files
+ return nil
+ }
+
+ // Get full item path (without root)
+ kpath = pb.Join(kpath, fsentry.Name())
+ kpath = kpath[len(st.path):]
+
+ // Load file info. This should already
+ // be loaded due to the underlying call
+ // to os.File{}.ReadDir() populating them
+ info, err := fsentry.Info()
+ if err != nil {
+ return err
+ }
+
+ // Perform provided walk function
+ return opts.WalkFn(ctx, Entry{
+ Key: st.config.Transform.PathToKey(kpath),
+ Size: info.Size(),
+ })
+ })
+}
+
+// filepath checks and returns a formatted filepath for given key.
+func (st *DiskStorage) filepath(key string) (string, error) {
+ // Calculate transformed key path
+ key = st.config.Transform.KeyToPath(key)
+
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ // Generated joined root path
+ pb.AppendString(st.path)
+ pb.AppendString(key)
+
+ // Check for dir traversal outside of root
+ if isDirTraversal(st.path, pb.StringPtr()) {
+ return "", ErrInvalidKey
+ }
+
+ return pb.String(), nil
+}
+
+// isDirTraversal will check if rootPlusPath is a dir traversal outside of root,
+// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath).
+func isDirTraversal(root, rootPlusPath string) bool {
+ switch {
+ // Root is $PWD, check for traversal out of
+ case root == ".":
+ return strings.HasPrefix(rootPlusPath, "../")
+
+ // The path MUST be prefixed by root
+ case !strings.HasPrefix(rootPlusPath, root):
+ return true
+
+ // In all other cases, check not equal
+ default:
+ return len(root) == len(rootPlusPath)
+ }
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go
new file mode 100644
index 000000000..4ae7e4be5
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go
@@ -0,0 +1,110 @@
+package storage
+
+import (
+ "errors"
+ "strings"
+ "syscall"
+
+ "github.com/minio/minio-go/v7"
+)
+
+var (
+ // ErrClosed is returned on operations on a closed storage
+ ErrClosed = new_error("closed")
+
+ // ErrNotFound is the error returned when a key cannot be found in storage
+ ErrNotFound = new_error("key not found")
+
+ // ErrAlreadyExist is the error returned when a key already exists in storage
+ ErrAlreadyExists = new_error("key already exists")
+
+ // ErrInvalidkey is the error returned when an invalid key is passed to storage
+ ErrInvalidKey = new_error("invalid key")
+
+ // ErrAlreadyLocked is returned on fail opening a storage lockfile
+ ErrAlreadyLocked = new_error("storage lock already open")
+)
+
+// new_error returns a new error instance prefixed by package prefix.
+func new_error(msg string) error {
+ return errors.New("store/storage: " + msg)
+}
+
+// wrappedError allows wrapping together an inner with outer error.
+type wrappedError struct {
+ inner error
+ outer error
+}
+
+// wrap will return a new wrapped error from given inner and outer errors.
+func wrap(outer, inner error) *wrappedError {
+ return &wrappedError{
+ inner: inner,
+ outer: outer,
+ }
+}
+
+func (e *wrappedError) Is(target error) bool {
+ return e.outer == target || e.inner == target
+}
+
+func (e *wrappedError) Error() string {
+ return e.outer.Error() + ": " + e.inner.Error()
+}
+
+func (e *wrappedError) Unwrap() error {
+ return e.inner
+}
+
+// errSwapNoop performs no error swaps
+func errSwapNoop(err error) error {
+ return err
+}
+
+// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound
+func errSwapNotFound(err error) error {
+ if err == syscall.ENOENT {
+ return ErrNotFound
+ }
+ return err
+}
+
+// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists
+func errSwapExist(err error) error {
+ if err == syscall.EEXIST {
+ return ErrAlreadyExists
+ }
+ return err
+}
+
+// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked
+func errSwapUnavailable(err error) error {
+ if err == syscall.EAGAIN {
+ return ErrAlreadyLocked
+ }
+ return err
+}
+
+// transformS3Error transforms an error returned from S3Storage underlying
+// minio.Core client, by wrapping where necessary with our own error types.
+func transformS3Error(err error) error {
+ // Cast this to a minio error response
+ ersp, ok := err.(minio.ErrorResponse)
+ if ok {
+ switch ersp.Code {
+ case "NoSuchKey":
+ return wrap(ErrNotFound, err)
+ case "Conflict":
+ return wrap(ErrAlreadyExists, err)
+ default:
+ return err
+ }
+ }
+
+ // Check if error has an invalid object name prefix
+ if strings.HasPrefix(err.Error(), "Object name ") {
+ return wrap(ErrInvalidKey, err)
+ }
+
+ return err
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go
new file mode 100644
index 000000000..658b7e762
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go
@@ -0,0 +1,221 @@
+package storage
+
+import (
+ "io/fs"
+ "os"
+ "syscall"
+
+ "codeberg.org/gruf/go-fastpath"
+ "codeberg.org/gruf/go-store/v2/util"
+)
+
+const (
+ // default file permission bits
+ defaultDirPerms = 0o755
+ defaultFilePerms = 0o644
+
+ // default file open flags
+ defaultFileROFlags = syscall.O_RDONLY
+ defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
+ defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
+)
+
+// NOTE:
+// These functions are for opening storage files,
+// not necessarily for e.g. initial setup (OpenFile)
+
+// walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
+func walkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry) error) error {
+ // Read directory entries
+ entries, err := readDir(path)
+ if err != nil {
+ return err
+ }
+
+ // frame represents a directory entry
+ // walk-loop snapshot, taken when a sub
+ // directory requiring iteration is found
+ type frame struct {
+ path string
+ entries []fs.DirEntry
+ }
+
+ // stack contains a list of held snapshot
+ // frames, representing unfinished upper
+ // layers of a directory structure yet to
+ // be traversed.
+ var stack []frame
+
+outer:
+ for {
+ if len(entries) == 0 {
+ if len(stack) == 0 {
+ // Reached end
+ break outer
+ }
+
+ // Pop frame from stack
+ frame := stack[len(stack)-1]
+ stack = stack[:len(stack)-1]
+
+ // Update loop vars
+ entries = frame.entries
+ path = frame.path
+ }
+
+ for len(entries) > 0 {
+ // Pop next entry from queue
+ entry := entries[0]
+ entries = entries[1:]
+
+ // Pass to provided walk function
+ if err := walkFn(path, entry); err != nil {
+ return err
+ }
+
+ if entry.IsDir() {
+ // Push current frame to stack
+ stack = append(stack, frame{
+ path: path,
+ entries: entries,
+ })
+
+ // Update current directory path
+ path = pb.Join(path, entry.Name())
+
+ // Read next directory entries
+ next, err := readDir(path)
+ if err != nil {
+ return err
+ }
+
+ // Set next entries
+ entries = next
+
+ continue outer
+ }
+ }
+ }
+
+ return nil
+}
+
+// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
+func cleanDirs(path string) error {
+ // Acquire path builder
+ pb := util.GetPathBuilder()
+ defer util.PutPathBuilder(pb)
+
+ // Get top-level dir entries
+ entries, err := readDir(path)
+ if err != nil {
+ return err
+ }
+
+ for _, entry := range entries {
+ if entry.IsDir() {
+ // Recursively clean sub-directory entries
+ if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+// cleanDir performs the actual dir cleaning logic for the above top-level version.
+func cleanDir(pb *fastpath.Builder, path string) error {
+ // Get dir entries
+ entries, err := readDir(path)
+ if err != nil {
+ return err
+ }
+
+ // If no entries, delete
+ if len(entries) < 1 {
+ return rmdir(path)
+ }
+
+ for _, entry := range entries {
+ if entry.IsDir() {
+ // Recursively clean sub-directory entries
+ if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+// readDir will open file at path, read the unsorted list of entries, then close.
+func readDir(path string) ([]fs.DirEntry, error) {
+ // Open file at path
+ file, err := open(path, defaultFileROFlags)
+ if err != nil {
+ return nil, err
+ }
+
+ // Read directory entries
+ entries, err := file.ReadDir(-1)
+
+ // Done with file
+ _ = file.Close()
+
+ return entries, err
+}
+
+// open will open a file at the given path with flags and default file perms.
+func open(path string, flags int) (*os.File, error) {
+ var fd int
+ err := retryOnEINTR(func() (err error) {
+ fd, err = syscall.Open(path, flags, defaultFilePerms)
+ return
+ })
+ if err != nil {
+ return nil, err
+ }
+ return os.NewFile(uintptr(fd), path), nil
+}
+
+// stat checks for a file on disk.
+func stat(path string) (bool, error) {
+ var stat syscall.Stat_t
+ err := retryOnEINTR(func() error {
+ return syscall.Stat(path, &stat)
+ })
+ if err != nil {
+ if err == syscall.ENOENT {
+ // not-found is no error
+ err = nil
+ }
+ return false, err
+ }
+ return true, nil
+}
+
+// unlink removes a file (not dir!) on disk.
+func unlink(path string) error {
+ return retryOnEINTR(func() error {
+ return syscall.Unlink(path)
+ })
+}
+
+// rmdir removes a dir (not file!) on disk.
+func rmdir(path string) error {
+ return retryOnEINTR(func() error {
+ return syscall.Rmdir(path)
+ })
+}
+
+// retryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received.
+func retryOnEINTR(do func() error) error {
+ for {
+ err := do()
+ if err == syscall.EINTR {
+ continue
+ }
+ return err
+ }
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go
new file mode 100644
index 000000000..25ecefe52
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go
@@ -0,0 +1,59 @@
+package storage
+
+import (
+ "sync/atomic"
+ "syscall"
+)
+
+// LockFile is our standard lockfile name.
+const LockFile = "store.lock"
+
+// Lock represents a filesystem lock to ensure only one storage instance open per path.
+type Lock struct {
+ fd int
+ st uint32
+}
+
+// OpenLock opens a lockfile at path.
+func OpenLock(path string) (*Lock, error) {
+ var fd int
+
+ // Open the file descriptor at path
+ err := retryOnEINTR(func() (err error) {
+ fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms)
+ return
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // Get a flock on the file descriptor
+ err = retryOnEINTR(func() error {
+ return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB)
+ })
+ if err != nil {
+ return nil, errSwapUnavailable(err)
+ }
+
+ return &Lock{fd: fd}, nil
+}
+
+// Close will attempt to close the lockfile and file descriptor.
+func (f *Lock) Close() error {
+ var err error
+ if atomic.CompareAndSwapUint32(&f.st, 0, 1) {
+ // Ensure gets closed
+ defer syscall.Close(f.fd)
+
+ // Call funlock on the file descriptor
+ err = retryOnEINTR(func() error {
+ return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB)
+ })
+ }
+ return err
+}
+
+// Closed will return whether this lockfile has been closed (and unlocked).
+func (f *Lock) Closed() bool {
+ return (atomic.LoadUint32(&f.st) == 1)
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go
new file mode 100644
index 000000000..a853c84d2
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go
@@ -0,0 +1,228 @@
+package storage
+
+import (
+ "context"
+ "io"
+ "sync/atomic"
+
+ "codeberg.org/gruf/go-bytes"
+ "codeberg.org/gruf/go-store/v2/util"
+ "github.com/cornelk/hashmap"
+)
+
+// MemoryStorage is a storage implementation that simply stores key-value
+// pairs in a Go map in-memory. The map is protected by a mutex.
+type MemoryStorage struct {
+ ow bool // overwrites
+ fs *hashmap.Map[string, []byte]
+ st uint32
+}
+
+// OpenMemory opens a new MemoryStorage instance with internal map starting size.
+func OpenMemory(size int, overwrites bool) *MemoryStorage {
+ if size <= 0 {
+ size = 8
+ }
+ return &MemoryStorage{
+ fs: hashmap.NewSized[string, []byte](uintptr(size)),
+ ow: overwrites,
+ }
+}
+
+// Clean implements Storage.Clean().
+func (st *MemoryStorage) Clean(ctx context.Context) error {
+ // Check store open
+ if st.closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// ReadBytes implements Storage.ReadBytes().
+func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
+ // Check store open
+ if st.closed() {
+ return nil, ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // Check for key in store
+ b, ok := st.fs.Get(key)
+ if !ok {
+ return nil, ErrNotFound
+ }
+
+ // Create return copy
+ return copyb(b), nil
+}
+
+// ReadStream implements Storage.ReadStream().
+func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
+ // Check store open
+ if st.closed() {
+ return nil, ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // Check for key in store
+ b, ok := st.fs.Get(key)
+ if !ok {
+ return nil, ErrNotFound
+ }
+
+ // Create io.ReadCloser from 'b' copy
+ r := bytes.NewReader(copyb(b))
+ return util.NopReadCloser(r), nil
+}
+
+// WriteBytes implements Storage.WriteBytes().
+func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) error {
+ // Check store open
+ if st.closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Check for key that already exists
+ if _, ok := st.fs.Get(key); ok && !st.ow {
+ return ErrAlreadyExists
+ }
+
+ // Write key copy to store
+ st.fs.Set(key, copyb(b))
+ return nil
+}
+
+// WriteStream implements Storage.WriteStream().
+func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+ // Check store open
+ if st.closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Check for key that already exists
+ if _, ok := st.fs.Get(key); ok && !st.ow {
+ return ErrAlreadyExists
+ }
+
+ // Read all from reader
+ b, err := io.ReadAll(r)
+ if err != nil {
+ return err
+ }
+
+ // Write key to store
+ st.fs.Set(key, b)
+ return nil
+}
+
+// Stat implements Storage.Stat().
+func (st *MemoryStorage) Stat(ctx context.Context, key string) (bool, error) {
+ // Check store open
+ if st.closed() {
+ return false, ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return false, err
+ }
+
+ // Check for key in store
+ _, ok := st.fs.Get(key)
+ return ok, nil
+}
+
+// Remove implements Storage.Remove().
+func (st *MemoryStorage) Remove(ctx context.Context, key string) error {
+ // Check store open
+ if st.closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ // Attempt to delete key
+ ok := st.fs.Del(key)
+ if !ok {
+ return ErrNotFound
+ }
+
+ return nil
+}
+
+// WalkKeys implements Storage.WalkKeys().
+func (st *MemoryStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
+ // Check store open
+ if st.closed() {
+ return ErrClosed
+ }
+
+ // Check context still valid
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ var err error
+
+ // Nil check func
+ _ = opts.WalkFn
+
+ // Pass each key in map to walk function
+ st.fs.Range(func(key string, val []byte) bool {
+ err = opts.WalkFn(ctx, Entry{
+ Key: key,
+ Size: int64(len(val)),
+ })
+ return (err == nil)
+ })
+
+ return err
+}
+
+// Close implements Storage.Close().
+func (st *MemoryStorage) Close() error {
+ atomic.StoreUint32(&st.st, 1)
+ return nil
+}
+
+// closed returns whether MemoryStorage is closed.
+func (st *MemoryStorage) closed() bool {
+ return (atomic.LoadUint32(&st.st) == 1)
+}
+
+// copyb returns a copy of byte-slice b.
+func copyb(b []byte) []byte {
+ if b == nil {
+ return nil
+ }
+ p := make([]byte, len(b))
+ _ = copy(p, b)
+ return p
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go
new file mode 100644
index 000000000..baf2a1b3c
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go
@@ -0,0 +1,385 @@
+package storage
+
+import (
+ "bytes"
+ "context"
+ "io"
+ "sync/atomic"
+
+ "codeberg.org/gruf/go-store/v2/util"
+ "github.com/minio/minio-go/v7"
+)
+
+// DefaultS3Config is the default S3Storage configuration.
+var DefaultS3Config = &S3Config{
+ CoreOpts: minio.Options{},
+ GetOpts: minio.GetObjectOptions{},
+ PutOpts: minio.PutObjectOptions{},
+ PutChunkSize: 4 * 1024 * 1024, // 4MiB
+ StatOpts: minio.StatObjectOptions{},
+ RemoveOpts: minio.RemoveObjectOptions{},
+ ListSize: 200,
+}
+
+// S3Config defines options to be used when opening an S3Storage,
+// mostly options for underlying S3 client library.
+type S3Config struct {
+ // CoreOpts are S3 client options passed during initialization.
+ CoreOpts minio.Options
+
+ // GetOpts are S3 client options passed during .Read___() calls.
+ GetOpts minio.GetObjectOptions
+
+ // PutOpts are S3 client options passed during .Write___() calls.
+ PutOpts minio.PutObjectOptions
+
+ // PutChunkSize is the chunk size (in bytes) to use when sending
+ // a byte stream reader of unknown size as a multi-part object.
+ PutChunkSize int64
+
+ // StatOpts are S3 client options passed during .Stat() calls.
+ StatOpts minio.StatObjectOptions
+
+ // RemoveOpts are S3 client options passed during .Remove() calls.
+ RemoveOpts minio.RemoveObjectOptions
+
+ // ListSize determines how many items to include in each
+ // list request, made during calls to .WalkKeys().
+ ListSize int
+}
+
+// getS3Config returns a valid S3Config for supplied ptr.
+func getS3Config(cfg *S3Config) S3Config {
+ // If nil, use default
+ if cfg == nil {
+ cfg = DefaultS3Config
+ }
+
+ // Assume 0 chunk size == use default
+ if cfg.PutChunkSize <= 0 {
+ cfg.PutChunkSize = 4 * 1024 * 1024
+ }
+
+ // Assume 0 list size == use default
+ if cfg.ListSize <= 0 {
+ cfg.ListSize = 200
+ }
+
+ // Return owned config copy
+ return S3Config{
+ CoreOpts: cfg.CoreOpts,
+ GetOpts: cfg.GetOpts,
+ PutOpts: cfg.PutOpts,
+ StatOpts: cfg.StatOpts,
+ RemoveOpts: cfg.RemoveOpts,
+ }
+}
+
+// S3Storage is a storage implementation that stores key-value
+// pairs in an S3 instance at given endpoint with bucket name.
+type S3Storage struct {
+ client *minio.Core
+ bucket string
+ config S3Config
+ state uint32
+}
+
+// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration.
+func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) {
+ // Get checked config
+ config := getS3Config(cfg)
+
+ // Create new S3 client connection
+ client, err := minio.NewCore(endpoint, &config.CoreOpts)
+ if err != nil {
+ return nil, err
+ }
+
+ // Check that provided bucket actually exists
+ exists, err := client.BucketExists(context.Background(), bucket)
+ if err != nil {
+ return nil, err
+ } else if !exists {
+ return nil, new_error("bucket does not exist")
+ }
+
+ return &S3Storage{
+ client: client,
+ bucket: bucket,
+ config: config,
+ }, nil
+}
+
+// Client returns access to the underlying S3 client.
+func (st *S3Storage) Client() *minio.Core {
+ return st.client
+}
+
+// Clean implements Storage.Clean().
+func (st *S3Storage) Clean(ctx context.Context) error {
+ return nil // nothing to do for S3
+}
+
+// ReadBytes implements Storage.ReadBytes().
+func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) {
+ // Fetch object reader from S3 bucket
+ rc, err := st.ReadStream(ctx, key)
+ if err != nil {
+ return nil, err
+ }
+ defer rc.Close()
+
+ // Read all bytes and return
+ return io.ReadAll(rc)
+}
+
+// ReadStream implements Storage.ReadStream().
+func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) {
+ // Check storage open
+ if st.closed() {
+ return nil, ErrClosed
+ }
+
+ // Fetch object reader from S3 bucket
+ rc, _, _, err := st.client.GetObject(
+ ctx,
+ st.bucket,
+ key,
+ st.config.GetOpts,
+ )
+ if err != nil {
+ return nil, transformS3Error(err)
+ }
+
+ return rc, nil
+}
+
+// WriteBytes implements Storage.WriteBytes().
+func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) error {
+ return st.WriteStream(ctx, key, util.NewByteReaderSize(value))
+}
+
+// WriteStream implements Storage.WriteStream().
+func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+ // Check storage open
+ if st.closed() {
+ return ErrClosed
+ }
+
+ if rs, ok := r.(util.ReaderSize); ok {
+ // This reader supports providing us the size of
+ // the encompassed data, allowing us to perform
+ // a singular .PutObject() call with length.
+ _, err := st.client.PutObject(
+ ctx,
+ st.bucket,
+ key,
+ r,
+ rs.Size(),
+ "",
+ "",
+ st.config.PutOpts,
+ )
+ if err != nil {
+ return transformS3Error(err)
+ }
+ return nil
+ }
+
+ // Start a new multipart upload to get ID
+ uploadID, err := st.client.NewMultipartUpload(
+ ctx,
+ st.bucket,
+ key,
+ st.config.PutOpts,
+ )
+ if err != nil {
+ return transformS3Error(err)
+ }
+
+ var (
+ count int
+ parts []minio.CompletePart
+ chunk = make([]byte, st.config.PutChunkSize)
+ rdr = bytes.NewReader(nil)
+ )
+
+ // Note that we do not perform any kind of
+ // memory pooling of the chunk buffers here.
+ // Optimal chunking sizes for S3 writes are in
+ // the orders of megabytes, so letting the GC
+ // collect these ASAP is much preferred.
+
+loop:
+ for done := false; !done; {
+ // Read next chunk into byte buffer
+ n, err := io.ReadFull(r, chunk)
+
+ switch err {
+ // Successful read
+ case nil:
+
+ // Reached end, buffer empty
+ case io.EOF:
+ break loop
+
+ // Reached end, but buffer not empty
+ case io.ErrUnexpectedEOF:
+ done = true
+
+ // All other errors
+ default:
+ return err
+ }
+
+ // Reset byte reader
+ rdr.Reset(chunk[:n])
+
+ // Put this object chunk in S3 store
+ pt, err := st.client.PutObjectPart(
+ ctx,
+ st.bucket,
+ key,
+ uploadID,
+ count,
+ rdr,
+ st.config.PutChunkSize,
+ "",
+ "",
+ nil,
+ )
+ if err != nil {
+ return err
+ }
+
+ // Append completed part to slice
+ parts = append(parts, minio.CompletePart{
+ PartNumber: pt.PartNumber,
+ ETag: pt.ETag,
+ ChecksumCRC32: pt.ChecksumCRC32,
+ ChecksumCRC32C: pt.ChecksumCRC32C,
+ ChecksumSHA1: pt.ChecksumSHA1,
+ ChecksumSHA256: pt.ChecksumSHA256,
+ })
+
+ // Iterate part count
+ count++
+ }
+
+ // Complete this multi-part upload operation
+ _, err = st.client.CompleteMultipartUpload(
+ ctx,
+ st.bucket,
+ key,
+ uploadID,
+ parts,
+ st.config.PutOpts,
+ )
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Stat implements Storage.Stat().
+func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) {
+ // Check storage open
+ if st.closed() {
+ return false, ErrClosed
+ }
+
+ // Query object in S3 bucket
+ _, err := st.client.StatObject(
+ ctx,
+ st.bucket,
+ key,
+ st.config.StatOpts,
+ )
+ if err != nil {
+ return false, transformS3Error(err)
+ }
+
+ return true, nil
+}
+
+// Remove implements Storage.Remove().
+func (st *S3Storage) Remove(ctx context.Context, key string) error {
+ // Check storage open
+ if st.closed() {
+ return ErrClosed
+ }
+
+ // S3 returns no error on remove for non-existent keys
+ if ok, err := st.Stat(ctx, key); err != nil {
+ return err
+ } else if !ok {
+ return ErrNotFound
+ }
+
+ // Remove object from S3 bucket
+ err := st.client.RemoveObject(
+ ctx,
+ st.bucket,
+ key,
+ st.config.RemoveOpts,
+ )
+ if err != nil {
+ return transformS3Error(err)
+ }
+
+ return nil
+}
+
+// WalkKeys implements Storage.WalkKeys().
+func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error {
+ var (
+ prev string
+ token string
+ )
+
+ for {
+ // List the objects in bucket starting at marker
+ result, err := st.client.ListObjectsV2(
+ st.bucket,
+ "",
+ prev,
+ token,
+ "",
+ st.config.ListSize,
+ )
+ if err != nil {
+ return err
+ }
+
+ // Pass each object through walk func
+ for _, obj := range result.Contents {
+ if err := opts.WalkFn(ctx, Entry{
+ Key: obj.Key,
+ Size: obj.Size,
+ }); err != nil {
+ return err
+ }
+ }
+
+ // No token means we reached end of bucket
+ if result.NextContinuationToken == "" {
+ return nil
+ }
+
+ // Set continue token and prev mark
+ token = result.NextContinuationToken
+ prev = result.StartAfter
+ }
+}
+
+// Close implements Storage.Close().
+func (st *S3Storage) Close() error {
+ atomic.StoreUint32(&st.state, 1)
+ return nil
+}
+
+// closed returns whether S3Storage is closed.
+func (st *S3Storage) closed() bool {
+ return (atomic.LoadUint32(&st.state) == 1)
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go
new file mode 100644
index 000000000..00fbe7abd
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go
@@ -0,0 +1,53 @@
+package storage
+
+import (
+ "context"
+ "io"
+)
+
+// Storage defines a means of storing and accessing key value pairs
+type Storage interface {
+ // ReadBytes returns the byte value for key in storage
+ ReadBytes(ctx context.Context, key string) ([]byte, error)
+
+ // ReadStream returns an io.ReadCloser for the value bytes at key in the storage
+ ReadStream(ctx context.Context, key string) (io.ReadCloser, error)
+
+ // WriteBytes writes the supplied value bytes at key in the storage
+ WriteBytes(ctx context.Context, key string, value []byte) error
+
+ // WriteStream writes the bytes from supplied reader at key in the storage
+ WriteStream(ctx context.Context, key string, r io.Reader) error
+
+ // Stat checks if the supplied key is in the storage
+ Stat(ctx context.Context, key string) (bool, error)
+
+ // Remove attempts to remove the supplied key-value pair from storage
+ Remove(ctx context.Context, key string) error
+
+ // Close will close the storage, releasing any file locks
+ Close() error
+
+ // Clean removes unused values and unclutters the storage (e.g. removing empty folders)
+ Clean(ctx context.Context) error
+
+ // WalkKeys walks the keys in the storage
+ WalkKeys(ctx context.Context, opts WalkKeysOptions) error
+}
+
+// Entry represents a key in a Storage{} implementation,
+// with any associated metadata that may have been set.
+type Entry struct {
+ // Key is this entry's unique storage key.
+ Key string
+
+ // Size is the size of this entry in storage.
+ // Note that size < 0 indicates unknown.
+ Size int64
+}
+
+// WalkKeysOptions defines how to walk the keys in a storage implementation
+type WalkKeysOptions struct {
+ // WalkFn is the function to apply on each StorageEntry
+ WalkFn func(context.Context, Entry) error
+}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go
new file mode 100644
index 000000000..3863dd774
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go
@@ -0,0 +1,25 @@
+package storage
+
+// KeyTransform defines a method of converting store keys to storage paths (and vice-versa)
+type KeyTransform interface {
+ // KeyToPath converts a supplied key to storage path
+ KeyToPath(string) string
+
+ // PathToKey converts a supplied storage path to key
+ PathToKey(string) string
+}
+
+type nopKeyTransform struct{}
+
+// NopTransform returns a nop key transform (i.e. key = path)
+func NopTransform() KeyTransform {
+ return &nopKeyTransform{}
+}
+
+func (t *nopKeyTransform) KeyToPath(key string) string {
+ return key
+}
+
+func (t *nopKeyTransform) PathToKey(path string) string {
+ return path
+}