summaryrefslogtreecommitdiff
path: root/vendor/github.com/klauspost/compress/zstd/encoder.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/klauspost/compress/zstd/encoder.go')
-rw-r--r--vendor/github.com/klauspost/compress/zstd/encoder.go45
1 files changed, 34 insertions, 11 deletions
diff --git a/vendor/github.com/klauspost/compress/zstd/encoder.go b/vendor/github.com/klauspost/compress/zstd/encoder.go
index 72af7ef0f..8f8223cd3 100644
--- a/vendor/github.com/klauspost/compress/zstd/encoder.go
+++ b/vendor/github.com/klauspost/compress/zstd/encoder.go
@@ -6,6 +6,7 @@ package zstd
import (
"crypto/rand"
+ "errors"
"fmt"
"io"
"math"
@@ -149,6 +150,9 @@ func (e *Encoder) ResetContentSize(w io.Writer, size int64) {
// and write CRC if requested.
func (e *Encoder) Write(p []byte) (n int, err error) {
s := &e.state
+ if s.eofWritten {
+ return 0, ErrEncoderClosed
+ }
for len(p) > 0 {
if len(p)+len(s.filling) < e.o.blockSize {
if e.o.crc {
@@ -202,7 +206,7 @@ func (e *Encoder) nextBlock(final bool) error {
return nil
}
if final && len(s.filling) > 0 {
- s.current = e.EncodeAll(s.filling, s.current[:0])
+ s.current = e.encodeAll(s.encoder, s.filling, s.current[:0])
var n2 int
n2, s.err = s.w.Write(s.current)
if s.err != nil {
@@ -288,6 +292,9 @@ func (e *Encoder) nextBlock(final bool) error {
s.filling, s.current, s.previous = s.previous[:0], s.filling, s.current
s.nInput += int64(len(s.current))
s.wg.Add(1)
+ if final {
+ s.eofWritten = true
+ }
go func(src []byte) {
if debugEncoder {
println("Adding block,", len(src), "bytes, final:", final)
@@ -303,9 +310,6 @@ func (e *Encoder) nextBlock(final bool) error {
blk := enc.Block()
enc.Encode(blk, src)
blk.last = final
- if final {
- s.eofWritten = true
- }
// Wait for pending writes.
s.wWg.Wait()
if s.writeErr != nil {
@@ -401,12 +405,20 @@ func (e *Encoder) Flush() error {
if len(s.filling) > 0 {
err := e.nextBlock(false)
if err != nil {
+ // Ignore Flush after Close.
+ if errors.Is(s.err, ErrEncoderClosed) {
+ return nil
+ }
return err
}
}
s.wg.Wait()
s.wWg.Wait()
if s.err != nil {
+ // Ignore Flush after Close.
+ if errors.Is(s.err, ErrEncoderClosed) {
+ return nil
+ }
return s.err
}
return s.writeErr
@@ -422,6 +434,9 @@ func (e *Encoder) Close() error {
}
err := e.nextBlock(true)
if err != nil {
+ if errors.Is(s.err, ErrEncoderClosed) {
+ return nil
+ }
return err
}
if s.frameContentSize > 0 {
@@ -459,6 +474,11 @@ func (e *Encoder) Close() error {
}
_, s.err = s.w.Write(frame)
}
+ if s.err == nil {
+ s.err = ErrEncoderClosed
+ return nil
+ }
+
return s.err
}
@@ -469,6 +489,15 @@ func (e *Encoder) Close() error {
// Data compressed with EncodeAll can be decoded with the Decoder,
// using either a stream or DecodeAll.
func (e *Encoder) EncodeAll(src, dst []byte) []byte {
+ e.init.Do(e.initialize)
+ enc := <-e.encoders
+ defer func() {
+ e.encoders <- enc
+ }()
+ return e.encodeAll(enc, src, dst)
+}
+
+func (e *Encoder) encodeAll(enc encoder, src, dst []byte) []byte {
if len(src) == 0 {
if e.o.fullZero {
// Add frame header.
@@ -491,13 +520,7 @@ func (e *Encoder) EncodeAll(src, dst []byte) []byte {
}
return dst
}
- e.init.Do(e.initialize)
- enc := <-e.encoders
- defer func() {
- // Release encoder reference to last block.
- // If a non-single block is needed the encoder will reset again.
- e.encoders <- enc
- }()
+
// Use single segments when above minimum window and below window size.
single := len(src) <= e.o.windowSize && len(src) > MinWindowSize
if e.o.single != nil {