summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/controlbuf.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go249
1 files changed, 142 insertions, 107 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 3deadfb4a..ea0633bbd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -32,6 +32,7 @@ import (
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
+ "google.golang.org/grpc/mem"
"google.golang.org/grpc/status"
)
@@ -148,9 +149,9 @@ type dataFrame struct {
streamID uint32
endStream bool
h []byte
- d []byte
+ reader mem.Reader
// onEachWrite is called every time
- // a part of d is written out.
+ // a part of data is written out.
onEachWrite func()
}
@@ -289,18 +290,22 @@ func (l *outStreamList) dequeue() *outStream {
}
// controlBuffer is a way to pass information to loopy.
-// Information is passed as specific struct types called control frames.
-// A control frame not only represents data, messages or headers to be sent out
-// but can also be used to instruct loopy to update its internal state.
-// It shouldn't be confused with an HTTP2 frame, although some of the control frames
-// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
+//
+// Information is passed as specific struct types called control frames. A
+// control frame not only represents data, messages or headers to be sent out
+// but can also be used to instruct loopy to update its internal state. It
+// shouldn't be confused with an HTTP2 frame, although some of the control
+// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
- ch chan struct{}
- done <-chan struct{}
+ wakeupCh chan struct{} // Unblocks readers waiting for something to read.
+ done <-chan struct{} // Closed when the transport is done.
+
+ // Mutex guards all the fields below, except trfChan which can be read
+ // atomically without holding mu.
mu sync.Mutex
- consumerWaiting bool
- list *itemList
- err error
+ consumerWaiting bool // True when readers are blocked waiting for new data.
+ closed bool // True when the controlbuf is finished.
+ list *itemList // List of queued control frames.
// transportResponseFrames counts the number of queued items that represent
// the response of an action initiated by the peer. trfChan is created
@@ -308,47 +313,59 @@ type controlBuffer struct {
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
- trfChan atomic.Value // chan struct{}
+ trfChan atomic.Pointer[chan struct{}]
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
return &controlBuffer{
- ch: make(chan struct{}, 1),
- list: &itemList{},
- done: done,
+ wakeupCh: make(chan struct{}, 1),
+ list: &itemList{},
+ done: done,
}
}
-// throttle blocks if there are too many incomingSettings/cleanupStreams in the
-// controlbuf.
+// throttle blocks if there are too many frames in the control buf that
+// represent the response of an action initiated by the peer, like
+// incomingSettings cleanupStreams etc.
func (c *controlBuffer) throttle() {
- ch, _ := c.trfChan.Load().(chan struct{})
- if ch != nil {
+ if ch := c.trfChan.Load(); ch != nil {
select {
- case <-ch:
+ case <-(*ch):
case <-c.done:
}
}
}
+// put adds an item to the controlbuf.
func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
+// executeAndPut runs f, and if the return value is true, adds the given item to
+// the controlbuf. The item could be nil, in which case, this method simply
+// executes f and does not add the item to the controlbuf.
+//
+// The first return value indicates whether the item was successfully added to
+// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
+// if the control buffer is already closed.
func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
- var wakeUp bool
c.mu.Lock()
- if c.err != nil {
- c.mu.Unlock()
- return false, c.err
+ defer c.mu.Unlock()
+
+ if c.closed {
+ return false, ErrConnClosing
}
if f != nil {
if !f() { // f wasn't successful
- c.mu.Unlock()
return false, nil
}
}
+ if it == nil {
+ return true, nil
+ }
+
+ var wakeUp bool
if c.consumerWaiting {
wakeUp = true
c.consumerWaiting = false
@@ -359,98 +376,102 @@ func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
- c.trfChan.Store(make(chan struct{}))
+ ch := make(chan struct{})
+ c.trfChan.Store(&ch)
}
}
- c.mu.Unlock()
if wakeUp {
select {
- case c.ch <- struct{}{}:
+ case c.wakeupCh <- struct{}{}:
default:
}
}
return true, nil
}
-// Note argument f should never be nil.
-func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
- c.mu.Lock()
- if c.err != nil {
- c.mu.Unlock()
- return false, c.err
- }
- if !f(it) { // f wasn't successful
- c.mu.Unlock()
- return false, nil
- }
- c.mu.Unlock()
- return true, nil
-}
-
+// get returns the next control frame from the control buffer. If block is true
+// **and** there are no control frames in the control buffer, the call blocks
+// until one of the conditions is met: there is a frame to return or the
+// transport is closed.
func (c *controlBuffer) get(block bool) (any, error) {
for {
c.mu.Lock()
- if c.err != nil {
+ frame, err := c.getOnceLocked()
+ if frame != nil || err != nil || !block {
+ // If we read a frame or an error, we can return to the caller. The
+ // call to getOnceLocked() returns a nil frame and a nil error if
+ // there is nothing to read, and in that case, if the caller asked
+ // us not to block, we can return now as well.
c.mu.Unlock()
- return nil, c.err
- }
- if !c.list.isEmpty() {
- h := c.list.dequeue().(cbItem)
- if h.isTransportResponseFrame() {
- if c.transportResponseFrames == maxQueuedTransportResponseFrames {
- // We are removing the frame that put us over the
- // threshold; close and clear the throttling channel.
- ch := c.trfChan.Load().(chan struct{})
- close(ch)
- c.trfChan.Store((chan struct{})(nil))
- }
- c.transportResponseFrames--
- }
- c.mu.Unlock()
- return h, nil
- }
- if !block {
- c.mu.Unlock()
- return nil, nil
+ return frame, err
}
c.consumerWaiting = true
c.mu.Unlock()
+
+ // Release the lock above and wait to be woken up.
select {
- case <-c.ch:
+ case <-c.wakeupCh:
case <-c.done:
return nil, errors.New("transport closed by client")
}
}
}
+// Callers must not use this method, but should instead use get().
+//
+// Caller must hold c.mu.
+func (c *controlBuffer) getOnceLocked() (any, error) {
+ if c.closed {
+ return false, ErrConnClosing
+ }
+ if c.list.isEmpty() {
+ return nil, nil
+ }
+ h := c.list.dequeue().(cbItem)
+ if h.isTransportResponseFrame() {
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are removing the frame that put us over the
+ // threshold; close and clear the throttling channel.
+ ch := c.trfChan.Swap(nil)
+ close(*ch)
+ }
+ c.transportResponseFrames--
+ }
+ return h, nil
+}
+
+// finish closes the control buffer, cleaning up any streams that have queued
+// header frames. Once this method returns, no more frames can be added to the
+// control buffer, and attempts to do so will return ErrConnClosing.
func (c *controlBuffer) finish() {
c.mu.Lock()
- if c.err != nil {
- c.mu.Unlock()
+ defer c.mu.Unlock()
+
+ if c.closed {
return
}
- c.err = ErrConnClosing
+ c.closed = true
// There may be headers for streams in the control buffer.
// These streams need to be cleaned out since the transport
// is still not aware of these yet.
for head := c.list.dequeueAll(); head != nil; head = head.next {
- hdr, ok := head.it.(*headerFrame)
- if !ok {
- continue
- }
- if hdr.onOrphaned != nil { // It will be nil on the server-side.
- hdr.onOrphaned(ErrConnClosing)
+ switch v := head.it.(type) {
+ case *headerFrame:
+ if v.onOrphaned != nil { // It will be nil on the server-side.
+ v.onOrphaned(ErrConnClosing)
+ }
+ case *dataFrame:
+ _ = v.reader.Close()
}
}
+
// In case throttle() is currently in flight, it needs to be unblocked.
// Otherwise, the transport may not close, since the transport is closed by
// the reader encountering the connection error.
- ch, _ := c.trfChan.Load().(chan struct{})
+ ch := c.trfChan.Swap(nil)
if ch != nil {
- close(ch)
+ close(*ch)
}
- c.trfChan.Store((chan struct{})(nil))
- c.mu.Unlock()
}
type side int
@@ -466,7 +487,7 @@ const (
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
-// thereby closely resemebling to a round-robin scheduling over all streams. While
+// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
@@ -490,12 +511,13 @@ type loopyWriter struct {
draining bool
conn net.Conn
logger *grpclog.PrefixLogger
+ bufferPool mem.BufferPool
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}
-func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
+func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
@@ -511,6 +533,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
conn: conn,
logger: logger,
ssGoAwayHandler: goAwayHandler,
+ bufferPool: bufferPool,
}
return l
}
@@ -768,6 +791,11 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
// not be established yet.
delete(l.estdStreams, c.streamID)
str.deleteSelf()
+ for head := str.itl.dequeueAll(); head != nil; head = head.next {
+ if df, ok := head.it.(*dataFrame); ok {
+ _ = df.reader.Close()
+ }
+ }
}
if c.rst { // If RST_STREAM needs to be sent.
if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
@@ -903,16 +931,18 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
- // Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
- // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
- // maximum possible HTTP2 frame size.
+ // Every dataFrame has two buffers; h that keeps grpc-message header and data
+ // that is the actual message. As an optimization to keep wire traffic low, data
+ // from data is copied to h to make as big as the maximum possible HTTP2 frame
+ // size.
- if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
+ if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
+ _ = dataItem.reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -927,9 +957,7 @@ func (l *loopyWriter) processData() (bool, error) {
}
return false, nil
}
- var (
- buf []byte
- )
+
// Figure out the maximum size we can send
maxSize := http2MaxFrameLen
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
@@ -943,43 +971,50 @@ func (l *loopyWriter) processData() (bool, error) {
}
// Compute how much of the header and data we can send within quota and max frame length
hSize := min(maxSize, len(dataItem.h))
- dSize := min(maxSize-hSize, len(dataItem.d))
- if hSize != 0 {
- if dSize == 0 {
- buf = dataItem.h
- } else {
- // We can add some data to grpc message header to distribute bytes more equally across frames.
- // Copy on the stack to avoid generating garbage
- var localBuf [http2MaxFrameLen]byte
- copy(localBuf[:hSize], dataItem.h)
- copy(localBuf[hSize:], dataItem.d[:dSize])
- buf = localBuf[:hSize+dSize]
- }
+ dSize := min(maxSize-hSize, dataItem.reader.Remaining())
+ remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
+ size := hSize + dSize
+
+ var buf *[]byte
+
+ if hSize != 0 && dSize == 0 {
+ buf = &dataItem.h
} else {
- buf = dataItem.d
- }
+ // Note: this is only necessary because the http2.Framer does not support
+ // partially writing a frame, so the sequence must be materialized into a buffer.
+ // TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
+ pool := l.bufferPool
+ if pool == nil {
+ // Note that this is only supposed to be nil in tests. Otherwise, stream is
+ // always initialized with a BufferPool.
+ pool = mem.DefaultBufferPool()
+ }
+ buf = pool.Get(size)
+ defer pool.Put(buf)
- size := hSize + dSize
+ copy((*buf)[:hSize], dataItem.h)
+ _, _ = dataItem.reader.Read((*buf)[hSize:])
+ }
// Now that outgoing flow controls are checked we can replenish str's write quota
str.wq.replenish(size)
var endStream bool
// If this is the last data message on this stream and all of it can be written in this iteration.
- if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
+ if dataItem.endStream && remainingBytes == 0 {
endStream = true
}
if dataItem.onEachWrite != nil {
dataItem.onEachWrite()
}
- if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
+ if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
return false, err
}
str.bytesOutStanding += size
l.sendQuota -= uint32(size)
dataItem.h = dataItem.h[hSize:]
- dataItem.d = dataItem.d[dSize:]
- if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
+ if remainingBytes == 0 { // All the data from that message was written out.
+ _ = dataItem.reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {