summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/controlbuf.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go1035
1 files changed, 0 insertions, 1035 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
deleted file mode 100644
index ef72fbb3a..000000000
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ /dev/null
@@ -1,1035 +0,0 @@
-/*
- *
- * Copyright 2014 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package transport
-
-import (
- "bytes"
- "errors"
- "fmt"
- "net"
- "runtime"
- "strconv"
- "sync"
- "sync/atomic"
-
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/hpack"
- "google.golang.org/grpc/internal/grpclog"
- "google.golang.org/grpc/internal/grpcutil"
- "google.golang.org/grpc/mem"
- "google.golang.org/grpc/status"
-)
-
-var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
- e.SetMaxDynamicTableSizeLimit(v)
-}
-
-type itemNode struct {
- it any
- next *itemNode
-}
-
-type itemList struct {
- head *itemNode
- tail *itemNode
-}
-
-func (il *itemList) enqueue(i any) {
- n := &itemNode{it: i}
- if il.tail == nil {
- il.head, il.tail = n, n
- return
- }
- il.tail.next = n
- il.tail = n
-}
-
-// peek returns the first item in the list without removing it from the
-// list.
-func (il *itemList) peek() any {
- return il.head.it
-}
-
-func (il *itemList) dequeue() any {
- if il.head == nil {
- return nil
- }
- i := il.head.it
- il.head = il.head.next
- if il.head == nil {
- il.tail = nil
- }
- return i
-}
-
-func (il *itemList) dequeueAll() *itemNode {
- h := il.head
- il.head, il.tail = nil, nil
- return h
-}
-
-func (il *itemList) isEmpty() bool {
- return il.head == nil
-}
-
-// The following defines various control items which could flow through
-// the control buffer of transport. They represent different aspects of
-// control tasks, e.g., flow control, settings, streaming resetting, etc.
-
-// maxQueuedTransportResponseFrames is the most queued "transport response"
-// frames we will buffer before preventing new reads from occurring on the
-// transport. These are control frames sent in response to client requests,
-// such as RST_STREAM due to bad headers or settings acks.
-const maxQueuedTransportResponseFrames = 50
-
-type cbItem interface {
- isTransportResponseFrame() bool
-}
-
-// registerStream is used to register an incoming stream with loopy writer.
-type registerStream struct {
- streamID uint32
- wq *writeQuota
-}
-
-func (*registerStream) isTransportResponseFrame() bool { return false }
-
-// headerFrame is also used to register stream on the client-side.
-type headerFrame struct {
- streamID uint32
- hf []hpack.HeaderField
- endStream bool // Valid on server side.
- initStream func(uint32) error // Used only on the client side.
- onWrite func()
- wq *writeQuota // write quota for the stream created.
- cleanup *cleanupStream // Valid on the server side.
- onOrphaned func(error) // Valid on client-side
-}
-
-func (h *headerFrame) isTransportResponseFrame() bool {
- return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
-}
-
-type cleanupStream struct {
- streamID uint32
- rst bool
- rstCode http2.ErrCode
- onWrite func()
-}
-
-func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
-
-type earlyAbortStream struct {
- httpStatus uint32
- streamID uint32
- contentSubtype string
- status *status.Status
- rst bool
-}
-
-func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
-
-type dataFrame struct {
- streamID uint32
- endStream bool
- h []byte
- reader mem.Reader
- // onEachWrite is called every time
- // a part of data is written out.
- onEachWrite func()
-}
-
-func (*dataFrame) isTransportResponseFrame() bool { return false }
-
-type incomingWindowUpdate struct {
- streamID uint32
- increment uint32
-}
-
-func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
-
-type outgoingWindowUpdate struct {
- streamID uint32
- increment uint32
-}
-
-func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
- return false // window updates are throttled by thresholds
-}
-
-type incomingSettings struct {
- ss []http2.Setting
-}
-
-func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
-
-type outgoingSettings struct {
- ss []http2.Setting
-}
-
-func (*outgoingSettings) isTransportResponseFrame() bool { return false }
-
-type incomingGoAway struct {
-}
-
-func (*incomingGoAway) isTransportResponseFrame() bool { return false }
-
-type goAway struct {
- code http2.ErrCode
- debugData []byte
- headsUp bool
- closeConn error // if set, loopyWriter will exit with this error
-}
-
-func (*goAway) isTransportResponseFrame() bool { return false }
-
-type ping struct {
- ack bool
- data [8]byte
-}
-
-func (*ping) isTransportResponseFrame() bool { return true }
-
-type outFlowControlSizeRequest struct {
- resp chan uint32
-}
-
-func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
-
-// closeConnection is an instruction to tell the loopy writer to flush the
-// framer and exit, which will cause the transport's connection to be closed
-// (by the client or server). The transport itself will close after the reader
-// encounters the EOF caused by the connection closure.
-type closeConnection struct{}
-
-func (closeConnection) isTransportResponseFrame() bool { return false }
-
-type outStreamState int
-
-const (
- active outStreamState = iota
- empty
- waitingOnStreamQuota
-)
-
-type outStream struct {
- id uint32
- state outStreamState
- itl *itemList
- bytesOutStanding int
- wq *writeQuota
-
- next *outStream
- prev *outStream
-}
-
-func (s *outStream) deleteSelf() {
- if s.prev != nil {
- s.prev.next = s.next
- }
- if s.next != nil {
- s.next.prev = s.prev
- }
- s.next, s.prev = nil, nil
-}
-
-type outStreamList struct {
- // Following are sentinel objects that mark the
- // beginning and end of the list. They do not
- // contain any item lists. All valid objects are
- // inserted in between them.
- // This is needed so that an outStream object can
- // deleteSelf() in O(1) time without knowing which
- // list it belongs to.
- head *outStream
- tail *outStream
-}
-
-func newOutStreamList() *outStreamList {
- head, tail := new(outStream), new(outStream)
- head.next = tail
- tail.prev = head
- return &outStreamList{
- head: head,
- tail: tail,
- }
-}
-
-func (l *outStreamList) enqueue(s *outStream) {
- e := l.tail.prev
- e.next = s
- s.prev = e
- s.next = l.tail
- l.tail.prev = s
-}
-
-// remove from the beginning of the list.
-func (l *outStreamList) dequeue() *outStream {
- b := l.head.next
- if b == l.tail {
- return nil
- }
- b.deleteSelf()
- return b
-}
-
-// controlBuffer is a way to pass information to loopy.
-//
-// Information is passed as specific struct types called control frames. A
-// control frame not only represents data, messages or headers to be sent out
-// but can also be used to instruct loopy to update its internal state. It
-// shouldn't be confused with an HTTP2 frame, although some of the control
-// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
-type controlBuffer struct {
- wakeupCh chan struct{} // Unblocks readers waiting for something to read.
- done <-chan struct{} // Closed when the transport is done.
-
- // Mutex guards all the fields below, except trfChan which can be read
- // atomically without holding mu.
- mu sync.Mutex
- consumerWaiting bool // True when readers are blocked waiting for new data.
- closed bool // True when the controlbuf is finished.
- list *itemList // List of queued control frames.
-
- // transportResponseFrames counts the number of queued items that represent
- // the response of an action initiated by the peer. trfChan is created
- // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
- // closed and nilled when transportResponseFrames drops below the
- // threshold. Both fields are protected by mu.
- transportResponseFrames int
- trfChan atomic.Pointer[chan struct{}]
-}
-
-func newControlBuffer(done <-chan struct{}) *controlBuffer {
- return &controlBuffer{
- wakeupCh: make(chan struct{}, 1),
- list: &itemList{},
- done: done,
- }
-}
-
-// throttle blocks if there are too many frames in the control buf that
-// represent the response of an action initiated by the peer, like
-// incomingSettings cleanupStreams etc.
-func (c *controlBuffer) throttle() {
- if ch := c.trfChan.Load(); ch != nil {
- select {
- case <-(*ch):
- case <-c.done:
- }
- }
-}
-
-// put adds an item to the controlbuf.
-func (c *controlBuffer) put(it cbItem) error {
- _, err := c.executeAndPut(nil, it)
- return err
-}
-
-// executeAndPut runs f, and if the return value is true, adds the given item to
-// the controlbuf. The item could be nil, in which case, this method simply
-// executes f and does not add the item to the controlbuf.
-//
-// The first return value indicates whether the item was successfully added to
-// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
-// if the control buffer is already closed.
-func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.closed {
- return false, ErrConnClosing
- }
- if f != nil {
- if !f() { // f wasn't successful
- return false, nil
- }
- }
- if it == nil {
- return true, nil
- }
-
- var wakeUp bool
- if c.consumerWaiting {
- wakeUp = true
- c.consumerWaiting = false
- }
- c.list.enqueue(it)
- if it.isTransportResponseFrame() {
- c.transportResponseFrames++
- if c.transportResponseFrames == maxQueuedTransportResponseFrames {
- // We are adding the frame that puts us over the threshold; create
- // a throttling channel.
- ch := make(chan struct{})
- c.trfChan.Store(&ch)
- }
- }
- if wakeUp {
- select {
- case c.wakeupCh <- struct{}{}:
- default:
- }
- }
- return true, nil
-}
-
-// get returns the next control frame from the control buffer. If block is true
-// **and** there are no control frames in the control buffer, the call blocks
-// until one of the conditions is met: there is a frame to return or the
-// transport is closed.
-func (c *controlBuffer) get(block bool) (any, error) {
- for {
- c.mu.Lock()
- frame, err := c.getOnceLocked()
- if frame != nil || err != nil || !block {
- // If we read a frame or an error, we can return to the caller. The
- // call to getOnceLocked() returns a nil frame and a nil error if
- // there is nothing to read, and in that case, if the caller asked
- // us not to block, we can return now as well.
- c.mu.Unlock()
- return frame, err
- }
- c.consumerWaiting = true
- c.mu.Unlock()
-
- // Release the lock above and wait to be woken up.
- select {
- case <-c.wakeupCh:
- case <-c.done:
- return nil, errors.New("transport closed by client")
- }
- }
-}
-
-// Callers must not use this method, but should instead use get().
-//
-// Caller must hold c.mu.
-func (c *controlBuffer) getOnceLocked() (any, error) {
- if c.closed {
- return false, ErrConnClosing
- }
- if c.list.isEmpty() {
- return nil, nil
- }
- h := c.list.dequeue().(cbItem)
- if h.isTransportResponseFrame() {
- if c.transportResponseFrames == maxQueuedTransportResponseFrames {
- // We are removing the frame that put us over the
- // threshold; close and clear the throttling channel.
- ch := c.trfChan.Swap(nil)
- close(*ch)
- }
- c.transportResponseFrames--
- }
- return h, nil
-}
-
-// finish closes the control buffer, cleaning up any streams that have queued
-// header frames. Once this method returns, no more frames can be added to the
-// control buffer, and attempts to do so will return ErrConnClosing.
-func (c *controlBuffer) finish() {
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if c.closed {
- return
- }
- c.closed = true
- // There may be headers for streams in the control buffer.
- // These streams need to be cleaned out since the transport
- // is still not aware of these yet.
- for head := c.list.dequeueAll(); head != nil; head = head.next {
- switch v := head.it.(type) {
- case *headerFrame:
- if v.onOrphaned != nil { // It will be nil on the server-side.
- v.onOrphaned(ErrConnClosing)
- }
- case *dataFrame:
- _ = v.reader.Close()
- }
- }
-
- // In case throttle() is currently in flight, it needs to be unblocked.
- // Otherwise, the transport may not close, since the transport is closed by
- // the reader encountering the connection error.
- ch := c.trfChan.Swap(nil)
- if ch != nil {
- close(*ch)
- }
-}
-
-type side int
-
-const (
- clientSide side = iota
- serverSide
-)
-
-// Loopy receives frames from the control buffer.
-// Each frame is handled individually; most of the work done by loopy goes
-// into handling data frames. Loopy maintains a queue of active streams, and each
-// stream maintains a queue of data frames; as loopy receives data frames
-// it gets added to the queue of the relevant stream.
-// Loopy goes over this list of active streams by processing one node every iteration,
-// thereby closely resembling a round-robin scheduling over all streams. While
-// processing a stream, loopy writes out data bytes from this stream capped by the min
-// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
-type loopyWriter struct {
- side side
- cbuf *controlBuffer
- sendQuota uint32
- oiws uint32 // outbound initial window size.
- // estdStreams is map of all established streams that are not cleaned-up yet.
- // On client-side, this is all streams whose headers were sent out.
- // On server-side, this is all streams whose headers were received.
- estdStreams map[uint32]*outStream // Established streams.
- // activeStreams is a linked-list of all streams that have data to send and some
- // stream-level flow control quota.
- // Each of these streams internally have a list of data items(and perhaps trailers
- // on the server-side) to be sent out.
- activeStreams *outStreamList
- framer *framer
- hBuf *bytes.Buffer // The buffer for HPACK encoding.
- hEnc *hpack.Encoder // HPACK encoder.
- bdpEst *bdpEstimator
- draining bool
- conn net.Conn
- logger *grpclog.PrefixLogger
- bufferPool mem.BufferPool
-
- // Side-specific handlers
- ssGoAwayHandler func(*goAway) (bool, error)
-}
-
-func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
- var buf bytes.Buffer
- l := &loopyWriter{
- side: s,
- cbuf: cbuf,
- sendQuota: defaultWindowSize,
- oiws: defaultWindowSize,
- estdStreams: make(map[uint32]*outStream),
- activeStreams: newOutStreamList(),
- framer: fr,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- bdpEst: bdpEst,
- conn: conn,
- logger: logger,
- ssGoAwayHandler: goAwayHandler,
- bufferPool: bufferPool,
- }
- return l
-}
-
-const minBatchSize = 1000
-
-// run should be run in a separate goroutine.
-// It reads control frames from controlBuf and processes them by:
-// 1. Updating loopy's internal state, or/and
-// 2. Writing out HTTP2 frames on the wire.
-//
-// Loopy keeps all active streams with data to send in a linked-list.
-// All streams in the activeStreams linked-list must have both:
-// 1. Data to send, and
-// 2. Stream level flow control quota available.
-//
-// In each iteration of run loop, other than processing the incoming control
-// frame, loopy calls processData, which processes one node from the
-// activeStreams linked-list. This results in writing of HTTP2 frames into an
-// underlying write buffer. When there's no more control frames to read from
-// controlBuf, loopy flushes the write buffer. As an optimization, to increase
-// the batch size for each flush, loopy yields the processor, once if the batch
-// size is too low to give stream goroutines a chance to fill it up.
-//
-// Upon exiting, if the error causing the exit is not an I/O error, run()
-// flushes the underlying connection. The connection is always left open to
-// allow different closing behavior on the client and server.
-func (l *loopyWriter) run() (err error) {
- defer func() {
- if l.logger.V(logLevel) {
- l.logger.Infof("loopyWriter exiting with error: %v", err)
- }
- if !isIOError(err) {
- l.framer.writer.Flush()
- }
- l.cbuf.finish()
- }()
- for {
- it, err := l.cbuf.get(true)
- if err != nil {
- return err
- }
- if err = l.handle(it); err != nil {
- return err
- }
- if _, err = l.processData(); err != nil {
- return err
- }
- gosched := true
- hasdata:
- for {
- it, err := l.cbuf.get(false)
- if err != nil {
- return err
- }
- if it != nil {
- if err = l.handle(it); err != nil {
- return err
- }
- if _, err = l.processData(); err != nil {
- return err
- }
- continue hasdata
- }
- isEmpty, err := l.processData()
- if err != nil {
- return err
- }
- if !isEmpty {
- continue hasdata
- }
- if gosched {
- gosched = false
- if l.framer.writer.offset < minBatchSize {
- runtime.Gosched()
- continue hasdata
- }
- }
- l.framer.writer.Flush()
- break hasdata
- }
- }
-}
-
-func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
- return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
-}
-
-func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
- // Otherwise update the quota.
- if w.streamID == 0 {
- l.sendQuota += w.increment
- return
- }
- // Find the stream and update it.
- if str, ok := l.estdStreams[w.streamID]; ok {
- str.bytesOutStanding -= int(w.increment)
- if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
- str.state = active
- l.activeStreams.enqueue(str)
- return
- }
- }
-}
-
-func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
- return l.framer.fr.WriteSettings(s.ss...)
-}
-
-func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
- l.applySettings(s.ss)
- return l.framer.fr.WriteSettingsAck()
-}
-
-func (l *loopyWriter) registerStreamHandler(h *registerStream) {
- str := &outStream{
- id: h.streamID,
- state: empty,
- itl: &itemList{},
- wq: h.wq,
- }
- l.estdStreams[h.streamID] = str
-}
-
-func (l *loopyWriter) headerHandler(h *headerFrame) error {
- if l.side == serverSide {
- str, ok := l.estdStreams[h.streamID]
- if !ok {
- if l.logger.V(logLevel) {
- l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
- }
- return nil
- }
- // Case 1.A: Server is responding back with headers.
- if !h.endStream {
- return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
- }
- // else: Case 1.B: Server wants to close stream.
-
- if str.state != empty { // either active or waiting on stream quota.
- // add it str's list of items.
- str.itl.enqueue(h)
- return nil
- }
- if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
- return err
- }
- return l.cleanupStreamHandler(h.cleanup)
- }
- // Case 2: Client wants to originate stream.
- str := &outStream{
- id: h.streamID,
- state: empty,
- itl: &itemList{},
- wq: h.wq,
- }
- return l.originateStream(str, h)
-}
-
-func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
- // l.draining is set when handling GoAway. In which case, we want to avoid
- // creating new streams.
- if l.draining {
- // TODO: provide a better error with the reason we are in draining.
- hdr.onOrphaned(errStreamDrain)
- return nil
- }
- if err := hdr.initStream(str.id); err != nil {
- return err
- }
- if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
- return err
- }
- l.estdStreams[str.id] = str
- return nil
-}
-
-func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
- if onWrite != nil {
- onWrite()
- }
- l.hBuf.Reset()
- for _, f := range hf {
- if err := l.hEnc.WriteField(f); err != nil {
- if l.logger.V(logLevel) {
- l.logger.Warningf("Encountered error while encoding headers: %v", err)
- }
- }
- }
- var (
- err error
- endHeaders, first bool
- )
- first = true
- for !endHeaders {
- size := l.hBuf.Len()
- if size > http2MaxFrameLen {
- size = http2MaxFrameLen
- } else {
- endHeaders = true
- }
- if first {
- first = false
- err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
- StreamID: streamID,
- BlockFragment: l.hBuf.Next(size),
- EndStream: endStream,
- EndHeaders: endHeaders,
- })
- } else {
- err = l.framer.fr.WriteContinuation(
- streamID,
- endHeaders,
- l.hBuf.Next(size),
- )
- }
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-func (l *loopyWriter) preprocessData(df *dataFrame) {
- str, ok := l.estdStreams[df.streamID]
- if !ok {
- return
- }
- // If we got data for a stream it means that
- // stream was originated and the headers were sent out.
- str.itl.enqueue(df)
- if str.state == empty {
- str.state = active
- l.activeStreams.enqueue(str)
- }
-}
-
-func (l *loopyWriter) pingHandler(p *ping) error {
- if !p.ack {
- l.bdpEst.timesnap(p.data)
- }
- return l.framer.fr.WritePing(p.ack, p.data)
-
-}
-
-func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
- o.resp <- l.sendQuota
-}
-
-func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
- c.onWrite()
- if str, ok := l.estdStreams[c.streamID]; ok {
- // On the server side it could be a trailers-only response or
- // a RST_STREAM before stream initialization thus the stream might
- // not be established yet.
- delete(l.estdStreams, c.streamID)
- str.deleteSelf()
- for head := str.itl.dequeueAll(); head != nil; head = head.next {
- if df, ok := head.it.(*dataFrame); ok {
- _ = df.reader.Close()
- }
- }
- }
- if c.rst { // If RST_STREAM needs to be sent.
- if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
- return err
- }
- }
- if l.draining && len(l.estdStreams) == 0 {
- // Flush and close the connection; we are done with it.
- return errors.New("finished processing active streams while in draining mode")
- }
- return nil
-}
-
-func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
- if l.side == clientSide {
- return errors.New("earlyAbortStream not handled on client")
- }
- // In case the caller forgets to set the http status, default to 200.
- if eas.httpStatus == 0 {
- eas.httpStatus = 200
- }
- headerFields := []hpack.HeaderField{
- {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
- {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
- {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
- {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
- }
-
- if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
- return err
- }
- if eas.rst {
- if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
- return err
- }
- }
- return nil
-}
-
-func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
- if l.side == clientSide {
- l.draining = true
- if len(l.estdStreams) == 0 {
- // Flush and close the connection; we are done with it.
- return errors.New("received GOAWAY with no active streams")
- }
- }
- return nil
-}
-
-func (l *loopyWriter) goAwayHandler(g *goAway) error {
- // Handling of outgoing GoAway is very specific to side.
- if l.ssGoAwayHandler != nil {
- draining, err := l.ssGoAwayHandler(g)
- if err != nil {
- return err
- }
- l.draining = draining
- }
- return nil
-}
-
-func (l *loopyWriter) handle(i any) error {
- switch i := i.(type) {
- case *incomingWindowUpdate:
- l.incomingWindowUpdateHandler(i)
- case *outgoingWindowUpdate:
- return l.outgoingWindowUpdateHandler(i)
- case *incomingSettings:
- return l.incomingSettingsHandler(i)
- case *outgoingSettings:
- return l.outgoingSettingsHandler(i)
- case *headerFrame:
- return l.headerHandler(i)
- case *registerStream:
- l.registerStreamHandler(i)
- case *cleanupStream:
- return l.cleanupStreamHandler(i)
- case *earlyAbortStream:
- return l.earlyAbortStreamHandler(i)
- case *incomingGoAway:
- return l.incomingGoAwayHandler(i)
- case *dataFrame:
- l.preprocessData(i)
- case *ping:
- return l.pingHandler(i)
- case *goAway:
- return l.goAwayHandler(i)
- case *outFlowControlSizeRequest:
- l.outFlowControlSizeRequestHandler(i)
- case closeConnection:
- // Just return a non-I/O error and run() will flush and close the
- // connection.
- return ErrConnClosing
- default:
- return fmt.Errorf("transport: unknown control message type %T", i)
- }
- return nil
-}
-
-func (l *loopyWriter) applySettings(ss []http2.Setting) {
- for _, s := range ss {
- switch s.ID {
- case http2.SettingInitialWindowSize:
- o := l.oiws
- l.oiws = s.Val
- if o < l.oiws {
- // If the new limit is greater make all depleted streams active.
- for _, stream := range l.estdStreams {
- if stream.state == waitingOnStreamQuota {
- stream.state = active
- l.activeStreams.enqueue(stream)
- }
- }
- }
- case http2.SettingHeaderTableSize:
- updateHeaderTblSize(l.hEnc, s.Val)
- }
- }
-}
-
-// processData removes the first stream from active streams, writes out at most 16KB
-// of its data and then puts it at the end of activeStreams if there's still more data
-// to be sent and stream has some stream-level flow control.
-func (l *loopyWriter) processData() (bool, error) {
- if l.sendQuota == 0 {
- return true, nil
- }
- str := l.activeStreams.dequeue() // Remove the first stream.
- if str == nil {
- return true, nil
- }
- dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
- // A data item is represented by a dataFrame, since it later translates into
- // multiple HTTP2 data frames.
- // Every dataFrame has two buffers; h that keeps grpc-message header and data
- // that is the actual message. As an optimization to keep wire traffic low, data
- // from data is copied to h to make as big as the maximum possible HTTP2 frame
- // size.
-
- if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
- // Client sends out empty data frame with endStream = true
- if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
- return false, err
- }
- str.itl.dequeue() // remove the empty data item from stream
- _ = dataItem.reader.Close()
- if str.itl.isEmpty() {
- str.state = empty
- } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
- if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
- return false, err
- }
- if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
- return false, err
- }
- } else {
- l.activeStreams.enqueue(str)
- }
- return false, nil
- }
-
- // Figure out the maximum size we can send
- maxSize := http2MaxFrameLen
- if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
- str.state = waitingOnStreamQuota
- return false, nil
- } else if maxSize > strQuota {
- maxSize = strQuota
- }
- if maxSize > int(l.sendQuota) { // connection-level flow control.
- maxSize = int(l.sendQuota)
- }
- // Compute how much of the header and data we can send within quota and max frame length
- hSize := min(maxSize, len(dataItem.h))
- dSize := min(maxSize-hSize, dataItem.reader.Remaining())
- remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
- size := hSize + dSize
-
- var buf *[]byte
-
- if hSize != 0 && dSize == 0 {
- buf = &dataItem.h
- } else {
- // Note: this is only necessary because the http2.Framer does not support
- // partially writing a frame, so the sequence must be materialized into a buffer.
- // TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
- pool := l.bufferPool
- if pool == nil {
- // Note that this is only supposed to be nil in tests. Otherwise, stream is
- // always initialized with a BufferPool.
- pool = mem.DefaultBufferPool()
- }
- buf = pool.Get(size)
- defer pool.Put(buf)
-
- copy((*buf)[:hSize], dataItem.h)
- _, _ = dataItem.reader.Read((*buf)[hSize:])
- }
-
- // Now that outgoing flow controls are checked we can replenish str's write quota
- str.wq.replenish(size)
- var endStream bool
- // If this is the last data message on this stream and all of it can be written in this iteration.
- if dataItem.endStream && remainingBytes == 0 {
- endStream = true
- }
- if dataItem.onEachWrite != nil {
- dataItem.onEachWrite()
- }
- if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
- return false, err
- }
- str.bytesOutStanding += size
- l.sendQuota -= uint32(size)
- dataItem.h = dataItem.h[hSize:]
-
- if remainingBytes == 0 { // All the data from that message was written out.
- _ = dataItem.reader.Close()
- str.itl.dequeue()
- }
- if str.itl.isEmpty() {
- str.state = empty
- } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
- if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
- return false, err
- }
- if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
- return false, err
- }
- } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
- str.state = waitingOnStreamQuota
- } else { // Otherwise add it back to the list of active streams.
- l.activeStreams.enqueue(str)
- }
- return false, nil
-}