diff options
Diffstat (limited to 'vendor/git.iim.gay/grufwub/go-store')
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/kv/iterator.go | 64 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/kv/state.go | 125 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/kv/store.go | 243 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/block.go | 785 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/compressor.go | 104 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/disk.go | 289 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/errors.go | 63 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/fs.go | 48 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/lock.go | 34 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/storage.go | 51 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/storage/transform.go | 25 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/util/fs.go | 105 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/util/io.go | 42 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/util/nocopy.go | 6 | ||||
-rw-r--r-- | vendor/git.iim.gay/grufwub/go-store/util/pools.go | 44 |
15 files changed, 2028 insertions, 0 deletions
diff --git a/vendor/git.iim.gay/grufwub/go-store/kv/iterator.go b/vendor/git.iim.gay/grufwub/go-store/kv/iterator.go new file mode 100644 index 000000000..f2c68b87c --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/kv/iterator.go @@ -0,0 +1,64 @@ +package kv + +import ( + "git.iim.gay/grufwub/go-errors" + "git.iim.gay/grufwub/go-store/storage" +) + +var ErrIteratorClosed = errors.Define("store/kv: iterator closed") + +// KVIterator provides a read-only iterator to all the key-value +// pairs in a KVStore. While the iterator is open the store is read +// locked, you MUST release the iterator when you are finished with +// it. +// +// Please note: +// - individual iterators are NOT concurrency safe, though it is safe to +// have multiple iterators running concurrently +type KVIterator struct { + store *KVStore // store is the linked KVStore + entries []storage.StorageEntry + index int + key string + onClose func() +} + +// Next attempts to set the next key-value pair, the +// return value is if there was another pair remaining +func (i *KVIterator) Next() bool { + next := i.index + 1 + if next >= len(i.entries) { + i.key = "" + return false + } + i.key = i.entries[next].Key() + i.index = next + return true +} + +// Key returns the next key from the store +func (i *KVIterator) Key() string { + return i.key +} + +// Release releases the KVIterator and KVStore's read lock +func (i *KVIterator) Release() { + // Reset key, path, entries + i.store = nil + i.key = "" + i.entries = nil + + // Perform requested callback + i.onClose() +} + +// Value returns the next value from the KVStore +func (i *KVIterator) Value() ([]byte, error) { + // Check store isn't closed + if i.store == nil { + return nil, ErrIteratorClosed + } + + // Attempt to fetch from store + return i.store.get(i.key) +} diff --git a/vendor/git.iim.gay/grufwub/go-store/kv/state.go b/vendor/git.iim.gay/grufwub/go-store/kv/state.go new file mode 100644 index 000000000..a8bc64637 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/kv/state.go @@ -0,0 +1,125 @@ +package kv + +import ( + "io" + + "git.iim.gay/grufwub/go-errors" +) + +var ErrStateClosed = errors.Define("store/kv: state closed") + +// StateRO provides a read-only window to the store. While this +// state is active during the Read() function window, the entire +// store will be read-locked. The state is thread-safe for concurrent +// use UNTIL the moment that your supplied function to Read() returns, +// then the state has zero guarantees +type StateRO struct { + store *KVStore +} + +func (st *StateRO) Get(key string) ([]byte, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.get(key) +} + +func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.getStream(key) +} + +func (st *StateRO) Has(key string) (bool, error) { + // Check not closed + if st.store == nil { + return false, ErrStateClosed + } + + // Pass request to store + return st.store.has(key) +} + +func (st *StateRO) close() { + st.store = nil +} + +// StateRW provides a read-write window to the store. While this +// state is active during the Update() function window, the entire +// store will be locked. The state is thread-safe for concurrent +// use UNTIL the moment that your supplied function to Update() returns, +// then the state has zero guarantees +type StateRW struct { + store *KVStore +} + +func (st *StateRW) Get(key string) ([]byte, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.get(key) +} + +func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.getStream(key) +} + +func (st *StateRW) Put(key string, value []byte) error { + // Check not closed + if st.store == nil { + return ErrStateClosed + } + + // Pass request to store + return st.store.put(key, value) +} + +func (st *StateRW) PutStream(key string, r io.Reader) error { + // Check not closed + if st.store == nil { + return ErrStateClosed + } + + // Pass request to store + return st.store.putStream(key, r) +} + +func (st *StateRW) Has(key string) (bool, error) { + // Check not closed + if st.store == nil { + return false, ErrStateClosed + } + + // Pass request to store + return st.store.has(key) +} + +func (st *StateRW) Delete(key string) error { + // Check not closed + if st.store == nil { + return ErrStateClosed + } + + // Pass request to store + return st.store.delete(key) +} + +func (st *StateRW) close() { + st.store = nil +} diff --git a/vendor/git.iim.gay/grufwub/go-store/kv/store.go b/vendor/git.iim.gay/grufwub/go-store/kv/store.go new file mode 100644 index 000000000..840a554e9 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/kv/store.go @@ -0,0 +1,243 @@ +package kv + +import ( + "io" + "sync" + + "git.iim.gay/grufwub/go-mutexes" + "git.iim.gay/grufwub/go-store/storage" + "git.iim.gay/grufwub/go-store/util" +) + +// KVStore is a very simple, yet performant key-value store +type KVStore struct { + mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access + mutex sync.RWMutex // mutex is the total store mutex + storage storage.Storage // storage is the underlying storage +} + +func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) { + // Attempt to open disk storage + storage, err := storage.OpenFile(path, cfg) + if err != nil { + return nil, err + } + + // Return new KVStore + return OpenStorage(storage) +} + +func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) { + // Attempt to open block storage + storage, err := storage.OpenBlock(path, cfg) + if err != nil { + return nil, err + } + + // Return new KVStore + return OpenStorage(storage) +} + +func OpenStorage(storage storage.Storage) (*KVStore, error) { + // Perform initial storage clean + err := storage.Clean() + if err != nil { + return nil, err + } + + // Return new KVStore + return &KVStore{ + mutexMap: mutexes.NewMap(mutexes.NewRW), + mutex: sync.RWMutex{}, + storage: storage, + }, nil +} + +// Get fetches the bytes for supplied key in the store +func (st *KVStore) Get(key string) ([]byte, error) { + // Acquire store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Pass to unprotected fn + return st.get(key) +} + +func (st *KVStore) get(key string) ([]byte, error) { + // Acquire read lock for key + runlock := st.mutexMap.RLock(key) + defer runlock() + + // Read file bytes + return st.storage.ReadBytes(key) +} + +// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store +func (st *KVStore) GetStream(key string) (io.ReadCloser, error) { + // Acquire store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Pass to unprotected fn + return st.getStream(key) +} + +func (st *KVStore) getStream(key string) (io.ReadCloser, error) { + // Acquire read lock for key + runlock := st.mutexMap.RLock(key) + + // Attempt to open stream for read + rd, err := st.storage.ReadStream(key) + if err != nil { + runlock() + return nil, err + } + + // Wrap readcloser in our own callback closer + return util.ReadCloserWithCallback(rd, runlock), nil +} + +// Put places the bytes at the supplied key location in the store +func (st *KVStore) Put(key string, value []byte) error { + // Acquire store write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Pass to unprotected fn + return st.put(key, value) +} + +func (st *KVStore) put(key string, value []byte) error { + // Acquire write lock for key + unlock := st.mutexMap.Lock(key) + defer unlock() + + // Write file bytes + return st.storage.WriteBytes(key, value) +} + +// PutStream writes the bytes from the supplied Reader at the supplied key location in the store +func (st *KVStore) PutStream(key string, r io.Reader) error { + // Acquire store write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Pass to unprotected fn + return st.putStream(key, r) +} + +func (st *KVStore) putStream(key string, r io.Reader) error { + // Acquire write lock for key + unlock := st.mutexMap.Lock(key) + defer unlock() + + // Write file stream + return st.storage.WriteStream(key, r) +} + +// Has checks whether the supplied key exists in the store +func (st *KVStore) Has(key string) (bool, error) { + // Acquire store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Pass to unprotected fn + return st.has(key) +} + +func (st *KVStore) has(key string) (bool, error) { + // Acquire read lock for key + runlock := st.mutexMap.RLock(key) + defer runlock() + + // Stat file on disk + return st.storage.Stat(key) +} + +// Delete removes the supplied key-value pair from the store +func (st *KVStore) Delete(key string) error { + // Acquire store write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Pass to unprotected fn + return st.delete(key) +} + +func (st *KVStore) delete(key string) error { + // Acquire write lock for key + unlock := st.mutexMap.Lock(key) + defer unlock() + + // Remove file from disk + return st.storage.Remove(key) +} + +// Iterator returns an Iterator for key-value pairs in the store, using supplied match function +func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { + // If no function, match all + if matchFn == nil { + matchFn = func(string) bool { return true } + } + + // Get store read lock + st.mutex.RLock() + + // Setup the walk keys function + entries := []storage.StorageEntry{} + walkFn := func(entry storage.StorageEntry) { + // Ignore unmatched entries + if !matchFn(entry.Key()) { + return + } + + // Add to entries + entries = append(entries, entry) + } + + // Walk keys in the storage + err := st.storage.WalkKeys(&storage.WalkKeysOptions{WalkFn: walkFn}) + if err != nil { + st.mutex.RUnlock() + return nil, err + } + + // Return new iterator + return &KVIterator{ + store: st, + entries: entries, + index: -1, + key: "", + onClose: st.mutex.RUnlock, + }, nil +} + +// Read provides a read-only window to the store, holding it in a read-locked state until +// the supplied function returns +func (st *KVStore) Read(do func(*StateRO)) { + // Get store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Create new store state (defer close) + state := &StateRO{store: st} + defer state.close() + + // Pass state + do(state) +} + +// Update provides a read-write window to the store, holding it in a read-write-locked state +// until the supplied functions returns +func (st *KVStore) Update(do func(*StateRW)) { + // Get store lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Create new store state (defer close) + state := &StateRW{store: st} + defer state.close() + + // Pass state + do(state) +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/block.go b/vendor/git.iim.gay/grufwub/go-store/storage/block.go new file mode 100644 index 000000000..023b83886 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/block.go @@ -0,0 +1,785 @@ +package storage + +import ( + "crypto/sha256" + "io" + "io/fs" + "os" + "strings" + "sync" + "syscall" + + "git.iim.gay/grufwub/fastpath" + "git.iim.gay/grufwub/go-bytes" + "git.iim.gay/grufwub/go-errors" + "git.iim.gay/grufwub/go-hashenc" + "git.iim.gay/grufwub/go-store/util" +) + +var ( + nodePathPrefix = "node/" + blockPathPrefix = "block/" +) + +// DefaultBlockConfig is the default BlockStorage configuration +var DefaultBlockConfig = &BlockConfig{ + BlockSize: 1024 * 16, + WriteBufSize: 4096, + Overwrite: false, + Compression: NoCompression(), +} + +// BlockConfig defines options to be used when opening a BlockStorage +type BlockConfig struct { + // BlockSize is the chunking size to use when splitting and storing blocks of data + BlockSize int + + // WriteBufSize is the buffer size to use when writing file streams (PutStream) + WriteBufSize int + + // Overwrite allows overwriting values of stored keys in the storage + Overwrite bool + + // Compression is the Compressor to use when reading / writing files, default is no compression + Compression Compressor +} + +// getBlockConfig returns a valid BlockConfig for supplied ptr +func getBlockConfig(cfg *BlockConfig) BlockConfig { + // If nil, use default + if cfg == nil { + cfg = DefaultBlockConfig + } + + // Assume nil compress == none + if cfg.Compression == nil { + cfg.Compression = NoCompression() + } + + // Assume 0 chunk size == use default + if cfg.BlockSize < 1 { + cfg.BlockSize = DefaultBlockConfig.BlockSize + } + + // Assume 0 buf size == use default + if cfg.WriteBufSize < 1 { + cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize + } + + // Return owned config copy + return BlockConfig{ + BlockSize: cfg.BlockSize, + WriteBufSize: cfg.WriteBufSize, + Overwrite: cfg.Overwrite, + Compression: cfg.Compression, + } +} + +// BlockStorage is a Storage implementation that stores input data as chunks on +// a filesystem. Each value is chunked into blocks of configured size and these +// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A +// "node" file is finally created containing an array of hashes contained within +// this value +type BlockStorage struct { + path string // path is the root path of this store + blockPath string // blockPath is the joined root path + block path prefix + nodePath string // nodePath is the joined root path + node path prefix + config BlockConfig // cfg is the supplied configuration for this store + hashPool sync.Pool // hashPool is this store's hashEncoder pool + + // NOTE: + // BlockStorage does not need to lock each of the underlying block files + // as the filename itself directly relates to the contents. If there happens + // to be an overwrite, it will just be of the same data since the filename is + // the hash of the data. +} + +// OpenBlock opens a BlockStorage instance for given folder path and configuration +func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { + // Acquire path builder + pb := util.AcquirePathBuilder() + defer util.ReleasePathBuilder(pb) + + // Clean provided path, ensure ends in '/' (should + // be dir, this helps with file path trimming later) + path = pb.Clean(path) + "/" + + // Get checked config + config := getBlockConfig(cfg) + + // Attempt to open path + file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + // If not a not-exist error, return + if !os.IsNotExist(err) { + return nil, err + } + + // Attempt to make store path dirs + err = os.MkdirAll(path, defaultDirPerms) + if err != nil { + return nil, err + } + + // Reopen dir now it's been created + file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + return nil, err + } + } + defer file.Close() + + // Double check this is a dir (NOT a file!) + stat, err := file.Stat() + if err != nil { + return nil, err + } else if !stat.IsDir() { + return nil, errPathIsFile + } + + // Return new BlockStorage + return &BlockStorage{ + path: path, + blockPath: pb.Join(path, blockPathPrefix), + nodePath: pb.Join(path, nodePathPrefix), + config: config, + hashPool: sync.Pool{ + New: func() interface{} { + return newHashEncoder() + }, + }, + }, nil +} + +// Clean implements storage.Clean() +func (st *BlockStorage) Clean() error { + nodes := map[string]*node{} + + // Acquire path builder + pb := fastpath.AcquireBuilder() + defer fastpath.ReleaseBuilder(pb) + + // Walk nodes dir for entries + onceErr := errors.OnceError{} + err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return + } + + // Stop if we hit error previously + if onceErr.IsSet() { + return + } + + // Get joined node path name + npath = pb.Join(npath, fsentry.Name()) + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + onceErr.Store(err) + return + } + defer file.Close() + + // Alloc new Node + acquire hash buffer for writes + hbuf := util.AcquireBuffer(encodedHashLen) + defer util.ReleaseBuffer(hbuf) + node := node{} + + // Write file contents to node + _, err = io.CopyBuffer( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + nil, + ) + if err != nil { + onceErr.Store(err) + return + } + + // Append to nodes slice + nodes[fsentry.Name()] = &node + }) + + // Handle errors (though nodePath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } else if onceErr.IsSet() { + return onceErr.Load() + } + + // Walk blocks dir for entries + onceErr.Reset() + err = util.WalkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return + } + + // Stop if we hit error previously + if onceErr.IsSet() { + return + } + + inUse := false + for key, node := range nodes { + if node.removeHash(fsentry.Name()) { + if len(node.hashes) < 1 { + // This node contained hash, and after removal is now empty. + // Remove this node from our tracked nodes slice + delete(nodes, key) + } + inUse = true + } + } + + // Block hash is used by node + if inUse { + return + } + + // Get joined block path name + bpath = pb.Join(bpath, fsentry.Name()) + + // Remove this unused block path + err := os.Remove(bpath) + if err != nil { + onceErr.Store(err) + return + } + }) + + // Handle errors (though blockPath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } else if onceErr.IsSet() { + return onceErr.Load() + } + + // If there are nodes left at this point, they are corrupt + // (i.e. they're referencing block hashes that don't exist) + if len(nodes) > 0 { + nodeKeys := []string{} + for key := range nodes { + nodeKeys = append(nodeKeys, key) + } + return errCorruptNodes.Extend("%v", nodeKeys) + } + + return nil +} + +// ReadBytes implements Storage.ReadBytes() +func (st *BlockStorage) ReadBytes(key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(key) + if err != nil { + return nil, err + } + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream() +func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return nil, err + } + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + return nil, err + } + defer file.Close() + + // Alloc new Node + acquire hash buffer for writes + hbuf := util.AcquireBuffer(encodedHashLen) + defer util.ReleaseBuffer(hbuf) + node := node{} + + // Write file contents to node + _, err = io.CopyBuffer( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + nil, + ) + if err != nil { + return nil, err + } + + // Return new block reader + return util.NopReadCloser(&blockReader{ + storage: st, + node: &node, + }), nil +} + +func (st *BlockStorage) readBlock(key string) ([]byte, error) { + // Get block file path for key + bpath := st.blockPathForKey(key) + + // Attempt to open RO file + file, err := open(bpath, defaultFileROFlags) + if err != nil { + return nil, err + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Reader(file) + if err != nil { + return nil, err + } + defer cFile.Close() + + // Read the entire file + return io.ReadAll(cFile) +} + +// WriteBytes implements Storage.WriteBytes() +func (st *BlockStorage) WriteBytes(key string, value []byte) error { + return st.WriteStream(key, bytes.NewReader(value)) +} + +// WriteStream implements Storage.WriteStream() +func (st *BlockStorage) WriteStream(key string, r io.Reader) error { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return err + } + + // Check if this exists + ok, err := stat(key) + if err != nil { + return err + } + + // Check if we allow overwrites + if ok && !st.config.Overwrite { + return ErrAlreadyExists + } + + // Ensure nodes dir (and any leading up to) exists + err = os.MkdirAll(st.nodePath, defaultDirPerms) + if err != nil { + return err + } + + // Ensure blocks dir (and any leading up to) exists + err = os.MkdirAll(st.blockPath, defaultDirPerms) + if err != nil { + return err + } + + // Alloc new node + node := node{} + + // Acquire HashEncoder + hc := st.hashPool.Get().(*hashEncoder) + defer st.hashPool.Put(hc) + + // Create new waitgroup and OnceError for + // goroutine error tracking and propagating + wg := sync.WaitGroup{} + onceErr := errors.OnceError{} + +loop: + for !onceErr.IsSet() { + // Fetch new buffer for this loop + buf := util.AcquireBuffer(st.config.BlockSize) + buf.Grow(st.config.BlockSize) + + // Read next chunk + n, err := io.ReadFull(r, buf.B) + switch err { + case nil, io.ErrUnexpectedEOF: + // do nothing + case io.EOF: + util.ReleaseBuffer(buf) + break loop + default: + util.ReleaseBuffer(buf) + return err + } + + // Hash the encoded data + sum := hc.EncodeSum(buf.B) + + // Append to the node's hashes + node.hashes = append(node.hashes, sum.String()) + + // If already on disk, skip + has, err := st.statBlock(sum.StringPtr()) + if err != nil { + util.ReleaseBuffer(buf) + return err + } else if has { + util.ReleaseBuffer(buf) + continue loop + } + + // Write in separate goroutine + wg.Add(1) + go func() { + // Defer buffer release + signal done + defer func() { + util.ReleaseBuffer(buf) + wg.Done() + }() + + // Write block to store at hash + err = st.writeBlock(sum.StringPtr(), buf.B[:n]) + if err != nil { + onceErr.Store(err) + return + } + }() + + // We reached EOF + if n < buf.Len() { + break loop + } + } + + // Wait, check errors + wg.Wait() + if onceErr.IsSet() { + return onceErr.Load() + } + + // If no hashes created, return + if len(node.hashes) < 1 { + return errNoHashesWritten + } + + // Prepare to swap error if need-be + errSwap := errSwapNoop + + // Build file RW flags + // NOTE: we performed an initial check for + // this before writing blocks, but if + // the utilizer of this storage didn't + // correctly mutex protect this key then + // someone may have beaten us to the + // punch at writing the node file. + flags := defaultFileRWFlags + if !st.config.Overwrite { + flags |= syscall.O_EXCL + + // Catch + replace err exist + errSwap = errSwapExist + } + + // Attempt to open RW file + file, err := open(npath, flags) + if err != nil { + return errSwap(err) + } + defer file.Close() + + // Acquire write buffer + buf := util.AcquireBuffer(st.config.WriteBufSize) + defer util.ReleaseBuffer(buf) + buf.Grow(st.config.WriteBufSize) + + // Finally, write data to file + _, err = io.CopyBuffer(file, &nodeReader{node: &node}, nil) + return err +} + +// writeBlock writes the block with hash and supplied value to the filesystem +func (st *BlockStorage) writeBlock(hash string, value []byte) error { + // Get block file path for key + bpath := st.blockPathForKey(hash) + + // Attempt to open RW file + file, err := open(bpath, defaultFileRWFlags) + if err != nil { + if err == ErrAlreadyExists { + err = nil /* race issue describe in struct NOTE */ + } + return err + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Writer(file) + if err != nil { + return err + } + defer cFile.Close() + + // Write value to file + _, err = cFile.Write(value) + return err +} + +// statBlock checks for existence of supplied block hash +func (st *BlockStorage) statBlock(hash string) (bool, error) { + return stat(st.blockPathForKey(hash)) +} + +// Stat implements Storage.Stat() +func (st *BlockStorage) Stat(key string) (bool, error) { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return false, err + } + + // Check for file on disk + return stat(kpath) +} + +// Remove implements Storage.Remove() +func (st *BlockStorage) Remove(key string) error { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return err + } + + // Attempt to remove file + return os.Remove(kpath) +} + +// WalkKeys implements Storage.WalkKeys() +func (st *BlockStorage) WalkKeys(opts *WalkKeysOptions) error { + // Acquire path builder + pb := fastpath.AcquireBuilder() + defer fastpath.ReleaseBuilder(pb) + + // Walk dir for entries + return util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { + // Only deal with regular files + if fsentry.Type().IsRegular() { + opts.WalkFn(entry(fsentry.Name())) + } + }) +} + +// nodePathForKey calculates the node file path for supplied key +func (st *BlockStorage) nodePathForKey(key string) (string, error) { + // Path separators are illegal + if strings.Contains(key, "/") { + return "", ErrInvalidKey + } + + // Acquire path builder + pb := util.AcquirePathBuilder() + defer util.ReleasePathBuilder(pb) + + // Return joined + cleaned node-path + return pb.Join(st.nodePath, key), nil +} + +// blockPathForKey calculates the block file path for supplied hash +func (st *BlockStorage) blockPathForKey(hash string) string { + pb := util.AcquirePathBuilder() + defer util.ReleasePathBuilder(pb) + return pb.Join(st.blockPath, hash) +} + +// hashSeparator is the separating byte between block hashes +const hashSeparator = byte(':') + +// node represents the contents of a node file in storage +type node struct { + hashes []string +} + +// removeHash attempts to remove supplied block hash from the node's hash array +func (n *node) removeHash(hash string) bool { + haveDropped := false + for i := 0; i < len(n.hashes); { + if n.hashes[i] == hash { + // Drop this hash from slice + n.hashes = append(n.hashes[:i], n.hashes[i+1:]...) + haveDropped = true + } else { + // Continue iter + i++ + } + } + return haveDropped +} + +// nodeReader is an io.Reader implementation for the node file representation, +// which is useful when calculated node file is being written to the store +type nodeReader struct { + node *node + idx int + last int +} + +func (r *nodeReader) Read(b []byte) (int, error) { + n := 0 + + // '-1' means we missed writing + // hash separator on last iteration + if r.last == -1 { + b[n] = hashSeparator + n++ + r.last = 0 + } + + for r.idx < len(r.node.hashes) { + hash := r.node.hashes[r.idx] + + // Copy into buffer + update read count + m := copy(b[n:], hash[r.last:]) + n += m + + // If incomplete copy, return here + if m < len(hash)-r.last { + r.last = m + return n, nil + } + + // Check we can write last separator + if n == len(b) { + r.last = -1 + return n, nil + } + + // Write separator, iter, reset + b[n] = hashSeparator + n++ + r.idx++ + r.last = 0 + } + + // We reached end of hashes + return n, io.EOF +} + +// nodeWriter is an io.Writer implementation for the node file representation, +// which is useful when calculated node file is being read from the store +type nodeWriter struct { + node *node + buf *bytes.Buffer +} + +func (w *nodeWriter) Write(b []byte) (int, error) { + n := 0 + + for { + // Find next hash separator position + idx := bytes.IndexByte(b[n:], hashSeparator) + if idx == -1 { + // Check we shouldn't be expecting it + if w.buf.Len() > encodedHashLen { + return n, errInvalidNode + } + + // Write all contents to buffer + w.buf.Write(b[n:]) + return len(b), nil + } + + // Found hash separator, write + // current buf contents to Node hashes + w.buf.Write(b[n : n+idx]) + n += idx + 1 + if w.buf.Len() != encodedHashLen { + return n, errInvalidNode + } + + // Append to hashes & reset + w.node.hashes = append(w.node.hashes, w.buf.String()) + w.buf.Reset() + } +} + +// blockReader is an io.Reader implementation for the combined, linked block +// data contained with a node file. Basically, this allows reading value data +// from the store for a given node file +type blockReader struct { + storage *BlockStorage + node *node + buf []byte + prev int +} + +func (r *blockReader) Read(b []byte) (int, error) { + n := 0 + + // Data left in buf, copy as much as we + // can into supplied read buffer + if r.prev < len(r.buf)-1 { + n += copy(b, r.buf[r.prev:]) + r.prev += n + if n >= len(b) { + return n, nil + } + } + + for { + // Check we have any hashes left + if len(r.node.hashes) < 1 { + return n, io.EOF + } + + // Get next key from slice + key := r.node.hashes[0] + r.node.hashes = r.node.hashes[1:] + + // Attempt to fetch next batch of data + var err error + r.buf, err = r.storage.readBlock(key) + if err != nil { + return n, err + } + r.prev = 0 + + // Copy as much as can from new buffer + m := copy(b[n:], r.buf) + r.prev += m + n += m + + // If we hit end of supplied buf, return + if n >= len(b) { + return n, nil + } + } +} + +// hashEncoder is a HashEncoder with built-in encode buffer +type hashEncoder struct { + henc hashenc.HashEncoder + ebuf []byte +} + +// encodedHashLen is the once-calculated encoded hash-sum length +var encodedHashLen = hashenc.Base64().EncodedLen( + sha256.New().Size(), +) + +// newHashEncoder returns a new hashEncoder instance +func newHashEncoder() *hashEncoder { + hash := sha256.New() + enc := hashenc.Base64() + return &hashEncoder{ + henc: hashenc.New(hash, enc), + ebuf: make([]byte, enc.EncodedLen(hash.Size())), + } +} + +// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum() +func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes { + henc.henc.EncodeSum(henc.ebuf, src) + return bytes.ToBytes(henc.ebuf) +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/compressor.go b/vendor/git.iim.gay/grufwub/go-store/storage/compressor.go new file mode 100644 index 000000000..b8cfb906c --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/compressor.go @@ -0,0 +1,104 @@ +package storage + +import ( + "compress/gzip" + "compress/zlib" + "io" + + "git.iim.gay/grufwub/go-store/util" + "github.com/golang/snappy" +) + +// Compressor defines a means of compressing/decompressing values going into a key-value store +type Compressor interface { + // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader + Reader(io.Reader) (io.ReadCloser, error) + + // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer + Writer(io.Writer) (io.WriteCloser, error) +} + +type gzipCompressor struct { + level int +} + +// GZipCompressor returns a new Compressor that implements GZip at default compression level +func GZipCompressor() Compressor { + return GZipCompressorLevel(gzip.DefaultCompression) +} + +// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level +func GZipCompressorLevel(level int) Compressor { + return &gzipCompressor{ + level: level, + } +} + +func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return gzip.NewReader(r) +} + +func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return gzip.NewWriterLevel(w, c.level) +} + +type zlibCompressor struct { + level int + dict []byte +} + +// ZLibCompressor returns a new Compressor that implements ZLib at default compression level +func ZLibCompressor() Compressor { + return ZLibCompressorLevelDict(zlib.DefaultCompression, nil) +} + +// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level +func ZLibCompressorLevel(level int) Compressor { + return ZLibCompressorLevelDict(level, nil) +} + +// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict +func ZLibCompressorLevelDict(level int, dict []byte) Compressor { + return &zlibCompressor{ + level: level, + dict: dict, + } +} + +func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return zlib.NewReaderDict(r, c.dict) +} + +func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return zlib.NewWriterLevelDict(w, c.level, c.dict) +} + +type snappyCompressor struct{} + +// SnappyCompressor returns a new Compressor that implements Snappy +func SnappyCompressor() Compressor { + return &snappyCompressor{} +} + +func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return util.NopReadCloser(snappy.NewReader(r)), nil +} + +func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return snappy.NewBufferedWriter(w), nil +} + +type nopCompressor struct{} + +// NoCompression is a Compressor that simply does nothing +func NoCompression() Compressor { + return &nopCompressor{} +} + +func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return util.NopReadCloser(r), nil +} + +func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return util.NopWriteCloser(w), nil +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/disk.go b/vendor/git.iim.gay/grufwub/go-store/storage/disk.go new file mode 100644 index 000000000..884c2e252 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/disk.go @@ -0,0 +1,289 @@ +package storage + +import ( + "io" + "io/fs" + "os" + "path" + "syscall" + + "git.iim.gay/grufwub/fastpath" + "git.iim.gay/grufwub/go-bytes" + "git.iim.gay/grufwub/go-store/util" +) + +// DefaultDiskConfig is the default DiskStorage configuration +var DefaultDiskConfig = &DiskConfig{ + Overwrite: true, + WriteBufSize: 4096, + Transform: NopTransform(), + Compression: NoCompression(), +} + +// DiskConfig defines options to be used when opening a DiskStorage +type DiskConfig struct { + // Transform is the supplied key<-->path KeyTransform + Transform KeyTransform + + // WriteBufSize is the buffer size to use when writing file streams (PutStream) + WriteBufSize int + + // Overwrite allows overwriting values of stored keys in the storage + Overwrite bool + + // Compression is the Compressor to use when reading / writing files, default is no compression + Compression Compressor +} + +// getDiskConfig returns a valid DiskConfig for supplied ptr +func getDiskConfig(cfg *DiskConfig) DiskConfig { + // If nil, use default + if cfg == nil { + cfg = DefaultDiskConfig + } + + // Assume nil transform == none + if cfg.Transform == nil { + cfg.Transform = NopTransform() + } + + // Assume nil compress == none + if cfg.Compression == nil { + cfg.Compression = NoCompression() + } + + // Assume 0 buf size == use default + if cfg.WriteBufSize < 1 { + cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize + } + + // Return owned config copy + return DiskConfig{ + Transform: cfg.Transform, + WriteBufSize: cfg.WriteBufSize, + Overwrite: cfg.Overwrite, + Compression: cfg.Compression, + } +} + +// DiskStorage is a Storage implementation that stores directly to a filesystem +type DiskStorage struct { + path string // path is the root path of this store + dots int // dots is the "dotdot" count for the root store path + config DiskConfig // cfg is the supplied configuration for this store +} + +// OpenFile opens a DiskStorage instance for given folder path and configuration +func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { + // Acquire path builder + pb := util.AcquirePathBuilder() + defer util.ReleasePathBuilder(pb) + + // Clean provided path, ensure ends in '/' (should + // be dir, this helps with file path trimming later) + path = pb.Clean(path) + "/" + + // Get checked config + config := getDiskConfig(cfg) + + // Attempt to open dir path + file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + // If not a not-exist error, return + if !os.IsNotExist(err) { + return nil, err + } + + // Attempt to make store path dirs + err = os.MkdirAll(path, defaultDirPerms) + if err != nil { + return nil, err + } + + // Reopen dir now it's been created + file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + return nil, err + } + } + defer file.Close() + + // Double check this is a dir (NOT a file!) + stat, err := file.Stat() + if err != nil { + return nil, err + } else if !stat.IsDir() { + return nil, errPathIsFile + } + + // Return new DiskStorage + return &DiskStorage{ + path: path, + dots: util.CountDotdots(path), + config: config, + }, nil +} + +// Clean implements Storage.Clean() +func (st *DiskStorage) Clean() error { + return util.CleanDirs(st.path) +} + +// ReadBytes implements Storage.ReadBytes() +func (st *DiskStorage) ReadBytes(key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(key) + if err != nil { + return nil, err + } + defer rc.Close() + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream() +func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return nil, err + } + + // Attempt to open file (replace ENOENT with our own) + file, err := open(kpath, defaultFileROFlags) + if err != nil { + return nil, errSwapNotFound(err) + } + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Reader(file) + if err != nil { + file.Close() // close this here, ignore error + return nil, err + } + + // Wrap compressor to ensure file close + return util.ReadCloserWithCallback(cFile, func() { + file.Close() + }), nil +} + +// WriteBytes implements Storage.WriteBytes() +func (st *DiskStorage) WriteBytes(key string, value []byte) error { + return st.WriteStream(key, bytes.NewReader(value)) +} + +// WriteStream implements Storage.WriteStream() +func (st *DiskStorage) WriteStream(key string, r io.Reader) error { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return err + } + + // Ensure dirs leading up to file exist + err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) + if err != nil { + return err + } + + // Prepare to swap error if need-be + errSwap := errSwapNoop + + // Build file RW flags + flags := defaultFileRWFlags + if !st.config.Overwrite { + flags |= syscall.O_EXCL + + // Catch + replace err exist + errSwap = errSwapExist + } + + // Attempt to open file + file, err := open(kpath, flags) + if err != nil { + return errSwap(err) + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Writer(file) + if err != nil { + return err + } + defer cFile.Close() + + // Acquire write buffer + buf := util.AcquireBuffer(st.config.WriteBufSize) + defer util.ReleaseBuffer(buf) + buf.Grow(st.config.WriteBufSize) + + // Copy reader to file + _, err = io.CopyBuffer(cFile, r, buf.B) + return err +} + +// Stat implements Storage.Stat() +func (st *DiskStorage) Stat(key string) (bool, error) { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return false, err + } + + // Check for file on disk + return stat(kpath) +} + +// Remove implements Storage.Remove() +func (st *DiskStorage) Remove(key string) error { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return err + } + + // Attempt to remove file + return os.Remove(kpath) +} + +// WalkKeys implements Storage.WalkKeys() +func (st *DiskStorage) WalkKeys(opts *WalkKeysOptions) error { + // Acquire path builder + pb := fastpath.AcquireBuilder() + defer fastpath.ReleaseBuilder(pb) + + // Walk dir for entries + return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) { + // Only deal with regular files + if fsentry.Type().IsRegular() { + // Get full item path (without root) + kpath = pb.Join(kpath, fsentry.Name())[len(st.path):] + + // Perform provided walk function + opts.WalkFn(entry(st.config.Transform.PathToKey(kpath))) + } + }) +} + +// filepath checks and returns a formatted filepath for given key +func (st *DiskStorage) filepath(key string) (string, error) { + // Acquire path builder + pb := util.AcquirePathBuilder() + defer util.ReleasePathBuilder(pb) + + // Calculate transformed key path + key = st.config.Transform.KeyToPath(key) + + // Generated joined root path + pb.AppendString(st.path) + pb.AppendString(key) + + // If path is dir traversal, and traverses FURTHER + // than store root, this is an error + if util.CountDotdots(pb.StringPtr()) > st.dots { + return "", ErrInvalidKey + } + return pb.String(), nil +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/errors.go b/vendor/git.iim.gay/grufwub/go-store/storage/errors.go new file mode 100644 index 000000000..016593596 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/errors.go @@ -0,0 +1,63 @@ +package storage + +import ( + "fmt" + "syscall" +) + +// errorString is our own simple error type +type errorString string + +// Error implements error +func (e errorString) Error() string { + return string(e) +} + +// Extend appends extra information to an errorString +func (e errorString) Extend(s string, a ...interface{}) errorString { + return errorString(string(e) + ": " + fmt.Sprintf(s, a...)) +} + +var ( + // ErrNotFound is the error returned when a key cannot be found in storage + ErrNotFound = errorString("store/storage: key not found") + + // ErrAlreadyExist is the error returned when a key already exists in storage + ErrAlreadyExists = errorString("store/storage: key already exists") + + // ErrInvalidkey is the error returned when an invalid key is passed to storage + ErrInvalidKey = errorString("store/storage: invalid key") + + // errPathIsFile is returned when a path for a disk config is actually a file + errPathIsFile = errorString("store/storage: path is file") + + // errNoHashesWritten is returned when no blocks are written for given input value + errNoHashesWritten = errorString("storage/storage: no hashes written") + + // errInvalidNode is returned when read on an invalid node in the store is attempted + errInvalidNode = errorString("store/storage: invalid node") + + // errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean + errCorruptNodes = errorString("store/storage: corrupted nodes") +) + +// errSwapNoop performs no error swaps +func errSwapNoop(err error) error { + return err +} + +// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound +func errSwapNotFound(err error) error { + if err == syscall.ENOENT { + return ErrNotFound + } + return err +} + +// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists +func errSwapExist(err error) error { + if err == syscall.EEXIST { + return ErrAlreadyExists + } + return err +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/fs.go b/vendor/git.iim.gay/grufwub/go-store/storage/fs.go new file mode 100644 index 000000000..2d6c44caf --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/fs.go @@ -0,0 +1,48 @@ +package storage + +import ( + "os" + "syscall" + + "git.iim.gay/grufwub/go-store/util" +) + +const ( + defaultDirPerms = 0755 + defaultFilePerms = 0644 + defaultFileROFlags = syscall.O_RDONLY + defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR + defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT +) + +// NOTE: +// These functions are for opening storage files, +// not necessarily for e.g. initial setup (OpenFile) + +// open should not be called directly +func open(path string, flags int) (*os.File, error) { + var fd int + err := util.RetryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, flags, defaultFilePerms) + return + }) + if err != nil { + return nil, err + } + return os.NewFile(uintptr(fd), path), nil +} + +// stat checks for a file on disk +func stat(path string) (bool, error) { + var stat syscall.Stat_t + err := util.RetryOnEINTR(func() error { + return syscall.Stat(path, &stat) + }) + if err != nil { + if err == syscall.ENOENT { + err = nil + } + return false, err + } + return true, nil +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/lock.go b/vendor/git.iim.gay/grufwub/go-store/storage/lock.go new file mode 100644 index 000000000..e7c7bf49a --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/lock.go @@ -0,0 +1,34 @@ +package storage + +import ( + "os" + "syscall" + + "git.iim.gay/grufwub/go-store/util" +) + +type lockableFile struct { + *os.File +} + +func openLock(path string) (*lockableFile, error) { + file, err := open(path, defaultFileLockFlags) + if err != nil { + return nil, err + } + return &lockableFile{file}, nil +} + +func (f *lockableFile) lock() error { + return f.flock(syscall.LOCK_EX | syscall.LOCK_NB) +} + +func (f *lockableFile) unlock() error { + return f.flock(syscall.LOCK_UN | syscall.LOCK_NB) +} + +func (f *lockableFile) flock(how int) error { + return util.RetryOnEINTR(func() error { + return syscall.Flock(int(f.Fd()), how) + }) +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/storage.go b/vendor/git.iim.gay/grufwub/go-store/storage/storage.go new file mode 100644 index 000000000..61f722111 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/storage.go @@ -0,0 +1,51 @@ +package storage + +import ( + "io" +) + +// StorageEntry defines a key in Storage +type StorageEntry interface { + // Key returns the storage entry's key + Key() string +} + +// entry is the simplest possible StorageEntry +type entry string + +func (e entry) Key() string { + return string(e) +} + +// Storage defines a means of storing and accessing key value pairs +type Storage interface { + // Clean removes unused values and unclutters the storage (e.g. removing empty folders) + Clean() error + + // ReadBytes returns the byte value for key in storage + ReadBytes(key string) ([]byte, error) + + // ReadStream returns an io.ReadCloser for the value bytes at key in the storage + ReadStream(key string) (io.ReadCloser, error) + + // WriteBytes writes the supplied value bytes at key in the storage + WriteBytes(key string, value []byte) error + + // WriteStream writes the bytes from supplied reader at key in the storage + WriteStream(key string, r io.Reader) error + + // Stat checks if the supplied key is in the storage + Stat(key string) (bool, error) + + // Remove attempts to remove the supplied key-value pair from storage + Remove(key string) error + + // WalkKeys walks the keys in the storage + WalkKeys(opts *WalkKeysOptions) error +} + +// WalkKeysOptions defines how to walk the keys in a storage implementation +type WalkKeysOptions struct { + // WalkFn is the function to apply on each StorageEntry + WalkFn func(StorageEntry) +} diff --git a/vendor/git.iim.gay/grufwub/go-store/storage/transform.go b/vendor/git.iim.gay/grufwub/go-store/storage/transform.go new file mode 100644 index 000000000..3863dd774 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/storage/transform.go @@ -0,0 +1,25 @@ +package storage + +// KeyTransform defines a method of converting store keys to storage paths (and vice-versa) +type KeyTransform interface { + // KeyToPath converts a supplied key to storage path + KeyToPath(string) string + + // PathToKey converts a supplied storage path to key + PathToKey(string) string +} + +type nopKeyTransform struct{} + +// NopTransform returns a nop key transform (i.e. key = path) +func NopTransform() KeyTransform { + return &nopKeyTransform{} +} + +func (t *nopKeyTransform) KeyToPath(key string) string { + return key +} + +func (t *nopKeyTransform) PathToKey(path string) string { + return path +} diff --git a/vendor/git.iim.gay/grufwub/go-store/util/fs.go b/vendor/git.iim.gay/grufwub/go-store/util/fs.go new file mode 100644 index 000000000..20c0ab116 --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/util/fs.go @@ -0,0 +1,105 @@ +package util + +import ( + "io/fs" + "os" + "strings" + "syscall" + + "git.iim.gay/grufwub/fastpath" +) + +var dotdot = "../" + +// CountDotdots returns the number of "dot-dots" (../) in a cleaned filesystem path +func CountDotdots(path string) int { + if !strings.HasSuffix(path, dotdot) { + return 0 + } + return strings.Count(path, dotdot) +} + +// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry +func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error { + // Read supplied dir path + dirEntries, err := os.ReadDir(path) + if err != nil { + return err + } + + // Iter entries + for _, entry := range dirEntries { + // Pass to walk fn + walkFn(path, entry) + + // Recurse dir entries + if entry.IsDir() { + err = WalkDir(pb, pb.Join(path, entry.Name()), walkFn) + if err != nil { + return err + } + } + } + + return nil +} + +// CleanDirs traverses the dir tree of the supplied path, removing any folders with zero children +func CleanDirs(path string) error { + // Acquire builder + pb := AcquirePathBuilder() + defer ReleasePathBuilder(pb) + + // Get dir entries + entries, err := os.ReadDir(path) + if err != nil { + return err + } + + // Recurse dirs + for _, entry := range entries { + if entry.IsDir() { + err := cleanDirs(pb, pb.Join(path, entry.Name())) + if err != nil { + return err + } + } + } + return nil +} + +// cleanDirs performs the actual dir cleaning logic for the exported version +func cleanDirs(pb *fastpath.Builder, path string) error { + // Get dir entries + entries, err := os.ReadDir(path) + if err != nil { + return err + } + + // If no entries, delete + if len(entries) < 1 { + return os.Remove(path) + } + + // Recurse dirs + for _, entry := range entries { + if entry.IsDir() { + err := cleanDirs(pb, pb.Join(path, entry.Name())) + if err != nil { + return err + } + } + } + return nil +} + +// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received +func RetryOnEINTR(do func() error) error { + for { + err := do() + if err == syscall.EINTR { + continue + } + return err + } +} diff --git a/vendor/git.iim.gay/grufwub/go-store/util/io.go b/vendor/git.iim.gay/grufwub/go-store/util/io.go new file mode 100644 index 000000000..d034cf62b --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/util/io.go @@ -0,0 +1,42 @@ +package util + +import "io" + +// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation +func NopReadCloser(r io.Reader) io.ReadCloser { + return &nopReadCloser{r} +} + +// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation +func NopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w} +} + +// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser +func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { + return &callbackReadCloser{ + ReadCloser: rc, + callback: cb, + } +} + +// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close() +type nopReadCloser struct{ io.Reader } + +func (r *nopReadCloser) Close() error { return nil } + +// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close() +type nopWriteCloser struct{ io.Writer } + +func (w nopWriteCloser) Close() error { return nil } + +// callbackReadCloser allows adding our own custom callback to an io.ReadCloser +type callbackReadCloser struct { + io.ReadCloser + callback func() +} + +func (c *callbackReadCloser) Close() error { + defer c.callback() + return c.ReadCloser.Close() +} diff --git a/vendor/git.iim.gay/grufwub/go-store/util/nocopy.go b/vendor/git.iim.gay/grufwub/go-store/util/nocopy.go new file mode 100644 index 000000000..e4dd071aa --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/util/nocopy.go @@ -0,0 +1,6 @@ +package util + +type NoCopy struct{} + +func (*NoCopy) Lock() {} +func (*NoCopy) Unlock() {} diff --git a/vendor/git.iim.gay/grufwub/go-store/util/pools.go b/vendor/git.iim.gay/grufwub/go-store/util/pools.go new file mode 100644 index 000000000..c02f2d25e --- /dev/null +++ b/vendor/git.iim.gay/grufwub/go-store/util/pools.go @@ -0,0 +1,44 @@ +package util + +import ( + "sync" + + "git.iim.gay/grufwub/fastpath" + "git.iim.gay/grufwub/go-bufpool" + "git.iim.gay/grufwub/go-bytes" +) + +// pathBuilderPool is the global fastpath.Builder pool, we implement +// our own here instead of using fastpath's default one because we +// don't want to deal with fastpath's sync.Once locks on every Acquire/Release +var pathBuilderPool = sync.Pool{ + New: func() interface{} { + pb := fastpath.NewBuilder(make([]byte, 0, 512)) + return &pb + }, +} + +// AcquirePathBuilder returns a reset fastpath.Builder instance +func AcquirePathBuilder() *fastpath.Builder { + return pathBuilderPool.Get().(*fastpath.Builder) +} + +// ReleasePathBuilder resets and releases provided fastpath.Builder instance to global pool +func ReleasePathBuilder(pb *fastpath.Builder) { + pb.Reset() + pathBuilderPool.Put(pb) +} + +// bufferPool is the global BufferPool, we implement this here +// so we can share allocations across whatever libaries need them. +var bufferPool = bufpool.BufferPool{} + +// AcquireBuffer returns a reset bytes.Buffer with at least requested capacity +func AcquireBuffer(cap int) *bytes.Buffer { + return bufferPool.Get(cap) +} + +// ReleaseBuffer resets and releases provided bytes.Buffer to global BufferPool +func ReleaseBuffer(buf *bytes.Buffer) { + bufferPool.Put(buf) +} |