diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/kv')
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/iterator.go | 10 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/state.go | 82 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/store.go | 106 |
3 files changed, 64 insertions, 134 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go index d3999273f..da743ead1 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go +++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go @@ -2,6 +2,7 @@ package kv import ( "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/storage" ) @@ -17,10 +18,10 @@ var ErrIteratorClosed = errors.New("store/kv: iterator closed") // have multiple iterators running concurrently type KVIterator struct { store *KVStore // store is the linked KVStore + state *mutexes.LockState entries []storage.StorageEntry index int key string - onClose func() } // Next attempts to set the next key-value pair, the @@ -43,13 +44,10 @@ func (i *KVIterator) Key() string { // Release releases the KVIterator and KVStore's read lock func (i *KVIterator) Release() { - // Reset key, path, entries + i.state.UnlockMap() i.store = nil i.key = "" i.entries = nil - - // Perform requested callback - i.onClose() } // Value returns the next value from the KVStore @@ -60,5 +58,5 @@ func (i *KVIterator) Value() ([]byte, error) { } // Attempt to fetch from store - return i.store.get(i.key) + return i.store.get(i.state.RLock, i.key) } diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go index 20a3e951d..0b226e107 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/state.go +++ b/vendor/codeberg.org/gruf/go-store/kv/state.go @@ -2,9 +2,9 @@ package kv import ( "io" - "sync" "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-mutexes" ) var ErrStateClosed = errors.New("store/kv: state closed") @@ -16,61 +16,42 @@ var ErrStateClosed = errors.New("store/kv: state closed") // then the state has zero guarantees type StateRO struct { store *KVStore - mutex sync.RWMutex + state *mutexes.LockState } func (st *StateRO) Get(key string) ([]byte, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.get(key) + return st.store.get(st.state.RLock, key) } func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.getStream(key) + return st.store.getStream(st.state.RLock, key) } func (st *StateRO) Has(key string) (bool, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return false, ErrStateClosed } // Pass request to store - return st.store.has(key) + return st.store.has(st.state.RLock, key) } func (st *StateRO) Release() { - // Get state write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Release the store - if st.store != nil { - st.store.mutex.RUnlock() - st.store = nil - } + st.state.UnlockMap() + st.store = nil } // StateRW provides a read-write window to the store. While this @@ -80,101 +61,70 @@ func (st *StateRO) Release() { // then the state has zero guarantees type StateRW struct { store *KVStore - mutex sync.RWMutex + state *mutexes.LockState } func (st *StateRW) Get(key string) ([]byte, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.get(key) + return st.store.get(st.state.RLock, key) } func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.getStream(key) + return st.store.getStream(st.state.RLock, key) } func (st *StateRW) Put(key string, value []byte) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.put(key, value) + return st.store.put(st.state.Lock, key, value) } func (st *StateRW) PutStream(key string, r io.Reader) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.putStream(key, r) + return st.store.putStream(st.state.Lock, key, r) } func (st *StateRW) Has(key string) (bool, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return false, ErrStateClosed } // Pass request to store - return st.store.has(key) + return st.store.has(st.state.RLock, key) } func (st *StateRW) Delete(key string) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.delete(key) + return st.store.delete(st.state.Lock, key) } func (st *StateRW) Release() { - // Get state write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Release the store - if st.store != nil { - st.store.mutex.Unlock() - st.store = nil - } + st.state.UnlockMap() + st.store = nil } diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go index 34fe91987..a8741afe0 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/kv/store.go @@ -2,7 +2,6 @@ package kv import ( "io" - "sync" "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/storage" @@ -11,9 +10,8 @@ import ( // KVStore is a very simple, yet performant key-value store type KVStore struct { - mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access - mutex sync.RWMutex // mutex is the total store mutex - storage storage.Storage // storage is the underlying storage + mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access + storage storage.Storage // storage is the underlying storage } func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) { @@ -47,25 +45,29 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) { // Return new KVStore return &KVStore{ - mutexMap: mutexes.NewMap(mutexes.NewRW), - mutex: sync.RWMutex{}, - storage: storage, + mutex: mutexes.NewMap(-1), + storage: storage, }, nil } +// RLock acquires a read-lock on supplied key, returning unlock function. +func (st *KVStore) RLock(key string) (runlock func()) { + return st.mutex.RLock(key) +} + +// Lock acquires a write-lock on supplied key, returning unlock function. +func (st *KVStore) Lock(key string) (unlock func()) { + return st.mutex.Lock(key) +} + // Get fetches the bytes for supplied key in the store func (st *KVStore) Get(key string) ([]byte, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.get(key) + return st.get(st.RLock, key) } -func (st *KVStore) get(key string) ([]byte, error) { +func (st *KVStore) get(rlock func(string) func(), key string) ([]byte, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) defer runlock() // Read file bytes @@ -74,17 +76,12 @@ func (st *KVStore) get(key string) ([]byte, error) { // GetStream fetches a ReadCloser for the bytes at the supplied key location in the store func (st *KVStore) GetStream(key string) (io.ReadCloser, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.getStream(key) + return st.getStream(st.RLock, key) } -func (st *KVStore) getStream(key string) (io.ReadCloser, error) { +func (st *KVStore) getStream(rlock func(string) func(), key string) (io.ReadCloser, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) // Attempt to open stream for read rd, err := st.storage.ReadStream(key) @@ -99,17 +96,12 @@ func (st *KVStore) getStream(key string) (io.ReadCloser, error) { // Put places the bytes at the supplied key location in the store func (st *KVStore) Put(key string, value []byte) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.put(key, value) + return st.put(st.Lock, key, value) } -func (st *KVStore) put(key string, value []byte) error { +func (st *KVStore) put(lock func(string) func(), key string, value []byte) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Write file bytes @@ -118,17 +110,12 @@ func (st *KVStore) put(key string, value []byte) error { // PutStream writes the bytes from the supplied Reader at the supplied key location in the store func (st *KVStore) PutStream(key string, r io.Reader) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.putStream(key, r) + return st.putStream(st.Lock, key, r) } -func (st *KVStore) putStream(key string, r io.Reader) error { +func (st *KVStore) putStream(lock func(string) func(), key string, r io.Reader) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Write file stream @@ -137,17 +124,12 @@ func (st *KVStore) putStream(key string, r io.Reader) error { // Has checks whether the supplied key exists in the store func (st *KVStore) Has(key string) (bool, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.has(key) + return st.has(st.RLock, key) } -func (st *KVStore) has(key string) (bool, error) { +func (st *KVStore) has(rlock func(string) func(), key string) (bool, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) defer runlock() // Stat file on disk @@ -156,17 +138,12 @@ func (st *KVStore) has(key string) (bool, error) { // Delete removes the supplied key-value pair from the store func (st *KVStore) Delete(key string) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.delete(key) + return st.delete(st.Lock, key) } -func (st *KVStore) delete(key string) error { +func (st *KVStore) delete(lock func(string) func(), key string) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Remove file from disk @@ -181,7 +158,7 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { } // Get store read lock - st.mutex.RLock() + state := st.mutex.RLockMap() // Setup the walk keys function entries := []storage.StorageEntry{} @@ -198,24 +175,24 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { // Walk keys in the storage err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn}) if err != nil { - st.mutex.RUnlock() + state.UnlockMap() return nil, err } // Return new iterator return &KVIterator{ store: st, + state: state, entries: entries, index: -1, key: "", - onClose: st.mutex.RUnlock, }, nil } // Read provides a read-only window to the store, holding it in a read-locked state until release func (st *KVStore) Read() *StateRO { - st.mutex.RLock() - return &StateRO{store: st} + state := st.mutex.RLockMap() + return &StateRO{store: st, state: state} } // ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return. @@ -230,8 +207,8 @@ func (st *KVStore) ReadFn(fn func(*StateRO)) { // Update provides a read-write window to the store, holding it in a write-locked state until release func (st *KVStore) Update() *StateRW { - st.mutex.Lock() - return &StateRW{store: st} + state := st.mutex.LockMap() + return &StateRW{store: st, state: state} } // UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return. @@ -243,3 +220,8 @@ func (st *KVStore) UpdateFn(fn func(*StateRW)) { // Pass to fn fn(state) } + +// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work). +func (st *KVStore) Close() error { + return st.storage.Close() +} |