diff options
author | 2022-01-24 17:35:13 +0100 | |
---|---|---|
committer | 2022-01-24 17:35:13 +0100 | |
commit | f28cf793ee53e8391c9eabbfba93afbc5b59936b (patch) | |
tree | fcd6ec6fc4fa011017fb30fce0f52fc947a4d226 /vendor/codeberg.org/gruf/go-store | |
parent | update remote account get/deref logic (diff) | |
download | gotosocial-f28cf793ee53e8391c9eabbfba93afbc5b59936b.tar.xz |
upgrade go-store
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/iterator.go | 10 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/state.go | 82 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/store.go | 40 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/block.go | 86 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/disk.go | 67 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/errors.go | 14 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/fs.go | 9 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/lock.go | 75 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/memory.go | 78 |
9 files changed, 309 insertions, 152 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go index ddaaf60cf..da743ead1 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go +++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go @@ -2,6 +2,7 @@ package kv import ( "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/storage" ) @@ -17,10 +18,10 @@ var ErrIteratorClosed = errors.New("store/kv: iterator closed") // have multiple iterators running concurrently type KVIterator struct { store *KVStore // store is the linked KVStore + state *mutexes.LockState entries []storage.StorageEntry index int key string - onClose func() } // Next attempts to set the next key-value pair, the @@ -43,13 +44,10 @@ func (i *KVIterator) Key() string { // Release releases the KVIterator and KVStore's read lock func (i *KVIterator) Release() { - // Reset key, path, entries + i.state.UnlockMap() i.store = nil i.key = "" i.entries = nil - - // Perform requested callback - i.onClose() } // Value returns the next value from the KVStore @@ -60,5 +58,5 @@ func (i *KVIterator) Value() ([]byte, error) { } // Attempt to fetch from store - return i.store.get(i.store.mutexMap.RLock, i.key) + return i.store.get(i.state.RLock, i.key) } diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go index 330928bce..0b226e107 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/state.go +++ b/vendor/codeberg.org/gruf/go-store/kv/state.go @@ -2,9 +2,9 @@ package kv import ( "io" - "sync" "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-mutexes" ) var ErrStateClosed = errors.New("store/kv: state closed") @@ -16,61 +16,42 @@ var ErrStateClosed = errors.New("store/kv: state closed") // then the state has zero guarantees type StateRO struct { store *KVStore - mutex sync.RWMutex + state *mutexes.LockState } func (st *StateRO) Get(key string) ([]byte, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.get(st.store.mutexMap.RLock, key) + return st.store.get(st.state.RLock, key) } func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.getStream(st.store.mutexMap.RLock, key) + return st.store.getStream(st.state.RLock, key) } func (st *StateRO) Has(key string) (bool, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return false, ErrStateClosed } // Pass request to store - return st.store.has(st.store.mutexMap.RLock, key) + return st.store.has(st.state.RLock, key) } func (st *StateRO) Release() { - // Get state write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Release the store - if st.store != nil { - st.store.mutex.RUnlock() - st.store = nil - } + st.state.UnlockMap() + st.store = nil } // StateRW provides a read-write window to the store. While this @@ -80,101 +61,70 @@ func (st *StateRO) Release() { // then the state has zero guarantees type StateRW struct { store *KVStore - mutex sync.RWMutex + state *mutexes.LockState } func (st *StateRW) Get(key string) ([]byte, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.get(st.store.mutexMap.RLock, key) + return st.store.get(st.state.RLock, key) } func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return nil, ErrStateClosed } // Pass request to store - return st.store.getStream(st.store.mutexMap.RLock, key) + return st.store.getStream(st.state.RLock, key) } func (st *StateRW) Put(key string, value []byte) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.put(st.store.mutexMap.Lock, key, value) + return st.store.put(st.state.Lock, key, value) } func (st *StateRW) PutStream(key string, r io.Reader) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.putStream(st.store.mutexMap.Lock, key, r) + return st.store.putStream(st.state.Lock, key, r) } func (st *StateRW) Has(key string) (bool, error) { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return false, ErrStateClosed } // Pass request to store - return st.store.has(st.store.mutexMap.RLock, key) + return st.store.has(st.state.RLock, key) } func (st *StateRW) Delete(key string) error { - // Get state read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - // Check not closed if st.store == nil { return ErrStateClosed } // Pass request to store - return st.store.delete(st.store.mutexMap.Lock, key) + return st.store.delete(st.state.Lock, key) } func (st *StateRW) Release() { - // Get state write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Release the store - if st.store != nil { - st.store.mutex.Unlock() - st.store = nil - } + st.state.UnlockMap() + st.store = nil } diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go index 4c3a31140..a8741afe0 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/kv/store.go @@ -2,7 +2,6 @@ package kv import ( "io" - "sync" "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/storage" @@ -11,9 +10,8 @@ import ( // KVStore is a very simple, yet performant key-value store type KVStore struct { - mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access - mutex sync.RWMutex // mutex is the total store mutex - storage storage.Storage // storage is the underlying storage + mutex mutexes.MutexMap // mutex is a map of keys to mutexes to protect file access + storage storage.Storage // storage is the underlying storage } func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) { @@ -47,26 +45,19 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) { // Return new KVStore return &KVStore{ - mutexMap: mutexes.NewMap(mutexes.NewRW), - mutex: sync.RWMutex{}, - storage: storage, + mutex: mutexes.NewMap(-1), + storage: storage, }, nil } // RLock acquires a read-lock on supplied key, returning unlock function. func (st *KVStore) RLock(key string) (runlock func()) { - st.mutex.RLock() - runlock = st.mutexMap.RLock(key) - st.mutex.RUnlock() - return runlock + return st.mutex.RLock(key) } // Lock acquires a write-lock on supplied key, returning unlock function. func (st *KVStore) Lock(key string) (unlock func()) { - st.mutex.Lock() - unlock = st.mutexMap.Lock(key) - st.mutex.Unlock() - return unlock + return st.mutex.Lock(key) } // Get fetches the bytes for supplied key in the store @@ -167,7 +158,7 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { } // Get store read lock - st.mutex.RLock() + state := st.mutex.RLockMap() // Setup the walk keys function entries := []storage.StorageEntry{} @@ -184,24 +175,24 @@ func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { // Walk keys in the storage err := st.storage.WalkKeys(storage.WalkKeysOptions{WalkFn: walkFn}) if err != nil { - st.mutex.RUnlock() + state.UnlockMap() return nil, err } // Return new iterator return &KVIterator{ store: st, + state: state, entries: entries, index: -1, key: "", - onClose: st.mutex.RUnlock, }, nil } // Read provides a read-only window to the store, holding it in a read-locked state until release func (st *KVStore) Read() *StateRO { - st.mutex.RLock() - return &StateRO{store: st} + state := st.mutex.RLockMap() + return &StateRO{store: st, state: state} } // ReadFn provides a read-only window to the store, holding it in a read-locked state until fn return. @@ -216,8 +207,8 @@ func (st *KVStore) ReadFn(fn func(*StateRO)) { // Update provides a read-write window to the store, holding it in a write-locked state until release func (st *KVStore) Update() *StateRW { - st.mutex.Lock() - return &StateRW{store: st} + state := st.mutex.LockMap() + return &StateRW{store: st, state: state} } // UpdateFn provides a read-write window to the store, holding it in a write-locked state until fn return. @@ -229,3 +220,8 @@ func (st *KVStore) UpdateFn(fn func(*StateRW)) { // Pass to fn fn(state) } + +// Close will close the underlying storage, the mutex map locking (e.g. RLock(), Lock() will still work). +func (st *KVStore) Close() error { + return st.storage.Close() +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go index bc35b07ac..5075c7d17 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/block.go +++ b/vendor/codeberg.org/gruf/go-store/storage/block.go @@ -1,6 +1,7 @@ package storage import ( + "crypto/sha256" "io" "io/fs" "os" @@ -13,7 +14,6 @@ import ( "codeberg.org/gruf/go-hashenc" "codeberg.org/gruf/go-pools" "codeberg.org/gruf/go-store/util" - "github.com/zeebo/blake3" ) var ( @@ -77,7 +77,7 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig { // BlockStorage is a Storage implementation that stores input data as chunks on // a filesystem. Each value is chunked into blocks of configured size and these -// blocks are stored with name equal to their base64-encoded BLAKE3 hash-sum. A +// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A // "node" file is finally created containing an array of hashes contained within // this value type BlockStorage struct { @@ -87,7 +87,7 @@ type BlockStorage struct { config BlockConfig // cfg is the supplied configuration for this store hashPool sync.Pool // hashPool is this store's hashEncoder pool bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool - lock *LockableFile // lock is the opened lockfile for this storage instance + lock *Lock // lock is the opened lockfile for this storage instance // NOTE: // BlockStorage does not need to lock each of the underlying block files @@ -140,11 +140,9 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { } // Open and acquire storage lock for path - lock, err := OpenLock(pb.Join(path, LockFile)) + lock, err := OpenLock(pb.Join(path, lockFile)) if err != nil { return nil, err - } else if err := lock.Lock(); err != nil { - return nil, err } // Figure out the largest size for bufpool slices @@ -174,14 +172,23 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { // Clean implements storage.Clean() func (st *BlockStorage) Clean() error { - nodes := map[string]*node{} + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) - // Walk nodes dir for entries + nodes := map[string]*node{} onceErr := errors.OnceError{} + + // Walk nodes dir for entries err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { // Only deal with regular files if !fsentry.Type().IsRegular() { @@ -303,6 +310,7 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) { if err != nil { return nil, err } + defer rc.Close() // Read all bytes and return return io.ReadAll(rc) @@ -316,9 +324,19 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { return nil, err } + // Track open + st.lock.Add() + + // Check if open + if st.lock.Closed() { + st.lock.Done() + return nil, ErrClosed + } + // Attempt to open RO file file, err := open(npath, defaultFileROFlags) if err != nil { + st.lock.Done() return nil, err } defer file.Close() @@ -338,14 +356,16 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { nil, ) if err != nil { + st.lock.Done() return nil, err } - // Return new block reader - return util.NopReadCloser(&blockReader{ + // Prepare block reader and return + rc := util.NopReadCloser(&blockReader{ storage: st, node: &node, - }), nil + }) // we wrap the blockreader to decr lockfile waitgroup + return util.ReadCloserWithCallback(rc, st.lock.Done), nil } func (st *BlockStorage) readBlock(key string) ([]byte, error) { @@ -383,6 +403,15 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Check if this exists ok, err := stat(key) if err != nil { @@ -567,6 +596,15 @@ func (st *BlockStorage) Stat(key string) (bool, error) { return false, err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return false, ErrClosed + } + // Check for file on disk return stat(kpath) } @@ -579,18 +617,35 @@ func (st *BlockStorage) Remove(key string) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Attempt to remove file return os.Remove(kpath) } // Close implements Storage.Close() func (st *BlockStorage) Close() error { - defer st.lock.Close() - return st.lock.Unlock() + return st.lock.Close() } // WalkKeys implements Storage.WalkKeys() func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error { + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) @@ -800,7 +855,7 @@ var ( // encodedHashLen is the once-calculated encoded hash-sum length encodedHashLen = base64Encoding.EncodedLen( - blake3.New().Size(), + sha256.New().Size(), ) ) @@ -812,9 +867,8 @@ type hashEncoder struct { // newHashEncoder returns a new hashEncoder instance func newHashEncoder() *hashEncoder { - hash := blake3.New() return &hashEncoder{ - henc: hashenc.New(hash, base64Encoding), + henc: hashenc.New(sha256.New(), base64Encoding), ebuf: make([]byte, encodedHashLen), } } diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go index 9b5430437..2ee00ddee 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go @@ -71,7 +71,7 @@ type DiskStorage struct { path string // path is the root path of this store bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage config DiskConfig // cfg is the supplied configuration for this store - lock *LockableFile // lock is the opened lockfile for this storage instance + lock *Lock // lock is the opened lockfile for this storage instance } // OpenFile opens a DiskStorage instance for given folder path and configuration @@ -118,11 +118,9 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { } // Open and acquire storage lock for path - lock, err := OpenLock(pb.Join(path, LockFile)) + lock, err := OpenLock(pb.Join(path, lockFile)) if err != nil { return nil, err - } else if err := lock.Lock(); err != nil { - return nil, err } // Return new DiskStorage @@ -136,6 +134,11 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { // Clean implements Storage.Clean() func (st *DiskStorage) Clean() error { + st.lock.Add() + defer st.lock.Done() + if st.lock.Closed() { + return ErrClosed + } return util.CleanDirs(st.path) } @@ -160,9 +163,18 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { return nil, err } + // Track open + st.lock.Add() + + // Check if open + if st.lock.Closed() { + return nil, ErrClosed + } + // Attempt to open file (replace ENOENT with our own) file, err := open(kpath, defaultFileROFlags) if err != nil { + st.lock.Done() return nil, errSwapNotFound(err) } @@ -170,12 +182,14 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { cFile, err := st.config.Compression.Reader(file) if err != nil { file.Close() // close this here, ignore error + st.lock.Done() return nil, err } // Wrap compressor to ensure file close return util.ReadCloserWithCallback(cFile, func() { file.Close() + st.lock.Done() }), nil } @@ -192,6 +206,15 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Ensure dirs leading up to file exist err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) if err != nil { @@ -242,6 +265,15 @@ func (st *DiskStorage) Stat(key string) (bool, error) { return false, err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return false, ErrClosed + } + // Check for file on disk return stat(kpath) } @@ -254,18 +286,35 @@ func (st *DiskStorage) Remove(key string) error { return err } + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Attempt to remove file return os.Remove(kpath) } // Close implements Storage.Close() func (st *DiskStorage) Close() error { - defer st.lock.Close() - return st.lock.Unlock() + return st.lock.Close() } // WalkKeys implements Storage.WalkKeys() func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error { + // Track open + st.lock.Add() + defer st.lock.Done() + + // Check if open + if st.lock.Closed() { + return ErrClosed + } + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) @@ -286,13 +335,13 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error { // filepath checks and returns a formatted filepath for given key func (st *DiskStorage) filepath(key string) (string, error) { + // Calculate transformed key path + key = st.config.Transform.KeyToPath(key) + // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) - // Calculate transformed key path - key = st.config.Transform.KeyToPath(key) - // Generated joined root path pb.AppendString(st.path) pb.AppendString(key) diff --git a/vendor/codeberg.org/gruf/go-store/storage/errors.go b/vendor/codeberg.org/gruf/go-store/storage/errors.go index 016593596..ad2b742e6 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/errors.go +++ b/vendor/codeberg.org/gruf/go-store/storage/errors.go @@ -19,6 +19,9 @@ func (e errorString) Extend(s string, a ...interface{}) errorString { } var ( + // ErrClosed is returned on operations on a closed storage + ErrClosed = errorString("store/storage: closed") + // ErrNotFound is the error returned when a key cannot be found in storage ErrNotFound = errorString("store/storage: key not found") @@ -39,6 +42,9 @@ var ( // errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean errCorruptNodes = errorString("store/storage: corrupted nodes") + + // ErrAlreadyLocked is returned on fail opening a storage lockfile + ErrAlreadyLocked = errorString("store/storage: storage lock already open") ) // errSwapNoop performs no error swaps @@ -61,3 +67,11 @@ func errSwapExist(err error) error { } return err } + +// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked +func errSwapUnavailable(err error) error { + if err == syscall.EAGAIN { + return ErrAlreadyLocked + } + return err +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go index ff4c857c3..b1c3560d2 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/fs.go +++ b/vendor/codeberg.org/gruf/go-store/storage/fs.go @@ -8,11 +8,14 @@ import ( ) const ( - defaultDirPerms = 0755 - defaultFilePerms = 0644 + // default file permission bits + defaultDirPerms = 0755 + defaultFilePerms = 0644 + + // default file open flags defaultFileROFlags = syscall.O_RDONLY defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR - defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT + defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT ) // NOTE: diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/storage/lock.go index a757830cc..fae4351bf 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/lock.go +++ b/vendor/codeberg.org/gruf/go-store/storage/lock.go @@ -1,38 +1,81 @@ package storage import ( - "os" + "sync" + "sync/atomic" "syscall" "codeberg.org/gruf/go-store/util" ) -// LockFile is our standard lockfile name. -const LockFile = "store.lock" +// lockFile is our standard lockfile name. +var lockFile = "store.lock" -type LockableFile struct { - *os.File +// IsLockKey returns whether storage key is our lockfile. +func IsLockKey(key string) bool { + return key == lockFile +} + +// Lock represents a filesystem lock to ensure only one storage instance open per path. +type Lock struct { + fd int + wg sync.WaitGroup + st uint32 } // OpenLock opens a lockfile at path. -func OpenLock(path string) (*LockableFile, error) { - file, err := open(path, defaultFileLockFlags) +func OpenLock(path string) (*Lock, error) { + var fd int + + // Open the file descriptor at path + err := util.RetryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms) + return + }) if err != nil { return nil, err } - return &LockableFile{file}, nil + + // Get a flock on the file descriptor + err = util.RetryOnEINTR(func() error { + return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB) + }) + if err != nil { + return nil, errSwapUnavailable(err) + } + + return &Lock{fd: fd}, nil } -func (f *LockableFile) Lock() error { - return f.flock(syscall.LOCK_EX | syscall.LOCK_NB) +// Add will add '1' to the underlying sync.WaitGroup. +func (f *Lock) Add() { + f.wg.Add(1) } -func (f *LockableFile) Unlock() error { - return f.flock(syscall.LOCK_UN | syscall.LOCK_NB) +// Done will decrememnt '1' from the underlying sync.WaitGroup. +func (f *Lock) Done() { + f.wg.Done() } -func (f *LockableFile) flock(how int) error { - return util.RetryOnEINTR(func() error { - return syscall.Flock(int(f.Fd()), how) - }) +// Close will attempt to close the lockfile and file descriptor. +func (f *Lock) Close() error { + var err error + if atomic.CompareAndSwapUint32(&f.st, 0, 1) { + // Wait until done + f.wg.Wait() + + // Ensure gets closed + defer syscall.Close(f.fd) + + // Call funlock on the file descriptor + err = util.RetryOnEINTR(func() error { + return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB) + }) + } + return err +} + +// Closed will return whether this lockfile has been closed (and unlocked). +func (f *Lock) Closed() bool { + return (atomic.LoadUint32(&f.st) == 1) } diff --git a/vendor/codeberg.org/gruf/go-store/storage/memory.go b/vendor/codeberg.org/gruf/go-store/storage/memory.go index 7daa4a483..2dab562d6 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/memory.go +++ b/vendor/codeberg.org/gruf/go-store/storage/memory.go @@ -14,6 +14,7 @@ type MemoryStorage struct { ow bool // overwrites fs map[string][]byte mu sync.Mutex + st uint32 } // OpenMemory opens a new MemoryStorage instance with internal map of 'size'. @@ -27,13 +28,26 @@ func OpenMemory(size int, overwrites bool) *MemoryStorage { // Clean implements Storage.Clean(). func (st *MemoryStorage) Clean() error { + st.mu.Lock() + defer st.mu.Unlock() + if st.st == 1 { + return ErrClosed + } return nil } // ReadBytes implements Storage.ReadBytes(). func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) { - // Safely check store + // Lock storage st.mu.Lock() + + // Check store open + if st.st == 1 { + st.mu.Unlock() + return nil, ErrClosed + } + + // Check for key b, ok := st.fs[key] st.mu.Unlock() @@ -48,8 +62,16 @@ func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) { // ReadStream implements Storage.ReadStream(). func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { - // Safely check store + // Lock storage st.mu.Lock() + + // Check store open + if st.st == 1 { + st.mu.Unlock() + return nil, ErrClosed + } + + // Check for key b, ok := st.fs[key] st.mu.Unlock() @@ -66,19 +88,24 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { // WriteBytes implements Storage.WriteBytes(). func (st *MemoryStorage) WriteBytes(key string, b []byte) error { - // Safely check store + // Lock storage st.mu.Lock() + defer st.mu.Unlock() + + // Check store open + if st.st == 1 { + return ErrClosed + } + _, ok := st.fs[key] // Check for already exist if ok && !st.ow { - st.mu.Unlock() return ErrAlreadyExists } // Write + unlock st.fs[key] = bytes.Copy(b) - st.mu.Unlock() return nil } @@ -96,43 +123,66 @@ func (st *MemoryStorage) WriteStream(key string, r io.Reader) error { // Stat implements Storage.Stat(). func (st *MemoryStorage) Stat(key string) (bool, error) { + // Lock storage st.mu.Lock() + defer st.mu.Unlock() + + // Check store open + if st.st == 1 { + return false, ErrClosed + } + + // Check for key _, ok := st.fs[key] - st.mu.Unlock() return ok, nil } // Remove implements Storage.Remove(). func (st *MemoryStorage) Remove(key string) error { - // Safely check store + // Lock storage st.mu.Lock() - _, ok := st.fs[key] + defer st.mu.Unlock() - // Check in store + // Check store open + if st.st == 1 { + return ErrClosed + } + + // Check for key + _, ok := st.fs[key] if !ok { - st.mu.Unlock() return ErrNotFound } - // Delete + unlock + // Remove from store delete(st.fs, key) - st.mu.Unlock() + return nil } // Close implements Storage.Close(). func (st *MemoryStorage) Close() error { + st.mu.Lock() + st.st = 1 + st.mu.Unlock() return nil } // WalkKeys implements Storage.WalkKeys(). func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error { - // Safely walk storage keys + // Lock storage st.mu.Lock() + defer st.mu.Unlock() + + // Check store open + if st.st == 1 { + return ErrClosed + } + + // Walk store keys for key := range st.fs { opts.WalkFn(entry(key)) } - st.mu.Unlock() return nil } |