summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store/kv/state.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/kv/state.go')
-rw-r--r--vendor/codeberg.org/gruf/go-store/kv/state.go63
1 files changed, 59 insertions, 4 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go
index a8c1b9c82..20a3e951d 100644
--- a/vendor/codeberg.org/gruf/go-store/kv/state.go
+++ b/vendor/codeberg.org/gruf/go-store/kv/state.go
@@ -2,6 +2,7 @@ package kv
import (
"io"
+ "sync"
"codeberg.org/gruf/go-errors"
)
@@ -15,9 +16,14 @@ var ErrStateClosed = errors.New("store/kv: state closed")
// then the state has zero guarantees
type StateRO struct {
store *KVStore
+ mutex sync.RWMutex
}
func (st *StateRO) Get(key string) ([]byte, error) {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
@@ -28,6 +34,10 @@ func (st *StateRO) Get(key string) ([]byte, error) {
}
func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
@@ -38,6 +48,10 @@ func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
}
func (st *StateRO) Has(key string) (bool, error) {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return false, ErrStateClosed
@@ -47,8 +61,16 @@ func (st *StateRO) Has(key string) (bool, error) {
return st.store.has(key)
}
-func (st *StateRO) close() {
- st.store = nil
+func (st *StateRO) Release() {
+ // Get state write lock
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+
+ // Release the store
+ if st.store != nil {
+ st.store.mutex.RUnlock()
+ st.store = nil
+ }
}
// StateRW provides a read-write window to the store. While this
@@ -58,9 +80,14 @@ func (st *StateRO) close() {
// then the state has zero guarantees
type StateRW struct {
store *KVStore
+ mutex sync.RWMutex
}
func (st *StateRW) Get(key string) ([]byte, error) {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
@@ -71,6 +98,10 @@ func (st *StateRW) Get(key string) ([]byte, error) {
}
func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return nil, ErrStateClosed
@@ -81,6 +112,10 @@ func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
}
func (st *StateRW) Put(key string, value []byte) error {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return ErrStateClosed
@@ -91,6 +126,10 @@ func (st *StateRW) Put(key string, value []byte) error {
}
func (st *StateRW) PutStream(key string, r io.Reader) error {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return ErrStateClosed
@@ -101,6 +140,10 @@ func (st *StateRW) PutStream(key string, r io.Reader) error {
}
func (st *StateRW) Has(key string) (bool, error) {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return false, ErrStateClosed
@@ -111,6 +154,10 @@ func (st *StateRW) Has(key string) (bool, error) {
}
func (st *StateRW) Delete(key string) error {
+ // Get state read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
// Check not closed
if st.store == nil {
return ErrStateClosed
@@ -120,6 +167,14 @@ func (st *StateRW) Delete(key string) error {
return st.store.delete(key)
}
-func (st *StateRW) close() {
- st.store = nil
+func (st *StateRW) Release() {
+ // Get state write lock
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+
+ // Release the store
+ if st.store != nil {
+ st.store.mutex.Unlock()
+ st.store = nil
+ }
}