diff options
| author | 2022-11-05 12:10:19 +0100 | |
|---|---|---|
| committer | 2022-11-05 11:10:19 +0000 | |
| commit | bcb80d3ff4a669d52d63950c8830427646c05884 (patch) | |
| tree | 4aa95a83545b3f87a80fe4b625cb6f2ad9c4427f /vendor/codeberg.org/gruf/go-store/v2/storage | |
| parent | [bugfix] Increase field size limits when registering apps (#958) (diff) | |
| download | gotosocial-bcb80d3ff4a669d52d63950c8830427646c05884.tar.xz | |
[chore] bump gruf/go-store to v2 (#953)
* [chore] bump gruf/go-store to v2
* no more boobs
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/v2/storage')
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/block.go | 885 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go | 212 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/disk.go | 425 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/errors.go | 110 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/fs.go | 221 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/lock.go | 59 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/memory.go | 228 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/s3.go | 385 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/storage.go | 53 | ||||
| -rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/transform.go | 25 |
10 files changed, 2603 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go new file mode 100644 index 000000000..b1081cb1c --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go @@ -0,0 +1,885 @@ +package storage + +import ( + "bytes" + "context" + "crypto/sha256" + "fmt" + "io" + "io/fs" + "os" + "strings" + "sync" + "syscall" + + "codeberg.org/gruf/go-byteutil" + "codeberg.org/gruf/go-errors/v2" + "codeberg.org/gruf/go-fastcopy" + "codeberg.org/gruf/go-hashenc" + "codeberg.org/gruf/go-pools" + "codeberg.org/gruf/go-store/v2/util" +) + +var ( + nodePathPrefix = "node/" + blockPathPrefix = "block/" +) + +// DefaultBlockConfig is the default BlockStorage configuration. +var DefaultBlockConfig = &BlockConfig{ + BlockSize: 1024 * 16, + WriteBufSize: 4096, + Overwrite: false, + Compression: NoCompression(), +} + +// BlockConfig defines options to be used when opening a BlockStorage. +type BlockConfig struct { + // BlockSize is the chunking size to use when splitting and storing blocks of data. + BlockSize int + + // ReadBufSize is the buffer size to use when reading node files. + ReadBufSize int + + // WriteBufSize is the buffer size to use when writing file streams. + WriteBufSize int + + // Overwrite allows overwriting values of stored keys in the storage. + Overwrite bool + + // Compression is the Compressor to use when reading / writing files, + // default is no compression. + Compression Compressor +} + +// getBlockConfig returns a valid BlockConfig for supplied ptr. +func getBlockConfig(cfg *BlockConfig) BlockConfig { + // If nil, use default + if cfg == nil { + cfg = DefaultBlockConfig + } + + // Assume nil compress == none + if cfg.Compression == nil { + cfg.Compression = NoCompression() + } + + // Assume 0 chunk size == use default + if cfg.BlockSize <= 0 { + cfg.BlockSize = DefaultBlockConfig.BlockSize + } + + // Assume 0 buf size == use default + if cfg.WriteBufSize <= 0 { + cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize + } + + // Return owned config copy + return BlockConfig{ + BlockSize: cfg.BlockSize, + WriteBufSize: cfg.WriteBufSize, + Overwrite: cfg.Overwrite, + Compression: cfg.Compression, + } +} + +// BlockStorage is a Storage implementation that stores input data as chunks on +// a filesystem. Each value is chunked into blocks of configured size and these +// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A +// "node" file is finally created containing an array of hashes contained within +// this value. +type BlockStorage struct { + path string // path is the root path of this store + blockPath string // blockPath is the joined root path + block path prefix + nodePath string // nodePath is the joined root path + node path prefix + config BlockConfig // cfg is the supplied configuration for this store + hashPool sync.Pool // hashPool is this store's hashEncoder pool + bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool + cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool + lock *Lock // lock is the opened lockfile for this storage instance + + // NOTE: + // BlockStorage does not need to lock each of the underlying block files + // as the filename itself directly relates to the contents. If there happens + // to be an overwrite, it will just be of the same data since the filename is + // the hash of the data. +} + +// OpenBlock opens a BlockStorage instance for given folder path and configuration. +func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Clean provided path, ensure ends in '/' (should + // be dir, this helps with file path trimming later) + path = pb.Clean(path) + "/" + + // Get checked config + config := getBlockConfig(cfg) + + // Attempt to open path + file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + // If not a not-exist error, return + if !os.IsNotExist(err) { + return nil, err + } + + // Attempt to make store path dirs + err = os.MkdirAll(path, defaultDirPerms) + if err != nil { + return nil, err + } + + // Reopen dir now it's been created + file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + return nil, err + } + } + defer file.Close() + + // Double check this is a dir (NOT a file!) + stat, err := file.Stat() + if err != nil { + return nil, err + } else if !stat.IsDir() { + return nil, new_error("path is file") + } + + // Open and acquire storage lock for path + lock, err := OpenLock(pb.Join(path, LockFile)) + if err != nil { + return nil, err + } + + // Figure out the largest size for bufpool slices + bufSz := encodedHashLen + if bufSz < config.BlockSize { + bufSz = config.BlockSize + } + if bufSz < config.WriteBufSize { + bufSz = config.WriteBufSize + } + + // Prepare BlockStorage + st := &BlockStorage{ + path: path, + blockPath: pb.Join(path, blockPathPrefix), + nodePath: pb.Join(path, nodePathPrefix), + config: config, + hashPool: sync.Pool{ + New: func() interface{} { + return newHashEncoder() + }, + }, + bufpool: pools.NewBufferPool(bufSz), + lock: lock, + } + + // Set copypool buffer size + st.cppool.Buffer(config.ReadBufSize) + + return st, nil +} + +// Clean implements storage.Clean(). +func (st *BlockStorage) Clean(ctx context.Context) error { + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + nodes := map[string]*node{} + + // Walk nodes dir for entries + err := walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return nil + } + + // Get joined node path name + npath = pb.Join(npath, fsentry.Name()) + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + return err + } + defer file.Close() + + // Alloc new Node + acquire hash buffer for writes + hbuf := st.bufpool.Get() + defer st.bufpool.Put(hbuf) + hbuf.Guarantee(encodedHashLen) + node := node{} + + // Write file contents to node + _, err = io.CopyBuffer( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + nil, + ) + if err != nil { + return err + } + + // Append to nodes slice + nodes[fsentry.Name()] = &node + return nil + }) + + // Handle errors (though nodePath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } + + // Walk blocks dir for entries + err = walkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) error { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return nil + } + + inUse := false + for key, node := range nodes { + if node.removeHash(fsentry.Name()) { + if len(node.hashes) < 1 { + // This node contained hash, and after removal is now empty. + // Remove this node from our tracked nodes slice + delete(nodes, key) + } + inUse = true + } + } + + // Block hash is used by node + if inUse { + return nil + } + + // Get joined block path name + bpath = pb.Join(bpath, fsentry.Name()) + + // Remove this unused block path + return os.Remove(bpath) + }) + + // Handle errors (though blockPath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } + + // If there are nodes left at this point, they are corrupt + // (i.e. they're referencing block hashes that don't exist) + if len(nodes) > 0 { + nodeKeys := []string{} + for key := range nodes { + nodeKeys = append(nodeKeys, key) + } + return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys) + } + + return nil +} + +// ReadBytes implements Storage.ReadBytes(). +func (st *BlockStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + defer rc.Close() + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream(). +func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return nil, err + } + + // Check if open + if st.lock.Closed() { + return nil, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + return nil, errSwapNotFound(err) + } + defer file.Close() + + // Acquire hash buffer for writes + hbuf := st.bufpool.Get() + defer st.bufpool.Put(hbuf) + + var node node + + // Write file contents to node + _, err = st.cppool.Copy( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + ) + if err != nil { + return nil, err + } + + // Prepare block reader and return + return util.NopReadCloser(&blockReader{ + storage: st, + node: &node, + }), nil +} + +// readBlock reads the block with hash (key) from the filesystem. +func (st *BlockStorage) readBlock(key string) ([]byte, error) { + // Get block file path for key + bpath := st.blockPathForKey(key) + + // Attempt to open RO file + file, err := open(bpath, defaultFileROFlags) + if err != nil { + return nil, wrap(new_error("corrupted node"), err) + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Reader(file) + if err != nil { + return nil, wrap(new_error("corrupted node"), err) + } + defer cFile.Close() + + // Read the entire file + return io.ReadAll(cFile) +} + +// WriteBytes implements Storage.WriteBytes(). +func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) error { + return st.WriteStream(ctx, key, bytes.NewReader(value)) +} + +// WriteStream implements Storage.WriteStream(). +func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return err + } + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Check if this exists + ok, err := stat(key) + if err != nil { + return err + } + + // Check if we allow overwrites + if ok && !st.config.Overwrite { + return ErrAlreadyExists + } + + // Ensure nodes dir (and any leading up to) exists + err = os.MkdirAll(st.nodePath, defaultDirPerms) + if err != nil { + return err + } + + // Ensure blocks dir (and any leading up to) exists + err = os.MkdirAll(st.blockPath, defaultDirPerms) + if err != nil { + return err + } + + var node node + + // Acquire HashEncoder + hc := st.hashPool.Get().(*hashEncoder) + defer st.hashPool.Put(hc) + + // Create new waitgroup and OnceError for + // goroutine error tracking and propagating + wg := sync.WaitGroup{} + onceErr := errors.OnceError{} + +loop: + for !onceErr.IsSet() { + // Fetch new buffer for this loop + buf := st.bufpool.Get() + buf.Grow(st.config.BlockSize) + + // Read next chunk + n, err := io.ReadFull(r, buf.B) + switch err { + case nil, io.ErrUnexpectedEOF: + // do nothing + case io.EOF: + st.bufpool.Put(buf) + break loop + default: + st.bufpool.Put(buf) + return err + } + + // Hash the encoded data + sum := hc.EncodeSum(buf.B) + + // Append to the node's hashes + node.hashes = append(node.hashes, sum) + + // If already on disk, skip + has, err := st.statBlock(sum) + if err != nil { + st.bufpool.Put(buf) + return err + } else if has { + st.bufpool.Put(buf) + continue loop + } + + // Check if reached EOF + atEOF := (n < buf.Len()) + + wg.Add(1) + go func() { + // Perform writes in goroutine + + defer func() { + // Defer release + + // signal we're done + st.bufpool.Put(buf) + wg.Done() + }() + + // Write block to store at hash + err = st.writeBlock(sum, buf.B[:n]) + if err != nil { + onceErr.Store(err) + return + } + }() + + // Break at end + if atEOF { + break loop + } + } + + // Wait, check errors + wg.Wait() + if onceErr.IsSet() { + return onceErr.Load() + } + + // If no hashes created, return + if len(node.hashes) < 1 { + return new_error("no hashes written") + } + + // Prepare to swap error if need-be + errSwap := errSwapNoop + + // Build file RW flags + // NOTE: we performed an initial check for + // this before writing blocks, but if + // the utilizer of this storage didn't + // correctly mutex protect this key then + // someone may have beaten us to the + // punch at writing the node file. + flags := defaultFileRWFlags + if !st.config.Overwrite { + flags |= syscall.O_EXCL + + // Catch + replace err exist + errSwap = errSwapExist + } + + // Attempt to open RW file + file, err := open(npath, flags) + if err != nil { + return errSwap(err) + } + defer file.Close() + + // Acquire write buffer + buf := st.bufpool.Get() + defer st.bufpool.Put(buf) + buf.Grow(st.config.WriteBufSize) + + // Finally, write data to file + _, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B) + return err +} + +// writeBlock writes the block with hash and supplied value to the filesystem. +func (st *BlockStorage) writeBlock(hash string, value []byte) error { + // Get block file path for key + bpath := st.blockPathForKey(hash) + + // Attempt to open RW file + file, err := open(bpath, defaultFileRWFlags) + if err != nil { + if err == syscall.EEXIST { + err = nil /* race issue describe in struct NOTE */ + } + return err + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Writer(file) + if err != nil { + return err + } + defer cFile.Close() + + // Write value to file + _, err = cFile.Write(value) + return err +} + +// statBlock checks for existence of supplied block hash. +func (st *BlockStorage) statBlock(hash string) (bool, error) { + return stat(st.blockPathForKey(hash)) +} + +// Stat implements Storage.Stat() +func (st *BlockStorage) Stat(ctx context.Context, key string) (bool, error) { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return false, err + } + + // Check if open + if st.lock.Closed() { + return false, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return false, err + } + + // Check for file on disk + return stat(kpath) +} + +// Remove implements Storage.Remove(). +func (st *BlockStorage) Remove(ctx context.Context, key string) error { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return err + } + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Remove at path (we know this is file) + if err := unlink(kpath); err != nil { + return errSwapNotFound(err) + } + + return nil +} + +// Close implements Storage.Close(). +func (st *BlockStorage) Close() error { + return st.lock.Close() +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *BlockStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Walk dir for entries + return walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { + if !fsentry.Type().IsRegular() { + // Only deal with regular files + return nil + } + + // Perform provided walk function + return opts.WalkFn(ctx, Entry{ + Key: fsentry.Name(), + Size: -1, + }) + }) +} + +// nodePathForKey calculates the node file path for supplied key. +func (st *BlockStorage) nodePathForKey(key string) (string, error) { + // Path separators are illegal, as directory paths + if strings.Contains(key, "/") || key == "." || key == ".." { + return "", ErrInvalidKey + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Append the nodepath to key + pb.AppendString(st.nodePath) + pb.AppendString(key) + + // Return joined + cleaned node-path + return pb.Join(st.nodePath, key), nil +} + +// blockPathForKey calculates the block file path for supplied hash. +func (st *BlockStorage) blockPathForKey(hash string) string { + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + return pb.Join(st.blockPath, hash) +} + +// hashSeparator is the separating byte between block hashes. +const hashSeparator = byte('\n') + +// node represents the contents of a node file in storage. +type node struct { + hashes []string +} + +// removeHash attempts to remove supplied block hash from the node's hash array. +func (n *node) removeHash(hash string) bool { + for i := 0; i < len(n.hashes); { + if n.hashes[i] == hash { + // Drop this hash from slice + n.hashes = append(n.hashes[:i], n.hashes[i+1:]...) + return true + } + + // Continue iter + i++ + } + return false +} + +// nodeReader is an io.Reader implementation for the node file representation, +// which is useful when calculated node file is being written to the store. +type nodeReader struct { + node node + idx int + last int +} + +func (r *nodeReader) Read(b []byte) (int, error) { + n := 0 + + // '-1' means we missed writing + // hash separator on last iteration + if r.last == -1 { + b[n] = hashSeparator + n++ + r.last = 0 + } + + for r.idx < len(r.node.hashes) { + hash := r.node.hashes[r.idx] + + // Copy into buffer + update read count + m := copy(b[n:], hash[r.last:]) + n += m + + // If incomplete copy, return here + if m < len(hash)-r.last { + r.last = m + return n, nil + } + + // Check we can write last separator + if n == len(b) { + r.last = -1 + return n, nil + } + + // Write separator, iter, reset + b[n] = hashSeparator + n++ + r.idx++ + r.last = 0 + } + + // We reached end of hashes + return n, io.EOF +} + +// nodeWriter is an io.Writer implementation for the node file representation, +// which is useful when calculated node file is being read from the store. +type nodeWriter struct { + node *node + buf *byteutil.Buffer +} + +func (w *nodeWriter) Write(b []byte) (int, error) { + n := 0 + + for { + // Find next hash separator position + idx := bytes.IndexByte(b[n:], hashSeparator) + if idx == -1 { + // Check we shouldn't be expecting it + if w.buf.Len() > encodedHashLen { + return n, new_error("invalid node") + } + + // Write all contents to buffer + w.buf.Write(b[n:]) + return len(b), nil + } + + // Found hash separator, write + // current buf contents to Node hashes + w.buf.Write(b[n : n+idx]) + n += idx + 1 + if w.buf.Len() != encodedHashLen { + return n, new_error("invalid node") + } + + // Append to hashes & reset + w.node.hashes = append(w.node.hashes, w.buf.String()) + w.buf.Reset() + } +} + +// blockReader is an io.Reader implementation for the combined, linked block +// data contained with a node file. Basically, this allows reading value data +// from the store for a given node file. +type blockReader struct { + storage *BlockStorage + node *node + buf []byte + prev int +} + +func (r *blockReader) Read(b []byte) (int, error) { + n := 0 + + // Data left in buf, copy as much as we + // can into supplied read buffer + if r.prev < len(r.buf)-1 { + n += copy(b, r.buf[r.prev:]) + r.prev += n + if n >= len(b) { + return n, nil + } + } + + for { + // Check we have any hashes left + if len(r.node.hashes) < 1 { + return n, io.EOF + } + + // Get next key from slice + key := r.node.hashes[0] + r.node.hashes = r.node.hashes[1:] + + // Attempt to fetch next batch of data + var err error + r.buf, err = r.storage.readBlock(key) + if err != nil { + return n, err + } + r.prev = 0 + + // Copy as much as can from new buffer + m := copy(b[n:], r.buf) + r.prev += m + n += m + + // If we hit end of supplied buf, return + if n >= len(b) { + return n, nil + } + } +} + +var ( + // base64Encoding is our base64 encoding object. + base64Encoding = hashenc.Base64() + + // encodedHashLen is the once-calculated encoded hash-sum length + encodedHashLen = base64Encoding.EncodedLen( + sha256.New().Size(), + ) +) + +// hashEncoder is a HashEncoder with built-in encode buffer. +type hashEncoder struct { + henc hashenc.HashEncoder + ebuf []byte +} + +// newHashEncoder returns a new hashEncoder instance. +func newHashEncoder() *hashEncoder { + return &hashEncoder{ + henc: hashenc.New(sha256.New(), base64Encoding), + ebuf: make([]byte, encodedHashLen), + } +} + +// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum(). +func (henc *hashEncoder) EncodeSum(src []byte) string { + henc.henc.EncodeSum(henc.ebuf, src) + return string(henc.ebuf) +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go new file mode 100644 index 000000000..6eeb3a78d --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go @@ -0,0 +1,212 @@ +package storage + +import ( + "bytes" + "io" + "sync" + + "codeberg.org/gruf/go-store/v2/util" + + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zlib" +) + +// Compressor defines a means of compressing/decompressing values going into a key-value store +type Compressor interface { + // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader + Reader(io.Reader) (io.ReadCloser, error) + + // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer + Writer(io.Writer) (io.WriteCloser, error) +} + +type gzipCompressor struct { + rpool sync.Pool + wpool sync.Pool +} + +// GZipCompressor returns a new Compressor that implements GZip at default compression level +func GZipCompressor() Compressor { + return GZipCompressorLevel(gzip.DefaultCompression) +} + +// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level +func GZipCompressorLevel(level int) Compressor { + // GZip readers immediately check for valid + // header data on allocation / reset, so we + // need a set of valid header data so we can + // iniitialize reader instances in mempool. + hdr := bytes.NewBuffer(nil) + + // Init writer to ensure valid level provided + gw, err := gzip.NewWriterLevel(hdr, level) + if err != nil { + panic(err) + } + + // Write empty data to ensure gzip + // header data is in byte buffer. + gw.Write([]byte{}) + gw.Close() + + return &gzipCompressor{ + rpool: sync.Pool{ + New: func() any { + hdr := bytes.NewReader(hdr.Bytes()) + gr, _ := gzip.NewReader(hdr) + return gr + }, + }, + wpool: sync.Pool{ + New: func() any { + gw, _ := gzip.NewWriterLevel(nil, level) + return gw + }, + }, + } +} + +func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + gr := c.rpool.Get().(*gzip.Reader) + if err := gr.Reset(r); err != nil { + c.rpool.Put(gr) + return nil, err + } + return util.ReadCloserWithCallback(gr, func() { + c.rpool.Put(gr) + }), nil +} + +func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + gw := c.wpool.Get().(*gzip.Writer) + gw.Reset(w) + return util.WriteCloserWithCallback(gw, func() { + c.wpool.Put(gw) + }), nil +} + +type zlibCompressor struct { + rpool sync.Pool + wpool sync.Pool + dict []byte +} + +// ZLibCompressor returns a new Compressor that implements ZLib at default compression level +func ZLibCompressor() Compressor { + return ZLibCompressorLevelDict(zlib.DefaultCompression, nil) +} + +// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level +func ZLibCompressorLevel(level int) Compressor { + return ZLibCompressorLevelDict(level, nil) +} + +// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict +func ZLibCompressorLevelDict(level int, dict []byte) Compressor { + // ZLib readers immediately check for valid + // header data on allocation / reset, so we + // need a set of valid header data so we can + // iniitialize reader instances in mempool. + hdr := bytes.NewBuffer(nil) + + // Init writer to ensure valid level + dict provided + zw, err := zlib.NewWriterLevelDict(hdr, level, dict) + if err != nil { + panic(err) + } + + // Write empty data to ensure zlib + // header data is in byte buffer. + zw.Write([]byte{}) + zw.Close() + + return &zlibCompressor{ + rpool: sync.Pool{ + New: func() any { + hdr := bytes.NewReader(hdr.Bytes()) + zr, _ := zlib.NewReaderDict(hdr, dict) + return zr + }, + }, + wpool: sync.Pool{ + New: func() any { + zw, _ := zlib.NewWriterLevelDict(nil, level, dict) + return zw + }, + }, + dict: dict, + } +} + +func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + zr := c.rpool.Get().(interface { + io.ReadCloser + zlib.Resetter + }) + if err := zr.Reset(r, c.dict); err != nil { + c.rpool.Put(zr) + return nil, err + } + return util.ReadCloserWithCallback(zr, func() { + c.rpool.Put(zr) + }), nil +} + +func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + zw := c.wpool.Get().(*zlib.Writer) + zw.Reset(w) + return util.WriteCloserWithCallback(zw, func() { + c.wpool.Put(zw) + }), nil +} + +type snappyCompressor struct { + rpool sync.Pool + wpool sync.Pool +} + +// SnappyCompressor returns a new Compressor that implements Snappy. +func SnappyCompressor() Compressor { + return &snappyCompressor{ + rpool: sync.Pool{ + New: func() any { return snappy.NewReader(nil) }, + }, + wpool: sync.Pool{ + New: func() any { return snappy.NewWriter(nil) }, + }, + } +} + +func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + sr := c.rpool.Get().(*snappy.Reader) + sr.Reset(r) + return util.ReadCloserWithCallback( + util.NopReadCloser(sr), + func() { c.rpool.Put(sr) }, + ), nil +} + +func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + sw := c.wpool.Get().(*snappy.Writer) + sw.Reset(w) + return util.WriteCloserWithCallback( + util.NopWriteCloser(sw), + func() { c.wpool.Put(sw) }, + ), nil +} + +type nopCompressor struct{} + +// NoCompression is a Compressor that simply does nothing. +func NoCompression() Compressor { + return &nopCompressor{} +} + +func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return util.NopReadCloser(r), nil +} + +func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return util.NopWriteCloser(w), nil +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go new file mode 100644 index 000000000..dab1d6128 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go @@ -0,0 +1,425 @@ +package storage + +import ( + "context" + "errors" + "io" + "io/fs" + "os" + "path" + _path "path" + "strings" + "syscall" + + "codeberg.org/gruf/go-bytes" + "codeberg.org/gruf/go-fastcopy" + "codeberg.org/gruf/go-store/v2/util" +) + +// DefaultDiskConfig is the default DiskStorage configuration. +var DefaultDiskConfig = &DiskConfig{ + Overwrite: true, + WriteBufSize: 4096, + Transform: NopTransform(), + Compression: NoCompression(), +} + +// DiskConfig defines options to be used when opening a DiskStorage. +type DiskConfig struct { + // Transform is the supplied key <--> path KeyTransform. + Transform KeyTransform + + // WriteBufSize is the buffer size to use when writing file streams. + WriteBufSize int + + // Overwrite allows overwriting values of stored keys in the storage. + Overwrite bool + + // LockFile allows specifying the filesystem path to use for the lockfile, + // providing only a filename it will store the lockfile within provided store + // path and nest the store under `path/store` to prevent access to lockfile. + LockFile string + + // Compression is the Compressor to use when reading / writing files, + // default is no compression. + Compression Compressor +} + +// getDiskConfig returns a valid DiskConfig for supplied ptr. +func getDiskConfig(cfg *DiskConfig) DiskConfig { + // If nil, use default + if cfg == nil { + cfg = DefaultDiskConfig + } + + // Assume nil transform == none + if cfg.Transform == nil { + cfg.Transform = NopTransform() + } + + // Assume nil compress == none + if cfg.Compression == nil { + cfg.Compression = NoCompression() + } + + // Assume 0 buf size == use default + if cfg.WriteBufSize <= 0 { + cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize + } + + // Assume empty lockfile path == use default + if len(cfg.LockFile) == 0 { + cfg.LockFile = LockFile + } + + // Return owned config copy + return DiskConfig{ + Transform: cfg.Transform, + WriteBufSize: cfg.WriteBufSize, + Overwrite: cfg.Overwrite, + LockFile: cfg.LockFile, + Compression: cfg.Compression, + } +} + +// DiskStorage is a Storage implementation that stores directly to a filesystem. +type DiskStorage struct { + path string // path is the root path of this store + cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool + config DiskConfig // cfg is the supplied configuration for this store + lock *Lock // lock is the opened lockfile for this storage instance +} + +// OpenDisk opens a DiskStorage instance for given folder path and configuration. +func OpenDisk(path string, cfg *DiskConfig) (*DiskStorage, error) { + // Get checked config + config := getDiskConfig(cfg) + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Clean provided store path, ensure + // ends in '/' to help later path trimming + storePath := pb.Clean(path) + "/" + + // Clean provided lockfile path + lockfile := pb.Clean(config.LockFile) + + // Check if lockfile is an *actual* path or just filename + if lockDir, _ := _path.Split(lockfile); lockDir == "" { + // Lockfile is a filename, store must be nested under + // $storePath/store to prevent access to the lockfile + storePath += "store/" + lockfile = pb.Join(path, lockfile) + } + + // Attempt to open dir path + file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) + if err != nil { + // If not a not-exist error, return + if !os.IsNotExist(err) { + return nil, err + } + + // Attempt to make store path dirs + err = os.MkdirAll(storePath, defaultDirPerms) + if err != nil { + return nil, err + } + + // Reopen dir now it's been created + file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) + if err != nil { + return nil, err + } + } + defer file.Close() + + // Double check this is a dir (NOT a file!) + stat, err := file.Stat() + if err != nil { + return nil, err + } else if !stat.IsDir() { + return nil, errors.New("store/storage: path is file") + } + + // Open and acquire storage lock for path + lock, err := OpenLock(lockfile) + if err != nil { + return nil, err + } + + // Prepare DiskStorage + st := &DiskStorage{ + path: storePath, + config: config, + lock: lock, + } + + // Set copypool buffer size + st.cppool.Buffer(config.WriteBufSize) + + return st, nil +} + +// Clean implements Storage.Clean(). +func (st *DiskStorage) Clean(ctx context.Context) error { + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Clean-out unused directories + return cleanDirs(st.path) +} + +// ReadBytes implements Storage.ReadBytes(). +func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + defer rc.Close() + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream(). +func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return nil, err + } + + // Check if open + if st.lock.Closed() { + return nil, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Attempt to open file (replace ENOENT with our own) + file, err := open(kpath, defaultFileROFlags) + if err != nil { + return nil, errSwapNotFound(err) + } + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Reader(file) + if err != nil { + file.Close() // close this here, ignore error + return nil, err + } + + // Wrap compressor to ensure file close + return util.ReadCloserWithCallback(cFile, func() { + file.Close() + }), nil +} + +// WriteBytes implements Storage.WriteBytes(). +func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) error { + return st.WriteStream(ctx, key, bytes.NewReader(value)) +} + +// WriteStream implements Storage.WriteStream(). +func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return err + } + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Ensure dirs leading up to file exist + err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) + if err != nil { + return err + } + + // Prepare to swap error if need-be + errSwap := errSwapNoop + + // Build file RW flags + flags := defaultFileRWFlags + if !st.config.Overwrite { + flags |= syscall.O_EXCL + + // Catch + replace err exist + errSwap = errSwapExist + } + + // Attempt to open file + file, err := open(kpath, flags) + if err != nil { + return errSwap(err) + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Writer(file) + if err != nil { + return err + } + defer cFile.Close() + + // Copy provided reader to file + _, err = st.cppool.Copy(cFile, r) + return err +} + +// Stat implements Storage.Stat(). +func (st *DiskStorage) Stat(ctx context.Context, key string) (bool, error) { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return false, err + } + + // Check if open + if st.lock.Closed() { + return false, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return false, err + } + + // Check for file on disk + return stat(kpath) +} + +// Remove implements Storage.Remove(). +func (st *DiskStorage) Remove(ctx context.Context, key string) error { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return err + } + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Remove at path (we know this is file) + if err := unlink(kpath); err != nil { + return errSwapNotFound(err) + } + + return nil +} + +// Close implements Storage.Close(). +func (st *DiskStorage) Close() error { + return st.lock.Close() +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *DiskStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { + // Check if open + if st.lock.Closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Walk dir for entries + return walkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) error { + if !fsentry.Type().IsRegular() { + // Only deal with regular files + return nil + } + + // Get full item path (without root) + kpath = pb.Join(kpath, fsentry.Name()) + kpath = kpath[len(st.path):] + + // Load file info. This should already + // be loaded due to the underlying call + // to os.File{}.ReadDir() populating them + info, err := fsentry.Info() + if err != nil { + return err + } + + // Perform provided walk function + return opts.WalkFn(ctx, Entry{ + Key: st.config.Transform.PathToKey(kpath), + Size: info.Size(), + }) + }) +} + +// filepath checks and returns a formatted filepath for given key. +func (st *DiskStorage) filepath(key string) (string, error) { + // Calculate transformed key path + key = st.config.Transform.KeyToPath(key) + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Generated joined root path + pb.AppendString(st.path) + pb.AppendString(key) + + // Check for dir traversal outside of root + if isDirTraversal(st.path, pb.StringPtr()) { + return "", ErrInvalidKey + } + + return pb.String(), nil +} + +// isDirTraversal will check if rootPlusPath is a dir traversal outside of root, +// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath). +func isDirTraversal(root, rootPlusPath string) bool { + switch { + // Root is $PWD, check for traversal out of + case root == ".": + return strings.HasPrefix(rootPlusPath, "../") + + // The path MUST be prefixed by root + case !strings.HasPrefix(rootPlusPath, root): + return true + + // In all other cases, check not equal + default: + return len(root) == len(rootPlusPath) + } +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go new file mode 100644 index 000000000..4ae7e4be5 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go @@ -0,0 +1,110 @@ +package storage + +import ( + "errors" + "strings" + "syscall" + + "github.com/minio/minio-go/v7" +) + +var ( + // ErrClosed is returned on operations on a closed storage + ErrClosed = new_error("closed") + + // ErrNotFound is the error returned when a key cannot be found in storage + ErrNotFound = new_error("key not found") + + // ErrAlreadyExist is the error returned when a key already exists in storage + ErrAlreadyExists = new_error("key already exists") + + // ErrInvalidkey is the error returned when an invalid key is passed to storage + ErrInvalidKey = new_error("invalid key") + + // ErrAlreadyLocked is returned on fail opening a storage lockfile + ErrAlreadyLocked = new_error("storage lock already open") +) + +// new_error returns a new error instance prefixed by package prefix. +func new_error(msg string) error { + return errors.New("store/storage: " + msg) +} + +// wrappedError allows wrapping together an inner with outer error. +type wrappedError struct { + inner error + outer error +} + +// wrap will return a new wrapped error from given inner and outer errors. +func wrap(outer, inner error) *wrappedError { + return &wrappedError{ + inner: inner, + outer: outer, + } +} + +func (e *wrappedError) Is(target error) bool { + return e.outer == target || e.inner == target +} + +func (e *wrappedError) Error() string { + return e.outer.Error() + ": " + e.inner.Error() +} + +func (e *wrappedError) Unwrap() error { + return e.inner +} + +// errSwapNoop performs no error swaps +func errSwapNoop(err error) error { + return err +} + +// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound +func errSwapNotFound(err error) error { + if err == syscall.ENOENT { + return ErrNotFound + } + return err +} + +// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists +func errSwapExist(err error) error { + if err == syscall.EEXIST { + return ErrAlreadyExists + } + return err +} + +// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked +func errSwapUnavailable(err error) error { + if err == syscall.EAGAIN { + return ErrAlreadyLocked + } + return err +} + +// transformS3Error transforms an error returned from S3Storage underlying +// minio.Core client, by wrapping where necessary with our own error types. +func transformS3Error(err error) error { + // Cast this to a minio error response + ersp, ok := err.(minio.ErrorResponse) + if ok { + switch ersp.Code { + case "NoSuchKey": + return wrap(ErrNotFound, err) + case "Conflict": + return wrap(ErrAlreadyExists, err) + default: + return err + } + } + + // Check if error has an invalid object name prefix + if strings.HasPrefix(err.Error(), "Object name ") { + return wrap(ErrInvalidKey, err) + } + + return err +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go new file mode 100644 index 000000000..658b7e762 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go @@ -0,0 +1,221 @@ +package storage + +import ( + "io/fs" + "os" + "syscall" + + "codeberg.org/gruf/go-fastpath" + "codeberg.org/gruf/go-store/v2/util" +) + +const ( + // default file permission bits + defaultDirPerms = 0o755 + defaultFilePerms = 0o644 + + // default file open flags + defaultFileROFlags = syscall.O_RDONLY + defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR + defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT +) + +// NOTE: +// These functions are for opening storage files, +// not necessarily for e.g. initial setup (OpenFile) + +// walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry +func walkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry) error) error { + // Read directory entries + entries, err := readDir(path) + if err != nil { + return err + } + + // frame represents a directory entry + // walk-loop snapshot, taken when a sub + // directory requiring iteration is found + type frame struct { + path string + entries []fs.DirEntry + } + + // stack contains a list of held snapshot + // frames, representing unfinished upper + // layers of a directory structure yet to + // be traversed. + var stack []frame + +outer: + for { + if len(entries) == 0 { + if len(stack) == 0 { + // Reached end + break outer + } + + // Pop frame from stack + frame := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + // Update loop vars + entries = frame.entries + path = frame.path + } + + for len(entries) > 0 { + // Pop next entry from queue + entry := entries[0] + entries = entries[1:] + + // Pass to provided walk function + if err := walkFn(path, entry); err != nil { + return err + } + + if entry.IsDir() { + // Push current frame to stack + stack = append(stack, frame{ + path: path, + entries: entries, + }) + + // Update current directory path + path = pb.Join(path, entry.Name()) + + // Read next directory entries + next, err := readDir(path) + if err != nil { + return err + } + + // Set next entries + entries = next + + continue outer + } + } + } + + return nil +} + +// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children +func cleanDirs(path string) error { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Get top-level dir entries + entries, err := readDir(path) + if err != nil { + return err + } + + for _, entry := range entries { + if entry.IsDir() { + // Recursively clean sub-directory entries + if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { + return err + } + } + } + + return nil +} + +// cleanDir performs the actual dir cleaning logic for the above top-level version. +func cleanDir(pb *fastpath.Builder, path string) error { + // Get dir entries + entries, err := readDir(path) + if err != nil { + return err + } + + // If no entries, delete + if len(entries) < 1 { + return rmdir(path) + } + + for _, entry := range entries { + if entry.IsDir() { + // Recursively clean sub-directory entries + if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { + return err + } + } + } + + return nil +} + +// readDir will open file at path, read the unsorted list of entries, then close. +func readDir(path string) ([]fs.DirEntry, error) { + // Open file at path + file, err := open(path, defaultFileROFlags) + if err != nil { + return nil, err + } + + // Read directory entries + entries, err := file.ReadDir(-1) + + // Done with file + _ = file.Close() + + return entries, err +} + +// open will open a file at the given path with flags and default file perms. +func open(path string, flags int) (*os.File, error) { + var fd int + err := retryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, flags, defaultFilePerms) + return + }) + if err != nil { + return nil, err + } + return os.NewFile(uintptr(fd), path), nil +} + +// stat checks for a file on disk. +func stat(path string) (bool, error) { + var stat syscall.Stat_t + err := retryOnEINTR(func() error { + return syscall.Stat(path, &stat) + }) + if err != nil { + if err == syscall.ENOENT { + // not-found is no error + err = nil + } + return false, err + } + return true, nil +} + +// unlink removes a file (not dir!) on disk. +func unlink(path string) error { + return retryOnEINTR(func() error { + return syscall.Unlink(path) + }) +} + +// rmdir removes a dir (not file!) on disk. +func rmdir(path string) error { + return retryOnEINTR(func() error { + return syscall.Rmdir(path) + }) +} + +// retryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received. +func retryOnEINTR(do func() error) error { + for { + err := do() + if err == syscall.EINTR { + continue + } + return err + } +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go new file mode 100644 index 000000000..25ecefe52 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go @@ -0,0 +1,59 @@ +package storage + +import ( + "sync/atomic" + "syscall" +) + +// LockFile is our standard lockfile name. +const LockFile = "store.lock" + +// Lock represents a filesystem lock to ensure only one storage instance open per path. +type Lock struct { + fd int + st uint32 +} + +// OpenLock opens a lockfile at path. +func OpenLock(path string) (*Lock, error) { + var fd int + + // Open the file descriptor at path + err := retryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms) + return + }) + if err != nil { + return nil, err + } + + // Get a flock on the file descriptor + err = retryOnEINTR(func() error { + return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB) + }) + if err != nil { + return nil, errSwapUnavailable(err) + } + + return &Lock{fd: fd}, nil +} + +// Close will attempt to close the lockfile and file descriptor. +func (f *Lock) Close() error { + var err error + if atomic.CompareAndSwapUint32(&f.st, 0, 1) { + // Ensure gets closed + defer syscall.Close(f.fd) + + // Call funlock on the file descriptor + err = retryOnEINTR(func() error { + return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB) + }) + } + return err +} + +// Closed will return whether this lockfile has been closed (and unlocked). +func (f *Lock) Closed() bool { + return (atomic.LoadUint32(&f.st) == 1) +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go new file mode 100644 index 000000000..a853c84d2 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go @@ -0,0 +1,228 @@ +package storage + +import ( + "context" + "io" + "sync/atomic" + + "codeberg.org/gruf/go-bytes" + "codeberg.org/gruf/go-store/v2/util" + "github.com/cornelk/hashmap" +) + +// MemoryStorage is a storage implementation that simply stores key-value +// pairs in a Go map in-memory. The map is protected by a mutex. +type MemoryStorage struct { + ow bool // overwrites + fs *hashmap.Map[string, []byte] + st uint32 +} + +// OpenMemory opens a new MemoryStorage instance with internal map starting size. +func OpenMemory(size int, overwrites bool) *MemoryStorage { + if size <= 0 { + size = 8 + } + return &MemoryStorage{ + fs: hashmap.NewSized[string, []byte](uintptr(size)), + ow: overwrites, + } +} + +// Clean implements Storage.Clean(). +func (st *MemoryStorage) Clean(ctx context.Context) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + return nil +} + +// ReadBytes implements Storage.ReadBytes(). +func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Check store open + if st.closed() { + return nil, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Check for key in store + b, ok := st.fs.Get(key) + if !ok { + return nil, ErrNotFound + } + + // Create return copy + return copyb(b), nil +} + +// ReadStream implements Storage.ReadStream(). +func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Check store open + if st.closed() { + return nil, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Check for key in store + b, ok := st.fs.Get(key) + if !ok { + return nil, ErrNotFound + } + + // Create io.ReadCloser from 'b' copy + r := bytes.NewReader(copyb(b)) + return util.NopReadCloser(r), nil +} + +// WriteBytes implements Storage.WriteBytes(). +func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Check for key that already exists + if _, ok := st.fs.Get(key); ok && !st.ow { + return ErrAlreadyExists + } + + // Write key copy to store + st.fs.Set(key, copyb(b)) + return nil +} + +// WriteStream implements Storage.WriteStream(). +func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Check for key that already exists + if _, ok := st.fs.Get(key); ok && !st.ow { + return ErrAlreadyExists + } + + // Read all from reader + b, err := io.ReadAll(r) + if err != nil { + return err + } + + // Write key to store + st.fs.Set(key, b) + return nil +} + +// Stat implements Storage.Stat(). +func (st *MemoryStorage) Stat(ctx context.Context, key string) (bool, error) { + // Check store open + if st.closed() { + return false, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return false, err + } + + // Check for key in store + _, ok := st.fs.Get(key) + return ok, nil +} + +// Remove implements Storage.Remove(). +func (st *MemoryStorage) Remove(ctx context.Context, key string) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Attempt to delete key + ok := st.fs.Del(key) + if !ok { + return ErrNotFound + } + + return nil +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *MemoryStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + var err error + + // Nil check func + _ = opts.WalkFn + + // Pass each key in map to walk function + st.fs.Range(func(key string, val []byte) bool { + err = opts.WalkFn(ctx, Entry{ + Key: key, + Size: int64(len(val)), + }) + return (err == nil) + }) + + return err +} + +// Close implements Storage.Close(). +func (st *MemoryStorage) Close() error { + atomic.StoreUint32(&st.st, 1) + return nil +} + +// closed returns whether MemoryStorage is closed. +func (st *MemoryStorage) closed() bool { + return (atomic.LoadUint32(&st.st) == 1) +} + +// copyb returns a copy of byte-slice b. +func copyb(b []byte) []byte { + if b == nil { + return nil + } + p := make([]byte, len(b)) + _ = copy(p, b) + return p +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go new file mode 100644 index 000000000..baf2a1b3c --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go @@ -0,0 +1,385 @@ +package storage + +import ( + "bytes" + "context" + "io" + "sync/atomic" + + "codeberg.org/gruf/go-store/v2/util" + "github.com/minio/minio-go/v7" +) + +// DefaultS3Config is the default S3Storage configuration. +var DefaultS3Config = &S3Config{ + CoreOpts: minio.Options{}, + GetOpts: minio.GetObjectOptions{}, + PutOpts: minio.PutObjectOptions{}, + PutChunkSize: 4 * 1024 * 1024, // 4MiB + StatOpts: minio.StatObjectOptions{}, + RemoveOpts: minio.RemoveObjectOptions{}, + ListSize: 200, +} + +// S3Config defines options to be used when opening an S3Storage, +// mostly options for underlying S3 client library. +type S3Config struct { + // CoreOpts are S3 client options passed during initialization. + CoreOpts minio.Options + + // GetOpts are S3 client options passed during .Read___() calls. + GetOpts minio.GetObjectOptions + + // PutOpts are S3 client options passed during .Write___() calls. + PutOpts minio.PutObjectOptions + + // PutChunkSize is the chunk size (in bytes) to use when sending + // a byte stream reader of unknown size as a multi-part object. + PutChunkSize int64 + + // StatOpts are S3 client options passed during .Stat() calls. + StatOpts minio.StatObjectOptions + + // RemoveOpts are S3 client options passed during .Remove() calls. + RemoveOpts minio.RemoveObjectOptions + + // ListSize determines how many items to include in each + // list request, made during calls to .WalkKeys(). + ListSize int +} + +// getS3Config returns a valid S3Config for supplied ptr. +func getS3Config(cfg *S3Config) S3Config { + // If nil, use default + if cfg == nil { + cfg = DefaultS3Config + } + + // Assume 0 chunk size == use default + if cfg.PutChunkSize <= 0 { + cfg.PutChunkSize = 4 * 1024 * 1024 + } + + // Assume 0 list size == use default + if cfg.ListSize <= 0 { + cfg.ListSize = 200 + } + + // Return owned config copy + return S3Config{ + CoreOpts: cfg.CoreOpts, + GetOpts: cfg.GetOpts, + PutOpts: cfg.PutOpts, + StatOpts: cfg.StatOpts, + RemoveOpts: cfg.RemoveOpts, + } +} + +// S3Storage is a storage implementation that stores key-value +// pairs in an S3 instance at given endpoint with bucket name. +type S3Storage struct { + client *minio.Core + bucket string + config S3Config + state uint32 +} + +// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration. +func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) { + // Get checked config + config := getS3Config(cfg) + + // Create new S3 client connection + client, err := minio.NewCore(endpoint, &config.CoreOpts) + if err != nil { + return nil, err + } + + // Check that provided bucket actually exists + exists, err := client.BucketExists(context.Background(), bucket) + if err != nil { + return nil, err + } else if !exists { + return nil, new_error("bucket does not exist") + } + + return &S3Storage{ + client: client, + bucket: bucket, + config: config, + }, nil +} + +// Client returns access to the underlying S3 client. +func (st *S3Storage) Client() *minio.Core { + return st.client +} + +// Clean implements Storage.Clean(). +func (st *S3Storage) Clean(ctx context.Context) error { + return nil // nothing to do for S3 +} + +// ReadBytes implements Storage.ReadBytes(). +func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Fetch object reader from S3 bucket + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + defer rc.Close() + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream(). +func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Check storage open + if st.closed() { + return nil, ErrClosed + } + + // Fetch object reader from S3 bucket + rc, _, _, err := st.client.GetObject( + ctx, + st.bucket, + key, + st.config.GetOpts, + ) + if err != nil { + return nil, transformS3Error(err) + } + + return rc, nil +} + +// WriteBytes implements Storage.WriteBytes(). +func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) error { + return st.WriteStream(ctx, key, util.NewByteReaderSize(value)) +} + +// WriteStream implements Storage.WriteStream(). +func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) error { + // Check storage open + if st.closed() { + return ErrClosed + } + + if rs, ok := r.(util.ReaderSize); ok { + // This reader supports providing us the size of + // the encompassed data, allowing us to perform + // a singular .PutObject() call with length. + _, err := st.client.PutObject( + ctx, + st.bucket, + key, + r, + rs.Size(), + "", + "", + st.config.PutOpts, + ) + if err != nil { + return transformS3Error(err) + } + return nil + } + + // Start a new multipart upload to get ID + uploadID, err := st.client.NewMultipartUpload( + ctx, + st.bucket, + key, + st.config.PutOpts, + ) + if err != nil { + return transformS3Error(err) + } + + var ( + count int + parts []minio.CompletePart + chunk = make([]byte, st.config.PutChunkSize) + rdr = bytes.NewReader(nil) + ) + + // Note that we do not perform any kind of + // memory pooling of the chunk buffers here. + // Optimal chunking sizes for S3 writes are in + // the orders of megabytes, so letting the GC + // collect these ASAP is much preferred. + +loop: + for done := false; !done; { + // Read next chunk into byte buffer + n, err := io.ReadFull(r, chunk) + + switch err { + // Successful read + case nil: + + // Reached end, buffer empty + case io.EOF: + break loop + + // Reached end, but buffer not empty + case io.ErrUnexpectedEOF: + done = true + + // All other errors + default: + return err + } + + // Reset byte reader + rdr.Reset(chunk[:n]) + + // Put this object chunk in S3 store + pt, err := st.client.PutObjectPart( + ctx, + st.bucket, + key, + uploadID, + count, + rdr, + st.config.PutChunkSize, + "", + "", + nil, + ) + if err != nil { + return err + } + + // Append completed part to slice + parts = append(parts, minio.CompletePart{ + PartNumber: pt.PartNumber, + ETag: pt.ETag, + ChecksumCRC32: pt.ChecksumCRC32, + ChecksumCRC32C: pt.ChecksumCRC32C, + ChecksumSHA1: pt.ChecksumSHA1, + ChecksumSHA256: pt.ChecksumSHA256, + }) + + // Iterate part count + count++ + } + + // Complete this multi-part upload operation + _, err = st.client.CompleteMultipartUpload( + ctx, + st.bucket, + key, + uploadID, + parts, + st.config.PutOpts, + ) + if err != nil { + return err + } + + return nil +} + +// Stat implements Storage.Stat(). +func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) { + // Check storage open + if st.closed() { + return false, ErrClosed + } + + // Query object in S3 bucket + _, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + return false, transformS3Error(err) + } + + return true, nil +} + +// Remove implements Storage.Remove(). +func (st *S3Storage) Remove(ctx context.Context, key string) error { + // Check storage open + if st.closed() { + return ErrClosed + } + + // S3 returns no error on remove for non-existent keys + if ok, err := st.Stat(ctx, key); err != nil { + return err + } else if !ok { + return ErrNotFound + } + + // Remove object from S3 bucket + err := st.client.RemoveObject( + ctx, + st.bucket, + key, + st.config.RemoveOpts, + ) + if err != nil { + return transformS3Error(err) + } + + return nil +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { + var ( + prev string + token string + ) + + for { + // List the objects in bucket starting at marker + result, err := st.client.ListObjectsV2( + st.bucket, + "", + prev, + token, + "", + st.config.ListSize, + ) + if err != nil { + return err + } + + // Pass each object through walk func + for _, obj := range result.Contents { + if err := opts.WalkFn(ctx, Entry{ + Key: obj.Key, + Size: obj.Size, + }); err != nil { + return err + } + } + + // No token means we reached end of bucket + if result.NextContinuationToken == "" { + return nil + } + + // Set continue token and prev mark + token = result.NextContinuationToken + prev = result.StartAfter + } +} + +// Close implements Storage.Close(). +func (st *S3Storage) Close() error { + atomic.StoreUint32(&st.state, 1) + return nil +} + +// closed returns whether S3Storage is closed. +func (st *S3Storage) closed() bool { + return (atomic.LoadUint32(&st.state) == 1) +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go new file mode 100644 index 000000000..00fbe7abd --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go @@ -0,0 +1,53 @@ +package storage + +import ( + "context" + "io" +) + +// Storage defines a means of storing and accessing key value pairs +type Storage interface { + // ReadBytes returns the byte value for key in storage + ReadBytes(ctx context.Context, key string) ([]byte, error) + + // ReadStream returns an io.ReadCloser for the value bytes at key in the storage + ReadStream(ctx context.Context, key string) (io.ReadCloser, error) + + // WriteBytes writes the supplied value bytes at key in the storage + WriteBytes(ctx context.Context, key string, value []byte) error + + // WriteStream writes the bytes from supplied reader at key in the storage + WriteStream(ctx context.Context, key string, r io.Reader) error + + // Stat checks if the supplied key is in the storage + Stat(ctx context.Context, key string) (bool, error) + + // Remove attempts to remove the supplied key-value pair from storage + Remove(ctx context.Context, key string) error + + // Close will close the storage, releasing any file locks + Close() error + + // Clean removes unused values and unclutters the storage (e.g. removing empty folders) + Clean(ctx context.Context) error + + // WalkKeys walks the keys in the storage + WalkKeys(ctx context.Context, opts WalkKeysOptions) error +} + +// Entry represents a key in a Storage{} implementation, +// with any associated metadata that may have been set. +type Entry struct { + // Key is this entry's unique storage key. + Key string + + // Size is the size of this entry in storage. + // Note that size < 0 indicates unknown. + Size int64 +} + +// WalkKeysOptions defines how to walk the keys in a storage implementation +type WalkKeysOptions struct { + // WalkFn is the function to apply on each StorageEntry + WalkFn func(context.Context, Entry) error +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go new file mode 100644 index 000000000..3863dd774 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go @@ -0,0 +1,25 @@ +package storage + +// KeyTransform defines a method of converting store keys to storage paths (and vice-versa) +type KeyTransform interface { + // KeyToPath converts a supplied key to storage path + KeyToPath(string) string + + // PathToKey converts a supplied storage path to key + PathToKey(string) string +} + +type nopKeyTransform struct{} + +// NopTransform returns a nop key transform (i.e. key = path) +func NopTransform() KeyTransform { + return &nopKeyTransform{} +} + +func (t *nopKeyTransform) KeyToPath(key string) string { + return key +} + +func (t *nopKeyTransform) PathToKey(path string) string { + return path +} |
