summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-store
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/kv/state.go8
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/kv/store.go22
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/block.go52
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go173
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/disk.go33
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/fs.go37
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/memory.go26
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/s3.go43
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/storage/storage.go4
-rw-r--r--vendor/codeberg.org/gruf/go-store/v2/util/io.go93
10 files changed, 261 insertions, 230 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/state.go b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go
index 9ac8ab1bf..450cd850c 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/kv/state.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go
@@ -77,17 +77,17 @@ func (st *StateRW) GetStream(ctx context.Context, key string) (io.ReadCloser, er
}
// Put: see KVStore.Put(). Returns error if state already closed.
-func (st *StateRW) Put(ctx context.Context, key string, value []byte) error {
+func (st *StateRW) Put(ctx context.Context, key string, value []byte) (int, error) {
if st.store == nil {
- return ErrStateClosed
+ return 0, ErrStateClosed
}
return st.store.put(st.state.Lock, ctx, key, value)
}
// PutStream: see KVStore.PutStream(). Returns error if state already closed.
-func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) error {
+func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) {
if st.store == nil {
- return ErrStateClosed
+ return 0, ErrStateClosed
}
return st.store.putStream(st.state.Lock, ctx, key, r)
}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/store.go b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go
index 5ea795e7c..0b878c47f 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/kv/store.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go
@@ -4,9 +4,9 @@ import (
"context"
"io"
+ "codeberg.org/gruf/go-iotools"
"codeberg.org/gruf/go-mutexes"
"codeberg.org/gruf/go-store/v2/storage"
- "codeberg.org/gruf/go-store/v2/util"
)
// KVStore is a very simple, yet performant key-value store
@@ -117,17 +117,25 @@ func (st *KVStore) getStream(rlock func(string) func(), ctx context.Context, key
return nil, err
}
- // Wrap readcloser in our own callback closer
- return util.ReadCloserWithCallback(rd, runlock), nil
+ var unlocked bool
+
+ // Wrap readcloser to call our own callback
+ return iotools.ReadCloser(rd, iotools.CloserFunc(func() error {
+ if !unlocked {
+ unlocked = true
+ defer runlock()
+ }
+ return rd.Close()
+ })), nil
}
// Put places the bytes at the supplied key in the store.
-func (st *KVStore) Put(ctx context.Context, key string, value []byte) error {
+func (st *KVStore) Put(ctx context.Context, key string, value []byte) (int, error) {
return st.put(st.Lock, ctx, key, value)
}
// put performs the underlying logic for KVStore.Put(), using supplied lock func to allow use with states.
-func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) error {
+func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) (int, error) {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
@@ -137,12 +145,12 @@ func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string
}
// PutStream writes the bytes from the supplied Reader at the supplied key in the store.
-func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) error {
+func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) {
return st.putStream(st.Lock, ctx, key, r)
}
// putStream performs the underlying logic for KVStore.PutStream(), using supplied lock func to allow use with states.
-func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) error {
+func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) (int64, error) {
// Acquire write lock for key
unlock := lock(key)
defer unlock()
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go
index f41099c75..11a757211 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go
@@ -10,12 +10,14 @@ import (
"os"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"codeberg.org/gruf/go-byteutil"
"codeberg.org/gruf/go-errors/v2"
"codeberg.org/gruf/go-fastcopy"
"codeberg.org/gruf/go-hashenc"
+ "codeberg.org/gruf/go-iotools"
"codeberg.org/gruf/go-pools"
"codeberg.org/gruf/go-store/v2/util"
)
@@ -354,7 +356,7 @@ func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadClos
}
// Prepare block reader and return
- return util.NopReadCloser(&blockReader{
+ return iotools.NopReadCloser(&blockReader{
storage: st,
node: &node,
}), nil
@@ -384,52 +386,54 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) {
}
// WriteBytes implements Storage.WriteBytes().
-func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) error {
- return st.WriteStream(ctx, key, bytes.NewReader(value))
+func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
+ n, err := st.WriteStream(ctx, key, bytes.NewReader(value))
+ return int(n), err
}
// WriteStream implements Storage.WriteStream().
-func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Get node file path for key
npath, err := st.nodePathForKey(key)
if err != nil {
- return err
+ return 0, err
}
// Check if open
if st.lock.Closed() {
- return ErrClosed
+ return 0, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
- return err
+ return 0, err
}
// Check if this exists
ok, err := stat(key)
if err != nil {
- return err
+ return 0, err
}
// Check if we allow overwrites
if ok && !st.config.Overwrite {
- return ErrAlreadyExists
+ return 0, ErrAlreadyExists
}
// Ensure nodes dir (and any leading up to) exists
err = os.MkdirAll(st.nodePath, defaultDirPerms)
if err != nil {
- return err
+ return 0, err
}
// Ensure blocks dir (and any leading up to) exists
err = os.MkdirAll(st.blockPath, defaultDirPerms)
if err != nil {
- return err
+ return 0, err
}
var node node
+ var total atomic.Int64
// Acquire HashEncoder
hc := st.hashPool.Get().(*hashEncoder)
@@ -456,7 +460,7 @@ loop:
break loop
default:
st.bufpool.Put(buf)
- return err
+ return 0, err
}
// Hash the encoded data
@@ -469,7 +473,7 @@ loop:
has, err := st.statBlock(sum)
if err != nil {
st.bufpool.Put(buf)
- return err
+ return 0, err
} else if has {
st.bufpool.Put(buf)
continue loop
@@ -490,11 +494,14 @@ loop:
}()
// Write block to store at hash
- err = st.writeBlock(sum, buf.B[:n])
+ n, err := st.writeBlock(sum, buf.B[:n])
if err != nil {
onceErr.Store(err)
return
}
+
+ // Increment total.
+ total.Add(int64(n))
}()
// Break at end
@@ -506,12 +513,12 @@ loop:
// Wait, check errors
wg.Wait()
if onceErr.IsSet() {
- return onceErr.Load()
+ return 0, onceErr.Load()
}
// If no hashes created, return
if len(node.hashes) < 1 {
- return new_error("no hashes written")
+ return 0, new_error("no hashes written")
}
// Prepare to swap error if need-be
@@ -535,7 +542,7 @@ loop:
// Attempt to open RW file
file, err := open(npath, flags)
if err != nil {
- return errSwap(err)
+ return 0, errSwap(err)
}
defer file.Close()
@@ -546,11 +553,11 @@ loop:
// Finally, write data to file
_, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B)
- return err
+ return total.Load(), err
}
// writeBlock writes the block with hash and supplied value to the filesystem.
-func (st *BlockStorage) writeBlock(hash string, value []byte) error {
+func (st *BlockStorage) writeBlock(hash string, value []byte) (int, error) {
// Get block file path for key
bpath := st.blockPathForKey(hash)
@@ -560,20 +567,19 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error {
if err == syscall.EEXIST {
err = nil /* race issue describe in struct NOTE */
}
- return err
+ return 0, err
}
defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Writer(file)
if err != nil {
- return err
+ return 0, err
}
defer cFile.Close()
// Write value to file
- _, err = cFile.Write(value)
- return err
+ return cFile.Write(value)
}
// statBlock checks for existence of supplied block hash.
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go
index 6eeb3a78d..bbe02f22d 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go
@@ -5,7 +5,7 @@ import (
"io"
"sync"
- "codeberg.org/gruf/go-store/v2/util"
+ "codeberg.org/gruf/go-iotools"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/snappy"
@@ -15,10 +15,10 @@ import (
// Compressor defines a means of compressing/decompressing values going into a key-value store
type Compressor interface {
// Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader
- Reader(io.Reader) (io.ReadCloser, error)
+ Reader(io.ReadCloser) (io.ReadCloser, error)
// Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer
- Writer(io.Writer) (io.WriteCloser, error)
+ Writer(io.WriteCloser) (io.WriteCloser, error)
}
type gzipCompressor struct {
@@ -47,8 +47,8 @@ func GZipCompressorLevel(level int) Compressor {
// Write empty data to ensure gzip
// header data is in byte buffer.
- gw.Write([]byte{})
- gw.Close()
+ _, _ = gw.Write([]byte{})
+ _ = gw.Close()
return &gzipCompressor{
rpool: sync.Pool{
@@ -67,23 +67,61 @@ func GZipCompressorLevel(level int) Compressor {
}
}
-func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+func (c *gzipCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
+ var released bool
+
+ // Acquire from pool.
gr := c.rpool.Get().(*gzip.Reader)
- if err := gr.Reset(r); err != nil {
+ if err := gr.Reset(rc); err != nil {
c.rpool.Put(gr)
return nil, err
}
- return util.ReadCloserWithCallback(gr, func() {
- c.rpool.Put(gr)
- }), nil
+
+ return iotools.ReadCloser(gr, iotools.CloserFunc(func() error {
+ if !released {
+ released = true
+ defer c.rpool.Put(gr)
+ }
+
+ // Close compressor
+ err1 := gr.Close()
+
+ // Close original stream.
+ err2 := rc.Close()
+
+ // Return err1 or 2
+ if err1 != nil {
+ return err1
+ }
+ return err2
+ })), nil
}
-func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+func (c *gzipCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
+ var released bool
+
+ // Acquire from pool.
gw := c.wpool.Get().(*gzip.Writer)
- gw.Reset(w)
- return util.WriteCloserWithCallback(gw, func() {
- c.wpool.Put(gw)
- }), nil
+ gw.Reset(wc)
+
+ return iotools.WriteCloser(gw, iotools.CloserFunc(func() error {
+ if !released {
+ released = true
+ c.wpool.Put(gw)
+ }
+
+ // Close compressor
+ err1 := gw.Close()
+
+ // Close original stream.
+ err2 := wc.Close()
+
+ // Return err1 or 2
+ if err1 != nil {
+ return err1
+ }
+ return err2
+ })), nil
}
type zlibCompressor struct {
@@ -139,26 +177,61 @@ func ZLibCompressorLevelDict(level int, dict []byte) Compressor {
}
}
-func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+func (c *zlibCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
+ var released bool
zr := c.rpool.Get().(interface {
io.ReadCloser
zlib.Resetter
})
- if err := zr.Reset(r, c.dict); err != nil {
+ if err := zr.Reset(rc, c.dict); err != nil {
c.rpool.Put(zr)
return nil, err
}
- return util.ReadCloserWithCallback(zr, func() {
- c.rpool.Put(zr)
- }), nil
+ return iotools.ReadCloser(zr, iotools.CloserFunc(func() error {
+ if !released {
+ released = true
+ defer c.rpool.Put(zr)
+ }
+
+ // Close compressor
+ err1 := zr.Close()
+
+ // Close original stream.
+ err2 := rc.Close()
+
+ // Return err1 or 2
+ if err1 != nil {
+ return err1
+ }
+ return err2
+ })), nil
}
-func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+func (c *zlibCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
+ var released bool
+
+ // Acquire from pool.
zw := c.wpool.Get().(*zlib.Writer)
- zw.Reset(w)
- return util.WriteCloserWithCallback(zw, func() {
- c.wpool.Put(zw)
- }), nil
+ zw.Reset(wc)
+
+ return iotools.WriteCloser(zw, iotools.CloserFunc(func() error {
+ if !released {
+ released = true
+ c.wpool.Put(zw)
+ }
+
+ // Close compressor
+ err1 := zw.Close()
+
+ // Close original stream.
+ err2 := wc.Close()
+
+ // Return err1 or 2
+ if err1 != nil {
+ return err1
+ }
+ return err2
+ })), nil
}
type snappyCompressor struct {
@@ -178,22 +251,40 @@ func SnappyCompressor() Compressor {
}
}
-func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
+func (c *snappyCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
+ var released bool
+
+ // Acquire from pool.
sr := c.rpool.Get().(*snappy.Reader)
- sr.Reset(r)
- return util.ReadCloserWithCallback(
- util.NopReadCloser(sr),
- func() { c.rpool.Put(sr) },
- ), nil
+ sr.Reset(rc)
+
+ return iotools.ReadCloser(sr, iotools.CloserFunc(func() error {
+ if !released {
+ released = true
+ defer c.rpool.Put(sr)
+ }
+
+ // Close original stream.
+ return rc.Close()
+ })), nil
}
-func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
+func (c *snappyCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
+ var released bool
+
+ // Acquire from pool.
sw := c.wpool.Get().(*snappy.Writer)
- sw.Reset(w)
- return util.WriteCloserWithCallback(
- util.NopWriteCloser(sw),
- func() { c.wpool.Put(sw) },
- ), nil
+ sw.Reset(wc)
+
+ return iotools.WriteCloser(sw, iotools.CloserFunc(func() error {
+ if !released {
+ released = true
+ c.wpool.Put(sw)
+ }
+
+ // Close original stream.
+ return wc.Close()
+ })), nil
}
type nopCompressor struct{}
@@ -203,10 +294,10 @@ func NoCompression() Compressor {
return &nopCompressor{}
}
-func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) {
- return util.NopReadCloser(r), nil
+func (c *nopCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) {
+ return rc, nil
}
-func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) {
- return util.NopWriteCloser(w), nil
+func (c *nopCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) {
+ return wc, nil
}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go
index ef6993edd..21dba7671 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go
@@ -219,43 +219,41 @@ func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadClose
// Wrap the file in a compressor
cFile, err := st.config.Compression.Reader(file)
if err != nil {
- file.Close() // close this here, ignore error
+ _ = file.Close()
return nil, err
}
- // Wrap compressor to ensure file close
- return util.ReadCloserWithCallback(cFile, func() {
- file.Close()
- }), nil
+ return cFile, nil
}
// WriteBytes implements Storage.WriteBytes().
-func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) error {
- return st.WriteStream(ctx, key, bytes.NewReader(value))
+func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
+ n, err := st.WriteStream(ctx, key, bytes.NewReader(value))
+ return int(n), err
}
// WriteStream implements Storage.WriteStream().
-func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Get file path for key
kpath, err := st.filepath(key)
if err != nil {
- return err
+ return 0, err
}
// Check if open
if st.lock.Closed() {
- return ErrClosed
+ return 0, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
- return err
+ return 0, err
}
// Ensure dirs leading up to file exist
err = os.MkdirAll(path.Dir(kpath), defaultDirPerms)
if err != nil {
- return err
+ return 0, err
}
// Prepare to swap error if need-be
@@ -273,20 +271,21 @@ func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader)
// Attempt to open file
file, err := open(kpath, flags)
if err != nil {
- return errSwap(err)
+ return 0, errSwap(err)
}
- defer file.Close()
// Wrap the file in a compressor
cFile, err := st.config.Compression.Writer(file)
if err != nil {
- return err
+ _ = file.Close()
+ return 0, err
}
+
+ // Wraps file.Close().
defer cFile.Close()
// Copy provided reader to file
- _, err = st.cppool.Copy(cFile, r)
- return err
+ return st.cppool.Copy(cFile, r)
}
// Stat implements Storage.Stat().
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go
index 48a5806f2..be86ac127 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go
@@ -1,6 +1,7 @@
package storage
import (
+ "fmt"
"io/fs"
"os"
"syscall"
@@ -102,46 +103,32 @@ outer:
// cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children
func cleanDirs(path string) error {
- // Acquire path builder
pb := util.GetPathBuilder()
defer util.PutPathBuilder(pb)
-
- // Get top-level dir entries
- entries, err := readDir(path)
- if err != nil {
- return err
- }
-
- for _, entry := range entries {
- if entry.IsDir() {
- // Recursively clean sub-directory entries
- if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil {
- return err
- }
- }
- }
-
- return nil
+ return cleanDir(pb, path, true)
}
// cleanDir performs the actual dir cleaning logic for the above top-level version.
-func cleanDir(pb *fastpath.Builder, path string) error {
- // Get dir entries
+func cleanDir(pb *fastpath.Builder, path string, top bool) error {
+ // Get dir entries at path.
entries, err := readDir(path)
if err != nil {
return err
}
- // If no entries, delete
- if len(entries) < 1 {
+ // If no entries, delete dir.
+ if !top && len(entries) == 0 {
return rmdir(path)
}
for _, entry := range entries {
if entry.IsDir() {
- // Recursively clean sub-directory entries
- if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil {
- return err
+ // Calculate directory path.
+ dirPath := pb.Join(path, entry.Name())
+
+ // Recursively clean sub-directory entries.
+ if err := cleanDir(pb, dirPath, false); err != nil {
+ fmt.Fprintf(os.Stderr, "[go-store/storage] error cleaning %s: %v", dirPath, err)
}
}
}
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go
index a853c84d2..d42274e39 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go
@@ -6,7 +6,7 @@ import (
"sync/atomic"
"codeberg.org/gruf/go-bytes"
- "codeberg.org/gruf/go-store/v2/util"
+ "codeberg.org/gruf/go-iotools"
"github.com/cornelk/hashmap"
)
@@ -86,57 +86,57 @@ func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadClo
// Create io.ReadCloser from 'b' copy
r := bytes.NewReader(copyb(b))
- return util.NopReadCloser(r), nil
+ return iotools.NopReadCloser(r), nil
}
// WriteBytes implements Storage.WriteBytes().
-func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) error {
+func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) {
// Check store open
if st.closed() {
- return ErrClosed
+ return 0, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
- return err
+ return 0, err
}
// Check for key that already exists
if _, ok := st.fs.Get(key); ok && !st.ow {
- return ErrAlreadyExists
+ return 0, ErrAlreadyExists
}
// Write key copy to store
st.fs.Set(key, copyb(b))
- return nil
+ return len(b), nil
}
// WriteStream implements Storage.WriteStream().
-func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Check store open
if st.closed() {
- return ErrClosed
+ return 0, ErrClosed
}
// Check context still valid
if err := ctx.Err(); err != nil {
- return err
+ return 0, err
}
// Check for key that already exists
if _, ok := st.fs.Get(key); ok && !st.ow {
- return ErrAlreadyExists
+ return 0, ErrAlreadyExists
}
// Read all from reader
b, err := io.ReadAll(r)
if err != nil {
- return err
+ return 0, err
}
// Write key to store
st.fs.Set(key, b)
- return nil
+ return int64(len(b)), nil
}
// Stat implements Storage.Stat().
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go
index f8011114f..501de230d 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go
@@ -160,22 +160,23 @@ func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser,
}
// WriteBytes implements Storage.WriteBytes().
-func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) error {
- return st.WriteStream(ctx, key, util.NewByteReaderSize(value))
+func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) {
+ n, err := st.WriteStream(ctx, key, util.NewByteReaderSize(value))
+ return int(n), err
}
// WriteStream implements Storage.WriteStream().
-func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) error {
+func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) {
// Check storage open
if st.closed() {
- return ErrClosed
+ return 0, ErrClosed
}
if rs, ok := r.(util.ReaderSize); ok {
// This reader supports providing us the size of
// the encompassed data, allowing us to perform
// a singular .PutObject() call with length.
- _, err := st.client.PutObject(
+ info, err := st.client.PutObject(
ctx,
st.bucket,
key,
@@ -186,9 +187,9 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) e
st.config.PutOpts,
)
if err != nil {
- return transformS3Error(err)
+ err = transformS3Error(err)
}
- return nil
+ return info.Size, err
}
// Start a new multipart upload to get ID
@@ -199,14 +200,15 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) e
st.config.PutOpts,
)
if err != nil {
- return transformS3Error(err)
+ return 0, transformS3Error(err)
}
var (
- count = 1
+ index = int(1) // parts index
+ total = int64(0)
parts []minio.CompletePart
chunk = make([]byte, st.config.PutChunkSize)
- rdr = bytes.NewReader(nil)
+ rbuf = bytes.NewReader(nil)
)
// Note that we do not perform any kind of
@@ -234,11 +236,11 @@ loop:
// All other errors
default:
- return err
+ return 0, err
}
// Reset byte reader
- rdr.Reset(chunk[:n])
+ rbuf.Reset(chunk[:n])
// Put this object chunk in S3 store
pt, err := st.client.PutObjectPart(
@@ -246,15 +248,15 @@ loop:
st.bucket,
key,
uploadID,
- count,
- rdr,
+ index,
+ rbuf,
int64(n),
"",
"",
nil,
)
if err != nil {
- return err
+ return 0, err
}
// Append completed part to slice
@@ -267,8 +269,11 @@ loop:
ChecksumSHA256: pt.ChecksumSHA256,
})
- // Iterate part count
- count++
+ // Iterate idx
+ index++
+
+ // Update total size
+ total += pt.Size
}
// Complete this multi-part upload operation
@@ -281,10 +286,10 @@ loop:
st.config.PutOpts,
)
if err != nil {
- return err
+ return 0, err
}
- return nil
+ return total, nil
}
// Stat implements Storage.Stat().
diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go
index 00fbe7abd..a60ea93ad 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go
@@ -14,10 +14,10 @@ type Storage interface {
ReadStream(ctx context.Context, key string) (io.ReadCloser, error)
// WriteBytes writes the supplied value bytes at key in the storage
- WriteBytes(ctx context.Context, key string, value []byte) error
+ WriteBytes(ctx context.Context, key string, value []byte) (int, error)
// WriteStream writes the bytes from supplied reader at key in the storage
- WriteStream(ctx context.Context, key string, r io.Reader) error
+ WriteStream(ctx context.Context, key string, r io.Reader) (int64, error)
// Stat checks if the supplied key is in the storage
Stat(ctx context.Context, key string) (bool, error)
diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/io.go b/vendor/codeberg.org/gruf/go-store/v2/util/io.go
index 3d62e8be6..c5135084a 100644
--- a/vendor/codeberg.org/gruf/go-store/v2/util/io.go
+++ b/vendor/codeberg.org/gruf/go-store/v2/util/io.go
@@ -5,102 +5,37 @@ import (
"io"
)
-// ReaderSize ...
+// ReaderSize defines a reader of known size in bytes.
type ReaderSize interface {
io.Reader
-
- // Size ...
Size() int64
}
-// ByteReaderSize ...
+// ByteReaderSize implements ReaderSize for an in-memory byte-slice.
type ByteReaderSize struct {
- bytes.Reader
+ br bytes.Reader
sz int64
}
-// NewByteReaderSize ...
+// NewByteReaderSize returns a new ByteReaderSize instance reset to slice b.
func NewByteReaderSize(b []byte) *ByteReaderSize {
- rs := ByteReaderSize{}
+ rs := new(ByteReaderSize)
rs.Reset(b)
- return &rs
+ return rs
+}
+
+// Read implements io.Reader.
+func (rs *ByteReaderSize) Read(b []byte) (int, error) {
+ return rs.br.Read(b)
}
-// Size implements ReaderSize.Size().
-func (rs ByteReaderSize) Size() int64 {
+// Size implements ReaderSize.
+func (rs *ByteReaderSize) Size() int64 {
return rs.sz
}
// Reset resets the ReaderSize to be reading from b.
func (rs *ByteReaderSize) Reset(b []byte) {
- rs.Reader.Reset(b)
+ rs.br.Reset(b)
rs.sz = int64(len(b))
}
-
-// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation.
-func NopReadCloser(r io.Reader) io.ReadCloser {
- return &nopReadCloser{r}
-}
-
-// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation.
-func NopWriteCloser(w io.Writer) io.WriteCloser {
- return &nopWriteCloser{w}
-}
-
-// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser.
-// Note that the callback will never be called more than once, after execution this will remove the func reference.
-func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser {
- return &callbackReadCloser{
- ReadCloser: rc,
- callback: cb,
- }
-}
-
-// WriteCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.WriteCloser.
-// Note that the callback will never be called more than once, after execution this will remove the func reference.
-func WriteCloserWithCallback(wc io.WriteCloser, cb func()) io.WriteCloser {
- return &callbackWriteCloser{
- WriteCloser: wc,
- callback: cb,
- }
-}
-
-// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close().
-type nopReadCloser struct{ io.Reader }
-
-func (r *nopReadCloser) Close() error { return nil }
-
-// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close().
-type nopWriteCloser struct{ io.Writer }
-
-func (w nopWriteCloser) Close() error { return nil }
-
-// callbackReadCloser allows adding our own custom callback to an io.ReadCloser.
-type callbackReadCloser struct {
- io.ReadCloser
- callback func()
-}
-
-func (c *callbackReadCloser) Close() error {
- if c.callback != nil {
- cb := c.callback
- c.callback = nil
- defer cb()
- }
- return c.ReadCloser.Close()
-}
-
-// callbackWriteCloser allows adding our own custom callback to an io.WriteCloser.
-type callbackWriteCloser struct {
- io.WriteCloser
- callback func()
-}
-
-func (c *callbackWriteCloser) Close() error {
- if c.callback != nil {
- cb := c.callback
- c.callback = nil
- defer cb()
- }
- return c.WriteCloser.Close()
-}