| 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
 | package fastcopy
import (
	"io"
	"sync"
	_ "unsafe" // link to io.errInvalidWrite.
)
var (
	// global pool instance.
	pool = CopyPool{size: 4096}
	//go:linkname errInvalidWrite io.errInvalidWrite
	errInvalidWrite error
)
// CopyPool provides a memory pool of byte
// buffers for io copies from readers to writers.
type CopyPool struct {
	size int
	pool sync.Pool
}
// See CopyPool.Buffer().
func Buffer(sz int) int {
	return pool.Buffer(sz)
}
// See CopyPool.CopyN().
func CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) {
	return pool.CopyN(dst, src, n)
}
// See CopyPool.Copy().
func Copy(dst io.Writer, src io.Reader) (int64, error) {
	return pool.Copy(dst, src)
}
// Buffer sets the pool buffer size to allocate. Returns current size.
// Note this is NOT atomically safe, please call BEFORE other calls to CopyPool.
func (cp *CopyPool) Buffer(sz int) int {
	if sz > 0 {
		// update size
		cp.size = sz
	} else if cp.size < 1 {
		// default size
		return 4096
	}
	return cp.size
}
// CopyN performs the same logic as io.CopyN(), with the difference
// being that the byte buffer is acquired from a memory pool.
func (cp *CopyPool) CopyN(dst io.Writer, src io.Reader, n int64) (int64, error) {
	written, err := cp.Copy(dst, io.LimitReader(src, n))
	if written == n {
		return n, nil
	}
	if written < n && err == nil {
		// src stopped early; must have been EOF.
		err = io.EOF
	}
	return written, err
}
// Copy performs the same logic as io.Copy(), with the difference
// being that the byte buffer is acquired from a memory pool.
func (cp *CopyPool) Copy(dst io.Writer, src io.Reader) (int64, error) {
	// Prefer using io.WriterTo to do the copy (avoids alloc + copy)
	if wt, ok := src.(io.WriterTo); ok {
		return wt.WriteTo(dst)
	}
	// Prefer using io.ReaderFrom to do the copy.
	if rt, ok := dst.(io.ReaderFrom); ok {
		return rt.ReadFrom(src)
	}
	var buf []byte
	if b, ok := cp.pool.Get().([]byte); ok {
		// Acquired buf from pool
		buf = b
	} else {
		// Allocate new buffer of size
		buf = make([]byte, cp.Buffer(0))
	}
	// Defer release to pool
	defer cp.pool.Put(buf)
	var n int64
	for {
		// Perform next read into buf
		nr, err := src.Read(buf)
		if nr > 0 {
			// We error check AFTER checking
			// no. read bytes so incomplete
			// read still gets written up to nr.
			// Perform next write from buf
			nw, ew := dst.Write(buf[0:nr])
			// Check for valid write
			if nw < 0 || nr < nw {
				if ew == nil {
					ew = errInvalidWrite
				}
				return n, ew
			}
			// Incr total count
			n += int64(nw)
			// Check write error
			if ew != nil {
				return n, ew
			}
			// Check unequal read/writes
			if nr != nw {
				return n, io.ErrShortWrite
			}
		}
		// Return on err
		if err != nil {
			if err == io.EOF {
				err = nil // expected
			}
			return n, err
		}
	}
}
 |