Diffstat (limited to 'vendor/codeberg.org/gruf/go-storage')
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/LICENSE             | 9
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/README.md           | 5
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/block.archived      | 887
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/block_test.archived | 38
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/disk/disk.go        | 467
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/disk/fs.go          | 206
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/errors.go           | 16
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/internal/errors.go  | 56
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/internal/path.go    | 24
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/memory/memory.go    | 253
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/s3/errors.go        | 47
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/s3/s3.go            | 479
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/storage.go          | 73
-rw-r--r-- | vendor/codeberg.org/gruf/go-storage/test.sh             | 29
14 files changed, 2589 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-storage/LICENSE b/vendor/codeberg.org/gruf/go-storage/LICENSE
new file mode 100644
index 000000000..e4163ae35
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/LICENSE
@@ -0,0 +1,9 @@
+MIT License
+
+Copyright (c) 2022 gruf
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/codeberg.org/gruf/go-storage/README.md b/vendor/codeberg.org/gruf/go-storage/README.md
new file mode 100644
index 000000000..430b43467
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/README.md
@@ -0,0 +1,5 @@
+# go-storage
+
+A simple library providing various storage implementations with a simple read-write-stat interface.
+
+Supports: on-disk, memory, S3.
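For context, here is a minimal, hypothetical usage sketch of that read-write-stat interface, based on the `disk.Open`, `WriteBytes`, `Stat` and `ReadBytes` signatures vendored below; the root path and key are illustrative only:

```go
package main

import (
	"context"
	"fmt"

	"codeberg.org/gruf/go-storage/disk"
)

func main() {
	ctx := context.Background()

	// Open a disk-backed store rooted at ./data,
	// using the package's default configuration.
	st, err := disk.Open("./data", nil)
	if err != nil {
		panic(err)
	}

	// Write a value at the given storage key.
	if _, err := st.WriteBytes(ctx, "greeting", []byte("hello")); err != nil {
		panic(err)
	}

	// Stat returns a *storage.Entry; nil indicates not found.
	entry, err := st.Stat(ctx, "greeting")
	if err != nil {
		panic(err)
	}
	fmt.Println(entry.Key, entry.Size)

	// Read the value back.
	b, err := st.ReadBytes(ctx, "greeting")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```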
\ No newline at end of file diff --git a/vendor/codeberg.org/gruf/go-storage/block.archived b/vendor/codeberg.org/gruf/go-storage/block.archived new file mode 100644 index 000000000..11a757211 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/block.archived @@ -0,0 +1,887 @@ +package storage + +import ( + "bytes" + "context" + "crypto/sha256" + "fmt" + "io" + "io/fs" + "os" + "strings" + "sync" + "sync/atomic" + "syscall" + + "codeberg.org/gruf/go-byteutil" + "codeberg.org/gruf/go-errors/v2" + "codeberg.org/gruf/go-fastcopy" + "codeberg.org/gruf/go-hashenc" + "codeberg.org/gruf/go-iotools" + "codeberg.org/gruf/go-pools" + "codeberg.org/gruf/go-store/v2/util" +) + +var ( + nodePathPrefix = "node/" + blockPathPrefix = "block/" +) + +// DefaultBlockConfig is the default BlockStorage configuration. +var DefaultBlockConfig = &BlockConfig{ + BlockSize: 1024 * 16, + WriteBufSize: 4096, + Overwrite: false, + Compression: NoCompression(), +} + +// BlockConfig defines options to be used when opening a BlockStorage. +type BlockConfig struct { + // BlockSize is the chunking size to use when splitting and storing blocks of data. + BlockSize int + + // ReadBufSize is the buffer size to use when reading node files. + ReadBufSize int + + // WriteBufSize is the buffer size to use when writing file streams. + WriteBufSize int + + // Overwrite allows overwriting values of stored keys in the storage. + Overwrite bool + + // Compression is the Compressor to use when reading / writing files, + // default is no compression. + Compression Compressor +} + +// getBlockConfig returns a valid BlockConfig for supplied ptr. +func getBlockConfig(cfg *BlockConfig) BlockConfig { + // If nil, use default + if cfg == nil { + cfg = DefaultBlockConfig + } + + // Assume nil compress == none + if cfg.Compression == nil { + cfg.Compression = NoCompression() + } + + // Assume 0 chunk size == use default + if cfg.BlockSize <= 0 { + cfg.BlockSize = DefaultBlockConfig.BlockSize + } + + // Assume 0 buf size == use default + if cfg.WriteBufSize <= 0 { + cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize + } + + // Return owned config copy + return BlockConfig{ + BlockSize: cfg.BlockSize, + WriteBufSize: cfg.WriteBufSize, + Overwrite: cfg.Overwrite, + Compression: cfg.Compression, + } +} + +// BlockStorage is a Storage implementation that stores input data as chunks on +// a filesystem. Each value is chunked into blocks of configured size and these +// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A +// "node" file is finally created containing an array of hashes contained within +// this value. +type BlockStorage struct { + path string // path is the root path of this store + blockPath string // blockPath is the joined root path + block path prefix + nodePath string // nodePath is the joined root path + node path prefix + config BlockConfig // cfg is the supplied configuration for this store + hashPool sync.Pool // hashPool is this store's hashEncoder pool + bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool + cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool + lock *Lock // lock is the opened lockfile for this storage instance + + // NOTE: + // BlockStorage does not need to lock each of the underlying block files + // as the filename itself directly relates to the contents. If there happens + // to be an overwrite, it will just be of the same data since the filename is + // the hash of the data. 
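The struct comment above describes the content-addressed scheme: each chunk is stored under the base64 encoding of its SHA-256 sum, which is why concurrent writes of the same block are harmless. A minimal standalone sketch of that naming rule, standard library only; the exact base64 alphabet used by go-hashenc is not shown in this diff, so URL-safe encoding here is an assumption:

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// blockName returns the storage filename for a chunk of data:
// the base64-encoded SHA-256 sum of the chunk's contents.
func blockName(chunk []byte) string {
	sum := sha256.Sum256(chunk)
	return base64.URLEncoding.EncodeToString(sum[:])
}

func main() {
	a := blockName([]byte("same bytes"))
	b := blockName([]byte("same bytes"))

	// Identical chunks always map to the same block file,
	// so an "overwrite" rewrites identical data.
	fmt.Println(a == b) // true
}
```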
+} + +// OpenBlock opens a BlockStorage instance for given folder path and configuration. +func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Clean provided path, ensure ends in '/' (should + // be dir, this helps with file path trimming later) + path = pb.Clean(path) + "/" + + // Get checked config + config := getBlockConfig(cfg) + + // Attempt to open path + file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + // If not a not-exist error, return + if !os.IsNotExist(err) { + return nil, err + } + + // Attempt to make store path dirs + err = os.MkdirAll(path, defaultDirPerms) + if err != nil { + return nil, err + } + + // Reopen dir now it's been created + file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + return nil, err + } + } + defer file.Close() + + // Double check this is a dir (NOT a file!) + stat, err := file.Stat() + if err != nil { + return nil, err + } else if !stat.IsDir() { + return nil, new_error("path is file") + } + + // Open and acquire storage lock for path + lock, err := OpenLock(pb.Join(path, LockFile)) + if err != nil { + return nil, err + } + + // Figure out the largest size for bufpool slices + bufSz := encodedHashLen + if bufSz < config.BlockSize { + bufSz = config.BlockSize + } + if bufSz < config.WriteBufSize { + bufSz = config.WriteBufSize + } + + // Prepare BlockStorage + st := &BlockStorage{ + path: path, + blockPath: pb.Join(path, blockPathPrefix), + nodePath: pb.Join(path, nodePathPrefix), + config: config, + hashPool: sync.Pool{ + New: func() interface{} { + return newHashEncoder() + }, + }, + bufpool: pools.NewBufferPool(bufSz), + lock: lock, + } + + // Set copypool buffer size + st.cppool.Buffer(config.ReadBufSize) + + return st, nil +} + +// Clean implements storage.Clean(). 
+func (st *BlockStorage) Clean(ctx context.Context) error { + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + nodes := map[string]*node{} + + // Walk nodes dir for entries + err := walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return nil + } + + // Get joined node path name + npath = pb.Join(npath, fsentry.Name()) + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + return err + } + defer file.Close() + + // Alloc new Node + acquire hash buffer for writes + hbuf := st.bufpool.Get() + defer st.bufpool.Put(hbuf) + hbuf.Guarantee(encodedHashLen) + node := node{} + + // Write file contents to node + _, err = io.CopyBuffer( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + nil, + ) + if err != nil { + return err + } + + // Append to nodes slice + nodes[fsentry.Name()] = &node + return nil + }) + + // Handle errors (though nodePath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } + + // Walk blocks dir for entries + err = walkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) error { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return nil + } + + inUse := false + for key, node := range nodes { + if node.removeHash(fsentry.Name()) { + if len(node.hashes) < 1 { + // This node contained hash, and after removal is now empty. + // Remove this node from our tracked nodes slice + delete(nodes, key) + } + inUse = true + } + } + + // Block hash is used by node + if inUse { + return nil + } + + // Get joined block path name + bpath = pb.Join(bpath, fsentry.Name()) + + // Remove this unused block path + return os.Remove(bpath) + }) + + // Handle errors (though blockPath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } + + // If there are nodes left at this point, they are corrupt + // (i.e. they're referencing block hashes that don't exist) + if len(nodes) > 0 { + nodeKeys := []string{} + for key := range nodes { + nodeKeys = append(nodeKeys, key) + } + return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys) + } + + return nil +} + +// ReadBytes implements Storage.ReadBytes(). +func (st *BlockStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + defer rc.Close() + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream(). 
+func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return nil, err + } + + // Check if open + if st.lock.Closed() { + return nil, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + return nil, errSwapNotFound(err) + } + defer file.Close() + + // Acquire hash buffer for writes + hbuf := st.bufpool.Get() + defer st.bufpool.Put(hbuf) + + var node node + + // Write file contents to node + _, err = st.cppool.Copy( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + ) + if err != nil { + return nil, err + } + + // Prepare block reader and return + return iotools.NopReadCloser(&blockReader{ + storage: st, + node: &node, + }), nil +} + +// readBlock reads the block with hash (key) from the filesystem. +func (st *BlockStorage) readBlock(key string) ([]byte, error) { + // Get block file path for key + bpath := st.blockPathForKey(key) + + // Attempt to open RO file + file, err := open(bpath, defaultFileROFlags) + if err != nil { + return nil, wrap(new_error("corrupted node"), err) + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Reader(file) + if err != nil { + return nil, wrap(new_error("corrupted node"), err) + } + defer cFile.Close() + + // Read the entire file + return io.ReadAll(cFile) +} + +// WriteBytes implements Storage.WriteBytes(). +func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err +} + +// WriteStream implements Storage.WriteStream(). 
+func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return 0, err + } + + // Check if open + if st.lock.Closed() { + return 0, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return 0, err + } + + // Check if this exists + ok, err := stat(key) + if err != nil { + return 0, err + } + + // Check if we allow overwrites + if ok && !st.config.Overwrite { + return 0, ErrAlreadyExists + } + + // Ensure nodes dir (and any leading up to) exists + err = os.MkdirAll(st.nodePath, defaultDirPerms) + if err != nil { + return 0, err + } + + // Ensure blocks dir (and any leading up to) exists + err = os.MkdirAll(st.blockPath, defaultDirPerms) + if err != nil { + return 0, err + } + + var node node + var total atomic.Int64 + + // Acquire HashEncoder + hc := st.hashPool.Get().(*hashEncoder) + defer st.hashPool.Put(hc) + + // Create new waitgroup and OnceError for + // goroutine error tracking and propagating + wg := sync.WaitGroup{} + onceErr := errors.OnceError{} + +loop: + for !onceErr.IsSet() { + // Fetch new buffer for this loop + buf := st.bufpool.Get() + buf.Grow(st.config.BlockSize) + + // Read next chunk + n, err := io.ReadFull(r, buf.B) + switch err { + case nil, io.ErrUnexpectedEOF: + // do nothing + case io.EOF: + st.bufpool.Put(buf) + break loop + default: + st.bufpool.Put(buf) + return 0, err + } + + // Hash the encoded data + sum := hc.EncodeSum(buf.B) + + // Append to the node's hashes + node.hashes = append(node.hashes, sum) + + // If already on disk, skip + has, err := st.statBlock(sum) + if err != nil { + st.bufpool.Put(buf) + return 0, err + } else if has { + st.bufpool.Put(buf) + continue loop + } + + // Check if reached EOF + atEOF := (n < buf.Len()) + + wg.Add(1) + go func() { + // Perform writes in goroutine + + defer func() { + // Defer release + + // signal we're done + st.bufpool.Put(buf) + wg.Done() + }() + + // Write block to store at hash + n, err := st.writeBlock(sum, buf.B[:n]) + if err != nil { + onceErr.Store(err) + return + } + + // Increment total. + total.Add(int64(n)) + }() + + // Break at end + if atEOF { + break loop + } + } + + // Wait, check errors + wg.Wait() + if onceErr.IsSet() { + return 0, onceErr.Load() + } + + // If no hashes created, return + if len(node.hashes) < 1 { + return 0, new_error("no hashes written") + } + + // Prepare to swap error if need-be + errSwap := errSwapNoop + + // Build file RW flags + // NOTE: we performed an initial check for + // this before writing blocks, but if + // the utilizer of this storage didn't + // correctly mutex protect this key then + // someone may have beaten us to the + // punch at writing the node file. + flags := defaultFileRWFlags + if !st.config.Overwrite { + flags |= syscall.O_EXCL + + // Catch + replace err exist + errSwap = errSwapExist + } + + // Attempt to open RW file + file, err := open(npath, flags) + if err != nil { + return 0, errSwap(err) + } + defer file.Close() + + // Acquire write buffer + buf := st.bufpool.Get() + defer st.bufpool.Put(buf) + buf.Grow(st.config.WriteBufSize) + + // Finally, write data to file + _, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B) + return total.Load(), err +} + +// writeBlock writes the block with hash and supplied value to the filesystem. 
+func (st *BlockStorage) writeBlock(hash string, value []byte) (int, error) { + // Get block file path for key + bpath := st.blockPathForKey(hash) + + // Attempt to open RW file + file, err := open(bpath, defaultFileRWFlags) + if err != nil { + if err == syscall.EEXIST { + err = nil /* race issue describe in struct NOTE */ + } + return 0, err + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Writer(file) + if err != nil { + return 0, err + } + defer cFile.Close() + + // Write value to file + return cFile.Write(value) +} + +// statBlock checks for existence of supplied block hash. +func (st *BlockStorage) statBlock(hash string) (bool, error) { + return stat(st.blockPathForKey(hash)) +} + +// Stat implements Storage.Stat() +func (st *BlockStorage) Stat(ctx context.Context, key string) (bool, error) { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return false, err + } + + // Check if open + if st.lock.Closed() { + return false, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return false, err + } + + // Check for file on disk + return stat(kpath) +} + +// Remove implements Storage.Remove(). +func (st *BlockStorage) Remove(ctx context.Context, key string) error { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return err + } + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Remove at path (we know this is file) + if err := unlink(kpath); err != nil { + return errSwapNotFound(err) + } + + return nil +} + +// Close implements Storage.Close(). +func (st *BlockStorage) Close() error { + return st.lock.Close() +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *BlockStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Walk dir for entries + return walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { + if !fsentry.Type().IsRegular() { + // Only deal with regular files + return nil + } + + // Perform provided walk function + return opts.WalkFn(ctx, Entry{ + Key: fsentry.Name(), + Size: -1, + }) + }) +} + +// nodePathForKey calculates the node file path for supplied key. +func (st *BlockStorage) nodePathForKey(key string) (string, error) { + // Path separators are illegal, as directory paths + if strings.Contains(key, "/") || key == "." || key == ".." { + return "", ErrInvalidKey + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Return joined + cleaned node-path + return pb.Join(st.nodePath, key), nil +} + +// blockPathForKey calculates the block file path for supplied hash. +func (st *BlockStorage) blockPathForKey(hash string) string { + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + return pb.Join(st.blockPath, hash) +} + +// hashSeparator is the separating byte between block hashes. +const hashSeparator = byte('\n') + +// node represents the contents of a node file in storage. +type node struct { + hashes []string +} + +// removeHash attempts to remove supplied block hash from the node's hash array. 
+func (n *node) removeHash(hash string) bool { + for i := 0; i < len(n.hashes); { + if n.hashes[i] == hash { + // Drop this hash from slice + n.hashes = append(n.hashes[:i], n.hashes[i+1:]...) + return true + } + + // Continue iter + i++ + } + return false +} + +// nodeReader is an io.Reader implementation for the node file representation, +// which is useful when calculated node file is being written to the store. +type nodeReader struct { + node node + idx int + last int +} + +func (r *nodeReader) Read(b []byte) (int, error) { + n := 0 + + // '-1' means we missed writing + // hash separator on last iteration + if r.last == -1 { + b[n] = hashSeparator + n++ + r.last = 0 + } + + for r.idx < len(r.node.hashes) { + hash := r.node.hashes[r.idx] + + // Copy into buffer + update read count + m := copy(b[n:], hash[r.last:]) + n += m + + // If incomplete copy, return here + if m < len(hash)-r.last { + r.last = m + return n, nil + } + + // Check we can write last separator + if n == len(b) { + r.last = -1 + return n, nil + } + + // Write separator, iter, reset + b[n] = hashSeparator + n++ + r.idx++ + r.last = 0 + } + + // We reached end of hashes + return n, io.EOF +} + +// nodeWriter is an io.Writer implementation for the node file representation, +// which is useful when calculated node file is being read from the store. +type nodeWriter struct { + node *node + buf *byteutil.Buffer +} + +func (w *nodeWriter) Write(b []byte) (int, error) { + n := 0 + + for { + // Find next hash separator position + idx := bytes.IndexByte(b[n:], hashSeparator) + if idx == -1 { + // Check we shouldn't be expecting it + if w.buf.Len() > encodedHashLen { + return n, new_error("invalid node") + } + + // Write all contents to buffer + w.buf.Write(b[n:]) + return len(b), nil + } + + // Found hash separator, write + // current buf contents to Node hashes + w.buf.Write(b[n : n+idx]) + n += idx + 1 + if w.buf.Len() != encodedHashLen { + return n, new_error("invalid node") + } + + // Append to hashes & reset + w.node.hashes = append(w.node.hashes, w.buf.String()) + w.buf.Reset() + } +} + +// blockReader is an io.Reader implementation for the combined, linked block +// data contained with a node file. Basically, this allows reading value data +// from the store for a given node file. +type blockReader struct { + storage *BlockStorage + node *node + buf []byte + prev int +} + +func (r *blockReader) Read(b []byte) (int, error) { + n := 0 + + // Data left in buf, copy as much as we + // can into supplied read buffer + if r.prev < len(r.buf)-1 { + n += copy(b, r.buf[r.prev:]) + r.prev += n + if n >= len(b) { + return n, nil + } + } + + for { + // Check we have any hashes left + if len(r.node.hashes) < 1 { + return n, io.EOF + } + + // Get next key from slice + key := r.node.hashes[0] + r.node.hashes = r.node.hashes[1:] + + // Attempt to fetch next batch of data + var err error + r.buf, err = r.storage.readBlock(key) + if err != nil { + return n, err + } + r.prev = 0 + + // Copy as much as can from new buffer + m := copy(b[n:], r.buf) + r.prev += m + n += m + + // If we hit end of supplied buf, return + if n >= len(b) { + return n, nil + } + } +} + +var ( + // base64Encoding is our base64 encoding object. + base64Encoding = hashenc.Base64() + + // encodedHashLen is the once-calculated encoded hash-sum length + encodedHashLen = base64Encoding.EncodedLen( + sha256.New().Size(), + ) +) + +// hashEncoder is a HashEncoder with built-in encode buffer. 
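The nodeReader/nodeWriter pair above serialize a node file as base64-encoded block hashes joined by the `'\n'` hashSeparator. A hedged sketch of reading that format back, omitting nodeWriter's strict fixed-length (encodedHashLen) validation; the sample hashes are illustrative, not real sums:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseNode splits a node file into its block hashes:
// one base64-encoded SHA-256 sum per '\n'-separated line.
func parseNode(contents string) []string {
	var hashes []string
	sc := bufio.NewScanner(strings.NewReader(contents))
	for sc.Scan() {
		if h := sc.Text(); h != "" {
			hashes = append(hashes, h)
		}
	}
	return hashes
}

func main() {
	node := "aGFzaDE\naGFzaDI\n"
	fmt.Println(parseNode(node)) // [aGFzaDE aGFzaDI]
}
```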
+type hashEncoder struct { + henc hashenc.HashEncoder + ebuf []byte +} + +// newHashEncoder returns a new hashEncoder instance. +func newHashEncoder() *hashEncoder { + return &hashEncoder{ + henc: hashenc.New(sha256.New(), base64Encoding), + ebuf: make([]byte, encodedHashLen), + } +} + +// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum(). +func (henc *hashEncoder) EncodeSum(src []byte) string { + henc.henc.EncodeSum(henc.ebuf, src) + return string(henc.ebuf) +} diff --git a/vendor/codeberg.org/gruf/go-storage/block_test.archived b/vendor/codeberg.org/gruf/go-storage/block_test.archived new file mode 100644 index 000000000..8436f067f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/block_test.archived @@ -0,0 +1,38 @@ +package storage_test + +import ( + "os" + "testing" + + "codeberg.org/gruf/go-store/v2/storage" +) + +func TestBlockStorage(t *testing.T) { + // Set test path, defer deleting it + testPath := "blockstorage.test" + t.Cleanup(func() { + os.RemoveAll(testPath) + }) + + // Open new blockstorage instance + st, err := storage.OpenBlock(testPath, nil) + if err != nil { + t.Fatalf("Failed opening storage: %v", err) + } + + // Attempt multi open of same instance + _, err = storage.OpenBlock(testPath, nil) + if err == nil { + t.Fatal("Successfully opened a locked storage instance") + } + + // Run the storage tests + testStorage(t, st) + + // Test reopen storage path + st, err = storage.OpenBlock(testPath, nil) + if err != nil { + t.Fatalf("Failed opening storage: %v", err) + } + st.Close() +} diff --git a/vendor/codeberg.org/gruf/go-storage/disk/disk.go b/vendor/codeberg.org/gruf/go-storage/disk/disk.go new file mode 100644 index 000000000..b11346503 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/disk/disk.go @@ -0,0 +1,467 @@ +package disk + +import ( + "bytes" + "context" + "errors" + "io" + "io/fs" + "os" + "path" + "strings" + "syscall" + + "codeberg.org/gruf/go-fastcopy" + "codeberg.org/gruf/go-fastpath/v2" + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" +) + +// ensure DiskStorage conforms to storage.Storage. +var _ storage.Storage = (*DiskStorage)(nil) + +// DefaultConfig returns the default DiskStorage configuration. +func DefaultConfig() Config { + return defaultConfig +} + +// immutable default configuration. +var defaultConfig = Config{ + OpenRead: OpenArgs{syscall.O_RDONLY, 0o644}, + OpenWrite: OpenArgs{syscall.O_CREAT | syscall.O_WRONLY, 0o644}, + MkdirPerms: 0o755, + WriteBufSize: 4096, +} + +// OpenArgs defines args passed +// in a syscall.Open() operation. +type OpenArgs struct { + Flags int + Perms uint32 +} + +// Config defines options to be +// used when opening a DiskStorage. +type Config struct { + + // OpenRead are the arguments passed + // to syscall.Open() when opening a + // file for read operations. + OpenRead OpenArgs + + // OpenWrite are the arguments passed + // to syscall.Open() when opening a + // file for write operations. + OpenWrite OpenArgs + + // MkdirPerms are the permissions used + // when creating necessary sub-dirs in + // a storage key with slashes. + MkdirPerms uint32 + + // WriteBufSize is the buffer size + // to use when writing file streams. + WriteBufSize int +} + +// getDiskConfig returns valid (and owned!) Config for given ptr. +func getDiskConfig(cfg *Config) Config { + if cfg == nil { + // use defaults. + return defaultConfig + } + + // Ensure non-zero syscall args. 
+ if cfg.OpenRead.Flags == 0 { + cfg.OpenRead.Flags = defaultConfig.OpenRead.Flags + } + if cfg.OpenRead.Perms == 0 { + cfg.OpenRead.Perms = defaultConfig.OpenRead.Perms + } + if cfg.OpenWrite.Flags == 0 { + cfg.OpenWrite.Flags = defaultConfig.OpenWrite.Flags + } + if cfg.OpenWrite.Perms == 0 { + cfg.OpenWrite.Perms = defaultConfig.OpenWrite.Perms + } + if cfg.MkdirPerms == 0 { + cfg.MkdirPerms = defaultConfig.MkdirPerms + } + + // Ensure valid write buf. + if cfg.WriteBufSize <= 0 { + cfg.WriteBufSize = defaultConfig.WriteBufSize + } + + return Config{ + OpenRead: cfg.OpenRead, + OpenWrite: cfg.OpenWrite, + MkdirPerms: cfg.MkdirPerms, + WriteBufSize: cfg.WriteBufSize, + } +} + +// DiskStorage is a Storage implementation +// that stores directly to a filesystem. +type DiskStorage struct { + path string // path is the root path of this store + pool fastcopy.CopyPool // pool is the prepared io copier with buffer pool + cfg Config // cfg is the supplied configuration for this store +} + +// Open opens a DiskStorage instance for given folder path and configuration. +func Open(path string, cfg *Config) (*DiskStorage, error) { + // Check + set config defaults. + config := getDiskConfig(cfg) + + // Clean provided storage path, ensure + // final '/' to help with path trimming. + pb := internal.GetPathBuilder() + path = pb.Clean(path) + "/" + internal.PutPathBuilder(pb) + + // Ensure directories up-to path exist. + perms := fs.FileMode(config.MkdirPerms) + err := os.MkdirAll(path, perms) + if err != nil { + return nil, err + } + + // Prepare DiskStorage. + st := &DiskStorage{ + path: path, + cfg: config, + } + + // Set fastcopy pool buffer size. + st.pool.Buffer(config.WriteBufSize) + + return st, nil +} + +// Clean: implements Storage.Clean(). +func (st *DiskStorage) Clean(ctx context.Context) error { + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + // Clean unused directories. + return cleanDirs(st.path, OpenArgs{ + Flags: syscall.O_RDONLY, + }) +} + +// ReadBytes: implements Storage.ReadBytes(). +func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + + // Read all data to memory. + data, err := io.ReadAll(rc) + if err != nil { + _ = rc.Close() + return nil, err + } + + // Close storage stream reader. + if err := rc.Close(); err != nil { + return nil, err + } + + return data, nil +} + +// ReadStream: implements Storage.ReadStream(). +func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Generate file path for key. + kpath, err := st.Filepath(key) + if err != nil { + return nil, err + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return nil, err + } + + // Attempt to open file with read args. + file, err := open(kpath, st.cfg.OpenRead) + if err != nil { + + if err == syscall.ENOENT { + // Translate not-found errors and wrap with key. + err = internal.ErrWithKey(storage.ErrNotFound, key) + } + + return nil, err + } + + return file, nil +} + +// WriteBytes: implements Storage.WriteBytes(). +func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err +} + +// WriteStream: implements Storage.WriteStream(). 
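One consequence of the OpenArgs-based config: callers can opt into write-once semantics by adding O_EXCL to the write flags, which the WriteStream implementation below translates into storage.ErrAlreadyExists. A sketch under those assumptions, with the path and key purely illustrative:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"syscall"

	"codeberg.org/gruf/go-storage"
	"codeberg.org/gruf/go-storage/disk"
)

func main() {
	// Start from the package defaults, then add O_EXCL so
	// that writes to an existing key fail instead of overwriting.
	cfg := disk.DefaultConfig()
	cfg.OpenWrite.Flags |= syscall.O_EXCL

	st, err := disk.Open("./data", &cfg)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	_, _ = st.WriteBytes(ctx, "key", []byte("first"))

	// A second write to the same key now reports ErrAlreadyExists.
	_, err = st.WriteBytes(ctx, "key", []byte("second"))
	fmt.Println(errors.Is(err, storage.ErrAlreadyExists)) // true
}
```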
+func (st *DiskStorage) WriteStream(ctx context.Context, key string, stream io.Reader) (int64, error) { + // Acquire path builder buffer. + pb := internal.GetPathBuilder() + + // Generate the file path for given key. + kpath, subdir, err := st.filepath(pb, key) + if err != nil { + return 0, err + } + + // Done with path buffer. + internal.PutPathBuilder(pb) + + // Check context still valid. + if err := ctx.Err(); err != nil { + return 0, err + } + + if subdir { + // Get dir of key path. + dir := path.Dir(kpath) + + // Note that subdir will only be set if + // the transformed key (without base path) + // contains any slashes. This is not a + // definitive check, but it allows us to + // skip a syscall if mkdirall not needed! + perms := fs.FileMode(st.cfg.MkdirPerms) + err = os.MkdirAll(dir, perms) + if err != nil { + return 0, err + } + } + + // Attempt to open file with write args. + file, err := open(kpath, st.cfg.OpenWrite) + if err != nil { + + if st.cfg.OpenWrite.Flags&syscall.O_EXCL != 0 && + err == syscall.EEXIST { + // Translate already exists errors and wrap with key. + err = internal.ErrWithKey(storage.ErrAlreadyExists, key) + } + + return 0, err + } + + // Copy provided stream to file interface. + n, err := st.pool.Copy(file, stream) + if err != nil { + _ = file.Close() + return n, err + } + + // Finally, close file. + return n, file.Close() +} + +// Stat implements Storage.Stat(). +func (st *DiskStorage) Stat(ctx context.Context, key string) (*storage.Entry, error) { + // Generate file path for key. + kpath, err := st.Filepath(key) + if err != nil { + return nil, err + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return nil, err + } + + // Stat file on disk. + stat, err := stat(kpath) + if stat == nil { + return nil, err + } + + return &storage.Entry{ + Key: key, + Size: stat.Size, + }, nil +} + +// Remove implements Storage.Remove(). +func (st *DiskStorage) Remove(ctx context.Context, key string) error { + // Generate file path for key. + kpath, err := st.Filepath(key) + if err != nil { + return err + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + // Stat file on disk. + stat, err := stat(kpath) + if err != nil { + return err + } + + // Not-found (or handled + // as) error situations. + if stat == nil { + return internal.ErrWithKey(storage.ErrNotFound, key) + } else if stat.Mode&syscall.S_IFREG == 0 { + err := errors.New("storage/disk: not a regular file") + return internal.ErrWithKey(err, key) + } + + // Remove at path (we know this is file). + if err := unlink(kpath); err != nil { + + if err == syscall.ENOENT { + // Translate not-found errors and wrap with key. + err = internal.ErrWithKey(storage.ErrNotFound, key) + } + + return err + } + + return nil +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *DiskStorage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error { + if opts.Step == nil { + panic("nil step fn") + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + // Acquire path builder for walk. + pb := internal.GetPathBuilder() + defer internal.PutPathBuilder(pb) + + // Dir to walk. + dir := st.path + + if opts.Prefix != "" { + // Convert key prefix to one of our storage filepaths. + pathprefix, subdir, err := st.filepath(pb, opts.Prefix) + if err != nil { + return internal.ErrWithMsg(err, "prefix error") + } + + if subdir { + // Note that subdir will only be set if + // the transformed key (without base path) + // contains any slashes. 
This is not a + // definitive check, but it allows us to + // update the directory we walk in case + // it might narrow search parameters! + dir = path.Dir(pathprefix) + } + + // Set updated storage + // path prefix in opts. + opts.Prefix = pathprefix + } + + // Only need to open dirs as read-only. + args := OpenArgs{Flags: syscall.O_RDONLY} + + return walkDir(pb, dir, args, func(kpath string, fsentry fs.DirEntry) error { + if !fsentry.Type().IsRegular() { + // Ignore anything but + // regular file types. + return nil + } + + // Get full item path (without root). + kpath = pb.Join(kpath, fsentry.Name()) + + // Perform a fast filter check against storage path prefix (if set). + if opts.Prefix != "" && !strings.HasPrefix(kpath, opts.Prefix) { + return nil // ignore + } + + // Storage key without base. + key := kpath[len(st.path):] + + // Ignore filtered keys. + if opts.Filter != nil && + !opts.Filter(key) { + return nil // ignore + } + + // Load file info. This should already + // be loaded due to the underlying call + // to os.File{}.ReadDir() populating them. + info, err := fsentry.Info() + if err != nil { + return err + } + + // Perform provided walk function + return opts.Step(storage.Entry{ + Key: key, + Size: info.Size(), + }) + }) +} + +// Filepath checks and returns a formatted Filepath for given key. +func (st *DiskStorage) Filepath(key string) (path string, err error) { + pb := internal.GetPathBuilder() + path, _, err = st.filepath(pb, key) + internal.PutPathBuilder(pb) + return +} + +// filepath performs the "meat" of Filepath(), returning also if path *may* be a subdir of base. +func (st *DiskStorage) filepath(pb *fastpath.Builder, key string) (path string, subdir bool, err error) { + // Fast check for whether this may be a + // sub-directory. This is not a definitive + // check, it's only for a fastpath check. + subdir = strings.ContainsRune(key, '/') + + // Build from base. + pb.Append(st.path) + pb.Append(key) + + // Take COPY of bytes. + path = string(pb.B) + + // Check for dir traversal outside base. + if isDirTraversal(st.path, path) { + err = internal.ErrWithKey(storage.ErrInvalidKey, key) + } + + return +} + +// isDirTraversal will check if rootPlusPath is a dir traversal outside of root, +// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath). +func isDirTraversal(root, rootPlusPath string) bool { + switch { + // Root is $PWD, check for traversal out of + case root == ".": + return strings.HasPrefix(rootPlusPath, "../") + + // The path MUST be prefixed by root + case !strings.HasPrefix(rootPlusPath, root): + return true + + // In all other cases, check not equal + default: + return len(root) == len(rootPlusPath) + } +} diff --git a/vendor/codeberg.org/gruf/go-storage/disk/fs.go b/vendor/codeberg.org/gruf/go-storage/disk/fs.go new file mode 100644 index 000000000..606d8fb0f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/disk/fs.go @@ -0,0 +1,206 @@ +package disk + +import ( + "errors" + "fmt" + "io/fs" + "os" + "syscall" + + "codeberg.org/gruf/go-fastpath/v2" + "codeberg.org/gruf/go-storage/internal" +) + +// NOTE: +// These functions are for opening storage files, +// not necessarily for e.g. initial setup (OpenFile) + +// walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry +func walkDir(pb *fastpath.Builder, path string, args OpenArgs, walkFn func(string, fs.DirEntry) error) error { + // Read directory entries at path. 
+ entries, err := readDir(path, args) + if err != nil { + return err + } + + // frame represents a directory entry + // walk-loop snapshot, taken when a sub + // directory requiring iteration is found + type frame struct { + path string + entries []fs.DirEntry + } + + // stack contains a list of held snapshot + // frames, representing unfinished upper + // layers of a directory structure yet to + // be traversed. + var stack []frame + +outer: + for { + if len(entries) == 0 { + if len(stack) == 0 { + // Reached end + break outer + } + + // Pop frame from stack + frame := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + // Update loop vars + entries = frame.entries + path = frame.path + } + + for len(entries) > 0 { + // Pop next entry from queue + entry := entries[0] + entries = entries[1:] + + // Pass to provided walk function + if err := walkFn(path, entry); err != nil { + return err + } + + if entry.IsDir() { + // Push current frame to stack + stack = append(stack, frame{ + path: path, + entries: entries, + }) + + // Update current directory path + path = pb.Join(path, entry.Name()) + + // Read next directory entries + next, err := readDir(path, args) + if err != nil { + return err + } + + // Set next entries + entries = next + + continue outer + } + } + } + + return nil +} + +// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children +func cleanDirs(path string, args OpenArgs) error { + pb := internal.GetPathBuilder() + err := cleanDir(pb, path, args, true) + internal.PutPathBuilder(pb) + return err +} + +// cleanDir performs the actual dir cleaning logic for the above top-level version. +func cleanDir(pb *fastpath.Builder, path string, args OpenArgs, top bool) error { + // Get directory entries at path. + entries, err := readDir(path, args) + if err != nil { + return err + } + + // If no entries, delete dir. + if !top && len(entries) == 0 { + return rmdir(path) + } + + var errs []error + + // Iterate all directory entries. + for _, entry := range entries { + + if entry.IsDir() { + // Calculate directory path. + dir := pb.Join(path, entry.Name()) + + // Recursively clean sub-directory entries, adding errs. + if err := cleanDir(pb, dir, args, false); err != nil { + err = fmt.Errorf("error(s) cleaning subdir %s: %w", dir, err) + errs = append(errs, err) + } + } + } + + // Return combined errors. + return errors.Join(errs...) +} + +// readDir will open file at path, read the unsorted list of entries, then close. +func readDir(path string, args OpenArgs) ([]fs.DirEntry, error) { + // Open directory at path. + file, err := open(path, args) + if err != nil { + return nil, err + } + + // Read ALL directory entries. + entries, err := file.ReadDir(-1) + + // Done with file + _ = file.Close() + + return entries, err +} + +// open is a simple wrapper around syscall.Open(). +func open(path string, args OpenArgs) (*os.File, error) { + var fd int + err := retryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, args.Flags, args.Perms) + return + }) + if err != nil { + return nil, err + } + return os.NewFile(uintptr(fd), path), nil +} + +// stat is a simple wrapper around syscall.Stat(). +func stat(path string) (*syscall.Stat_t, error) { + var stat syscall.Stat_t + err := retryOnEINTR(func() error { + return syscall.Stat(path, &stat) + }) + if err != nil { + if err == syscall.ENOENT { + // not-found is no error + err = nil + } + return nil, err + } + return &stat, nil +} + +// unlink is a simple wrapper around syscall.Unlink(). 
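Tying the path handling together: DiskStorage.Filepath() above maps a key to a path under the store root and, via isDirTraversal(), rejects keys whose cleaned path escapes it. A small hypothetical demonstration:

```go
package main

import (
	"errors"
	"fmt"

	"codeberg.org/gruf/go-storage"
	"codeberg.org/gruf/go-storage/disk"
)

func main() {
	st, err := disk.Open("./data", nil)
	if err != nil {
		panic(err)
	}

	// A normal key resolves to a path under the store root.
	p, _ := st.Filepath("media/avatar.png")
	fmt.Println(p) // data/media/avatar.png

	// A key that traverses outside the root is rejected.
	_, err = st.Filepath("../../etc/passwd")
	fmt.Println(errors.Is(err, storage.ErrInvalidKey)) // true
}
```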
+func unlink(path string) error {
+	return retryOnEINTR(func() error {
+		return syscall.Unlink(path)
+	})
+}
+
+// rmdir is a simple wrapper around syscall.Rmdir().
+func rmdir(path string) error {
+	return retryOnEINTR(func() error {
+		return syscall.Rmdir(path)
+	})
+}
+
+// retryOnEINTR is a low-level filesystem function
+// for retrying syscalls when EINTR is received.
+func retryOnEINTR(do func() error) error {
+	for {
+		err := do()
+		if err == syscall.EINTR {
+			continue
+		}
+		return err
+	}
+}
diff --git a/vendor/codeberg.org/gruf/go-storage/errors.go b/vendor/codeberg.org/gruf/go-storage/errors.go
new file mode 100644
index 000000000..1dd847011
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/errors.go
@@ -0,0 +1,16 @@
+package storage
+
+import (
+	"errors"
+)
+
+var (
+	// ErrNotFound is the error returned when a key cannot be found in storage
+	ErrNotFound = errors.New("storage: key not found")
+
+	// ErrAlreadyExists is the error returned when a key already exists in storage
+	ErrAlreadyExists = errors.New("storage: key already exists")
+
+	// ErrInvalidKey is the error returned when an invalid key is passed to storage
+	ErrInvalidKey = errors.New("storage: invalid key")
+)
diff --git a/vendor/codeberg.org/gruf/go-storage/internal/errors.go b/vendor/codeberg.org/gruf/go-storage/internal/errors.go
new file mode 100644
index 000000000..6b10a8c90
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/internal/errors.go
@@ -0,0 +1,56 @@
+package internal
+
+func ErrWithKey(err error, key string) error {
+	return &errorWithKey{key: key, err: err}
+}
+
+type errorWithKey struct {
+	key string
+	err error
+}
+
+func (err *errorWithKey) Error() string {
+	return err.err.Error() + ": " + err.key
+}
+
+func (err *errorWithKey) Unwrap() error {
+	return err.err
+}
+
+func ErrWithMsg(err error, msg string) error {
+	return &errorWithMsg{msg: msg, err: err}
+}
+
+type errorWithMsg struct {
+	msg string
+	err error
+}
+
+func (err *errorWithMsg) Error() string {
+	return err.msg + ": " + err.err.Error()
+}
+
+func (err *errorWithMsg) Unwrap() error {
+	return err.err
+}
+
+func WrapErr(inner, outer error) error {
+	return &wrappedError{inner: inner, outer: outer}
+}
+
+type wrappedError struct {
+	inner error
+	outer error
+}
+
+func (err *wrappedError) Is(other error) bool {
+	return err.inner == other || err.outer == other
+}
+
+func (err *wrappedError) Error() string {
+	return err.inner.Error() + ": " + err.outer.Error()
+}
+
+func (err *wrappedError) Unwrap() []error {
+	return []error{err.inner, err.outer}
+}
diff --git a/vendor/codeberg.org/gruf/go-storage/internal/path.go b/vendor/codeberg.org/gruf/go-storage/internal/path.go
new file mode 100644
index 000000000..cd1c219bf
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/internal/path.go
@@ -0,0 +1,24 @@
+package internal
+
+import (
+	"sync"
+
+	"codeberg.org/gruf/go-fastpath/v2"
+)
+
+var pathBuilderPool sync.Pool
+
+func GetPathBuilder() *fastpath.Builder {
+	v := pathBuilderPool.Get()
+	if v == nil {
+		pb := new(fastpath.Builder)
+		pb.B = make([]byte, 0, 512)
+		v = pb
+	}
+	return v.(*fastpath.Builder)
+}
+
+func PutPathBuilder(pb *fastpath.Builder) {
+	pb.Reset()
+	pathBuilderPool.Put(pb)
+}
diff --git a/vendor/codeberg.org/gruf/go-storage/memory/memory.go b/vendor/codeberg.org/gruf/go-storage/memory/memory.go
new file mode 100644
index 000000000..55728b827
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/memory/memory.go
@@ -0,0 +1,253 @@
+package memory
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"strings"
"sync" + + "codeberg.org/gruf/go-iotools" + "codeberg.org/gruf/go-storage" + + "codeberg.org/gruf/go-storage/internal" +) + +// ensure MemoryStorage conforms to storage.Storage. +var _ storage.Storage = (*MemoryStorage)(nil) + +// MemoryStorage is a storage implementation that simply stores key-value +// pairs in a Go map in-memory. The map is protected by a mutex. +type MemoryStorage struct { + ow bool // overwrites + fs map[string][]byte + mu sync.Mutex +} + +// Open opens a new MemoryStorage instance with internal map starting size. +func Open(size int, overwrites bool) *MemoryStorage { + return &MemoryStorage{ + ow: overwrites, + fs: make(map[string][]byte, size), + } +} + +// Clean: implements Storage.Clean(). +func (st *MemoryStorage) Clean(ctx context.Context) error { + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Lock map. + st.mu.Lock() + + // Resize map to only necessary size in-mem. + fs := make(map[string][]byte, len(st.fs)) + for key, val := range st.fs { + fs[key] = val + } + st.fs = fs + + // Done with lock. + st.mu.Unlock() + + return nil +} + +// ReadBytes: implements Storage.ReadBytes(). +func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Check context still valid. + if err := ctx.Err(); err != nil { + return nil, err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + b, ok := st.fs[key] + if ok { + + // COPY bytes. + b = copyb(b) + } + + // Done with lock. + st.mu.Unlock() + + if !ok { + return nil, internal.ErrWithKey(storage.ErrNotFound, key) + } + + return b, nil +} + +// ReadStream: implements Storage.ReadStream(). +func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Read value data from store. + b, err := st.ReadBytes(ctx, key) + if err != nil { + return nil, err + } + + // Wrap in readcloser. + r := bytes.NewReader(b) + return iotools.NopReadCloser(r), nil +} + +// WriteBytes: implements Storage.WriteBytes(). +func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) { + // Check context still valid + if err := ctx.Err(); err != nil { + return 0, err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + _, ok := st.fs[key] + + if ok && !st.ow { + // Done with lock. + st.mu.Unlock() + + // Overwrites are disabled, return existing key error. + return 0, internal.ErrWithKey(storage.ErrAlreadyExists, key) + } + + // Write copy to store. + st.fs[key] = copyb(b) + + // Done with lock. + st.mu.Unlock() + + return len(b), nil +} + +// WriteStream: implements Storage.WriteStream(). +func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { + // Read all from reader. + b, err := io.ReadAll(r) + if err != nil { + return 0, err + } + + // Write in-memory data to store. + n, err := st.WriteBytes(ctx, key, b) + return int64(n), err +} + +// Stat: implements Storage.Stat(). +func (st *MemoryStorage) Stat(ctx context.Context, key string) (*storage.Entry, error) { + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + b, ok := st.fs[key] + + // Get entry size. + sz := int64(len(b)) + + // Done with lock. + st.mu.Unlock() + + if !ok { + return nil, nil + } + + return &storage.Entry{ + Key: key, + Size: sz, + }, nil +} + +// Remove: implements Storage.Remove(). 
+func (st *MemoryStorage) Remove(ctx context.Context, key string) error { + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Lock map. + st.mu.Lock() + + // Check key in store. + _, ok := st.fs[key] + + if ok { + // Delete store key. + delete(st.fs, key) + } + + // Done with lock. + st.mu.Unlock() + + if !ok { + return internal.ErrWithKey(storage.ErrNotFound, key) + } + + return nil +} + +// WalkKeys: implements Storage.WalkKeys(). +func (st *MemoryStorage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error { + if opts.Step == nil { + panic("nil step fn") + } + + // Check context still valid. + if err := ctx.Err(); err != nil { + return err + } + + var err error + + // Lock map. + st.mu.Lock() + + // Ensure unlocked. + defer st.mu.Unlock() + + // Range all key-vals in hash map. + for key, val := range st.fs { + // Check for filtered prefix. + if opts.Prefix != "" && + !strings.HasPrefix(key, opts.Prefix) { + continue // ignore + } + + // Check for filtered key. + if opts.Filter != nil && + !opts.Filter(key) { + continue // ignore + } + + // Pass to provided step func. + err = opts.Step(storage.Entry{ + Key: key, + Size: int64(len(val)), + }) + if err != nil { + return err + } + } + + return err +} + +// copyb returns a copy of byte-slice b. +func copyb(b []byte) []byte { + if b == nil { + return nil + } + p := make([]byte, len(b)) + _ = copy(p, b) + return p +} diff --git a/vendor/codeberg.org/gruf/go-storage/s3/errors.go b/vendor/codeberg.org/gruf/go-storage/s3/errors.go new file mode 100644 index 000000000..2cbdd2e9d --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/s3/errors.go @@ -0,0 +1,47 @@ +package s3 + +import ( + "strings" + + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" + "github.com/minio/minio-go/v7" +) + +// transformS3Error transforms an error returned from S3Storage underlying +// minio.Core client, by wrapping where necessary with our own error types. +func transformS3Error(err error) error { + // Cast this to a minio error response + ersp, ok := err.(minio.ErrorResponse) + if ok { + switch ersp.Code { + case "NoSuchKey": + return internal.WrapErr(err, storage.ErrNotFound) + case "Conflict": + return internal.WrapErr(err, storage.ErrAlreadyExists) + default: + return err + } + } + + // Check if error has an invalid object name prefix + if strings.HasPrefix(err.Error(), "Object name ") { + return internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err +} + +func isNotFoundError(err error) bool { + errRsp, ok := err.(minio.ErrorResponse) + return ok && errRsp.Code == "NoSuchKey" +} + +func isConflictError(err error) bool { + errRsp, ok := err.(minio.ErrorResponse) + return ok && errRsp.Code == "Conflict" +} + +func isObjectNameError(err error) bool { + return strings.HasPrefix(err.Error(), "Object name ") +} diff --git a/vendor/codeberg.org/gruf/go-storage/s3/s3.go b/vendor/codeberg.org/gruf/go-storage/s3/s3.go new file mode 100644 index 000000000..0067d3e19 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-storage/s3/s3.go @@ -0,0 +1,479 @@ +package s3 + +import ( + "bytes" + "context" + "errors" + "io" + + "codeberg.org/gruf/go-storage" + "codeberg.org/gruf/go-storage/internal" + "github.com/minio/minio-go/v7" +) + +// ensure S3Storage conforms to storage.Storage. +var _ storage.Storage = (*S3Storage)(nil) + +// ensure bytes.Reader conforms to ReaderSize. 
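The wrapping done by transformS3Error() above lets callers match the package's sentinel errors with errors.Is() while keeping the raw minio response reachable with errors.As(). A standalone sketch; the `wrapped` type here is a local stand-in for the module-internal wrappedError, since internal packages cannot be imported from outside the module:

```go
package main

import (
	"errors"
	"fmt"

	"codeberg.org/gruf/go-storage"
	"github.com/minio/minio-go/v7"
)

// wrapped mirrors internal.wrappedError: it joins the raw
// minio error with one of the package's sentinel errors.
type wrapped struct{ inner, outer error }

func (e *wrapped) Error() string   { return e.inner.Error() + ": " + e.outer.Error() }
func (e *wrapped) Unwrap() []error { return []error{e.inner, e.outer} }

func main() {
	// Simulate what transformS3Error does for a missing object.
	s3err := minio.ErrorResponse{Code: "NoSuchKey", Message: "key missing"}
	var err error = &wrapped{inner: s3err, outer: storage.ErrNotFound}

	// Callers can match the package sentinel...
	fmt.Println(errors.Is(err, storage.ErrNotFound)) // true

	// ...while the raw minio response remains reachable.
	var ersp minio.ErrorResponse
	fmt.Println(errors.As(err, &ersp), ersp.Code) // true NoSuchKey
}
```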
+var _ ReaderSize = (*bytes.Reader)(nil) + +// ReaderSize is an extension of the io.Reader interface +// that may be implemented by callers of WriteStream() in +// order to improve performance. When the size is known it +// is passed onto the underlying minio S3 library. +type ReaderSize interface { + io.Reader + Size() int64 +} + +// DefaultConfig returns the default S3Storage configuration. +func DefaultConfig() Config { + return defaultConfig +} + +// immutable default configuration. +var defaultConfig = Config{ + CoreOpts: minio.Options{}, + GetOpts: minio.GetObjectOptions{}, + PutOpts: minio.PutObjectOptions{}, + PutChunkOpts: minio.PutObjectPartOptions{}, + PutChunkSize: 4 * 1024 * 1024, // 4MiB + StatOpts: minio.StatObjectOptions{}, + RemoveOpts: minio.RemoveObjectOptions{}, + ListSize: 200, +} + +// Config defines options to be used when opening an S3Storage, +// mostly options for underlying S3 client library. +type Config struct { + // CoreOpts are S3 client options + // passed during initialization. + CoreOpts minio.Options + + // GetOpts are S3 client options + // passed during .Read___() calls. + GetOpts minio.GetObjectOptions + + // PutOpts are S3 client options + // passed during .Write___() calls. + PutOpts minio.PutObjectOptions + + // PutChunkSize is the chunk size (in bytes) + // to use when sending a byte stream reader + // of unknown size as a multi-part object. + PutChunkSize int64 + + // PutChunkOpts are S3 client options + // passed during chunked .Write___() calls. + PutChunkOpts minio.PutObjectPartOptions + + // StatOpts are S3 client options + // passed during .Stat() calls. + StatOpts minio.StatObjectOptions + + // RemoveOpts are S3 client options + // passed during .Remove() calls. + RemoveOpts minio.RemoveObjectOptions + + // ListSize determines how many items + // to include in each list request, made + // during calls to .WalkKeys(). + ListSize int +} + +// getS3Config returns valid (and owned!) Config for given ptr. +func getS3Config(cfg *Config) Config { + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + const minChunkSz = 5 * 1024 * 1024 + + if cfg == nil { + // use defaults. + return defaultConfig + } + + // Ensure a minimum compat chunk size. + if cfg.PutChunkSize <= minChunkSz { + cfg.PutChunkSize = minChunkSz + } + + // Ensure valid list size. + if cfg.ListSize <= 0 { + cfg.ListSize = 200 + } + + return Config{ + CoreOpts: cfg.CoreOpts, + GetOpts: cfg.GetOpts, + PutOpts: cfg.PutOpts, + PutChunkSize: cfg.PutChunkSize, + ListSize: cfg.ListSize, + StatOpts: cfg.StatOpts, + RemoveOpts: cfg.RemoveOpts, + } +} + +// S3Storage is a storage implementation that stores key-value +// pairs in an S3 instance at given endpoint with bucket name. +type S3Storage struct { + client *minio.Core + bucket string + config Config +} + +// Open opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration. +func Open(endpoint string, bucket string, cfg *Config) (*S3Storage, error) { + // Check + set config defaults. + config := getS3Config(cfg) + + // Create new S3 client connection to given endpoint. + client, err := minio.NewCore(endpoint, &config.CoreOpts) + if err != nil { + return nil, err + } + + ctx := context.Background() + + // Check that provided bucket actually exists. 
+ exists, err := client.BucketExists(ctx, bucket) + if err != nil { + return nil, err + } else if !exists { + return nil, errors.New("storage/s3: bucket does not exist") + } + + return &S3Storage{ + client: client, + bucket: bucket, + config: config, + }, nil +} + +// Client: returns access to the underlying S3 client. +func (st *S3Storage) Client() *minio.Core { + return st.client +} + +// Clean: implements Storage.Clean(). +func (st *S3Storage) Clean(ctx context.Context) error { + return nil // nothing to do for S3 +} + +// ReadBytes: implements Storage.ReadBytes(). +func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + + // Read all data to memory. + data, err := io.ReadAll(rc) + if err != nil { + _ = rc.Close() + return nil, err + } + + // Close storage stream reader. + if err := rc.Close(); err != nil { + return nil, err + } + + return data, nil +} + +// ReadStream: implements Storage.ReadStream(). +func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Fetch object reader from S3 bucket + rc, _, _, err := st.client.GetObject( + ctx, + st.bucket, + key, + st.config.GetOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return nil, transformS3Error(err) + } + return rc, nil +} + +// WriteBytes: implements Storage.WriteBytes(). +func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err +} + +// WriteStream: implements Storage.WriteStream(). +func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { + if rs, ok := r.(ReaderSize); ok { + // This reader supports providing us the size of + // the encompassed data, allowing us to perform + // a singular .PutObject() call with length. + info, err := st.client.PutObject( + ctx, + st.bucket, + key, + r, + rs.Size(), + "", + "", + st.config.PutOpts, + ) + if err != nil { + + if isConflictError(err) { + // Wrap conflict errors as our already exists type. + err = internal.WrapErr(err, storage.ErrAlreadyExists) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return 0, err + } + + return info.Size, nil + } + + // Start a new multipart upload to get ID. + uploadID, err := st.client.NewMultipartUpload( + ctx, + st.bucket, + key, + st.config.PutOpts, + ) + if err != nil { + + if isConflictError(err) { + // Wrap conflict errors as our already exists type. + err = internal.WrapErr(err, storage.ErrAlreadyExists) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return 0, transformS3Error(err) + } + + var ( + index = int(1) // parts index + total = int64(0) + parts []minio.CompletePart + chunk = make([]byte, st.config.PutChunkSize) + rbuf = bytes.NewReader(nil) + ) + + // Note that we do not perform any kind of + // memory pooling of the chunk buffers here. 
+ // Optimal chunking sizes for S3 writes are in + // the orders of megabytes, so letting the GC + // collect these ASAP is much preferred. + +loop: + for done := false; !done; { + // Read next chunk into byte buffer. + n, err := io.ReadFull(r, chunk) + + switch err { + // Successful read. + case nil: + + // Reached end, buffer empty. + case io.EOF: + break loop + + // Reached end, but buffer not empty. + case io.ErrUnexpectedEOF: + done = true + + // All other errors. + default: + return 0, err + } + + // Reset byte reader. + rbuf.Reset(chunk[:n]) + + // Put this object chunk in S3 store. + pt, err := st.client.PutObjectPart( + ctx, + st.bucket, + key, + uploadID, + index, + rbuf, + int64(n), + st.config.PutChunkOpts, + ) + if err != nil { + return 0, err + } + + // Append completed part to slice. + parts = append(parts, minio.CompletePart{ + PartNumber: pt.PartNumber, + ETag: pt.ETag, + ChecksumCRC32: pt.ChecksumCRC32, + ChecksumCRC32C: pt.ChecksumCRC32C, + ChecksumSHA1: pt.ChecksumSHA1, + ChecksumSHA256: pt.ChecksumSHA256, + }) + + // Iterate. + index++ + + // Update total size. + total += pt.Size + } + + // Complete this multi-part upload operation + _, err = st.client.CompleteMultipartUpload( + ctx, + st.bucket, + key, + uploadID, + parts, + st.config.PutOpts, + ) + if err != nil { + return 0, err + } + + return total, nil +} + +// Stat: implements Storage.Stat(). +func (st *S3Storage) Stat(ctx context.Context, key string) (*storage.Entry, error) { + // Query object in S3 bucket. + stat, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Ignore err return + // for not-found. + err = nil + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return nil, err + } + + return &storage.Entry{ + Key: key, + Size: stat.Size, + }, nil +} + +// Remove: implements Storage.Remove(). +func (st *S3Storage) Remove(ctx context.Context, key string) error { + // Query object in S3 bucket. + _, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err + } + + // Remove object from S3 bucket + err = st.client.RemoveObject( + ctx, + st.bucket, + key, + st.config.RemoveOpts, + ) + if err != nil { + + if isNotFoundError(err) { + // Wrap not found errors as our not found type. + err = internal.WrapErr(err, storage.ErrNotFound) + } else if !isObjectNameError(err) { + // Wrap object name errors as our invalid key type. + err = internal.WrapErr(err, storage.ErrInvalidKey) + } + + return err + } + + return nil +} + +// WalkKeys: implements Storage.WalkKeys(). +func (st *S3Storage) WalkKeys(ctx context.Context, opts storage.WalkKeysOpts) error { + if opts.Step == nil { + panic("nil step fn") + } + + var ( + prev string + token string + ) + + for { + // List objects in bucket starting at marker. + result, err := st.client.ListObjectsV2( + st.bucket, + opts.Prefix, + prev, + token, + "", + st.config.ListSize, + ) + if err != nil { + return err + } + + // Iterate through list result contents. + for _, obj := range result.Contents { + + // Skip filtered obj keys. 
+			if opts.Filter != nil &&
+				opts.Filter(obj.Key) {
+				continue
+			}
+
+			// Pass each obj through step func.
+			if err := opts.Step(storage.Entry{
+				Key:  obj.Key,
+				Size: obj.Size,
+			}); err != nil {
+				return err
+			}
+		}
+
+		// No token means we reached end of bucket.
+		if result.NextContinuationToken == "" {
+			return nil
+		}
+
+		// Set continuation token and prev marker.
+		token = result.NextContinuationToken
+		prev = result.StartAfter
+	}
+}
diff --git a/vendor/codeberg.org/gruf/go-storage/storage.go b/vendor/codeberg.org/gruf/go-storage/storage.go
new file mode 100644
index 000000000..b13f2d387
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/storage.go
@@ -0,0 +1,73 @@
+package storage
+
+import (
+	"context"
+	"io"
+)
+
+// Storage defines a means of accessing and storing
+// data in some abstracted underlying mechanism, whether
+// that be in-memory, an on-disk filesystem or an S3 bucket.
+type Storage interface {
+
+	// ReadBytes returns the data located at key (e.g. filepath) in storage.
+	ReadBytes(ctx context.Context, key string) ([]byte, error)
+
+	// ReadStream returns an io.ReadCloser for the data at key (e.g. filepath) in storage.
+	ReadStream(ctx context.Context, key string) (io.ReadCloser, error)
+
+	// WriteBytes writes the supplied data at key (e.g. filepath) in storage.
+	WriteBytes(ctx context.Context, key string, data []byte) (int, error)
+
+	// WriteStream writes the supplied data stream at key (e.g. filepath) in storage.
+	WriteStream(ctx context.Context, key string, stream io.Reader) (int64, error)
+
+	// Stat returns details about key (e.g. filepath) in storage, nil indicates not found.
+	Stat(ctx context.Context, key string) (*Entry, error)
+
+	// Remove will remove data at key from storage.
+	Remove(ctx context.Context, key string) error
+
+	// Clean performs any available clean-up of the underlying
+	// storage mechanism. For memory implementations this may
+	// compact the underlying hashmap, for disk filesystems
+	// this may remove now-unused directories.
+	Clean(ctx context.Context) error
+
+	// WalkKeys walks available keys using opts in storage.
+	WalkKeys(ctx context.Context, opts WalkKeysOpts) error
+}
+
+// Entry represents a key in a Storage{} implementation,
+// with any associated metadata that may have been set.
+type Entry struct {
+
+	// Key is this entry's
+	// unique storage key.
+	Key string
+
+	// Size is the size of
+	// this entry in storage.
+	Size int64
+}
+
+// WalkKeysOpts are arguments provided
+// to a storage WalkKeys() implementation.
+type WalkKeysOpts struct {
+
+	// Prefix can be used to filter entries
+	// by the given key prefix, for example
+	// only those under a subdirectory. This
+	// is preferred over the Filter() function.
+	Prefix string
+
+	// Filter can be used to filter entries
+	// by any custom metric before they are
+	// passed to the Step() function, e.g. to
+	// filter storage entries by regexp.
+	Filter func(string) bool
+
+	// Step is called for each entry during
+	// WalkKeys; an error triggers early return.
+	Step func(Entry) error
+}
diff --git a/vendor/codeberg.org/gruf/go-storage/test.sh b/vendor/codeberg.org/gruf/go-storage/test.sh
new file mode 100644
index 000000000..91286b5c8
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-storage/test.sh
@@ -0,0 +1,32 @@
+#!/bin/sh
+
+export \
+	MINIO_ADDR='127.0.0.1:8080' \
+	MINIO_BUCKET='test' \
+	MINIO_ROOT_USER='root' \
+	MINIO_ROOT_PASSWORD='password' \
+	MINIO_PID=0 \
+	S3_DIR=$(mktemp -d)
+
+# Drop the test S3 bucket and kill minio on exit
+# (KILL is omitted: SIGKILL cannot be trapped)
+trap 'rm -rf "$S3_DIR"; [ $MINIO_PID -ne 0 ] && kill -9 $MINIO_PID' \
+	HUP INT QUIT ABRT TERM EXIT
+
+# Create required S3 bucket dir
+mkdir -p "${S3_DIR}/${MINIO_BUCKET}"
+
+# Start the minio test S3 server instance in the background,
+# silencing its output (redirections must precede the '&')
+minio server --address "$MINIO_ADDR" "$S3_DIR" > /dev/null 2>&1 &
+MINIO_PID=$!
+
+# Let server start up, then check it is still running
+sleep 1
+kill -0 "$MINIO_PID" 2> /dev/null || {
+	echo 'failed to start minio'
+	exit 1
+}
+
+# Run go-storage tests
+go test ./... -v
\ No newline at end of file
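
For context, here is a minimal usage sketch (not part of the vendored diff above) showing how the pieces fit together: opening an S3Storage, a known-size write via the ReaderSize fast path, a streamed read, and a paginated key walk. The endpoint, bucket and credentials are illustrative only, mirroring the values test.sh exports; the credentials helper comes from the minio-go library this package already depends on, and error handling is reduced to panics for brevity.

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"

	"codeberg.org/gruf/go-storage"
	"codeberg.org/gruf/go-storage/s3"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	ctx := context.Background()

	// Start from the library defaults and fill in client credentials
	// (values here mirror the test.sh environment; adjust as needed).
	cfg := s3.DefaultConfig()
	cfg.CoreOpts = minio.Options{
		Creds:  credentials.NewStaticV4("root", "password", ""),
		Secure: false,
	}

	// Open a storage instance against the given endpoint and bucket.
	st, err := s3.Open("127.0.0.1:8080", "test", &cfg)
	if err != nil {
		panic(err)
	}

	// bytes.Reader implements ReaderSize, so this write takes the
	// single PutObject path instead of a chunked multipart upload.
	if _, err := st.WriteStream(ctx, "notes/hello.txt", bytes.NewReader([]byte("hello world"))); err != nil {
		panic(err)
	}

	// Stream the value back out and print it.
	rc, err := st.ReadStream(ctx, "notes/hello.txt")
	if err != nil {
		panic(err)
	}
	data, err := io.ReadAll(rc)
	_ = rc.Close()
	if err != nil {
		panic(err)
	}
	fmt.Printf("read back: %s\n", data)

	// Walk keys under a prefix; per the WalkKeysOpts docs, Prefix is
	// preferred over Filter since it narrows the S3 list request itself.
	err = st.WalkKeys(ctx, storage.WalkKeysOpts{
		Prefix: "notes/",
		Step: func(e storage.Entry) error {
			fmt.Printf("key=%s size=%d\n", e.Key, e.Size)
			return nil
		},
	})
	if err != nil {
		panic(err)
	}
}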