summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store/storage
diff options
context:
space:
mode:
authorLibravatar tsmethurst <tobi.smethurst@protonmail.com>2022-01-24 17:35:13 +0100
committerLibravatar tsmethurst <tobi.smethurst@protonmail.com>2022-01-24 17:35:13 +0100
commitf28cf793ee53e8391c9eabbfba93afbc5b59936b (patch)
treefcd6ec6fc4fa011017fb30fce0f52fc947a4d226 /vendor/codeberg.org/gruf/go-store/storage
parentupdate remote account get/deref logic (diff)
downloadgotosocial-f28cf793ee53e8391c9eabbfba93afbc5b59936b.tar.xz
upgrade go-store
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/storage')
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/block.go86
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/disk.go67
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/errors.go14
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/fs.go9
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/lock.go75
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/memory.go78
6 files changed, 271 insertions, 58 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go
index bc35b07ac..5075c7d17 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/block.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/block.go
@@ -1,6 +1,7 @@
package storage
import (
+ "crypto/sha256"
"io"
"io/fs"
"os"
@@ -13,7 +14,6 @@ import (
"codeberg.org/gruf/go-hashenc"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/util"
- "github.com/zeebo/blake3"
)
var (
@@ -77,7 +77,7 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig {
// BlockStorage is a Storage implementation that stores input data as chunks on
// a filesystem. Each value is chunked into blocks of configured size and these
-// blocks are stored with name equal to their base64-encoded BLAKE3 hash-sum. A
+// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
// "node" file is finally created containing an array of hashes contained within
// this value
type BlockStorage struct {
@@ -87,7 +87,7 @@ type BlockStorage struct {
config BlockConfig // cfg is the supplied configuration for this store
hashPool sync.Pool // hashPool is this store's hashEncoder pool
bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
- lock *LockableFile // lock is the opened lockfile for this storage instance
+ lock *Lock // lock is the opened lockfile for this storage instance
// NOTE:
// BlockStorage does not need to lock each of the underlying block files
@@ -140,11 +140,9 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
}
// Open and acquire storage lock for path
- lock, err := OpenLock(pb.Join(path, LockFile))
+ lock, err := OpenLock(pb.Join(path, lockFile))
if err != nil {
return nil, err
- } else if err := lock.Lock(); err != nil {
- return nil, err
}
// Figure out the largest size for bufpool slices
@@ -174,14 +172,23 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
// Clean implements storage.Clean()
func (st *BlockStorage) Clean() error {
- nodes := map[string]*node{}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Walk nodes dir for entries
+ nodes := map[string]*node{}
onceErr := errors.OnceError{}
+
+ // Walk nodes dir for entries
err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
// Only deal with regular files
if !fsentry.Type().IsRegular() {
@@ -303,6 +310,7 @@ func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
if err != nil {
return nil, err
}
+ defer rc.Close()
// Read all bytes and return
return io.ReadAll(rc)
@@ -316,9 +324,19 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
return nil, err
}
+ // Track open
+ st.lock.Add()
+
+ // Check if open
+ if st.lock.Closed() {
+ st.lock.Done()
+ return nil, ErrClosed
+ }
+
// Attempt to open RO file
file, err := open(npath, defaultFileROFlags)
if err != nil {
+ st.lock.Done()
return nil, err
}
defer file.Close()
@@ -338,14 +356,16 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
nil,
)
if err != nil {
+ st.lock.Done()
return nil, err
}
- // Return new block reader
- return util.NopReadCloser(&blockReader{
+ // Prepare block reader and return
+ rc := util.NopReadCloser(&blockReader{
storage: st,
node: &node,
- }), nil
+ }) // we wrap the blockreader to decr lockfile waitgroup
+ return util.ReadCloserWithCallback(rc, st.lock.Done), nil
}
func (st *BlockStorage) readBlock(key string) ([]byte, error) {
@@ -383,6 +403,15 @@ func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Check if this exists
ok, err := stat(key)
if err != nil {
@@ -567,6 +596,15 @@ func (st *BlockStorage) Stat(key string) (bool, error) {
return false, err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
// Check for file on disk
return stat(kpath)
}
@@ -579,18 +617,35 @@ func (st *BlockStorage) Remove(key string) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Attempt to remove file
return os.Remove(kpath)
}
// Close implements Storage.Close()
func (st *BlockStorage) Close() error {
- defer st.lock.Close()
- return st.lock.Unlock()
+ return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys()
func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
@@ -800,7 +855,7 @@ var (
// encodedHashLen is the once-calculated encoded hash-sum length
encodedHashLen = base64Encoding.EncodedLen(
- blake3.New().Size(),
+ sha256.New().Size(),
)
)
@@ -812,9 +867,8 @@ type hashEncoder struct {
// newHashEncoder returns a new hashEncoder instance
func newHashEncoder() *hashEncoder {
- hash := blake3.New()
return &hashEncoder{
- henc: hashenc.New(hash, base64Encoding),
+ henc: hashenc.New(sha256.New(), base64Encoding),
ebuf: make([]byte, encodedHashLen),
}
}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go
index 9b5430437..2ee00ddee 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/disk.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go
@@ -71,7 +71,7 @@ type DiskStorage struct {
path string // path is the root path of this store
bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage
config DiskConfig // cfg is the supplied configuration for this store
- lock *LockableFile // lock is the opened lockfile for this storage instance
+ lock *Lock // lock is the opened lockfile for this storage instance
}
// OpenFile opens a DiskStorage instance for given folder path and configuration
@@ -118,11 +118,9 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
}
// Open and acquire storage lock for path
- lock, err := OpenLock(pb.Join(path, LockFile))
+ lock, err := OpenLock(pb.Join(path, lockFile))
if err != nil {
return nil, err
- } else if err := lock.Lock(); err != nil {
- return nil, err
}
// Return new DiskStorage
@@ -136,6 +134,11 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
// Clean implements Storage.Clean()
func (st *DiskStorage) Clean() error {
+ st.lock.Add()
+ defer st.lock.Done()
+ if st.lock.Closed() {
+ return ErrClosed
+ }
return util.CleanDirs(st.path)
}
@@ -160,9 +163,18 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
return nil, err
}
+ // Track open
+ st.lock.Add()
+
+ // Check if open
+ if st.lock.Closed() {
+ return nil, ErrClosed
+ }
+
// Attempt to open file (replace ENOENT with our own)
file, err := open(kpath, defaultFileROFlags)
if err != nil {
+ st.lock.Done()
return nil, errSwapNotFound(err)
}
@@ -170,12 +182,14 @@ func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
cFile, err := st.config.Compression.Reader(file)
if err != nil {
file.Close() // close this here, ignore error
+ st.lock.Done()
return nil, err
}
// Wrap compressor to ensure file close
return util.ReadCloserWithCallback(cFile, func() {
file.Close()
+ st.lock.Done()
}), nil
}
@@ -192,6 +206,15 @@ func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
@@ -242,6 +265,15 @@ func (st *DiskStorage) Stat(key string) (bool, error) {
return false, err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return false, ErrClosed
+ }
+
// Check for file on disk
return stat(kpath)
}
@@ -254,18 +286,35 @@ func (st *DiskStorage) Remove(key string) error {
return err
}
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Attempt to remove file
return os.Remove(kpath)
}
// Close implements Storage.Close()
func (st *DiskStorage) Close() error {
- defer st.lock.Close()
- return st.lock.Unlock()
+ return st.lock.Close()
}
// WalkKeys implements Storage.WalkKeys()
func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
+ // Track open
+ st.lock.Add()
+ defer st.lock.Done()
+
+ // Check if open
+ if st.lock.Closed() {
+ return ErrClosed
+ }
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
@@ -286,13 +335,13 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error {
// filepath checks and returns a formatted filepath for given key
func (st *DiskStorage) filepath(key string) (string, error) {
+ // Calculate transformed key path
+ key = st.config.Transform.KeyToPath(key)
+
// Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
- // Calculate transformed key path
- key = st.config.Transform.KeyToPath(key)
-
// Generated joined root path
pb.AppendString(st.path)
pb.AppendString(key)
diff --git a/vendor/codeberg.org/gruf/go-store/storage/errors.go b/vendor/codeberg.org/gruf/go-store/storage/errors.go
index 016593596..ad2b742e6 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/errors.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/errors.go
@@ -19,6 +19,9 @@ func (e errorString) Extend(s string, a ...interface{}) errorString {
}
var (
+ // ErrClosed is returned on operations on a closed storage
+ ErrClosed = errorString("store/storage: closed")
+
// ErrNotFound is the error returned when a key cannot be found in storage
ErrNotFound = errorString("store/storage: key not found")
@@ -39,6 +42,9 @@ var (
// errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
errCorruptNodes = errorString("store/storage: corrupted nodes")
+
+ // ErrAlreadyLocked is returned on fail opening a storage lockfile
+ ErrAlreadyLocked = errorString("store/storage: storage lock already open")
)
// errSwapNoop performs no error swaps
@@ -61,3 +67,11 @@ func errSwapExist(err error) error {
}
return err
}
+
+// errSwapUnavailable swaps syscall.EAGAIN for ErrAlreadyLocked
+func errSwapUnavailable(err error) error {
+ if err == syscall.EAGAIN {
+ return ErrAlreadyLocked
+ }
+ return err
+}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go
index ff4c857c3..b1c3560d2 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/fs.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/fs.go
@@ -8,11 +8,14 @@ import (
)
const (
- defaultDirPerms = 0755
- defaultFilePerms = 0644
+ // default file permission bits
+ defaultDirPerms = 0755
+ defaultFilePerms = 0644
+
+ // default file open flags
defaultFileROFlags = syscall.O_RDONLY
defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
- defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT
+ defaultFileLockFlags = syscall.O_RDONLY | syscall.O_CREAT
)
// NOTE:
diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/storage/lock.go
index a757830cc..fae4351bf 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/lock.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/lock.go
@@ -1,38 +1,81 @@
package storage
import (
- "os"
+ "sync"
+ "sync/atomic"
"syscall"
"codeberg.org/gruf/go-store/util"
)
-// LockFile is our standard lockfile name.
-const LockFile = "store.lock"
+// lockFile is our standard lockfile name.
+var lockFile = "store.lock"
-type LockableFile struct {
- *os.File
+// IsLockKey returns whether storage key is our lockfile.
+func IsLockKey(key string) bool {
+ return key == lockFile
+}
+
+// Lock represents a filesystem lock to ensure only one storage instance open per path.
+type Lock struct {
+ fd int
+ wg sync.WaitGroup
+ st uint32
}
// OpenLock opens a lockfile at path.
-func OpenLock(path string) (*LockableFile, error) {
- file, err := open(path, defaultFileLockFlags)
+func OpenLock(path string) (*Lock, error) {
+ var fd int
+
+ // Open the file descriptor at path
+ err := util.RetryOnEINTR(func() (err error) {
+ fd, err = syscall.Open(path, defaultFileLockFlags, defaultFilePerms)
+ return
+ })
if err != nil {
return nil, err
}
- return &LockableFile{file}, nil
+
+ // Get a flock on the file descriptor
+ err = util.RetryOnEINTR(func() error {
+ return syscall.Flock(fd, syscall.LOCK_EX|syscall.LOCK_NB)
+ })
+ if err != nil {
+ return nil, errSwapUnavailable(err)
+ }
+
+ return &Lock{fd: fd}, nil
}
-func (f *LockableFile) Lock() error {
- return f.flock(syscall.LOCK_EX | syscall.LOCK_NB)
+// Add will add '1' to the underlying sync.WaitGroup.
+func (f *Lock) Add() {
+ f.wg.Add(1)
}
-func (f *LockableFile) Unlock() error {
- return f.flock(syscall.LOCK_UN | syscall.LOCK_NB)
+// Done will decrememnt '1' from the underlying sync.WaitGroup.
+func (f *Lock) Done() {
+ f.wg.Done()
}
-func (f *LockableFile) flock(how int) error {
- return util.RetryOnEINTR(func() error {
- return syscall.Flock(int(f.Fd()), how)
- })
+// Close will attempt to close the lockfile and file descriptor.
+func (f *Lock) Close() error {
+ var err error
+ if atomic.CompareAndSwapUint32(&f.st, 0, 1) {
+ // Wait until done
+ f.wg.Wait()
+
+ // Ensure gets closed
+ defer syscall.Close(f.fd)
+
+ // Call funlock on the file descriptor
+ err = util.RetryOnEINTR(func() error {
+ return syscall.Flock(f.fd, syscall.LOCK_UN|syscall.LOCK_NB)
+ })
+ }
+ return err
+}
+
+// Closed will return whether this lockfile has been closed (and unlocked).
+func (f *Lock) Closed() bool {
+ return (atomic.LoadUint32(&f.st) == 1)
}
diff --git a/vendor/codeberg.org/gruf/go-store/storage/memory.go b/vendor/codeberg.org/gruf/go-store/storage/memory.go
index 7daa4a483..2dab562d6 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/memory.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/memory.go
@@ -14,6 +14,7 @@ type MemoryStorage struct {
ow bool // overwrites
fs map[string][]byte
mu sync.Mutex
+ st uint32
}
// OpenMemory opens a new MemoryStorage instance with internal map of 'size'.
@@ -27,13 +28,26 @@ func OpenMemory(size int, overwrites bool) *MemoryStorage {
// Clean implements Storage.Clean().
func (st *MemoryStorage) Clean() error {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+ if st.st == 1 {
+ return ErrClosed
+ }
return nil
}
// ReadBytes implements Storage.ReadBytes().
func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
- // Safely check store
+ // Lock storage
st.mu.Lock()
+
+ // Check store open
+ if st.st == 1 {
+ st.mu.Unlock()
+ return nil, ErrClosed
+ }
+
+ // Check for key
b, ok := st.fs[key]
st.mu.Unlock()
@@ -48,8 +62,16 @@ func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) {
// ReadStream implements Storage.ReadStream().
func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
- // Safely check store
+ // Lock storage
st.mu.Lock()
+
+ // Check store open
+ if st.st == 1 {
+ st.mu.Unlock()
+ return nil, ErrClosed
+ }
+
+ // Check for key
b, ok := st.fs[key]
st.mu.Unlock()
@@ -66,19 +88,24 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) {
// WriteBytes implements Storage.WriteBytes().
func (st *MemoryStorage) WriteBytes(key string, b []byte) error {
- // Safely check store
+ // Lock storage
st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
_, ok := st.fs[key]
// Check for already exist
if ok && !st.ow {
- st.mu.Unlock()
return ErrAlreadyExists
}
// Write + unlock
st.fs[key] = bytes.Copy(b)
- st.mu.Unlock()
return nil
}
@@ -96,43 +123,66 @@ func (st *MemoryStorage) WriteStream(key string, r io.Reader) error {
// Stat implements Storage.Stat().
func (st *MemoryStorage) Stat(key string) (bool, error) {
+ // Lock storage
st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return false, ErrClosed
+ }
+
+ // Check for key
_, ok := st.fs[key]
- st.mu.Unlock()
return ok, nil
}
// Remove implements Storage.Remove().
func (st *MemoryStorage) Remove(key string) error {
- // Safely check store
+ // Lock storage
st.mu.Lock()
- _, ok := st.fs[key]
+ defer st.mu.Unlock()
- // Check in store
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
+ // Check for key
+ _, ok := st.fs[key]
if !ok {
- st.mu.Unlock()
return ErrNotFound
}
- // Delete + unlock
+ // Remove from store
delete(st.fs, key)
- st.mu.Unlock()
+
return nil
}
// Close implements Storage.Close().
func (st *MemoryStorage) Close() error {
+ st.mu.Lock()
+ st.st = 1
+ st.mu.Unlock()
return nil
}
// WalkKeys implements Storage.WalkKeys().
func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error {
- // Safely walk storage keys
+ // Lock storage
st.mu.Lock()
+ defer st.mu.Unlock()
+
+ // Check store open
+ if st.st == 1 {
+ return ErrClosed
+ }
+
+ // Walk store keys
for key := range st.fs {
opts.WalkFn(entry(key))
}
- st.mu.Unlock()
return nil
}