author    dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>  2023-05-29 13:47:11 +0100
committer GitHub <noreply@github.com>  2023-05-29 13:47:11 +0100
commit    9ed96bc57083b4261a9e6571d86ec94b1e771e40 (patch)
tree      87b6521816ed4d1242f47a731895cd5a8c6cabc6 /vendor/github.com/klauspost/compress/s2/encode.go
parent    [bugfix/chore] Inbox post updates (#1821) (diff)
download  gotosocial-9ed96bc57083b4261a9e6571d86ec94b1e771e40.tar.xz
[chore]: Bump github.com/minio/minio-go/v7 from 7.0.53 to 7.0.55 (#1844)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/github.com/klauspost/compress/s2/encode.go')
-rw-r--r--  vendor/github.com/klauspost/compress/s2/encode.go | 1030
1 file changed, 20 insertions(+), 1010 deletions(-)
diff --git a/vendor/github.com/klauspost/compress/s2/encode.go b/vendor/github.com/klauspost/compress/s2/encode.go
index c2ca7236a..e6c231021 100644
--- a/vendor/github.com/klauspost/compress/s2/encode.go
+++ b/vendor/github.com/klauspost/compress/s2/encode.go
@@ -6,15 +6,9 @@
package s2
import (
- "crypto/rand"
"encoding/binary"
- "errors"
- "fmt"
- "io"
"math"
"math/bits"
- "runtime"
- "sync"
)
// Encode returns the encoded form of src. The returned slice may be a sub-
@@ -355,9 +349,12 @@ const inputMargin = 8
// will be accepted by the encoder.
const minNonLiteralBlockSize = 32
+const intReduction = 2 - (1 << (^uint(0) >> 63)) // 1 (32 bits) or 0 (64 bits)
+
// MaxBlockSize is the maximum value where MaxEncodedLen will return a valid block size.
// Blocks this big are highly discouraged, though.
-const MaxBlockSize = math.MaxUint32 - binary.MaxVarintLen32 - 5
+// Half the size on 32-bit systems.
+const MaxBlockSize = (1<<(32-intReduction) - 1) - binary.MaxVarintLen32 - 5
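
The intReduction constant selects between platforms at compile time: ^uint(0) >> 63 evaluates to 1 when uint is 64 bits wide and to 0 when it is 32 bits, so the constant is 0 on 64-bit targets and 1 on 32-bit targets, halving MaxBlockSize on the latter. A minimal standalone sketch verifying the arithmetic:

```go
package main

import "fmt"

// Same compile-time trick as the diff above: on a 64-bit platform
// ^uint(0)>>63 == 1, so intReduction == 2-(1<<1) == 0; on a 32-bit
// platform the shift yields 0, so intReduction == 2-(1<<0) == 1.
const intReduction = 2 - (1 << (^uint(0) >> 63))

func main() {
	fmt.Println("intReduction:", intReduction) // 0 on 64-bit, 1 on 32-bit
}
```
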
// MaxEncodedLen returns the maximum length of a snappy block, given its
// uncompressed length.
@@ -366,7 +363,14 @@ const MaxBlockSize = math.MaxUint32 - binary.MaxVarintLen32 - 5
// 32-bit platforms will have lower thresholds for rejecting big content.
func MaxEncodedLen(srcLen int) int {
n := uint64(srcLen)
- if n > 0xffffffff {
+ if intReduction == 1 {
+ // 32 bits
+ if n > math.MaxInt32 {
+ // Also includes negative.
+ return -1
+ }
+ } else if n > 0xffffffff {
+ // 64 bits
// Also includes negative.
return -1
}
@@ -375,1009 +379,15 @@ func MaxEncodedLen(srcLen int) int {
// Add maximum size of encoding block as literals.
n += uint64(literalExtraSize(int64(srcLen)))
- if n > 0xffffffff {
+ if intReduction == 1 {
+ // 32 bits
+ if n > math.MaxInt32 {
+ return -1
+ }
+ } else if n > 0xffffffff {
+ // 64 bits
+ // Also includes negative.
return -1
}
return int(n)
}
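
MaxEncodedLen bounds the destination buffer needed for a worst-case block encode, and Encode rejects input that MaxEncodedLen rejects, so callers typically check it first. A minimal sketch against the package's block API (compressBlock is a hypothetical helper):

```go
package main

import (
	"fmt"

	"github.com/klauspost/compress/s2"
)

func compressBlock(src []byte) ([]byte, error) {
	// MaxEncodedLen returns -1 when src is too large (with a lower
	// threshold on 32-bit platforms, as shown in the diff above).
	n := s2.MaxEncodedLen(len(src))
	if n < 0 {
		return nil, fmt.Errorf("s2: block of %d bytes is too large", len(src))
	}
	dst := make([]byte, n) // worst-case destination size
	return s2.Encode(dst, src), nil
}

func main() {
	out, err := compressBlock([]byte("hello hello hello hello hello"))
	if err != nil {
		panic(err)
	}
	fmt.Println("compressed size:", len(out))
}
```
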
-
-var errClosed = errors.New("s2: Writer is closed")
-
-// NewWriter returns a new Writer that compresses to w, using the
-// framing format described at
-// https://github.com/google/snappy/blob/master/framing_format.txt
-//
-// Users must call Close to guarantee all data has been forwarded to
-// the underlying io.Writer and that resources are released.
-// They may also call Flush zero or more times before calling Close.
-func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
- w2 := Writer{
- blockSize: defaultBlockSize,
- concurrency: runtime.GOMAXPROCS(0),
- randSrc: rand.Reader,
- level: levelFast,
- }
- for _, opt := range opts {
- if err := opt(&w2); err != nil {
- w2.errState = err
- return &w2
- }
- }
- w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
- w2.paramsOK = true
- w2.ibuf = make([]byte, 0, w2.blockSize)
- w2.buffers.New = func() interface{} {
- return make([]byte, w2.obufLen)
- }
- w2.Reset(w)
- return &w2
-}
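
The stream Writer removed from this file here (it lives elsewhere in newer versions of the library) follows the usual wrap, write, close pattern; a minimal sketch:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/s2"
)

func main() {
	var buf bytes.Buffer
	w := s2.NewWriter(&buf) // compresses to buf using the s2 framing format
	if _, err := w.Write([]byte("some payload to compress")); err != nil {
		panic(err)
	}
	// Close flushes all pending data and releases resources.
	if err := w.Close(); err != nil {
		panic(err)
	}
	fmt.Println("stream size:", buf.Len())
}
```
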
-
-// Writer is an io.Writer that can write Snappy-compressed bytes.
-type Writer struct {
- errMu sync.Mutex
- errState error
-
- // ibuf is a buffer for the incoming (uncompressed) bytes.
- ibuf []byte
-
- blockSize int
- obufLen int
- concurrency int
- written int64
- uncompWritten int64 // Bytes sent to compression
- output chan chan result
- buffers sync.Pool
- pad int
-
- writer io.Writer
- randSrc io.Reader
- writerWg sync.WaitGroup
- index Index
- customEnc func(dst, src []byte) int
-
- // wroteStreamHeader is whether we have written the stream header.
- wroteStreamHeader bool
- paramsOK bool
- snappy bool
- flushOnWrite bool
- appendIndex bool
- level uint8
-}
-
-const (
- levelUncompressed = iota + 1
- levelFast
- levelBetter
- levelBest
-)
-
-type result struct {
- b []byte
- // Uncompressed start offset
- startOffset int64
-}
-
-// err returns the previously set error.
-// If no error has been set, the error state is set to err if err is not nil.
-func (w *Writer) err(err error) error {
- w.errMu.Lock()
- errSet := w.errState
- if errSet == nil && err != nil {
- w.errState = err
- errSet = err
- }
- w.errMu.Unlock()
- return errSet
-}
-
-// Reset discards the writer's state and switches the Snappy writer to write to w.
-// This permits reusing a Writer rather than allocating a new one.
-func (w *Writer) Reset(writer io.Writer) {
- if !w.paramsOK {
- return
- }
- // Close previous writer, if any.
- if w.output != nil {
- close(w.output)
- w.writerWg.Wait()
- w.output = nil
- }
- w.errState = nil
- w.ibuf = w.ibuf[:0]
- w.wroteStreamHeader = false
- w.written = 0
- w.writer = writer
- w.uncompWritten = 0
- w.index.reset(w.blockSize)
-
- // If we didn't get a writer, stop here.
- if writer == nil {
- return
- }
- // If no concurrency requested, don't spin up writer goroutine.
- if w.concurrency == 1 {
- return
- }
-
- toWrite := make(chan chan result, w.concurrency)
- w.output = toWrite
- w.writerWg.Add(1)
-
- // Start a writer goroutine that will write all output in order.
- go func() {
- defer w.writerWg.Done()
-
- // Get a queued write.
- for write := range toWrite {
- // Wait for the data to be available.
- input := <-write
- in := input.b
- if len(in) > 0 {
- if w.err(nil) == nil {
- // Don't expose data from previous buffers.
- toWrite := in[:len(in):len(in)]
- // Write to output.
- n, err := writer.Write(toWrite)
- if err == nil && n != len(toWrite) {
- err = io.ErrShortBuffer
- }
- _ = w.err(err)
- w.err(w.index.add(w.written, input.startOffset))
- w.written += int64(n)
- }
- }
- if cap(in) >= w.obufLen {
- w.buffers.Put(in)
- }
- // close the incoming write request.
- // This can be used for synchronizing flushes.
- close(write)
- }
- }()
-}
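
Reset exists so a Writer can be pooled and rebound to a new destination instead of reallocated. A sketch of that pattern, using a hypothetical s2util package:

```go
package s2util

import (
	"io"
	"sync"

	"github.com/klauspost/compress/s2"
)

// Pool of reusable Writers; Reset rebinds one to a fresh destination.
var writerPool = sync.Pool{
	New: func() interface{} { return s2.NewWriter(nil) },
}

func compressTo(dst io.Writer, src []byte) error {
	w := writerPool.Get().(*s2.Writer)
	defer writerPool.Put(w)
	w.Reset(dst) // discard previous state, start writing to dst
	if _, err := w.Write(src); err != nil {
		return err
	}
	return w.Close() // flush; the Writer remains reusable via Reset
}
```
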
-
-// Write satisfies the io.Writer interface.
-func (w *Writer) Write(p []byte) (nRet int, errRet error) {
- if err := w.err(nil); err != nil {
- return 0, err
- }
- if w.flushOnWrite {
- return w.write(p)
- }
- // If we exceed the input buffer size, start writing
- for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
- var n int
- if len(w.ibuf) == 0 {
- // Large write, empty buffer.
- // Write directly from p to avoid copy.
- n, _ = w.write(p)
- } else {
- n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
- w.ibuf = w.ibuf[:len(w.ibuf)+n]
- w.write(w.ibuf)
- w.ibuf = w.ibuf[:0]
- }
- nRet += n
- p = p[n:]
- }
- if err := w.err(nil); err != nil {
- return nRet, err
- }
- // p should always be able to fit into w.ibuf now.
- n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
- w.ibuf = w.ibuf[:len(w.ibuf)+n]
- nRet += n
- return nRet, nil
-}
-
-// ReadFrom implements the io.ReaderFrom interface.
-// Using this is typically more efficient since it avoids a memory copy.
-// ReadFrom reads data from r until EOF or error.
-// The return value n is the number of bytes read.
-// Any error except io.EOF encountered during the read is also returned.
-func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
- if err := w.err(nil); err != nil {
- return 0, err
- }
- if len(w.ibuf) > 0 {
- err := w.Flush()
- if err != nil {
- return 0, err
- }
- }
- if br, ok := r.(byter); ok {
- buf := br.Bytes()
- if err := w.EncodeBuffer(buf); err != nil {
- return 0, err
- }
- return int64(len(buf)), w.Flush()
- }
- for {
- inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
- n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
- if err != nil {
- if err == io.ErrUnexpectedEOF {
- err = io.EOF
- }
- if err != io.EOF {
- return n, w.err(err)
- }
- }
- if n2 == 0 {
- break
- }
- n += int64(n2)
- err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
- if w.err(err2) != nil {
- break
- }
-
- if err != nil {
- // We got EOF and wrote everything
- break
- }
- }
-
- return n, w.err(nil)
-}
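
Since Writer implements io.ReaderFrom, io.Copy automatically takes this copy-avoiding path; a short sketch:

```go
package main

import (
	"bytes"
	"io"
	"strings"

	"github.com/klauspost/compress/s2"
)

func main() {
	var out bytes.Buffer
	w := s2.NewWriter(&out)
	// io.Copy detects the io.ReaderFrom implementation and calls
	// w.ReadFrom(r), avoiding an intermediate copy through Write.
	if _, err := io.Copy(w, strings.NewReader("stream me")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
}
```
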
-
-// AddSkippableBlock will add a skippable block to the stream.
-// The ID must be 0x80-0xfe (inclusive).
-// Length of the skippable block must be <= 16777215 bytes.
-func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
- if err := w.err(nil); err != nil {
- return err
- }
- if len(data) == 0 {
- return nil
- }
- if id < 0x80 || id > chunkTypePadding {
- return fmt.Errorf("invalid skippable block id %x", id)
- }
- if len(data) > maxChunkSize {
- return fmt.Errorf("skippable block exceeds maximum size")
- }
- var header [4]byte
- chunkLen := 4 + len(data)
- header[0] = id
- header[1] = uint8(chunkLen >> 0)
- header[2] = uint8(chunkLen >> 8)
- header[3] = uint8(chunkLen >> 16)
- if w.concurrency == 1 {
- write := func(b []byte) error {
- n, err := w.writer.Write(b)
- if err = w.err(err); err != nil {
- return err
- }
- if n != len(b) {
- return w.err(io.ErrShortWrite)
- }
- w.written += int64(n)
- return w.err(nil)
- }
- if !w.wroteStreamHeader {
- w.wroteStreamHeader = true
- if w.snappy {
- if err := write([]byte(magicChunkSnappy)); err != nil {
- return err
- }
- } else {
- if err := write([]byte(magicChunk)); err != nil {
- return err
- }
- }
- }
- if err := write(header[:]); err != nil {
- return err
- }
- if err := write(data); err != nil {
- return err
- }
- return nil
- }
-
- // Create output...
- if !w.wroteStreamHeader {
- w.wroteStreamHeader = true
- hWriter := make(chan result)
- w.output <- hWriter
- if w.snappy {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
- } else {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
- }
- }
-
- // Copy input.
- inbuf := w.buffers.Get().([]byte)[:4]
- copy(inbuf, header[:])
- inbuf = append(inbuf, data...)
-
- output := make(chan result, 1)
- // Queue output.
- w.output <- output
- output <- result{startOffset: w.uncompWritten, b: inbuf}
-
- return nil
-}
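
The 4-byte chunk header built here recurs throughout the file: one type byte, then the chunk length as a 24-bit little-endian integer (for data chunks this length covers the 4-byte CRC plus the body). Hypothetical helpers showing the arithmetic:

```go
package s2util

// putChunkHeader and readChunkLen are hypothetical helpers mirroring the
// header writes in the code above: hdr[0] is the chunk type and hdr[1:4]
// hold the chunk length as a 24-bit little-endian integer.
func putChunkHeader(hdr []byte, chunkType uint8, chunkLen int) {
	hdr[0] = chunkType
	hdr[1] = uint8(chunkLen >> 0)
	hdr[2] = uint8(chunkLen >> 8)
	hdr[3] = uint8(chunkLen >> 16)
}

func readChunkLen(hdr []byte) int {
	return int(hdr[1]) | int(hdr[2])<<8 | int(hdr[3])<<16
}
```
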
-
-// EncodeBuffer will add a buffer to the stream.
-// This is the fastest way to encode a stream,
-// but the input buffer cannot be written to by the caller
-// until Flush or Close has been called when concurrency != 1.
-//
-// If you cannot control that, use the regular Write function.
-//
-// Note that input is not buffered.
-// This means that each write will result in discrete blocks being created.
-// For buffered writes, use the regular Write function.
-func (w *Writer) EncodeBuffer(buf []byte) (err error) {
- if err := w.err(nil); err != nil {
- return err
- }
-
- if w.flushOnWrite {
- _, err := w.write(buf)
- return err
- }
- // Flush queued data first.
- if len(w.ibuf) > 0 {
- err := w.Flush()
- if err != nil {
- return err
- }
- }
- if w.concurrency == 1 {
- _, err := w.writeSync(buf)
- return err
- }
-
- // Spawn goroutine and write block to output channel.
- if !w.wroteStreamHeader {
- w.wroteStreamHeader = true
- hWriter := make(chan result)
- w.output <- hWriter
- if w.snappy {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
- } else {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
- }
- }
-
- for len(buf) > 0 {
- // Cut input.
- uncompressed := buf
- if len(uncompressed) > w.blockSize {
- uncompressed = uncompressed[:w.blockSize]
- }
- buf = buf[len(uncompressed):]
- // Get an output buffer.
- obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
- output := make(chan result)
- // Queue output now, so we keep order.
- w.output <- output
- res := result{
- startOffset: w.uncompWritten,
- }
- w.uncompWritten += int64(len(uncompressed))
- go func() {
- checksum := crc(uncompressed)
-
- // Set to uncompressed.
- chunkType := uint8(chunkTypeUncompressedData)
- chunkLen := 4 + len(uncompressed)
-
- // Attempt compressing.
- n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
- n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
-
- // Check if we should use this, or store as uncompressed instead.
- if n2 > 0 {
- chunkType = uint8(chunkTypeCompressedData)
- chunkLen = 4 + n + n2
- obuf = obuf[:obufHeaderLen+n+n2]
- } else {
- // copy uncompressed
- copy(obuf[obufHeaderLen:], uncompressed)
- }
-
- // Fill in the per-chunk header that comes before the body.
- obuf[0] = chunkType
- obuf[1] = uint8(chunkLen >> 0)
- obuf[2] = uint8(chunkLen >> 8)
- obuf[3] = uint8(chunkLen >> 16)
- obuf[4] = uint8(checksum >> 0)
- obuf[5] = uint8(checksum >> 8)
- obuf[6] = uint8(checksum >> 16)
- obuf[7] = uint8(checksum >> 24)
-
- // Queue final output.
- res.b = obuf
- output <- res
- }()
- }
- return nil
-}
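
As the doc comment above notes, EncodeBuffer hands the caller's buffer directly to the encoder, so with concurrency != 1 the buffer must stay untouched until Flush or Close returns; a minimal sketch:

```go
package main

import (
	"bytes"

	"github.com/klauspost/compress/s2"
)

func main() {
	var out bytes.Buffer
	w := s2.NewWriter(&out)
	buf := []byte("a block encoded without an intermediate copy")
	// buf is handed to the encoder as-is: do not modify it
	// until Flush or Close has returned.
	if err := w.EncodeBuffer(buf); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
}
```
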
-
-func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
- if w.customEnc != nil {
- return w.customEnc(obuf, uncompressed)
- }
- if w.snappy {
- switch w.level {
- case levelFast:
- return encodeBlockSnappy(obuf, uncompressed)
- case levelBetter:
- return encodeBlockBetterSnappy(obuf, uncompressed)
- case levelBest:
- return encodeBlockBestSnappy(obuf, uncompressed)
- }
- return 0
- }
- switch w.level {
- case levelFast:
- return encodeBlock(obuf, uncompressed)
- case levelBetter:
- return encodeBlockBetter(obuf, uncompressed)
- case levelBest:
- return encodeBlockBest(obuf, uncompressed, nil)
- }
- return 0
-}
-
-func (w *Writer) write(p []byte) (nRet int, errRet error) {
- if err := w.err(nil); err != nil {
- return 0, err
- }
- if w.concurrency == 1 {
- return w.writeSync(p)
- }
-
- // Spawn goroutine and write block to output channel.
- for len(p) > 0 {
- if !w.wroteStreamHeader {
- w.wroteStreamHeader = true
- hWriter := make(chan result)
- w.output <- hWriter
- if w.snappy {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
- } else {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
- }
- }
-
- var uncompressed []byte
- if len(p) > w.blockSize {
- uncompressed, p = p[:w.blockSize], p[w.blockSize:]
- } else {
- uncompressed, p = p, nil
- }
-
- // Copy input.
- // If the block is incompressible, this is used for the result.
- inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
- obuf := w.buffers.Get().([]byte)[:w.obufLen]
- copy(inbuf[obufHeaderLen:], uncompressed)
- uncompressed = inbuf[obufHeaderLen:]
-
- output := make(chan result)
- // Queue output now, so we keep order.
- w.output <- output
- res := result{
- startOffset: w.uncompWritten,
- }
- w.uncompWritten += int64(len(uncompressed))
-
- go func() {
- checksum := crc(uncompressed)
-
- // Set to uncompressed.
- chunkType := uint8(chunkTypeUncompressedData)
- chunkLen := 4 + len(uncompressed)
-
- // Attempt compressing.
- n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
- n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
-
- // Check if we should use this, or store as uncompressed instead.
- if n2 > 0 {
- chunkType = uint8(chunkTypeCompressedData)
- chunkLen = 4 + n + n2
- obuf = obuf[:obufHeaderLen+n+n2]
- } else {
- // Use input as output.
- obuf, inbuf = inbuf, obuf
- }
-
- // Fill in the per-chunk header that comes before the body.
- obuf[0] = chunkType
- obuf[1] = uint8(chunkLen >> 0)
- obuf[2] = uint8(chunkLen >> 8)
- obuf[3] = uint8(chunkLen >> 16)
- obuf[4] = uint8(checksum >> 0)
- obuf[5] = uint8(checksum >> 8)
- obuf[6] = uint8(checksum >> 16)
- obuf[7] = uint8(checksum >> 24)
-
- // Queue final output.
- res.b = obuf
- output <- res
-
- // Put unused buffer back in pool.
- w.buffers.Put(inbuf)
- }()
- nRet += len(uncompressed)
- }
- return nRet, nil
-}
-
-// writeFull is a special version of write that will always write the full buffer.
-// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
-// The data will be written as a single block.
-// The caller is not allowed to use inbuf after this function has been called.
-func (w *Writer) writeFull(inbuf []byte) (errRet error) {
- if err := w.err(nil); err != nil {
- return err
- }
-
- if w.concurrency == 1 {
- _, err := w.writeSync(inbuf[obufHeaderLen:])
- return err
- }
-
- // Spawn goroutine and write block to output channel.
- if !w.wroteStreamHeader {
- w.wroteStreamHeader = true
- hWriter := make(chan result)
- w.output <- hWriter
- if w.snappy {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
- } else {
- hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
- }
- }
-
- // Get an output buffer.
- obuf := w.buffers.Get().([]byte)[:w.obufLen]
- uncompressed := inbuf[obufHeaderLen:]
-
- output := make(chan result)
- // Queue output now, so we keep order.
- w.output <- output
- res := result{
- startOffset: w.uncompWritten,
- }
- w.uncompWritten += int64(len(uncompressed))
-
- go func() {
- checksum := crc(uncompressed)
-
- // Set to uncompressed.
- chunkType := uint8(chunkTypeUncompressedData)
- chunkLen := 4 + len(uncompressed)
-
- // Attempt compressing.
- n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
- n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
-
- // Check if we should use this, or store as uncompressed instead.
- if n2 > 0 {
- chunkType = uint8(chunkTypeCompressedData)
- chunkLen = 4 + n + n2
- obuf = obuf[:obufHeaderLen+n+n2]
- } else {
- // Use input as output.
- obuf, inbuf = inbuf, obuf
- }
-
- // Fill in the per-chunk header that comes before the body.
- obuf[0] = chunkType
- obuf[1] = uint8(chunkLen >> 0)
- obuf[2] = uint8(chunkLen >> 8)
- obuf[3] = uint8(chunkLen >> 16)
- obuf[4] = uint8(checksum >> 0)
- obuf[5] = uint8(checksum >> 8)
- obuf[6] = uint8(checksum >> 16)
- obuf[7] = uint8(checksum >> 24)
-
- // Queue final output.
- res.b = obuf
- output <- res
-
- // Put unused buffer back in pool.
- w.buffers.Put(inbuf)
- }()
- return nil
-}
-
-func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
- if err := w.err(nil); err != nil {
- return 0, err
- }
- if !w.wroteStreamHeader {
- w.wroteStreamHeader = true
- var n int
- var err error
- if w.snappy {
- n, err = w.writer.Write([]byte(magicChunkSnappy))
- } else {
- n, err = w.writer.Write([]byte(magicChunk))
- }
- if err != nil {
- return 0, w.err(err)
- }
- if n != len(magicChunk) {
- return 0, w.err(io.ErrShortWrite)
- }
- w.written += int64(n)
- }
-
- for len(p) > 0 {
- var uncompressed []byte
- if len(p) > w.blockSize {
- uncompressed, p = p[:w.blockSize], p[w.blockSize:]
- } else {
- uncompressed, p = p, nil
- }
-
- obuf := w.buffers.Get().([]byte)[:w.obufLen]
- checksum := crc(uncompressed)
-
- // Set to uncompressed.
- chunkType := uint8(chunkTypeUncompressedData)
- chunkLen := 4 + len(uncompressed)
-
- // Attempt compressing.
- n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
- n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
-
- if n2 > 0 {
- chunkType = uint8(chunkTypeCompressedData)
- chunkLen = 4 + n + n2
- obuf = obuf[:obufHeaderLen+n+n2]
- } else {
- obuf = obuf[:8]
- }
-
- // Fill in the per-chunk header that comes before the body.
- obuf[0] = chunkType
- obuf[1] = uint8(chunkLen >> 0)
- obuf[2] = uint8(chunkLen >> 8)
- obuf[3] = uint8(chunkLen >> 16)
- obuf[4] = uint8(checksum >> 0)
- obuf[5] = uint8(checksum >> 8)
- obuf[6] = uint8(checksum >> 16)
- obuf[7] = uint8(checksum >> 24)
-
- n, err := w.writer.Write(obuf)
- if err != nil {
- return 0, w.err(err)
- }
- if n != len(obuf) {
- return 0, w.err(io.ErrShortWrite)
- }
- w.err(w.index.add(w.written, w.uncompWritten))
- w.written += int64(n)
- w.uncompWritten += int64(len(uncompressed))
-
- if chunkType == chunkTypeUncompressedData {
- // Write uncompressed data.
- n, err := w.writer.Write(uncompressed)
- if err != nil {
- return 0, w.err(err)
- }
- if n != len(uncompressed) {
- return 0, w.err(io.ErrShortWrite)
- }
- w.written += int64(n)
- }
- w.buffers.Put(obuf)
- // Queue final output.
- nRet += len(uncompressed)
- }
- return nRet, nil
-}
-
-// Flush flushes the Writer to its underlying io.Writer.
-// This does not apply padding.
-func (w *Writer) Flush() error {
- if err := w.err(nil); err != nil {
- return err
- }
-
- // Queue any data still in input buffer.
- if len(w.ibuf) != 0 {
- if !w.wroteStreamHeader {
- _, err := w.writeSync(w.ibuf)
- w.ibuf = w.ibuf[:0]
- return w.err(err)
- } else {
- _, err := w.write(w.ibuf)
- w.ibuf = w.ibuf[:0]
- err = w.err(err)
- if err != nil {
- return err
- }
- }
- }
- if w.output == nil {
- return w.err(nil)
- }
-
- // Send empty buffer
- res := make(chan result)
- w.output <- res
- // Block until this has been picked up.
- res <- result{b: nil, startOffset: w.uncompWritten}
- // When it is closed, we have flushed.
- <-res
- return w.err(nil)
-}
-
-// Close calls Flush and then closes the Writer.
-// Calling Close multiple times is ok,
-// but calling CloseIndex afterwards will not return the index.
-func (w *Writer) Close() error {
- _, err := w.closeIndex(w.appendIndex)
- return err
-}
-
-// CloseIndex calls Close and returns an index on first call.
-// This is not required if you are only adding an index to a stream.
-func (w *Writer) CloseIndex() ([]byte, error) {
- return w.closeIndex(true)
-}
-
-func (w *Writer) closeIndex(idx bool) ([]byte, error) {
- err := w.Flush()
- if w.output != nil {
- close(w.output)
- w.writerWg.Wait()
- w.output = nil
- }
-
- var index []byte
- if w.err(nil) == nil && w.writer != nil {
- // Create index.
- if idx {
- compSize := int64(-1)
- if w.pad <= 1 {
- compSize = w.written
- }
- index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
- // Count as written for padding.
- if w.appendIndex {
- w.written += int64(len(index))
- }
- }
-
- if w.pad > 1 {
- tmp := w.ibuf[:0]
- if len(index) > 0 {
- // Allocate another buffer.
- tmp = w.buffers.Get().([]byte)[:0]
- defer w.buffers.Put(tmp)
- }
- add := calcSkippableFrame(w.written, int64(w.pad))
- frame, err := skippableFrame(tmp, add, w.randSrc)
- if err = w.err(err); err != nil {
- return nil, err
- }
- n, err2 := w.writer.Write(frame)
- if err2 == nil && n != len(frame) {
- err2 = io.ErrShortWrite
- }
- _ = w.err(err2)
- }
- if len(index) > 0 && w.appendIndex {
- n, err2 := w.writer.Write(index)
- if err2 == nil && n != len(index) {
- err2 = io.ErrShortWrite
- }
- _ = w.err(err2)
- }
- }
- err = w.err(errClosed)
- if err == errClosed {
- return index, nil
- }
- return nil, err
-}
-
-// calcSkippableFrame will return a total size to be added for written
-// to be divisible by wantMultiple.
-// The value will always be 0 or >= skippableFrameHeader.
-// The function will panic if written < 0 or wantMultiple <= 0.
-func calcSkippableFrame(written, wantMultiple int64) int {
- if wantMultiple <= 0 {
- panic("wantMultiple <= 0")
- }
- if written < 0 {
- panic("written < 0")
- }
- leftOver := written % wantMultiple
- if leftOver == 0 {
- return 0
- }
- toAdd := wantMultiple - leftOver
- for toAdd < skippableFrameHeader {
- toAdd += wantMultiple
- }
- return int(toAdd)
-}
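
Worked example of the arithmetic above: with written = 1000 and wantMultiple = 512 the leftover is 488, so toAdd = 24, already above the 4-byte skippable frame header, padding the stream to 1024; with written = 510, toAdd starts at 2, below the header size, and is bumped by one more multiple to 514, again landing on 1024.
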
-
-// skippableFrame will add a skippable frame with a total size of 'total' bytes.
-// total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader.
-func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
- if total == 0 {
- return dst, nil
- }
- if total < skippableFrameHeader {
- return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
- }
- if int64(total) >= maxBlockSize+skippableFrameHeader {
- return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
- }
- // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
- dst = append(dst, chunkTypePadding)
- f := uint32(total - skippableFrameHeader)
- // Add chunk length.
- dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
- // Add data
- start := len(dst)
- dst = append(dst, make([]byte, f)...)
- _, err := io.ReadFull(r, dst[start:])
- return dst, err
-}
-
-// WriterOption is an option for creating an encoder.
-type WriterOption func(*Writer) error
-
-// WriterConcurrency will set the concurrency,
-// meaning the maximum number of encoders to run concurrently.
-// The value supplied must be at least 1.
-// By default this will be set to GOMAXPROCS.
-func WriterConcurrency(n int) WriterOption {
- return func(w *Writer) error {
- if n <= 0 {
- return errors.New("concurrency must be at least 1")
- }
- w.concurrency = n
- return nil
- }
-}
-
-// WriterAddIndex will append an index to the end of a stream
-// when it is closed.
-func WriterAddIndex() WriterOption {
- return func(w *Writer) error {
- w.appendIndex = true
- return nil
- }
-}
-
-// WriterBetterCompression will enable better compression.
-// EncodeBetter compresses better than Encode but typically with a
-// 10-40% speed decrease on both compression and decompression.
-func WriterBetterCompression() WriterOption {
- return func(w *Writer) error {
- w.level = levelBetter
- return nil
- }
-}
-
-// WriterBestCompression will enable best compression.
-// EncodeBest compresses better than EncodeBetter but typically with a
-// big speed decrease on compression.
-func WriterBestCompression() WriterOption {
- return func(w *Writer) error {
- w.level = levelBest
- return nil
- }
-}
-
-// WriterUncompressed will bypass compression.
-// The stream will be written as uncompressed blocks only.
-// If concurrency is > 1, CRC and output will still be done asynchronously.
-func WriterUncompressed() WriterOption {
- return func(w *Writer) error {
- w.level = levelUncompressed
- return nil
- }
-}
-
-// WriterBlockSize allows overriding the default block size.
-// Blocks will be this size or smaller.
-// Minimum size is 4KB and maximum size is 4MB.
-//
-// Bigger blocks may give bigger throughput on systems with many cores,
-// and will increase compression slightly, but it will limit the possible
-// concurrency for smaller payloads for both encoding and decoding.
-// Default block size is 1MB.
-//
-// When writing Snappy compatible output using WriterSnappyCompat,
-// the maximum block size is 64KB.
-func WriterBlockSize(n int) WriterOption {
- return func(w *Writer) error {
- if w.snappy && (n > maxSnappyBlockSize || n < minBlockSize) {
- return errors.New("s2: block size out of range. Must be <= 64K and >= 4KB for snappy compatible output")
- }
- if n > maxBlockSize || n < minBlockSize {
- return errors.New("s2: block size out of range. Must be <= 4MB and >= 4KB")
- }
- w.blockSize = n
- return nil
- }
-}
-
-// WriterPadding will add padding to all output so the size will be a multiple of n.
-// This can be used to obfuscate the exact output size or make blocks of a certain size.
-// The contents will be a skippable frame, so it will be invisible to the decoder.
-// n must be > 0 and <= 4MB.
-// The padded area will be filled with data from crypto/rand.Reader.
-// The padding will be applied whenever Close is called on the writer.
-func WriterPadding(n int) WriterOption {
- return func(w *Writer) error {
- if n <= 0 {
- return fmt.Errorf("s2: padding must be at least 1")
- }
- // No need to waste our time.
- if n == 1 {
- n = 0
- }
- if n > maxBlockSize {
- return fmt.Errorf("s2: padding must be less than 4MB")
- }
- w.pad = n
- return nil
- }
-}
-
-// WriterPaddingSrc will get random data for padding from the supplied source.
-// By default crypto/rand is used.
-func WriterPaddingSrc(reader io.Reader) WriterOption {
- return func(w *Writer) error {
- w.randSrc = reader
- return nil
- }
-}
-
-// WriterSnappyCompat will write snappy compatible output.
-// The output can be decompressed using either snappy or s2.
-// If the block size is larger than 64KB, it is reduced to fit.
-func WriterSnappyCompat() WriterOption {
- return func(w *Writer) error {
- w.snappy = true
- if w.blockSize > 64<<10 {
- // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
- // And allows us to skip some size checks.
- w.blockSize = (64 << 10) - 8
- }
- return nil
- }
-}
-
-// WriterFlushOnWrite will compress blocks on each call to the Write function.
-//
-// This is quite inefficient as block sizes will depend on the write size.
-//
-// Use WriterConcurrency(1) to also make sure that output is flushed
-// when Write calls return; otherwise it will be written when compression is done.
-func WriterFlushOnWrite() WriterOption {
- return func(w *Writer) error {
- w.flushOnWrite = true
- return nil
- }
-}
-
-// WriterCustomEncoder allows overriding the encoder for blocks on the stream.
-// The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
-// Block size (initial varint) should not be added by the encoder.
-// A return value of 0 indicates the block could not be compressed.
-// The function should expect to be called concurrently.
-func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
- return func(w *Writer) error {
- w.customEnc = fn
- return nil
- }
-}
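
Taken together, the options above compose at construction time; a closing sketch combining a few of them:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/klauspost/compress/s2"
)

func main() {
	var out bytes.Buffer
	// Better compression, 256KB blocks, and padding so the output
	// size is a multiple of 4KB.
	w := s2.NewWriter(&out,
		s2.WriterBetterCompression(),
		s2.WriterBlockSize(256<<10),
		s2.WriterPadding(4<<10),
	)
	if _, err := w.Write(bytes.Repeat([]byte("abcd"), 1<<16)); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	fmt.Println(out.Len()%(4<<10) == 0) // true: padded to a 4KB multiple
}
```
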