diff options
Diffstat (limited to 'vendor/codeberg.org/gruf/go-store')
10 files changed, 261 insertions, 230 deletions
diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/state.go b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go index 9ac8ab1bf..450cd850c 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/kv/state.go +++ b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go @@ -77,17 +77,17 @@ func (st *StateRW) GetStream(ctx context.Context, key string) (io.ReadCloser, er } // Put: see KVStore.Put(). Returns error if state already closed. -func (st *StateRW) Put(ctx context.Context, key string, value []byte) error { +func (st *StateRW) Put(ctx context.Context, key string, value []byte) (int, error) { if st.store == nil { - return ErrStateClosed + return 0, ErrStateClosed } return st.store.put(st.state.Lock, ctx, key, value) } // PutStream: see KVStore.PutStream(). Returns error if state already closed. -func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) error { +func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) { if st.store == nil { - return ErrStateClosed + return 0, ErrStateClosed } return st.store.putStream(st.state.Lock, ctx, key, r) } diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/store.go b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go index 5ea795e7c..0b878c47f 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go @@ -4,9 +4,9 @@ import ( "context" "io" + "codeberg.org/gruf/go-iotools" "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/v2/storage" - "codeberg.org/gruf/go-store/v2/util" ) // KVStore is a very simple, yet performant key-value store @@ -117,17 +117,25 @@ func (st *KVStore) getStream(rlock func(string) func(), ctx context.Context, key return nil, err } - // Wrap readcloser in our own callback closer - return util.ReadCloserWithCallback(rd, runlock), nil + var unlocked bool + + // Wrap readcloser to call our own callback + return iotools.ReadCloser(rd, iotools.CloserFunc(func() error { + if !unlocked { + unlocked = true + defer runlock() + } + return rd.Close() + })), nil } // Put places the bytes at the supplied key in the store. -func (st *KVStore) Put(ctx context.Context, key string, value []byte) error { +func (st *KVStore) Put(ctx context.Context, key string, value []byte) (int, error) { return st.put(st.Lock, ctx, key, value) } // put performs the underlying logic for KVStore.Put(), using supplied lock func to allow use with states. -func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) error { +func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) (int, error) { // Acquire write lock for key unlock := lock(key) defer unlock() @@ -137,12 +145,12 @@ func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string } // PutStream writes the bytes from the supplied Reader at the supplied key in the store. -func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) error { +func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) { return st.putStream(st.Lock, ctx, key, r) } // putStream performs the underlying logic for KVStore.PutStream(), using supplied lock func to allow use with states. -func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) error { +func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) (int64, error) { // Acquire write lock for key unlock := lock(key) defer unlock() diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go index f41099c75..11a757211 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go @@ -10,12 +10,14 @@ import ( "os" "strings" "sync" + "sync/atomic" "syscall" "codeberg.org/gruf/go-byteutil" "codeberg.org/gruf/go-errors/v2" "codeberg.org/gruf/go-fastcopy" "codeberg.org/gruf/go-hashenc" + "codeberg.org/gruf/go-iotools" "codeberg.org/gruf/go-pools" "codeberg.org/gruf/go-store/v2/util" ) @@ -354,7 +356,7 @@ func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadClos } // Prepare block reader and return - return util.NopReadCloser(&blockReader{ + return iotools.NopReadCloser(&blockReader{ storage: st, node: &node, }), nil @@ -384,52 +386,54 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) { } // WriteBytes implements Storage.WriteBytes(). -func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) error { - return st.WriteStream(ctx, key, bytes.NewReader(value)) +func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err } // WriteStream implements Storage.WriteStream(). -func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Get node file path for key npath, err := st.nodePathForKey(key) if err != nil { - return err + return 0, err } // Check if open if st.lock.Closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Check if this exists ok, err := stat(key) if err != nil { - return err + return 0, err } // Check if we allow overwrites if ok && !st.config.Overwrite { - return ErrAlreadyExists + return 0, ErrAlreadyExists } // Ensure nodes dir (and any leading up to) exists err = os.MkdirAll(st.nodePath, defaultDirPerms) if err != nil { - return err + return 0, err } // Ensure blocks dir (and any leading up to) exists err = os.MkdirAll(st.blockPath, defaultDirPerms) if err != nil { - return err + return 0, err } var node node + var total atomic.Int64 // Acquire HashEncoder hc := st.hashPool.Get().(*hashEncoder) @@ -456,7 +460,7 @@ loop: break loop default: st.bufpool.Put(buf) - return err + return 0, err } // Hash the encoded data @@ -469,7 +473,7 @@ loop: has, err := st.statBlock(sum) if err != nil { st.bufpool.Put(buf) - return err + return 0, err } else if has { st.bufpool.Put(buf) continue loop @@ -490,11 +494,14 @@ loop: }() // Write block to store at hash - err = st.writeBlock(sum, buf.B[:n]) + n, err := st.writeBlock(sum, buf.B[:n]) if err != nil { onceErr.Store(err) return } + + // Increment total. + total.Add(int64(n)) }() // Break at end @@ -506,12 +513,12 @@ loop: // Wait, check errors wg.Wait() if onceErr.IsSet() { - return onceErr.Load() + return 0, onceErr.Load() } // If no hashes created, return if len(node.hashes) < 1 { - return new_error("no hashes written") + return 0, new_error("no hashes written") } // Prepare to swap error if need-be @@ -535,7 +542,7 @@ loop: // Attempt to open RW file file, err := open(npath, flags) if err != nil { - return errSwap(err) + return 0, errSwap(err) } defer file.Close() @@ -546,11 +553,11 @@ loop: // Finally, write data to file _, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B) - return err + return total.Load(), err } // writeBlock writes the block with hash and supplied value to the filesystem. -func (st *BlockStorage) writeBlock(hash string, value []byte) error { +func (st *BlockStorage) writeBlock(hash string, value []byte) (int, error) { // Get block file path for key bpath := st.blockPathForKey(hash) @@ -560,20 +567,19 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error { if err == syscall.EEXIST { err = nil /* race issue describe in struct NOTE */ } - return err + return 0, err } defer file.Close() // Wrap the file in a compressor cFile, err := st.config.Compression.Writer(file) if err != nil { - return err + return 0, err } defer cFile.Close() // Write value to file - _, err = cFile.Write(value) - return err + return cFile.Write(value) } // statBlock checks for existence of supplied block hash. diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go index 6eeb3a78d..bbe02f22d 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go @@ -5,7 +5,7 @@ import ( "io" "sync" - "codeberg.org/gruf/go-store/v2/util" + "codeberg.org/gruf/go-iotools" "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/snappy" @@ -15,10 +15,10 @@ import ( // Compressor defines a means of compressing/decompressing values going into a key-value store type Compressor interface { // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader - Reader(io.Reader) (io.ReadCloser, error) + Reader(io.ReadCloser) (io.ReadCloser, error) // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer - Writer(io.Writer) (io.WriteCloser, error) + Writer(io.WriteCloser) (io.WriteCloser, error) } type gzipCompressor struct { @@ -47,8 +47,8 @@ func GZipCompressorLevel(level int) Compressor { // Write empty data to ensure gzip // header data is in byte buffer. - gw.Write([]byte{}) - gw.Close() + _, _ = gw.Write([]byte{}) + _ = gw.Close() return &gzipCompressor{ rpool: sync.Pool{ @@ -67,23 +67,61 @@ func GZipCompressorLevel(level int) Compressor { } } -func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) { +func (c *gzipCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + var released bool + + // Acquire from pool. gr := c.rpool.Get().(*gzip.Reader) - if err := gr.Reset(r); err != nil { + if err := gr.Reset(rc); err != nil { c.rpool.Put(gr) return nil, err } - return util.ReadCloserWithCallback(gr, func() { - c.rpool.Put(gr) - }), nil + + return iotools.ReadCloser(gr, iotools.CloserFunc(func() error { + if !released { + released = true + defer c.rpool.Put(gr) + } + + // Close compressor + err1 := gr.Close() + + // Close original stream. + err2 := rc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } -func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) { +func (c *gzipCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + var released bool + + // Acquire from pool. gw := c.wpool.Get().(*gzip.Writer) - gw.Reset(w) - return util.WriteCloserWithCallback(gw, func() { - c.wpool.Put(gw) - }), nil + gw.Reset(wc) + + return iotools.WriteCloser(gw, iotools.CloserFunc(func() error { + if !released { + released = true + c.wpool.Put(gw) + } + + // Close compressor + err1 := gw.Close() + + // Close original stream. + err2 := wc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } type zlibCompressor struct { @@ -139,26 +177,61 @@ func ZLibCompressorLevelDict(level int, dict []byte) Compressor { } } -func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) { +func (c *zlibCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + var released bool zr := c.rpool.Get().(interface { io.ReadCloser zlib.Resetter }) - if err := zr.Reset(r, c.dict); err != nil { + if err := zr.Reset(rc, c.dict); err != nil { c.rpool.Put(zr) return nil, err } - return util.ReadCloserWithCallback(zr, func() { - c.rpool.Put(zr) - }), nil + return iotools.ReadCloser(zr, iotools.CloserFunc(func() error { + if !released { + released = true + defer c.rpool.Put(zr) + } + + // Close compressor + err1 := zr.Close() + + // Close original stream. + err2 := rc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } -func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) { +func (c *zlibCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + var released bool + + // Acquire from pool. zw := c.wpool.Get().(*zlib.Writer) - zw.Reset(w) - return util.WriteCloserWithCallback(zw, func() { - c.wpool.Put(zw) - }), nil + zw.Reset(wc) + + return iotools.WriteCloser(zw, iotools.CloserFunc(func() error { + if !released { + released = true + c.wpool.Put(zw) + } + + // Close compressor + err1 := zw.Close() + + // Close original stream. + err2 := wc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } type snappyCompressor struct { @@ -178,22 +251,40 @@ func SnappyCompressor() Compressor { } } -func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) { +func (c *snappyCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + var released bool + + // Acquire from pool. sr := c.rpool.Get().(*snappy.Reader) - sr.Reset(r) - return util.ReadCloserWithCallback( - util.NopReadCloser(sr), - func() { c.rpool.Put(sr) }, - ), nil + sr.Reset(rc) + + return iotools.ReadCloser(sr, iotools.CloserFunc(func() error { + if !released { + released = true + defer c.rpool.Put(sr) + } + + // Close original stream. + return rc.Close() + })), nil } -func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) { +func (c *snappyCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + var released bool + + // Acquire from pool. sw := c.wpool.Get().(*snappy.Writer) - sw.Reset(w) - return util.WriteCloserWithCallback( - util.NopWriteCloser(sw), - func() { c.wpool.Put(sw) }, - ), nil + sw.Reset(wc) + + return iotools.WriteCloser(sw, iotools.CloserFunc(func() error { + if !released { + released = true + c.wpool.Put(sw) + } + + // Close original stream. + return wc.Close() + })), nil } type nopCompressor struct{} @@ -203,10 +294,10 @@ func NoCompression() Compressor { return &nopCompressor{} } -func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) { - return util.NopReadCloser(r), nil +func (c *nopCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + return rc, nil } -func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) { - return util.NopWriteCloser(w), nil +func (c *nopCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + return wc, nil } diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go index ef6993edd..21dba7671 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go @@ -219,43 +219,41 @@ func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadClose // Wrap the file in a compressor cFile, err := st.config.Compression.Reader(file) if err != nil { - file.Close() // close this here, ignore error + _ = file.Close() return nil, err } - // Wrap compressor to ensure file close - return util.ReadCloserWithCallback(cFile, func() { - file.Close() - }), nil + return cFile, nil } // WriteBytes implements Storage.WriteBytes(). -func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) error { - return st.WriteStream(ctx, key, bytes.NewReader(value)) +func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err } // WriteStream implements Storage.WriteStream(). -func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Get file path for key kpath, err := st.filepath(key) if err != nil { - return err + return 0, err } // Check if open if st.lock.Closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Ensure dirs leading up to file exist err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) if err != nil { - return err + return 0, err } // Prepare to swap error if need-be @@ -273,20 +271,21 @@ func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) // Attempt to open file file, err := open(kpath, flags) if err != nil { - return errSwap(err) + return 0, errSwap(err) } - defer file.Close() // Wrap the file in a compressor cFile, err := st.config.Compression.Writer(file) if err != nil { - return err + _ = file.Close() + return 0, err } + + // Wraps file.Close(). defer cFile.Close() // Copy provided reader to file - _, err = st.cppool.Copy(cFile, r) - return err + return st.cppool.Copy(cFile, r) } // Stat implements Storage.Stat(). diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go index 48a5806f2..be86ac127 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go @@ -1,6 +1,7 @@ package storage import ( + "fmt" "io/fs" "os" "syscall" @@ -102,46 +103,32 @@ outer: // cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children func cleanDirs(path string) error { - // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) - - // Get top-level dir entries - entries, err := readDir(path) - if err != nil { - return err - } - - for _, entry := range entries { - if entry.IsDir() { - // Recursively clean sub-directory entries - if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { - return err - } - } - } - - return nil + return cleanDir(pb, path, true) } // cleanDir performs the actual dir cleaning logic for the above top-level version. -func cleanDir(pb *fastpath.Builder, path string) error { - // Get dir entries +func cleanDir(pb *fastpath.Builder, path string, top bool) error { + // Get dir entries at path. entries, err := readDir(path) if err != nil { return err } - // If no entries, delete - if len(entries) < 1 { + // If no entries, delete dir. + if !top && len(entries) == 0 { return rmdir(path) } for _, entry := range entries { if entry.IsDir() { - // Recursively clean sub-directory entries - if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { - return err + // Calculate directory path. + dirPath := pb.Join(path, entry.Name()) + + // Recursively clean sub-directory entries. + if err := cleanDir(pb, dirPath, false); err != nil { + fmt.Fprintf(os.Stderr, "[go-store/storage] error cleaning %s: %v", dirPath, err) } } } diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go index a853c84d2..d42274e39 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-store/v2/util" + "codeberg.org/gruf/go-iotools" "github.com/cornelk/hashmap" ) @@ -86,57 +86,57 @@ func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadClo // Create io.ReadCloser from 'b' copy r := bytes.NewReader(copyb(b)) - return util.NopReadCloser(r), nil + return iotools.NopReadCloser(r), nil } // WriteBytes implements Storage.WriteBytes(). -func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) error { +func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) { // Check store open if st.closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Check for key that already exists if _, ok := st.fs.Get(key); ok && !st.ow { - return ErrAlreadyExists + return 0, ErrAlreadyExists } // Write key copy to store st.fs.Set(key, copyb(b)) - return nil + return len(b), nil } // WriteStream implements Storage.WriteStream(). -func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Check store open if st.closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Check for key that already exists if _, ok := st.fs.Get(key); ok && !st.ow { - return ErrAlreadyExists + return 0, ErrAlreadyExists } // Read all from reader b, err := io.ReadAll(r) if err != nil { - return err + return 0, err } // Write key to store st.fs.Set(key, b) - return nil + return int64(len(b)), nil } // Stat implements Storage.Stat(). diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go index f8011114f..501de230d 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go @@ -160,22 +160,23 @@ func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, } // WriteBytes implements Storage.WriteBytes(). -func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) error { - return st.WriteStream(ctx, key, util.NewByteReaderSize(value)) +func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, util.NewByteReaderSize(value)) + return int(n), err } // WriteStream implements Storage.WriteStream(). -func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Check storage open if st.closed() { - return ErrClosed + return 0, ErrClosed } if rs, ok := r.(util.ReaderSize); ok { // This reader supports providing us the size of // the encompassed data, allowing us to perform // a singular .PutObject() call with length. - _, err := st.client.PutObject( + info, err := st.client.PutObject( ctx, st.bucket, key, @@ -186,9 +187,9 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) e st.config.PutOpts, ) if err != nil { - return transformS3Error(err) + err = transformS3Error(err) } - return nil + return info.Size, err } // Start a new multipart upload to get ID @@ -199,14 +200,15 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) e st.config.PutOpts, ) if err != nil { - return transformS3Error(err) + return 0, transformS3Error(err) } var ( - count = 1 + index = int(1) // parts index + total = int64(0) parts []minio.CompletePart chunk = make([]byte, st.config.PutChunkSize) - rdr = bytes.NewReader(nil) + rbuf = bytes.NewReader(nil) ) // Note that we do not perform any kind of @@ -234,11 +236,11 @@ loop: // All other errors default: - return err + return 0, err } // Reset byte reader - rdr.Reset(chunk[:n]) + rbuf.Reset(chunk[:n]) // Put this object chunk in S3 store pt, err := st.client.PutObjectPart( @@ -246,15 +248,15 @@ loop: st.bucket, key, uploadID, - count, - rdr, + index, + rbuf, int64(n), "", "", nil, ) if err != nil { - return err + return 0, err } // Append completed part to slice @@ -267,8 +269,11 @@ loop: ChecksumSHA256: pt.ChecksumSHA256, }) - // Iterate part count - count++ + // Iterate idx + index++ + + // Update total size + total += pt.Size } // Complete this multi-part upload operation @@ -281,10 +286,10 @@ loop: st.config.PutOpts, ) if err != nil { - return err + return 0, err } - return nil + return total, nil } // Stat implements Storage.Stat(). diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go index 00fbe7abd..a60ea93ad 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go @@ -14,10 +14,10 @@ type Storage interface { ReadStream(ctx context.Context, key string) (io.ReadCloser, error) // WriteBytes writes the supplied value bytes at key in the storage - WriteBytes(ctx context.Context, key string, value []byte) error + WriteBytes(ctx context.Context, key string, value []byte) (int, error) // WriteStream writes the bytes from supplied reader at key in the storage - WriteStream(ctx context.Context, key string, r io.Reader) error + WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) // Stat checks if the supplied key is in the storage Stat(ctx context.Context, key string) (bool, error) diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/io.go b/vendor/codeberg.org/gruf/go-store/v2/util/io.go index 3d62e8be6..c5135084a 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/util/io.go +++ b/vendor/codeberg.org/gruf/go-store/v2/util/io.go @@ -5,102 +5,37 @@ import ( "io" ) -// ReaderSize ... +// ReaderSize defines a reader of known size in bytes. type ReaderSize interface { io.Reader - - // Size ... Size() int64 } -// ByteReaderSize ... +// ByteReaderSize implements ReaderSize for an in-memory byte-slice. type ByteReaderSize struct { - bytes.Reader + br bytes.Reader sz int64 } -// NewByteReaderSize ... +// NewByteReaderSize returns a new ByteReaderSize instance reset to slice b. func NewByteReaderSize(b []byte) *ByteReaderSize { - rs := ByteReaderSize{} + rs := new(ByteReaderSize) rs.Reset(b) - return &rs + return rs +} + +// Read implements io.Reader. +func (rs *ByteReaderSize) Read(b []byte) (int, error) { + return rs.br.Read(b) } -// Size implements ReaderSize.Size(). -func (rs ByteReaderSize) Size() int64 { +// Size implements ReaderSize. +func (rs *ByteReaderSize) Size() int64 { return rs.sz } // Reset resets the ReaderSize to be reading from b. func (rs *ByteReaderSize) Reset(b []byte) { - rs.Reader.Reset(b) + rs.br.Reset(b) rs.sz = int64(len(b)) } - -// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation. -func NopReadCloser(r io.Reader) io.ReadCloser { - return &nopReadCloser{r} -} - -// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation. -func NopWriteCloser(w io.Writer) io.WriteCloser { - return &nopWriteCloser{w} -} - -// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser. -// Note that the callback will never be called more than once, after execution this will remove the func reference. -func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { - return &callbackReadCloser{ - ReadCloser: rc, - callback: cb, - } -} - -// WriteCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.WriteCloser. -// Note that the callback will never be called more than once, after execution this will remove the func reference. -func WriteCloserWithCallback(wc io.WriteCloser, cb func()) io.WriteCloser { - return &callbackWriteCloser{ - WriteCloser: wc, - callback: cb, - } -} - -// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close(). -type nopReadCloser struct{ io.Reader } - -func (r *nopReadCloser) Close() error { return nil } - -// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close(). -type nopWriteCloser struct{ io.Writer } - -func (w nopWriteCloser) Close() error { return nil } - -// callbackReadCloser allows adding our own custom callback to an io.ReadCloser. -type callbackReadCloser struct { - io.ReadCloser - callback func() -} - -func (c *callbackReadCloser) Close() error { - if c.callback != nil { - cb := c.callback - c.callback = nil - defer cb() - } - return c.ReadCloser.Close() -} - -// callbackWriteCloser allows adding our own custom callback to an io.WriteCloser. -type callbackWriteCloser struct { - io.WriteCloser - callback func() -} - -func (c *callbackWriteCloser) Close() error { - if c.callback != nil { - cb := c.callback - c.callback = nil - defer cb() - } - return c.WriteCloser.Close() -} |