summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store/kv
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/kv')
-rw-r--r--vendor/codeberg.org/gruf/go-store/kv/iterator.go10
-rw-r--r--vendor/codeberg.org/gruf/go-store/kv/state.go82
-rw-r--r--vendor/codeberg.org/gruf/go-store/kv/store.go106
3 files changed, 64 insertions, 134 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go
index d3999273f..da743ead1 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go
@@ -2,6 +2,7 @@ package kv
import (
"codeberg.org/gruf/go-errors"
+ "codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
)
@@ -17,10 +18,10 @@ var ErrIteratorClosed = errors.New("store/kv: iterator closed")
// have multiple iterators running concurrently
type KVIterator struct {
store *KVStore // store is the linked KVStore
+ state *mutexes.LockState
entries []storage.StorageEntry
index int
key string
- onClose func()
}
// Next attempts to set the next key-value pair, the
@@ -43,13 +44,10 @@ func (i *KVIterator) Key() string {
// Release releases the KVIterator and KVStore's read lock
func (i *KVIterator) Release() {
- // Reset key, path, entries
+ i.state.UnlockMap()
i.store = nil
i.key = ""
i.entries = nil
-
- // Perform requested callback
- i.onClose()
}
// Value returns the next value from the KVStore
@@ -60,5 +58,5 @@ func (i *KVIterator) Value() ([]byte, error) {
}
// Attempt to fetch from store
- return i.store.get(i.key)
+ return i.store.get(i.state.RLock, i.key)
}
diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go
index 20a3e951d..0b226e107 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/state.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/state.go
@@ -2,9 +2,9 @@ package kv
import (
"io"
- "sync"
"codeberg.org/gruf/go-errors"
+ "codeberg.org/gruf/go-mutexes"
)
var ErrStateClosed = errors.New("store/kv: state closed")
@@ -16,61 +16,42 @@ var ErrStateClosed = errors.New("store/kv: state closed")
// then the state has zero guarantees
type StateRO struct {
store *KVStore
- mutex sync.RWMutex
+ state *mutexes.LockState
}
func (st *StateRO) Get(key string) ([]byte, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.get(key)
+ return st.store.get(st.state.RLock, key)
}
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.getStream(key)
+ return st.store.getStream(st.state.RLock, key)
}
func (st *StateRO) Has(key string) (bool, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
- return st.store.has(key)
+ return st.store.has(st.state.RLock, key)
}
func (st *StateRO) Release() {
- // Get state write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Release the store
- if st.store != nil {
- st.store.mutex.RUnlock()
- st.store = nil
- }
+ st.state.UnlockMap()
+ st.store = nil
}
// StateRW provides a read-write window to the store. While this
@@ -80,101 +61,70 @@ func (st *StateRO) Release() {
// then the state has zero guarantees
type StateRW struct {
store *KVStore
- mutex sync.RWMutex
+ state *mutexes.LockState
}
func (st *StateRW) Get(key string) ([]byte, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.get(key)
+ return st.store.get(st.state.RLock, key)
}
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
}
// Pass request to store
- return st.store.getStream(key)
+ return st.store.getStream(st.state.RLock, key)
}
func (st *StateRW) Put(key string, value []byte) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.put(key, value)
+ return st.store.put(st.state.Lock, key, value)
}
func (st *StateRW) PutStream(key string, r io.Reader) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.putStream(key, r)
+ return st.store.putStream(st.state.Lock, key, r)
}
func (st *StateRW) Has(key string) (bool, error) {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return false, ErrStateClosed
}
// Pass request to store
- return st.store.has(key)
+ return st.store.has(st.state.RLock, key)
}
func (st *StateRW) Delete(key string) error {
- // Get state read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
// Check not closed
if st.store == nil {
return ErrStateClosed
}
// Pass request to store
- return st.store.delete(key)
+ return st.store.delete(st.state.Lock, key)
}
func (st *StateRW) Release() {
- // Get state write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Release the store
- if st.store != nil {
- st.store.mutex.Unlock()
- st.store = nil
- }
+ st.state.UnlockMap()
+ st.store = nil
}
diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go
index 34fe91987..a8741afe0 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/store.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/store.go
@@ -2,7 +2,6 @@ package kv
import (
"io"
- "sync"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/storage"
@@ -11,9 +10,8 @@ import (
// KVStore is a very simple, yet performant key-value store
type KVStore struct {
- mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
- mutex sync.RWMutex // mutex is the total store mutex
- storage storage.Storage // storage is the underlying storage
+ mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access
+ storage storage.Storage // storage is the underlying storage
}
func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
@@ -47,25 +45,29 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) {
// Return new KVStore
return &KVStore{
- mutexMap: mutexes.NewMap(mutexes.NewRW),
- mutex: sync.RWMutex{},
- storage: storage,
+ mutex: mutexes.NewMap(-1),
+ storage: storage,
}, nil
}
+// RLock acquires a read-lock on supplied key, returning unlock function.
+func (st *KVStore) RLock(key string) (runlock func()) {
+ return st.mutex.RLock(key)
+}
+
+// Lock acquires a write-lock on supplied key, returning unlock function.
+func (st *KVStore) Lock(key string) (unlock func()) {
+ return st.mutex.Lock(key)
+}
+
// Get fetches the bytes for supplied key in the store
func (st *KVStore) Get(key string) ([]byte, error) {
- // Acquire store read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
- // Pass to unprotected fn
- return st.get(key)
+ return st.get(st.RLock, key)
}
-func (st *KVStore) get(key string) ([]byte, error) {
+func (st *KVStore) get(rlock func(string) func(), key string) ([]byte, error) {
// Acquire read lock for key
- runlock := st.mutexMap.RLock(key)
+ runlock := rlock(key)
defer runlock()
// Read file bytes
@@ -74,17 +76,12 @@ func (st *KVStore) get(key string) ([]byte, error) {
// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store
func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
- // Acquire store read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
- // Pass to unprotected fn
- return st.getStream(key)
+ return st.getStream(st.RLock, key)
}
-func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
+func (st *KVStore) getStream(rlock func(string) func(), key string) (io.ReadCloser, error) {
// Acquire read lock for key
- runlock := st.mutexMap.RLock(key)
+ runlock := rlock(key)
// Attempt to open stream for read
rd, err := st.storage.ReadStream(key)
@@ -99,17 +96,12 @@ func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
// Put places the bytes at the supplied key location in the store
func (st *KVStore) Put(key string, value []byte) error {
- // Acquire store write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Pass to unprotected fn
- return st.put(key, value)
+ return st.put(st.Lock, key, value)
}
-func (st *KVStore) put(key string, value []byte) error {
+func (st *KVStore) put(lock func(string) func(), key string, value []byte) error {
// Acquire write lock for key
- unlock := st.mutexMap.Lock(key)
+ unlock := lock(key)
defer unlock()
// Write file bytes
@@ -118,17 +110,12 @@ func (st *KVStore) put(key string, value []byte) error {
// PutStream writes the bytes from the supplied Reader at the supplied key location in the store
func (st *KVStore) PutStream(key string, r io.Reader) error {
- // Acquire store write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Pass to unprotected fn
- return st.putStream(key, r)
+ return st.putStream(st.Lock, key, r)
}
-func (st *KVStore) putStream(key string, r io.Reader) error {
+func (st *KVStore) putStream(lock func(string) func(), key string, r io.Reader) error {
// Acquire write lock for key
- unlock := st.mutexMap.Lock(key)
+ unlock := lock(key)
defer unlock()
// Write file stream
@@ -137,17 +124,12 @@ func (st *KVStore) putStream(key string, r io.Reader) error {
// Has checks whether the supplied key exists in the store
func (st *KVStore) Has(key string) (bool, error) {
- // Acquire store read lock
- st.mutex.RLock()
- defer st.mutex.RUnlock()
-
- // Pass to unprotected fn
- return st.has(key)
+ return st.has(st.RLock, key)
}
-func (st *KVStore) has(key string) (bool, error) {
+func (st *KVStore) has(rlock func(string) func(), key string) (bool, error) {
// Acquire read lock for key
- runlock := st.mutexMap.RLock(key)
+ runlock := rlock(key)
defer runlock()
// Stat file on disk
@@ -156,17 +138,12 @@ func (st *KVStore) has(key string) (bool, error) {
// Delete removes the supplied key-value pair from the store
func (st *KVStore) Delete(key string) error {
- // Acquire store write lock
- st.mutex.Lock()
- defer st.mutex.Unlock()
-
- // Pass to unprotected fn
- return st.delete(key)
+ return st.delete(st.Lock, key)
}
-func (st *KVStore) delete(key string) error {
+func (st *KVStore) delete(lock func(string) func(), key string) error {
// Acquire write lock for key
- unlock := st.mutexMap.Lock(key)
+ unlock := lock(key)
defer unlock()
// Remove file from disk
@@ -181,7 +158,7 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
}
// Get store read lock
- st.mutex.RLock()
+ state := st.mutex.RLockMap()
// Setup the walk keys function
entries := []storage.StorageEntry{}
@@ -198,24 +175,24 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
// Walk keys in the storage
err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn})
if err != nil {
- st.mutex.RUnlock()
+ state.UnlockMap()
return nil, err
}
// Return new iterator
return &KVIterator{
store: st,
+ state: state,
entries: entries,
index: -1,
key: "",
- onClose: st.mutex.RUnlock,
}, nil
}
// Read provides a read-only window to the store, holding it in a read-locked state until release
func (st *KVStore) Read() *StateRO {
- st.mutex.RLock()
- return &StateRO{store: st}
+ state := st.mutex.RLockMap()
+ return &StateRO{store: st, state: state}
}
// ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return.
@@ -230,8 +207,8 @@ func (st *KVStore) ReadFn(fn func(*StateRO)) {
// Update provides a read-write window to the store, holding it in a write-locked state until release
func (st *KVStore) Update() *StateRW {
- st.mutex.Lock()
- return &StateRW{store: st}
+ state := st.mutex.LockMap()
+ return &StateRW{store: st, state: state}
}
// UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return.
@@ -243,3 +220,8 @@ func (st *KVStore) UpdateFn(fn func(*StateRW)) {
// Pass to fn
fn(state)
}
+
+// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work).
+func (st *KVStore) Close() error {
+ return st.storage.Close()
+}