diff options
author | 2022-01-16 18:52:30 +0100 | |
---|---|---|
committer | 2022-01-16 18:52:30 +0100 | |
commit | 6f5ccf435585e43a00e3cc50f4bcefac36ada818 (patch) | |
tree | ba368d27464b79b1e5d010c0662fd3e340bf108e /vendor/codeberg.org/gruf/go-store | |
parent | add go-runners to readme (diff) | |
download | gotosocial-6f5ccf435585e43a00e3cc50f4bcefac36ada818.tar.xz |
update dependencies
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/iterator.go | 2 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/state.go | 18 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/store.go | 82 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/block.go | 57 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/disk.go | 29 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/fs.go | 2 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/lock.go | 16 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/memory.go | 53 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/storage.go | 9 |
9 files changed, 176 insertions, 92 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go index d3999273f..ddaaf60cf 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/iterator.go +++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go @@ -60,5 +60,5 @@ func (i *KVIterator) Value() ([]byte, error) { } // Attempt to fetch from store - return i.store.get(i.key) + return i.store.get(i.store.mutexMap.RLock, i.key) } diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go index 20a3e951d..330928bce 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/state.go +++ b/vendor/codeberg.org/gruf/go-store/kv/state.go @@ -30,7 +30,7 @@ func (st *StateRO) Get(key string) ([]byte, error) { } // Pass request to store - return st.store.get(key) + return st.store.get(st.store.mutexMap.RLock, key) } func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { @@ -44,7 +44,7 @@ func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { } // Pass request to store - return st.store.getStream(key) + return st.store.getStream(st.store.mutexMap.RLock, key) } func (st *StateRO) Has(key string) (bool, error) { @@ -58,7 +58,7 @@ func (st *StateRO) Has(key string) (bool, error) { } // Pass request to store - return st.store.has(key) + return st.store.has(st.store.mutexMap.RLock, key) } func (st *StateRO) Release() { @@ -94,7 +94,7 @@ func (st *StateRW) Get(key string) ([]byte, error) { } // Pass request to store - return st.store.get(key) + return st.store.get(st.store.mutexMap.RLock, key) } func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { @@ -108,7 +108,7 @@ func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { } // Pass request to store - return st.store.getStream(key) + return st.store.getStream(st.store.mutexMap.RLock, key) } func (st *StateRW) Put(key string, value []byte) error { @@ -122,7 +122,7 @@ func (st *StateRW) Put(key string, value []byte) error { } // Pass request to store - return st.store.put(key, value) + return st.store.put(st.store.mutexMap.Lock, key, value) } func (st *StateRW) PutStream(key string, r io.Reader) error { @@ -136,7 +136,7 @@ func (st *StateRW) PutStream(key string, r io.Reader) error { } // Pass request to store - return st.store.putStream(key, r) + return st.store.putStream(st.store.mutexMap.Lock, key, r) } func (st *StateRW) Has(key string) (bool, error) { @@ -150,7 +150,7 @@ func (st *StateRW) Has(key string) (bool, error) { } // Pass request to store - return st.store.has(key) + return st.store.has(st.store.mutexMap.RLock, key) } func (st *StateRW) Delete(key string) error { @@ -164,7 +164,7 @@ func (st *StateRW) Delete(key string) error { } // Pass request to store - return st.store.delete(key) + return st.store.delete(st.store.mutexMap.Lock, key) } func (st *StateRW) Release() { diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go index 34fe91987..4c3a31140 100644 --- a/vendor/codeberg.org/gruf/go-store/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/kv/store.go @@ -53,19 +53,30 @@ func OpenStorage(storage storage.Storage) (*KVStore, error) { }, nil } -// Get fetches the bytes for supplied key in the store -func (st *KVStore) Get(key string) ([]byte, error) { - // Acquire store read lock +// RLock acquires a read-lock on supplied key, returning unlock function. +func (st *KVStore) RLock(key string) (runlock func()) { st.mutex.RLock() - defer st.mutex.RUnlock() + runlock = st.mutexMap.RLock(key) + st.mutex.RUnlock() + return runlock +} - // Pass to unprotected fn - return st.get(key) +// Lock acquires a write-lock on supplied key, returning unlock function. +func (st *KVStore) Lock(key string) (unlock func()) { + st.mutex.Lock() + unlock = st.mutexMap.Lock(key) + st.mutex.Unlock() + return unlock } -func (st *KVStore) get(key string) ([]byte, error) { +// Get fetches the bytes for supplied key in the store +func (st *KVStore) Get(key string) ([]byte, error) { + return st.get(st.RLock, key) +} + +func (st *KVStore) get(rlock func(string) func(), key string) ([]byte, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) defer runlock() // Read file bytes @@ -74,17 +85,12 @@ func (st *KVStore) get(key string) ([]byte, error) { // GetStream fetches a ReadCloser for the bytes at the supplied key location in the store func (st *KVStore) GetStream(key string) (io.ReadCloser, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.getStream(key) + return st.getStream(st.RLock, key) } -func (st *KVStore) getStream(key string) (io.ReadCloser, error) { +func (st *KVStore) getStream(rlock func(string) func(), key string) (io.ReadCloser, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) // Attempt to open stream for read rd, err := st.storage.ReadStream(key) @@ -99,17 +105,12 @@ func (st *KVStore) getStream(key string) (io.ReadCloser, error) { // Put places the bytes at the supplied key location in the store func (st *KVStore) Put(key string, value []byte) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.put(key, value) + return st.put(st.Lock, key, value) } -func (st *KVStore) put(key string, value []byte) error { +func (st *KVStore) put(lock func(string) func(), key string, value []byte) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Write file bytes @@ -118,17 +119,12 @@ func (st *KVStore) put(key string, value []byte) error { // PutStream writes the bytes from the supplied Reader at the supplied key location in the store func (st *KVStore) PutStream(key string, r io.Reader) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.putStream(key, r) + return st.putStream(st.Lock, key, r) } -func (st *KVStore) putStream(key string, r io.Reader) error { +func (st *KVStore) putStream(lock func(string) func(), key string, r io.Reader) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Write file stream @@ -137,17 +133,12 @@ func (st *KVStore) putStream(key string, r io.Reader) error { // Has checks whether the supplied key exists in the store func (st *KVStore) Has(key string) (bool, error) { - // Acquire store read lock - st.mutex.RLock() - defer st.mutex.RUnlock() - - // Pass to unprotected fn - return st.has(key) + return st.has(st.RLock, key) } -func (st *KVStore) has(key string) (bool, error) { +func (st *KVStore) has(rlock func(string) func(), key string) (bool, error) { // Acquire read lock for key - runlock := st.mutexMap.RLock(key) + runlock := rlock(key) defer runlock() // Stat file on disk @@ -156,17 +147,12 @@ func (st *KVStore) has(key string) (bool, error) { // Delete removes the supplied key-value pair from the store func (st *KVStore) Delete(key string) error { - // Acquire store write lock - st.mutex.Lock() - defer st.mutex.Unlock() - - // Pass to unprotected fn - return st.delete(key) + return st.delete(st.Lock, key) } -func (st *KVStore) delete(key string) error { +func (st *KVStore) delete(lock func(string) func(), key string) error { // Acquire write lock for key - unlock := st.mutexMap.Lock(key) + unlock := lock(key) defer unlock() // Remove file from disk diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go index 9a8c4dc7d..bc35b07ac 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/block.go +++ b/vendor/codeberg.org/gruf/go-store/storage/block.go @@ -1,7 +1,6 @@ package storage import ( - "crypto/sha256" "io" "io/fs" "os" @@ -14,6 +13,7 @@ import ( "codeberg.org/gruf/go-hashenc" "codeberg.org/gruf/go-pools" "codeberg.org/gruf/go-store/util" + "github.com/zeebo/blake3" ) var ( @@ -77,7 +77,7 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig { // BlockStorage is a Storage implementation that stores input data as chunks on // a filesystem. Each value is chunked into blocks of configured size and these -// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A +// blocks are stored with name equal to their base64-encoded BLAKE3 hash-sum. A // "node" file is finally created containing an array of hashes contained within // this value type BlockStorage struct { @@ -87,6 +87,7 @@ type BlockStorage struct { config BlockConfig // cfg is the supplied configuration for this store hashPool sync.Pool // hashPool is this store's hashEncoder pool bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool + lock *LockableFile // lock is the opened lockfile for this storage instance // NOTE: // BlockStorage does not need to lock each of the underlying block files @@ -138,6 +139,14 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { return nil, errPathIsFile } + // Open and acquire storage lock for path + lock, err := OpenLock(pb.Join(path, LockFile)) + if err != nil { + return nil, err + } else if err := lock.Lock(); err != nil { + return nil, err + } + // Figure out the largest size for bufpool slices bufSz := encodedHashLen if bufSz < config.BlockSize { @@ -159,6 +168,7 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { }, }, bufpool: pools.NewBufferPool(bufSz), + lock: lock, }, nil } @@ -443,11 +453,16 @@ loop: continue loop } - // Write in separate goroutine + // Check if reached EOF + atEOF := (n < buf.Len()) + wg.Add(1) go func() { - // Defer buffer release + signal done + // Perform writes in goroutine + defer func() { + // Defer release + + // signal we're done st.bufpool.Put(buf) wg.Done() }() @@ -460,8 +475,8 @@ loop: } }() - // We reached EOF - if n < buf.Len() { + // Break at end + if atEOF { break loop } } @@ -568,6 +583,12 @@ func (st *BlockStorage) Remove(key string) error { return os.Remove(kpath) } +// Close implements Storage.Close() +func (st *BlockStorage) Close() error { + defer st.lock.Close() + return st.lock.Unlock() +} + // WalkKeys implements Storage.WalkKeys() func (st *BlockStorage) WalkKeys(opts WalkKeysOptions) error { // Acquire path builder @@ -610,7 +631,7 @@ func (st *BlockStorage) blockPathForKey(hash string) string { } // hashSeparator is the separating byte between block hashes -const hashSeparator = byte(':') +const hashSeparator = byte('\n') // node represents the contents of a node file in storage type node struct { @@ -773,24 +794,28 @@ func (r *blockReader) Read(b []byte) (int, error) { } } +var ( + // base64Encoding is our base64 encoding object. + base64Encoding = hashenc.Base64() + + // encodedHashLen is the once-calculated encoded hash-sum length + encodedHashLen = base64Encoding.EncodedLen( + blake3.New().Size(), + ) +) + // hashEncoder is a HashEncoder with built-in encode buffer type hashEncoder struct { henc hashenc.HashEncoder ebuf []byte } -// encodedHashLen is the once-calculated encoded hash-sum length -var encodedHashLen = hashenc.Base64().EncodedLen( - sha256.New().Size(), -) - // newHashEncoder returns a new hashEncoder instance func newHashEncoder() *hashEncoder { - hash := sha256.New() - enc := hashenc.Base64() + hash := blake3.New() return &hashEncoder{ - henc: hashenc.New(hash, enc), - ebuf: make([]byte, enc.EncodedLen(hash.Size())), + henc: hashenc.New(hash, base64Encoding), + ebuf: make([]byte, encodedHashLen), } } diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go index 060d56688..9b5430437 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go @@ -71,6 +71,7 @@ type DiskStorage struct { path string // path is the root path of this store bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage config DiskConfig // cfg is the supplied configuration for this store + lock *LockableFile // lock is the opened lockfile for this storage instance } // OpenFile opens a DiskStorage instance for given folder path and configuration @@ -81,13 +82,13 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { // Clean provided path, ensure ends in '/' (should // be dir, this helps with file path trimming later) - path = pb.Clean(path) + "/" + storePath := pb.Join(path, "store") + "/" // Get checked config config := getDiskConfig(cfg) // Attempt to open dir path - file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + file, err := os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) if err != nil { // If not a not-exist error, return if !os.IsNotExist(err) { @@ -95,13 +96,13 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { } // Attempt to make store path dirs - err = os.MkdirAll(path, defaultDirPerms) + err = os.MkdirAll(storePath, defaultDirPerms) if err != nil { return nil, err } // Reopen dir now it's been created - file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + file, err = os.OpenFile(storePath, defaultFileROFlags, defaultDirPerms) if err != nil { return nil, err } @@ -116,11 +117,20 @@ func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { return nil, errPathIsFile } + // Open and acquire storage lock for path + lock, err := OpenLock(pb.Join(path, LockFile)) + if err != nil { + return nil, err + } else if err := lock.Lock(); err != nil { + return nil, err + } + // Return new DiskStorage return &DiskStorage{ - path: path, + path: storePath, bufp: pools.NewBufferPool(config.WriteBufSize), config: config, + lock: lock, }, nil } @@ -248,6 +258,12 @@ func (st *DiskStorage) Remove(key string) error { return os.Remove(kpath) } +// Close implements Storage.Close() +func (st *DiskStorage) Close() error { + defer st.lock.Close() + return st.lock.Unlock() +} + // WalkKeys implements Storage.WalkKeys() func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error { // Acquire path builder @@ -256,8 +272,9 @@ func (st *DiskStorage) WalkKeys(opts WalkKeysOptions) error { // Walk dir for entries return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) { - // Only deal with regular files if fsentry.Type().IsRegular() { + // Only deal with regular files + // Get full item path (without root) kpath = pb.Join(kpath, fsentry.Name())[len(st.path):] diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go index 444cee4b0..ff4c857c3 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/fs.go +++ b/vendor/codeberg.org/gruf/go-store/storage/fs.go @@ -39,7 +39,7 @@ func stat(path string) (bool, error) { return syscall.Stat(path, &stat) }) if err != nil { - if err == syscall.ENOENT { + if err == syscall.ENOENT { //nolint err = nil } return false, err diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/storage/lock.go index 3d794cda9..a757830cc 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/lock.go +++ b/vendor/codeberg.org/gruf/go-store/storage/lock.go @@ -7,27 +7,31 @@ import ( "codeberg.org/gruf/go-store/util" ) -type lockableFile struct { +// LockFile is our standard lockfile name. +const LockFile = "store.lock" + +type LockableFile struct { *os.File } -func openLock(path string) (*lockableFile, error) { +// OpenLock opens a lockfile at path. +func OpenLock(path string) (*LockableFile, error) { file, err := open(path, defaultFileLockFlags) if err != nil { return nil, err } - return &lockableFile{file}, nil + return &LockableFile{file}, nil } -func (f *lockableFile) lock() error { +func (f *LockableFile) Lock() error { return f.flock(syscall.LOCK_EX | syscall.LOCK_NB) } -func (f *lockableFile) unlock() error { +func (f *LockableFile) Unlock() error { return f.flock(syscall.LOCK_UN | syscall.LOCK_NB) } -func (f *lockableFile) flock(how int) error { +func (f *LockableFile) flock(how int) error { return util.RetryOnEINTR(func() error { return syscall.Flock(int(f.Fd()), how) }) diff --git a/vendor/codeberg.org/gruf/go-store/storage/memory.go b/vendor/codeberg.org/gruf/go-store/storage/memory.go index be60fa464..7daa4a483 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/memory.go +++ b/vendor/codeberg.org/gruf/go-store/storage/memory.go @@ -2,6 +2,7 @@ package storage import ( "io" + "sync" "codeberg.org/gruf/go-bytes" "codeberg.org/gruf/go-store/util" @@ -10,13 +11,17 @@ import ( // MemoryStorage is a storage implementation that simply stores key-value // pairs in a Go map in-memory. The map is protected by a mutex. type MemoryStorage struct { + ow bool // overwrites fs map[string][]byte + mu sync.Mutex } // OpenMemory opens a new MemoryStorage instance with internal map of 'size'. -func OpenMemory(size int) *MemoryStorage { +func OpenMemory(size int, overwrites bool) *MemoryStorage { return &MemoryStorage{ fs: make(map[string][]byte, size), + mu: sync.Mutex{}, + ow: overwrites, } } @@ -27,19 +32,33 @@ func (st *MemoryStorage) Clean() error { // ReadBytes implements Storage.ReadBytes(). func (st *MemoryStorage) ReadBytes(key string) ([]byte, error) { + // Safely check store + st.mu.Lock() b, ok := st.fs[key] + st.mu.Unlock() + + // Return early if not exist if !ok { return nil, ErrNotFound } + + // Create return copy return bytes.Copy(b), nil } // ReadStream implements Storage.ReadStream(). func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { + // Safely check store + st.mu.Lock() b, ok := st.fs[key] + st.mu.Unlock() + + // Return early if not exist if !ok { return nil, ErrNotFound } + + // Create io.ReadCloser from 'b' copy b = bytes.Copy(b) r := bytes.NewReader(b) return util.NopReadCloser(r), nil @@ -47,43 +66,73 @@ func (st *MemoryStorage) ReadStream(key string) (io.ReadCloser, error) { // WriteBytes implements Storage.WriteBytes(). func (st *MemoryStorage) WriteBytes(key string, b []byte) error { + // Safely check store + st.mu.Lock() _, ok := st.fs[key] - if ok { + + // Check for already exist + if ok && !st.ow { + st.mu.Unlock() return ErrAlreadyExists } + + // Write + unlock st.fs[key] = bytes.Copy(b) + st.mu.Unlock() return nil } // WriteStream implements Storage.WriteStream(). func (st *MemoryStorage) WriteStream(key string, r io.Reader) error { + // Read all from reader b, err := io.ReadAll(r) if err != nil { return err } + + // Write to storage return st.WriteBytes(key, b) } // Stat implements Storage.Stat(). func (st *MemoryStorage) Stat(key string) (bool, error) { + st.mu.Lock() _, ok := st.fs[key] + st.mu.Unlock() return ok, nil } // Remove implements Storage.Remove(). func (st *MemoryStorage) Remove(key string) error { + // Safely check store + st.mu.Lock() _, ok := st.fs[key] + + // Check in store if !ok { + st.mu.Unlock() return ErrNotFound } + + // Delete + unlock delete(st.fs, key) + st.mu.Unlock() + return nil +} + +// Close implements Storage.Close(). +func (st *MemoryStorage) Close() error { return nil } // WalkKeys implements Storage.WalkKeys(). func (st *MemoryStorage) WalkKeys(opts WalkKeysOptions) error { + // Safely walk storage keys + st.mu.Lock() for key := range st.fs { opts.WalkFn(entry(key)) } + st.mu.Unlock() + return nil } diff --git a/vendor/codeberg.org/gruf/go-store/storage/storage.go b/vendor/codeberg.org/gruf/go-store/storage/storage.go index b160267a4..346aff097 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/storage.go +++ b/vendor/codeberg.org/gruf/go-store/storage/storage.go @@ -19,9 +19,6 @@ func (e entry) Key() string { // Storage defines a means of storing and accessing key value pairs type Storage interface { - // Clean removes unused values and unclutters the storage (e.g. removing empty folders) - Clean() error - // ReadBytes returns the byte value for key in storage ReadBytes(key string) ([]byte, error) @@ -40,6 +37,12 @@ type Storage interface { // Remove attempts to remove the supplied key-value pair from storage Remove(key string) error + // Close will close the storage, releasing any file locks + Close() error + + // Clean removes unused values and unclutters the storage (e.g. removing empty folders) + Clean() error + // WalkKeys walks the keys in the storage WalkKeys(opts WalkKeysOptions) error } |