diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/storage/block.go')
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/storage/block.go | 69 |
1 files changed, 42 insertions, 27 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go index c50faa10b..c0bb6b383 100644 --- a/vendor/codeberg.org/gruf/go-store/storage/block.go +++ b/vendor/codeberg.org/gruf/go-store/storage/block.go @@ -1,7 +1,9 @@ package storage import ( + "bytes" "crypto/sha256" + "fmt" "io" "io/fs" "os" @@ -9,8 +11,9 @@ import ( "sync" "syscall" - "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-errors" + "codeberg.org/gruf/go-byteutil" + "codeberg.org/gruf/go-errors/v2" + "codeberg.org/gruf/go-fastcopy" "codeberg.org/gruf/go-hashenc" "codeberg.org/gruf/go-pools" "codeberg.org/gruf/go-store/util" @@ -34,6 +37,9 @@ type BlockConfig struct { // BlockSize is the chunking size to use when splitting and storing blocks of data BlockSize int + // ReadBufSize is the buffer size to use when reading node files + ReadBufSize int + // WriteBufSize is the buffer size to use when writing file streams (PutStream) WriteBufSize int @@ -81,13 +87,14 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig { // "node" file is finally created containing an array of hashes contained within // this value type BlockStorage struct { - path string // path is the root path of this store - blockPath string // blockPath is the joined root path + block path prefix - nodePath string // nodePath is the joined root path + node path prefix - config BlockConfig // cfg is the supplied configuration for this store - hashPool sync.Pool // hashPool is this store's hashEncoder pool - bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool - lock *Lock // lock is the opened lockfile for this storage instance + path string // path is the root path of this store + blockPath string // blockPath is the joined root path + block path prefix + nodePath string // nodePath is the joined root path + node path prefix + config BlockConfig // cfg is the supplied configuration for this store + hashPool sync.Pool // hashPool is this store's hashEncoder pool + bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool + cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool + lock *Lock // lock is the opened lockfile for this storage instance // NOTE: // BlockStorage does not need to lock each of the underlying block files @@ -154,8 +161,8 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { bufSz = config.WriteBufSize } - // Return new BlockStorage - return &BlockStorage{ + // Prepare BlockStorage + st := &BlockStorage{ path: path, blockPath: pb.Join(path, blockPathPrefix), nodePath: pb.Join(path, nodePathPrefix), @@ -167,7 +174,12 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) { }, bufpool: pools.NewBufferPool(bufSz), lock: lock, - }, nil + } + + // Set copypool buffer size + st.cppool.Buffer(config.ReadBufSize) + + return st, nil } // Clean implements storage.Clean() @@ -297,7 +309,7 @@ func (st *BlockStorage) Clean() error { for key := range nodes { nodeKeys = append(nodeKeys, key) } - return errCorruptNodes.Extend("%v", nodeKeys) + return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys) } return nil @@ -337,7 +349,7 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { file, err := open(npath, defaultFileROFlags) if err != nil { st.lock.Done() - return nil, err + return nil, errSwapNotFound(err) } defer file.Close() @@ -347,13 +359,12 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) { // Write file contents to node node := node{} - _, err = io.CopyBuffer( + _, err = st.cppool.Copy( &nodeWriter{ node: &node, buf: hbuf, }, file, - nil, ) if err != nil { st.lock.Done() @@ -375,14 +386,14 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) { // Attempt to open RO file file, err := open(bpath, defaultFileROFlags) if err != nil { - return nil, err + return nil, wrap(errCorruptNode, err) } defer file.Close() // Wrap the file in a compressor cFile, err := st.config.Compression.Reader(file) if err != nil { - return nil, err + return nil, wrap(errCorruptNode, err) } defer cFile.Close() @@ -470,10 +481,10 @@ loop: sum := hc.EncodeSum(buf.B) // Append to the node's hashes - node.hashes = append(node.hashes, sum.String()) + node.hashes = append(node.hashes, sum) // If already on disk, skip - has, err := st.statBlock(sum.StringPtr()) + has, err := st.statBlock(sum) if err != nil { st.bufpool.Put(buf) return err @@ -497,7 +508,7 @@ loop: }() // Write block to store at hash - err = st.writeBlock(sum.StringPtr(), buf.B[:n]) + err = st.writeBlock(sum, buf.B[:n]) if err != nil { onceErr.Store(err) return @@ -564,7 +575,7 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error { // Attempt to open RW file file, err := open(bpath, defaultFileRWFlags) if err != nil { - if err == ErrAlreadyExists { + if err == syscall.EEXIST { err = nil /* race issue describe in struct NOTE */ } return err @@ -626,8 +637,12 @@ func (st *BlockStorage) Remove(key string) error { return ErrClosed } - // Attempt to remove file - return os.Remove(kpath) + // Remove at path (we know this is file) + if err := unlink(kpath); err != nil { + return errSwapNotFound(err) + } + + return nil } // Close implements Storage.Close() @@ -762,7 +777,7 @@ func (r *nodeReader) Read(b []byte) (int, error) { // which is useful when calculated node file is being read from the store type nodeWriter struct { node *node - buf *bytes.Buffer + buf *byteutil.Buffer } func (w *nodeWriter) Write(b []byte) (int, error) { @@ -874,7 +889,7 @@ func newHashEncoder() *hashEncoder { } // EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum() -func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes { +func (henc *hashEncoder) EncodeSum(src []byte) string { henc.henc.EncodeSum(henc.ebuf, src) - return bytes.ToBytes(henc.ebuf) + return string(henc.ebuf) } |