diff options
| author | 2024-05-22 09:46:24 +0000 | |
|---|---|---|
| committer | 2024-05-22 11:46:24 +0200 | |
| commit | 3d3e99ae52ff8895b840cbced2e55b5f849fd4be (patch) | |
| tree | c646d5eb99368028a2fbdafbe2c4400059d8eed5 /vendor/codeberg.org/gruf/go-store/v2/storage | |
| parent | --- (#2923) (diff) | |
| download | gotosocial-3d3e99ae52ff8895b840cbced2e55b5f849fd4be.tar.xz | |
[performance] update storage backend and make use of seek syscall when available (#2924)
* update to use go-storage/ instead of go-store/v2/storage/
* pull in latest version from codeberg
* remove test output :innocent:
* add code comments
* set the exclusive bit when creating new files in disk config
* bump to actual release version
* bump to v0.1.1 (tis a simple no-logic change)
* update readme
* only use a temporary read seeker when decoding video if required (should only be S3 now)
* use fastcopy library to use memory pooled buffers when calling TempFileSeeker()
* update to use seek call in serveFileRange()
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/v2/storage')
11 files changed, 0 insertions, 2732 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block.archived b/vendor/codeberg.org/gruf/go-store/v2/storage/block.archived deleted file mode 100644 index 11a757211..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/block.archived +++ /dev/null @@ -1,887 +0,0 @@ -package storage - -import ( - "bytes" - "context" - "crypto/sha256" - "fmt" - "io" - "io/fs" - "os" - "strings" - "sync" - "sync/atomic" - "syscall" - - "codeberg.org/gruf/go-byteutil" - "codeberg.org/gruf/go-errors/v2" - "codeberg.org/gruf/go-fastcopy" - "codeberg.org/gruf/go-hashenc" - "codeberg.org/gruf/go-iotools" - "codeberg.org/gruf/go-pools" - "codeberg.org/gruf/go-store/v2/util" -) - -var ( - nodePathPrefix = "node/" - blockPathPrefix = "block/" -) - -// DefaultBlockConfig is the default BlockStorage configuration. -var DefaultBlockConfig = &BlockConfig{ - BlockSize: 1024 * 16, - WriteBufSize: 4096, - Overwrite: false, - Compression: NoCompression(), -} - -// BlockConfig defines options to be used when opening a BlockStorage. -type BlockConfig struct { - // BlockSize is the chunking size to use when splitting and storing blocks of data. - BlockSize int - - // ReadBufSize is the buffer size to use when reading node files. - ReadBufSize int - - // WriteBufSize is the buffer size to use when writing file streams. - WriteBufSize int - - // Overwrite allows overwriting values of stored keys in the storage. - Overwrite bool - - // Compression is the Compressor to use when reading / writing files, - // default is no compression. - Compression Compressor -} - -// getBlockConfig returns a valid BlockConfig for supplied ptr. -func getBlockConfig(cfg *BlockConfig) BlockConfig { - // If nil, use default - if cfg == nil { - cfg = DefaultBlockConfig - } - - // Assume nil compress == none - if cfg.Compression == nil { - cfg.Compression = NoCompression() - } - - // Assume 0 chunk size == use default - if cfg.BlockSize <= 0 { - cfg.BlockSize = DefaultBlockConfig.BlockSize - } - - // Assume 0 buf size == use default - if cfg.WriteBufSize <= 0 { - cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize - } - - // Return owned config copy - return BlockConfig{ - BlockSize: cfg.BlockSize, - WriteBufSize: cfg.WriteBufSize, - Overwrite: cfg.Overwrite, - Compression: cfg.Compression, - } -} - -// BlockStorage is a Storage implementation that stores input data as chunks on -// a filesystem. Each value is chunked into blocks of configured size and these -// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A -// "node" file is finally created containing an array of hashes contained within -// this value. -type BlockStorage struct { - path string // path is the root path of this store - blockPath string // blockPath is the joined root path + block path prefix - nodePath string // nodePath is the joined root path + node path prefix - config BlockConfig // cfg is the supplied configuration for this store - hashPool sync.Pool // hashPool is this store's hashEncoder pool - bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool - cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool - lock *Lock // lock is the opened lockfile for this storage instance - - // NOTE: - // BlockStorage does not need to lock each of the underlying block files - // as the filename itself directly relates to the contents. If there happens - // to be an overwrite, it will just be of the same data since the filename is - // the hash of the data. -} - -// OpenBlock opens a BlockStorage instance for given folder path and configuration. -func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Clean provided path, ensure ends in '/' (should - // be dir, this helps with file path trimming later) - path = pb.Clean(path) + "/" - - // Get checked config - config := getBlockConfig(cfg) - - // Attempt to open path - file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) - if err != nil { - // If not a not-exist error, return - if !os.IsNotExist(err) { - return nil, err - } - - // Attempt to make store path dirs - err = os.MkdirAll(path, defaultDirPerms) - if err != nil { - return nil, err - } - - // Reopen dir now it's been created - file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) - if err != nil { - return nil, err - } - } - defer file.Close() - - // Double check this is a dir (NOT a file!) - stat, err := file.Stat() - if err != nil { - return nil, err - } else if !stat.IsDir() { - return nil, new_error("path is file") - } - - // Open and acquire storage lock for path - lock, err := OpenLock(pb.Join(path, LockFile)) - if err != nil { - return nil, err - } - - // Figure out the largest size for bufpool slices - bufSz := encodedHashLen - if bufSz < config.BlockSize { - bufSz = config.BlockSize - } - if bufSz < config.WriteBufSize { - bufSz = config.WriteBufSize - } - - // Prepare BlockStorage - st := &BlockStorage{ - path: path, - blockPath: pb.Join(path, blockPathPrefix), - nodePath: pb.Join(path, nodePathPrefix), - config: config, - hashPool: sync.Pool{ - New: func() interface{} { - return newHashEncoder() - }, - }, - bufpool: pools.NewBufferPool(bufSz), - lock: lock, - } - - // Set copypool buffer size - st.cppool.Buffer(config.ReadBufSize) - - return st, nil -} - -// Clean implements storage.Clean(). -func (st *BlockStorage) Clean(ctx context.Context) error { - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - nodes := map[string]*node{} - - // Walk nodes dir for entries - err := walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { - // Only deal with regular files - if !fsentry.Type().IsRegular() { - return nil - } - - // Get joined node path name - npath = pb.Join(npath, fsentry.Name()) - - // Attempt to open RO file - file, err := open(npath, defaultFileROFlags) - if err != nil { - return err - } - defer file.Close() - - // Alloc new Node + acquire hash buffer for writes - hbuf := st.bufpool.Get() - defer st.bufpool.Put(hbuf) - hbuf.Guarantee(encodedHashLen) - node := node{} - - // Write file contents to node - _, err = io.CopyBuffer( - &nodeWriter{ - node: &node, - buf: hbuf, - }, - file, - nil, - ) - if err != nil { - return err - } - - // Append to nodes slice - nodes[fsentry.Name()] = &node - return nil - }) - - // Handle errors (though nodePath may not have been created yet) - if err != nil && !os.IsNotExist(err) { - return err - } - - // Walk blocks dir for entries - err = walkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) error { - // Only deal with regular files - if !fsentry.Type().IsRegular() { - return nil - } - - inUse := false - for key, node := range nodes { - if node.removeHash(fsentry.Name()) { - if len(node.hashes) < 1 { - // This node contained hash, and after removal is now empty. - // Remove this node from our tracked nodes slice - delete(nodes, key) - } - inUse = true - } - } - - // Block hash is used by node - if inUse { - return nil - } - - // Get joined block path name - bpath = pb.Join(bpath, fsentry.Name()) - - // Remove this unused block path - return os.Remove(bpath) - }) - - // Handle errors (though blockPath may not have been created yet) - if err != nil && !os.IsNotExist(err) { - return err - } - - // If there are nodes left at this point, they are corrupt - // (i.e. they're referencing block hashes that don't exist) - if len(nodes) > 0 { - nodeKeys := []string{} - for key := range nodes { - nodeKeys = append(nodeKeys, key) - } - return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys) - } - - return nil -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *BlockStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { - // Get stream reader for key - rc, err := st.ReadStream(ctx, key) - if err != nil { - return nil, err - } - defer rc.Close() - - // Read all bytes and return - return io.ReadAll(rc) -} - -// ReadStream implements Storage.ReadStream(). -func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { - // Get node file path for key - npath, err := st.nodePathForKey(key) - if err != nil { - return nil, err - } - - // Check if open - if st.lock.Closed() { - return nil, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return nil, err - } - - // Attempt to open RO file - file, err := open(npath, defaultFileROFlags) - if err != nil { - return nil, errSwapNotFound(err) - } - defer file.Close() - - // Acquire hash buffer for writes - hbuf := st.bufpool.Get() - defer st.bufpool.Put(hbuf) - - var node node - - // Write file contents to node - _, err = st.cppool.Copy( - &nodeWriter{ - node: &node, - buf: hbuf, - }, - file, - ) - if err != nil { - return nil, err - } - - // Prepare block reader and return - return iotools.NopReadCloser(&blockReader{ - storage: st, - node: &node, - }), nil -} - -// readBlock reads the block with hash (key) from the filesystem. -func (st *BlockStorage) readBlock(key string) ([]byte, error) { - // Get block file path for key - bpath := st.blockPathForKey(key) - - // Attempt to open RO file - file, err := open(bpath, defaultFileROFlags) - if err != nil { - return nil, wrap(new_error("corrupted node"), err) - } - defer file.Close() - - // Wrap the file in a compressor - cFile, err := st.config.Compression.Reader(file) - if err != nil { - return nil, wrap(new_error("corrupted node"), err) - } - defer cFile.Close() - - // Read the entire file - return io.ReadAll(cFile) -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { - n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) - return int(n), err -} - -// WriteStream implements Storage.WriteStream(). -func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { - // Get node file path for key - npath, err := st.nodePathForKey(key) - if err != nil { - return 0, err - } - - // Check if open - if st.lock.Closed() { - return 0, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return 0, err - } - - // Check if this exists - ok, err := stat(key) - if err != nil { - return 0, err - } - - // Check if we allow overwrites - if ok && !st.config.Overwrite { - return 0, ErrAlreadyExists - } - - // Ensure nodes dir (and any leading up to) exists - err = os.MkdirAll(st.nodePath, defaultDirPerms) - if err != nil { - return 0, err - } - - // Ensure blocks dir (and any leading up to) exists - err = os.MkdirAll(st.blockPath, defaultDirPerms) - if err != nil { - return 0, err - } - - var node node - var total atomic.Int64 - - // Acquire HashEncoder - hc := st.hashPool.Get().(*hashEncoder) - defer st.hashPool.Put(hc) - - // Create new waitgroup and OnceError for - // goroutine error tracking and propagating - wg := sync.WaitGroup{} - onceErr := errors.OnceError{} - -loop: - for !onceErr.IsSet() { - // Fetch new buffer for this loop - buf := st.bufpool.Get() - buf.Grow(st.config.BlockSize) - - // Read next chunk - n, err := io.ReadFull(r, buf.B) - switch err { - case nil, io.ErrUnexpectedEOF: - // do nothing - case io.EOF: - st.bufpool.Put(buf) - break loop - default: - st.bufpool.Put(buf) - return 0, err - } - - // Hash the encoded data - sum := hc.EncodeSum(buf.B) - - // Append to the node's hashes - node.hashes = append(node.hashes, sum) - - // If already on disk, skip - has, err := st.statBlock(sum) - if err != nil { - st.bufpool.Put(buf) - return 0, err - } else if has { - st.bufpool.Put(buf) - continue loop - } - - // Check if reached EOF - atEOF := (n < buf.Len()) - - wg.Add(1) - go func() { - // Perform writes in goroutine - - defer func() { - // Defer release + - // signal we're done - st.bufpool.Put(buf) - wg.Done() - }() - - // Write block to store at hash - n, err := st.writeBlock(sum, buf.B[:n]) - if err != nil { - onceErr.Store(err) - return - } - - // Increment total. - total.Add(int64(n)) - }() - - // Break at end - if atEOF { - break loop - } - } - - // Wait, check errors - wg.Wait() - if onceErr.IsSet() { - return 0, onceErr.Load() - } - - // If no hashes created, return - if len(node.hashes) < 1 { - return 0, new_error("no hashes written") - } - - // Prepare to swap error if need-be - errSwap := errSwapNoop - - // Build file RW flags - // NOTE: we performed an initial check for - // this before writing blocks, but if - // the utilizer of this storage didn't - // correctly mutex protect this key then - // someone may have beaten us to the - // punch at writing the node file. - flags := defaultFileRWFlags - if !st.config.Overwrite { - flags |= syscall.O_EXCL - - // Catch + replace err exist - errSwap = errSwapExist - } - - // Attempt to open RW file - file, err := open(npath, flags) - if err != nil { - return 0, errSwap(err) - } - defer file.Close() - - // Acquire write buffer - buf := st.bufpool.Get() - defer st.bufpool.Put(buf) - buf.Grow(st.config.WriteBufSize) - - // Finally, write data to file - _, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B) - return total.Load(), err -} - -// writeBlock writes the block with hash and supplied value to the filesystem. -func (st *BlockStorage) writeBlock(hash string, value []byte) (int, error) { - // Get block file path for key - bpath := st.blockPathForKey(hash) - - // Attempt to open RW file - file, err := open(bpath, defaultFileRWFlags) - if err != nil { - if err == syscall.EEXIST { - err = nil /* race issue describe in struct NOTE */ - } - return 0, err - } - defer file.Close() - - // Wrap the file in a compressor - cFile, err := st.config.Compression.Writer(file) - if err != nil { - return 0, err - } - defer cFile.Close() - - // Write value to file - return cFile.Write(value) -} - -// statBlock checks for existence of supplied block hash. -func (st *BlockStorage) statBlock(hash string) (bool, error) { - return stat(st.blockPathForKey(hash)) -} - -// Stat implements Storage.Stat() -func (st *BlockStorage) Stat(ctx context.Context, key string) (bool, error) { - // Get node file path for key - kpath, err := st.nodePathForKey(key) - if err != nil { - return false, err - } - - // Check if open - if st.lock.Closed() { - return false, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return false, err - } - - // Check for file on disk - return stat(kpath) -} - -// Remove implements Storage.Remove(). -func (st *BlockStorage) Remove(ctx context.Context, key string) error { - // Get node file path for key - kpath, err := st.nodePathForKey(key) - if err != nil { - return err - } - - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Remove at path (we know this is file) - if err := unlink(kpath); err != nil { - return errSwapNotFound(err) - } - - return nil -} - -// Close implements Storage.Close(). -func (st *BlockStorage) Close() error { - return st.lock.Close() -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *BlockStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Walk dir for entries - return walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { - if !fsentry.Type().IsRegular() { - // Only deal with regular files - return nil - } - - // Perform provided walk function - return opts.WalkFn(ctx, Entry{ - Key: fsentry.Name(), - Size: -1, - }) - }) -} - -// nodePathForKey calculates the node file path for supplied key. -func (st *BlockStorage) nodePathForKey(key string) (string, error) { - // Path separators are illegal, as directory paths - if strings.Contains(key, "/") || key == "." || key == ".." { - return "", ErrInvalidKey - } - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Return joined + cleaned node-path - return pb.Join(st.nodePath, key), nil -} - -// blockPathForKey calculates the block file path for supplied hash. -func (st *BlockStorage) blockPathForKey(hash string) string { - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - return pb.Join(st.blockPath, hash) -} - -// hashSeparator is the separating byte between block hashes. -const hashSeparator = byte('\n') - -// node represents the contents of a node file in storage. -type node struct { - hashes []string -} - -// removeHash attempts to remove supplied block hash from the node's hash array. -func (n *node) removeHash(hash string) bool { - for i := 0; i < len(n.hashes); { - if n.hashes[i] == hash { - // Drop this hash from slice - n.hashes = append(n.hashes[:i], n.hashes[i+1:]...) - return true - } - - // Continue iter - i++ - } - return false -} - -// nodeReader is an io.Reader implementation for the node file representation, -// which is useful when calculated node file is being written to the store. -type nodeReader struct { - node node - idx int - last int -} - -func (r *nodeReader) Read(b []byte) (int, error) { - n := 0 - - // '-1' means we missed writing - // hash separator on last iteration - if r.last == -1 { - b[n] = hashSeparator - n++ - r.last = 0 - } - - for r.idx < len(r.node.hashes) { - hash := r.node.hashes[r.idx] - - // Copy into buffer + update read count - m := copy(b[n:], hash[r.last:]) - n += m - - // If incomplete copy, return here - if m < len(hash)-r.last { - r.last = m - return n, nil - } - - // Check we can write last separator - if n == len(b) { - r.last = -1 - return n, nil - } - - // Write separator, iter, reset - b[n] = hashSeparator - n++ - r.idx++ - r.last = 0 - } - - // We reached end of hashes - return n, io.EOF -} - -// nodeWriter is an io.Writer implementation for the node file representation, -// which is useful when calculated node file is being read from the store. -type nodeWriter struct { - node *node - buf *byteutil.Buffer -} - -func (w *nodeWriter) Write(b []byte) (int, error) { - n := 0 - - for { - // Find next hash separator position - idx := bytes.IndexByte(b[n:], hashSeparator) - if idx == -1 { - // Check we shouldn't be expecting it - if w.buf.Len() > encodedHashLen { - return n, new_error("invalid node") - } - - // Write all contents to buffer - w.buf.Write(b[n:]) - return len(b), nil - } - - // Found hash separator, write - // current buf contents to Node hashes - w.buf.Write(b[n : n+idx]) - n += idx + 1 - if w.buf.Len() != encodedHashLen { - return n, new_error("invalid node") - } - - // Append to hashes & reset - w.node.hashes = append(w.node.hashes, w.buf.String()) - w.buf.Reset() - } -} - -// blockReader is an io.Reader implementation for the combined, linked block -// data contained with a node file. Basically, this allows reading value data -// from the store for a given node file. -type blockReader struct { - storage *BlockStorage - node *node - buf []byte - prev int -} - -func (r *blockReader) Read(b []byte) (int, error) { - n := 0 - - // Data left in buf, copy as much as we - // can into supplied read buffer - if r.prev < len(r.buf)-1 { - n += copy(b, r.buf[r.prev:]) - r.prev += n - if n >= len(b) { - return n, nil - } - } - - for { - // Check we have any hashes left - if len(r.node.hashes) < 1 { - return n, io.EOF - } - - // Get next key from slice - key := r.node.hashes[0] - r.node.hashes = r.node.hashes[1:] - - // Attempt to fetch next batch of data - var err error - r.buf, err = r.storage.readBlock(key) - if err != nil { - return n, err - } - r.prev = 0 - - // Copy as much as can from new buffer - m := copy(b[n:], r.buf) - r.prev += m - n += m - - // If we hit end of supplied buf, return - if n >= len(b) { - return n, nil - } - } -} - -var ( - // base64Encoding is our base64 encoding object. - base64Encoding = hashenc.Base64() - - // encodedHashLen is the once-calculated encoded hash-sum length - encodedHashLen = base64Encoding.EncodedLen( - sha256.New().Size(), - ) -) - -// hashEncoder is a HashEncoder with built-in encode buffer. -type hashEncoder struct { - henc hashenc.HashEncoder - ebuf []byte -} - -// newHashEncoder returns a new hashEncoder instance. -func newHashEncoder() *hashEncoder { - return &hashEncoder{ - henc: hashenc.New(sha256.New(), base64Encoding), - ebuf: make([]byte, encodedHashLen), - } -} - -// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum(). -func (henc *hashEncoder) EncodeSum(src []byte) string { - henc.henc.EncodeSum(henc.ebuf, src) - return string(henc.ebuf) -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block_test.archived b/vendor/codeberg.org/gruf/go-store/v2/storage/block_test.archived deleted file mode 100644 index 8436f067f..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/block_test.archived +++ /dev/null @@ -1,38 +0,0 @@ -package storage_test - -import ( - "os" - "testing" - - "codeberg.org/gruf/go-store/v2/storage" -) - -func TestBlockStorage(t *testing.T) { - // Set test path, defer deleting it - testPath := "blockstorage.test" - t.Cleanup(func() { - os.RemoveAll(testPath) - }) - - // Open new blockstorage instance - st, err := storage.OpenBlock(testPath, nil) - if err != nil { - t.Fatalf("Failed opening storage: %v", err) - } - - // Attempt multi open of same instance - _, err = storage.OpenBlock(testPath, nil) - if err == nil { - t.Fatal("Successfully opened a locked storage instance") - } - - // Run the storage tests - testStorage(t, st) - - // Test reopen storage path - st, err = storage.OpenBlock(testPath, nil) - if err != nil { - t.Fatalf("Failed opening storage: %v", err) - } - st.Close() -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go deleted file mode 100644 index bbe02f22d..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go +++ /dev/null @@ -1,303 +0,0 @@ -package storage - -import ( - "bytes" - "io" - "sync" - - "codeberg.org/gruf/go-iotools" - - "github.com/klauspost/compress/gzip" - "github.com/klauspost/compress/snappy" - "github.com/klauspost/compress/zlib" -) - -// Compressor defines a means of compressing/decompressing values going into a key-value store -type Compressor interface { - // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader - Reader(io.ReadCloser) (io.ReadCloser, error) - - // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer - Writer(io.WriteCloser) (io.WriteCloser, error) -} - -type gzipCompressor struct { - rpool sync.Pool - wpool sync.Pool -} - -// GZipCompressor returns a new Compressor that implements GZip at default compression level -func GZipCompressor() Compressor { - return GZipCompressorLevel(gzip.DefaultCompression) -} - -// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level -func GZipCompressorLevel(level int) Compressor { - // GZip readers immediately check for valid - // header data on allocation / reset, so we - // need a set of valid header data so we can - // iniitialize reader instances in mempool. - hdr := bytes.NewBuffer(nil) - - // Init writer to ensure valid level provided - gw, err := gzip.NewWriterLevel(hdr, level) - if err != nil { - panic(err) - } - - // Write empty data to ensure gzip - // header data is in byte buffer. - _, _ = gw.Write([]byte{}) - _ = gw.Close() - - return &gzipCompressor{ - rpool: sync.Pool{ - New: func() any { - hdr := bytes.NewReader(hdr.Bytes()) - gr, _ := gzip.NewReader(hdr) - return gr - }, - }, - wpool: sync.Pool{ - New: func() any { - gw, _ := gzip.NewWriterLevel(nil, level) - return gw - }, - }, - } -} - -func (c *gzipCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - var released bool - - // Acquire from pool. - gr := c.rpool.Get().(*gzip.Reader) - if err := gr.Reset(rc); err != nil { - c.rpool.Put(gr) - return nil, err - } - - return iotools.ReadCloser(gr, iotools.CloserFunc(func() error { - if !released { - released = true - defer c.rpool.Put(gr) - } - - // Close compressor - err1 := gr.Close() - - // Close original stream. - err2 := rc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -func (c *gzipCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - var released bool - - // Acquire from pool. - gw := c.wpool.Get().(*gzip.Writer) - gw.Reset(wc) - - return iotools.WriteCloser(gw, iotools.CloserFunc(func() error { - if !released { - released = true - c.wpool.Put(gw) - } - - // Close compressor - err1 := gw.Close() - - // Close original stream. - err2 := wc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -type zlibCompressor struct { - rpool sync.Pool - wpool sync.Pool - dict []byte -} - -// ZLibCompressor returns a new Compressor that implements ZLib at default compression level -func ZLibCompressor() Compressor { - return ZLibCompressorLevelDict(zlib.DefaultCompression, nil) -} - -// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level -func ZLibCompressorLevel(level int) Compressor { - return ZLibCompressorLevelDict(level, nil) -} - -// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict -func ZLibCompressorLevelDict(level int, dict []byte) Compressor { - // ZLib readers immediately check for valid - // header data on allocation / reset, so we - // need a set of valid header data so we can - // iniitialize reader instances in mempool. - hdr := bytes.NewBuffer(nil) - - // Init writer to ensure valid level + dict provided - zw, err := zlib.NewWriterLevelDict(hdr, level, dict) - if err != nil { - panic(err) - } - - // Write empty data to ensure zlib - // header data is in byte buffer. - zw.Write([]byte{}) - zw.Close() - - return &zlibCompressor{ - rpool: sync.Pool{ - New: func() any { - hdr := bytes.NewReader(hdr.Bytes()) - zr, _ := zlib.NewReaderDict(hdr, dict) - return zr - }, - }, - wpool: sync.Pool{ - New: func() any { - zw, _ := zlib.NewWriterLevelDict(nil, level, dict) - return zw - }, - }, - dict: dict, - } -} - -func (c *zlibCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - var released bool - zr := c.rpool.Get().(interface { - io.ReadCloser - zlib.Resetter - }) - if err := zr.Reset(rc, c.dict); err != nil { - c.rpool.Put(zr) - return nil, err - } - return iotools.ReadCloser(zr, iotools.CloserFunc(func() error { - if !released { - released = true - defer c.rpool.Put(zr) - } - - // Close compressor - err1 := zr.Close() - - // Close original stream. - err2 := rc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -func (c *zlibCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - var released bool - - // Acquire from pool. - zw := c.wpool.Get().(*zlib.Writer) - zw.Reset(wc) - - return iotools.WriteCloser(zw, iotools.CloserFunc(func() error { - if !released { - released = true - c.wpool.Put(zw) - } - - // Close compressor - err1 := zw.Close() - - // Close original stream. - err2 := wc.Close() - - // Return err1 or 2 - if err1 != nil { - return err1 - } - return err2 - })), nil -} - -type snappyCompressor struct { - rpool sync.Pool - wpool sync.Pool -} - -// SnappyCompressor returns a new Compressor that implements Snappy. -func SnappyCompressor() Compressor { - return &snappyCompressor{ - rpool: sync.Pool{ - New: func() any { return snappy.NewReader(nil) }, - }, - wpool: sync.Pool{ - New: func() any { return snappy.NewWriter(nil) }, - }, - } -} - -func (c *snappyCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - var released bool - - // Acquire from pool. - sr := c.rpool.Get().(*snappy.Reader) - sr.Reset(rc) - - return iotools.ReadCloser(sr, iotools.CloserFunc(func() error { - if !released { - released = true - defer c.rpool.Put(sr) - } - - // Close original stream. - return rc.Close() - })), nil -} - -func (c *snappyCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - var released bool - - // Acquire from pool. - sw := c.wpool.Get().(*snappy.Writer) - sw.Reset(wc) - - return iotools.WriteCloser(sw, iotools.CloserFunc(func() error { - if !released { - released = true - c.wpool.Put(sw) - } - - // Close original stream. - return wc.Close() - })), nil -} - -type nopCompressor struct{} - -// NoCompression is a Compressor that simply does nothing. -func NoCompression() Compressor { - return &nopCompressor{} -} - -func (c *nopCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { - return rc, nil -} - -func (c *nopCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { - return wc, nil -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go deleted file mode 100644 index 3104400f3..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go +++ /dev/null @@ -1,424 +0,0 @@ -package storage - -import ( - "context" - "errors" - "io" - "io/fs" - "os" - "path" - _path "path" - "strings" - "syscall" - - "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-fastcopy" - "codeberg.org/gruf/go-store/v2/util" -) - -// DefaultDiskConfig is the default DiskStorage configuration. -var DefaultDiskConfig = &DiskConfig{ - Overwrite: true, - WriteBufSize: 4096, - Transform: NopTransform(), - Compression: NoCompression(), -} - -// DiskConfig defines options to be used when opening a DiskStorage. -type DiskConfig struct { - // Transform is the supplied key <--> path KeyTransform. - Transform KeyTransform - - // WriteBufSize is the buffer size to use when writing file streams. - WriteBufSize int - - // Overwrite allows overwriting values of stored keys in the storage. - Overwrite bool - - // LockFile allows specifying the filesystem path to use for the lockfile, - // providing only a filename it will store the lockfile within provided store - // path and nest the store under `path/store` to prevent access to lockfile. - LockFile string - - // Compression is the Compressor to use when reading / writing files, - // default is no compression. - Compression Compressor -} - -// getDiskConfig returns a valid DiskConfig for supplied ptr. -func getDiskConfig(cfg *DiskConfig) DiskConfig { - // If nil, use default - if cfg == nil { - cfg = DefaultDiskConfig - } - - // Assume nil transform == none - if cfg.Transform == nil { - cfg.Transform = NopTransform() - } - - // Assume nil compress == none - if cfg.Compression == nil { - cfg.Compression = NoCompression() - } - - // Assume 0 buf size == use default - if cfg.WriteBufSize <= 0 { - cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize - } - - // Assume empty lockfile path == use default - if len(cfg.LockFile) == 0 { - cfg.LockFile = LockFile - } - - // Return owned config copy - return DiskConfig{ - Transform: cfg.Transform, - WriteBufSize: cfg.WriteBufSize, - Overwrite: cfg.Overwrite, - LockFile: cfg.LockFile, - Compression: cfg.Compression, - } -} - -// DiskStorage is a Storage implementation that stores directly to a filesystem. -type DiskStorage struct { - path string // path is the root path of this store - cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool - config DiskConfig // cfg is the supplied configuration for this store - lock *Lock // lock is the opened lockfile for this storage instance -} - -// OpenDisk opens a DiskStorage instance for given folder path and configuration. -func OpenDisk(path string, cfg *DiskConfig) (*DiskStorage, error) { - // Get checked config - config := getDiskConfig(cfg) - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Clean provided store path, ensure - // ends in '/' to help later path trimming - storePath := pb.Clean(path) + "/" - - // Clean provided lockfile path - lockfile := pb.Clean(config.LockFile) - - // Check if lockfile is an *actual* path or just filename - if lockDir, _ := _path.Split(lockfile); lockDir == "" { - // Lockfile is a filename, store must be nested under - // $storePath/store to prevent access to the lockfile - storePath += "store/" - lockfile = pb.Join(path, lockfile) - } - - // Attempt to open dir path - file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) - if err != nil { - // If not a not-exist error, return - if !os.IsNotExist(err) { - return nil, err - } - - // Attempt to make store path dirs - err = os.MkdirAll(storePath, defaultDirPerms) - if err != nil { - return nil, err - } - - // Reopen dir now it's been created - file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) - if err != nil { - return nil, err - } - } - defer file.Close() - - // Double check this is a dir (NOT a file!) - stat, err := file.Stat() - if err != nil { - return nil, err - } else if !stat.IsDir() { - return nil, errors.New("store/storage: path is file") - } - - // Open and acquire storage lock for path - lock, err := OpenLock(lockfile) - if err != nil { - return nil, err - } - - // Prepare DiskStorage - st := &DiskStorage{ - path: storePath, - config: config, - lock: lock, - } - - // Set copypool buffer size - st.cppool.Buffer(config.WriteBufSize) - - return st, nil -} - -// Clean implements Storage.Clean(). -func (st *DiskStorage) Clean(ctx context.Context) error { - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Clean-out unused directories - return cleanDirs(st.path) -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { - // Get stream reader for key - rc, err := st.ReadStream(ctx, key) - if err != nil { - return nil, err - } - defer rc.Close() - - // Read all bytes and return - return io.ReadAll(rc) -} - -// ReadStream implements Storage.ReadStream(). -func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return nil, err - } - - // Check if open - if st.lock.Closed() { - return nil, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return nil, err - } - - // Attempt to open file (replace ENOENT with our own) - file, err := open(kpath, defaultFileROFlags) - if err != nil { - return nil, errSwapNotFound(err) - } - - // Wrap the file in a compressor - cFile, err := st.config.Compression.Reader(file) - if err != nil { - _ = file.Close() - return nil, err - } - - return cFile, nil -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { - n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) - return int(n), err -} - -// WriteStream implements Storage.WriteStream(). -func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return 0, err - } - - // Check if open - if st.lock.Closed() { - return 0, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return 0, err - } - - // Ensure dirs leading up to file exist - err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) - if err != nil { - return 0, err - } - - // Prepare to swap error if need-be - errSwap := errSwapNoop - - // Build file RW flags - flags := defaultFileRWFlags - if !st.config.Overwrite { - flags |= syscall.O_EXCL - - // Catch + replace err exist - errSwap = errSwapExist - } - - // Attempt to open file - file, err := open(kpath, flags) - if err != nil { - return 0, errSwap(err) - } - - // Wrap the file in a compressor - cFile, err := st.config.Compression.Writer(file) - if err != nil { - _ = file.Close() - return 0, err - } - - // Wraps file.Close(). - defer cFile.Close() - - // Copy provided reader to file - return st.cppool.Copy(cFile, r) -} - -// Stat implements Storage.Stat(). -func (st *DiskStorage) Stat(ctx context.Context, key string) (bool, error) { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return false, err - } - - // Check if open - if st.lock.Closed() { - return false, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return false, err - } - - // Check for file on disk - return stat(kpath) -} - -// Remove implements Storage.Remove(). -func (st *DiskStorage) Remove(ctx context.Context, key string) error { - // Get file path for key - kpath, err := st.Filepath(key) - if err != nil { - return err - } - - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Remove at path (we know this is file) - if err := unlink(kpath); err != nil { - return errSwapNotFound(err) - } - - return nil -} - -// Close implements Storage.Close(). -func (st *DiskStorage) Close() error { - return st.lock.Close() -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *DiskStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { - // Check if open - if st.lock.Closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Walk dir for entries - return walkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) error { - if !fsentry.Type().IsRegular() { - // Only deal with regular files - return nil - } - - // Get full item path (without root) - kpath = pb.Join(kpath, fsentry.Name()) - kpath = kpath[len(st.path):] - - // Load file info. This should already - // be loaded due to the underlying call - // to os.File{}.ReadDir() populating them - info, err := fsentry.Info() - if err != nil { - return err - } - - // Perform provided walk function - return opts.WalkFn(ctx, Entry{ - Key: st.config.Transform.PathToKey(kpath), - Size: info.Size(), - }) - }) -} - -// Filepath checks and returns a formatted Filepath for given key. -func (st *DiskStorage) Filepath(key string) (string, error) { - // Calculate transformed key path - key = st.config.Transform.KeyToPath(key) - - // Acquire path builder - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - - // Generate key path - pb.Append(st.path) - pb.Append(key) - - // Check for dir traversal outside of root - if isDirTraversal(st.path, pb.String()) { - return "", ErrInvalidKey - } - - return string(pb.B), nil -} - -// isDirTraversal will check if rootPlusPath is a dir traversal outside of root, -// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath). -func isDirTraversal(root, rootPlusPath string) bool { - switch { - // Root is $PWD, check for traversal out of - case root == ".": - return strings.HasPrefix(rootPlusPath, "../") - - // The path MUST be prefixed by root - case !strings.HasPrefix(rootPlusPath, root): - return true - - // In all other cases, check not equal - default: - return len(root) == len(rootPlusPath) - } -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go deleted file mode 100644 index 4ae7e4be5..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go +++ /dev/null @@ -1,110 +0,0 @@ -package storage - -import ( - "errors" - "strings" - "syscall" - - "github.com/minio/minio-go/v7" -) - -var ( - // ErrClosed is returned on operations on a closed storage - ErrClosed = new_error("closed") - - // ErrNotFound is the error returned when a key cannot be found in storage - ErrNotFound = new_error("key not found") - - // ErrAlreadyExist is the error returned when a key already exists in storage - ErrAlreadyExists = new_error("key already exists") - - // ErrInvalidkey is the error returned when an invalid key is passed to storage - ErrInvalidKey = new_error("invalid key") - - // ErrAlreadyLocked is returned on fail opening a storage lockfile - ErrAlreadyLocked = new_error("storage lock already open") -) - -// new_error returns a new error instance prefixed by package prefix. -func new_error(msg string) error { - return errors.New("store/storage: " + msg) -} - -// wrappedError allows wrapping together an inner with outer error. -type wrappedError struct { - inner error - outer error -} - -// wrap will return a new wrapped error from given inner and outer errors. -func wrap(outer, inner error) *wrappedError { - return &wrappedError{ - inner: inner, - outer: outer, - } -} - -func (e *wrappedError) Is(target error) bool { - return e.outer == target || e.inner == target -} - -func (e *wrappedError) Error() string { - return e.outer.Error() + ": " + e.inner.Error() -} - -func (e *wrappedError) Unwrap() error { - return e.inner -} - -// errSwapNoop performs no error swaps -func errSwapNoop(err error) error { - return err -} - -// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound -func errSwapNotFound(err error) error { - if err == syscall.ENOENT { - return ErrNotFound - } - return err -} - -// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists -func errSwapExist(err error) error { - if err == syscall.EEXIST { - return ErrAlreadyExists - } - return err -} - -// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked -func errSwapUnavailable(err error) error { - if err == syscall.EAGAIN { - return ErrAlreadyLocked - } - return err -} - -// transformS3Error transforms an error returned from S3Storage underlying -// minio.Core client, by wrapping where necessary with our own error types. -func transformS3Error(err error) error { - // Cast this to a minio error response - ersp, ok := err.(minio.ErrorResponse) - if ok { - switch ersp.Code { - case "NoSuchKey": - return wrap(ErrNotFound, err) - case "Conflict": - return wrap(ErrAlreadyExists, err) - default: - return err - } - } - - // Check if error has an invalid object name prefix - if strings.HasPrefix(err.Error(), "Object name ") { - return wrap(ErrInvalidKey, err) - } - - return err -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go deleted file mode 100644 index be86ac127..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go +++ /dev/null @@ -1,208 +0,0 @@ -package storage - -import ( - "fmt" - "io/fs" - "os" - "syscall" - - "codeberg.org/gruf/go-fastpath/v2" - "codeberg.org/gruf/go-store/v2/util" -) - -const ( - // default file permission bits - defaultDirPerms = 0o755 - defaultFilePerms = 0o644 - - // default file open flags - defaultFileROFlags = syscall.O_RDONLY - defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR - defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT -) - -// NOTE: -// These functions are for opening storage files, -// not necessarily for e.g. initial setup (OpenFile) - -// walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry -func walkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry) error) error { - // Read directory entries - entries, err := readDir(path) - if err != nil { - return err - } - - // frame represents a directory entry - // walk-loop snapshot, taken when a sub - // directory requiring iteration is found - type frame struct { - path string - entries []fs.DirEntry - } - - // stack contains a list of held snapshot - // frames, representing unfinished upper - // layers of a directory structure yet to - // be traversed. - var stack []frame - -outer: - for { - if len(entries) == 0 { - if len(stack) == 0 { - // Reached end - break outer - } - - // Pop frame from stack - frame := stack[len(stack)-1] - stack = stack[:len(stack)-1] - - // Update loop vars - entries = frame.entries - path = frame.path - } - - for len(entries) > 0 { - // Pop next entry from queue - entry := entries[0] - entries = entries[1:] - - // Pass to provided walk function - if err := walkFn(path, entry); err != nil { - return err - } - - if entry.IsDir() { - // Push current frame to stack - stack = append(stack, frame{ - path: path, - entries: entries, - }) - - // Update current directory path - path = pb.Join(path, entry.Name()) - - // Read next directory entries - next, err := readDir(path) - if err != nil { - return err - } - - // Set next entries - entries = next - - continue outer - } - } - } - - return nil -} - -// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children -func cleanDirs(path string) error { - pb := util.GetPathBuilder() - defer util.PutPathBuilder(pb) - return cleanDir(pb, path, true) -} - -// cleanDir performs the actual dir cleaning logic for the above top-level version. -func cleanDir(pb *fastpath.Builder, path string, top bool) error { - // Get dir entries at path. - entries, err := readDir(path) - if err != nil { - return err - } - - // If no entries, delete dir. - if !top && len(entries) == 0 { - return rmdir(path) - } - - for _, entry := range entries { - if entry.IsDir() { - // Calculate directory path. - dirPath := pb.Join(path, entry.Name()) - - // Recursively clean sub-directory entries. - if err := cleanDir(pb, dirPath, false); err != nil { - fmt.Fprintf(os.Stderr, "[go-store/storage] error cleaning %s: %v", dirPath, err) - } - } - } - - return nil -} - -// readDir will open file at path, read the unsorted list of entries, then close. -func readDir(path string) ([]fs.DirEntry, error) { - // Open file at path - file, err := open(path, defaultFileROFlags) - if err != nil { - return nil, err - } - - // Read directory entries - entries, err := file.ReadDir(-1) - - // Done with file - _ = file.Close() - - return entries, err -} - -// open will open a file at the given path with flags and default file perms. -func open(path string, flags int) (*os.File, error) { - var fd int - err := retryOnEINTR(func() (err error) { - fd, err = syscall.Open(path, flags, defaultFilePerms) - return - }) - if err != nil { - return nil, err - } - return os.NewFile(uintptr(fd), path), nil -} - -// stat checks for a file on disk. -func stat(path string) (bool, error) { - var stat syscall.Stat_t - err := retryOnEINTR(func() error { - return syscall.Stat(path, &stat) - }) - if err != nil { - if err == syscall.ENOENT { - // not-found is no error - err = nil - } - return false, err - } - return true, nil -} - -// unlink removes a file (not dir!) on disk. -func unlink(path string) error { - return retryOnEINTR(func() error { - return syscall.Unlink(path) - }) -} - -// rmdir removes a dir (not file!) on disk. -func rmdir(path string) error { - return retryOnEINTR(func() error { - return syscall.Rmdir(path) - }) -} - -// retryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received. -func retryOnEINTR(do func() error) error { - for { - err := do() - if err == syscall.EINTR { - continue - } - return err - } -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go deleted file mode 100644 index 25ecefe52..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go +++ /dev/null @@ -1,59 +0,0 @@ -package storage - -import ( - "sync/atomic" - "syscall" -) - -// LockFile is our standard lockfile name. -const LockFile = "store.lock" - -// Lock represents a filesystem lock to ensure only one storage instance open per path. -type Lock struct { - fd int - st uint32 -} - -// OpenLock opens a lockfile at path. -func OpenLock(path string) (*Lock, error) { - var fd int - - // Open the file descriptor at path - err := retryOnEINTR(func() (err error) { - fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms) - return - }) - if err != nil { - return nil, err - } - - // Get a flock on the file descriptor - err = retryOnEINTR(func() error { - return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB) - }) - if err != nil { - return nil, errSwapUnavailable(err) - } - - return &Lock{fd: fd}, nil -} - -// Close will attempt to close the lockfile and file descriptor. -func (f *Lock) Close() error { - var err error - if atomic.CompareAndSwapUint32(&f.st, 0, 1) { - // Ensure gets closed - defer syscall.Close(f.fd) - - // Call funlock on the file descriptor - err = retryOnEINTR(func() error { - return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB) - }) - } - return err -} - -// Closed will return whether this lockfile has been closed (and unlocked). -func (f *Lock) Closed() bool { - return (atomic.LoadUint32(&f.st) == 1) -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go deleted file mode 100644 index d42274e39..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go +++ /dev/null @@ -1,228 +0,0 @@ -package storage - -import ( - "context" - "io" - "sync/atomic" - - "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-iotools" - "github.com/cornelk/hashmap" -) - -// MemoryStorage is a storage implementation that simply stores key-value -// pairs in a Go map in-memory. The map is protected by a mutex. -type MemoryStorage struct { - ow bool // overwrites - fs *hashmap.Map[string, []byte] - st uint32 -} - -// OpenMemory opens a new MemoryStorage instance with internal map starting size. -func OpenMemory(size int, overwrites bool) *MemoryStorage { - if size <= 0 { - size = 8 - } - return &MemoryStorage{ - fs: hashmap.NewSized[string, []byte](uintptr(size)), - ow: overwrites, - } -} - -// Clean implements Storage.Clean(). -func (st *MemoryStorage) Clean(ctx context.Context) error { - // Check store open - if st.closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - return nil -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { - // Check store open - if st.closed() { - return nil, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return nil, err - } - - // Check for key in store - b, ok := st.fs.Get(key) - if !ok { - return nil, ErrNotFound - } - - // Create return copy - return copyb(b), nil -} - -// ReadStream implements Storage.ReadStream(). -func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { - // Check store open - if st.closed() { - return nil, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return nil, err - } - - // Check for key in store - b, ok := st.fs.Get(key) - if !ok { - return nil, ErrNotFound - } - - // Create io.ReadCloser from 'b' copy - r := bytes.NewReader(copyb(b)) - return iotools.NopReadCloser(r), nil -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) { - // Check store open - if st.closed() { - return 0, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return 0, err - } - - // Check for key that already exists - if _, ok := st.fs.Get(key); ok && !st.ow { - return 0, ErrAlreadyExists - } - - // Write key copy to store - st.fs.Set(key, copyb(b)) - return len(b), nil -} - -// WriteStream implements Storage.WriteStream(). -func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { - // Check store open - if st.closed() { - return 0, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return 0, err - } - - // Check for key that already exists - if _, ok := st.fs.Get(key); ok && !st.ow { - return 0, ErrAlreadyExists - } - - // Read all from reader - b, err := io.ReadAll(r) - if err != nil { - return 0, err - } - - // Write key to store - st.fs.Set(key, b) - return int64(len(b)), nil -} - -// Stat implements Storage.Stat(). -func (st *MemoryStorage) Stat(ctx context.Context, key string) (bool, error) { - // Check store open - if st.closed() { - return false, ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return false, err - } - - // Check for key in store - _, ok := st.fs.Get(key) - return ok, nil -} - -// Remove implements Storage.Remove(). -func (st *MemoryStorage) Remove(ctx context.Context, key string) error { - // Check store open - if st.closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - // Attempt to delete key - ok := st.fs.Del(key) - if !ok { - return ErrNotFound - } - - return nil -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *MemoryStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { - // Check store open - if st.closed() { - return ErrClosed - } - - // Check context still valid - if err := ctx.Err(); err != nil { - return err - } - - var err error - - // Nil check func - _ = opts.WalkFn - - // Pass each key in map to walk function - st.fs.Range(func(key string, val []byte) bool { - err = opts.WalkFn(ctx, Entry{ - Key: key, - Size: int64(len(val)), - }) - return (err == nil) - }) - - return err -} - -// Close implements Storage.Close(). -func (st *MemoryStorage) Close() error { - atomic.StoreUint32(&st.st, 1) - return nil -} - -// closed returns whether MemoryStorage is closed. -func (st *MemoryStorage) closed() bool { - return (atomic.LoadUint32(&st.st) == 1) -} - -// copyb returns a copy of byte-slice b. -func copyb(b []byte) []byte { - if b == nil { - return nil - } - p := make([]byte, len(b)) - _ = copy(p, b) - return p -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go deleted file mode 100644 index 965fe0d4f..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go +++ /dev/null @@ -1,397 +0,0 @@ -package storage - -import ( - "bytes" - "context" - "io" - "sync/atomic" - - "codeberg.org/gruf/go-store/v2/util" - "github.com/minio/minio-go/v7" -) - -// DefaultS3Config is the default S3Storage configuration. -var DefaultS3Config = &S3Config{ - CoreOpts: minio.Options{}, - GetOpts: minio.GetObjectOptions{}, - PutOpts: minio.PutObjectOptions{}, - PutChunkOpts: minio.PutObjectPartOptions{}, - PutChunkSize: 4 * 1024 * 1024, // 4MiB - StatOpts: minio.StatObjectOptions{}, - RemoveOpts: minio.RemoveObjectOptions{}, - ListSize: 200, -} - -// S3Config defines options to be used when opening an S3Storage, -// mostly options for underlying S3 client library. -type S3Config struct { - // CoreOpts are S3 client options passed during initialization. - CoreOpts minio.Options - - // GetOpts are S3 client options passed during .Read___() calls. - GetOpts minio.GetObjectOptions - - // PutOpts are S3 client options passed during .Write___() calls. - PutOpts minio.PutObjectOptions - - // PutChunkSize is the chunk size (in bytes) to use when sending - // a byte stream reader of unknown size as a multi-part object. - PutChunkSize int64 - - // PutChunkOpts are S3 client options passed during chunked .Write___() calls. - PutChunkOpts minio.PutObjectPartOptions - - // StatOpts are S3 client options passed during .Stat() calls. - StatOpts minio.StatObjectOptions - - // RemoveOpts are S3 client options passed during .Remove() calls. - RemoveOpts minio.RemoveObjectOptions - - // ListSize determines how many items to include in each - // list request, made during calls to .WalkKeys(). - ListSize int -} - -// getS3Config returns a valid S3Config for supplied ptr. -func getS3Config(cfg *S3Config) S3Config { - const minChunkSz = 5 * 1024 * 1024 - - // If nil, use default - if cfg == nil { - cfg = DefaultS3Config - } - - // Ensure a minimum compatible chunk size - if cfg.PutChunkSize <= minChunkSz { - // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - cfg.PutChunkSize = minChunkSz - } - - // Assume 0 list size == use default - if cfg.ListSize <= 0 { - cfg.ListSize = 200 - } - - // Return owned config copy - return S3Config{ - CoreOpts: cfg.CoreOpts, - GetOpts: cfg.GetOpts, - PutOpts: cfg.PutOpts, - PutChunkSize: cfg.PutChunkSize, - ListSize: cfg.ListSize, - StatOpts: cfg.StatOpts, - RemoveOpts: cfg.RemoveOpts, - } -} - -// S3Storage is a storage implementation that stores key-value -// pairs in an S3 instance at given endpoint with bucket name. -type S3Storage struct { - client *minio.Core - bucket string - config S3Config - state uint32 -} - -// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration. -func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) { - // Get checked config - config := getS3Config(cfg) - - // Create new S3 client connection - client, err := minio.NewCore(endpoint, &config.CoreOpts) - if err != nil { - return nil, err - } - - // Check that provided bucket actually exists - exists, err := client.BucketExists(context.Background(), bucket) - if err != nil { - return nil, err - } else if !exists { - return nil, new_error("bucket does not exist") - } - - return &S3Storage{ - client: client, - bucket: bucket, - config: config, - }, nil -} - -// Client returns access to the underlying S3 client. -func (st *S3Storage) Client() *minio.Core { - return st.client -} - -// Clean implements Storage.Clean(). -func (st *S3Storage) Clean(ctx context.Context) error { - return nil // nothing to do for S3 -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) { - // Fetch object reader from S3 bucket - rc, err := st.ReadStream(ctx, key) - if err != nil { - return nil, err - } - defer rc.Close() - - // Read all bytes and return - return io.ReadAll(rc) -} - -// ReadStream implements Storage.ReadStream(). -func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { - // Check storage open - if st.closed() { - return nil, ErrClosed - } - - // Fetch object reader from S3 bucket - rc, _, _, err := st.client.GetObject( - ctx, - st.bucket, - key, - st.config.GetOpts, - ) - if err != nil { - return nil, transformS3Error(err) - } - - return rc, nil -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { - n, err := st.WriteStream(ctx, key, util.NewByteReaderSize(value)) - return int(n), err -} - -// WriteStream implements Storage.WriteStream(). -func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { - // Check storage open - if st.closed() { - return 0, ErrClosed - } - - if rs, ok := r.(util.ReaderSize); ok { - // This reader supports providing us the size of - // the encompassed data, allowing us to perform - // a singular .PutObject() call with length. - info, err := st.client.PutObject( - ctx, - st.bucket, - key, - r, - rs.Size(), - "", - "", - st.config.PutOpts, - ) - if err != nil { - err = transformS3Error(err) - } - return info.Size, err - } - - // Start a new multipart upload to get ID - uploadID, err := st.client.NewMultipartUpload( - ctx, - st.bucket, - key, - st.config.PutOpts, - ) - if err != nil { - return 0, transformS3Error(err) - } - - var ( - index = int(1) // parts index - total = int64(0) - parts []minio.CompletePart - chunk = make([]byte, st.config.PutChunkSize) - rbuf = bytes.NewReader(nil) - ) - - // Note that we do not perform any kind of - // memory pooling of the chunk buffers here. - // Optimal chunking sizes for S3 writes are in - // the orders of megabytes, so letting the GC - // collect these ASAP is much preferred. - -loop: - for done := false; !done; { - // Read next chunk into byte buffer - n, err := io.ReadFull(r, chunk) - - switch err { - // Successful read - case nil: - - // Reached end, buffer empty - case io.EOF: - break loop - - // Reached end, but buffer not empty - case io.ErrUnexpectedEOF: - done = true - - // All other errors - default: - return 0, err - } - - // Reset byte reader - rbuf.Reset(chunk[:n]) - - // Put this object chunk in S3 store - pt, err := st.client.PutObjectPart( - ctx, - st.bucket, - key, - uploadID, - index, - rbuf, - int64(n), - st.config.PutChunkOpts, - ) - if err != nil { - return 0, err - } - - // Append completed part to slice - parts = append(parts, minio.CompletePart{ - PartNumber: pt.PartNumber, - ETag: pt.ETag, - ChecksumCRC32: pt.ChecksumCRC32, - ChecksumCRC32C: pt.ChecksumCRC32C, - ChecksumSHA1: pt.ChecksumSHA1, - ChecksumSHA256: pt.ChecksumSHA256, - }) - - // Iterate idx - index++ - - // Update total size - total += pt.Size - } - - // Complete this multi-part upload operation - _, err = st.client.CompleteMultipartUpload( - ctx, - st.bucket, - key, - uploadID, - parts, - st.config.PutOpts, - ) - if err != nil { - return 0, err - } - - return total, nil -} - -// Stat implements Storage.Stat(). -func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) { - // Check storage open - if st.closed() { - return false, ErrClosed - } - - // Query object in S3 bucket - _, err := st.client.StatObject( - ctx, - st.bucket, - key, - st.config.StatOpts, - ) - if err != nil { - return false, transformS3Error(err) - } - - return true, nil -} - -// Remove implements Storage.Remove(). -func (st *S3Storage) Remove(ctx context.Context, key string) error { - // Check storage open - if st.closed() { - return ErrClosed - } - - // S3 returns no error on remove for non-existent keys - if ok, err := st.Stat(ctx, key); err != nil { - return err - } else if !ok { - return ErrNotFound - } - - // Remove object from S3 bucket - err := st.client.RemoveObject( - ctx, - st.bucket, - key, - st.config.RemoveOpts, - ) - if err != nil { - return transformS3Error(err) - } - - return nil -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { - var ( - prev string - token string - ) - - for { - // List the objects in bucket starting at marker - result, err := st.client.ListObjectsV2( - st.bucket, - "", - prev, - token, - "", - st.config.ListSize, - ) - if err != nil { - return err - } - - // Pass each object through walk func - for _, obj := range result.Contents { - if err := opts.WalkFn(ctx, Entry{ - Key: obj.Key, - Size: obj.Size, - }); err != nil { - return err - } - } - - // No token means we reached end of bucket - if result.NextContinuationToken == "" { - return nil - } - - // Set continue token and prev mark - token = result.NextContinuationToken - prev = result.StartAfter - } -} - -// Close implements Storage.Close(). -func (st *S3Storage) Close() error { - atomic.StoreUint32(&st.state, 1) - return nil -} - -// closed returns whether S3Storage is closed. -func (st *S3Storage) closed() bool { - return (atomic.LoadUint32(&st.state) == 1) -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go deleted file mode 100644 index a60ea93ad..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go +++ /dev/null @@ -1,53 +0,0 @@ -package storage - -import ( - "context" - "io" -) - -// Storage defines a means of storing and accessing key value pairs -type Storage interface { - // ReadBytes returns the byte value for key in storage - ReadBytes(ctx context.Context, key string) ([]byte, error) - - // ReadStream returns an io.ReadCloser for the value bytes at key in the storage - ReadStream(ctx context.Context, key string) (io.ReadCloser, error) - - // WriteBytes writes the supplied value bytes at key in the storage - WriteBytes(ctx context.Context, key string, value []byte) (int, error) - - // WriteStream writes the bytes from supplied reader at key in the storage - WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) - - // Stat checks if the supplied key is in the storage - Stat(ctx context.Context, key string) (bool, error) - - // Remove attempts to remove the supplied key-value pair from storage - Remove(ctx context.Context, key string) error - - // Close will close the storage, releasing any file locks - Close() error - - // Clean removes unused values and unclutters the storage (e.g. removing empty folders) - Clean(ctx context.Context) error - - // WalkKeys walks the keys in the storage - WalkKeys(ctx context.Context, opts WalkKeysOptions) error -} - -// Entry represents a key in a Storage{} implementation, -// with any associated metadata that may have been set. -type Entry struct { - // Key is this entry's unique storage key. - Key string - - // Size is the size of this entry in storage. - // Note that size < 0 indicates unknown. - Size int64 -} - -// WalkKeysOptions defines how to walk the keys in a storage implementation -type WalkKeysOptions struct { - // WalkFn is the function to apply on each StorageEntry - WalkFn func(context.Context, Entry) error -} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go deleted file mode 100644 index 3863dd774..000000000 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go +++ /dev/null @@ -1,25 +0,0 @@ -package storage - -// KeyTransform defines a method of converting store keys to storage paths (and vice-versa) -type KeyTransform interface { - // KeyToPath converts a supplied key to storage path - KeyToPath(string) string - - // PathToKey converts a supplied storage path to key - PathToKey(string) string -} - -type nopKeyTransform struct{} - -// NopTransform returns a nop key transform (i.e. key = path) -func NopTransform() KeyTransform { - return &nopKeyTransform{} -} - -func (t *nopKeyTransform) KeyToPath(key string) string { - return key -} - -func (t *nopKeyTransform) PathToKey(path string) string { - return path -} |
