diff options
author | 2023-01-11 11:13:13 +0000 | |
---|---|---|
committer | 2023-01-11 12:13:13 +0100 | |
commit | 53180548083c0a100db2f703d5f5da047a9e0031 (patch) | |
tree | a8eb1df9d03b37f907a747ae42cc8992d2ff9f52 /vendor/codeberg.org | |
parent | [feature] Add local user and post count to nodeinfo responses (#1325) (diff) | |
download | gotosocial-53180548083c0a100db2f703d5f5da047a9e0031.tar.xz |
[performance] media processing improvements (#1288)
* media processor consolidation and reformatting, reduce amount of required syscalls
Signed-off-by: kim <grufwub@gmail.com>
* update go-store library, stream jpeg/png encoding + use buffer pools, improved media processing AlreadyExists error handling
Signed-off-by: kim <grufwub@gmail.com>
* fix duration not being set, fix mp4 test expecting error
Signed-off-by: kim <grufwub@gmail.com>
* fix test expecting media files with different extension
Signed-off-by: kim <grufwub@gmail.com>
* remove unused code
Signed-off-by: kim <grufwub@gmail.com>
* fix expected storage paths in tests, update expected test thumbnails
Signed-off-by: kim <grufwub@gmail.com>
* remove dead code
Signed-off-by: kim <grufwub@gmail.com>
* fix cached presigned s3 url fetching
Signed-off-by: kim <grufwub@gmail.com>
* fix tests
Signed-off-by: kim <grufwub@gmail.com>
* fix test models
Signed-off-by: kim <grufwub@gmail.com>
* update media processing to use sync.Once{} for concurrency protection
Signed-off-by: kim <grufwub@gmail.com>
* shutup linter
Signed-off-by: kim <grufwub@gmail.com>
* fix passing in KVStore GetStream() as stream to PutStream()
Signed-off-by: kim <grufwub@gmail.com>
* fix unlocks of storage keys
Signed-off-by: kim <grufwub@gmail.com>
* whoops, return the error...
Signed-off-by: kim <grufwub@gmail.com>
* pour one out for tobi's code <3
Signed-off-by: kim <grufwub@gmail.com>
* add back the byte slurping code
Signed-off-by: kim <grufwub@gmail.com>
* check for both ErrUnexpectedEOF and EOF
Signed-off-by: kim <grufwub@gmail.com>
* add back links to file format header information
Signed-off-by: kim <grufwub@gmail.com>
Signed-off-by: kim <grufwub@gmail.com>
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r-- | vendor/codeberg.org/gruf/go-fastcopy/copy.go | 6 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-iotools/LICENSE | 9 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-iotools/close.go | 35 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-iotools/read.go | 28 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-iotools/write.go | 26 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-mutexes/map.go | 4 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/kv/state.go | 8 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/kv/store.go | 22 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/block.go | 52 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go | 173 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/disk.go | 33 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/fs.go | 37 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/memory.go | 26 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/s3.go | 43 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/storage/storage.go | 4 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-store/v2/util/io.go | 93 |
16 files changed, 365 insertions, 234 deletions
diff --git a/vendor/codeberg.org/gruf/go-fastcopy/copy.go b/vendor/codeberg.org/gruf/go-fastcopy/copy.go index 4716b140f..a9c115927 100644 --- a/vendor/codeberg.org/gruf/go-fastcopy/copy.go +++ b/vendor/codeberg.org/gruf/go-fastcopy/copy.go @@ -78,16 +78,16 @@ func (cp *CopyPool) Copy(dst io.Writer, src io.Reader) (int64, error) { var buf []byte - if b, ok := cp.pool.Get().([]byte); ok { + if b, ok := cp.pool.Get().(*[]byte); ok { // Acquired buf from pool - buf = b + buf = *b } else { // Allocate new buffer of size buf = make([]byte, cp.Buffer(0)) } // Defer release to pool - defer cp.pool.Put(buf) + defer cp.pool.Put(&buf) var n int64 for { diff --git a/vendor/codeberg.org/gruf/go-iotools/LICENSE b/vendor/codeberg.org/gruf/go-iotools/LICENSE new file mode 100644 index 000000000..e4163ae35 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-iotools/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) 2022 gruf + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/codeberg.org/gruf/go-iotools/close.go b/vendor/codeberg.org/gruf/go-iotools/close.go new file mode 100644 index 000000000..fbed7f33c --- /dev/null +++ b/vendor/codeberg.org/gruf/go-iotools/close.go @@ -0,0 +1,35 @@ +package iotools + +import "io" + +// CloserFunc is a function signature which allows +// a function to implement the io.Closer type. +type CloserFunc func() error + +func (c CloserFunc) Close() error { + return c() +} + +func CloserCallback(c io.Closer, cb func()) io.Closer { + return CloserFunc(func() error { + defer cb() + return c.Close() + }) +} + +// CloseOnce wraps an io.Closer to ensure it only performs the close logic once. +func CloseOnce(c io.Closer) io.Closer { + return CloserFunc(func() error { + if c == nil { + // already run. + return nil + } + + // Acquire. + cptr := c + c = nil + + // Call the closer. + return cptr.Close() + }) +} diff --git a/vendor/codeberg.org/gruf/go-iotools/read.go b/vendor/codeberg.org/gruf/go-iotools/read.go new file mode 100644 index 000000000..4a134e7b3 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-iotools/read.go @@ -0,0 +1,28 @@ +package iotools + +import ( + "io" +) + +// ReaderFunc is a function signature which allows +// a function to implement the io.Reader type. +type ReaderFunc func([]byte) (int, error) + +func (r ReaderFunc) Read(b []byte) (int, error) { + return r(b) +} + +// ReadCloser wraps an io.Reader and io.Closer in order to implement io.ReadCloser. +func ReadCloser(r io.Reader, c io.Closer) io.ReadCloser { + return &struct { + io.Reader + io.Closer + }{r, c} +} + +// NopReadCloser wraps an io.Reader to implement io.ReadCloser with empty io.Closer implementation. +func NopReadCloser(r io.Reader) io.ReadCloser { + return ReadCloser(r, CloserFunc(func() error { + return nil + })) +} diff --git a/vendor/codeberg.org/gruf/go-iotools/write.go b/vendor/codeberg.org/gruf/go-iotools/write.go new file mode 100644 index 000000000..c520b8636 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-iotools/write.go @@ -0,0 +1,26 @@ +package iotools + +import "io" + +// WriterFunc is a function signature which allows +// a function to implement the io.Writer type. +type WriterFunc func([]byte) (int, error) + +func (w WriterFunc) Write(b []byte) (int, error) { + return w(b) +} + +// WriteCloser wraps an io.Writer and io.Closer in order to implement io.WriteCloser. +func WriteCloser(w io.Writer, c io.Closer) io.WriteCloser { + return &struct { + io.Writer + io.Closer + }{w, c} +} + +// NopWriteCloser wraps an io.Writer to implement io.WriteCloser with empty io.Closer implementation. +func NopWriteCloser(w io.Writer) io.WriteCloser { + return WriteCloser(w, CloserFunc(func() error { + return nil + })) +} diff --git a/vendor/codeberg.org/gruf/go-mutexes/map.go b/vendor/codeberg.org/gruf/go-mutexes/map.go index a3c171c7a..73f8f1821 100644 --- a/vendor/codeberg.org/gruf/go-mutexes/map.go +++ b/vendor/codeberg.org/gruf/go-mutexes/map.go @@ -454,7 +454,9 @@ func (mu *rwmutex) Unlock() { if mu.rcnt > 0 { // RUnlock mu.rcnt-- - } else { + } + + if mu.rcnt == 0 { // Total unlock mu.lock = 0 } diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/state.go b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go index 9ac8ab1bf..450cd850c 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/kv/state.go +++ b/vendor/codeberg.org/gruf/go-store/v2/kv/state.go @@ -77,17 +77,17 @@ func (st *StateRW) GetStream(ctx context.Context, key string) (io.ReadCloser, er } // Put: see KVStore.Put(). Returns error if state already closed. -func (st *StateRW) Put(ctx context.Context, key string, value []byte) error { +func (st *StateRW) Put(ctx context.Context, key string, value []byte) (int, error) { if st.store == nil { - return ErrStateClosed + return 0, ErrStateClosed } return st.store.put(st.state.Lock, ctx, key, value) } // PutStream: see KVStore.PutStream(). Returns error if state already closed. -func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) error { +func (st *StateRW) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) { if st.store == nil { - return ErrStateClosed + return 0, ErrStateClosed } return st.store.putStream(st.state.Lock, ctx, key, r) } diff --git a/vendor/codeberg.org/gruf/go-store/v2/kv/store.go b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go index 5ea795e7c..0b878c47f 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/kv/store.go +++ b/vendor/codeberg.org/gruf/go-store/v2/kv/store.go @@ -4,9 +4,9 @@ import ( "context" "io" + "codeberg.org/gruf/go-iotools" "codeberg.org/gruf/go-mutexes" "codeberg.org/gruf/go-store/v2/storage" - "codeberg.org/gruf/go-store/v2/util" ) // KVStore is a very simple, yet performant key-value store @@ -117,17 +117,25 @@ func (st *KVStore) getStream(rlock func(string) func(), ctx context.Context, key return nil, err } - // Wrap readcloser in our own callback closer - return util.ReadCloserWithCallback(rd, runlock), nil + var unlocked bool + + // Wrap readcloser to call our own callback + return iotools.ReadCloser(rd, iotools.CloserFunc(func() error { + if !unlocked { + unlocked = true + defer runlock() + } + return rd.Close() + })), nil } // Put places the bytes at the supplied key in the store. -func (st *KVStore) Put(ctx context.Context, key string, value []byte) error { +func (st *KVStore) Put(ctx context.Context, key string, value []byte) (int, error) { return st.put(st.Lock, ctx, key, value) } // put performs the underlying logic for KVStore.Put(), using supplied lock func to allow use with states. -func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) error { +func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string, value []byte) (int, error) { // Acquire write lock for key unlock := lock(key) defer unlock() @@ -137,12 +145,12 @@ func (st *KVStore) put(lock func(string) func(), ctx context.Context, key string } // PutStream writes the bytes from the supplied Reader at the supplied key in the store. -func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) error { +func (st *KVStore) PutStream(ctx context.Context, key string, r io.Reader) (int64, error) { return st.putStream(st.Lock, ctx, key, r) } // putStream performs the underlying logic for KVStore.PutStream(), using supplied lock func to allow use with states. -func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) error { +func (st *KVStore) putStream(lock func(string) func(), ctx context.Context, key string, r io.Reader) (int64, error) { // Acquire write lock for key unlock := lock(key) defer unlock() diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go index f41099c75..11a757211 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/block.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/block.go @@ -10,12 +10,14 @@ import ( "os" "strings" "sync" + "sync/atomic" "syscall" "codeberg.org/gruf/go-byteutil" "codeberg.org/gruf/go-errors/v2" "codeberg.org/gruf/go-fastcopy" "codeberg.org/gruf/go-hashenc" + "codeberg.org/gruf/go-iotools" "codeberg.org/gruf/go-pools" "codeberg.org/gruf/go-store/v2/util" ) @@ -354,7 +356,7 @@ func (st *BlockStorage) ReadStream(ctx context.Context, key string) (io.ReadClos } // Prepare block reader and return - return util.NopReadCloser(&blockReader{ + return iotools.NopReadCloser(&blockReader{ storage: st, node: &node, }), nil @@ -384,52 +386,54 @@ func (st *BlockStorage) readBlock(key string) ([]byte, error) { } // WriteBytes implements Storage.WriteBytes(). -func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) error { - return st.WriteStream(ctx, key, bytes.NewReader(value)) +func (st *BlockStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err } // WriteStream implements Storage.WriteStream(). -func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *BlockStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Get node file path for key npath, err := st.nodePathForKey(key) if err != nil { - return err + return 0, err } // Check if open if st.lock.Closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Check if this exists ok, err := stat(key) if err != nil { - return err + return 0, err } // Check if we allow overwrites if ok && !st.config.Overwrite { - return ErrAlreadyExists + return 0, ErrAlreadyExists } // Ensure nodes dir (and any leading up to) exists err = os.MkdirAll(st.nodePath, defaultDirPerms) if err != nil { - return err + return 0, err } // Ensure blocks dir (and any leading up to) exists err = os.MkdirAll(st.blockPath, defaultDirPerms) if err != nil { - return err + return 0, err } var node node + var total atomic.Int64 // Acquire HashEncoder hc := st.hashPool.Get().(*hashEncoder) @@ -456,7 +460,7 @@ loop: break loop default: st.bufpool.Put(buf) - return err + return 0, err } // Hash the encoded data @@ -469,7 +473,7 @@ loop: has, err := st.statBlock(sum) if err != nil { st.bufpool.Put(buf) - return err + return 0, err } else if has { st.bufpool.Put(buf) continue loop @@ -490,11 +494,14 @@ loop: }() // Write block to store at hash - err = st.writeBlock(sum, buf.B[:n]) + n, err := st.writeBlock(sum, buf.B[:n]) if err != nil { onceErr.Store(err) return } + + // Increment total. + total.Add(int64(n)) }() // Break at end @@ -506,12 +513,12 @@ loop: // Wait, check errors wg.Wait() if onceErr.IsSet() { - return onceErr.Load() + return 0, onceErr.Load() } // If no hashes created, return if len(node.hashes) < 1 { - return new_error("no hashes written") + return 0, new_error("no hashes written") } // Prepare to swap error if need-be @@ -535,7 +542,7 @@ loop: // Attempt to open RW file file, err := open(npath, flags) if err != nil { - return errSwap(err) + return 0, errSwap(err) } defer file.Close() @@ -546,11 +553,11 @@ loop: // Finally, write data to file _, err = io.CopyBuffer(file, &nodeReader{node: node}, buf.B) - return err + return total.Load(), err } // writeBlock writes the block with hash and supplied value to the filesystem. -func (st *BlockStorage) writeBlock(hash string, value []byte) error { +func (st *BlockStorage) writeBlock(hash string, value []byte) (int, error) { // Get block file path for key bpath := st.blockPathForKey(hash) @@ -560,20 +567,19 @@ func (st *BlockStorage) writeBlock(hash string, value []byte) error { if err == syscall.EEXIST { err = nil /* race issue describe in struct NOTE */ } - return err + return 0, err } defer file.Close() // Wrap the file in a compressor cFile, err := st.config.Compression.Writer(file) if err != nil { - return err + return 0, err } defer cFile.Close() // Write value to file - _, err = cFile.Write(value) - return err + return cFile.Write(value) } // statBlock checks for existence of supplied block hash. diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go index 6eeb3a78d..bbe02f22d 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/compressor.go @@ -5,7 +5,7 @@ import ( "io" "sync" - "codeberg.org/gruf/go-store/v2/util" + "codeberg.org/gruf/go-iotools" "github.com/klauspost/compress/gzip" "github.com/klauspost/compress/snappy" @@ -15,10 +15,10 @@ import ( // Compressor defines a means of compressing/decompressing values going into a key-value store type Compressor interface { // Reader returns a new decompressing io.ReadCloser based on supplied (compressed) io.Reader - Reader(io.Reader) (io.ReadCloser, error) + Reader(io.ReadCloser) (io.ReadCloser, error) // Writer returns a new compressing io.WriteCloser based on supplied (uncompressed) io.Writer - Writer(io.Writer) (io.WriteCloser, error) + Writer(io.WriteCloser) (io.WriteCloser, error) } type gzipCompressor struct { @@ -47,8 +47,8 @@ func GZipCompressorLevel(level int) Compressor { // Write empty data to ensure gzip // header data is in byte buffer. - gw.Write([]byte{}) - gw.Close() + _, _ = gw.Write([]byte{}) + _ = gw.Close() return &gzipCompressor{ rpool: sync.Pool{ @@ -67,23 +67,61 @@ func GZipCompressorLevel(level int) Compressor { } } -func (c *gzipCompressor) Reader(r io.Reader) (io.ReadCloser, error) { +func (c *gzipCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + var released bool + + // Acquire from pool. gr := c.rpool.Get().(*gzip.Reader) - if err := gr.Reset(r); err != nil { + if err := gr.Reset(rc); err != nil { c.rpool.Put(gr) return nil, err } - return util.ReadCloserWithCallback(gr, func() { - c.rpool.Put(gr) - }), nil + + return iotools.ReadCloser(gr, iotools.CloserFunc(func() error { + if !released { + released = true + defer c.rpool.Put(gr) + } + + // Close compressor + err1 := gr.Close() + + // Close original stream. + err2 := rc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } -func (c *gzipCompressor) Writer(w io.Writer) (io.WriteCloser, error) { +func (c *gzipCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + var released bool + + // Acquire from pool. gw := c.wpool.Get().(*gzip.Writer) - gw.Reset(w) - return util.WriteCloserWithCallback(gw, func() { - c.wpool.Put(gw) - }), nil + gw.Reset(wc) + + return iotools.WriteCloser(gw, iotools.CloserFunc(func() error { + if !released { + released = true + c.wpool.Put(gw) + } + + // Close compressor + err1 := gw.Close() + + // Close original stream. + err2 := wc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } type zlibCompressor struct { @@ -139,26 +177,61 @@ func ZLibCompressorLevelDict(level int, dict []byte) Compressor { } } -func (c *zlibCompressor) Reader(r io.Reader) (io.ReadCloser, error) { +func (c *zlibCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + var released bool zr := c.rpool.Get().(interface { io.ReadCloser zlib.Resetter }) - if err := zr.Reset(r, c.dict); err != nil { + if err := zr.Reset(rc, c.dict); err != nil { c.rpool.Put(zr) return nil, err } - return util.ReadCloserWithCallback(zr, func() { - c.rpool.Put(zr) - }), nil + return iotools.ReadCloser(zr, iotools.CloserFunc(func() error { + if !released { + released = true + defer c.rpool.Put(zr) + } + + // Close compressor + err1 := zr.Close() + + // Close original stream. + err2 := rc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } -func (c *zlibCompressor) Writer(w io.Writer) (io.WriteCloser, error) { +func (c *zlibCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + var released bool + + // Acquire from pool. zw := c.wpool.Get().(*zlib.Writer) - zw.Reset(w) - return util.WriteCloserWithCallback(zw, func() { - c.wpool.Put(zw) - }), nil + zw.Reset(wc) + + return iotools.WriteCloser(zw, iotools.CloserFunc(func() error { + if !released { + released = true + c.wpool.Put(zw) + } + + // Close compressor + err1 := zw.Close() + + // Close original stream. + err2 := wc.Close() + + // Return err1 or 2 + if err1 != nil { + return err1 + } + return err2 + })), nil } type snappyCompressor struct { @@ -178,22 +251,40 @@ func SnappyCompressor() Compressor { } } -func (c *snappyCompressor) Reader(r io.Reader) (io.ReadCloser, error) { +func (c *snappyCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + var released bool + + // Acquire from pool. sr := c.rpool.Get().(*snappy.Reader) - sr.Reset(r) - return util.ReadCloserWithCallback( - util.NopReadCloser(sr), - func() { c.rpool.Put(sr) }, - ), nil + sr.Reset(rc) + + return iotools.ReadCloser(sr, iotools.CloserFunc(func() error { + if !released { + released = true + defer c.rpool.Put(sr) + } + + // Close original stream. + return rc.Close() + })), nil } -func (c *snappyCompressor) Writer(w io.Writer) (io.WriteCloser, error) { +func (c *snappyCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + var released bool + + // Acquire from pool. sw := c.wpool.Get().(*snappy.Writer) - sw.Reset(w) - return util.WriteCloserWithCallback( - util.NopWriteCloser(sw), - func() { c.wpool.Put(sw) }, - ), nil + sw.Reset(wc) + + return iotools.WriteCloser(sw, iotools.CloserFunc(func() error { + if !released { + released = true + c.wpool.Put(sw) + } + + // Close original stream. + return wc.Close() + })), nil } type nopCompressor struct{} @@ -203,10 +294,10 @@ func NoCompression() Compressor { return &nopCompressor{} } -func (c *nopCompressor) Reader(r io.Reader) (io.ReadCloser, error) { - return util.NopReadCloser(r), nil +func (c *nopCompressor) Reader(rc io.ReadCloser) (io.ReadCloser, error) { + return rc, nil } -func (c *nopCompressor) Writer(w io.Writer) (io.WriteCloser, error) { - return util.NopWriteCloser(w), nil +func (c *nopCompressor) Writer(wc io.WriteCloser) (io.WriteCloser, error) { + return wc, nil } diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go index ef6993edd..21dba7671 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/disk.go @@ -219,43 +219,41 @@ func (st *DiskStorage) ReadStream(ctx context.Context, key string) (io.ReadClose // Wrap the file in a compressor cFile, err := st.config.Compression.Reader(file) if err != nil { - file.Close() // close this here, ignore error + _ = file.Close() return nil, err } - // Wrap compressor to ensure file close - return util.ReadCloserWithCallback(cFile, func() { - file.Close() - }), nil + return cFile, nil } // WriteBytes implements Storage.WriteBytes(). -func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) error { - return st.WriteStream(ctx, key, bytes.NewReader(value)) +func (st *DiskStorage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, bytes.NewReader(value)) + return int(n), err } // WriteStream implements Storage.WriteStream(). -func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Get file path for key kpath, err := st.filepath(key) if err != nil { - return err + return 0, err } // Check if open if st.lock.Closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Ensure dirs leading up to file exist err = os.MkdirAll(path.Dir(kpath), defaultDirPerms) if err != nil { - return err + return 0, err } // Prepare to swap error if need-be @@ -273,20 +271,21 @@ func (st *DiskStorage) WriteStream(ctx context.Context, key string, r io.Reader) // Attempt to open file file, err := open(kpath, flags) if err != nil { - return errSwap(err) + return 0, errSwap(err) } - defer file.Close() // Wrap the file in a compressor cFile, err := st.config.Compression.Writer(file) if err != nil { - return err + _ = file.Close() + return 0, err } + + // Wraps file.Close(). defer cFile.Close() // Copy provided reader to file - _, err = st.cppool.Copy(cFile, r) - return err + return st.cppool.Copy(cFile, r) } // Stat implements Storage.Stat(). diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go index 48a5806f2..be86ac127 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/fs.go @@ -1,6 +1,7 @@ package storage import ( + "fmt" "io/fs" "os" "syscall" @@ -102,46 +103,32 @@ outer: // cleanDirs traverses the dir tree of the supplied path, removing any folders with zero children func cleanDirs(path string) error { - // Acquire path builder pb := util.GetPathBuilder() defer util.PutPathBuilder(pb) - - // Get top-level dir entries - entries, err := readDir(path) - if err != nil { - return err - } - - for _, entry := range entries { - if entry.IsDir() { - // Recursively clean sub-directory entries - if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { - return err - } - } - } - - return nil + return cleanDir(pb, path, true) } // cleanDir performs the actual dir cleaning logic for the above top-level version. -func cleanDir(pb *fastpath.Builder, path string) error { - // Get dir entries +func cleanDir(pb *fastpath.Builder, path string, top bool) error { + // Get dir entries at path. entries, err := readDir(path) if err != nil { return err } - // If no entries, delete - if len(entries) < 1 { + // If no entries, delete dir. + if !top && len(entries) == 0 { return rmdir(path) } for _, entry := range entries { if entry.IsDir() { - // Recursively clean sub-directory entries - if err := cleanDir(pb, pb.Join(path, entry.Name())); err != nil { - return err + // Calculate directory path. + dirPath := pb.Join(path, entry.Name()) + + // Recursively clean sub-directory entries. + if err := cleanDir(pb, dirPath, false); err != nil { + fmt.Fprintf(os.Stderr, "[go-store/storage] error cleaning %s: %v", dirPath, err) } } } diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go index a853c84d2..d42274e39 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/memory.go @@ -6,7 +6,7 @@ import ( "sync/atomic" "codeberg.org/gruf/go-bytes" - "codeberg.org/gruf/go-store/v2/util" + "codeberg.org/gruf/go-iotools" "github.com/cornelk/hashmap" ) @@ -86,57 +86,57 @@ func (st *MemoryStorage) ReadStream(ctx context.Context, key string) (io.ReadClo // Create io.ReadCloser from 'b' copy r := bytes.NewReader(copyb(b)) - return util.NopReadCloser(r), nil + return iotools.NopReadCloser(r), nil } // WriteBytes implements Storage.WriteBytes(). -func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) error { +func (st *MemoryStorage) WriteBytes(ctx context.Context, key string, b []byte) (int, error) { // Check store open if st.closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Check for key that already exists if _, ok := st.fs.Get(key); ok && !st.ow { - return ErrAlreadyExists + return 0, ErrAlreadyExists } // Write key copy to store st.fs.Set(key, copyb(b)) - return nil + return len(b), nil } // WriteStream implements Storage.WriteStream(). -func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *MemoryStorage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Check store open if st.closed() { - return ErrClosed + return 0, ErrClosed } // Check context still valid if err := ctx.Err(); err != nil { - return err + return 0, err } // Check for key that already exists if _, ok := st.fs.Get(key); ok && !st.ow { - return ErrAlreadyExists + return 0, ErrAlreadyExists } // Read all from reader b, err := io.ReadAll(r) if err != nil { - return err + return 0, err } // Write key to store st.fs.Set(key, b) - return nil + return int64(len(b)), nil } // Stat implements Storage.Stat(). diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go index f8011114f..501de230d 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/s3.go @@ -160,22 +160,23 @@ func (st *S3Storage) ReadStream(ctx context.Context, key string) (io.ReadCloser, } // WriteBytes implements Storage.WriteBytes(). -func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) error { - return st.WriteStream(ctx, key, util.NewByteReaderSize(value)) +func (st *S3Storage) WriteBytes(ctx context.Context, key string, value []byte) (int, error) { + n, err := st.WriteStream(ctx, key, util.NewByteReaderSize(value)) + return int(n), err } // WriteStream implements Storage.WriteStream(). -func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) error { +func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) { // Check storage open if st.closed() { - return ErrClosed + return 0, ErrClosed } if rs, ok := r.(util.ReaderSize); ok { // This reader supports providing us the size of // the encompassed data, allowing us to perform // a singular .PutObject() call with length. - _, err := st.client.PutObject( + info, err := st.client.PutObject( ctx, st.bucket, key, @@ -186,9 +187,9 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) e st.config.PutOpts, ) if err != nil { - return transformS3Error(err) + err = transformS3Error(err) } - return nil + return info.Size, err } // Start a new multipart upload to get ID @@ -199,14 +200,15 @@ func (st *S3Storage) WriteStream(ctx context.Context, key string, r io.Reader) e st.config.PutOpts, ) if err != nil { - return transformS3Error(err) + return 0, transformS3Error(err) } var ( - count = 1 + index = int(1) // parts index + total = int64(0) parts []minio.CompletePart chunk = make([]byte, st.config.PutChunkSize) - rdr = bytes.NewReader(nil) + rbuf = bytes.NewReader(nil) ) // Note that we do not perform any kind of @@ -234,11 +236,11 @@ loop: // All other errors default: - return err + return 0, err } // Reset byte reader - rdr.Reset(chunk[:n]) + rbuf.Reset(chunk[:n]) // Put this object chunk in S3 store pt, err := st.client.PutObjectPart( @@ -246,15 +248,15 @@ loop: st.bucket, key, uploadID, - count, - rdr, + index, + rbuf, int64(n), "", "", nil, ) if err != nil { - return err + return 0, err } // Append completed part to slice @@ -267,8 +269,11 @@ loop: ChecksumSHA256: pt.ChecksumSHA256, }) - // Iterate part count - count++ + // Iterate idx + index++ + + // Update total size + total += pt.Size } // Complete this multi-part upload operation @@ -281,10 +286,10 @@ loop: st.config.PutOpts, ) if err != nil { - return err + return 0, err } - return nil + return total, nil } // Stat implements Storage.Stat(). diff --git a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go index 00fbe7abd..a60ea93ad 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go +++ b/vendor/codeberg.org/gruf/go-store/v2/storage/storage.go @@ -14,10 +14,10 @@ type Storage interface { ReadStream(ctx context.Context, key string) (io.ReadCloser, error) // WriteBytes writes the supplied value bytes at key in the storage - WriteBytes(ctx context.Context, key string, value []byte) error + WriteBytes(ctx context.Context, key string, value []byte) (int, error) // WriteStream writes the bytes from supplied reader at key in the storage - WriteStream(ctx context.Context, key string, r io.Reader) error + WriteStream(ctx context.Context, key string, r io.Reader) (int64, error) // Stat checks if the supplied key is in the storage Stat(ctx context.Context, key string) (bool, error) diff --git a/vendor/codeberg.org/gruf/go-store/v2/util/io.go b/vendor/codeberg.org/gruf/go-store/v2/util/io.go index 3d62e8be6..c5135084a 100644 --- a/vendor/codeberg.org/gruf/go-store/v2/util/io.go +++ b/vendor/codeberg.org/gruf/go-store/v2/util/io.go @@ -5,102 +5,37 @@ import ( "io" ) -// ReaderSize ... +// ReaderSize defines a reader of known size in bytes. type ReaderSize interface { io.Reader - - // Size ... Size() int64 } -// ByteReaderSize ... +// ByteReaderSize implements ReaderSize for an in-memory byte-slice. type ByteReaderSize struct { - bytes.Reader + br bytes.Reader sz int64 } -// NewByteReaderSize ... +// NewByteReaderSize returns a new ByteReaderSize instance reset to slice b. func NewByteReaderSize(b []byte) *ByteReaderSize { - rs := ByteReaderSize{} + rs := new(ByteReaderSize) rs.Reset(b) - return &rs + return rs +} + +// Read implements io.Reader. +func (rs *ByteReaderSize) Read(b []byte) (int, error) { + return rs.br.Read(b) } -// Size implements ReaderSize.Size(). -func (rs ByteReaderSize) Size() int64 { +// Size implements ReaderSize. +func (rs *ByteReaderSize) Size() int64 { return rs.sz } // Reset resets the ReaderSize to be reading from b. func (rs *ByteReaderSize) Reset(b []byte) { - rs.Reader.Reset(b) + rs.br.Reset(b) rs.sz = int64(len(b)) } - -// NopReadCloser turns a supplied io.Reader into io.ReadCloser with a nop Close() implementation. -func NopReadCloser(r io.Reader) io.ReadCloser { - return &nopReadCloser{r} -} - -// NopWriteCloser turns a supplied io.Writer into io.WriteCloser with a nop Close() implementation. -func NopWriteCloser(w io.Writer) io.WriteCloser { - return &nopWriteCloser{w} -} - -// ReadCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.ReadCloser. -// Note that the callback will never be called more than once, after execution this will remove the func reference. -func ReadCloserWithCallback(rc io.ReadCloser, cb func()) io.ReadCloser { - return &callbackReadCloser{ - ReadCloser: rc, - callback: cb, - } -} - -// WriteCloserWithCallback adds a customizable callback to be called upon Close() of a supplied io.WriteCloser. -// Note that the callback will never be called more than once, after execution this will remove the func reference. -func WriteCloserWithCallback(wc io.WriteCloser, cb func()) io.WriteCloser { - return &callbackWriteCloser{ - WriteCloser: wc, - callback: cb, - } -} - -// nopReadCloser turns an io.Reader -> io.ReadCloser with a nop Close(). -type nopReadCloser struct{ io.Reader } - -func (r *nopReadCloser) Close() error { return nil } - -// nopWriteCloser turns an io.Writer -> io.WriteCloser with a nop Close(). -type nopWriteCloser struct{ io.Writer } - -func (w nopWriteCloser) Close() error { return nil } - -// callbackReadCloser allows adding our own custom callback to an io.ReadCloser. -type callbackReadCloser struct { - io.ReadCloser - callback func() -} - -func (c *callbackReadCloser) Close() error { - if c.callback != nil { - cb := c.callback - c.callback = nil - defer cb() - } - return c.ReadCloser.Close() -} - -// callbackWriteCloser allows adding our own custom callback to an io.WriteCloser. -type callbackWriteCloser struct { - io.WriteCloser - callback func() -} - -func (c *callbackWriteCloser) Close() error { - if c.callback != nil { - cb := c.callback - c.callback = nil - defer cb() - } - return c.WriteCloser.Close() -} |