summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport/http_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/http_util.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go59
1 files changed, 53 insertions, 6 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 19cbb18f5..195814008 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
+ "sync"
"time"
"unicode/utf8"
@@ -309,6 +310,7 @@ func decodeGrpcMessageUnchecked(msg string) string {
}
type bufWriter struct {
+ pool *sync.Pool
buf []byte
offset int
batchSize int
@@ -316,12 +318,17 @@ type bufWriter struct {
err error
}
-func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
- return &bufWriter{
- buf: make([]byte, batchSize*2),
+func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
+ w := &bufWriter{
batchSize: batchSize,
conn: conn,
+ pool: pool,
}
+ // this indicates that we should use non shared buf
+ if pool == nil {
+ w.buf = make([]byte, batchSize)
+ }
+ return w
}
func (w *bufWriter) Write(b []byte) (n int, err error) {
@@ -332,19 +339,34 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
n, err = w.conn.Write(b)
return n, toIOError(err)
}
+ if w.buf == nil {
+ b := w.pool.Get().(*[]byte)
+ w.buf = *b
+ }
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
w.offset += nn
n += nn
if w.offset >= w.batchSize {
- err = w.Flush()
+ err = w.flushKeepBuffer()
}
}
return n, err
}
func (w *bufWriter) Flush() error {
+ err := w.flushKeepBuffer()
+ // Only release the buffer if we are in a "shared" mode
+ if w.buf != nil && w.pool != nil {
+ b := w.buf
+ w.pool.Put(&b)
+ w.buf = nil
+ }
+ return err
+}
+
+func (w *bufWriter) flushKeepBuffer() error {
if w.err != nil {
return w.err
}
@@ -381,7 +403,10 @@ type framer struct {
fr *http2.Framer
}
-func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
+var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
+var writeBufferMutex sync.Mutex
+
+func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
if writeBufferSize < 0 {
writeBufferSize = 0
}
@@ -389,7 +414,11 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, readBufferSize)
}
- w := newBufWriter(conn, writeBufferSize)
+ var pool *sync.Pool
+ if sharedWriteBuffer {
+ pool = getWriteBufferPool(writeBufferSize)
+ }
+ w := newBufWriter(conn, writeBufferSize, pool)
f := &framer{
writer: w,
fr: http2.NewFramer(w, r),
@@ -403,6 +432,24 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
return f
}
+func getWriteBufferPool(writeBufferSize int) *sync.Pool {
+ writeBufferMutex.Lock()
+ defer writeBufferMutex.Unlock()
+ size := writeBufferSize * 2
+ pool, ok := writeBufferPoolMap[size]
+ if ok {
+ return pool
+ }
+ pool = &sync.Pool{
+ New: func() any {
+ b := make([]byte, size)
+ return &b
+ },
+ }
+ writeBufferPoolMap[size] = pool
+ return pool
+}
+
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
net := "tcp"