diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/iterator.go | 10 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/state.go | 82 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/store.go | 106 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/block.go | 117 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/disk.go | 136 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/errors.go | 14 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/fs.go | 11 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/lock.go | 70 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/memory.go | 103 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/storage.go | 9 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/util/fs.go | 31 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/util/sys.go | 14 |
12 files changed, 482 insertions, 221 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go index d3999273f..da743ead1 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go +++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go @@ -2,6 +2,7 @@ package kv import ( "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/storage" ) @@ -17,10 +18,10 @@ var ErrIteratorClosed = errors.New("store/kv: iterator closed") // have multiple iterators running concurrently type KVIterator struct { store *KVStore // store is the linked KVStore + state *mutexes.LockState entries []storage.StorageEntry index int key string - onClose func() } // Next attempts to set the next key-value pair, the @@ -43,13 +44,10 @@ func (i *KVIterator) Key() string { // Release releases the KVIterator and KVStore's read lock func (i *KVIterator) Release() { - // Reset key, path, entries + i.state.UnlockMap() i.store = nil i.key = "" i.entries = nil - - // Perform requested callback - i.onClose() } // Value returns the next value from the KVStore @@ -60,5 +58,5 @@ func (i *KVIterator) Value() ([]byte, error) { } // Attempt to fetch from store - return i.store.get(i.key) + return i.store.get(i.state.RLock, i.key) } diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go index 20a3e951d..0b226e107 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/state.go +++ b/vendor/codeberg.org/gruf/go-store/kv/state.go @@ -2,9 +2,9 @@ package kv import ( "io" - "sync" "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-mutexes" ) var ErrStateClosed = errors.New("store/kv: state closed") @@ -16,61 +16,42 @@ var ErrStateClosed = errors.New("store/kv: state closed") // then the state has zero guarantees type StateRO struct { store *KVStore - mutex sync.RWMutex + state *mutexes.LockState } func (st *StateRO) Get(key string) ([]byte, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.get(key) + return st.store.get(st.state.RLock, key) } func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.getStream(key) + return st.store.getStream(st.state.RLock, key) } func (st *StateRO) Has(key string) (bool, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return false, ErrStateClosed } // Pass request to store - return st.store.has(key) + return st.store.has(st.state.RLock, key) } func (st *StateRO) Release() { - // Get state write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Release the store - if st.store != nil { - st.store.mutex.RUnlock() - st.store = nil - } + st.state.UnlockMap() + st.store = nil } // StateRW provides a read-write window to the store. While this @@ -80,101 +61,70 @@ func (st *StateRO) Release() { // then the state has zero guarantees type StateRW struct { store *KVStore - mutex sync.RWMutex + state *mutexes.LockState } func (st *StateRW) Get(key string) ([]byte, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.get(key) + return st.store.get(st.state.RLock, key) } func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.getStream(key) + return st.store.getStream(st.state.RLock, key) } func (st *StateRW) Put(key string, value []byte) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.put(key, value) + return st.store.put(st.state.Lock, key, value) } func (st *StateRW) PutStream(key string, r io.Reader) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.putStream(key, r) + return st.store.putStream(st.state.Lock, key, r) } func (st *StateRW) Has(key string) (bool, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return false, ErrStateClosed } // Pass request to store - return st.store.has(key) + return st.store.has(st.state.RLock, key) } func (st *StateRW) Delete(key string) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.delete(key) + return st.store.delete(st.state.Lock, key) } func (st *StateRW) Release() { - // Get state write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Release the store - if st.store != nil { - st.store.mutex.Unlock() - st.store = nil - } + st.state.UnlockMap() + st.store = nil } diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go index 34fe91987..a8741afe0 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/kv/store.go @@ -2,7 +2,6 @@ package kv import ( "io" - "sync" "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/storage" @@ -11,9 +10,8 @@ import ( // KVStore is a very simple, yet performant key-value store type KVStore struct { - mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access - mutex sync.RWMutex // mutex is the total store mutex - storage storage.Storage // storage is the underlying storage + mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access + storage storage.Storage // storage is the underlying storage } func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) { @@ -47,25 +45,29 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) { // Return new KVStore return &KVStore{ - mutexMap: mutexes.NewMap(mutexes.NewRW), - mutex: sync.RWMutex{}, - storage: storage, + mutex: mutexes.NewMap(-1), + storage: storage, }, nil } +// RLock acquires a read-lock on supplied key, returning unlock function. +func (st *KVStore) RLock(key string) (runlock func()) { + return st.mutex.RLock(key) +} + +// Lock acquires a write-lock on supplied key, returning unlock function. +func (st *KVStore) Lock(key string) (unlock func()) { + return st.mutex.Lock(key) +} + // Get fetches the bytes for supplied key in the store func (st *KVStore) Get(key string) ([]byte, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.get(key) + return st.get(st.RLock, key) } -func (st *KVStore) get(key string) ([]byte, error) { +func (st *KVStore) get(rlock func(string) func(), key string) ([]byte, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) defer runlock() // Read file bytes @@ -74,17 +76,12 @@ func (st *KVStore) get(key string) ([]byte, error) { // GetStream fetches a ReadCloser for the bytes at the supplied key location in the store func (st *KVStore) GetStream(key string) (io.ReadCloser, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.getStream(key) + return st.getStream(st.RLock, key) } -func (st *KVStore) getStream(key string) (io.ReadCloser, error) { +func (st *KVStore) getStream(rlock func(string) func(), key string) (io.ReadCloser, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) // Attempt to open stream for read rd, err := st.storage.ReadStream(key) @@ -99,17 +96,12 @@ func (st *KVStore) getStream(key string) (io.ReadCloser, error) { // Put places the bytes at the supplied key location in the store func (st *KVStore) Put(key string, value []byte) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.put(key, value) + return st.put(st.Lock, key, value) } -func (st *KVStore) put(key string, value []byte) error { +func (st *KVStore) put(lock func(string) func(), key string, value []byte) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Write file bytes @@ -118,17 +110,12 @@ func (st *KVStore) put(key string, value []byte) error { // PutStream writes the bytes from the supplied Reader at the supplied key location in the store func (st *KVStore) PutStream(key string, r io.Reader) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.putStream(key, r) + return st.putStream(st.Lock, key, r) } -func (st *KVStore) putStream(key string, r io.Reader) error { +func (st *KVStore) putStream(lock func(string) func(), key string, r io.Reader) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Write file stream @@ -137,17 +124,12 @@ func (st *KVStore) putStream(key string, r io.Reader) error { // Has checks whether the supplied key exists in the store func (st *KVStore) Has(key string) (bool, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.has(key) + return st.has(st.RLock, key) } -func (st *KVStore) has(key string) (bool, error) { +func (st *KVStore) has(rlock func(string) func(), key string) (bool, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) defer runlock() // Stat file on disk @@ -156,17 +138,12 @@ func (st *KVStore) has(key string) (bool, error) { // Delete removes the supplied key-value pair from the store func (st *KVStore) Delete(key string) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.delete(key) + return st.delete(st.Lock, key) } -func (st *KVStore) delete(key string) error { +func (st *KVStore) delete(lock func(string) func(), key string) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Remove file from disk @@ -181,7 +158,7 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { } // Get store read lock - st.mutex.RLock() + state := st.mutex.RLockMap() // Setup the walk keys function entries := []storage.StorageEntry{} @@ -198,24 +175,24 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { // Walk keys in the storage err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn}) if err != nil { - st.mutex.RUnlock() + state.UnlockMap() return nil, err } // Return new iterator return &KVIterator{ store: st, + state: state, entries: entries, index: -1, key: "", - onClose: st.mutex.RUnlock, }, nil } // Read provides a read-only window to the store, holding it in a read-locked state until release func (st *KVStore) Read() *StateRO { - st.mutex.RLock() - return &StateRO{store: st} + state := st.mutex.RLockMap() + return &StateRO{store: st, state: state} } // ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return. @@ -230,8 +207,8 @@ func (st *KVStore) ReadFn(fn func(*StateRO)) { // Update provides a read-write window to the store, holding it in a write-locked state until release func (st *KVStore) Update() *StateRW { - st.mutex.Lock() - return &StateRW{store: st} + state := st.mutex.LockMap() + return &StateRW{store: st, state: state} } // UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return. @@ -243,3 +220,8 @@ func (st *KVStore) UpdateFn(fn func(*StateRW)) { // Pass to fn fn(state) } + +// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work). +func (st *KVStore) Close() error { + return st.storage.Close() +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go index 9a8c4dc7d..c50faa10b 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/block.go +++ b/vendor/codeberg.org/gruf/go-store/storage/block.go @@ -87,6 +87,7 @@ type BlockStorage struct { config BlockConfig // cfg is the supplied configuration for this store hashPool sync.Pool // hashPool is this store's hashEncoder pool bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool + lock *Lock // lock is the opened lockfile for this storage instance // NOTE: // BlockStorage does not need to lock each of the underlying block files @@ -138,6 +139,12 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { return nil, errPathIsFile } + // Open and acquire storage lock for path + lock, err := OpenLock(pb.Join(path, LockFile)) + if err != nil { + return nil, err + } + // Figure out the largest size for bufpool slices bufSz := encodedHashLen if bufSz < config.BlockSize { @@ -159,19 +166,29 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { }, }, bufpool: pools.NewBufferPool(bufSz), + lock: lock, }, nil } // Clean implements storage.Clean() func (st *BlockStorage) Clean() error { - nodes := map[string]*node{} + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) - // Walk nodes dir for entries + nodes := map[string]*node{} onceErr := errors.OnceError{} + + // Walk nodes dir for entries err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { // Only deal with regular files if !fsentry.Type().IsRegular() { @@ -293,6 +310,7 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) { if err != nil { return nil, err } + defer rc.Close() // Read all bytes and return return io.ReadAll(rc) @@ -306,9 +324,19 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { return nil, err } + // Track open + st.lock.Add() + + // Check if open + if st.lock.Closed() { + st.lock.Done() + return nil, ErrClosed + } + // Attempt to open RO file file, err := open(npath, defaultFileROFlags) if err != nil { + st.lock.Done() return nil, err } defer file.Close() @@ -328,14 +356,16 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { nil, ) if err != nil { + st.lock.Done() return nil, err } - // Return new block reader - return util.NopReadCloser(&blockReader{ + // Prepare block reader and return + rc := util.NopReadCloser(&blockReader{ storage: st, node: &node, - }), nil + }) // we wrap the blockreader to decr lockfile waitgroup + return util.ReadCloserWithCallback(rc, st.lock.Done), nil } func (st *BlockStorage) readBlock(key string) ([]byte, error) { @@ -373,6 +403,15 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Check if this exists ok, err := stat(key) if err != nil { @@ -443,11 +482,16 @@ loop: continue loop } - // Write in separate goroutine + // Check if reached EOF + atEOF := (n < buf.Len()) + wg.Add(1) go func() { - // Defer buffer release + signal done + // Perform writes in goroutine + defer func() { + // Defer release + + // signal we're done st.bufpool.Put(buf) wg.Done() }() @@ -460,8 +504,8 @@ loop: } }() - // We reached EOF - if n < buf.Len() { + // Break at end + if atEOF { break loop } } @@ -552,6 +596,15 @@ func (st *BlockStorage) Stat(key string) (bool, error) { return false, err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return false, ErrClosed + } + // Check for file on disk return stat(kpath) } @@ -564,12 +617,35 @@ func (st *BlockStorage) Remove(key string) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Attempt to remove file return os.Remove(kpath) } +// Close implements Storage.Close() +func (st *BlockStorage) Close() error { + return st.lock.Close() +} + // WalkKeys implements Storage.WalkKeys() func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error { + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) @@ -610,7 +686,7 @@ func (st *BlockStorage) blockPathForKey(hash string) string { } // hashSeparator is the separating byte between block hashes -const hashSeparator = byte(':') +const hashSeparator = byte('\n') // node represents the contents of a node file in storage type node struct { @@ -773,24 +849,27 @@ func (r *blockReader) Read(b []byte) (int, error) { } } +var ( + // base64Encoding is our base64 encoding object. + base64Encoding = hashenc.Base64() + + // encodedHashLen is the once-calculated encoded hash-sum length + encodedHashLen = base64Encoding.EncodedLen( + sha256.New().Size(), + ) +) + // hashEncoder is a HashEncoder with built-in encode buffer type hashEncoder struct { henc hashenc.HashEncoder ebuf []byte } -// encodedHashLen is the once-calculated encoded hash-sum length -var encodedHashLen = hashenc.Base64().EncodedLen( - sha256.New().Size(), -) - // newHashEncoder returns a new hashEncoder instance func newHashEncoder() *hashEncoder { - hash := sha256.New() - enc := hashenc.Base64() return &hashEncoder{ - henc: hashenc.New(hash, enc), - ebuf: make([]byte, enc.EncodedLen(hash.Size())), + henc: hashenc.New(sha256.New(), base64Encoding), + ebuf: make([]byte, encodedHashLen), } } diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go index 060d56688..287042886 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go @@ -5,6 +5,8 @@ import ( "io/fs" "os" "path" + _path "path" + "strings" "syscall" "codeberg.org/gruf/go-bytes" @@ -31,6 +33,11 @@ type DiskConfig struct { // Overwrite allows overwriting values of stored keys in the storage Overwrite bool + // LockFile allows specifying the filesystem path to use for the lockfile, + // providing only a filename it will store the lockfile within provided store + // path and nest the store under `path/store` to prevent access to lockfile + LockFile string + // Compression is the Compressor to use when reading / writing files, default is no compression Compression Compressor } @@ -57,11 +64,17 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig { cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize } + // Assume empty lockfile path == use default + if len(cfg.LockFile) < 1 { + cfg.LockFile = LockFile + } + // Return owned config copy return DiskConfig{ Transform: cfg.Transform, WriteBufSize: cfg.WriteBufSize, Overwrite: cfg.Overwrite, + LockFile: cfg.LockFile, Compression: cfg.Compression, } } @@ -71,23 +84,35 @@ type DiskStorage struct { path string // path is the root path of this store bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage config DiskConfig // cfg is the supplied configuration for this store + lock *Lock // lock is the opened lockfile for this storage instance } // OpenFile opens a DiskStorage instance for given folder path and configuration func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { + // Get checked config + config := getDiskConfig(cfg) + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) - // Clean provided path, ensure ends in '/' (should - // be dir, this helps with file path trimming later) - path = pb.Clean(path) + "/" + // Clean provided store path, ensure + // ends in '/' to help later path trimming + storePath := pb.Clean(path) + "/" - // Get checked config - config := getDiskConfig(cfg) + // Clean provided lockfile path + lockfile := pb.Clean(config.LockFile) + + // Check if lockfile is an *actual* path or just filename + if lockDir, _ := _path.Split(lockfile); len(lockDir) < 1 { + // Lockfile is a filename, store must be nested under + // $storePath/store to prevent access to the lockfile + storePath += "store/" + lockfile = pb.Join(path, lockfile) + } // Attempt to open dir path - file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) if err != nil { // If not a not-exist error, return if !os.IsNotExist(err) { @@ -95,13 +120,13 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { } // Attempt to make store path dirs - err = os.MkdirAll(path, defaultDirPerms) + err = os.MkdirAll(storePath, defaultDirPerms) if err != nil { return nil, err } // Reopen dir now it's been created - file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) if err != nil { return nil, err } @@ -116,16 +141,28 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { return nil, errPathIsFile } + // Open and acquire storage lock for path + lock, err := OpenLock(lockfile) + if err != nil { + return nil, err + } + // Return new DiskStorage return &DiskStorage{ - path: path, + path: storePath, bufp: pools.NewBufferPool(config.WriteBufSize), config: config, + lock: lock, }, nil } // Clean implements Storage.Clean() func (st *DiskStorage) Clean() error { + st.lock.Add() + defer st.lock.Done() + if st.lock.Closed() { + return ErrClosed + } return util.CleanDirs(st.path) } @@ -150,9 +187,18 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { return nil, err } + // Track open + st.lock.Add() + + // Check if open + if st.lock.Closed() { + return nil, ErrClosed + } + // Attempt to open file (replace ENOENT with our own) file, err := open(kpath, defaultFileROFlags) if err != nil { + st.lock.Done() return nil, errSwapNotFound(err) } @@ -160,12 +206,14 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { cFile, err := st.config.Compression.Reader(file) if err != nil { file.Close() // close this here, ignore error + st.lock.Done() return nil, err } // Wrap compressor to ensure file close return util.ReadCloserWithCallback(cFile, func() { file.Close() + st.lock.Done() }), nil } @@ -182,6 +230,15 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Ensure dirs leading up to file exist err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) if err != nil { @@ -232,6 +289,15 @@ func (st *DiskStorage) Stat(key string) (bool, error) { return false, err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return false, ErrClosed + } + // Check for file on disk return stat(kpath) } @@ -244,20 +310,44 @@ func (st *DiskStorage) Remove(key string) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Attempt to remove file return os.Remove(kpath) } +// Close implements Storage.Close() +func (st *DiskStorage) Close() error { + return st.lock.Close() +} + // WalkKeys implements Storage.WalkKeys() func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error { + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) // Walk dir for entries return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) { - // Only deal with regular files if fsentry.Type().IsRegular() { + // Only deal with regular files + // Get full item path (without root) kpath = pb.Join(kpath, fsentry.Name())[len(st.path):] @@ -269,21 +359,39 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error { // filepath checks and returns a formatted filepath for given key func (st *DiskStorage) filepath(key string) (string, error) { + // Calculate transformed key path + key = st.config.Transform.KeyToPath(key) + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) - // Calculate transformed key path - key = st.config.Transform.KeyToPath(key) - // Generated joined root path pb.AppendString(st.path) pb.AppendString(key) // Check for dir traversal outside of root - if util.IsDirTraversal(st.path, pb.StringPtr()) { + if isDirTraversal(st.path, pb.StringPtr()) { return "", ErrInvalidKey } return pb.String(), nil } + +// isDirTraversal will check if rootPlusPath is a dir traversal outside of root, +// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath) +func isDirTraversal(root, rootPlusPath string) bool { + switch { + // Root is $PWD, check for traversal out of + case root == ".": + return strings.HasPrefix(rootPlusPath, "../") + + // The path MUST be prefixed by root + case !strings.HasPrefix(rootPlusPath, root): + return true + + // In all other cases, check not equal + default: + return len(root) == len(rootPlusPath) + } +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/errors.go b/vendor/codeberg.org/gruf/go-store/storage/errors.go index 016593596..ad2b742e6 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/errors.go +++ b/vendor/codeberg.org/gruf/go-store/storage/errors.go @@ -19,6 +19,9 @@ func (e errorString) Extend(s string, a ...interface{}) errorString { } var ( + // ErrClosed is returned on operations on a closed storage + ErrClosed = errorString("store/storage: closed") + // ErrNotFound is the error returned when a key cannot be found in storage ErrNotFound = errorString("store/storage: key not found") @@ -39,6 +42,9 @@ var ( // errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean errCorruptNodes = errorString("store/storage: corrupted nodes") + + // ErrAlreadyLocked is returned on fail opening a storage lockfile + ErrAlreadyLocked = errorString("store/storage: storage lock already open") ) // errSwapNoop performs no error swaps @@ -61,3 +67,11 @@ func errSwapExist(err error) error { } return err } + +// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked +func errSwapUnavailable(err error) error { + if err == syscall.EAGAIN { + return ErrAlreadyLocked + } + return err +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go index 444cee4b0..b1c3560d2 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/fs.go +++ b/vendor/codeberg.org/gruf/go-store/storage/fs.go @@ -8,11 +8,14 @@ import ( ) const ( - defaultDirPerms = 0755 - defaultFilePerms = 0644 + // default file permission bits + defaultDirPerms = 0755 + defaultFilePerms = 0644 + + // default file open flags defaultFileROFlags = syscall.O_RDONLY defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR - defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT + defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT ) // NOTE: @@ -39,7 +42,7 @@ func stat(path string) (bool, error) { return syscall.Stat(path, &stat) }) if err != nil { - if err == syscall.ENOENT { + if err == syscall.ENOENT { //nolint err = nil } return false, err diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/storage/lock.go index 3d794cda9..8a6c4c5e8 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/lock.go +++ b/vendor/codeberg.org/gruf/go-store/storage/lock.go @@ -1,34 +1,76 @@ package storage import ( - "os" + "sync" + "sync/atomic" "syscall" "codeberg.org/gruf/go-store/util" ) -type lockableFile struct { - *os.File +// LockFile is our standard lockfile name. +const LockFile = "store.lock" + +// Lock represents a filesystem lock to ensure only one storage instance open per path. +type Lock struct { + fd int + wg sync.WaitGroup + st uint32 } -func openLock(path string) (*lockableFile, error) { - file, err := open(path, defaultFileLockFlags) +// OpenLock opens a lockfile at path. +func OpenLock(path string) (*Lock, error) { + var fd int + + // Open the file descriptor at path + err := util.RetryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms) + return + }) if err != nil { return nil, err } - return &lockableFile{file}, nil + + // Get a flock on the file descriptor + err = util.RetryOnEINTR(func() error { + return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB) + }) + if err != nil { + return nil, errSwapUnavailable(err) + } + + return &Lock{fd: fd}, nil } -func (f *lockableFile) lock() error { - return f.flock(syscall.LOCK_EX | syscall.LOCK_NB) +// Add will add '1' to the underlying sync.WaitGroup. +func (f *Lock) Add() { + f.wg.Add(1) } -func (f *lockableFile) unlock() error { - return f.flock(syscall.LOCK_UN | syscall.LOCK_NB) +// Done will decrememnt '1' from the underlying sync.WaitGroup. +func (f *Lock) Done() { + f.wg.Done() } -func (f *lockableFile) flock(how int) error { - return util.RetryOnEINTR(func() error { - return syscall.Flock(int(f.Fd()), how) - }) +// Close will attempt to close the lockfile and file descriptor. +func (f *Lock) Close() error { + var err error + if atomic.CompareAndSwapUint32(&f.st, 0, 1) { + // Wait until done + f.wg.Wait() + + // Ensure gets closed + defer syscall.Close(f.fd) + + // Call funlock on the file descriptor + err = util.RetryOnEINTR(func() error { + return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB) + }) + } + return err +} + +// Closed will return whether this lockfile has been closed (and unlocked). +func (f *Lock) Closed() bool { + return (atomic.LoadUint32(&f.st) == 1) } diff --git a/vendor/codeberg.org/gruf/go-store/storage/memory.go b/vendor/codeberg.org/gruf/go-store/storage/memory.go index be60fa464..2dab562d6 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/memory.go +++ b/vendor/codeberg.org/gruf/go-store/storage/memory.go @@ -2,6 +2,7 @@ package storage import ( "io" + "sync" "codeberg.org/gruf/go-bytes" "codeberg.org/gruf/go-store/util" @@ -10,36 +11,76 @@ import ( // MemoryStorage is a storage implementation that simply stores key-value // pairs in a Go map in-memory. The map is protected by a mutex. type MemoryStorage struct { + ow bool // overwrites fs map[string][]byte + mu sync.Mutex + st uint32 } // OpenMemory opens a new MemoryStorage instance with internal map of 'size'. -func OpenMemory(size int) *MemoryStorage { +func OpenMemory(size int, overwrites bool) *MemoryStorage { return &MemoryStorage{ fs: make(map[string][]byte, size), + mu: sync.Mutex{}, + ow: overwrites, } } // Clean implements Storage.Clean(). func (st *MemoryStorage) Clean() error { + st.mu.Lock() + defer st.mu.Unlock() + if st.st == 1 { + return ErrClosed + } return nil } // ReadBytes implements Storage.ReadBytes(). func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) { + // Lock storage + st.mu.Lock() + + // Check store open + if st.st == 1 { + st.mu.Unlock() + return nil, ErrClosed + } + + // Check for key b, ok := st.fs[key] + st.mu.Unlock() + + // Return early if not exist if !ok { return nil, ErrNotFound } + + // Create return copy return bytes.Copy(b), nil } // ReadStream implements Storage.ReadStream(). func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { + // Lock storage + st.mu.Lock() + + // Check store open + if st.st == 1 { + st.mu.Unlock() + return nil, ErrClosed + } + + // Check for key b, ok := st.fs[key] + st.mu.Unlock() + + // Return early if not exist if !ok { return nil, ErrNotFound } + + // Create io.ReadCloser from 'b' copy b = bytes.Copy(b) r := bytes.NewReader(b) return util.NopReadCloser(r), nil @@ -47,43 +88,101 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { // WriteBytes implements Storage.WriteBytes(). func (st *MemoryStorage) WriteBytes(key string, b []byte) error { + // Lock storage + st.mu.Lock() + defer st.mu.Unlock() + + // Check store open + if st.st == 1 { + return ErrClosed + } + _, ok := st.fs[key] - if ok { + + // Check for already exist + if ok && !st.ow { return ErrAlreadyExists } + + // Write + unlock st.fs[key] = bytes.Copy(b) return nil } // WriteStream implements Storage.WriteStream(). func (st *MemoryStorage) WriteStream(key string, r io.Reader) error { + // Read all from reader b, err := io.ReadAll(r) if err != nil { return err } + + // Write to storage return st.WriteBytes(key, b) } // Stat implements Storage.Stat(). func (st *MemoryStorage) Stat(key string) (bool, error) { + // Lock storage + st.mu.Lock() + defer st.mu.Unlock() + + // Check store open + if st.st == 1 { + return false, ErrClosed + } + + // Check for key _, ok := st.fs[key] return ok, nil } // Remove implements Storage.Remove(). func (st *MemoryStorage) Remove(key string) error { + // Lock storage + st.mu.Lock() + defer st.mu.Unlock() + + // Check store open + if st.st == 1 { + return ErrClosed + } + + // Check for key _, ok := st.fs[key] if !ok { return ErrNotFound } + + // Remove from store delete(st.fs, key) + + return nil +} + +// Close implements Storage.Close(). +func (st *MemoryStorage) Close() error { + st.mu.Lock() + st.st = 1 + st.mu.Unlock() return nil } // WalkKeys implements Storage.WalkKeys(). func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error { + // Lock storage + st.mu.Lock() + defer st.mu.Unlock() + + // Check store open + if st.st == 1 { + return ErrClosed + } + + // Walk store keys for key := range st.fs { opts.WalkFn(entry(key)) } + return nil } diff --git a/vendor/codeberg.org/gruf/go-store/storage/storage.go b/vendor/codeberg.org/gruf/go-store/storage/storage.go index b160267a4..346aff097 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/storage.go +++ b/vendor/codeberg.org/gruf/go-store/storage/storage.go @@ -19,9 +19,6 @@ func (e entry) Key() string { // Storage defines a means of storing and accessing key value pairs type Storage interface { - // Clean removes unused values and unclutters the storage (e.g. removing empty folders) - Clean() error - // ReadBytes returns the byte value for key in storage ReadBytes(key string) ([]byte, error) @@ -40,6 +37,12 @@ type Storage interface { // Remove attempts to remove the supplied key-value pair from storage Remove(key string) error + // Close will close the storage, releasing any file locks + Close() error + + // Clean removes unused values and unclutters the storage (e.g. removing empty folders) + Clean() error + // WalkKeys walks the keys in the storage WalkKeys(opts WalkKeysOptions) error } diff --git a/vendor/codeberg.org/gruf/go-store/util/fs.go b/vendor/codeberg.org/gruf/go-store/util/fs.go index 93b37a261..53fef7750 100644 --- a/vendor/codeberg.org/gruf/go-store/util/fs.go +++ b/vendor/codeberg.org/gruf/go-store/util/fs.go @@ -3,30 +3,10 @@ package util import ( "io/fs" "os" - "strings" - "syscall" "codeberg.org/gruf/go-fastpath" ) -// IsDirTraversal will check if rootPlusPath is a dir traversal outside of root, -// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath) -func IsDirTraversal(root string, rootPlusPath string) bool { - switch { - // Root is $PWD, check for traversal out of - case root == ".": - return strings.HasPrefix(rootPlusPath, "../") - - // The path MUST be prefixed by root - case !strings.HasPrefix(rootPlusPath, root): - return true - - // In all other cases, check not equal - default: - return len(root) == len(rootPlusPath) - } -} - // WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error { // Read supplied dir path @@ -100,14 +80,3 @@ func cleanDirs(pb *fastpath.Builder, path string) error { } return nil } - -// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received -func RetryOnEINTR(do func() error) error { - for { - err := do() - if err == syscall.EINTR { - continue - } - return err - } -} diff --git a/vendor/codeberg.org/gruf/go-store/util/sys.go b/vendor/codeberg.org/gruf/go-store/util/sys.go new file mode 100644 index 000000000..6661029e5 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/util/sys.go @@ -0,0 +1,14 @@ +package util + +import "syscall" + +// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received +func RetryOnEINTR(do func() error) error { + for { + err := do() + if err == syscall.EINTR { + continue + } + return err + } +} |