diff options
author | 2022-11-05 12:10:19 +0100 | |
---|---|---|
committer | 2022-11-05 11:10:19 +0000 | |
commit | bcb80d3ff4a669d52d63950c8830427646c05884 (patch) | |
tree | 4aa95a83545b3f87a80fe4b625cb6f2ad9c4427f /vendor/codeberg.org/gruf/go-store | |
parent | [bugfix] Increase field size limits when registering apps (#958) (diff) | |
download | gotosocial-bcb80d3ff4a669d52d63950c8830427646c05884.tar.xz |
[chore] bump gruf/go-store to v2 (#953)
* [chore] bump gruf/go-store to v2
* no more boobs
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
25 files changed, 1846 insertions, 1170 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go deleted file mode 100644 index 2fe5dd428..000000000 --- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go +++ /dev/null @@ -1,63 +0,0 @@ -package kv - -import ( - "errors" - - "codeberg.org/gruf/go-mutexes" - "codeberg.org/gruf/go-store/storage" -) - -var ErrIteratorClosed = errors.New("store/kv: iterator closed") - -// KVIterator provides a read-only iterator to all the key-value -// pairs in a KVStore. While the iterator is open the store is read -// locked, you MUST release the iterator when you are finished with -// it. -// -// Please note: -// - individual iterators are NOT concurrency safe, though it is safe to -// have multiple iterators running concurrently -type KVIterator struct { - store *KVStore // store is the linked KVStore - state *mutexes.LockState - entries []storage.StorageEntry - index int - key string -} - -// Next attempts to set the next key-value pair, the -// return value is if there was another pair remaining -func (i *KVIterator) Next() bool { - next := i.index + 1 - if next >= len(i.entries) { - i.key = "" - return false - } - i.key = i.entries[next].Key() - i.index = next - return true -} - -// Key returns the next key from the store -func (i *KVIterator) Key() string { - return i.key -} - -// Release releases the KVIterator and KVStore's read lock -func (i *KVIterator) Release() { - i.state.UnlockMap() - i.store = nil - i.key = "" - i.entries = nil -} - -// Value returns the next value from the KVStore -func (i *KVIterator) Value() ([]byte, error) { - // Check store isn't closed - if i.store == nil { - return nil, ErrIteratorClosed - } - - // Attempt to fetch from store - return i.store.get(i.state.RLock, i.key) -} diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go deleted file mode 100644 index f9f789521..000000000 --- a/vendor/codeberg.org/gruf/go-store/kv/state.go +++ /dev/null @@ -1,130 +0,0 @@ -package kv - -import ( - "errors" - "io" - - "codeberg.org/gruf/go-mutexes" -) - -var ErrStateClosed = errors.New("store/kv: state closed") - -// StateRO provides a read-only window to the store. While this -// state is active during the Read() function window, the entire -// store will be read-locked. The state is thread-safe for concurrent -// use UNTIL the moment that your supplied function to Read() returns, -// then the state has zero guarantees -type StateRO struct { - store *KVStore - state *mutexes.LockState -} - -func (st *StateRO) Get(key string) ([]byte, error) { - // Check not closed - if st.store == nil { - return nil, ErrStateClosed - } - - // Pass request to store - return st.store.get(st.state.RLock, key) -} - -func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { - // Check not closed - if st.store == nil { - return nil, ErrStateClosed - } - - // Pass request to store - return st.store.getStream(st.state.RLock, key) -} - -func (st *StateRO) Has(key string) (bool, error) { - // Check not closed - if st.store == nil { - return false, ErrStateClosed - } - - // Pass request to store - return st.store.has(st.state.RLock, key) -} - -func (st *StateRO) Release() { - st.state.UnlockMap() - st.store = nil -} - -// StateRW provides a read-write window to the store. While this -// state is active during the Update() function window, the entire -// store will be locked. The state is thread-safe for concurrent -// use UNTIL the moment that your supplied function to Update() returns, -// then the state has zero guarantees -type StateRW struct { - store *KVStore - state *mutexes.LockState -} - -func (st *StateRW) Get(key string) ([]byte, error) { - // Check not closed - if st.store == nil { - return nil, ErrStateClosed - } - - // Pass request to store - return st.store.get(st.state.RLock, key) -} - -func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { - // Check not closed - if st.store == nil { - return nil, ErrStateClosed - } - - // Pass request to store - return st.store.getStream(st.state.RLock, key) -} - -func (st *StateRW) Put(key string, value []byte) error { - // Check not closed - if st.store == nil { - return ErrStateClosed - } - - // Pass request to store - return st.store.put(st.state.Lock, key, value) -} - -func (st *StateRW) PutStream(key string, r io.Reader) error { - // Check not closed - if st.store == nil { - return ErrStateClosed - } - - // Pass request to store - return st.store.putStream(st.state.Lock, key, r) -} - -func (st *StateRW) Has(key string) (bool, error) { - // Check not closed - if st.store == nil { - return false, ErrStateClosed - } - - // Pass request to store - return st.store.has(st.state.RLock, key) -} - -func (st *StateRW) Delete(key string) error { - // Check not closed - if st.store == nil { - return ErrStateClosed - } - - // Pass request to store - return st.store.delete(st.state.Lock, key) -} - -func (st *StateRW) Release() { - st.state.UnlockMap() - st.store = nil -} diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go deleted file mode 100644 index fd9935f25..000000000 --- a/vendor/codeberg.org/gruf/go-store/kv/store.go +++ /dev/null @@ -1,227 +0,0 @@ -package kv - -import ( - "io" - - "codeberg.org/gruf/go-mutexes" - "codeberg.org/gruf/go-store/storage" - "codeberg.org/gruf/go-store/util" -) - -// KVStore is a very simple, yet performant key-value store -type KVStore struct { - mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access - storage storage.Storage // storage is the underlying storage -} - -func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) { - // Attempt to open disk storage - storage, err := storage.OpenFile(path, cfg) - if err != nil { - return nil, err - } - - // Return new KVStore - return OpenStorage(storage) -} - -func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) { - // Attempt to open block storage - storage, err := storage.OpenBlock(path, cfg) - if err != nil { - return nil, err - } - - // Return new KVStore - return OpenStorage(storage) -} - -func OpenStorage(storage storage.Storage) (*KVStore, error) { - // Perform initial storage clean - err := storage.Clean() - if err != nil { - return nil, err - } - - // Return new KVStore - return &KVStore{ - mutex: mutexes.NewMap(-1, -1), - storage: storage, - }, nil -} - -// RLock acquires a read-lock on supplied key, returning unlock function. -func (st *KVStore) RLock(key string) (runlock func()) { - return st.mutex.RLock(key) -} - -// Lock acquires a write-lock on supplied key, returning unlock function. -func (st *KVStore) Lock(key string) (unlock func()) { - return st.mutex.Lock(key) -} - -// Get fetches the bytes for supplied key in the store -func (st *KVStore) Get(key string) ([]byte, error) { - return st.get(st.RLock, key) -} - -func (st *KVStore) get(rlock func(string) func(), key string) ([]byte, error) { - // Acquire read lock for key - runlock := rlock(key) - defer runlock() - - // Read file bytes - return st.storage.ReadBytes(key) -} - -// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store -func (st *KVStore) GetStream(key string) (io.ReadCloser, error) { - return st.getStream(st.RLock, key) -} - -func (st *KVStore) getStream(rlock func(string) func(), key string) (io.ReadCloser, error) { - // Acquire read lock for key - runlock := rlock(key) - - // Attempt to open stream for read - rd, err := st.storage.ReadStream(key) - if err != nil { - runlock() - return nil, err - } - - // Wrap readcloser in our own callback closer - return util.ReadCloserWithCallback(rd, runlock), nil -} - -// Put places the bytes at the supplied key location in the store -func (st *KVStore) Put(key string, value []byte) error { - return st.put(st.Lock, key, value) -} - -func (st *KVStore) put(lock func(string) func(), key string, value []byte) error { - // Acquire write lock for key - unlock := lock(key) - defer unlock() - - // Write file bytes - return st.storage.WriteBytes(key, value) -} - -// PutStream writes the bytes from the supplied Reader at the supplied key location in the store -func (st *KVStore) PutStream(key string, r io.Reader) error { - return st.putStream(st.Lock, key, r) -} - -func (st *KVStore) putStream(lock func(string) func(), key string, r io.Reader) error { - // Acquire write lock for key - unlock := lock(key) - defer unlock() - - // Write file stream - return st.storage.WriteStream(key, r) -} - -// Has checks whether the supplied key exists in the store -func (st *KVStore) Has(key string) (bool, error) { - return st.has(st.RLock, key) -} - -func (st *KVStore) has(rlock func(string) func(), key string) (bool, error) { - // Acquire read lock for key - runlock := rlock(key) - defer runlock() - - // Stat file on disk - return st.storage.Stat(key) -} - -// Delete removes the supplied key-value pair from the store -func (st *KVStore) Delete(key string) error { - return st.delete(st.Lock, key) -} - -func (st *KVStore) delete(lock func(string) func(), key string) error { - // Acquire write lock for key - unlock := lock(key) - defer unlock() - - // Remove file from disk - return st.storage.Remove(key) -} - -// Iterator returns an Iterator for key-value pairs in the store, using supplied match function -func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { - // If no function, match all - if matchFn == nil { - matchFn = func(string) bool { return true } - } - - // Get store read lock - state := st.mutex.RLockMap() - - // Setup the walk keys function - entries := []storage.StorageEntry{} - walkFn := func(entry storage.StorageEntry) { - // Ignore unmatched entries - if !matchFn(entry.Key()) { - return - } - - // Add to entries - entries = append(entries, entry) - } - - // Walk keys in the storage - err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn}) - if err != nil { - state.UnlockMap() - return nil, err - } - - // Return new iterator - return &KVIterator{ - store: st, - state: state, - entries: entries, - index: -1, - key: "", - }, nil -} - -// Read provides a read-only window to the store, holding it in a read-locked state until release -func (st *KVStore) Read() *StateRO { - state := st.mutex.RLockMap() - return &StateRO{store: st, state: state} -} - -// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return. -func (st *KVStore) ReadFn(fn func(*StateRO)) { - // Acquire read-only state - state := st.Read() - defer state.Release() - - // Pass to fn - fn(state) -} - -// Update provides a read-write window to the store, holding it in a write-locked state until release -func (st *KVStore) Update() *StateRW { - state := st.mutex.LockMap() - return &StateRW{store: st, state: state} -} - -// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return. -func (st *KVStore) UpdateFn(fn func(*StateRW)) { - // Acquire read-write state - state := st.Update() - defer state.Release() - - // Pass to fn - fn(state) -} - -// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work). -func (st *KVStore) Close() error { - return st.storage.Close() -} diff --git a/vendor/codeberg.org/gruf/go-store/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/storage/compressor.go deleted file mode 100644 index d6b975db0..000000000 --- a/vendor/codeberg.org/gruf/go-store/storage/compressor.go +++ /dev/null @@ -1,104 +0,0 @@ -package storage - -import ( - "compress/gzip" - "compress/zlib" - "io" - - "codeberg.org/gruf/go-store/util" - "github.com/golang/snappy" -) - -// Compressor defines a means of compressing/decompressing values going into a key-value store -type Compressor interface { - // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader - Reader(io.Reader) (io.ReadCloser, error) - - // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer - Writer(io.Writer) (io.WriteCloser, error) -} - -type gzipCompressor struct { - level int -} - -// GZipCompressor returns a new Compressor that implements GZip at default compression level -func GZipCompressor() Compressor { - return GZipCompressorLevel(gzip.DefaultCompression) -} - -// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level -func GZipCompressorLevel(level int) Compressor { - return &gzipCompressor{ - level: level, - } -} - -func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) { - return gzip.NewReader(r) -} - -func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) { - return gzip.NewWriterLevel(w, c.level) -} - -type zlibCompressor struct { - level int - dict []byte -} - -// ZLibCompressor returns a new Compressor that implements ZLib at default compression level -func ZLibCompressor() Compressor { - return ZLibCompressorLevelDict(zlib.DefaultCompression, nil) -} - -// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level -func ZLibCompressorLevel(level int) Compressor { - return ZLibCompressorLevelDict(level, nil) -} - -// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict -func ZLibCompressorLevelDict(level int, dict []byte) Compressor { - return &zlibCompressor{ - level: level, - dict: dict, - } -} - -func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) { - return zlib.NewReaderDict(r, c.dict) -} - -func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) { - return zlib.NewWriterLevelDict(w, c.level, c.dict) -} - -type snappyCompressor struct{} - -// SnappyCompressor returns a new Compressor that implements Snappy -func SnappyCompressor() Compressor { - return &snappyCompressor{} -} - -func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) { - return util.NopReadCloser(snappy.NewReader(r)), nil -} - -func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) { - return snappy.NewBufferedWriter(w), nil -} - -type nopCompressor struct{} - -// NoCompression is a Compressor that simply does nothing -func NoCompression() Compressor { - return &nopCompressor{} -} - -func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) { - return util.NopReadCloser(r), nil -} - -func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) { - return util.NopWriteCloser(w), nil -} diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go deleted file mode 100644 index b4729b041..000000000 --- a/vendor/codeberg.org/gruf/go-store/storage/fs.go +++ /dev/null @@ -1,65 +0,0 @@ -package storage - -import ( - "os" - "syscall" - - "codeberg.org/gruf/go-store/util" -) - -const ( - // default file permission bits - defaultDirPerms = 0o755 - defaultFilePerms = 0o644 - - // default file open flags - defaultFileROFlags = syscall.O_RDONLY - defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR - defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT -) - -// NOTE: -// These functions are for opening storage files, -// not necessarily for e.g. initial setup (OpenFile) - -// open should not be called directly. -func open(path string, flags int) (*os.File, error) { - var fd int - err := util.RetryOnEINTR(func() (err error) { - fd, err = syscall.Open(path, flags, defaultFilePerms) - return - }) - if err != nil { - return nil, err - } - return os.NewFile(uintptr(fd), path), nil -} - -// stat checks for a file on disk. -func stat(path string) (bool, error) { - var stat syscall.Stat_t - err := util.RetryOnEINTR(func() error { - return syscall.Stat(path, &stat) - }) - if err != nil { - if err == syscall.ENOENT { //nolint - err = nil - } - return false, err - } - return true, nil -} - -// unlink removes a file (not dir!) on disk. -func unlink(path string) error { - return util.RetryOnEINTR(func() error { - return syscall.Unlink(path) - }) -} - -// rmdir removes a dir (not file!) on disk. -func rmdir(path string) error { - return util.RetryOnEINTR(func() error { - return syscall.Rmdir(path) - }) -} diff --git a/vendor/codeberg.org/gruf/go-store/storage/memory.go b/vendor/codeberg.org/gruf/go-store/storage/memory.go deleted file mode 100644 index 2dab562d6..000000000 --- a/vendor/codeberg.org/gruf/go-store/storage/memory.go +++ /dev/null @@ -1,188 +0,0 @@ -package storage - -import ( - "io" - "sync" - - "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-store/util" -) - -// MemoryStorage is a storage implementation that simply stores key-value -// pairs in a Go map in-memory. The map is protected by a mutex. -type MemoryStorage struct { - ow bool // overwrites - fs map[string][]byte - mu sync.Mutex - st uint32 -} - -// OpenMemory opens a new MemoryStorage instance with internal map of 'size'. -func OpenMemory(size int, overwrites bool) *MemoryStorage { - return &MemoryStorage{ - fs: make(map[string][]byte, size), - mu: sync.Mutex{}, - ow: overwrites, - } -} - -// Clean implements Storage.Clean(). -func (st *MemoryStorage) Clean() error { - st.mu.Lock() - defer st.mu.Unlock() - if st.st == 1 { - return ErrClosed - } - return nil -} - -// ReadBytes implements Storage.ReadBytes(). -func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) { - // Lock storage - st.mu.Lock() - - // Check store open - if st.st == 1 { - st.mu.Unlock() - return nil, ErrClosed - } - - // Check for key - b, ok := st.fs[key] - st.mu.Unlock() - - // Return early if not exist - if !ok { - return nil, ErrNotFound - } - - // Create return copy - return bytes.Copy(b), nil -} - -// ReadStream implements Storage.ReadStream(). -func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { - // Lock storage - st.mu.Lock() - - // Check store open - if st.st == 1 { - st.mu.Unlock() - return nil, ErrClosed - } - - // Check for key - b, ok := st.fs[key] - st.mu.Unlock() - - // Return early if not exist - if !ok { - return nil, ErrNotFound - } - - // Create io.ReadCloser from 'b' copy - b = bytes.Copy(b) - r := bytes.NewReader(b) - return util.NopReadCloser(r), nil -} - -// WriteBytes implements Storage.WriteBytes(). -func (st *MemoryStorage) WriteBytes(key string, b []byte) error { - // Lock storage - st.mu.Lock() - defer st.mu.Unlock() - - // Check store open - if st.st == 1 { - return ErrClosed - } - - _, ok := st.fs[key] - - // Check for already exist - if ok && !st.ow { - return ErrAlreadyExists - } - - // Write + unlock - st.fs[key] = bytes.Copy(b) - return nil -} - -// WriteStream implements Storage.WriteStream(). -func (st *MemoryStorage) WriteStream(key string, r io.Reader) error { - // Read all from reader - b, err := io.ReadAll(r) - if err != nil { - return err - } - - // Write to storage - return st.WriteBytes(key, b) -} - -// Stat implements Storage.Stat(). -func (st *MemoryStorage) Stat(key string) (bool, error) { - // Lock storage - st.mu.Lock() - defer st.mu.Unlock() - - // Check store open - if st.st == 1 { - return false, ErrClosed - } - - // Check for key - _, ok := st.fs[key] - return ok, nil -} - -// Remove implements Storage.Remove(). -func (st *MemoryStorage) Remove(key string) error { - // Lock storage - st.mu.Lock() - defer st.mu.Unlock() - - // Check store open - if st.st == 1 { - return ErrClosed - } - - // Check for key - _, ok := st.fs[key] - if !ok { - return ErrNotFound - } - - // Remove from store - delete(st.fs, key) - - return nil -} - -// Close implements Storage.Close(). -func (st *MemoryStorage) Close() error { - st.mu.Lock() - st.st = 1 - st.mu.Unlock() - return nil -} - -// WalkKeys implements Storage.WalkKeys(). -func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error { - // Lock storage - st.mu.Lock() - defer st.mu.Unlock() - - // Check store open - if st.st == 1 { - return ErrClosed - } - - // Walk store keys - for key := range st.fs { - opts.WalkFn(entry(key)) - } - - return nil -} diff --git a/vendor/codeberg.org/gruf/go-store/util/fs.go b/vendor/codeberg.org/gruf/go-store/util/fs.go deleted file mode 100644 index 53fef7750..000000000 --- a/vendor/codeberg.org/gruf/go-store/util/fs.go +++ /dev/null @@ -1,82 +0,0 @@ -package util - -import ( - "io/fs" - "os" - - "codeberg.org/gruf/go-fastpath" -) - -// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry -func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error { - // Read supplied dir path - dirEntries, err := os.ReadDir(path) - if err != nil { - return err - } - - // Iter entries - for _, entry := range dirEntries { - // Pass to walk fn - walkFn(path, entry) - - // Recurse dir entries - if entry.IsDir() { - err = WalkDir(pb, pb.Join(path, entry.Name()), walkFn) - if err != nil { - return err - } - } - } - - return nil -} - -// CleanDirs traverses the dir tree of the supplied path, removing any folders with zero children -func CleanDirs(path string) error { - // Acquire builder - pb := GetPathBuilder() - defer PutPathBuilder(pb) - - // Get dir entries - entries, err := os.ReadDir(path) - if err != nil { - return err - } - - // Recurse dirs - for _, entry := range entries { - if entry.IsDir() { - err := cleanDirs(pb, pb.Join(path, entry.Name())) - if err != nil { - return err - } - } - } - return nil -} - -// cleanDirs performs the actual dir cleaning logic for the exported version -func cleanDirs(pb *fastpath.Builder, path string) error { - // Get dir entries - entries, err := os.ReadDir(path) - if err != nil { - return err - } - - // If no entries, delete - if len(entries) < 1 { - return os.Remove(path) - } - - // Recurse dirs - for _, entry := range entries { - if entry.IsDir() { - err := cleanDirs(pb, pb.Join(path, entry.Name())) - if err != nil { - return err - } - } - } - return nil -} diff --git a/vendor/codeberg.org/gruf/go-store/util/io.go b/vendor/codeberg.org/gruf/go-store/util/io.go deleted file mode 100644 index d034cf62b..000000000 --- a/vendor/codeberg.org/gruf/go-store/util/io.go +++ /dev/null @@ -1,42 +0,0 @@ -package util - -import "io" - -// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation -func NopReadCloser(r io.Reader) io.ReadCloser { - return &nopReadCloser{r} -} - -// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation -func NopWriteCloser(w io.Writer) io.WriteCloser { - return &nopWriteCloser{w} -} - -// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser -func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { - return &callbackReadCloser{ - ReadCloser: rc, - callback: cb, - } -} - -// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close() -type nopReadCloser struct{ io.Reader } - -func (r *nopReadCloser) Close() error { return nil } - -// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close() -type nopWriteCloser struct{ io.Writer } - -func (w nopWriteCloser) Close() error { return nil } - -// callbackReadCloser allows adding our own custom callback to an io.ReadCloser -type callbackReadCloser struct { - io.ReadCloser - callback func() -} - -func (c *callbackReadCloser) Close() error { - defer c.callback() - return c.ReadCloser.Close() -} diff --git a/vendor/codeberg.org/gruf/go-store/util/sys.go b/vendor/codeberg.org/gruf/go-store/util/sys.go deleted file mode 100644 index 6661029e5..000000000 --- a/vendor/codeberg.org/gruf/go-store/util/sys.go +++ /dev/null @@ -1,14 +0,0 @@ -package util - -import "syscall" - -// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received -func RetryOnEINTR(do func() error) error { - for { - err := do() - if err == syscall.EINTR { - continue - } - return err - } -} diff --git a/vendor/codeberg.org/gruf/go-store/LICENSE b/vendor/codeberg.org/gruf/go-store/v2/LICENSE index b7c4417ac..e4163ae35 100644 --- a/vendor/codeberg.org/gruf/go-store/LICENSE +++ b/vendor/codeberg.org/gruf/go-store/v2/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 gruf +Copyright (c) 2022 gruf Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/v2/kv/iterator.go new file mode 100644 index 000000000..736edddaa --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/kv/iterator.go @@ -0,0 +1,63 @@ +package kv + +import ( + "context" + "errors" + + "codeberg.org/gruf/go-mutexes" + "codeberg.org/gruf/go-store/v2/storage" +) + +var ErrIteratorClosed = errors.New("store/kv: iterator closed") + +// Iterator provides a read-only iterator to all the key-value +// pairs in a KVStore. While the iterator is open the store is read +// locked, you MUST release the iterator when you are finished with +// it. +// +// Please note: +// individual iterators are NOT concurrency safe, though it is safe to +// have multiple iterators running concurrently. +type Iterator struct { + store *KVStore // store is the linked KVStore + state *mutexes.LockState + entries []storage.Entry + index int + key string +} + +// Next attempts to fetch the next key-value pair, the +// return value indicates whether another pair remains. +func (i *Iterator) Next() bool { + next := i.index + 1 + if next >= len(i.entries) { + i.key = "" + return false + } + i.key = i.entries[next].Key + i.index = next + return true +} + +// Key returns the current iterator key. +func (i *Iterator) Key() string { + return i.key +} + +// Value returns the current iterator value at key. +func (i *Iterator) Value(ctx context.Context) ([]byte, error) { + if i.store == nil { + return nil, ErrIteratorClosed + } + return i.store.get(i.state.RLock, ctx, i.key) +} + +// Release will release the store read-lock, and close this iterator. +func (i *Iterator) Release() { + i.state.UnlockMap() + i.state = nil + i.store = nil + i.key = "" + i.entries = nil + i.index = 0 +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/state.go b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go new file mode 100644 index 000000000..9ac8ab1bf --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go @@ -0,0 +1,116 @@ +package kv + +import ( + "context" + "errors" + "io" + + "codeberg.org/gruf/go-mutexes" +) + +// ErrStateClosed is returned on further calls to states after calling Release(). +var ErrStateClosed = errors.New("store/kv: state closed") + +// StateRO provides a read-only window to the store. While this +// state is active during the Read() function window, the entire +// store will be read-locked. The state is thread-safe for concurrent +// use UNTIL the moment that your supplied function to Read() returns. +type StateRO struct { + store *KVStore + state *mutexes.LockState +} + +// Get: see KVStore.Get(). Returns error if state already closed. +func (st *StateRO) Get(ctx context.Context, key string) ([]byte, error) { + if st.store == nil { + return nil, ErrStateClosed + } + return st.store.get(st.state.RLock, ctx, key) +} + +// GetStream: see KVStore.GetStream(). Returns error if state already closed. +func (st *StateRO) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { + if st.store == nil { + return nil, ErrStateClosed + } + return st.store.getStream(st.state.RLock, ctx, key) +} + +// Has: see KVStore.Has(). Returns error if state already closed. +func (st *StateRO) Has(ctx context.Context, key string) (bool, error) { + if st.store == nil { + return false, ErrStateClosed + } + return st.store.has(st.state.RLock, ctx, key) +} + +// Release will release the store read-lock, and close this state. +func (st *StateRO) Release() { + st.state.UnlockMap() + st.state = nil + st.store = nil +} + +// StateRW provides a read-write window to the store. While this +// state is active during the Update() function window, the entire +// store will be locked. The state is thread-safe for concurrent +// use UNTIL the moment that your supplied function to Update() returns. +type StateRW struct { + store *KVStore + state *mutexes.LockState +} + +// Get: see KVStore.Get(). Returns error if state already closed. +func (st *StateRW) Get(ctx context.Context, key string) ([]byte, error) { + if st.store == nil { + return nil, ErrStateClosed + } + return st.store.get(st.state.RLock, ctx, key) +} + +// GetStream: see KVStore.GetStream(). Returns error if state already closed. +func (st *StateRW) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { + if st.store == nil { + return nil, ErrStateClosed + } + return st.store.getStream(st.state.RLock, ctx, key) +} + +// Put: see KVStore.Put(). Returns error if state already closed. +func (st *StateRW) Put(ctx context.Context, key string, value []byte) error { + if st.store == nil { + return ErrStateClosed + } + return st.store.put(st.state.Lock, ctx, key, value) +} + +// PutStream: see KVStore.PutStream(). Returns error if state already closed. +func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) error { + if st.store == nil { + return ErrStateClosed + } + return st.store.putStream(st.state.Lock, ctx, key, r) +} + +// Has: see KVStore.Has(). Returns error if state already closed. +func (st *StateRW) Has(ctx context.Context, key string) (bool, error) { + if st.store == nil { + return false, ErrStateClosed + } + return st.store.has(st.state.RLock, ctx, key) +} + +// Delete: see KVStore.Delete(). Returns error if state already closed. +func (st *StateRW) Delete(ctx context.Context, key string) error { + if st.store == nil { + return ErrStateClosed + } + return st.store.delete(st.state.Lock, ctx, key) +} + +// Release will release the store lock, and close this state. +func (st *StateRW) Release() { + st.state.UnlockMap() + st.state = nil + st.store = nil +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/store.go b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go new file mode 100644 index 000000000..86ba73f67 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go @@ -0,0 +1,253 @@ +package kv + +import ( + "context" + "io" + + "codeberg.org/gruf/go-mutexes" + "codeberg.org/gruf/go-store/v2/storage" + "codeberg.org/gruf/go-store/v2/util" +) + +// KVStore is a very simple, yet performant key-value store +type KVStore struct { + mu mutexes.MutexMap // map of keys to mutexes to protect key access + st storage.Storage // underlying storage implementation +} + +func OpenDisk(path string, cfg *storage.DiskConfig) (*KVStore, error) { + // Attempt to open disk storage + storage, err := storage.OpenDisk(path, cfg) + if err != nil { + return nil, err + } + + // Return new KVStore + return OpenStorage(storage) +} + +func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) { + // Attempt to open block storage + storage, err := storage.OpenBlock(path, cfg) + if err != nil { + return nil, err + } + + // Return new KVStore + return OpenStorage(storage) +} + +func OpenMemory(overwrites bool) *KVStore { + return &KVStore{ + mu: mutexes.NewMap(-1, -1), + st: storage.OpenMemory(100, overwrites), + } +} + +func OpenS3(endpoint string, bucket string, cfg *storage.S3Config) (*KVStore, error) { + // Attempt to open S3 storage + storage, err := storage.OpenS3(endpoint, bucket, cfg) + if err != nil { + return nil, err + } + + // Return new KVStore + return OpenStorage(storage) +} + +func OpenStorage(storage storage.Storage) (*KVStore, error) { + // Perform initial storage clean + err := storage.Clean(context.Background()) + if err != nil { + return nil, err + } + + // Return new KVStore + return &KVStore{ + mu: mutexes.NewMap(-1, -1), + st: storage, + }, nil +} + +// RLock acquires a read-lock on supplied key, returning unlock function. +func (st *KVStore) RLock(key string) (runlock func()) { + return st.mu.RLock(key) +} + +// Lock acquires a write-lock on supplied key, returning unlock function. +func (st *KVStore) Lock(key string) (unlock func()) { + return st.mu.Lock(key) +} + +// Get fetches the bytes for supplied key in the store. +func (st *KVStore) Get(ctx context.Context, key string) ([]byte, error) { + return st.get(st.RLock, ctx, key) +} + +// get performs the underlying logic for KVStore.Get(), using supplied read lock func to allow use with states. +func (st *KVStore) get(rlock func(string) func(), ctx context.Context, key string) ([]byte, error) { + // Acquire read lock for key + runlock := rlock(key) + defer runlock() + + // Read file bytes from storage + return st.st.ReadBytes(ctx, key) +} + +// GetStream fetches a ReadCloser for the bytes at the supplied key in the store. +func (st *KVStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { + return st.getStream(st.RLock, ctx, key) +} + +// getStream performs the underlying logic for KVStore.GetStream(), using supplied read lock func to allow use with states. +func (st *KVStore) getStream(rlock func(string) func(), ctx context.Context, key string) (io.ReadCloser, error) { + // Acquire read lock for key + runlock := rlock(key) + + // Attempt to open stream for read + rd, err := st.st.ReadStream(ctx, key) + if err != nil { + runlock() + return nil, err + } + + // Wrap readcloser in our own callback closer + return util.ReadCloserWithCallback(rd, runlock), nil +} + +// Put places the bytes at the supplied key in the store. +func (st *KVStore) Put(ctx context.Context, key string, value []byte) error { + return st.put(st.Lock, ctx, key, value) +} + +// put performs the underlying logic for KVStore.Put(), using supplied lock func to allow use with states. +func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) error { + // Acquire write lock for key + unlock := lock(key) + defer unlock() + + // Write file bytes to storage + return st.st.WriteBytes(ctx, key, value) +} + +// PutStream writes the bytes from the supplied Reader at the supplied key in the store. +func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) error { + return st.putStream(st.Lock, ctx, key, r) +} + +// putStream performs the underlying logic for KVStore.PutStream(), using supplied lock func to allow use with states. +func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) error { + // Acquire write lock for key + unlock := lock(key) + defer unlock() + + // Write file stream to storage + return st.st.WriteStream(ctx, key, r) +} + +// Has checks whether the supplied key exists in the store. +func (st *KVStore) Has(ctx context.Context, key string) (bool, error) { + return st.has(st.RLock, ctx, key) +} + +// has performs the underlying logic for KVStore.Has(), using supplied read lock func to allow use with states. +func (st *KVStore) has(rlock func(string) func(), ctx context.Context, key string) (bool, error) { + // Acquire read lock for key + runlock := rlock(key) + defer runlock() + + // Stat file in storage + return st.st.Stat(ctx, key) +} + +// Delete removes value at supplied key from the store. +func (st *KVStore) Delete(ctx context.Context, key string) error { + return st.delete(st.Lock, ctx, key) +} + +// delete performs the underlying logic for KVStore.Delete(), using supplied lock func to allow use with states. +func (st *KVStore) delete(lock func(string) func(), ctx context.Context, key string) error { + // Acquire write lock for key + unlock := lock(key) + defer unlock() + + // Remove file from storage + return st.st.Remove(ctx, key) +} + +// Iterator returns an Iterator for key-value pairs in the store, using supplied match function +func (st *KVStore) Iterator(ctx context.Context, matchFn func(string) bool) (*Iterator, error) { + if matchFn == nil { + // By default simply match all keys + matchFn = func(string) bool { return true } + } + + // Get store read lock state + state := st.mu.RLockMap() + + var entries []storage.Entry + + walkFn := func(ctx context.Context, entry storage.Entry) error { + // Ignore unmatched entries + if !matchFn(entry.Key) { + return nil + } + + // Add to entries + entries = append(entries, entry) + return nil + } + + // Collate keys in storage with our walk function + err := st.st.WalkKeys(ctx, storage.WalkKeysOptions{WalkFn: walkFn}) + if err != nil { + state.UnlockMap() + return nil, err + } + + // Return new iterator + return &Iterator{ + store: st, + state: state, + entries: entries, + index: -1, + key: "", + }, nil +} + +// Read provides a read-only window to the store, holding it in a read-locked state until release. +func (st *KVStore) Read() *StateRO { + state := st.mu.RLockMap() + return &StateRO{store: st, state: state} +} + +// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return.. +func (st *KVStore) ReadFn(fn func(*StateRO)) { + // Acquire read-only state + state := st.Read() + defer state.Release() + + // Pass to fn + fn(state) +} + +// Update provides a read-write window to the store, holding it in a write-locked state until release. +func (st *KVStore) Update() *StateRW { + state := st.mu.LockMap() + return &StateRW{store: st, state: state} +} + +// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return. +func (st *KVStore) UpdateFn(fn func(*StateRW)) { + // Acquire read-write state + state := st.Update() + defer state.Release() + + // Pass to fn + fn(state) +} + +// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock()) will continue to function. +func (st *KVStore) Close() error { + return st.st.Close() +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go index c0bb6b383..b1081cb1c 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/block.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go @@ -2,6 +2,7 @@ package storage import ( "bytes" + "context" "crypto/sha256" "fmt" "io" @@ -16,7 +17,7 @@ import ( "codeberg.org/gruf/go-fastcopy" "codeberg.org/gruf/go-hashenc" "codeberg.org/gruf/go-pools" - "codeberg.org/gruf/go-store/util" + "codeberg.org/gruf/go-store/v2/util" ) var ( @@ -24,7 +25,7 @@ var ( blockPathPrefix = "block/" ) -// DefaultBlockConfig is the default BlockStorage configuration +// DefaultBlockConfig is the default BlockStorage configuration. var DefaultBlockConfig = &BlockConfig{ BlockSize: 1024 * 16, WriteBufSize: 4096, @@ -32,25 +33,26 @@ var DefaultBlockConfig = &BlockConfig{ Compression: NoCompression(), } -// BlockConfig defines options to be used when opening a BlockStorage +// BlockConfig defines options to be used when opening a BlockStorage. type BlockConfig struct { - // BlockSize is the chunking size to use when splitting and storing blocks of data + // BlockSize is the chunking size to use when splitting and storing blocks of data. BlockSize int - // ReadBufSize is the buffer size to use when reading node files + // ReadBufSize is the buffer size to use when reading node files. ReadBufSize int - // WriteBufSize is the buffer size to use when writing file streams (PutStream) + // WriteBufSize is the buffer size to use when writing file streams. WriteBufSize int - // Overwrite allows overwriting values of stored keys in the storage + // Overwrite allows overwriting values of stored keys in the storage. Overwrite bool - // Compression is the Compressor to use when reading / writing files, default is no compression + // Compression is the Compressor to use when reading / writing files, + // default is no compression. Compression Compressor } -// getBlockConfig returns a valid BlockConfig for supplied ptr +// getBlockConfig returns a valid BlockConfig for supplied ptr. func getBlockConfig(cfg *BlockConfig) BlockConfig { // If nil, use default if cfg == nil { @@ -63,12 +65,12 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig { } // Assume 0 chunk size == use default - if cfg.BlockSize < 1 { + if cfg.BlockSize <= 0 { cfg.BlockSize = DefaultBlockConfig.BlockSize } // Assume 0 buf size == use default - if cfg.WriteBufSize < 1 { + if cfg.WriteBufSize <= 0 { cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize } @@ -85,7 +87,7 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig { // a filesystem. Each value is chunked into blocks of configured size and these // blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A // "node" file is finally created containing an array of hashes contained within -// this value +// this value. type BlockStorage struct { path string // path is the root path of this store blockPath string // blockPath is the joined root path + block path prefix @@ -103,7 +105,7 @@ type BlockStorage struct { // the hash of the data. } -// OpenBlock opens a BlockStorage instance for given folder path and configuration +// OpenBlock opens a BlockStorage instance for given folder path and configuration. func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { // Acquire path builder pb := util.GetPathBuilder() @@ -143,7 +145,7 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { if err != nil { return nil, err } else if !stat.IsDir() { - return nil, errPathIsFile + return nil, new_error("path is file") } // Open and acquire storage lock for path @@ -182,34 +184,29 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { return st, nil } -// Clean implements storage.Clean() -func (st *BlockStorage) Clean() error { - // Track open - st.lock.Add() - defer st.lock.Done() - +// Clean implements storage.Clean(). +func (st *BlockStorage) Clean(ctx context.Context) error { // Check if open if st.lock.Closed() { return ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) nodes := map[string]*node{} - onceErr := errors.OnceError{} // Walk nodes dir for entries - err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { + err := walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { // Only deal with regular files if !fsentry.Type().IsRegular() { - return - } - - // Stop if we hit error previously - if onceErr.IsSet() { - return + return nil } // Get joined node path name @@ -218,8 +215,7 @@ func (st *BlockStorage) Clean() error { // Attempt to open RO file file, err := open(npath, defaultFileROFlags) if err != nil { - onceErr.Store(err) - return + return err } defer file.Close() @@ -239,32 +235,24 @@ func (st *BlockStorage) Clean() error { nil, ) if err != nil { - onceErr.Store(err) - return + return err } // Append to nodes slice nodes[fsentry.Name()] = &node + return nil }) // Handle errors (though nodePath may not have been created yet) if err != nil && !os.IsNotExist(err) { return err - } else if onceErr.IsSet() { - return onceErr.Load() } // Walk blocks dir for entries - onceErr.Reset() - err = util.WalkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) { + err = walkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) error { // Only deal with regular files if !fsentry.Type().IsRegular() { - return - } - - // Stop if we hit error previously - if onceErr.IsSet() { - return + return nil } inUse := false @@ -281,25 +269,19 @@ func (st *BlockStorage) Clean() error { // Block hash is used by node if inUse { - return + return nil } // Get joined block path name bpath = pb.Join(bpath, fsentry.Name()) // Remove this unused block path - err := os.Remove(bpath) - if err != nil { - onceErr.Store(err) - return - } + return os.Remove(bpath) }) // Handle errors (though blockPath may not have been created yet) if err != nil && !os.IsNotExist(err) { return err - } else if onceErr.IsSet() { - return onceErr.Load() } // If there are nodes left at this point, they are corrupt @@ -315,10 +297,10 @@ func (st *BlockStorage) Clean() error { return nil } -// ReadBytes implements Storage.ReadBytes() -func (st *BlockStorage) ReadBytes(key string) ([]byte, error) { +// ReadBytes implements Storage.ReadBytes(). +func (st *BlockStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { // Get stream reader for key - rc, err := st.ReadStream(key) + rc, err := st.ReadStream(ctx, key) if err != nil { return nil, err } @@ -328,27 +310,27 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) { return io.ReadAll(rc) } -// ReadStream implements Storage.ReadStream() -func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { +// ReadStream implements Storage.ReadStream(). +func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { // Get node file path for key npath, err := st.nodePathForKey(key) if err != nil { return nil, err } - // Track open - st.lock.Add() - // Check if open if st.lock.Closed() { - st.lock.Done() return nil, ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + // Attempt to open RO file file, err := open(npath, defaultFileROFlags) if err != nil { - st.lock.Done() return nil, errSwapNotFound(err) } defer file.Close() @@ -357,8 +339,9 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { hbuf := st.bufpool.Get() defer st.bufpool.Put(hbuf) + var node node + // Write file contents to node - node := node{} _, err = st.cppool.Copy( &nodeWriter{ node: &node, @@ -367,18 +350,17 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { file, ) if err != nil { - st.lock.Done() return nil, err } // Prepare block reader and return - rc := util.NopReadCloser(&blockReader{ + return util.NopReadCloser(&blockReader{ storage: st, node: &node, - }) // we wrap the blockreader to decr lockfile waitgroup - return util.ReadCloserWithCallback(rc, st.lock.Done), nil + }), nil } +// readBlock reads the block with hash (key) from the filesystem. func (st *BlockStorage) readBlock(key string) ([]byte, error) { // Get block file path for key bpath := st.blockPathForKey(key) @@ -386,14 +368,14 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) { // Attempt to open RO file file, err := open(bpath, defaultFileROFlags) if err != nil { - return nil, wrap(errCorruptNode, err) + return nil, wrap(new_error("corrupted node"), err) } defer file.Close() // Wrap the file in a compressor cFile, err := st.config.Compression.Reader(file) if err != nil { - return nil, wrap(errCorruptNode, err) + return nil, wrap(new_error("corrupted node"), err) } defer cFile.Close() @@ -401,28 +383,29 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) { return io.ReadAll(cFile) } -// WriteBytes implements Storage.WriteBytes() -func (st *BlockStorage) WriteBytes(key string, value []byte) error { - return st.WriteStream(key, bytes.NewReader(value)) +// WriteBytes implements Storage.WriteBytes(). +func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) error { + return st.WriteStream(ctx, key, bytes.NewReader(value)) } -// WriteStream implements Storage.WriteStream() -func (st *BlockStorage) WriteStream(key string, r io.Reader) error { +// WriteStream implements Storage.WriteStream(). +func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { // Get node file path for key npath, err := st.nodePathForKey(key) if err != nil { return err } - // Track open - st.lock.Add() - defer st.lock.Done() - // Check if open if st.lock.Closed() { return ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + // Check if this exists ok, err := stat(key) if err != nil { @@ -446,8 +429,7 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error { return err } - // Alloc new node - node := node{} + var node node // Acquire HashEncoder hc := st.hashPool.Get().(*hashEncoder) @@ -529,7 +511,7 @@ loop: // If no hashes created, return if len(node.hashes) < 1 { - return errNoHashesWritten + return new_error("no hashes written") } // Prepare to swap error if need-be @@ -563,11 +545,11 @@ loop: buf.Grow(st.config.WriteBufSize) // Finally, write data to file - _, err = io.CopyBuffer(file, &nodeReader{node: &node}, nil) + _, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B) return err } -// writeBlock writes the block with hash and supplied value to the filesystem +// writeBlock writes the block with hash and supplied value to the filesystem. func (st *BlockStorage) writeBlock(hash string, value []byte) error { // Get block file path for key bpath := st.blockPathForKey(hash) @@ -594,49 +576,51 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error { return err } -// statBlock checks for existence of supplied block hash +// statBlock checks for existence of supplied block hash. func (st *BlockStorage) statBlock(hash string) (bool, error) { return stat(st.blockPathForKey(hash)) } // Stat implements Storage.Stat() -func (st *BlockStorage) Stat(key string) (bool, error) { +func (st *BlockStorage) Stat(ctx context.Context, key string) (bool, error) { // Get node file path for key kpath, err := st.nodePathForKey(key) if err != nil { return false, err } - // Track open - st.lock.Add() - defer st.lock.Done() - // Check if open if st.lock.Closed() { return false, ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return false, err + } + // Check for file on disk return stat(kpath) } -// Remove implements Storage.Remove() -func (st *BlockStorage) Remove(key string) error { +// Remove implements Storage.Remove(). +func (st *BlockStorage) Remove(ctx context.Context, key string) error { // Get node file path for key kpath, err := st.nodePathForKey(key) if err != nil { return err } - // Track open - st.lock.Add() - defer st.lock.Done() - // Check if open if st.lock.Closed() { return ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + // Remove at path (we know this is file) if err := unlink(kpath); err != nil { return errSwapNotFound(err) @@ -645,36 +629,43 @@ func (st *BlockStorage) Remove(key string) error { return nil } -// Close implements Storage.Close() +// Close implements Storage.Close(). func (st *BlockStorage) Close() error { return st.lock.Close() } -// WalkKeys implements Storage.WalkKeys() -func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error { - // Track open - st.lock.Add() - defer st.lock.Done() - +// WalkKeys implements Storage.WalkKeys(). +func (st *BlockStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { // Check if open if st.lock.Closed() { return ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) // Walk dir for entries - return util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { - // Only deal with regular files - if fsentry.Type().IsRegular() { - opts.WalkFn(entry(fsentry.Name())) + return walkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) error { + if !fsentry.Type().IsRegular() { + // Only deal with regular files + return nil } + + // Perform provided walk function + return opts.WalkFn(ctx, Entry{ + Key: fsentry.Name(), + Size: -1, + }) }) } -// nodePathForKey calculates the node file path for supplied key +// nodePathForKey calculates the node file path for supplied key. func (st *BlockStorage) nodePathForKey(key string) (string, error) { // Path separators are illegal, as directory paths if strings.Contains(key, "/") || key == "." || key == ".." { @@ -693,41 +684,40 @@ func (st *BlockStorage) nodePathForKey(key string) (string, error) { return pb.Join(st.nodePath, key), nil } -// blockPathForKey calculates the block file path for supplied hash +// blockPathForKey calculates the block file path for supplied hash. func (st *BlockStorage) blockPathForKey(hash string) string { pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) return pb.Join(st.blockPath, hash) } -// hashSeparator is the separating byte between block hashes +// hashSeparator is the separating byte between block hashes. const hashSeparator = byte('\n') -// node represents the contents of a node file in storage +// node represents the contents of a node file in storage. type node struct { hashes []string } -// removeHash attempts to remove supplied block hash from the node's hash array +// removeHash attempts to remove supplied block hash from the node's hash array. func (n *node) removeHash(hash string) bool { - haveDropped := false for i := 0; i < len(n.hashes); { if n.hashes[i] == hash { // Drop this hash from slice n.hashes = append(n.hashes[:i], n.hashes[i+1:]...) - haveDropped = true - } else { - // Continue iter - i++ + return true } + + // Continue iter + i++ } - return haveDropped + return false } // nodeReader is an io.Reader implementation for the node file representation, -// which is useful when calculated node file is being written to the store +// which is useful when calculated node file is being written to the store. type nodeReader struct { - node *node + node node idx int last int } @@ -774,7 +764,7 @@ func (r *nodeReader) Read(b []byte) (int, error) { } // nodeWriter is an io.Writer implementation for the node file representation, -// which is useful when calculated node file is being read from the store +// which is useful when calculated node file is being read from the store. type nodeWriter struct { node *node buf *byteutil.Buffer @@ -789,7 +779,7 @@ func (w *nodeWriter) Write(b []byte) (int, error) { if idx == -1 { // Check we shouldn't be expecting it if w.buf.Len() > encodedHashLen { - return n, errInvalidNode + return n, new_error("invalid node") } // Write all contents to buffer @@ -802,7 +792,7 @@ func (w *nodeWriter) Write(b []byte) (int, error) { w.buf.Write(b[n : n+idx]) n += idx + 1 if w.buf.Len() != encodedHashLen { - return n, errInvalidNode + return n, new_error("invalid node") } // Append to hashes & reset @@ -813,7 +803,7 @@ func (w *nodeWriter) Write(b []byte) (int, error) { // blockReader is an io.Reader implementation for the combined, linked block // data contained with a node file. Basically, this allows reading value data -// from the store for a given node file +// from the store for a given node file. type blockReader struct { storage *BlockStorage node *node @@ -874,13 +864,13 @@ var ( ) ) -// hashEncoder is a HashEncoder with built-in encode buffer +// hashEncoder is a HashEncoder with built-in encode buffer. type hashEncoder struct { henc hashenc.HashEncoder ebuf []byte } -// newHashEncoder returns a new hashEncoder instance +// newHashEncoder returns a new hashEncoder instance. func newHashEncoder() *hashEncoder { return &hashEncoder{ henc: hashenc.New(sha256.New(), base64Encoding), @@ -888,7 +878,7 @@ func newHashEncoder() *hashEncoder { } } -// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum() +// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum(). func (henc *hashEncoder) EncodeSum(src []byte) string { henc.henc.EncodeSum(henc.ebuf, src) return string(henc.ebuf) diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go new file mode 100644 index 000000000..6eeb3a78d --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go @@ -0,0 +1,212 @@ +package storage + +import ( + "bytes" + "io" + "sync" + + "codeberg.org/gruf/go-store/v2/util" + + "github.com/klauspost/compress/gzip" + "github.com/klauspost/compress/snappy" + "github.com/klauspost/compress/zlib" +) + +// Compressor defines a means of compressing/decompressing values going into a key-value store +type Compressor interface { + // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader + Reader(io.Reader) (io.ReadCloser, error) + + // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer + Writer(io.Writer) (io.WriteCloser, error) +} + +type gzipCompressor struct { + rpool sync.Pool + wpool sync.Pool +} + +// GZipCompressor returns a new Compressor that implements GZip at default compression level +func GZipCompressor() Compressor { + return GZipCompressorLevel(gzip.DefaultCompression) +} + +// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level +func GZipCompressorLevel(level int) Compressor { + // GZip readers immediately check for valid + // header data on allocation / reset, so we + // need a set of valid header data so we can + // iniitialize reader instances in mempool. + hdr := bytes.NewBuffer(nil) + + // Init writer to ensure valid level provided + gw, err := gzip.NewWriterLevel(hdr, level) + if err != nil { + panic(err) + } + + // Write empty data to ensure gzip + // header data is in byte buffer. + gw.Write([]byte{}) + gw.Close() + + return &gzipCompressor{ + rpool: sync.Pool{ + New: func() any { + hdr := bytes.NewReader(hdr.Bytes()) + gr, _ := gzip.NewReader(hdr) + return gr + }, + }, + wpool: sync.Pool{ + New: func() any { + gw, _ := gzip.NewWriterLevel(nil, level) + return gw + }, + }, + } +} + +func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + gr := c.rpool.Get().(*gzip.Reader) + if err := gr.Reset(r); err != nil { + c.rpool.Put(gr) + return nil, err + } + return util.ReadCloserWithCallback(gr, func() { + c.rpool.Put(gr) + }), nil +} + +func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + gw := c.wpool.Get().(*gzip.Writer) + gw.Reset(w) + return util.WriteCloserWithCallback(gw, func() { + c.wpool.Put(gw) + }), nil +} + +type zlibCompressor struct { + rpool sync.Pool + wpool sync.Pool + dict []byte +} + +// ZLibCompressor returns a new Compressor that implements ZLib at default compression level +func ZLibCompressor() Compressor { + return ZLibCompressorLevelDict(zlib.DefaultCompression, nil) +} + +// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level +func ZLibCompressorLevel(level int) Compressor { + return ZLibCompressorLevelDict(level, nil) +} + +// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict +func ZLibCompressorLevelDict(level int, dict []byte) Compressor { + // ZLib readers immediately check for valid + // header data on allocation / reset, so we + // need a set of valid header data so we can + // iniitialize reader instances in mempool. + hdr := bytes.NewBuffer(nil) + + // Init writer to ensure valid level + dict provided + zw, err := zlib.NewWriterLevelDict(hdr, level, dict) + if err != nil { + panic(err) + } + + // Write empty data to ensure zlib + // header data is in byte buffer. + zw.Write([]byte{}) + zw.Close() + + return &zlibCompressor{ + rpool: sync.Pool{ + New: func() any { + hdr := bytes.NewReader(hdr.Bytes()) + zr, _ := zlib.NewReaderDict(hdr, dict) + return zr + }, + }, + wpool: sync.Pool{ + New: func() any { + zw, _ := zlib.NewWriterLevelDict(nil, level, dict) + return zw + }, + }, + dict: dict, + } +} + +func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + zr := c.rpool.Get().(interface { + io.ReadCloser + zlib.Resetter + }) + if err := zr.Reset(r, c.dict); err != nil { + c.rpool.Put(zr) + return nil, err + } + return util.ReadCloserWithCallback(zr, func() { + c.rpool.Put(zr) + }), nil +} + +func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + zw := c.wpool.Get().(*zlib.Writer) + zw.Reset(w) + return util.WriteCloserWithCallback(zw, func() { + c.wpool.Put(zw) + }), nil +} + +type snappyCompressor struct { + rpool sync.Pool + wpool sync.Pool +} + +// SnappyCompressor returns a new Compressor that implements Snappy. +func SnappyCompressor() Compressor { + return &snappyCompressor{ + rpool: sync.Pool{ + New: func() any { return snappy.NewReader(nil) }, + }, + wpool: sync.Pool{ + New: func() any { return snappy.NewWriter(nil) }, + }, + } +} + +func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + sr := c.rpool.Get().(*snappy.Reader) + sr.Reset(r) + return util.ReadCloserWithCallback( + util.NopReadCloser(sr), + func() { c.rpool.Put(sr) }, + ), nil +} + +func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + sw := c.wpool.Get().(*snappy.Writer) + sw.Reset(w) + return util.WriteCloserWithCallback( + util.NopWriteCloser(sw), + func() { c.wpool.Put(sw) }, + ), nil +} + +type nopCompressor struct{} + +// NoCompression is a Compressor that simply does nothing. +func NoCompression() Compressor { + return &nopCompressor{} +} + +func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return util.NopReadCloser(r), nil +} + +func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return util.NopWriteCloser(w), nil +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go index 457cc6364..dab1d6128 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go @@ -1,6 +1,8 @@ package storage import ( + "context" + "errors" "io" "io/fs" "os" @@ -11,10 +13,10 @@ import ( "codeberg.org/gruf/go-bytes" "codeberg.org/gruf/go-fastcopy" - "codeberg.org/gruf/go-store/util" + "codeberg.org/gruf/go-store/v2/util" ) -// DefaultDiskConfig is the default DiskStorage configuration +// DefaultDiskConfig is the default DiskStorage configuration. var DefaultDiskConfig = &DiskConfig{ Overwrite: true, WriteBufSize: 4096, @@ -22,27 +24,28 @@ var DefaultDiskConfig = &DiskConfig{ Compression: NoCompression(), } -// DiskConfig defines options to be used when opening a DiskStorage +// DiskConfig defines options to be used when opening a DiskStorage. type DiskConfig struct { - // Transform is the supplied key<-->path KeyTransform + // Transform is the supplied key <--> path KeyTransform. Transform KeyTransform - // WriteBufSize is the buffer size to use when writing file streams (PutStream) + // WriteBufSize is the buffer size to use when writing file streams. WriteBufSize int - // Overwrite allows overwriting values of stored keys in the storage + // Overwrite allows overwriting values of stored keys in the storage. Overwrite bool // LockFile allows specifying the filesystem path to use for the lockfile, // providing only a filename it will store the lockfile within provided store - // path and nest the store under `path/store` to prevent access to lockfile + // path and nest the store under `path/store` to prevent access to lockfile. LockFile string - // Compression is the Compressor to use when reading / writing files, default is no compression + // Compression is the Compressor to use when reading / writing files, + // default is no compression. Compression Compressor } -// getDiskConfig returns a valid DiskConfig for supplied ptr +// getDiskConfig returns a valid DiskConfig for supplied ptr. func getDiskConfig(cfg *DiskConfig) DiskConfig { // If nil, use default if cfg == nil { @@ -60,12 +63,12 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig { } // Assume 0 buf size == use default - if cfg.WriteBufSize < 1 { + if cfg.WriteBufSize <= 0 { cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize } // Assume empty lockfile path == use default - if len(cfg.LockFile) < 1 { + if len(cfg.LockFile) == 0 { cfg.LockFile = LockFile } @@ -79,7 +82,7 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig { } } -// DiskStorage is a Storage implementation that stores directly to a filesystem +// DiskStorage is a Storage implementation that stores directly to a filesystem. type DiskStorage struct { path string // path is the root path of this store cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool @@ -87,8 +90,8 @@ type DiskStorage struct { lock *Lock // lock is the opened lockfile for this storage instance } -// OpenFile opens a DiskStorage instance for given folder path and configuration -func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { +// OpenDisk opens a DiskStorage instance for given folder path and configuration. +func OpenDisk(path string, cfg *DiskConfig) (*DiskStorage, error) { // Get checked config config := getDiskConfig(cfg) @@ -104,7 +107,7 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { lockfile := pb.Clean(config.LockFile) // Check if lockfile is an *actual* path or just filename - if lockDir, _ := _path.Split(lockfile); len(lockDir) < 1 { + if lockDir, _ := _path.Split(lockfile); lockDir == "" { // Lockfile is a filename, store must be nested under // $storePath/store to prevent access to the lockfile storePath += "store/" @@ -138,7 +141,7 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { if err != nil { return nil, err } else if !stat.IsDir() { - return nil, errPathIsFile + return nil, errors.New("store/storage: path is file") } // Open and acquire storage lock for path @@ -160,20 +163,26 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { return st, nil } -// Clean implements Storage.Clean() -func (st *DiskStorage) Clean() error { - st.lock.Add() - defer st.lock.Done() +// Clean implements Storage.Clean(). +func (st *DiskStorage) Clean(ctx context.Context) error { + // Check if open if st.lock.Closed() { return ErrClosed } - return util.CleanDirs(st.path) + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Clean-out unused directories + return cleanDirs(st.path) } -// ReadBytes implements Storage.ReadBytes() -func (st *DiskStorage) ReadBytes(key string) ([]byte, error) { +// ReadBytes implements Storage.ReadBytes(). +func (st *DiskStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { // Get stream reader for key - rc, err := st.ReadStream(key) + rc, err := st.ReadStream(ctx, key) if err != nil { return nil, err } @@ -183,26 +192,27 @@ func (st *DiskStorage) ReadBytes(key string) ([]byte, error) { return io.ReadAll(rc) } -// ReadStream implements Storage.ReadStream() -func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { +// ReadStream implements Storage.ReadStream(). +func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { // Get file path for key kpath, err := st.filepath(key) if err != nil { return nil, err } - // Track open - st.lock.Add() - // Check if open if st.lock.Closed() { return nil, ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + // Attempt to open file (replace ENOENT with our own) file, err := open(kpath, defaultFileROFlags) if err != nil { - st.lock.Done() return nil, errSwapNotFound(err) } @@ -210,39 +220,38 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { cFile, err := st.config.Compression.Reader(file) if err != nil { file.Close() // close this here, ignore error - st.lock.Done() return nil, err } // Wrap compressor to ensure file close return util.ReadCloserWithCallback(cFile, func() { file.Close() - st.lock.Done() }), nil } -// WriteBytes implements Storage.WriteBytes() -func (st *DiskStorage) WriteBytes(key string, value []byte) error { - return st.WriteStream(key, bytes.NewReader(value)) +// WriteBytes implements Storage.WriteBytes(). +func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) error { + return st.WriteStream(ctx, key, bytes.NewReader(value)) } -// WriteStream implements Storage.WriteStream() -func (st *DiskStorage) WriteStream(key string, r io.Reader) error { +// WriteStream implements Storage.WriteStream(). +func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { // Get file path for key kpath, err := st.filepath(key) if err != nil { return err } - // Track open - st.lock.Add() - defer st.lock.Done() - // Check if open if st.lock.Closed() { return ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + // Ensure dirs leading up to file exist err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) if err != nil { @@ -280,44 +289,46 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error { return err } -// Stat implements Storage.Stat() -func (st *DiskStorage) Stat(key string) (bool, error) { +// Stat implements Storage.Stat(). +func (st *DiskStorage) Stat(ctx context.Context, key string) (bool, error) { // Get file path for key kpath, err := st.filepath(key) if err != nil { return false, err } - // Track open - st.lock.Add() - defer st.lock.Done() - // Check if open if st.lock.Closed() { return false, ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return false, err + } + // Check for file on disk return stat(kpath) } -// Remove implements Storage.Remove() -func (st *DiskStorage) Remove(key string) error { +// Remove implements Storage.Remove(). +func (st *DiskStorage) Remove(ctx context.Context, key string) error { // Get file path for key kpath, err := st.filepath(key) if err != nil { return err } - // Track open - st.lock.Add() - defer st.lock.Done() - // Check if open if st.lock.Closed() { return ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + // Remove at path (we know this is file) if err := unlink(kpath); err != nil { return errSwapNotFound(err) @@ -326,41 +337,55 @@ func (st *DiskStorage) Remove(key string) error { return nil } -// Close implements Storage.Close() +// Close implements Storage.Close(). func (st *DiskStorage) Close() error { return st.lock.Close() } -// WalkKeys implements Storage.WalkKeys() -func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error { - // Track open - st.lock.Add() - defer st.lock.Done() - +// WalkKeys implements Storage.WalkKeys(). +func (st *DiskStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { // Check if open if st.lock.Closed() { return ErrClosed } + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) // Walk dir for entries - return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) { - if fsentry.Type().IsRegular() { + return walkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) error { + if !fsentry.Type().IsRegular() { // Only deal with regular files + return nil + } - // Get full item path (without root) - kpath = pb.Join(kpath, fsentry.Name())[len(st.path):] + // Get full item path (without root) + kpath = pb.Join(kpath, fsentry.Name()) + kpath = kpath[len(st.path):] - // Perform provided walk function - opts.WalkFn(entry(st.config.Transform.PathToKey(kpath))) + // Load file info. This should already + // be loaded due to the underlying call + // to os.File{}.ReadDir() populating them + info, err := fsentry.Info() + if err != nil { + return err } + + // Perform provided walk function + return opts.WalkFn(ctx, Entry{ + Key: st.config.Transform.PathToKey(kpath), + Size: info.Size(), + }) }) } -// filepath checks and returns a formatted filepath for given key +// filepath checks and returns a formatted filepath for given key. func (st *DiskStorage) filepath(key string) (string, error) { // Calculate transformed key path key = st.config.Transform.KeyToPath(key) @@ -382,7 +407,7 @@ func (st *DiskStorage) filepath(key string) (string, error) { } // isDirTraversal will check if rootPlusPath is a dir traversal outside of root, -// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath) +// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath). func isDirTraversal(root, rootPlusPath string) bool { switch { // Root is $PWD, check for traversal out of diff --git a/vendor/codeberg.org/gruf/go-store/storage/errors.go b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go index 6953e11fe..4ae7e4be5 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/errors.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/errors.go @@ -2,38 +2,34 @@ package storage import ( "errors" + "strings" "syscall" + + "github.com/minio/minio-go/v7" ) var ( // ErrClosed is returned on operations on a closed storage - ErrClosed = errors.New("store/storage: closed") + ErrClosed = new_error("closed") // ErrNotFound is the error returned when a key cannot be found in storage - ErrNotFound = errors.New("store/storage: key not found") + ErrNotFound = new_error("key not found") // ErrAlreadyExist is the error returned when a key already exists in storage - ErrAlreadyExists = errors.New("store/storage: key already exists") + ErrAlreadyExists = new_error("key already exists") // ErrInvalidkey is the error returned when an invalid key is passed to storage - ErrInvalidKey = errors.New("store/storage: invalid key") + ErrInvalidKey = new_error("invalid key") // ErrAlreadyLocked is returned on fail opening a storage lockfile - ErrAlreadyLocked = errors.New("store/storage: storage lock already open") - - // errPathIsFile is returned when a path for a disk config is actually a file - errPathIsFile = errors.New("store/storage: path is file") - - // errNoHashesWritten is returned when no blocks are written for given input value - errNoHashesWritten = errors.New("storage/storage: no hashes written") - - // errInvalidNode is returned when read on an invalid node in the store is attempted - errInvalidNode = errors.New("store/storage: invalid node") - - // errCorruptNode is returned when a block fails to be opened / read during read of a node. - errCorruptNode = errors.New("store/storage: corrupted node") + ErrAlreadyLocked = new_error("storage lock already open") ) +// new_error returns a new error instance prefixed by package prefix. +func new_error(msg string) error { + return errors.New("store/storage: " + msg) +} + // wrappedError allows wrapping together an inner with outer error. type wrappedError struct { inner error @@ -88,3 +84,27 @@ func errSwapUnavailable(err error) error { } return err } + +// transformS3Error transforms an error returned from S3Storage underlying +// minio.Core client, by wrapping where necessary with our own error types. +func transformS3Error(err error) error { + // Cast this to a minio error response + ersp, ok := err.(minio.ErrorResponse) + if ok { + switch ersp.Code { + case "NoSuchKey": + return wrap(ErrNotFound, err) + case "Conflict": + return wrap(ErrAlreadyExists, err) + default: + return err + } + } + + // Check if error has an invalid object name prefix + if strings.HasPrefix(err.Error(), "Object name ") { + return wrap(ErrInvalidKey, err) + } + + return err +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go new file mode 100644 index 000000000..658b7e762 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go @@ -0,0 +1,221 @@ +package storage + +import ( + "io/fs" + "os" + "syscall" + + "codeberg.org/gruf/go-fastpath" + "codeberg.org/gruf/go-store/v2/util" +) + +const ( + // default file permission bits + defaultDirPerms = 0o755 + defaultFilePerms = 0o644 + + // default file open flags + defaultFileROFlags = syscall.O_RDONLY + defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR + defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT +) + +// NOTE: +// These functions are for opening storage files, +// not necessarily for e.g. initial setup (OpenFile) + +// walkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry +func walkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry) error) error { + // Read directory entries + entries, err := readDir(path) + if err != nil { + return err + } + + // frame represents a directory entry + // walk-loop snapshot, taken when a sub + // directory requiring iteration is found + type frame struct { + path string + entries []fs.DirEntry + } + + // stack contains a list of held snapshot + // frames, representing unfinished upper + // layers of a directory structure yet to + // be traversed. + var stack []frame + +outer: + for { + if len(entries) == 0 { + if len(stack) == 0 { + // Reached end + break outer + } + + // Pop frame from stack + frame := stack[len(stack)-1] + stack = stack[:len(stack)-1] + + // Update loop vars + entries = frame.entries + path = frame.path + } + + for len(entries) > 0 { + // Pop next entry from queue + entry := entries[0] + entries = entries[1:] + + // Pass to provided walk function + if err := walkFn(path, entry); err != nil { + return err + } + + if entry.IsDir() { + // Push current frame to stack + stack = append(stack, frame{ + path: path, + entries: entries, + }) + + // Update current directory path + path = pb.Join(path, entry.Name()) + + // Read next directory entries + next, err := readDir(path) + if err != nil { + return err + } + + // Set next entries + entries = next + + continue outer + } + } + } + + return nil +} + +// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children +func cleanDirs(path string) error { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Get top-level dir entries + entries, err := readDir(path) + if err != nil { + return err + } + + for _, entry := range entries { + if entry.IsDir() { + // Recursively clean sub-directory entries + if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { + return err + } + } + } + + return nil +} + +// cleanDir performs the actual dir cleaning logic for the above top-level version. +func cleanDir(pb *fastpath.Builder, path string) error { + // Get dir entries + entries, err := readDir(path) + if err != nil { + return err + } + + // If no entries, delete + if len(entries) < 1 { + return rmdir(path) + } + + for _, entry := range entries { + if entry.IsDir() { + // Recursively clean sub-directory entries + if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { + return err + } + } + } + + return nil +} + +// readDir will open file at path, read the unsorted list of entries, then close. +func readDir(path string) ([]fs.DirEntry, error) { + // Open file at path + file, err := open(path, defaultFileROFlags) + if err != nil { + return nil, err + } + + // Read directory entries + entries, err := file.ReadDir(-1) + + // Done with file + _ = file.Close() + + return entries, err +} + +// open will open a file at the given path with flags and default file perms. +func open(path string, flags int) (*os.File, error) { + var fd int + err := retryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, flags, defaultFilePerms) + return + }) + if err != nil { + return nil, err + } + return os.NewFile(uintptr(fd), path), nil +} + +// stat checks for a file on disk. +func stat(path string) (bool, error) { + var stat syscall.Stat_t + err := retryOnEINTR(func() error { + return syscall.Stat(path, &stat) + }) + if err != nil { + if err == syscall.ENOENT { + // not-found is no error + err = nil + } + return false, err + } + return true, nil +} + +// unlink removes a file (not dir!) on disk. +func unlink(path string) error { + return retryOnEINTR(func() error { + return syscall.Unlink(path) + }) +} + +// rmdir removes a dir (not file!) on disk. +func rmdir(path string) error { + return retryOnEINTR(func() error { + return syscall.Rmdir(path) + }) +} + +// retryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received. +func retryOnEINTR(do func() error) error { + for { + err := do() + if err == syscall.EINTR { + continue + } + return err + } +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go index 8a6c4c5e8..25ecefe52 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/lock.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/lock.go @@ -1,11 +1,8 @@ package storage import ( - "sync" "sync/atomic" "syscall" - - "codeberg.org/gruf/go-store/util" ) // LockFile is our standard lockfile name. @@ -14,7 +11,6 @@ const LockFile = "store.lock" // Lock represents a filesystem lock to ensure only one storage instance open per path. type Lock struct { fd int - wg sync.WaitGroup st uint32 } @@ -23,7 +19,7 @@ func OpenLock(path string) (*Lock, error) { var fd int // Open the file descriptor at path - err := util.RetryOnEINTR(func() (err error) { + err := retryOnEINTR(func() (err error) { fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms) return }) @@ -32,7 +28,7 @@ func OpenLock(path string) (*Lock, error) { } // Get a flock on the file descriptor - err = util.RetryOnEINTR(func() error { + err = retryOnEINTR(func() error { return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB) }) if err != nil { @@ -42,28 +38,15 @@ func OpenLock(path string) (*Lock, error) { return &Lock{fd: fd}, nil } -// Add will add '1' to the underlying sync.WaitGroup. -func (f *Lock) Add() { - f.wg.Add(1) -} - -// Done will decrememnt '1' from the underlying sync.WaitGroup. -func (f *Lock) Done() { - f.wg.Done() -} - // Close will attempt to close the lockfile and file descriptor. func (f *Lock) Close() error { var err error if atomic.CompareAndSwapUint32(&f.st, 0, 1) { - // Wait until done - f.wg.Wait() - // Ensure gets closed defer syscall.Close(f.fd) // Call funlock on the file descriptor - err = util.RetryOnEINTR(func() error { + err = retryOnEINTR(func() error { return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB) }) } diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go new file mode 100644 index 000000000..a853c84d2 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go @@ -0,0 +1,228 @@ +package storage + +import ( + "context" + "io" + "sync/atomic" + + "codeberg.org/gruf/go-bytes" + "codeberg.org/gruf/go-store/v2/util" + "github.com/cornelk/hashmap" +) + +// MemoryStorage is a storage implementation that simply stores key-value +// pairs in a Go map in-memory. The map is protected by a mutex. +type MemoryStorage struct { + ow bool // overwrites + fs *hashmap.Map[string, []byte] + st uint32 +} + +// OpenMemory opens a new MemoryStorage instance with internal map starting size. +func OpenMemory(size int, overwrites bool) *MemoryStorage { + if size <= 0 { + size = 8 + } + return &MemoryStorage{ + fs: hashmap.NewSized[string, []byte](uintptr(size)), + ow: overwrites, + } +} + +// Clean implements Storage.Clean(). +func (st *MemoryStorage) Clean(ctx context.Context) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + return nil +} + +// ReadBytes implements Storage.ReadBytes(). +func (st *MemoryStorage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Check store open + if st.closed() { + return nil, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Check for key in store + b, ok := st.fs.Get(key) + if !ok { + return nil, ErrNotFound + } + + // Create return copy + return copyb(b), nil +} + +// ReadStream implements Storage.ReadStream(). +func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Check store open + if st.closed() { + return nil, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return nil, err + } + + // Check for key in store + b, ok := st.fs.Get(key) + if !ok { + return nil, ErrNotFound + } + + // Create io.ReadCloser from 'b' copy + r := bytes.NewReader(copyb(b)) + return util.NopReadCloser(r), nil +} + +// WriteBytes implements Storage.WriteBytes(). +func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Check for key that already exists + if _, ok := st.fs.Get(key); ok && !st.ow { + return ErrAlreadyExists + } + + // Write key copy to store + st.fs.Set(key, copyb(b)) + return nil +} + +// WriteStream implements Storage.WriteStream(). +func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Check for key that already exists + if _, ok := st.fs.Get(key); ok && !st.ow { + return ErrAlreadyExists + } + + // Read all from reader + b, err := io.ReadAll(r) + if err != nil { + return err + } + + // Write key to store + st.fs.Set(key, b) + return nil +} + +// Stat implements Storage.Stat(). +func (st *MemoryStorage) Stat(ctx context.Context, key string) (bool, error) { + // Check store open + if st.closed() { + return false, ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return false, err + } + + // Check for key in store + _, ok := st.fs.Get(key) + return ok, nil +} + +// Remove implements Storage.Remove(). +func (st *MemoryStorage) Remove(ctx context.Context, key string) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + // Attempt to delete key + ok := st.fs.Del(key) + if !ok { + return ErrNotFound + } + + return nil +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *MemoryStorage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { + // Check store open + if st.closed() { + return ErrClosed + } + + // Check context still valid + if err := ctx.Err(); err != nil { + return err + } + + var err error + + // Nil check func + _ = opts.WalkFn + + // Pass each key in map to walk function + st.fs.Range(func(key string, val []byte) bool { + err = opts.WalkFn(ctx, Entry{ + Key: key, + Size: int64(len(val)), + }) + return (err == nil) + }) + + return err +} + +// Close implements Storage.Close(). +func (st *MemoryStorage) Close() error { + atomic.StoreUint32(&st.st, 1) + return nil +} + +// closed returns whether MemoryStorage is closed. +func (st *MemoryStorage) closed() bool { + return (atomic.LoadUint32(&st.st) == 1) +} + +// copyb returns a copy of byte-slice b. +func copyb(b []byte) []byte { + if b == nil { + return nil + } + p := make([]byte, len(b)) + _ = copy(p, b) + return p +} diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go new file mode 100644 index 000000000..baf2a1b3c --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go @@ -0,0 +1,385 @@ +package storage + +import ( + "bytes" + "context" + "io" + "sync/atomic" + + "codeberg.org/gruf/go-store/v2/util" + "github.com/minio/minio-go/v7" +) + +// DefaultS3Config is the default S3Storage configuration. +var DefaultS3Config = &S3Config{ + CoreOpts: minio.Options{}, + GetOpts: minio.GetObjectOptions{}, + PutOpts: minio.PutObjectOptions{}, + PutChunkSize: 4 * 1024 * 1024, // 4MiB + StatOpts: minio.StatObjectOptions{}, + RemoveOpts: minio.RemoveObjectOptions{}, + ListSize: 200, +} + +// S3Config defines options to be used when opening an S3Storage, +// mostly options for underlying S3 client library. +type S3Config struct { + // CoreOpts are S3 client options passed during initialization. + CoreOpts minio.Options + + // GetOpts are S3 client options passed during .Read___() calls. + GetOpts minio.GetObjectOptions + + // PutOpts are S3 client options passed during .Write___() calls. + PutOpts minio.PutObjectOptions + + // PutChunkSize is the chunk size (in bytes) to use when sending + // a byte stream reader of unknown size as a multi-part object. + PutChunkSize int64 + + // StatOpts are S3 client options passed during .Stat() calls. + StatOpts minio.StatObjectOptions + + // RemoveOpts are S3 client options passed during .Remove() calls. + RemoveOpts minio.RemoveObjectOptions + + // ListSize determines how many items to include in each + // list request, made during calls to .WalkKeys(). + ListSize int +} + +// getS3Config returns a valid S3Config for supplied ptr. +func getS3Config(cfg *S3Config) S3Config { + // If nil, use default + if cfg == nil { + cfg = DefaultS3Config + } + + // Assume 0 chunk size == use default + if cfg.PutChunkSize <= 0 { + cfg.PutChunkSize = 4 * 1024 * 1024 + } + + // Assume 0 list size == use default + if cfg.ListSize <= 0 { + cfg.ListSize = 200 + } + + // Return owned config copy + return S3Config{ + CoreOpts: cfg.CoreOpts, + GetOpts: cfg.GetOpts, + PutOpts: cfg.PutOpts, + StatOpts: cfg.StatOpts, + RemoveOpts: cfg.RemoveOpts, + } +} + +// S3Storage is a storage implementation that stores key-value +// pairs in an S3 instance at given endpoint with bucket name. +type S3Storage struct { + client *minio.Core + bucket string + config S3Config + state uint32 +} + +// OpenS3 opens a new S3Storage instance with given S3 endpoint URL, bucket name and configuration. +func OpenS3(endpoint string, bucket string, cfg *S3Config) (*S3Storage, error) { + // Get checked config + config := getS3Config(cfg) + + // Create new S3 client connection + client, err := minio.NewCore(endpoint, &config.CoreOpts) + if err != nil { + return nil, err + } + + // Check that provided bucket actually exists + exists, err := client.BucketExists(context.Background(), bucket) + if err != nil { + return nil, err + } else if !exists { + return nil, new_error("bucket does not exist") + } + + return &S3Storage{ + client: client, + bucket: bucket, + config: config, + }, nil +} + +// Client returns access to the underlying S3 client. +func (st *S3Storage) Client() *minio.Core { + return st.client +} + +// Clean implements Storage.Clean(). +func (st *S3Storage) Clean(ctx context.Context) error { + return nil // nothing to do for S3 +} + +// ReadBytes implements Storage.ReadBytes(). +func (st *S3Storage) ReadBytes(ctx context.Context, key string) ([]byte, error) { + // Fetch object reader from S3 bucket + rc, err := st.ReadStream(ctx, key) + if err != nil { + return nil, err + } + defer rc.Close() + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream(). +func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, error) { + // Check storage open + if st.closed() { + return nil, ErrClosed + } + + // Fetch object reader from S3 bucket + rc, _, _, err := st.client.GetObject( + ctx, + st.bucket, + key, + st.config.GetOpts, + ) + if err != nil { + return nil, transformS3Error(err) + } + + return rc, nil +} + +// WriteBytes implements Storage.WriteBytes(). +func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) error { + return st.WriteStream(ctx, key, util.NewByteReaderSize(value)) +} + +// WriteStream implements Storage.WriteStream(). +func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) error { + // Check storage open + if st.closed() { + return ErrClosed + } + + if rs, ok := r.(util.ReaderSize); ok { + // This reader supports providing us the size of + // the encompassed data, allowing us to perform + // a singular .PutObject() call with length. + _, err := st.client.PutObject( + ctx, + st.bucket, + key, + r, + rs.Size(), + "", + "", + st.config.PutOpts, + ) + if err != nil { + return transformS3Error(err) + } + return nil + } + + // Start a new multipart upload to get ID + uploadID, err := st.client.NewMultipartUpload( + ctx, + st.bucket, + key, + st.config.PutOpts, + ) + if err != nil { + return transformS3Error(err) + } + + var ( + count int + parts []minio.CompletePart + chunk = make([]byte, st.config.PutChunkSize) + rdr = bytes.NewReader(nil) + ) + + // Note that we do not perform any kind of + // memory pooling of the chunk buffers here. + // Optimal chunking sizes for S3 writes are in + // the orders of megabytes, so letting the GC + // collect these ASAP is much preferred. + +loop: + for done := false; !done; { + // Read next chunk into byte buffer + n, err := io.ReadFull(r, chunk) + + switch err { + // Successful read + case nil: + + // Reached end, buffer empty + case io.EOF: + break loop + + // Reached end, but buffer not empty + case io.ErrUnexpectedEOF: + done = true + + // All other errors + default: + return err + } + + // Reset byte reader + rdr.Reset(chunk[:n]) + + // Put this object chunk in S3 store + pt, err := st.client.PutObjectPart( + ctx, + st.bucket, + key, + uploadID, + count, + rdr, + st.config.PutChunkSize, + "", + "", + nil, + ) + if err != nil { + return err + } + + // Append completed part to slice + parts = append(parts, minio.CompletePart{ + PartNumber: pt.PartNumber, + ETag: pt.ETag, + ChecksumCRC32: pt.ChecksumCRC32, + ChecksumCRC32C: pt.ChecksumCRC32C, + ChecksumSHA1: pt.ChecksumSHA1, + ChecksumSHA256: pt.ChecksumSHA256, + }) + + // Iterate part count + count++ + } + + // Complete this multi-part upload operation + _, err = st.client.CompleteMultipartUpload( + ctx, + st.bucket, + key, + uploadID, + parts, + st.config.PutOpts, + ) + if err != nil { + return err + } + + return nil +} + +// Stat implements Storage.Stat(). +func (st *S3Storage) Stat(ctx context.Context, key string) (bool, error) { + // Check storage open + if st.closed() { + return false, ErrClosed + } + + // Query object in S3 bucket + _, err := st.client.StatObject( + ctx, + st.bucket, + key, + st.config.StatOpts, + ) + if err != nil { + return false, transformS3Error(err) + } + + return true, nil +} + +// Remove implements Storage.Remove(). +func (st *S3Storage) Remove(ctx context.Context, key string) error { + // Check storage open + if st.closed() { + return ErrClosed + } + + // S3 returns no error on remove for non-existent keys + if ok, err := st.Stat(ctx, key); err != nil { + return err + } else if !ok { + return ErrNotFound + } + + // Remove object from S3 bucket + err := st.client.RemoveObject( + ctx, + st.bucket, + key, + st.config.RemoveOpts, + ) + if err != nil { + return transformS3Error(err) + } + + return nil +} + +// WalkKeys implements Storage.WalkKeys(). +func (st *S3Storage) WalkKeys(ctx context.Context, opts WalkKeysOptions) error { + var ( + prev string + token string + ) + + for { + // List the objects in bucket starting at marker + result, err := st.client.ListObjectsV2( + st.bucket, + "", + prev, + token, + "", + st.config.ListSize, + ) + if err != nil { + return err + } + + // Pass each object through walk func + for _, obj := range result.Contents { + if err := opts.WalkFn(ctx, Entry{ + Key: obj.Key, + Size: obj.Size, + }); err != nil { + return err + } + } + + // No token means we reached end of bucket + if result.NextContinuationToken == "" { + return nil + } + + // Set continue token and prev mark + token = result.NextContinuationToken + prev = result.StartAfter + } +} + +// Close implements Storage.Close(). +func (st *S3Storage) Close() error { + atomic.StoreUint32(&st.state, 1) + return nil +} + +// closed returns whether S3Storage is closed. +func (st *S3Storage) closed() bool { + return (atomic.LoadUint32(&st.state) == 1) +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go index 346aff097..00fbe7abd 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/storage.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go @@ -1,54 +1,53 @@ package storage import ( + "context" "io" ) -// StorageEntry defines a key in Storage -type StorageEntry interface { - // Key returns the storage entry's key - Key() string -} - -// entry is the simplest possible StorageEntry -type entry string - -func (e entry) Key() string { - return string(e) -} - // Storage defines a means of storing and accessing key value pairs type Storage interface { // ReadBytes returns the byte value for key in storage - ReadBytes(key string) ([]byte, error) + ReadBytes(ctx context.Context, key string) ([]byte, error) // ReadStream returns an io.ReadCloser for the value bytes at key in the storage - ReadStream(key string) (io.ReadCloser, error) + ReadStream(ctx context.Context, key string) (io.ReadCloser, error) // WriteBytes writes the supplied value bytes at key in the storage - WriteBytes(key string, value []byte) error + WriteBytes(ctx context.Context, key string, value []byte) error // WriteStream writes the bytes from supplied reader at key in the storage - WriteStream(key string, r io.Reader) error + WriteStream(ctx context.Context, key string, r io.Reader) error // Stat checks if the supplied key is in the storage - Stat(key string) (bool, error) + Stat(ctx context.Context, key string) (bool, error) // Remove attempts to remove the supplied key-value pair from storage - Remove(key string) error + Remove(ctx context.Context, key string) error // Close will close the storage, releasing any file locks Close() error // Clean removes unused values and unclutters the storage (e.g. removing empty folders) - Clean() error + Clean(ctx context.Context) error // WalkKeys walks the keys in the storage - WalkKeys(opts WalkKeysOptions) error + WalkKeys(ctx context.Context, opts WalkKeysOptions) error +} + +// Entry represents a key in a Storage{} implementation, +// with any associated metadata that may have been set. +type Entry struct { + // Key is this entry's unique storage key. + Key string + + // Size is the size of this entry in storage. + // Note that size < 0 indicates unknown. + Size int64 } // WalkKeysOptions defines how to walk the keys in a storage implementation type WalkKeysOptions struct { // WalkFn is the function to apply on each StorageEntry - WalkFn func(StorageEntry) + WalkFn func(context.Context, Entry) error } diff --git a/vendor/codeberg.org/gruf/go-store/storage/transform.go b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go index 3863dd774..3863dd774 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/transform.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/transform.go diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/io.go b/vendor/codeberg.org/gruf/go-store/v2/util/io.go new file mode 100644 index 000000000..334ae4dd0 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/v2/util/io.go @@ -0,0 +1,96 @@ +package util + +import ( + "bytes" + "io" +) + +// ReaderSize ... +type ReaderSize interface { + io.Reader + + // Size ... + Size() int64 +} + +// ByteReaderSize ... +type ByteReaderSize struct { + bytes.Reader + sz int64 +} + +// NewByteReaderSize ... +func NewByteReaderSize(b []byte) *ByteReaderSize { + rs := ByteReaderSize{} + rs.Reset(b) + return &rs +} + +// Size implements ReaderSize.Size(). +func (rs ByteReaderSize) Size() int64 { + return rs.sz +} + +// Reset resets the ReaderSize to be reading from b. +func (rs *ByteReaderSize) Reset(b []byte) { + rs.Reader.Reset(b) + rs.sz = int64(len(b)) +} + +// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation. +func NopReadCloser(r io.Reader) io.ReadCloser { + return &nopReadCloser{r} +} + +// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation. +func NopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w} +} + +// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser. +func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { + return &callbackReadCloser{ + ReadCloser: rc, + callback: cb, + } +} + +// WriteCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.WriteCloser. +func WriteCloserWithCallback(wc io.WriteCloser, cb func()) io.WriteCloser { + return &callbackWriteCloser{ + WriteCloser: wc, + callback: cb, + } +} + +// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close(). +type nopReadCloser struct{ io.Reader } + +func (r *nopReadCloser) Close() error { return nil } + +// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close(). +type nopWriteCloser struct{ io.Writer } + +func (w nopWriteCloser) Close() error { return nil } + +// callbackReadCloser allows adding our own custom callback to an io.ReadCloser. +type callbackReadCloser struct { + io.ReadCloser + callback func() +} + +func (c *callbackReadCloser) Close() error { + defer c.callback() + return c.ReadCloser.Close() +} + +// callbackWriteCloser allows adding our own custom callback to an io.WriteCloser. +type callbackWriteCloser struct { + io.WriteCloser + callback func() +} + +func (c *callbackWriteCloser) Close() error { + defer c.callback() + return c.WriteCloser.Close() +} diff --git a/vendor/codeberg.org/gruf/go-store/util/pool.go b/vendor/codeberg.org/gruf/go-store/v2/util/pool.go index 8400cb5b7..dc35dae01 100644 --- a/vendor/codeberg.org/gruf/go-store/util/pool.go +++ b/vendor/codeberg.org/gruf/go-store/v2/util/pool.go @@ -5,15 +5,15 @@ import ( "codeberg.org/gruf/go-pools" ) -// pathBuilderPool is the global fastpath.Builder pool +// pathBuilderPool is the global fastpath.Builder pool. var pathBuilderPool = pools.NewPathBuilderPool(512) -// GetPathBuilder fetches a fastpath.Builder object from the pool +// GetPathBuilder fetches a fastpath.Builder object from the pool. func GetPathBuilder() *fastpath.Builder { return pathBuilderPool.Get() } -// PutPathBuilder places supplied fastpath.Builder back in the pool +// PutPathBuilder places supplied fastpath.Builder back in the pool. func PutPathBuilder(pb *fastpath.Builder) { pb.Reset() pathBuilderPool.Put(pb) |