summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go16
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go45
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go6
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go59
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go17
5 files changed, 92 insertions, 51 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index be5a9c81e..b330ccedc 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -40,7 +40,7 @@ var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
}
type itemNode struct {
- it interface{}
+ it any
next *itemNode
}
@@ -49,7 +49,7 @@ type itemList struct {
tail *itemNode
}
-func (il *itemList) enqueue(i interface{}) {
+func (il *itemList) enqueue(i any) {
n := &itemNode{it: i}
if il.tail == nil {
il.head, il.tail = n, n
@@ -61,11 +61,11 @@ func (il *itemList) enqueue(i interface{}) {
// peek returns the first item in the list without removing it from the
// list.
-func (il *itemList) peek() interface{} {
+func (il *itemList) peek() any {
return il.head.it
}
-func (il *itemList) dequeue() interface{} {
+func (il *itemList) dequeue() any {
if il.head == nil {
return nil
}
@@ -336,7 +336,7 @@ func (c *controlBuffer) put(it cbItem) error {
return err
}
-func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
+func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@@ -373,7 +373,7 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (b
}
// Note argument f should never be nil.
-func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
+func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
@@ -387,7 +387,7 @@ func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bo
return true, nil
}
-func (c *controlBuffer) get(block bool) (interface{}, error) {
+func (c *controlBuffer) get(block bool) (any, error) {
for {
c.mu.Lock()
if c.err != nil {
@@ -830,7 +830,7 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}
-func (l *loopyWriter) handle(i interface{}) error {
+func (l *loopyWriter) handle(i any) error {
switch i := i.(type) {
case *incomingWindowUpdate:
l.incomingWindowUpdateHandler(i)
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 326bf0848..badab8acf 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -330,7 +330,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
- framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
+ framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
activeStreams: make(map[uint32]*Stream),
@@ -762,7 +762,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
firstTry := true
var ch chan struct{}
transportDrainRequired := false
- checkForStreamQuota := func(it interface{}) bool {
+ checkForStreamQuota := func(it any) bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
t.waitingStreams++
@@ -800,7 +800,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
var hdrListSizeErr error
- checkForHeaderListSize := func(it interface{}) bool {
+ checkForHeaderListSize := func(it any) bool {
if t.maxSendHeaderListSize == nil {
return true
}
@@ -815,7 +815,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
for {
- success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
+ success, err := t.controlBuf.executeAndPut(func(it any) bool {
return checkForHeaderListSize(it) && checkForStreamQuota(it)
}, hdr)
if err != nil {
@@ -927,7 +927,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
rst: rst,
rstCode: rstCode,
}
- addBackStreamQuota := func(interface{}) bool {
+ addBackStreamQuota := func(any) bool {
t.streamQuota++
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
@@ -1080,7 +1080,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
- updateIWS := func(interface{}) bool {
+ updateIWS := func(any) bool {
t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
@@ -1233,7 +1233,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
}
updateFuncs = append(updateFuncs, updateStreamQuota)
}
- t.controlBuf.executeAndPut(func(interface{}) bool {
+ t.controlBuf.executeAndPut(func(any) bool {
for _, f := range updateFuncs {
f()
}
@@ -1505,14 +1505,15 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
- isHeader := false
-
- // If headerChan hasn't been closed yet
- if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
- s.headerValid = true
- if !endStream {
- // HEADERS frame block carries a Response-Headers.
- isHeader = true
+ // For headers, set them in s.header and close headerChan. For trailers or
+ // trailers-only, closeStream will set the trailers and close headerChan as
+ // needed.
+ if !endStream {
+ // If headerChan hasn't been closed yet (expected, given we checked it
+ // above, but something else could have potentially closed the whole
+ // stream).
+ if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
+ s.headerValid = true
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
@@ -1520,15 +1521,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(mdata) > 0 {
s.header = mdata
}
- } else {
- // HEADERS frame block carries a Trailers-Only.
- s.noHeaders = true
+ close(s.headerChan)
}
- close(s.headerChan)
}
for _, sh := range t.statsHandlers {
- if isHeader {
+ if !endStream {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
@@ -1554,9 +1552,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
statusGen = status.New(rawStatusCode, grpcMessage)
}
- // if client received END_STREAM from server while stream was still active, send RST_STREAM
- rst := s.getState() == streamActive
- t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
+ // If client received END_STREAM from server while stream was still active,
+ // send RST_STREAM.
+ rstStream := s.getState() == streamActive
+ t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
}
// readServerPreface reads and handles the initial settings frame from the
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index f96064012..8d3a353c1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -165,7 +165,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if config.MaxHeaderListSize != nil {
maxHeaderListSize = *config.MaxHeaderListSize
}
- framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
+ framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
// Send initial settings as connection preface to client.
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
@@ -855,7 +855,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
return nil
})
- t.controlBuf.executeAndPut(func(interface{}) bool {
+ t.controlBuf.executeAndPut(func(any) bool {
for _, f := range updateFuncs {
f()
}
@@ -939,7 +939,7 @@ func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD)
return headerFields
}
-func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
+func (t *http2Server) checkForHeaderListSize(it any) bool {
if t.maxSendHeaderListSize == nil {
return true
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 19cbb18f5..195814008 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
+ "sync"
"time"
"unicode/utf8"
@@ -309,6 +310,7 @@ func decodeGrpcMessageUnchecked(msg string) string {
}
type bufWriter struct {
+ pool *sync.Pool
buf []byte
offset int
batchSize int
@@ -316,12 +318,17 @@ type bufWriter struct {
err error
}
-func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
- return &bufWriter{
- buf: make([]byte, batchSize*2),
+func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
+ w := &bufWriter{
batchSize: batchSize,
conn: conn,
+ pool: pool,
}
+ // this indicates that we should use non shared buf
+ if pool == nil {
+ w.buf = make([]byte, batchSize)
+ }
+ return w
}
func (w *bufWriter) Write(b []byte) (n int, err error) {
@@ -332,19 +339,34 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
n, err = w.conn.Write(b)
return n, toIOError(err)
}
+ if w.buf == nil {
+ b := w.pool.Get().(*[]byte)
+ w.buf = *b
+ }
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
w.offset += nn
n += nn
if w.offset >= w.batchSize {
- err = w.Flush()
+ err = w.flushKeepBuffer()
}
}
return n, err
}
func (w *bufWriter) Flush() error {
+ err := w.flushKeepBuffer()
+ // Only release the buffer if we are in a "shared" mode
+ if w.buf != nil && w.pool != nil {
+ b := w.buf
+ w.pool.Put(&b)
+ w.buf = nil
+ }
+ return err
+}
+
+func (w *bufWriter) flushKeepBuffer() error {
if w.err != nil {
return w.err
}
@@ -381,7 +403,10 @@ type framer struct {
fr *http2.Framer
}
-func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer {
+var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
+var writeBufferMutex sync.Mutex
+
+func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
if writeBufferSize < 0 {
writeBufferSize = 0
}
@@ -389,7 +414,11 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, readBufferSize)
}
- w := newBufWriter(conn, writeBufferSize)
+ var pool *sync.Pool
+ if sharedWriteBuffer {
+ pool = getWriteBufferPool(writeBufferSize)
+ }
+ w := newBufWriter(conn, writeBufferSize, pool)
f := &framer{
writer: w,
fr: http2.NewFramer(w, r),
@@ -403,6 +432,24 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
return f
}
+func getWriteBufferPool(writeBufferSize int) *sync.Pool {
+ writeBufferMutex.Lock()
+ defer writeBufferMutex.Unlock()
+ size := writeBufferSize * 2
+ pool, ok := writeBufferPoolMap[size]
+ if ok {
+ return pool
+ }
+ pool = &sync.Pool{
+ New: func() any {
+ b := make([]byte, size)
+ return &b
+ },
+ }
+ writeBufferPoolMap[size] = pool
+ return pool
+}
+
// parseDialTarget returns the network and address to pass to dialer.
func parseDialTarget(target string) (string, string) {
net := "tcp"
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index aa1c89659..74a811fc0 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -43,10 +43,6 @@ import (
"google.golang.org/grpc/tap"
)
-// ErrNoHeaders is used as a signal that a trailers only response was received,
-// and is not a real error.
-var ErrNoHeaders = errors.New("stream has no headers")
-
const logLevel = 2
type bufferPool struct {
@@ -56,7 +52,7 @@ type bufferPool struct {
func newBufferPool() *bufferPool {
return &bufferPool{
pool: sync.Pool{
- New: func() interface{} {
+ New: func() any {
return new(bytes.Buffer)
},
},
@@ -390,14 +386,10 @@ func (s *Stream) Header() (metadata.MD, error) {
}
s.waitOnHeader()
- if !s.headerValid {
+ if !s.headerValid || s.noHeaders {
return nil, s.status.Err()
}
- if s.noHeaders {
- return nil, ErrNoHeaders
- }
-
return s.header.Copy(), nil
}
@@ -559,6 +551,7 @@ type ServerConfig struct {
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
+ SharedWriteBuffer bool
ChannelzParentID *channelz.Identifier
MaxHeaderListSize *uint32
HeaderTableSize *uint32
@@ -592,6 +585,8 @@ type ConnectOptions struct {
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
+ // SharedWriteBuffer indicates whether connections should reuse write buffer
+ SharedWriteBuffer bool
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID *channelz.Identifier
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
@@ -736,7 +731,7 @@ type ServerTransport interface {
}
// connectionErrorf creates an ConnectionError with the specified error description.
-func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
+func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(format, a...),
temp: temp,