diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/kv/state.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/state.go | 63 |
1 files changed, 59 insertions, 4 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go index a8c1b9c82..20a3e951d 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/state.go +++ b/vendor/codeberg.org/gruf/go-store/kv/state.go @@ -2,6 +2,7 @@ package kv import ( "io" + "sync" "codeberg.org/gruf/go-errors" ) @@ -15,9 +16,14 @@ var ErrStateClosed = errors.New("store/kv: state closed") // then the state has zero guarantees type StateRO struct { store *KVStore + mutex sync.RWMutex } func (st *StateRO) Get(key string) ([]byte, error) { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return nil, ErrStateClosed @@ -28,6 +34,10 @@ func (st *StateRO) Get(key string) ([]byte, error) { } func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return nil, ErrStateClosed @@ -38,6 +48,10 @@ func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { } func (st *StateRO) Has(key string) (bool, error) { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return false, ErrStateClosed @@ -47,8 +61,16 @@ func (st *StateRO) Has(key string) (bool, error) { return st.store.has(key) } -func (st *StateRO) close() { - st.store = nil +func (st *StateRO) Release() { + // Get state write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Release the store + if st.store != nil { + st.store.mutex.RUnlock() + st.store = nil + } } // StateRW provides a read-write window to the store. While this @@ -58,9 +80,14 @@ func (st *StateRO) close() { // then the state has zero guarantees type StateRW struct { store *KVStore + mutex sync.RWMutex } func (st *StateRW) Get(key string) ([]byte, error) { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return nil, ErrStateClosed @@ -71,6 +98,10 @@ func (st *StateRW) Get(key string) ([]byte, error) { } func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return nil, ErrStateClosed @@ -81,6 +112,10 @@ func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { } func (st *StateRW) Put(key string, value []byte) error { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return ErrStateClosed @@ -91,6 +126,10 @@ func (st *StateRW) Put(key string, value []byte) error { } func (st *StateRW) PutStream(key string, r io.Reader) error { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return ErrStateClosed @@ -101,6 +140,10 @@ func (st *StateRW) PutStream(key string, r io.Reader) error { } func (st *StateRW) Has(key string) (bool, error) { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return false, ErrStateClosed @@ -111,6 +154,10 @@ func (st *StateRW) Has(key string) (bool, error) { } func (st *StateRW) Delete(key string) error { + // Get state read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + // Check not closed if st.store == nil { return ErrStateClosed @@ -120,6 +167,14 @@ func (st *StateRW) Delete(key string) error { return st.store.delete(key) } -func (st *StateRW) close() { - st.store = nil +func (st *StateRW) Release() { + // Get state write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Release the store + if st.store != nil { + st.store.mutex.Unlock() + st.store = nil + } } |