summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store/storage/disk.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/storage/disk.go')
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/disk.go67
1 files changed, 58 insertions, 9 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go
index 9b5430437..2ee00ddee 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/disk.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go
@@ -71,7 +71,7 @@ type DiskStorage struct {
path string // path is the root path of this store
bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
config DiskConfig // cfg is the supplied configuration for this store
- lock *LockableFile // lock is the opened lockfile for this storage instance
+ lock *Lock // lock is the opened lockfile for this storage instance
}
// OpenFile opens a DiskStorage instance for given folder path and configuration
@@ -118,11 +118,9 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
}
// Open and acquire storage lock for path
- lock, err := OpenLock(pb.Join(path, LockFile))
+ lock, err := OpenLock(pb.Join(path, lockFile))
if err != nil {
return nil, err
- } else if err := lock.Lock(); err != nil {
- return nil, err
}
// Return new DiskStorage
@@ -136,6 +134,11 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
// Clean implements Storage.Clean()
func (st *DiskStorage) Clean() error {
+ st.lock.Add()
+ defer st.lock.Done()
+ if st.lock.Closed() {
+ return ErrClosed
+ }
return util.CleanDirs(st.path)
}
@@ -160,9 +163,18 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
return nil, err
}
+ // Track open
+ st.lock.Add()
+
+ // Check if open
+ if st.lock.Closed() {
+ return nil, ErrClosed
+ }
+
// Attempt to open file (replace ENOENT with our own)
file, err := open(kpath, defaultFileROFlags)
if err != nil {
+ st.lock.Done()
return nil, errSwapNotFound(err)
}
@@ -170,12 +182,14 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
cFile, err := st.config.Compression.Reader(file)
if err != nil {
file.Close() // close this here, ignore error
+ st.lock.Done()
return nil, err
}
// Wrap compressor to ensure file close
return util.ReadCloserWithCallback(cFile, func() {
file.Close()
+ st.lock.Done()
}), nil
}
@@ -192,6 +206,15 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
@@ -242,6 +265,15 @@ func (st *DiskStorage) Stat(key string) (bool, error) {
return false, err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
// Check for file on disk
return stat(kpath)
}
@@ -254,18 +286,35 @@ func (st *DiskStorage) Remove(key string) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Attempt to remove file
return os.Remove(kpath)
}
// Close implements Storage.Close()
func (st *DiskStorage) Close() error {
- defer st.lock.Close()
- return st.lock.Unlock()
+ return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys()
func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
@@ -286,13 +335,13 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
// filepath checks and returns a formatted filepath for given key
func (st *DiskStorage) filepath(key string) (string, error) {
+ // Calculate transformed key path
+ key = st.config.Transform.KeyToPath(key)
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Calculate transformed key path
- key = st.config.Transform.KeyToPath(key)
-
// Generated joined root path
pb.AppendString(st.path)
pb.AppendString(key)