diff options
author | 2021-11-13 12:29:08 +0100 | |
---|---|---|
committer | 2021-11-13 12:29:08 +0100 | |
commit | 829a934d23ab221049b4d54926305d8d5d64c9ad (patch) | |
tree | f4e382b289c113d3ba8a3c7a183507a5609c46c0 /vendor/codeberg.org/gruf/go-store | |
parent | smtp + email confirmation (#285) (diff) | |
download | gotosocial-829a934d23ab221049b4d54926305d8d5d64c9ad.tar.xz |
update dependencies (#296)
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/LICENSE | 9 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/iterator.go | 64 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/state.go | 125 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/kv/store.go | 243 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/block.go | 797 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/compressor.go | 104 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/disk.go | 291 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/errors.go | 63 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/fs.go | 48 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/lock.go | 34 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/storage.go | 51 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/transform.go | 25 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/util/fs.go | 105 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/util/io.go | 42 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/util/pool.go | 20 |
15 files changed, 2021 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/LICENSE b/vendor/codeberg.org/gruf/go-store/LICENSE new file mode 100644 index 000000000..b7c4417ac --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) 2021 gruf + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/codeberg.org/gruf/go-store/kv/iterator.go b/vendor/codeberg.org/gruf/go-store/kv/iterator.go new file mode 100644 index 000000000..d3999273f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/kv/iterator.go @@ -0,0 +1,64 @@ +package kv + +import ( + "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-store/storage" +) + +var ErrIteratorClosed = errors.New("store/kv: iterator closed") + +// KVIterator provides a read-only iterator to all the key-value +// pairs in a KVStore. While the iterator is open the store is read +// locked, you MUST release the iterator when you are finished with +// it. +// +// Please note: +// - individual iterators are NOT concurrency safe, though it is safe to +// have multiple iterators running concurrently +type KVIterator struct { + store *KVStore // store is the linked KVStore + entries []storage.StorageEntry + index int + key string + onClose func() +} + +// Next attempts to set the next key-value pair, the +// return value is if there was another pair remaining +func (i *KVIterator) Next() bool { + next := i.index + 1 + if next >= len(i.entries) { + i.key = "" + return false + } + i.key = i.entries[next].Key() + i.index = next + return true +} + +// Key returns the next key from the store +func (i *KVIterator) Key() string { + return i.key +} + +// Release releases the KVIterator and KVStore's read lock +func (i *KVIterator) Release() { + // Reset key, path, entries + i.store = nil + i.key = "" + i.entries = nil + + // Perform requested callback + i.onClose() +} + +// Value returns the next value from the KVStore +func (i *KVIterator) Value() ([]byte, error) { + // Check store isn't closed + if i.store == nil { + return nil, ErrIteratorClosed + } + + // Attempt to fetch from store + return i.store.get(i.key) +} diff --git a/vendor/codeberg.org/gruf/go-store/kv/state.go b/vendor/codeberg.org/gruf/go-store/kv/state.go new file mode 100644 index 000000000..a8c1b9c82 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/kv/state.go @@ -0,0 +1,125 @@ +package kv + +import ( + "io" + + "codeberg.org/gruf/go-errors" +) + +var ErrStateClosed = errors.New("store/kv: state closed") + +// StateRO provides a read-only window to the store. While this +// state is active during the Read() function window, the entire +// store will be read-locked. The state is thread-safe for concurrent +// use UNTIL the moment that your supplied function to Read() returns, +// then the state has zero guarantees +type StateRO struct { + store *KVStore +} + +func (st *StateRO) Get(key string) ([]byte, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.get(key) +} + +func (st *StateRO) GetStream(key string) (io.ReadCloser, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.getStream(key) +} + +func (st *StateRO) Has(key string) (bool, error) { + // Check not closed + if st.store == nil { + return false, ErrStateClosed + } + + // Pass request to store + return st.store.has(key) +} + +func (st *StateRO) close() { + st.store = nil +} + +// StateRW provides a read-write window to the store. While this +// state is active during the Update() function window, the entire +// store will be locked. The state is thread-safe for concurrent +// use UNTIL the moment that your supplied function to Update() returns, +// then the state has zero guarantees +type StateRW struct { + store *KVStore +} + +func (st *StateRW) Get(key string) ([]byte, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.get(key) +} + +func (st *StateRW) GetStream(key string) (io.ReadCloser, error) { + // Check not closed + if st.store == nil { + return nil, ErrStateClosed + } + + // Pass request to store + return st.store.getStream(key) +} + +func (st *StateRW) Put(key string, value []byte) error { + // Check not closed + if st.store == nil { + return ErrStateClosed + } + + // Pass request to store + return st.store.put(key, value) +} + +func (st *StateRW) PutStream(key string, r io.Reader) error { + // Check not closed + if st.store == nil { + return ErrStateClosed + } + + // Pass request to store + return st.store.putStream(key, r) +} + +func (st *StateRW) Has(key string) (bool, error) { + // Check not closed + if st.store == nil { + return false, ErrStateClosed + } + + // Pass request to store + return st.store.has(key) +} + +func (st *StateRW) Delete(key string) error { + // Check not closed + if st.store == nil { + return ErrStateClosed + } + + // Pass request to store + return st.store.delete(key) +} + +func (st *StateRW) close() { + st.store = nil +} diff --git a/vendor/codeberg.org/gruf/go-store/kv/store.go b/vendor/codeberg.org/gruf/go-store/kv/store.go new file mode 100644 index 000000000..d25b3fb04 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/kv/store.go @@ -0,0 +1,243 @@ +package kv + +import ( + "io" + "sync" + + "codeberg.org/gruf/go-mutexes" + "codeberg.org/gruf/go-store/storage" + "codeberg.org/gruf/go-store/util" +) + +// KVStore is a very simple, yet performant key-value store +type KVStore struct { + mutexMap mutexes.MutexMap // mutexMap is a map of keys to mutexes to protect file access + mutex sync.RWMutex // mutex is the total store mutex + storage storage.Storage // storage is the underlying storage +} + +func OpenFile(path string, cfg *storage.DiskConfig) (*KVStore, error) { + // Attempt to open disk storage + storage, err := storage.OpenFile(path, cfg) + if err != nil { + return nil, err + } + + // Return new KVStore + return OpenStorage(storage) +} + +func OpenBlock(path string, cfg *storage.BlockConfig) (*KVStore, error) { + // Attempt to open block storage + storage, err := storage.OpenBlock(path, cfg) + if err != nil { + return nil, err + } + + // Return new KVStore + return OpenStorage(storage) +} + +func OpenStorage(storage storage.Storage) (*KVStore, error) { + // Perform initial storage clean + err := storage.Clean() + if err != nil { + return nil, err + } + + // Return new KVStore + return &KVStore{ + mutexMap: mutexes.NewMap(mutexes.NewRW), + mutex: sync.RWMutex{}, + storage: storage, + }, nil +} + +// Get fetches the bytes for supplied key in the store +func (st *KVStore) Get(key string) ([]byte, error) { + // Acquire store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Pass to unprotected fn + return st.get(key) +} + +func (st *KVStore) get(key string) ([]byte, error) { + // Acquire read lock for key + runlock := st.mutexMap.RLock(key) + defer runlock() + + // Read file bytes + return st.storage.ReadBytes(key) +} + +// GetStream fetches a ReadCloser for the bytes at the supplied key location in the store +func (st *KVStore) GetStream(key string) (io.ReadCloser, error) { + // Acquire store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Pass to unprotected fn + return st.getStream(key) +} + +func (st *KVStore) getStream(key string) (io.ReadCloser, error) { + // Acquire read lock for key + runlock := st.mutexMap.RLock(key) + + // Attempt to open stream for read + rd, err := st.storage.ReadStream(key) + if err != nil { + runlock() + return nil, err + } + + // Wrap readcloser in our own callback closer + return util.ReadCloserWithCallback(rd, runlock), nil +} + +// Put places the bytes at the supplied key location in the store +func (st *KVStore) Put(key string, value []byte) error { + // Acquire store write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Pass to unprotected fn + return st.put(key, value) +} + +func (st *KVStore) put(key string, value []byte) error { + // Acquire write lock for key + unlock := st.mutexMap.Lock(key) + defer unlock() + + // Write file bytes + return st.storage.WriteBytes(key, value) +} + +// PutStream writes the bytes from the supplied Reader at the supplied key location in the store +func (st *KVStore) PutStream(key string, r io.Reader) error { + // Acquire store write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Pass to unprotected fn + return st.putStream(key, r) +} + +func (st *KVStore) putStream(key string, r io.Reader) error { + // Acquire write lock for key + unlock := st.mutexMap.Lock(key) + defer unlock() + + // Write file stream + return st.storage.WriteStream(key, r) +} + +// Has checks whether the supplied key exists in the store +func (st *KVStore) Has(key string) (bool, error) { + // Acquire store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Pass to unprotected fn + return st.has(key) +} + +func (st *KVStore) has(key string) (bool, error) { + // Acquire read lock for key + runlock := st.mutexMap.RLock(key) + defer runlock() + + // Stat file on disk + return st.storage.Stat(key) +} + +// Delete removes the supplied key-value pair from the store +func (st *KVStore) Delete(key string) error { + // Acquire store write lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Pass to unprotected fn + return st.delete(key) +} + +func (st *KVStore) delete(key string) error { + // Acquire write lock for key + unlock := st.mutexMap.Lock(key) + defer unlock() + + // Remove file from disk + return st.storage.Remove(key) +} + +// Iterator returns an Iterator for key-value pairs in the store, using supplied match function +func (st *KVStore) Iterator(matchFn func(string) bool) (*KVIterator, error) { + // If no function, match all + if matchFn == nil { + matchFn = func(string) bool { return true } + } + + // Get store read lock + st.mutex.RLock() + + // Setup the walk keys function + entries := []storage.StorageEntry{} + walkFn := func(entry storage.StorageEntry) { + // Ignore unmatched entries + if !matchFn(entry.Key()) { + return + } + + // Add to entries + entries = append(entries, entry) + } + + // Walk keys in the storage + err := st.storage.WalkKeys(&storage.WalkKeysOptions{WalkFn: walkFn}) + if err != nil { + st.mutex.RUnlock() + return nil, err + } + + // Return new iterator + return &KVIterator{ + store: st, + entries: entries, + index: -1, + key: "", + onClose: st.mutex.RUnlock, + }, nil +} + +// Read provides a read-only window to the store, holding it in a read-locked state until +// the supplied function returns +func (st *KVStore) Read(do func(*StateRO)) { + // Get store read lock + st.mutex.RLock() + defer st.mutex.RUnlock() + + // Create new store state (defer close) + state := &StateRO{store: st} + defer state.close() + + // Pass state + do(state) +} + +// Update provides a read-write window to the store, holding it in a read-write-locked state +// until the supplied functions returns +func (st *KVStore) Update(do func(*StateRW)) { + // Get store lock + st.mutex.Lock() + defer st.mutex.Unlock() + + // Create new store state (defer close) + state := &StateRW{store: st} + defer state.close() + + // Pass state + do(state) +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go new file mode 100644 index 000000000..eb51a4cd3 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/block.go @@ -0,0 +1,797 @@ +package storage + +import ( + "crypto/sha256" + "io" + "io/fs" + "os" + "strings" + "sync" + "syscall" + + "codeberg.org/gruf/go-bytes" + "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-hashenc" + "codeberg.org/gruf/go-pools" + "codeberg.org/gruf/go-store/util" +) + +var ( + nodePathPrefix = "node/" + blockPathPrefix = "block/" +) + +// DefaultBlockConfig is the default BlockStorage configuration +var DefaultBlockConfig = &BlockConfig{ + BlockSize: 1024 * 16, + WriteBufSize: 4096, + Overwrite: false, + Compression: NoCompression(), +} + +// BlockConfig defines options to be used when opening a BlockStorage +type BlockConfig struct { + // BlockSize is the chunking size to use when splitting and storing blocks of data + BlockSize int + + // WriteBufSize is the buffer size to use when writing file streams (PutStream) + WriteBufSize int + + // Overwrite allows overwriting values of stored keys in the storage + Overwrite bool + + // Compression is the Compressor to use when reading / writing files, default is no compression + Compression Compressor +} + +// getBlockConfig returns a valid BlockConfig for supplied ptr +func getBlockConfig(cfg *BlockConfig) BlockConfig { + // If nil, use default + if cfg == nil { + cfg = DefaultBlockConfig + } + + // Assume nil compress == none + if cfg.Compression == nil { + cfg.Compression = NoCompression() + } + + // Assume 0 chunk size == use default + if cfg.BlockSize < 1 { + cfg.BlockSize = DefaultBlockConfig.BlockSize + } + + // Assume 0 buf size == use default + if cfg.WriteBufSize < 1 { + cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize + } + + // Return owned config copy + return BlockConfig{ + BlockSize: cfg.BlockSize, + WriteBufSize: cfg.WriteBufSize, + Overwrite: cfg.Overwrite, + Compression: cfg.Compression, + } +} + +// BlockStorage is a Storage implementation that stores input data as chunks on +// a filesystem. Each value is chunked into blocks of configured size and these +// blocks are stored with name equal to their base64-encoded SHA256 hash-sum. A +// "node" file is finally created containing an array of hashes contained within +// this value +type BlockStorage struct { + path string // path is the root path of this store + blockPath string // blockPath is the joined root path + block path prefix + nodePath string // nodePath is the joined root path + node path prefix + config BlockConfig // cfg is the supplied configuration for this store + hashPool sync.Pool // hashPool is this store's hashEncoder pool + bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool + + // NOTE: + // BlockStorage does not need to lock each of the underlying block files + // as the filename itself directly relates to the contents. If there happens + // to be an overwrite, it will just be of the same data since the filename is + // the hash of the data. +} + +// OpenBlock opens a BlockStorage instance for given folder path and configuration +func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Clean provided path, ensure ends in '/' (should + // be dir, this helps with file path trimming later) + path = pb.Clean(path) + "/" + + // Get checked config + config := getBlockConfig(cfg) + + // Attempt to open path + file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + // If not a not-exist error, return + if !os.IsNotExist(err) { + return nil, err + } + + // Attempt to make store path dirs + err = os.MkdirAll(path, defaultDirPerms) + if err != nil { + return nil, err + } + + // Reopen dir now it's been created + file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + return nil, err + } + } + defer file.Close() + + // Double check this is a dir (NOT a file!) + stat, err := file.Stat() + if err != nil { + return nil, err + } else if !stat.IsDir() { + return nil, errPathIsFile + } + + // Figure out the largest size for bufpool slices + bufSz := encodedHashLen + if bufSz < config.BlockSize { + bufSz = config.BlockSize + } + if bufSz < config.WriteBufSize { + bufSz = config.WriteBufSize + } + + // Return new BlockStorage + return &BlockStorage{ + path: path, + blockPath: pb.Join(path, blockPathPrefix), + nodePath: pb.Join(path, nodePathPrefix), + config: config, + hashPool: sync.Pool{ + New: func() interface{} { + return newHashEncoder() + }, + }, + bufpool: pools.NewBufferPool(bufSz), + }, nil +} + +// Clean implements storage.Clean() +func (st *BlockStorage) Clean() error { + nodes := map[string]*node{} + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Walk nodes dir for entries + onceErr := errors.OnceError{} + err := util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return + } + + // Stop if we hit error previously + if onceErr.IsSet() { + return + } + + // Get joined node path name + npath = pb.Join(npath, fsentry.Name()) + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + onceErr.Store(err) + return + } + defer file.Close() + + // Alloc new Node + acquire hash buffer for writes + hbuf := st.bufpool.Get() + defer st.bufpool.Put(hbuf) + hbuf.Guarantee(encodedHashLen) + node := node{} + + // Write file contents to node + _, err = io.CopyBuffer( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + nil, + ) + if err != nil { + onceErr.Store(err) + return + } + + // Append to nodes slice + nodes[fsentry.Name()] = &node + }) + + // Handle errors (though nodePath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } else if onceErr.IsSet() { + return onceErr.Load() + } + + // Walk blocks dir for entries + onceErr.Reset() + err = util.WalkDir(pb, st.blockPath, func(bpath string, fsentry fs.DirEntry) { + // Only deal with regular files + if !fsentry.Type().IsRegular() { + return + } + + // Stop if we hit error previously + if onceErr.IsSet() { + return + } + + inUse := false + for key, node := range nodes { + if node.removeHash(fsentry.Name()) { + if len(node.hashes) < 1 { + // This node contained hash, and after removal is now empty. + // Remove this node from our tracked nodes slice + delete(nodes, key) + } + inUse = true + } + } + + // Block hash is used by node + if inUse { + return + } + + // Get joined block path name + bpath = pb.Join(bpath, fsentry.Name()) + + // Remove this unused block path + err := os.Remove(bpath) + if err != nil { + onceErr.Store(err) + return + } + }) + + // Handle errors (though blockPath may not have been created yet) + if err != nil && !os.IsNotExist(err) { + return err + } else if onceErr.IsSet() { + return onceErr.Load() + } + + // If there are nodes left at this point, they are corrupt + // (i.e. they're referencing block hashes that don't exist) + if len(nodes) > 0 { + nodeKeys := []string{} + for key := range nodes { + nodeKeys = append(nodeKeys, key) + } + return errCorruptNodes.Extend("%v", nodeKeys) + } + + return nil +} + +// ReadBytes implements Storage.ReadBytes() +func (st *BlockStorage) ReadBytes(key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(key) + if err != nil { + return nil, err + } + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream() +func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return nil, err + } + + // Attempt to open RO file + file, err := open(npath, defaultFileROFlags) + if err != nil { + return nil, err + } + defer file.Close() + + // Acquire hash buffer for writes + hbuf := st.bufpool.Get() + defer st.bufpool.Put(hbuf) + + // Write file contents to node + node := node{} + _, err = io.CopyBuffer( + &nodeWriter{ + node: &node, + buf: hbuf, + }, + file, + nil, + ) + if err != nil { + return nil, err + } + + // Return new block reader + return util.NopReadCloser(&blockReader{ + storage: st, + node: &node, + }), nil +} + +func (st *BlockStorage) readBlock(key string) ([]byte, error) { + // Get block file path for key + bpath := st.blockPathForKey(key) + + // Attempt to open RO file + file, err := open(bpath, defaultFileROFlags) + if err != nil { + return nil, err + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Reader(file) + if err != nil { + return nil, err + } + defer cFile.Close() + + // Read the entire file + return io.ReadAll(cFile) +} + +// WriteBytes implements Storage.WriteBytes() +func (st *BlockStorage) WriteBytes(key string, value []byte) error { + return st.WriteStream(key, bytes.NewReader(value)) +} + +// WriteStream implements Storage.WriteStream() +func (st *BlockStorage) WriteStream(key string, r io.Reader) error { + // Get node file path for key + npath, err := st.nodePathForKey(key) + if err != nil { + return err + } + + // Check if this exists + ok, err := stat(key) + if err != nil { + return err + } + + // Check if we allow overwrites + if ok && !st.config.Overwrite { + return ErrAlreadyExists + } + + // Ensure nodes dir (and any leading up to) exists + err = os.MkdirAll(st.nodePath, defaultDirPerms) + if err != nil { + return err + } + + // Ensure blocks dir (and any leading up to) exists + err = os.MkdirAll(st.blockPath, defaultDirPerms) + if err != nil { + return err + } + + // Alloc new node + node := node{} + + // Acquire HashEncoder + hc := st.hashPool.Get().(*hashEncoder) + defer st.hashPool.Put(hc) + + // Create new waitgroup and OnceError for + // goroutine error tracking and propagating + wg := sync.WaitGroup{} + onceErr := errors.OnceError{} + +loop: + for !onceErr.IsSet() { + // Fetch new buffer for this loop + buf := st.bufpool.Get() + buf.Grow(st.config.BlockSize) + + // Read next chunk + n, err := io.ReadFull(r, buf.B) + switch err { + case nil, io.ErrUnexpectedEOF: + // do nothing + case io.EOF: + st.bufpool.Put(buf) + break loop + default: + st.bufpool.Put(buf) + return err + } + + // Hash the encoded data + sum := hc.EncodeSum(buf.B) + + // Append to the node's hashes + node.hashes = append(node.hashes, sum.String()) + + // If already on disk, skip + has, err := st.statBlock(sum.StringPtr()) + if err != nil { + st.bufpool.Put(buf) + return err + } else if has { + st.bufpool.Put(buf) + continue loop + } + + // Write in separate goroutine + wg.Add(1) + go func() { + // Defer buffer release + signal done + defer func() { + st.bufpool.Put(buf) + wg.Done() + }() + + // Write block to store at hash + err = st.writeBlock(sum.StringPtr(), buf.B[:n]) + if err != nil { + onceErr.Store(err) + return + } + }() + + // We reached EOF + if n < buf.Len() { + break loop + } + } + + // Wait, check errors + wg.Wait() + if onceErr.IsSet() { + return onceErr.Load() + } + + // If no hashes created, return + if len(node.hashes) < 1 { + return errNoHashesWritten + } + + // Prepare to swap error if need-be + errSwap := errSwapNoop + + // Build file RW flags + // NOTE: we performed an initial check for + // this before writing blocks, but if + // the utilizer of this storage didn't + // correctly mutex protect this key then + // someone may have beaten us to the + // punch at writing the node file. + flags := defaultFileRWFlags + if !st.config.Overwrite { + flags |= syscall.O_EXCL + + // Catch + replace err exist + errSwap = errSwapExist + } + + // Attempt to open RW file + file, err := open(npath, flags) + if err != nil { + return errSwap(err) + } + defer file.Close() + + // Acquire write buffer + buf := st.bufpool.Get() + defer st.bufpool.Put(buf) + buf.Grow(st.config.WriteBufSize) + + // Finally, write data to file + _, err = io.CopyBuffer(file, &nodeReader{node: &node}, nil) + return err +} + +// writeBlock writes the block with hash and supplied value to the filesystem +func (st *BlockStorage) writeBlock(hash string, value []byte) error { + // Get block file path for key + bpath := st.blockPathForKey(hash) + + // Attempt to open RW file + file, err := open(bpath, defaultFileRWFlags) + if err != nil { + if err == ErrAlreadyExists { + err = nil /* race issue describe in struct NOTE */ + } + return err + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Writer(file) + if err != nil { + return err + } + defer cFile.Close() + + // Write value to file + _, err = cFile.Write(value) + return err +} + +// statBlock checks for existence of supplied block hash +func (st *BlockStorage) statBlock(hash string) (bool, error) { + return stat(st.blockPathForKey(hash)) +} + +// Stat implements Storage.Stat() +func (st *BlockStorage) Stat(key string) (bool, error) { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return false, err + } + + // Check for file on disk + return stat(kpath) +} + +// Remove implements Storage.Remove() +func (st *BlockStorage) Remove(key string) error { + // Get node file path for key + kpath, err := st.nodePathForKey(key) + if err != nil { + return err + } + + // Attempt to remove file + return os.Remove(kpath) +} + +// WalkKeys implements Storage.WalkKeys() +func (st *BlockStorage) WalkKeys(opts *WalkKeysOptions) error { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Walk dir for entries + return util.WalkDir(pb, st.nodePath, func(npath string, fsentry fs.DirEntry) { + // Only deal with regular files + if fsentry.Type().IsRegular() { + opts.WalkFn(entry(fsentry.Name())) + } + }) +} + +// nodePathForKey calculates the node file path for supplied key +func (st *BlockStorage) nodePathForKey(key string) (string, error) { + // Path separators are illegal + if strings.Contains(key, "/") { + return "", ErrInvalidKey + } + + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Return joined + cleaned node-path + return pb.Join(st.nodePath, key), nil +} + +// blockPathForKey calculates the block file path for supplied hash +func (st *BlockStorage) blockPathForKey(hash string) string { + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + return pb.Join(st.blockPath, hash) +} + +// hashSeparator is the separating byte between block hashes +const hashSeparator = byte(':') + +// node represents the contents of a node file in storage +type node struct { + hashes []string +} + +// removeHash attempts to remove supplied block hash from the node's hash array +func (n *node) removeHash(hash string) bool { + haveDropped := false + for i := 0; i < len(n.hashes); { + if n.hashes[i] == hash { + // Drop this hash from slice + n.hashes = append(n.hashes[:i], n.hashes[i+1:]...) + haveDropped = true + } else { + // Continue iter + i++ + } + } + return haveDropped +} + +// nodeReader is an io.Reader implementation for the node file representation, +// which is useful when calculated node file is being written to the store +type nodeReader struct { + node *node + idx int + last int +} + +func (r *nodeReader) Read(b []byte) (int, error) { + n := 0 + + // '-1' means we missed writing + // hash separator on last iteration + if r.last == -1 { + b[n] = hashSeparator + n++ + r.last = 0 + } + + for r.idx < len(r.node.hashes) { + hash := r.node.hashes[r.idx] + + // Copy into buffer + update read count + m := copy(b[n:], hash[r.last:]) + n += m + + // If incomplete copy, return here + if m < len(hash)-r.last { + r.last = m + return n, nil + } + + // Check we can write last separator + if n == len(b) { + r.last = -1 + return n, nil + } + + // Write separator, iter, reset + b[n] = hashSeparator + n++ + r.idx++ + r.last = 0 + } + + // We reached end of hashes + return n, io.EOF +} + +// nodeWriter is an io.Writer implementation for the node file representation, +// which is useful when calculated node file is being read from the store +type nodeWriter struct { + node *node + buf *bytes.Buffer +} + +func (w *nodeWriter) Write(b []byte) (int, error) { + n := 0 + + for { + // Find next hash separator position + idx := bytes.IndexByte(b[n:], hashSeparator) + if idx == -1 { + // Check we shouldn't be expecting it + if w.buf.Len() > encodedHashLen { + return n, errInvalidNode + } + + // Write all contents to buffer + w.buf.Write(b[n:]) + return len(b), nil + } + + // Found hash separator, write + // current buf contents to Node hashes + w.buf.Write(b[n : n+idx]) + n += idx + 1 + if w.buf.Len() != encodedHashLen { + return n, errInvalidNode + } + + // Append to hashes & reset + w.node.hashes = append(w.node.hashes, w.buf.String()) + w.buf.Reset() + } +} + +// blockReader is an io.Reader implementation for the combined, linked block +// data contained with a node file. Basically, this allows reading value data +// from the store for a given node file +type blockReader struct { + storage *BlockStorage + node *node + buf []byte + prev int +} + +func (r *blockReader) Read(b []byte) (int, error) { + n := 0 + + // Data left in buf, copy as much as we + // can into supplied read buffer + if r.prev < len(r.buf)-1 { + n += copy(b, r.buf[r.prev:]) + r.prev += n + if n >= len(b) { + return n, nil + } + } + + for { + // Check we have any hashes left + if len(r.node.hashes) < 1 { + return n, io.EOF + } + + // Get next key from slice + key := r.node.hashes[0] + r.node.hashes = r.node.hashes[1:] + + // Attempt to fetch next batch of data + var err error + r.buf, err = r.storage.readBlock(key) + if err != nil { + return n, err + } + r.prev = 0 + + // Copy as much as can from new buffer + m := copy(b[n:], r.buf) + r.prev += m + n += m + + // If we hit end of supplied buf, return + if n >= len(b) { + return n, nil + } + } +} + +// hashEncoder is a HashEncoder with built-in encode buffer +type hashEncoder struct { + henc hashenc.HashEncoder + ebuf []byte +} + +// encodedHashLen is the once-calculated encoded hash-sum length +var encodedHashLen = hashenc.Base64().EncodedLen( + sha256.New().Size(), +) + +// newHashEncoder returns a new hashEncoder instance +func newHashEncoder() *hashEncoder { + hash := sha256.New() + enc := hashenc.Base64() + return &hashEncoder{ + henc: hashenc.New(hash, enc), + ebuf: make([]byte, enc.EncodedLen(hash.Size())), + } +} + +// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum() +func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes { + henc.henc.EncodeSum(henc.ebuf, src) + return bytes.ToBytes(henc.ebuf) +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/storage/compressor.go new file mode 100644 index 000000000..d6b975db0 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/compressor.go @@ -0,0 +1,104 @@ +package storage + +import ( + "compress/gzip" + "compress/zlib" + "io" + + "codeberg.org/gruf/go-store/util" + "github.com/golang/snappy" +) + +// Compressor defines a means of compressing/decompressing values going into a key-value store +type Compressor interface { + // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader + Reader(io.Reader) (io.ReadCloser, error) + + // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer + Writer(io.Writer) (io.WriteCloser, error) +} + +type gzipCompressor struct { + level int +} + +// GZipCompressor returns a new Compressor that implements GZip at default compression level +func GZipCompressor() Compressor { + return GZipCompressorLevel(gzip.DefaultCompression) +} + +// GZipCompressorLevel returns a new Compressor that implements GZip at supplied compression level +func GZipCompressorLevel(level int) Compressor { + return &gzipCompressor{ + level: level, + } +} + +func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return gzip.NewReader(r) +} + +func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return gzip.NewWriterLevel(w, c.level) +} + +type zlibCompressor struct { + level int + dict []byte +} + +// ZLibCompressor returns a new Compressor that implements ZLib at default compression level +func ZLibCompressor() Compressor { + return ZLibCompressorLevelDict(zlib.DefaultCompression, nil) +} + +// ZLibCompressorLevel returns a new Compressor that implements ZLib at supplied compression level +func ZLibCompressorLevel(level int) Compressor { + return ZLibCompressorLevelDict(level, nil) +} + +// ZLibCompressorLevelDict returns a new Compressor that implements ZLib at supplied compression level with supplied dict +func ZLibCompressorLevelDict(level int, dict []byte) Compressor { + return &zlibCompressor{ + level: level, + dict: dict, + } +} + +func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return zlib.NewReaderDict(r, c.dict) +} + +func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return zlib.NewWriterLevelDict(w, c.level, c.dict) +} + +type snappyCompressor struct{} + +// SnappyCompressor returns a new Compressor that implements Snappy +func SnappyCompressor() Compressor { + return &snappyCompressor{} +} + +func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return util.NopReadCloser(snappy.NewReader(r)), nil +} + +func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return snappy.NewBufferedWriter(w), nil +} + +type nopCompressor struct{} + +// NoCompression is a Compressor that simply does nothing +func NoCompression() Compressor { + return &nopCompressor{} +} + +func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) { + return util.NopReadCloser(r), nil +} + +func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) { + return util.NopWriteCloser(w), nil +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/disk.go b/vendor/codeberg.org/gruf/go-store/storage/disk.go new file mode 100644 index 000000000..6b295755f --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/disk.go @@ -0,0 +1,291 @@ +package storage + +import ( + "io" + "io/fs" + "os" + "path" + "syscall" + + "codeberg.org/gruf/go-bytes" + "codeberg.org/gruf/go-pools" + "codeberg.org/gruf/go-store/util" +) + +// DefaultDiskConfig is the default DiskStorage configuration +var DefaultDiskConfig = &DiskConfig{ + Overwrite: true, + WriteBufSize: 4096, + Transform: NopTransform(), + Compression: NoCompression(), +} + +// DiskConfig defines options to be used when opening a DiskStorage +type DiskConfig struct { + // Transform is the supplied key<-->path KeyTransform + Transform KeyTransform + + // WriteBufSize is the buffer size to use when writing file streams (PutStream) + WriteBufSize int + + // Overwrite allows overwriting values of stored keys in the storage + Overwrite bool + + // Compression is the Compressor to use when reading / writing files, default is no compression + Compression Compressor +} + +// getDiskConfig returns a valid DiskConfig for supplied ptr +func getDiskConfig(cfg *DiskConfig) DiskConfig { + // If nil, use default + if cfg == nil { + cfg = DefaultDiskConfig + } + + // Assume nil transform == none + if cfg.Transform == nil { + cfg.Transform = NopTransform() + } + + // Assume nil compress == none + if cfg.Compression == nil { + cfg.Compression = NoCompression() + } + + // Assume 0 buf size == use default + if cfg.WriteBufSize < 1 { + cfg.WriteBufSize = DefaultDiskConfig.WriteBufSize + } + + // Return owned config copy + return DiskConfig{ + Transform: cfg.Transform, + WriteBufSize: cfg.WriteBufSize, + Overwrite: cfg.Overwrite, + Compression: cfg.Compression, + } +} + +// DiskStorage is a Storage implementation that stores directly to a filesystem +type DiskStorage struct { + path string // path is the root path of this store + dots int // dots is the "dotdot" count for the root store path + bufp pools.BufferPool // bufp is the buffer pool for this DiskStorage + config DiskConfig // cfg is the supplied configuration for this store +} + +// OpenFile opens a DiskStorage instance for given folder path and configuration +func OpenFile(path string, cfg *DiskConfig) (*DiskStorage, error) { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Clean provided path, ensure ends in '/' (should + // be dir, this helps with file path trimming later) + path = pb.Clean(path) + "/" + + // Get checked config + config := getDiskConfig(cfg) + + // Attempt to open dir path + file, err := os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + // If not a not-exist error, return + if !os.IsNotExist(err) { + return nil, err + } + + // Attempt to make store path dirs + err = os.MkdirAll(path, defaultDirPerms) + if err != nil { + return nil, err + } + + // Reopen dir now it's been created + file, err = os.OpenFile(path, defaultFileROFlags, defaultDirPerms) + if err != nil { + return nil, err + } + } + defer file.Close() + + // Double check this is a dir (NOT a file!) + stat, err := file.Stat() + if err != nil { + return nil, err + } else if !stat.IsDir() { + return nil, errPathIsFile + } + + // Return new DiskStorage + return &DiskStorage{ + path: path, + dots: util.CountDotdots(path), + bufp: pools.NewBufferPool(config.WriteBufSize), + config: config, + }, nil +} + +// Clean implements Storage.Clean() +func (st *DiskStorage) Clean() error { + return util.CleanDirs(st.path) +} + +// ReadBytes implements Storage.ReadBytes() +func (st *DiskStorage) ReadBytes(key string) ([]byte, error) { + // Get stream reader for key + rc, err := st.ReadStream(key) + if err != nil { + return nil, err + } + defer rc.Close() + + // Read all bytes and return + return io.ReadAll(rc) +} + +// ReadStream implements Storage.ReadStream() +func (st *DiskStorage) ReadStream(key string) (io.ReadCloser, error) { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return nil, err + } + + // Attempt to open file (replace ENOENT with our own) + file, err := open(kpath, defaultFileROFlags) + if err != nil { + return nil, errSwapNotFound(err) + } + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Reader(file) + if err != nil { + file.Close() // close this here, ignore error + return nil, err + } + + // Wrap compressor to ensure file close + return util.ReadCloserWithCallback(cFile, func() { + file.Close() + }), nil +} + +// WriteBytes implements Storage.WriteBytes() +func (st *DiskStorage) WriteBytes(key string, value []byte) error { + return st.WriteStream(key, bytes.NewReader(value)) +} + +// WriteStream implements Storage.WriteStream() +func (st *DiskStorage) WriteStream(key string, r io.Reader) error { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return err + } + + // Ensure dirs leading up to file exist + err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) + if err != nil { + return err + } + + // Prepare to swap error if need-be + errSwap := errSwapNoop + + // Build file RW flags + flags := defaultFileRWFlags + if !st.config.Overwrite { + flags |= syscall.O_EXCL + + // Catch + replace err exist + errSwap = errSwapExist + } + + // Attempt to open file + file, err := open(kpath, flags) + if err != nil { + return errSwap(err) + } + defer file.Close() + + // Wrap the file in a compressor + cFile, err := st.config.Compression.Writer(file) + if err != nil { + return err + } + defer cFile.Close() + + // Acquire write buffer + buf := st.bufp.Get() + defer st.bufp.Put(buf) + buf.Grow(st.config.WriteBufSize) + + // Copy reader to file + _, err = io.CopyBuffer(cFile, r, buf.B) + return err +} + +// Stat implements Storage.Stat() +func (st *DiskStorage) Stat(key string) (bool, error) { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return false, err + } + + // Check for file on disk + return stat(kpath) +} + +// Remove implements Storage.Remove() +func (st *DiskStorage) Remove(key string) error { + // Get file path for key + kpath, err := st.filepath(key) + if err != nil { + return err + } + + // Attempt to remove file + return os.Remove(kpath) +} + +// WalkKeys implements Storage.WalkKeys() +func (st *DiskStorage) WalkKeys(opts *WalkKeysOptions) error { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Walk dir for entries + return util.WalkDir(pb, st.path, func(kpath string, fsentry fs.DirEntry) { + // Only deal with regular files + if fsentry.Type().IsRegular() { + // Get full item path (without root) + kpath = pb.Join(kpath, fsentry.Name())[len(st.path):] + + // Perform provided walk function + opts.WalkFn(entry(st.config.Transform.PathToKey(kpath))) + } + }) +} + +// filepath checks and returns a formatted filepath for given key +func (st *DiskStorage) filepath(key string) (string, error) { + // Acquire path builder + pb := util.GetPathBuilder() + defer util.PutPathBuilder(pb) + + // Calculate transformed key path + key = st.config.Transform.KeyToPath(key) + + // Generated joined root path + pb.AppendString(st.path) + pb.AppendString(key) + + // If path is dir traversal, and traverses FURTHER + // than store root, this is an error + if util.CountDotdots(pb.StringPtr()) > st.dots { + return "", ErrInvalidKey + } + return pb.String(), nil +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/errors.go b/vendor/codeberg.org/gruf/go-store/storage/errors.go new file mode 100644 index 000000000..016593596 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/errors.go @@ -0,0 +1,63 @@ +package storage + +import ( + "fmt" + "syscall" +) + +// errorString is our own simple error type +type errorString string + +// Error implements error +func (e errorString) Error() string { + return string(e) +} + +// Extend appends extra information to an errorString +func (e errorString) Extend(s string, a ...interface{}) errorString { + return errorString(string(e) + ": " + fmt.Sprintf(s, a...)) +} + +var ( + // ErrNotFound is the error returned when a key cannot be found in storage + ErrNotFound = errorString("store/storage: key not found") + + // ErrAlreadyExist is the error returned when a key already exists in storage + ErrAlreadyExists = errorString("store/storage: key already exists") + + // ErrInvalidkey is the error returned when an invalid key is passed to storage + ErrInvalidKey = errorString("store/storage: invalid key") + + // errPathIsFile is returned when a path for a disk config is actually a file + errPathIsFile = errorString("store/storage: path is file") + + // errNoHashesWritten is returned when no blocks are written for given input value + errNoHashesWritten = errorString("storage/storage: no hashes written") + + // errInvalidNode is returned when read on an invalid node in the store is attempted + errInvalidNode = errorString("store/storage: invalid node") + + // errCorruptNodes is returned when nodes with missing blocks are found during a BlockStorage clean + errCorruptNodes = errorString("store/storage: corrupted nodes") +) + +// errSwapNoop performs no error swaps +func errSwapNoop(err error) error { + return err +} + +// ErrSwapNotFound swaps syscall.ENOENT for ErrNotFound +func errSwapNotFound(err error) error { + if err == syscall.ENOENT { + return ErrNotFound + } + return err +} + +// errSwapExist swaps syscall.EEXIST for ErrAlreadyExists +func errSwapExist(err error) error { + if err == syscall.EEXIST { + return ErrAlreadyExists + } + return err +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/fs.go b/vendor/codeberg.org/gruf/go-store/storage/fs.go new file mode 100644 index 000000000..444cee4b0 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/fs.go @@ -0,0 +1,48 @@ +package storage + +import ( + "os" + "syscall" + + "codeberg.org/gruf/go-store/util" +) + +const ( + defaultDirPerms = 0755 + defaultFilePerms = 0644 + defaultFileROFlags = syscall.O_RDONLY + defaultFileRWFlags = syscall.O_CREAT | syscall.O_RDWR + defaultFileLockFlags = syscall.O_RDONLY | syscall.O_EXCL | syscall.O_CREAT +) + +// NOTE: +// These functions are for opening storage files, +// not necessarily for e.g. initial setup (OpenFile) + +// open should not be called directly +func open(path string, flags int) (*os.File, error) { + var fd int + err := util.RetryOnEINTR(func() (err error) { + fd, err = syscall.Open(path, flags, defaultFilePerms) + return + }) + if err != nil { + return nil, err + } + return os.NewFile(uintptr(fd), path), nil +} + +// stat checks for a file on disk +func stat(path string) (bool, error) { + var stat syscall.Stat_t + err := util.RetryOnEINTR(func() error { + return syscall.Stat(path, &stat) + }) + if err != nil { + if err == syscall.ENOENT { + err = nil + } + return false, err + } + return true, nil +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/lock.go b/vendor/codeberg.org/gruf/go-store/storage/lock.go new file mode 100644 index 000000000..3d794cda9 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/lock.go @@ -0,0 +1,34 @@ +package storage + +import ( + "os" + "syscall" + + "codeberg.org/gruf/go-store/util" +) + +type lockableFile struct { + *os.File +} + +func openLock(path string) (*lockableFile, error) { + file, err := open(path, defaultFileLockFlags) + if err != nil { + return nil, err + } + return &lockableFile{file}, nil +} + +func (f *lockableFile) lock() error { + return f.flock(syscall.LOCK_EX | syscall.LOCK_NB) +} + +func (f *lockableFile) unlock() error { + return f.flock(syscall.LOCK_UN | syscall.LOCK_NB) +} + +func (f *lockableFile) flock(how int) error { + return util.RetryOnEINTR(func() error { + return syscall.Flock(int(f.Fd()), how) + }) +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/storage.go b/vendor/codeberg.org/gruf/go-store/storage/storage.go new file mode 100644 index 000000000..61f722111 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/storage.go @@ -0,0 +1,51 @@ +package storage + +import ( + "io" +) + +// StorageEntry defines a key in Storage +type StorageEntry interface { + // Key returns the storage entry's key + Key() string +} + +// entry is the simplest possible StorageEntry +type entry string + +func (e entry) Key() string { + return string(e) +} + +// Storage defines a means of storing and accessing key value pairs +type Storage interface { + // Clean removes unused values and unclutters the storage (e.g. removing empty folders) + Clean() error + + // ReadBytes returns the byte value for key in storage + ReadBytes(key string) ([]byte, error) + + // ReadStream returns an io.ReadCloser for the value bytes at key in the storage + ReadStream(key string) (io.ReadCloser, error) + + // WriteBytes writes the supplied value bytes at key in the storage + WriteBytes(key string, value []byte) error + + // WriteStream writes the bytes from supplied reader at key in the storage + WriteStream(key string, r io.Reader) error + + // Stat checks if the supplied key is in the storage + Stat(key string) (bool, error) + + // Remove attempts to remove the supplied key-value pair from storage + Remove(key string) error + + // WalkKeys walks the keys in the storage + WalkKeys(opts *WalkKeysOptions) error +} + +// WalkKeysOptions defines how to walk the keys in a storage implementation +type WalkKeysOptions struct { + // WalkFn is the function to apply on each StorageEntry + WalkFn func(StorageEntry) +} diff --git a/vendor/codeberg.org/gruf/go-store/storage/transform.go b/vendor/codeberg.org/gruf/go-store/storage/transform.go new file mode 100644 index 000000000..3863dd774 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/storage/transform.go @@ -0,0 +1,25 @@ +package storage + +// KeyTransform defines a method of converting store keys to storage paths (and vice-versa) +type KeyTransform interface { + // KeyToPath converts a supplied key to storage path + KeyToPath(string) string + + // PathToKey converts a supplied storage path to key + PathToKey(string) string +} + +type nopKeyTransform struct{} + +// NopTransform returns a nop key transform (i.e. key = path) +func NopTransform() KeyTransform { + return &nopKeyTransform{} +} + +func (t *nopKeyTransform) KeyToPath(key string) string { + return key +} + +func (t *nopKeyTransform) PathToKey(path string) string { + return path +} diff --git a/vendor/codeberg.org/gruf/go-store/util/fs.go b/vendor/codeberg.org/gruf/go-store/util/fs.go new file mode 100644 index 000000000..fa6a9d2c4 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/util/fs.go @@ -0,0 +1,105 @@ +package util + +import ( + "io/fs" + "os" + "strings" + "syscall" + + "codeberg.org/gruf/go-fastpath" +) + +var dotdot = "../" + +// CountDotdots returns the number of "dot-dots" (../) in a cleaned filesystem path +func CountDotdots(path string) int { + if !strings.HasSuffix(path, dotdot) { + return 0 + } + return strings.Count(path, dotdot) +} + +// WalkDir traverses the dir tree of the supplied path, performing the supplied walkFn on each entry +func WalkDir(pb *fastpath.Builder, path string, walkFn func(string, fs.DirEntry)) error { + // Read supplied dir path + dirEntries, err := os.ReadDir(path) + if err != nil { + return err + } + + // Iter entries + for _, entry := range dirEntries { + // Pass to walk fn + walkFn(path, entry) + + // Recurse dir entries + if entry.IsDir() { + err = WalkDir(pb, pb.Join(path, entry.Name()), walkFn) + if err != nil { + return err + } + } + } + + return nil +} + +// CleanDirs traverses the dir tree of the supplied path, removing any folders with zero children +func CleanDirs(path string) error { + // Acquire builder + pb := GetPathBuilder() + defer PutPathBuilder(pb) + + // Get dir entries + entries, err := os.ReadDir(path) + if err != nil { + return err + } + + // Recurse dirs + for _, entry := range entries { + if entry.IsDir() { + err := cleanDirs(pb, pb.Join(path, entry.Name())) + if err != nil { + return err + } + } + } + return nil +} + +// cleanDirs performs the actual dir cleaning logic for the exported version +func cleanDirs(pb *fastpath.Builder, path string) error { + // Get dir entries + entries, err := os.ReadDir(path) + if err != nil { + return err + } + + // If no entries, delete + if len(entries) < 1 { + return os.Remove(path) + } + + // Recurse dirs + for _, entry := range entries { + if entry.IsDir() { + err := cleanDirs(pb, pb.Join(path, entry.Name())) + if err != nil { + return err + } + } + } + return nil +} + +// RetryOnEINTR is a low-level filesystem function for retrying syscalls on O_EINTR received +func RetryOnEINTR(do func() error) error { + for { + err := do() + if err == syscall.EINTR { + continue + } + return err + } +} diff --git a/vendor/codeberg.org/gruf/go-store/util/io.go b/vendor/codeberg.org/gruf/go-store/util/io.go new file mode 100644 index 000000000..d034cf62b --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/util/io.go @@ -0,0 +1,42 @@ +package util + +import "io" + +// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation +func NopReadCloser(r io.Reader) io.ReadCloser { + return &nopReadCloser{r} +} + +// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation +func NopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w} +} + +// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser +func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { + return &callbackReadCloser{ + ReadCloser: rc, + callback: cb, + } +} + +// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close() +type nopReadCloser struct{ io.Reader } + +func (r *nopReadCloser) Close() error { return nil } + +// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close() +type nopWriteCloser struct{ io.Writer } + +func (w nopWriteCloser) Close() error { return nil } + +// callbackReadCloser allows adding our own custom callback to an io.ReadCloser +type callbackReadCloser struct { + io.ReadCloser + callback func() +} + +func (c *callbackReadCloser) Close() error { + defer c.callback() + return c.ReadCloser.Close() +} diff --git a/vendor/codeberg.org/gruf/go-store/util/pool.go b/vendor/codeberg.org/gruf/go-store/util/pool.go new file mode 100644 index 000000000..8400cb5b7 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-store/util/pool.go @@ -0,0 +1,20 @@ +package util + +import ( + "codeberg.org/gruf/go-fastpath" + "codeberg.org/gruf/go-pools" +) + +// pathBuilderPool is the global fastpath.Builder pool +var pathBuilderPool = pools.NewPathBuilderPool(512) + +// GetPathBuilder fetches a fastpath.Builder object from the pool +func GetPathBuilder() *fastpath.Builder { + return pathBuilderPool.Get() +} + +// PutPathBuilder places supplied fastpath.Builder back in the pool +func PutPathBuilder(pb *fastpath.Builder) { + pb.Reset() + pathBuilderPool.Put(pb) +} |