summaryrefslogtreecommitdiff
path: root/vendor/git.iim.gay/grufwub/go-store
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/git.iim.gay/grufwub/go-store')
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/kv/iterator.go64
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/kv/state.go125
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/kv/store.go243
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/block.go785
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/compressor.go104
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/disk.go289
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/errors.go63
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/fs.go48
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/lock.go34
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/storage.go51
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/storage/transform.go25
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/util/fs.go105
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/util/io.go42
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/util/nocopy.go6
-rw-r--r--vendor/git.iim.gay/grufwub/go-store/util/pools.go44
15 files changed, 2028 insertions, 0 deletions
diff --git a/vendor/git.iim.gay/grufwub/go-store/kv/iterator.go b/vendor/git.iim.gay/grufwub/go-store/kv/iterator.go
new file mode 100644
index 000000000..f2c68b87c
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/kv/iterator.go
@@ -0,0 +1,64 @@
+package kv
+
+import (
+ "git.iim.gay/grufwub/go-errors"
+ "git.iim.gay/grufwub/go-store/storage"
+)
+
+var ErrIteratorClosed = errors.Define("store/kv: iterator closed")
+
+// KVIterator provides a read-only iterator to all the key-value
+// pairs in a KVStore. While the iterator is open the store is read
+// locked, you MUST release the iterator when you are finished with
+// it.
+//
+// Please note:
+// - individual iterators are NOT concurrency safe, though it is safe to
+// have multiple iterators running concurrently
+type KVIterator struct {
+ store *KVStore // store is the linked KVStore
+ entries []storage.StorageEntry
+ index int
+ key string
+ onClose func()
+}
+
+// Next attempts to set the next key-value pair, the
+// return value is if there was another pair remaining
+func (i *KVIterator) Next() bool {
+ next := i.index + 1
+ if next >= len(i.entries) {
+ i.key = ""
+ return false
+ }
+ i.key = i.entries[next].Key()
+ i.index = next
+ return true
+}
+
+// Key returns the next key from the store
+func (i *KVIterator) Key() string {
+ return i.key
+}
+
+// Release releases the KVIterator and KVStore's read lock
+func (i *KVIterator) Release() {
+ // Reset key, path, entries
+ i.store = nil
+ i.key = ""
+ i.entries = nil
+
+ // Perform requested callback
+ i.onClose()
+}
+
+// Value returns the next value from the KVStore
+func (i *KVIterator) Value() ([]byte, error) {
+ // Check store isn't closed
+ if i.store == nil {
+ return nil, ErrIteratorClosed
+ }
+
+ // Attempt to fetch from store
+ return i.store.get(i.key)
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/kv/state.go b/vendor/git.iim.gay/grufwub/go-store/kv/state.go
new file mode 100644
index 000000000..a8bc64637
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/kv/state.go
@@ -0,0 +1,125 @@
+package kv
+
+import (
+ "io"
+
+ "git.iim.gay/grufwub/go-errors"
+)
+
+var ErrStateClosed = errors.Define("store/kv: state closed")
+
+// StateRO provides a read-only window to the store. While this
+// state is active during the Read() function window, the entire
+// store will be read-locked. The state is thread-safe for concurrent
+// use UNTIL the moment that your supplied function to Read() returns,
+// then the state has zero guarantees
+type StateRO struct {
+ store *KVStore
+}
+
+func (st *StateRO) Get(key string) ([]byte, error) {
+ // Check not closed
+ if st.store == nil {
+ return nil, ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.get(key)
+}
+
+func (st *StateRO) GetStream(key string) (io.ReadCloser, error) {
+ // Check not closed
+ if st.store == nil {
+ return nil, ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.getStream(key)
+}
+
+func (st *StateRO) Has(key string) (bool, error) {
+ // Check not closed
+ if st.store == nil {
+ return false, ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.has(key)
+}
+
+func (st *StateRO) close() {
+ st.store = nil
+}
+
+// StateRW provides a read-write window to the store. While this
+// state is active during the Update() function window, the entire
+// store will be locked. The state is thread-safe for concurrent
+// use UNTIL the moment that your supplied function to Update() returns,
+// then the state has zero guarantees
+type StateRW struct {
+ store *KVStore
+}
+
+func (st *StateRW) Get(key string) ([]byte, error) {
+ // Check not closed
+ if st.store == nil {
+ return nil, ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.get(key)
+}
+
+func (st *StateRW) GetStream(key string) (io.ReadCloser, error) {
+ // Check not closed
+ if st.store == nil {
+ return nil, ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.getStream(key)
+}
+
+func (st *StateRW) Put(key string, value []byte) error {
+ // Check not closed
+ if st.store == nil {
+ return ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.put(key, value)
+}
+
+func (st *StateRW) PutStream(key string, r io.Reader) error {
+ // Check not closed
+ if st.store == nil {
+ return ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.putStream(key, r)
+}
+
+func (st *StateRW) Has(key string) (bool, error) {
+ // Check not closed
+ if st.store == nil {
+ return false, ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.has(key)
+}
+
+func (st *StateRW) Delete(key string) error {
+ // Check not closed
+ if st.store == nil {
+ return ErrStateClosed
+ }
+
+ // Pass request to store
+ return st.store.delete(key)
+}
+
+func (st *StateRW) close() {
+ st.store = nil
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/kv/store.go b/vendor/git.iim.gay/grufwub/go-store/kv/store.go
new file mode 100644
index 000000000..840a554e9
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/kv/store.go
@@ -0,0 +1,243 @@
+package kv
+
+import (
+ "io"
+ "sync"
+
+ "git.iim.gay/grufwub/go-mutexes"
+ "git.iim.gay/grufwub/go-store/storage"
+ "git.iim.gay/grufwub/go-store/util"
+)
+
+// KVStore is a very simple, yet performant key-value store
+type KVStore struct {
+ mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access
+ mutex sync.RWMutex // mutex is the total store mutex
+ storage storage.Storage // storage is the underlying storage
+}
+
+func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) {
+ // Attempt to open disk storage
+ storage, err := storage.OpenFile(path, cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ // Return new KVStore
+ return OpenStorage(storage)
+}
+
+func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) {
+ // Attempt to open block storage
+ storage, err := storage.OpenBlock(path, cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ // Return new KVStore
+ return OpenStorage(storage)
+}
+
+func OpenStorage(storage storage.Storage) (*KVStore, error) {
+ // Perform initial storage clean
+ err := storage.Clean()
+ if err != nil {
+ return nil, err
+ }
+
+ // Return new KVStore
+ return &KVStore{
+ mutexMap: mutexes.NewMap(mutexes.NewRW),
+ mutex: sync.RWMutex{},
+ storage: storage,
+ }, nil
+}
+
+// Get fetches the bytes for supplied key in the store
+func (st *KVStore) Get(key string) ([]byte, error) {
+ // Acquire store read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
+ // Pass to unprotected fn
+ return st.get(key)
+}
+
+func (st *KVStore) get(key string) ([]byte, error) {
+ // Acquire read lock for key
+ runlock := st.mutexMap.RLock(key)
+ defer runlock()
+
+ // Read file bytes
+ return st.storage.ReadBytes(key)
+}
+
+// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store
+func (st *KVStore) GetStream(key string) (io.ReadCloser, error) {
+ // Acquire store read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
+ // Pass to unprotected fn
+ return st.getStream(key)
+}
+
+func (st *KVStore) getStream(key string) (io.ReadCloser, error) {
+ // Acquire read lock for key
+ runlock := st.mutexMap.RLock(key)
+
+ // Attempt to open stream for read
+ rd, err := st.storage.ReadStream(key)
+ if err != nil {
+ runlock()
+ return nil, err
+ }
+
+ // Wrap readcloser in our own callback closer
+ return util.ReadCloserWithCallback(rd, runlock), nil
+}
+
+// Put places the bytes at the supplied key location in the store
+func (st *KVStore) Put(key string, value []byte) error {
+ // Acquire store write lock
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+
+ // Pass to unprotected fn
+ return st.put(key, value)
+}
+
+func (st *KVStore) put(key string, value []byte) error {
+ // Acquire write lock for key
+ unlock := st.mutexMap.Lock(key)
+ defer unlock()
+
+ // Write file bytes
+ return st.storage.WriteBytes(key, value)
+}
+
+// PutStream writes the bytes from the supplied Reader at the supplied key location in the store
+func (st *KVStore) PutStream(key string, r io.Reader) error {
+ // Acquire store write lock
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+
+ // Pass to unprotected fn
+ return st.putStream(key, r)
+}
+
+func (st *KVStore) putStream(key string, r io.Reader) error {
+ // Acquire write lock for key
+ unlock := st.mutexMap.Lock(key)
+ defer unlock()
+
+ // Write file stream
+ return st.storage.WriteStream(key, r)
+}
+
+// Has checks whether the supplied key exists in the store
+func (st *KVStore) Has(key string) (bool, error) {
+ // Acquire store read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
+ // Pass to unprotected fn
+ return st.has(key)
+}
+
+func (st *KVStore) has(key string) (bool, error) {
+ // Acquire read lock for key
+ runlock := st.mutexMap.RLock(key)
+ defer runlock()
+
+ // Stat file on disk
+ return st.storage.Stat(key)
+}
+
+// Delete removes the supplied key-value pair from the store
+func (st *KVStore) Delete(key string) error {
+ // Acquire store write lock
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+
+ // Pass to unprotected fn
+ return st.delete(key)
+}
+
+func (st *KVStore) delete(key string) error {
+ // Acquire write lock for key
+ unlock := st.mutexMap.Lock(key)
+ defer unlock()
+
+ // Remove file from disk
+ return st.storage.Remove(key)
+}
+
+// Iterator returns an Iterator for key-value pairs in the store, using supplied match function
+func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) {
+ // If no function, match all
+ if matchFn == nil {
+ matchFn = func(string) bool { return true }
+ }
+
+ // Get store read lock
+ st.mutex.RLock()
+
+ // Setup the walk keys function
+ entries := []storage.StorageEntry{}
+ walkFn := func(entry storage.StorageEntry) {
+ // Ignore unmatched entries
+ if !matchFn(entry.Key()) {
+ return
+ }
+
+ // Add to entries
+ entries = append(entries, entry)
+ }
+
+ // Walk keys in the storage
+ err := st.storage.WalkKeys(&storage.WalkKeysOptions{WalkFn: walkFn})
+ if err != nil {
+ st.mutex.RUnlock()
+ return nil, err
+ }
+
+ // Return new iterator
+ return &KVIterator{
+ store: st,
+ entries: entries,
+ index: -1,
+ key: "",
+ onClose: st.mutex.RUnlock,
+ }, nil
+}
+
+// Read provides a read-only window to the store, holding it in a read-locked state until
+// the supplied function returns
+func (st *KVStore) Read(do func(*StateRO)) {
+ // Get store read lock
+ st.mutex.RLock()
+ defer st.mutex.RUnlock()
+
+ // Create new store state (defer close)
+ state := &StateRO{store: st}
+ defer state.close()
+
+ // Pass state
+ do(state)
+}
+
+// Update provides a read-write window to the store, holding it in a read-write-locked state
+// until the supplied functions returns
+func (st *KVStore) Update(do func(*StateRW)) {
+ // Get store lock
+ st.mutex.Lock()
+ defer st.mutex.Unlock()
+
+ // Create new store state (defer close)
+ state := &StateRW{store: st}
+ defer state.close()
+
+ // Pass state
+ do(state)
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/block.go b/vendor/git.iim.gay/grufwub/go-store/storage/block.go
new file mode 100644
index 000000000..023b83886
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/block.go
@@ -0,0 +1,785 @@
+package storage
+
+import (
+ "crypto/sha256"
+ "io"
+ "io/fs"
+ "os"
+ "strings"
+ "sync"
+ "syscall"
+
+ "git.iim.gay/grufwub/fastpath"
+ "git.iim.gay/grufwub/go-bytes"
+ "git.iim.gay/grufwub/go-errors"
+ "git.iim.gay/grufwub/go-hashenc"
+ "git.iim.gay/grufwub/go-store/util"
+)
+
+var (
+ nodePathPrefix = "node/"
+ blockPathPrefix = "block/"
+)
+
+// DefaultBlockConfig is the default BlockStorage configuration
+var DefaultBlockConfig = &BlockConfig{
+ BlockSize: 1024 * 16,
+ WriteBufSize: 4096,
+ Overwrite: false,
+ Compression: NoCompression(),
+}
+
+// BlockConfig defines options to be used when opening a BlockStorage
+type BlockConfig struct {
+ // BlockSize is the chunking size to use when splitting and storing blocks of data
+ BlockSize int
+
+ // WriteBufSize is the buffer size to use when writing file streams (PutStream)
+ WriteBufSize int
+
+ // Overwrite allows overwriting values of stored keys in the storage
+ Overwrite bool
+
+ // Compression is the Compressor to use when reading / writing files, default is no compression
+ Compression Compressor
+}
+
+// getBlockConfig returns a valid BlockConfig for supplied ptr
+func getBlockConfig(cfg *BlockConfig) BlockConfig {
+ // If nil, use default
+ if cfg == nil {
+ cfg = DefaultBlockConfig
+ }
+
+ // Assume nil compress == none
+ if cfg.Compression == nil {
+ cfg.Compression = NoCompression()
+ }
+
+ // Assume 0 chunk size == use default
+ if cfg.BlockSize < 1 {
+ cfg.BlockSize = DefaultBlockConfig.BlockSize
+ }
+
+ // Assume 0 buf size == use default
+ if cfg.WriteBufSize < 1 {
+ cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
+ }
+
+ // Return owned config copy
+ return BlockConfig{
+ BlockSize: cfg.BlockSize,
+ WriteBufSize: cfg.WriteBufSize,
+ Overwrite: cfg.Overwrite,
+ Compression: cfg.Compression,
+ }
+}
+
+// BlockStorage is a Storage implementation that stores input data as chunks on
+// a filesystem. Each value is chunked into blocks of configured size and these
+// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A
+// "node" file is finally created containing an array of hashes contained within
+// this value
+type BlockStorage struct {
+ path string // path is the root path of this store
+ blockPath string // blockPath is the joined root path + block path prefix
+ nodePath string // nodePath is the joined root path + node path prefix
+ config BlockConfig // cfg is the supplied configuration for this store
+ hashPool sync.Pool // hashPool is this store's hashEncoder pool
+
+ // NOTE:
+ // BlockStorage does not need to lock each of the underlying block files
+ // as the filename itself directly relates to the contents. If there happens
+ // to be an overwrite, it will just be of the same data since the filename is
+ // the hash of the data.
+}
+
+// OpenBlock opens a BlockStorage instance for given folder path and configuration
+func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
+ // Acquire path builder
+ pb := util.AcquirePathBuilder()
+ defer util.ReleasePathBuilder(pb)
+
+ // Clean provided path, ensure ends in '/' (should
+ // be dir, this helps with file path trimming later)
+ path = pb.Clean(path) + "/"
+
+ // Get checked config
+ config := getBlockConfig(cfg)
+
+ // Attempt to open path
+ file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ // If not a not-exist error, return
+ if !os.IsNotExist(err) {
+ return nil, err
+ }
+
+ // Attempt to make store path dirs
+ err = os.MkdirAll(path, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+
+ // Reopen dir now it's been created
+ file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+ }
+ defer file.Close()
+
+ // Double check this is a dir (NOT a file!)
+ stat, err := file.Stat()
+ if err != nil {
+ return nil, err
+ } else if !stat.IsDir() {
+ return nil, errPathIsFile
+ }
+
+ // Return new BlockStorage
+ return &BlockStorage{
+ path: path,
+ blockPath: pb.Join(path, blockPathPrefix),
+ nodePath: pb.Join(path, nodePathPrefix),
+ config: config,
+ hashPool: sync.Pool{
+ New: func() interface{} {
+ return newHashEncoder()
+ },
+ },
+ }, nil
+}
+
+// Clean implements storage.Clean()
+func (st *BlockStorage) Clean() error {
+ nodes := map[string]*node{}
+
+ // Acquire path builder
+ pb := fastpath.AcquireBuilder()
+ defer fastpath.ReleaseBuilder(pb)
+
+ // Walk nodes dir for entries
+ onceErr := errors.OnceError{}
+ err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
+ // Only deal with regular files
+ if !fsentry.Type().IsRegular() {
+ return
+ }
+
+ // Stop if we hit error previously
+ if onceErr.IsSet() {
+ return
+ }
+
+ // Get joined node path name
+ npath = pb.Join(npath, fsentry.Name())
+
+ // Attempt to open RO file
+ file, err := open(npath, defaultFileROFlags)
+ if err != nil {
+ onceErr.Store(err)
+ return
+ }
+ defer file.Close()
+
+ // Alloc new Node + acquire hash buffer for writes
+ hbuf := util.AcquireBuffer(encodedHashLen)
+ defer util.ReleaseBuffer(hbuf)
+ node := node{}
+
+ // Write file contents to node
+ _, err = io.CopyBuffer(
+ &nodeWriter{
+ node: &node,
+ buf: hbuf,
+ },
+ file,
+ nil,
+ )
+ if err != nil {
+ onceErr.Store(err)
+ return
+ }
+
+ // Append to nodes slice
+ nodes[fsentry.Name()] = &node
+ })
+
+ // Handle errors (though nodePath may not have been created yet)
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ } else if onceErr.IsSet() {
+ return onceErr.Load()
+ }
+
+ // Walk blocks dir for entries
+ onceErr.Reset()
+ err = util.WalkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) {
+ // Only deal with regular files
+ if !fsentry.Type().IsRegular() {
+ return
+ }
+
+ // Stop if we hit error previously
+ if onceErr.IsSet() {
+ return
+ }
+
+ inUse := false
+ for key, node := range nodes {
+ if node.removeHash(fsentry.Name()) {
+ if len(node.hashes) < 1 {
+ // This node contained hash, and after removal is now empty.
+ // Remove this node from our tracked nodes slice
+ delete(nodes, key)
+ }
+ inUse = true
+ }
+ }
+
+ // Block hash is used by node
+ if inUse {
+ return
+ }
+
+ // Get joined block path name
+ bpath = pb.Join(bpath, fsentry.Name())
+
+ // Remove this unused block path
+ err := os.Remove(bpath)
+ if err != nil {
+ onceErr.Store(err)
+ return
+ }
+ })
+
+ // Handle errors (though blockPath may not have been created yet)
+ if err != nil && !os.IsNotExist(err) {
+ return err
+ } else if onceErr.IsSet() {
+ return onceErr.Load()
+ }
+
+ // If there are nodes left at this point, they are corrupt
+ // (i.e. they're referencing block hashes that don't exist)
+ if len(nodes) > 0 {
+ nodeKeys := []string{}
+ for key := range nodes {
+ nodeKeys = append(nodeKeys, key)
+ }
+ return errCorruptNodes.Extend("%v", nodeKeys)
+ }
+
+ return nil
+}
+
+// ReadBytes implements Storage.ReadBytes()
+func (st *BlockStorage) ReadBytes(key string) ([]byte, error) {
+ // Get stream reader for key
+ rc, err := st.ReadStream(key)
+ if err != nil {
+ return nil, err
+ }
+
+ // Read all bytes and return
+ return io.ReadAll(rc)
+}
+
+// ReadStream implements Storage.ReadStream()
+func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
+ // Get node file path for key
+ npath, err := st.nodePathForKey(key)
+ if err != nil {
+ return nil, err
+ }
+
+ // Attempt to open RO file
+ file, err := open(npath, defaultFileROFlags)
+ if err != nil {
+ return nil, err
+ }
+ defer file.Close()
+
+ // Alloc new Node + acquire hash buffer for writes
+ hbuf := util.AcquireBuffer(encodedHashLen)
+ defer util.ReleaseBuffer(hbuf)
+ node := node{}
+
+ // Write file contents to node
+ _, err = io.CopyBuffer(
+ &nodeWriter{
+ node: &node,
+ buf: hbuf,
+ },
+ file,
+ nil,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Return new block reader
+ return util.NopReadCloser(&blockReader{
+ storage: st,
+ node: &node,
+ }), nil
+}
+
+func (st *BlockStorage) readBlock(key string) ([]byte, error) {
+ // Get block file path for key
+ bpath := st.blockPathForKey(key)
+
+ // Attempt to open RO file
+ file, err := open(bpath, defaultFileROFlags)
+ if err != nil {
+ return nil, err
+ }
+ defer file.Close()
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Reader(file)
+ if err != nil {
+ return nil, err
+ }
+ defer cFile.Close()
+
+ // Read the entire file
+ return io.ReadAll(cFile)
+}
+
+// WriteBytes implements Storage.WriteBytes()
+func (st *BlockStorage) WriteBytes(key string, value []byte) error {
+ return st.WriteStream(key, bytes.NewReader(value))
+}
+
+// WriteStream implements Storage.WriteStream()
+func (st *BlockStorage) WriteStream(key string, r io.Reader) error {
+ // Get node file path for key
+ npath, err := st.nodePathForKey(key)
+ if err != nil {
+ return err
+ }
+
+ // Check if this exists
+ ok, err := stat(key)
+ if err != nil {
+ return err
+ }
+
+ // Check if we allow overwrites
+ if ok && !st.config.Overwrite {
+ return ErrAlreadyExists
+ }
+
+ // Ensure nodes dir (and any leading up to) exists
+ err = os.MkdirAll(st.nodePath, defaultDirPerms)
+ if err != nil {
+ return err
+ }
+
+ // Ensure blocks dir (and any leading up to) exists
+ err = os.MkdirAll(st.blockPath, defaultDirPerms)
+ if err != nil {
+ return err
+ }
+
+ // Alloc new node
+ node := node{}
+
+ // Acquire HashEncoder
+ hc := st.hashPool.Get().(*hashEncoder)
+ defer st.hashPool.Put(hc)
+
+ // Create new waitgroup and OnceError for
+ // goroutine error tracking and propagating
+ wg := sync.WaitGroup{}
+ onceErr := errors.OnceError{}
+
+loop:
+ for !onceErr.IsSet() {
+ // Fetch new buffer for this loop
+ buf := util.AcquireBuffer(st.config.BlockSize)
+ buf.Grow(st.config.BlockSize)
+
+ // Read next chunk
+ n, err := io.ReadFull(r, buf.B)
+ switch err {
+ case nil, io.ErrUnexpectedEOF:
+ // do nothing
+ case io.EOF:
+ util.ReleaseBuffer(buf)
+ break loop
+ default:
+ util.ReleaseBuffer(buf)
+ return err
+ }
+
+ // Hash the encoded data
+ sum := hc.EncodeSum(buf.B)
+
+ // Append to the node's hashes
+ node.hashes = append(node.hashes, sum.String())
+
+ // If already on disk, skip
+ has, err := st.statBlock(sum.StringPtr())
+ if err != nil {
+ util.ReleaseBuffer(buf)
+ return err
+ } else if has {
+ util.ReleaseBuffer(buf)
+ continue loop
+ }
+
+ // Write in separate goroutine
+ wg.Add(1)
+ go func() {
+ // Defer buffer release + signal done
+ defer func() {
+ util.ReleaseBuffer(buf)
+ wg.Done()
+ }()
+
+ // Write block to store at hash
+ err = st.writeBlock(sum.StringPtr(), buf.B[:n])
+ if err != nil {
+ onceErr.Store(err)
+ return
+ }
+ }()
+
+ // We reached EOF
+ if n < buf.Len() {
+ break loop
+ }
+ }
+
+ // Wait, check errors
+ wg.Wait()
+ if onceErr.IsSet() {
+ return onceErr.Load()
+ }
+
+ // If no hashes created, return
+ if len(node.hashes) < 1 {
+ return errNoHashesWritten
+ }
+
+ // Prepare to swap error if need-be
+ errSwap := errSwapNoop
+
+ // Build file RW flags
+ // NOTE: we performed an initial check for
+ // this before writing blocks, but if
+ // the utilizer of this storage didn't
+ // correctly mutex protect this key then
+ // someone may have beaten us to the
+ // punch at writing the node file.
+ flags := defaultFileRWFlags
+ if !st.config.Overwrite {
+ flags |= syscall.O_EXCL
+
+ // Catch + replace err exist
+ errSwap = errSwapExist
+ }
+
+ // Attempt to open RW file
+ file, err := open(npath, flags)
+ if err != nil {
+ return errSwap(err)
+ }
+ defer file.Close()
+
+ // Acquire write buffer
+ buf := util.AcquireBuffer(st.config.WriteBufSize)
+ defer util.ReleaseBuffer(buf)
+ buf.Grow(st.config.WriteBufSize)
+
+ // Finally, write data to file
+ _, err = io.CopyBuffer(file, &nodeReader{node: &node}, nil)
+ return err
+}
+
+// writeBlock writes the block with hash and supplied value to the filesystem
+func (st *BlockStorage) writeBlock(hash string, value []byte) error {
+ // Get block file path for key
+ bpath := st.blockPathForKey(hash)
+
+ // Attempt to open RW file
+ file, err := open(bpath, defaultFileRWFlags)
+ if err != nil {
+ if err == ErrAlreadyExists {
+ err = nil /* race issue describe in struct NOTE */
+ }
+ return err
+ }
+ defer file.Close()
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Writer(file)
+ if err != nil {
+ return err
+ }
+ defer cFile.Close()
+
+ // Write value to file
+ _, err = cFile.Write(value)
+ return err
+}
+
+// statBlock checks for existence of supplied block hash
+func (st *BlockStorage) statBlock(hash string) (bool, error) {
+ return stat(st.blockPathForKey(hash))
+}
+
+// Stat implements Storage.Stat()
+func (st *BlockStorage) Stat(key string) (bool, error) {
+ // Get node file path for key
+ kpath, err := st.nodePathForKey(key)
+ if err != nil {
+ return false, err
+ }
+
+ // Check for file on disk
+ return stat(kpath)
+}
+
+// Remove implements Storage.Remove()
+func (st *BlockStorage) Remove(key string) error {
+ // Get node file path for key
+ kpath, err := st.nodePathForKey(key)
+ if err != nil {
+ return err
+ }
+
+ // Attempt to remove file
+ return os.Remove(kpath)
+}
+
+// WalkKeys implements Storage.WalkKeys()
+func (st *BlockStorage) WalkKeys(opts *WalkKeysOptions) error {
+ // Acquire path builder
+ pb := fastpath.AcquireBuilder()
+ defer fastpath.ReleaseBuilder(pb)
+
+ // Walk dir for entries
+ return util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) {
+ // Only deal with regular files
+ if fsentry.Type().IsRegular() {
+ opts.WalkFn(entry(fsentry.Name()))
+ }
+ })
+}
+
+// nodePathForKey calculates the node file path for supplied key
+func (st *BlockStorage) nodePathForKey(key string) (string, error) {
+ // Path separators are illegal
+ if strings.Contains(key, "/") {
+ return "", ErrInvalidKey
+ }
+
+ // Acquire path builder
+ pb := util.AcquirePathBuilder()
+ defer util.ReleasePathBuilder(pb)
+
+ // Return joined + cleaned node-path
+ return pb.Join(st.nodePath, key), nil
+}
+
+// blockPathForKey calculates the block file path for supplied hash
+func (st *BlockStorage) blockPathForKey(hash string) string {
+ pb := util.AcquirePathBuilder()
+ defer util.ReleasePathBuilder(pb)
+ return pb.Join(st.blockPath, hash)
+}
+
+// hashSeparator is the separating byte between block hashes
+const hashSeparator = byte(':')
+
+// node represents the contents of a node file in storage
+type node struct {
+ hashes []string
+}
+
+// removeHash attempts to remove supplied block hash from the node's hash array
+func (n *node) removeHash(hash string) bool {
+ haveDropped := false
+ for i := 0; i < len(n.hashes); {
+ if n.hashes[i] == hash {
+ // Drop this hash from slice
+ n.hashes = append(n.hashes[:i], n.hashes[i+1:]...)
+ haveDropped = true
+ } else {
+ // Continue iter
+ i++
+ }
+ }
+ return haveDropped
+}
+
+// nodeReader is an io.Reader implementation for the node file representation,
+// which is useful when calculated node file is being written to the store
+type nodeReader struct {
+ node *node
+ idx int
+ last int
+}
+
+func (r *nodeReader) Read(b []byte) (int, error) {
+ n := 0
+
+ // '-1' means we missed writing
+ // hash separator on last iteration
+ if r.last == -1 {
+ b[n] = hashSeparator
+ n++
+ r.last = 0
+ }
+
+ for r.idx < len(r.node.hashes) {
+ hash := r.node.hashes[r.idx]
+
+ // Copy into buffer + update read count
+ m := copy(b[n:], hash[r.last:])
+ n += m
+
+ // If incomplete copy, return here
+ if m < len(hash)-r.last {
+ r.last = m
+ return n, nil
+ }
+
+ // Check we can write last separator
+ if n == len(b) {
+ r.last = -1
+ return n, nil
+ }
+
+ // Write separator, iter, reset
+ b[n] = hashSeparator
+ n++
+ r.idx++
+ r.last = 0
+ }
+
+ // We reached end of hashes
+ return n, io.EOF
+}
+
+// nodeWriter is an io.Writer implementation for the node file representation,
+// which is useful when calculated node file is being read from the store
+type nodeWriter struct {
+ node *node
+ buf *bytes.Buffer
+}
+
+func (w *nodeWriter) Write(b []byte) (int, error) {
+ n := 0
+
+ for {
+ // Find next hash separator position
+ idx := bytes.IndexByte(b[n:], hashSeparator)
+ if idx == -1 {
+ // Check we shouldn't be expecting it
+ if w.buf.Len() > encodedHashLen {
+ return n, errInvalidNode
+ }
+
+ // Write all contents to buffer
+ w.buf.Write(b[n:])
+ return len(b), nil
+ }
+
+ // Found hash separator, write
+ // current buf contents to Node hashes
+ w.buf.Write(b[n : n+idx])
+ n += idx + 1
+ if w.buf.Len() != encodedHashLen {
+ return n, errInvalidNode
+ }
+
+ // Append to hashes & reset
+ w.node.hashes = append(w.node.hashes, w.buf.String())
+ w.buf.Reset()
+ }
+}
+
+// blockReader is an io.Reader implementation for the combined, linked block
+// data contained with a node file. Basically, this allows reading value data
+// from the store for a given node file
+type blockReader struct {
+ storage *BlockStorage
+ node *node
+ buf []byte
+ prev int
+}
+
+func (r *blockReader) Read(b []byte) (int, error) {
+ n := 0
+
+ // Data left in buf, copy as much as we
+ // can into supplied read buffer
+ if r.prev < len(r.buf)-1 {
+ n += copy(b, r.buf[r.prev:])
+ r.prev += n
+ if n >= len(b) {
+ return n, nil
+ }
+ }
+
+ for {
+ // Check we have any hashes left
+ if len(r.node.hashes) < 1 {
+ return n, io.EOF
+ }
+
+ // Get next key from slice
+ key := r.node.hashes[0]
+ r.node.hashes = r.node.hashes[1:]
+
+ // Attempt to fetch next batch of data
+ var err error
+ r.buf, err = r.storage.readBlock(key)
+ if err != nil {
+ return n, err
+ }
+ r.prev = 0
+
+ // Copy as much as can from new buffer
+ m := copy(b[n:], r.buf)
+ r.prev += m
+ n += m
+
+ // If we hit end of supplied buf, return
+ if n >= len(b) {
+ return n, nil
+ }
+ }
+}
+
+// hashEncoder is a HashEncoder with built-in encode buffer
+type hashEncoder struct {
+ henc hashenc.HashEncoder
+ ebuf []byte
+}
+
+// encodedHashLen is the once-calculated encoded hash-sum length
+var encodedHashLen = hashenc.Base64().EncodedLen(
+ sha256.New().Size(),
+)
+
+// newHashEncoder returns a new hashEncoder instance
+func newHashEncoder() *hashEncoder {
+ hash := sha256.New()
+ enc := hashenc.Base64()
+ return &hashEncoder{
+ henc: hashenc.New(hash, enc),
+ ebuf: make([]byte, enc.EncodedLen(hash.Size())),
+ }
+}
+
+// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum()
+func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes {
+ henc.henc.EncodeSum(henc.ebuf, src)
+ return bytes.ToBytes(henc.ebuf)
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/compressor.go b/vendor/git.iim.gay/grufwub/go-store/storage/compressor.go
new file mode 100644
index 000000000..b8cfb906c
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/compressor.go
@@ -0,0 +1,104 @@
+package storage
+
+import (
+ "compress/gzip"
+ "compress/zlib"
+ "io"
+
+ "git.iim.gay/grufwub/go-store/util"
+ "github.com/golang/snappy"
+)
+
+// Compressor defines a means of compressing/decompressing values going into a key-value store
+type Compressor interface {
+ // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
+ Reader(io.Reader) (io.ReadCloser, error)
+
+ // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
+ Writer(io.Writer) (io.WriteCloser, error)
+}
+
+type gzipCompressor struct {
+ level int
+}
+
+// GZipCompressor returns a new Compressor that implements GZip at default compression level
+func GZipCompressor() Compressor {
+ return GZipCompressorLevel(gzip.DefaultCompression)
+}
+
+// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level
+func GZipCompressorLevel(level int) Compressor {
+ return &gzipCompressor{
+ level: level,
+ }
+}
+
+func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ return gzip.NewReader(r)
+}
+
+func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ return gzip.NewWriterLevel(w, c.level)
+}
+
+type zlibCompressor struct {
+ level int
+ dict []byte
+}
+
+// ZLibCompressor returns a new Compressor that implements ZLib at default compression level
+func ZLibCompressor() Compressor {
+ return ZLibCompressorLevelDict(zlib.DefaultCompression, nil)
+}
+
+// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level
+func ZLibCompressorLevel(level int) Compressor {
+ return ZLibCompressorLevelDict(level, nil)
+}
+
+// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict
+func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
+ return &zlibCompressor{
+ level: level,
+ dict: dict,
+ }
+}
+
+func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ return zlib.NewReaderDict(r, c.dict)
+}
+
+func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ return zlib.NewWriterLevelDict(w, c.level, c.dict)
+}
+
+type snappyCompressor struct{}
+
+// SnappyCompressor returns a new Compressor that implements Snappy
+func SnappyCompressor() Compressor {
+ return &snappyCompressor{}
+}
+
+func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ return util.NopReadCloser(snappy.NewReader(r)), nil
+}
+
+func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ return snappy.NewBufferedWriter(w), nil
+}
+
+type nopCompressor struct{}
+
+// NoCompression is a Compressor that simply does nothing
+func NoCompression() Compressor {
+ return &nopCompressor{}
+}
+
+func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+ return util.NopReadCloser(r), nil
+}
+
+func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+ return util.NopWriteCloser(w), nil
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/disk.go b/vendor/git.iim.gay/grufwub/go-store/storage/disk.go
new file mode 100644
index 000000000..884c2e252
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/disk.go
@@ -0,0 +1,289 @@
+package storage
+
+import (
+ "io"
+ "io/fs"
+ "os"
+ "path"
+ "syscall"
+
+ "git.iim.gay/grufwub/fastpath"
+ "git.iim.gay/grufwub/go-bytes"
+ "git.iim.gay/grufwub/go-store/util"
+)
+
+// DefaultDiskConfig is the default DiskStorage configuration
+var DefaultDiskConfig = &DiskConfig{
+ Overwrite: true,
+ WriteBufSize: 4096,
+ Transform: NopTransform(),
+ Compression: NoCompression(),
+}
+
+// DiskConfig defines options to be used when opening a DiskStorage
+type DiskConfig struct {
+ // Transform is the supplied key<-->path KeyTransform
+ Transform KeyTransform
+
+ // WriteBufSize is the buffer size to use when writing file streams (PutStream)
+ WriteBufSize int
+
+ // Overwrite allows overwriting values of stored keys in the storage
+ Overwrite bool
+
+ // Compression is the Compressor to use when reading / writing files, default is no compression
+ Compression Compressor
+}
+
+// getDiskConfig returns a valid DiskConfig for supplied ptr
+func getDiskConfig(cfg *DiskConfig) DiskConfig {
+ // If nil, use default
+ if cfg == nil {
+ cfg = DefaultDiskConfig
+ }
+
+ // Assume nil transform == none
+ if cfg.Transform == nil {
+ cfg.Transform = NopTransform()
+ }
+
+ // Assume nil compress == none
+ if cfg.Compression == nil {
+ cfg.Compression = NoCompression()
+ }
+
+ // Assume 0 buf size == use default
+ if cfg.WriteBufSize < 1 {
+ cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize
+ }
+
+ // Return owned config copy
+ return DiskConfig{
+ Transform: cfg.Transform,
+ WriteBufSize: cfg.WriteBufSize,
+ Overwrite: cfg.Overwrite,
+ Compression: cfg.Compression,
+ }
+}
+
+// DiskStorage is a Storage implementation that stores directly to a filesystem
+type DiskStorage struct {
+ path string // path is the root path of this store
+ dots int // dots is the "dotdot" count for the root store path
+ config DiskConfig // cfg is the supplied configuration for this store
+}
+
+// OpenFile opens a DiskStorage instance for given folder path and configuration
+func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) {
+ // Acquire path builder
+ pb := util.AcquirePathBuilder()
+ defer util.ReleasePathBuilder(pb)
+
+ // Clean provided path, ensure ends in '/' (should
+ // be dir, this helps with file path trimming later)
+ path = pb.Clean(path) + "/"
+
+ // Get checked config
+ config := getDiskConfig(cfg)
+
+ // Attempt to open dir path
+ file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ // If not a not-exist error, return
+ if !os.IsNotExist(err) {
+ return nil, err
+ }
+
+ // Attempt to make store path dirs
+ err = os.MkdirAll(path, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+
+ // Reopen dir now it's been created
+ file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms)
+ if err != nil {
+ return nil, err
+ }
+ }
+ defer file.Close()
+
+ // Double check this is a dir (NOT a file!)
+ stat, err := file.Stat()
+ if err != nil {
+ return nil, err
+ } else if !stat.IsDir() {
+ return nil, errPathIsFile
+ }
+
+ // Return new DiskStorage
+ return &DiskStorage{
+ path: path,
+ dots: util.CountDotdots(path),
+ config: config,
+ }, nil
+}
+
+// Clean implements Storage.Clean()
+func (st *DiskStorage) Clean() error {
+ return util.CleanDirs(st.path)
+}
+
+// ReadBytes implements Storage.ReadBytes()
+func (st *DiskStorage) ReadBytes(key string) ([]byte, error) {
+ // Get stream reader for key
+ rc, err := st.ReadStream(key)
+ if err != nil {
+ return nil, err
+ }
+ defer rc.Close()
+
+ // Read all bytes and return
+ return io.ReadAll(rc)
+}
+
+// ReadStream implements Storage.ReadStream()
+func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return nil, err
+ }
+
+ // Attempt to open file (replace ENOENT with our own)
+ file, err := open(kpath, defaultFileROFlags)
+ if err != nil {
+ return nil, errSwapNotFound(err)
+ }
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Reader(file)
+ if err != nil {
+ file.Close() // close this here, ignore error
+ return nil, err
+ }
+
+ // Wrap compressor to ensure file close
+ return util.ReadCloserWithCallback(cFile, func() {
+ file.Close()
+ }), nil
+}
+
+// WriteBytes implements Storage.WriteBytes()
+func (st *DiskStorage) WriteBytes(key string, value []byte) error {
+ return st.WriteStream(key, bytes.NewReader(value))
+}
+
+// WriteStream implements Storage.WriteStream()
+func (st *DiskStorage) WriteStream(key string, r io.Reader) error {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return err
+ }
+
+ // Ensure dirs leading up to file exist
+ err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
+ if err != nil {
+ return err
+ }
+
+ // Prepare to swap error if need-be
+ errSwap := errSwapNoop
+
+ // Build file RW flags
+ flags := defaultFileRWFlags
+ if !st.config.Overwrite {
+ flags |= syscall.O_EXCL
+
+ // Catch + replace err exist
+ errSwap = errSwapExist
+ }
+
+ // Attempt to open file
+ file, err := open(kpath, flags)
+ if err != nil {
+ return errSwap(err)
+ }
+ defer file.Close()
+
+ // Wrap the file in a compressor
+ cFile, err := st.config.Compression.Writer(file)
+ if err != nil {
+ return err
+ }
+ defer cFile.Close()
+
+ // Acquire write buffer
+ buf := util.AcquireBuffer(st.config.WriteBufSize)
+ defer util.ReleaseBuffer(buf)
+ buf.Grow(st.config.WriteBufSize)
+
+ // Copy reader to file
+ _, err = io.CopyBuffer(cFile, r, buf.B)
+ return err
+}
+
+// Stat implements Storage.Stat()
+func (st *DiskStorage) Stat(key string) (bool, error) {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return false, err
+ }
+
+ // Check for file on disk
+ return stat(kpath)
+}
+
+// Remove implements Storage.Remove()
+func (st *DiskStorage) Remove(key string) error {
+ // Get file path for key
+ kpath, err := st.filepath(key)
+ if err != nil {
+ return err
+ }
+
+ // Attempt to remove file
+ return os.Remove(kpath)
+}
+
+// WalkKeys implements Storage.WalkKeys()
+func (st *DiskStorage) WalkKeys(opts *WalkKeysOptions) error {
+ // Acquire path builder
+ pb := fastpath.AcquireBuilder()
+ defer fastpath.ReleaseBuilder(pb)
+
+ // Walk dir for entries
+ return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) {
+ // Only deal with regular files
+ if fsentry.Type().IsRegular() {
+ // Get full item path (without root)
+ kpath = pb.Join(kpath, fsentry.Name())[len(st.path):]
+
+ // Perform provided walk function
+ opts.WalkFn(entry(st.config.Transform.PathToKey(kpath)))
+ }
+ })
+}
+
+// filepath checks and returns a formatted filepath for given key
+func (st *DiskStorage) filepath(key string) (string, error) {
+ // Acquire path builder
+ pb := util.AcquirePathBuilder()
+ defer util.ReleasePathBuilder(pb)
+
+ // Calculate transformed key path
+ key = st.config.Transform.KeyToPath(key)
+
+ // Generated joined root path
+ pb.AppendString(st.path)
+ pb.AppendString(key)
+
+ // If path is dir traversal, and traverses FURTHER
+ // than store root, this is an error
+ if util.CountDotdots(pb.StringPtr()) > st.dots {
+ return "", ErrInvalidKey
+ }
+ return pb.String(), nil
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/errors.go b/vendor/git.iim.gay/grufwub/go-store/storage/errors.go
new file mode 100644
index 000000000..016593596
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/errors.go
@@ -0,0 +1,63 @@
+package storage
+
+import (
+ "fmt"
+ "syscall"
+)
+
+// errorString is our own simple error type
+type errorString string
+
+// Error implements error
+func (e errorString) Error() string {
+ return string(e)
+}
+
+// Extend appends extra information to an errorString
+func (e errorString) Extend(s string, a ...interface{}) errorString {
+ return errorString(string(e) + ": " + fmt.Sprintf(s, a...))
+}
+
+var (
+ // ErrNotFound is the error returned when a key cannot be found in storage
+ ErrNotFound = errorString("store/storage: key not found")
+
+ // ErrAlreadyExist is the error returned when a key already exists in storage
+ ErrAlreadyExists = errorString("store/storage: key already exists")
+
+ // ErrInvalidkey is the error returned when an invalid key is passed to storage
+ ErrInvalidKey = errorString("store/storage: invalid key")
+
+ // errPathIsFile is returned when a path for a disk config is actually a file
+ errPathIsFile = errorString("store/storage: path is file")
+
+ // errNoHashesWritten is returned when no blocks are written for given input value
+ errNoHashesWritten = errorString("storage/storage: no hashes written")
+
+ // errInvalidNode is returned when read on an invalid node in the store is attempted
+ errInvalidNode = errorString("store/storage: invalid node")
+
+ // errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean
+ errCorruptNodes = errorString("store/storage: corrupted nodes")
+)
+
+// errSwapNoop performs no error swaps
+func errSwapNoop(err error) error {
+ return err
+}
+
+// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound
+func errSwapNotFound(err error) error {
+ if err == syscall.ENOENT {
+ return ErrNotFound
+ }
+ return err
+}
+
+// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists
+func errSwapExist(err error) error {
+ if err == syscall.EEXIST {
+ return ErrAlreadyExists
+ }
+ return err
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/fs.go b/vendor/git.iim.gay/grufwub/go-store/storage/fs.go
new file mode 100644
index 000000000..2d6c44caf
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/fs.go
@@ -0,0 +1,48 @@
+package storage
+
+import (
+ "os"
+ "syscall"
+
+ "git.iim.gay/grufwub/go-store/util"
+)
+
+const (
+ defaultDirPerms = 0755
+ defaultFilePerms = 0644
+ defaultFileROFlags = syscall.O_RDONLY
+ defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR
+ defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT
+)
+
+// NOTE:
+// These functions are for opening storage files,
+// not necessarily for e.g. initial setup (OpenFile)
+
+// open should not be called directly
+func open(path string, flags int) (*os.File, error) {
+ var fd int
+ err := util.RetryOnEINTR(func() (err error) {
+ fd, err = syscall.Open(path, flags, defaultFilePerms)
+ return
+ })
+ if err != nil {
+ return nil, err
+ }
+ return os.NewFile(uintptr(fd), path), nil
+}
+
+// stat checks for a file on disk
+func stat(path string) (bool, error) {
+ var stat syscall.Stat_t
+ err := util.RetryOnEINTR(func() error {
+ return syscall.Stat(path, &stat)
+ })
+ if err != nil {
+ if err == syscall.ENOENT {
+ err = nil
+ }
+ return false, err
+ }
+ return true, nil
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/lock.go b/vendor/git.iim.gay/grufwub/go-store/storage/lock.go
new file mode 100644
index 000000000..e7c7bf49a
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/lock.go
@@ -0,0 +1,34 @@
+package storage
+
+import (
+ "os"
+ "syscall"
+
+ "git.iim.gay/grufwub/go-store/util"
+)
+
+type lockableFile struct {
+ *os.File
+}
+
+func openLock(path string) (*lockableFile, error) {
+ file, err := open(path, defaultFileLockFlags)
+ if err != nil {
+ return nil, err
+ }
+ return &lockableFile{file}, nil
+}
+
+func (f *lockableFile) lock() error {
+ return f.flock(syscall.LOCK_EX | syscall.LOCK_NB)
+}
+
+func (f *lockableFile) unlock() error {
+ return f.flock(syscall.LOCK_UN | syscall.LOCK_NB)
+}
+
+func (f *lockableFile) flock(how int) error {
+ return util.RetryOnEINTR(func() error {
+ return syscall.Flock(int(f.Fd()), how)
+ })
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/storage.go b/vendor/git.iim.gay/grufwub/go-store/storage/storage.go
new file mode 100644
index 000000000..61f722111
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/storage.go
@@ -0,0 +1,51 @@
+package storage
+
+import (
+ "io"
+)
+
+// StorageEntry defines a key in Storage
+type StorageEntry interface {
+ // Key returns the storage entry's key
+ Key() string
+}
+
+// entry is the simplest possible StorageEntry
+type entry string
+
+func (e entry) Key() string {
+ return string(e)
+}
+
+// Storage defines a means of storing and accessing key value pairs
+type Storage interface {
+ // Clean removes unused values and unclutters the storage (e.g. removing empty folders)
+ Clean() error
+
+ // ReadBytes returns the byte value for key in storage
+ ReadBytes(key string) ([]byte, error)
+
+ // ReadStream returns an io.ReadCloser for the value bytes at key in the storage
+ ReadStream(key string) (io.ReadCloser, error)
+
+ // WriteBytes writes the supplied value bytes at key in the storage
+ WriteBytes(key string, value []byte) error
+
+ // WriteStream writes the bytes from supplied reader at key in the storage
+ WriteStream(key string, r io.Reader) error
+
+ // Stat checks if the supplied key is in the storage
+ Stat(key string) (bool, error)
+
+ // Remove attempts to remove the supplied key-value pair from storage
+ Remove(key string) error
+
+ // WalkKeys walks the keys in the storage
+ WalkKeys(opts *WalkKeysOptions) error
+}
+
+// WalkKeysOptions defines how to walk the keys in a storage implementation
+type WalkKeysOptions struct {
+ // WalkFn is the function to apply on each StorageEntry
+ WalkFn func(StorageEntry)
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/transform.go b/vendor/git.iim.gay/grufwub/go-store/storage/transform.go
new file mode 100644
index 000000000..3863dd774
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/storage/transform.go
@@ -0,0 +1,25 @@
+package storage
+
+// KeyTransform defines a method of converting store keys to storage paths (and vice-versa)
+type KeyTransform interface {
+ // KeyToPath converts a supplied key to storage path
+ KeyToPath(string) string
+
+ // PathToKey converts a supplied storage path to key
+ PathToKey(string) string
+}
+
+type nopKeyTransform struct{}
+
+// NopTransform returns a nop key transform (i.e. key = path)
+func NopTransform() KeyTransform {
+ return &nopKeyTransform{}
+}
+
+func (t *nopKeyTransform) KeyToPath(key string) string {
+ return key
+}
+
+func (t *nopKeyTransform) PathToKey(path string) string {
+ return path
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/util/fs.go b/vendor/git.iim.gay/grufwub/go-store/util/fs.go
new file mode 100644
index 000000000..20c0ab116
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/util/fs.go
@@ -0,0 +1,105 @@
+package util
+
+import (
+ "io/fs"
+ "os"
+ "strings"
+ "syscall"
+
+ "git.iim.gay/grufwub/fastpath"
+)
+
+var dotdot = "../"
+
+// CountDotdots returns the number of "dot-dots" (../) in a cleaned filesystem path
+func CountDotdots(path string) int {
+ if !strings.HasSuffix(path, dotdot) {
+ return 0
+ }
+ return strings.Count(path, dotdot)
+}
+
+// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry
+func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error {
+ // Read supplied dir path
+ dirEntries, err := os.ReadDir(path)
+ if err != nil {
+ return err
+ }
+
+ // Iter entries
+ for _, entry := range dirEntries {
+ // Pass to walk fn
+ walkFn(path, entry)
+
+ // Recurse dir entries
+ if entry.IsDir() {
+ err = WalkDir(pb, pb.Join(path, entry.Name()), walkFn)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+// CleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
+func CleanDirs(path string) error {
+ // Acquire builder
+ pb := AcquirePathBuilder()
+ defer ReleasePathBuilder(pb)
+
+ // Get dir entries
+ entries, err := os.ReadDir(path)
+ if err != nil {
+ return err
+ }
+
+ // Recurse dirs
+ for _, entry := range entries {
+ if entry.IsDir() {
+ err := cleanDirs(pb, pb.Join(path, entry.Name()))
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// cleanDirs performs the actual dir cleaning logic for the exported version
+func cleanDirs(pb *fastpath.Builder, path string) error {
+ // Get dir entries
+ entries, err := os.ReadDir(path)
+ if err != nil {
+ return err
+ }
+
+ // If no entries, delete
+ if len(entries) < 1 {
+ return os.Remove(path)
+ }
+
+ // Recurse dirs
+ for _, entry := range entries {
+ if entry.IsDir() {
+ err := cleanDirs(pb, pb.Join(path, entry.Name()))
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received
+func RetryOnEINTR(do func() error) error {
+ for {
+ err := do()
+ if err == syscall.EINTR {
+ continue
+ }
+ return err
+ }
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/util/io.go b/vendor/git.iim.gay/grufwub/go-store/util/io.go
new file mode 100644
index 000000000..d034cf62b
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/util/io.go
@@ -0,0 +1,42 @@
+package util
+
+import "io"
+
+// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation
+func NopReadCloser(r io.Reader) io.ReadCloser {
+ return &nopReadCloser{r}
+}
+
+// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation
+func NopWriteCloser(w io.Writer) io.WriteCloser {
+ return &nopWriteCloser{w}
+}
+
+// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser
+func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
+ return &callbackReadCloser{
+ ReadCloser: rc,
+ callback: cb,
+ }
+}
+
+// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close()
+type nopReadCloser struct{ io.Reader }
+
+func (r *nopReadCloser) Close() error { return nil }
+
+// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close()
+type nopWriteCloser struct{ io.Writer }
+
+func (w nopWriteCloser) Close() error { return nil }
+
+// callbackReadCloser allows adding our own custom callback to an io.ReadCloser
+type callbackReadCloser struct {
+ io.ReadCloser
+ callback func()
+}
+
+func (c *callbackReadCloser) Close() error {
+ defer c.callback()
+ return c.ReadCloser.Close()
+}
diff --git a/vendor/git.iim.gay/grufwub/go-store/util/nocopy.go b/vendor/git.iim.gay/grufwub/go-store/util/nocopy.go
new file mode 100644
index 000000000..e4dd071aa
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/util/nocopy.go
@@ -0,0 +1,6 @@
+package util
+
+type NoCopy struct{}
+
+func (*NoCopy) Lock() {}
+func (*NoCopy) Unlock() {}
diff --git a/vendor/git.iim.gay/grufwub/go-store/util/pools.go b/vendor/git.iim.gay/grufwub/go-store/util/pools.go
new file mode 100644
index 000000000..c02f2d25e
--- /dev/null
+++ b/vendor/git.iim.gay/grufwub/go-store/util/pools.go
@@ -0,0 +1,44 @@
+package util
+
+import (
+ "sync"
+
+ "git.iim.gay/grufwub/fastpath"
+ "git.iim.gay/grufwub/go-bufpool"
+ "git.iim.gay/grufwub/go-bytes"
+)
+
+// pathBuilderPool is the global fastpath.Builder pool, we implement
+// our own here instead of using fastpath's default one because we
+// don't want to deal with fastpath's sync.Once locks on every Acquire/Release
+var pathBuilderPool = sync.Pool{
+ New: func() interface{} {
+ pb := fastpath.NewBuilder(make([]byte, 0, 512))
+ return &pb
+ },
+}
+
+// AcquirePathBuilder returns a reset fastpath.Builder instance
+func AcquirePathBuilder() *fastpath.Builder {
+ return pathBuilderPool.Get().(*fastpath.Builder)
+}
+
+// ReleasePathBuilder resets and releases provided fastpath.Builder instance to global pool
+func ReleasePathBuilder(pb *fastpath.Builder) {
+ pb.Reset()
+ pathBuilderPool.Put(pb)
+}
+
+// bufferPool is the global BufferPool, we implement this here
+// so we can share allocations across whatever libaries need them.
+var bufferPool = bufpool.BufferPool{}
+
+// AcquireBuffer returns a reset bytes.Buffer with at least requested capacity
+func AcquireBuffer(cap int) *bytes.Buffer {
+ return bufferPool.Get(cap)
+}
+
+// ReleaseBuffer resets and releases provided bytes.Buffer to global BufferPool
+func ReleaseBuffer(buf *bytes.Buffer) {
+ bufferPool.Put(buf)
+}