summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store/storage/block.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store/storage/block.go')
-rw-r--r--vendor/codeberg.org/gruf/go-store/storage/block.go69
1 files changed, 42 insertions, 27 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/storage/block.go b/vendor/codeberg.org/gruf/go-store/storage/block.go
index c50faa10b..c0bb6b383 100644
--- a/vendor/codeberg.org/gruf/go-store/storage/block.go
+++ b/vendor/codeberg.org/gruf/go-store/storage/block.go
@@ -1,7 +1,9 @@
package storage
import (
+ "bytes"
"crypto/sha256"
+ "fmt"
"io"
"io/fs"
"os"
@@ -9,8 +11,9 @@ import (
"sync"
"syscall"
- "codeberg.org/gruf/go-bytes"
- "codeberg.org/gruf/go-errors"
+ "codeberg.org/gruf/go-byteutil"
+ "codeberg.org/gruf/go-errors/v2"
+ "codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-hashenc"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/util"
@@ -34,6 +37,9 @@ type BlockConfig struct {
// BlockSize is the chunking size to use when splitting and storing blocks of data
BlockSize int
+ // ReadBufSize is the buffer size to use when reading node files
+ ReadBufSize int
+
// WriteBufSize is the buffer size to use when writing file streams (PutStream)
WriteBufSize int
@@ -81,13 +87,14 @@ func getBlockConfig(cfg *BlockConfig) BlockConfig {
// "node" file is finally created containing an array of hashes contained within
// this value
type BlockStorage struct {
- path string // path is the root path of this store
- blockPath string // blockPath is the joined root path + block path prefix
- nodePath string // nodePath is the joined root path + node path prefix
- config BlockConfig // cfg is the supplied configuration for this store
- hashPool sync.Pool // hashPool is this store's hashEncoder pool
- bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
- lock *Lock // lock is the opened lockfile for this storage instance
+ path string // path is the root path of this store
+ blockPath string // blockPath is the joined root path + block path prefix
+ nodePath string // nodePath is the joined root path + node path prefix
+ config BlockConfig // cfg is the supplied configuration for this store
+ hashPool sync.Pool // hashPool is this store's hashEncoder pool
+ bufpool pools.BufferPool // bufpool is this store's bytes.Buffer pool
+ cppool fastcopy.CopyPool // cppool is the prepared io copier with buffer pool
+ lock *Lock // lock is the opened lockfile for this storage instance
// NOTE:
// BlockStorage does not need to lock each of the underlying block files
@@ -154,8 +161,8 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
bufSz = config.WriteBufSize
}
- // Return new BlockStorage
- return &BlockStorage{
+ // Prepare BlockStorage
+ st := &BlockStorage{
path: path,
blockPath: pb.Join(path, blockPathPrefix),
nodePath: pb.Join(path, nodePathPrefix),
@@ -167,7 +174,12 @@ func OpenBlock(path string, cfg *BlockConfig) (*BlockStorage, error) {
},
bufpool: pools.NewBufferPool(bufSz),
lock: lock,
- }, nil
+ }
+
+ // Set copypool buffer size
+ st.cppool.Buffer(config.ReadBufSize)
+
+ return st, nil
}
// Clean implements storage.Clean()
@@ -297,7 +309,7 @@ func (st *BlockStorage) Clean() error {
for key := range nodes {
nodeKeys = append(nodeKeys, key)
}
- return errCorruptNodes.Extend("%v", nodeKeys)
+ return fmt.Errorf("store/storage: corrupted nodes: %v", nodeKeys)
}
return nil
@@ -337,7 +349,7 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
file, err := open(npath, defaultFileROFlags)
if err != nil {
st.lock.Done()
- return nil, err
+ return nil, errSwapNotFound(err)
}
defer file.Close()
@@ -347,13 +359,12 @@ func (st *BlockStorage) ReadStream(key string) (io.ReadCloser, error) {
// Write file contents to node
node := node{}
- _, err = io.CopyBuffer(
+ _, err = st.cppool.Copy(
&nodeWriter{
node: &node,
buf: hbuf,
},
file,
- nil,
)
if err != nil {
st.lock.Done()
@@ -375,14 +386,14 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) {
// Attempt to open RO file
file, err := open(bpath, defaultFileROFlags)
if err != nil {
- return nil, err
+ return nil, wrap(errCorruptNode, err)
}
defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Reader(file)
if err != nil {
- return nil, err
+ return nil, wrap(errCorruptNode, err)
}
defer cFile.Close()
@@ -470,10 +481,10 @@ loop:
sum := hc.EncodeSum(buf.B)
// Append to the node's hashes
- node.hashes = append(node.hashes, sum.String())
+ node.hashes = append(node.hashes, sum)
// If already on disk, skip
- has, err := st.statBlock(sum.StringPtr())
+ has, err := st.statBlock(sum)
if err != nil {
st.bufpool.Put(buf)
return err
@@ -497,7 +508,7 @@ loop:
}()
// Write block to store at hash
- err = st.writeBlock(sum.StringPtr(), buf.B[:n])
+ err = st.writeBlock(sum, buf.B[:n])
if err != nil {
onceErr.Store(err)
return
@@ -564,7 +575,7 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error {
// Attempt to open RW file
file, err := open(bpath, defaultFileRWFlags)
if err != nil {
- if err == ErrAlreadyExists {
+ if err == syscall.EEXIST {
err = nil /* race issue describe in struct NOTE */
}
return err
@@ -626,8 +637,12 @@ func (st *BlockStorage) Remove(key string) error {
return ErrClosed
}
- // Attempt to remove file
- return os.Remove(kpath)
+ // Remove at path (we know this is file)
+ if err := unlink(kpath); err != nil {
+ return errSwapNotFound(err)
+ }
+
+ return nil
}
// Close implements Storage.Close()
@@ -762,7 +777,7 @@ func (r *nodeReader) Read(b []byte) (int, error) {
// which is useful when calculated node file is being read from the store
type nodeWriter struct {
node *node
- buf *bytes.Buffer
+ buf *byteutil.Buffer
}
func (w *nodeWriter) Write(b []byte) (int, error) {
@@ -874,7 +889,7 @@ func newHashEncoder() *hashEncoder {
}
// EncodeSum encodes the src data and returns resulting bytes, only valid until next call to EncodeSum()
-func (henc *hashEncoder) EncodeSum(src []byte) bytes.Bytes {
+func (henc *hashEncoder) EncodeSum(src []byte) string {
henc.henc.EncodeSum(henc.ebuf, src)
- return bytes.ToBytes(henc.ebuf)
+ return string(henc.ebuf)
}