summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
-rw-r--r--vendor/codeberg.org/gruf/go-store/kv/iterator.go10
-rw-r--r--vendor/codeberg.org/gruf/go-store/kv/state.go82
-rw-r--r--vendor/codeberg.org/gruf/go-store/kv/store.go106
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/block.go117
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/disk.go136
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/errors.go14
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/fs.go11
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/lock.go70
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/memory.go103
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/storage.go9
-rw-r--r--vendor/codeberg.org/gruf/go-store/util/fs.go31
-rw-r--r--vendor/codeberg.org/gruf/go-store/util/sys.go14
12 files changed, 482 insertions, 221 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go
index d3999273f..da743ead1 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go
@@ -2,6 +2,7 @@ package kv
import (
"codeberg.org/gruf/go-errors"
+ "codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
)
@@ -17,10 +18,10 @@ var ErrIteratorClosed = errors.New("store/kv: iterator closed")
// have multiple iterators running concurrently
type KVIterator struct {
store *KVStore // store is the linked KVStore
+ state *mutexes.LockState
entries []storage.StorageEntry
index int
key string
- onClose func()
}
// Next attempts to set the next key-value pair, the
@@ -43,13 +44,10 @@ func (i *KVIterator) Key() string {
// Release releases the KVIterator and KVStore's read lock
func (i *KVIterator) Release() {
- // Reset key, path, entries
+ i.state.UnlockMap()
i.store = nil
i.key = ""
i.entries = nil
-
- // Perform requested callback
- i.onClose()
}
// Value returns the next value from the KVStore
@@ -60,5 +58,5 @@ func (i *KVIterator) Value() ([]byte, error) {
}
// Attempt to fetch from store
- return i.store.get(i.key)
+ return i.store.get(i.state.RLock, i.key)
}
diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go
index 20a3e951d..0b226e107 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/state.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/state.go
@@ -2,9 +2,9 @@ package kv
import (
"io"
- "sync"
"codeberg.org/gruf/go-errors"
+ "codeberg.org/gruf/go-mutexes"
)
var ErrStateClosed = errors.New("store/kv: state closed")
@@ -16,61 +16,42 @@ var ErrStateClosed = errors.New("store/kv: state closed")
// then the state has zero guarantees
type StateRO struct {
store *KVStore
- mutex sync.RWMutex
+ state *mutexes.LockState
}
func (st *StateRO) Get(key string) ([]byte, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.get(key)
+ return st.store.get(st.state.RLock, key)
}
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.getStream(key)
+ return st.store.getStream(st.state.RLock, key)
}
func (st *StateRO) Has(key string) (bool, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
- return st.store.has(key)
+ return st.store.has(st.state.RLock, key)
}
func (st *StateRO) Release() {
- // Get state write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Release the store
- if st.store != nil {
- st.store.mutex.RUnlock()
- st.store = nil
- }
+ st.state.UnlockMap()
+ st.store = nil
}
// StateRW provides a read-write window to the store. While this
@@ -80,101 +61,70 @@ func (st *StateRO) Release() {
// then the state has zero guarantees
type StateRW struct {
store *KVStore
- mutex sync.RWMutex
+ state *mutexes.LockState
}
func (st *StateRW) Get(key string) ([]byte, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.get(key)
+ return st.store.get(st.state.RLock, key)
}
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.getStream(key)
+ return st.store.getStream(st.state.RLock, key)
}
func (st *StateRW) Put(key string, value []byte) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.put(key, value)
+ return st.store.put(st.state.Lock, key, value)
}
func (st *StateRW) PutStream(key string, r io.Reader) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.putStream(key, r)
+ return st.store.putStream(st.state.Lock, key, r)
}
func (st *StateRW) Has(key string) (bool, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
- return st.store.has(key)
+ return st.store.has(st.state.RLock, key)
}
func (st *StateRW) Delete(key string) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.delete(key)
+ return st.store.delete(st.state.Lock, key)
}
func (st *StateRW) Release() {
- // Get state write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Release the store
- if st.store != nil {
- st.store.mutex.Unlock()
- st.store = nil
- }
+ st.state.UnlockMap()
+ st.store = nil
}
diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go
index 34fe91987..a8741afe0 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/store.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/store.go
@@ -2,7 +2,6 @@ package kv
import (
"io"
- "sync"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
@@ -11,9 +10,8 @@ import (
// KVStore is a very simple, yet performant key-value store
type KVStore struct {
- mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
- mutex sync.RWMutex // mutex is the total store mutex
- storage storage.Storage // storage is the underlying storage
+ mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access
+ storage storage.Storage // storage is the underlying storage
}
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
@@ -47,25 +45,29 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Return new KVStore
return &KVStore{
- mutexMap: mutexes.NewMap(mutexes.NewRW),
- mutex: sync.RWMutex{},
- storage: storage,
+ mutex: mutexes.NewMap(-1),
+ storage: storage,
}, nil
}
+// RLock acquires a read-lock on supplied key, returning unlock function.
+func (st *KVStore) RLock(key string) (runlock func()) {
+ return st.mutex.RLock(key)
+}
+
+// Lock acquires a write-lock on supplied key, returning unlock function.
+func (st *KVStore) Lock(key string) (unlock func()) {
+ return st.mutex.Lock(key)
+}
+
// Get fetches the bytes for supplied key in the store
func (st *KVStore) Get(key string) ([]byte, error) {
- // Acquire store read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
- // Pass to unprotected fn
- return st.get(key)
+ return st.get(st.RLock, key)
}
-func (st *KVStore) get(key string) ([]byte, error) {
+func (st *KVStore) get(rlock func(string) func(), key string) ([]byte, error) {
// Acquire read lock for key
- runlock := st.mutexMap.RLock(key)
+ runlock := rlock(key)
defer runlock()
// Read file bytes
@@ -74,17 +76,12 @@ func (st *KVStore) get(key string) ([]byte, error) {
// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store
func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
- // Acquire store read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
- // Pass to unprotected fn
- return st.getStream(key)
+ return st.getStream(st.RLock, key)
}
-func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
+func (st *KVStore) getStream(rlock func(string) func(), key string) (io.ReadCloser, error) {
// Acquire read lock for key
- runlock := st.mutexMap.RLock(key)
+ runlock := rlock(key)
// Attempt to open stream for read
rd, err := st.storage.ReadStream(key)
@@ -99,17 +96,12 @@ func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
// Put places the bytes at the supplied key location in the store
func (st *KVStore) Put(key string, value []byte) error {
- // Acquire store write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Pass to unprotected fn
- return st.put(key, value)
+ return st.put(st.Lock, key, value)
}
-func (st *KVStore) put(key string, value []byte) error {
+func (st *KVStore) put(lock func(string) func(), key string, value []byte) error {
// Acquire write lock for key
- unlock := st.mutexMap.Lock(key)
+ unlock := lock(key)
defer unlock()
// Write file bytes
@@ -118,17 +110,12 @@ func (st *KVStore) put(key string, value []byte) error {
// PutStream writes the bytes from the supplied Reader at the supplied key location in the store
func (st *KVStore) PutStream(key string, r io.Reader) error {
- // Acquire store write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Pass to unprotected fn
- return st.putStream(key, r)
+ return st.putStream(st.Lock, key, r)
}
-func (st *KVStore) putStream(key string, r io.Reader) error {
+func (st *KVStore) putStream(lock func(string) func(), key string, r io.Reader) error {
// Acquire write lock for key
- unlock := st.mutexMap.Lock(key)
+ unlock := lock(key)
defer unlock()
// Write file stream
@@ -137,17 +124,12 @@ func (st *KVStore) putStream(key string, r io.Reader) error {
// Has checks whether the supplied key exists in the store
func (st *KVStore) Has(key string) (bool, error) {
- // Acquire store read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
- // Pass to unprotected fn
- return st.has(key)
+ return st.has(st.RLock, key)
}
-func (st *KVStore) has(key string) (bool, error) {
+func (st *KVStore) has(rlock func(string) func(), key string) (bool, error) {
// Acquire read lock for key
- runlock := st.mutexMap.RLock(key)
+ runlock := rlock(key)
defer runlock()
// Stat file on disk
@@ -156,17 +138,12 @@ func (st *KVStore) has(key string) (bool, error) {
// Delete removes the supplied key-value pair from the store
func (st *KVStore) Delete(key string) error {
- // Acquire store write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Pass to unprotected fn
- return st.delete(key)
+ return st.delete(st.Lock, key)
}
-func (st *KVStore) delete(key string) error {
+func (st *KVStore) delete(lock func(string) func(), key string) error {
// Acquire write lock for key
- unlock := st.mutexMap.Lock(key)
+ unlock := lock(key)
defer unlock()
// Remove file from disk
@@ -181,7 +158,7 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
}
// Get store read lock
- st.mutex.RLock()
+ state := st.mutex.RLockMap()
// Setup the walk keys function
entries := []storage.StorageEntry{}
@@ -198,24 +175,24 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
// Walk keys in the storage
err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn})
if err != nil {
- st.mutex.RUnlock()
+ state.UnlockMap()
return nil, err
}
// Return new iterator
return &KVIterator{
store: st,
+ state: state,
entries: entries,
index: -1,
key: "",
- onClose: st.mutex.RUnlock,
}, nil
}
// Read provides a read-only window to the store, holding it in a read-locked state until release
func (st *KVStore) Read() *StateRO {
- st.mutex.RLock()
- return &StateRO{store: st}
+ state := st.mutex.RLockMap()
+ return &StateRO{store: st, state: state}
}
// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return.
@@ -230,8 +207,8 @@ func (st *KVStore) ReadFn(fn func(*StateRO)) {
// Update provides a read-write window to the store, holding it in a write-locked state until release
func (st *KVStore) Update() *StateRW {
- st.mutex.Lock()
- return &StateRW{store: st}
+ state := st.mutex.LockMap()
+ return &StateRW{store: st, state: state}
}
// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return.
@@ -243,3 +220,8 @@ func (st *KVStore) UpdateFn(fn func(*StateRW)) {
// Pass to fn
fn(state)
}
+
+// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work).
+func (st *KVStore) Close() error {
+ return st.storage.Close()
+}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go
index 9a8c4dc7d..c50faa10b 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/block.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/block.go
@@ -87,6 +87,7 @@ type BlockStorage struct {
config BlockConfig // cfg is the supplied configuration for this store
hashPool sync.Pool // hashPool is this store's hashEncoder pool
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
+ lock *Lock // lock is the opened lockfile for this storage instance
// NOTE:
// BlockStorage does not need to lock each of the underlying block files
@@ -138,6 +139,12 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
return nil, errPathIsFile
}
+ // Open and acquire storage lock for path
+ lock, err := OpenLock(pb.Join(path, LockFile))
+ if err != nil {
+ return nil, err
+ }
+
// Figure out the largest size for bufpool slices
bufSz := encodedHashLen
if bufSz < config.BlockSize {
@@ -159,19 +166,29 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
},
},
bufpool: pools.NewBufferPool(bufSz),
+ lock: lock,
}, nil
}
// Clean implements storage.Clean()
func (st *BlockStorage) Clean() error {
- nodes := map[string]*node{}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Walk nodes dir for entries
+ nodes := map[string]*node{}
onceErr := errors.OnceError{}
+
+ // Walk nodes dir for entries
err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
// Only deal with regular files
if !fsentry.Type().IsRegular() {
@@ -293,6 +310,7 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
if err != nil {
return nil, err
}
+ defer rc.Close()
// Read all bytes and return
return io.ReadAll(rc)
@@ -306,9 +324,19 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
return nil, err
}
+ // Track open
+ st.lock.Add()
+
+ // Check if open
+ if st.lock.Closed() {
+ st.lock.Done()
+ return nil, ErrClosed
+ }
+
// Attempt to open RO file
file, err := open(npath, defaultFileROFlags)
if err != nil {
+ st.lock.Done()
return nil, err
}
defer file.Close()
@@ -328,14 +356,16 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
nil,
)
if err != nil {
+ st.lock.Done()
return nil, err
}
- // Return new block reader
- return util.NopReadCloser(&blockReader{
+ // Prepare block reader and return
+ rc := util.NopReadCloser(&blockReader{
storage: st,
node: &node,
- }), nil
+ }) // we wrap the blockreader to decr lockfile waitgroup
+ return util.ReadCloserWithCallback(rc, st.lock.Done), nil
}
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
@@ -373,6 +403,15 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Check if this exists
ok, err := stat(key)
if err != nil {
@@ -443,11 +482,16 @@ loop:
continue loop
}
- // Write in separate goroutine
+ // Check if reached EOF
+ atEOF := (n < buf.Len())
+
wg.Add(1)
go func() {
- // Defer buffer release + signal done
+ // Perform writes in goroutine
+
defer func() {
+ // Defer release +
+ // signal we're done
st.bufpool.Put(buf)
wg.Done()
}()
@@ -460,8 +504,8 @@ loop:
}
}()
- // We reached EOF
- if n < buf.Len() {
+ // Break at end
+ if atEOF {
break loop
}
}
@@ -552,6 +596,15 @@ func (st *BlockStorage) Stat(key string) (bool, error) {
return false, err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
// Check for file on disk
return stat(kpath)
}
@@ -564,12 +617,35 @@ func (st *BlockStorage) Remove(key string) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Attempt to remove file
return os.Remove(kpath)
}
+// Close implements Storage.Close()
+func (st *BlockStorage) Close() error {
+ return st.lock.Close()
+}
+
// WalkKeys implements Storage.WalkKeys()
func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
@@ -610,7 +686,7 @@ func (st *BlockStorage) blockPathForKey(hash string) string {
}
// hashSeparator is the separating byte between block hashes
-const hashSeparator = byte(':')
+const hashSeparator = byte('\n')
// node represents the contents of a node file in storage
type node struct {
@@ -773,24 +849,27 @@ func (r *blockReader) Read(b []byte) (int, error) {
}
}
+var (
+ // base64Encoding is our base64 encoding object.
+ base64Encoding = hashenc.Base64()
+
+ // encodedHashLen is the once-calculated encoded hash-sum length
+ encodedHashLen = base64Encoding.EncodedLen(
+ sha256.New().Size(),
+ )
+)
+
// hashEncoder is a HashEncoder with built-in encode buffer
type hashEncoder struct {
henc hashenc.HashEncoder
ebuf []byte
}
-// encodedHashLen is the once-calculated encoded hash-sum length
-var encodedHashLen = hashenc.Base64().EncodedLen(
- sha256.New().Size(),
-)
-
// newHashEncoder returns a new hashEncoder instance
func newHashEncoder() *hashEncoder {
- hash := sha256.New()
- enc := hashenc.Base64()
return &hashEncoder{
- henc: hashenc.New(hash, enc),
- ebuf: make([]byte, enc.EncodedLen(hash.Size())),
+ henc: hashenc.New(sha256.New(), base64Encoding),
+ ebuf: make([]byte, encodedHashLen),
}
}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go
index 060d56688..287042886 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/disk.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go
@@ -5,6 +5,8 @@ import (
"io/fs"
"os"
"path"
+ _path "path"
+ "strings"
"syscall"
"codeberg.org/gruf/go-bytes"
@@ -31,6 +33,11 @@ type DiskConfig struct {
// Overwrite allows overwriting values of stored keys in the storage
Overwrite bool
+ // LockFile allows specifying the filesystem path to use for the lockfile,
+ // providing only a filename it will store the lockfile within provided store
+ // path and nest the store under `path/store` to prevent access to lockfile
+ LockFile string
+
// Compression is the Compressor to use when reading / writing files, default is no compression
Compression Compressor
}
@@ -57,11 +64,17 @@ func getDiskConfig(cfg *DiskConfig) DiskConfig {
cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
}
+ // Assume empty lockfile path == use default
+ if len(cfg.LockFile) < 1 {
+ cfg.LockFile = LockFile
+ }
+
// Return owned config copy
return DiskConfig{
Transform: cfg.Transform,
WriteBufSize: cfg.WriteBufSize,
Overwrite: cfg.Overwrite,
+ LockFile: cfg.LockFile,
Compression: cfg.Compression,
}
}
@@ -71,23 +84,35 @@ type DiskStorage struct {
path string // path is the root path of this store
bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
config DiskConfig // cfg is the supplied configuration for this store
+ lock *Lock // lock is the opened lockfile for this storage instance
}
// OpenFile opens a DiskStorage instance for given folder path and configuration
func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
+ // Get checked config
+ config := getDiskConfig(cfg)
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Clean provided path, ensure ends in '/' (should
- // be dir, this helps with file path trimming later)
- path = pb.Clean(path) + "/"
+ // Clean provided store path, ensure
+ // ends in '/' to help later path trimming
+ storePath := pb.Clean(path) + "/"
- // Get checked config
- config := getDiskConfig(cfg)
+ // Clean provided lockfile path
+ lockfile := pb.Clean(config.LockFile)
+
+ // Check if lockfile is an *actual* path or just filename
+ if lockDir, _ := _path.Split(lockfile); len(lockDir) < 1 {
+ // Lockfile is a filename, store must be nested under
+ // $storePath/store to prevent access to the lockfile
+ storePath += "store/"
+ lockfile = pb.Join(path, lockfile)
+ }
// Attempt to open dir path
- file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms)
if err != nil {
// If not a not-exist error, return
if !os.IsNotExist(err) {
@@ -95,13 +120,13 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
}
// Attempt to make store path dirs
- err = os.MkdirAll(path, defaultDirPerms)
+ err = os.MkdirAll(storePath, defaultDirPerms)
if err != nil {
return nil, err
}
// Reopen dir now it's been created
- file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms)
if err != nil {
return nil, err
}
@@ -116,16 +141,28 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
return nil, errPathIsFile
}
+ // Open and acquire storage lock for path
+ lock, err := OpenLock(lockfile)
+ if err != nil {
+ return nil, err
+ }
+
// Return new DiskStorage
return &DiskStorage{
- path: path,
+ path: storePath,
bufp: pools.NewBufferPool(config.WriteBufSize),
config: config,
+ lock: lock,
}, nil
}
// Clean implements Storage.Clean()
func (st *DiskStorage) Clean() error {
+ st.lock.Add()
+ defer st.lock.Done()
+ if st.lock.Closed() {
+ return ErrClosed
+ }
return util.CleanDirs(st.path)
}
@@ -150,9 +187,18 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
return nil, err
}
+ // Track open
+ st.lock.Add()
+
+ // Check if open
+ if st.lock.Closed() {
+ return nil, ErrClosed
+ }
+
// Attempt to open file (replace ENOENT with our own)
file, err := open(kpath, defaultFileROFlags)
if err != nil {
+ st.lock.Done()
return nil, errSwapNotFound(err)
}
@@ -160,12 +206,14 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
cFile, err := st.config.Compression.Reader(file)
if err != nil {
file.Close() // close this here, ignore error
+ st.lock.Done()
return nil, err
}
// Wrap compressor to ensure file close
return util.ReadCloserWithCallback(cFile, func() {
file.Close()
+ st.lock.Done()
}), nil
}
@@ -182,6 +230,15 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
@@ -232,6 +289,15 @@ func (st *DiskStorage) Stat(key string) (bool, error) {
return false, err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
// Check for file on disk
return stat(kpath)
}
@@ -244,20 +310,44 @@ func (st *DiskStorage) Remove(key string) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Attempt to remove file
return os.Remove(kpath)
}
+// Close implements Storage.Close()
+func (st *DiskStorage) Close() error {
+ return st.lock.Close()
+}
+
// WalkKeys implements Storage.WalkKeys()
func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
// Walk dir for entries
return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) {
- // Only deal with regular files
if fsentry.Type().IsRegular() {
+ // Only deal with regular files
+
// Get full item path (without root)
kpath = pb.Join(kpath, fsentry.Name())[len(st.path):]
@@ -269,21 +359,39 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
// filepath checks and returns a formatted filepath for given key
func (st *DiskStorage) filepath(key string) (string, error) {
+ // Calculate transformed key path
+ key = st.config.Transform.KeyToPath(key)
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Calculate transformed key path
- key = st.config.Transform.KeyToPath(key)
-
// Generated joined root path
pb.AppendString(st.path)
pb.AppendString(key)
// Check for dir traversal outside of root
- if util.IsDirTraversal(st.path, pb.StringPtr()) {
+ if isDirTraversal(st.path, pb.StringPtr()) {
return "", ErrInvalidKey
}
return pb.String(), nil
}
+
+// isDirTraversal will check if rootPlusPath is a dir traversal outside of root,
+// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath)
+func isDirTraversal(root, rootPlusPath string) bool {
+ switch {
+ // Root is $PWD, check for traversal out of
+ case root == ".":
+ return strings.HasPrefix(rootPlusPath, "../")
+
+ // The path MUST be prefixed by root
+ case !strings.HasPrefix(rootPlusPath, root):
+ return true
+
+ // In all other cases, check not equal
+ default:
+ return len(root) == len(rootPlusPath)
+ }
+}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/errors.go b/vendor/codeberg.org/gruf/go-store/storage/errors.go
index 016593596..ad2b742e6 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/errors.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/errors.go
@@ -19,6 +19,9 @@ func (e errorString) Extend(s string, a ...interface{}) errorString {
}
var (
+ // ErrClosed is returned on operations on a closed storage
+ ErrClosed = errorString("store/storage: closed")
+
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = errorString("store/storage: key not found")
@@ -39,6 +42,9 @@ var (
// errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
errCorruptNodes = errorString("store/storage: corrupted nodes")
+
+ // ErrAlreadyLocked is returned on fail opening a storage lockfile
+ ErrAlreadyLocked = errorString("store/storage: storage lock already open")
)
// errSwapNoop performs no error swaps
@@ -61,3 +67,11 @@ func errSwapExist(err error) error {
}
return err
}
+
+// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked
+func errSwapUnavailable(err error) error {
+ if err == syscall.EAGAIN {
+ return ErrAlreadyLocked
+ }
+ return err
+}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go
index 444cee4b0..b1c3560d2 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/fs.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/fs.go
@@ -8,11 +8,14 @@ import (
)
const (
- defaultDirPerms = 0755
- defaultFilePerms = 0644
+ // default file permission bits
+ defaultDirPerms = 0755
+ defaultFilePerms = 0644
+
+ // default file open flags
defaultFileROFlags = syscall.O_RDONLY
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
- defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT
+ defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
)
// NOTE:
@@ -39,7 +42,7 @@ func stat(path string) (bool, error) {
return syscall.Stat(path, &stat)
})
if err != nil {
- if err == syscall.ENOENT {
+ if err == syscall.ENOENT { //nolint
err = nil
}
return false, err
diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/storage/lock.go
index 3d794cda9..8a6c4c5e8 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/lock.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/lock.go
@@ -1,34 +1,76 @@
package storage
import (
- "os"
+ "sync"
+ "sync/atomic"
"syscall"
"codeberg.org/gruf/go-store/util"
)
-type lockableFile struct {
- *os.File
+// LockFile is our standard lockfile name.
+const LockFile = "store.lock"
+
+// Lock represents a filesystem lock to ensure only one storage instance open per path.
+type Lock struct {
+ fd int
+ wg sync.WaitGroup
+ st uint32
}
-func openLock(path string) (*lockableFile, error) {
- file, err := open(path, defaultFileLockFlags)
+// OpenLock opens a lockfile at path.
+func OpenLock(path string) (*Lock, error) {
+ var fd int
+
+ // Open the file descriptor at path
+ err := util.RetryOnEINTR(func() (err error) {
+ fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms)
+ return
+ })
if err != nil {
return nil, err
}
- return &lockableFile{file}, nil
+
+ // Get a flock on the file descriptor
+ err = util.RetryOnEINTR(func() error {
+ return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB)
+ })
+ if err != nil {
+ return nil, errSwapUnavailable(err)
+ }
+
+ return &Lock{fd: fd}, nil
}
-func (f *lockableFile) lock() error {
- return f.flock(syscall.LOCK_EX | syscall.LOCK_NB)
+// Add will add '1' to the underlying sync.WaitGroup.
+func (f *Lock) Add() {
+ f.wg.Add(1)
}
-func (f *lockableFile) unlock() error {
- return f.flock(syscall.LOCK_UN | syscall.LOCK_NB)
+// Done will decrememnt '1' from the underlying sync.WaitGroup.
+func (f *Lock) Done() {
+ f.wg.Done()
}
-func (f *lockableFile) flock(how int) error {
- return util.RetryOnEINTR(func() error {
- return syscall.Flock(int(f.Fd()), how)
- })
+// Close will attempt to close the lockfile and file descriptor.
+func (f *Lock) Close() error {
+ var err error
+ if atomic.CompareAndSwapUint32(&f.st, 0, 1) {
+ // Wait until done
+ f.wg.Wait()
+
+ // Ensure gets closed
+ defer syscall.Close(f.fd)
+
+ // Call funlock on the file descriptor
+ err = util.RetryOnEINTR(func() error {
+ return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB)
+ })
+ }
+ return err
+}
+
+// Closed will return whether this lockfile has been closed (and unlocked).
+func (f *Lock) Closed() bool {
+ return (atomic.LoadUint32(&f.st) == 1)
}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/memory.go b/vendor/codeberg.org/gruf/go-store/storage/memory.go
index be60fa464..2dab562d6 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/memory.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/memory.go
@@ -2,6 +2,7 @@ package storage
import (
"io"
+ "sync"
"codeberg.org/gruf/go-bytes"
"codeberg.org/gruf/go-store/util"
@@ -10,36 +11,76 @@ import (
// MemoryStorage is a storage implementation that simply stores key-value
// pairs in a Go map in-memory. The map is protected by a mutex.
type MemoryStorage struct {
+ ow bool // overwrites
fs map[string][]byte
+ mu sync.Mutex
+ st uint32
}
// OpenMemory opens a new MemoryStorage instance with internal map of 'size'.
-func OpenMemory(size int) *MemoryStorage {
+func OpenMemory(size int, overwrites bool) *MemoryStorage {
return &MemoryStorage{
fs: make(map[string][]byte, size),
+ mu: sync.Mutex{},
+ ow: overwrites,
}
}
// Clean implements Storage.Clean().
func (st *MemoryStorage) Clean() error {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ if st.st == 1 {
+ return ErrClosed
+ }
return nil
}
// ReadBytes implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
+ // Lock storage
+ st.mu.Lock()
+
+ // Check store open
+ if st.st == 1 {
+ st.mu.Unlock()
+ return nil, ErrClosed
+ }
+
+ // Check for key
b, ok := st.fs[key]
+ st.mu.Unlock()
+
+ // Return early if not exist
if !ok {
return nil, ErrNotFound
}
+
+ // Create return copy
return bytes.Copy(b), nil
}
// ReadStream implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
+ // Lock storage
+ st.mu.Lock()
+
+ // Check store open
+ if st.st == 1 {
+ st.mu.Unlock()
+ return nil, ErrClosed
+ }
+
+ // Check for key
b, ok := st.fs[key]
+ st.mu.Unlock()
+
+ // Return early if not exist
if !ok {
return nil, ErrNotFound
}
+
+ // Create io.ReadCloser from 'b' copy
b = bytes.Copy(b)
r := bytes.NewReader(b)
return util.NopReadCloser(r), nil
@@ -47,43 +88,101 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
// WriteBytes implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(key string, b []byte) error {
+ // Lock storage
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
_, ok := st.fs[key]
- if ok {
+
+ // Check for already exist
+ if ok && !st.ow {
return ErrAlreadyExists
}
+
+ // Write + unlock
st.fs[key] = bytes.Copy(b)
return nil
}
// WriteStream implements Storage.WriteStream().
func (st *MemoryStorage) WriteStream(key string, r io.Reader) error {
+ // Read all from reader
b, err := io.ReadAll(r)
if err != nil {
return err
}
+
+ // Write to storage
return st.WriteBytes(key, b)
}
// Stat implements Storage.Stat().
func (st *MemoryStorage) Stat(key string) (bool, error) {
+ // Lock storage
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return false, ErrClosed
+ }
+
+ // Check for key
_, ok := st.fs[key]
return ok, nil
}
// Remove implements Storage.Remove().
func (st *MemoryStorage) Remove(key string) error {
+ // Lock storage
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
+ // Check for key
_, ok := st.fs[key]
if !ok {
return ErrNotFound
}
+
+ // Remove from store
delete(st.fs, key)
+
+ return nil
+}
+
+// Close implements Storage.Close().
+func (st *MemoryStorage) Close() error {
+ st.mu.Lock()
+ st.st = 1
+ st.mu.Unlock()
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Lock storage
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
+ // Walk store keys
for key := range st.fs {
opts.WalkFn(entry(key))
}
+
return nil
}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/storage.go b/vendor/codeberg.org/gruf/go-store/storage/storage.go
index b160267a4..346aff097 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/storage.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/storage.go
@@ -19,9 +19,6 @@ func (e entry) Key() string {
// Storage defines a means of storing and accessing key value pairs
type Storage interface {
- // Clean removes unused values and unclutters the storage (e.g. removing empty folders)
- Clean() error
-
// ReadBytes returns the byte value for key in storage
ReadBytes(key string) ([]byte, error)
@@ -40,6 +37,12 @@ type Storage interface {
// Remove attempts to remove the supplied key-value pair from storage
Remove(key string) error
+ // Close will close the storage, releasing any file locks
+ Close() error
+
+ // Clean removes unused values and unclutters the storage (e.g. removing empty folders)
+ Clean() error
+
// WalkKeys walks the keys in the storage
WalkKeys(opts WalkKeysOptions) error
}
diff --git a/vendor/codeberg.org/gruf/go-store/util/fs.go b/vendor/codeberg.org/gruf/go-store/util/fs.go
index 93b37a261..53fef7750 100644
--- a/vendor/codeberg.org/gruf/go-store/util/fs.go
+++ b/vendor/codeberg.org/gruf/go-store/util/fs.go
@@ -3,30 +3,10 @@ package util
import (
"io/fs"
"os"
- "strings"
- "syscall"
"codeberg.org/gruf/go-fastpath"
)
-// IsDirTraversal will check if rootPlusPath is a dir traversal outside of root,
-// assuming that both are cleaned and that rootPlusPath is path.Join(root, somePath)
-func IsDirTraversal(root string, rootPlusPath string) bool {
- switch {
- // Root is $PWD, check for traversal out of
- case root == ".":
- return strings.HasPrefix(rootPlusPath, "../")
-
- // The path MUST be prefixed by root
- case !strings.HasPrefix(rootPlusPath, root):
- return true
-
- // In all other cases, check not equal
- default:
- return len(root) == len(rootPlusPath)
- }
-}
-
// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error {
// Read supplied dir path
@@ -100,14 +80,3 @@ func cleanDirs(pb *fastpath.Builder, path string) error {
}
return nil
}
-
-// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received
-func RetryOnEINTR(do func() error) error {
- for {
- err := do()
- if err == syscall.EINTR {
- continue
- }
- return err
- }
-}
diff --git a/vendor/codeberg.org/gruf/go-store/util/sys.go b/vendor/codeberg.org/gruf/go-store/util/sys.go
new file mode 100644
index 000000000..6661029e5
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-store/util/sys.go
@@ -0,0 +1,14 @@
+package util
+
+import "syscall"
+
+// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received
+func RetryOnEINTR(do func() error) error {
+ for {
+ err := do()
+ if err == syscall.EINTR {
+ continue
+ }
+ return err
+ }
+}