summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2024-08-26 18:05:54 +0200
committerLibravatar GitHub <noreply@github.com>2024-08-26 18:05:54 +0200
commit28d57d1f13ee61a6ff83ce4beaf238139d20bbac (patch)
tree7946abb44b21f9e2f4267146711760a911072c9b /vendor/google.golang.org/grpc/internal/transport
parent[chore]: Bump github.com/prometheus/client_golang from 1.20.0 to 1.20.2 (#3239) (diff)
downloadgotosocial-28d57d1f13ee61a6ff83ce4beaf238139d20bbac.tar.xz
[chore] Bump all otel deps (#3241)
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go33
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go68
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go16
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go2
4 files changed, 69 insertions, 50 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 83c382982..3deadfb4a 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -193,7 +193,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
- closeConn error // if set, loopyWriter will exit, resulting in conn closure
+ closeConn error // if set, loopyWriter will exit with this error
}
func (*goAway) isTransportResponseFrame() bool { return false }
@@ -336,7 +336,7 @@ func (c *controlBuffer) put(it cbItem) error {
return err
}
-func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
+func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
@@ -344,7 +344,7 @@ func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, err
return false, c.err
}
if f != nil {
- if !f(it) { // f wasn't successful
+ if !f() { // f wasn't successful
c.mu.Unlock()
return false, nil
}
@@ -495,21 +495,22 @@ type loopyWriter struct {
ssGoAwayHandler func(*goAway) (bool, error)
}
-func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
+func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
- side: s,
- cbuf: cbuf,
- sendQuota: defaultWindowSize,
- oiws: defaultWindowSize,
- estdStreams: make(map[uint32]*outStream),
- activeStreams: newOutStreamList(),
- framer: fr,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- bdpEst: bdpEst,
- conn: conn,
- logger: logger,
+ side: s,
+ cbuf: cbuf,
+ sendQuota: defaultWindowSize,
+ oiws: defaultWindowSize,
+ estdStreams: make(map[uint32]*outStream),
+ activeStreams: newOutStreamList(),
+ framer: fr,
+ hBuf: &buf,
+ hEnc: hpack.NewEncoder(&buf),
+ bdpEst: bdpEst,
+ conn: conn,
+ logger: logger,
+ ssGoAwayHandler: goAwayHandler,
}
return l
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index deba0c4d9..3c63c7069 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -114,11 +114,11 @@ type http2Client struct {
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
- nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
+ nextID uint32
state transportState
activeStreams map[uint32]*Stream
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
@@ -408,10 +408,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
defer func() {
- if err == nil {
- err = <-readerErrCh
- }
if err != nil {
+ // writerDone should be closed since the loopy goroutine
+ // wouldn't have started in the case this function returns an error.
+ close(t.writerDone)
t.Close(err)
}
}()
@@ -458,8 +458,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
+ // Block until the server preface is received successfully or an error occurs.
+ if err = <-readerErrCh; err != nil {
+ return nil, err
+ }
go func() {
- t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
+ t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
@@ -517,6 +521,17 @@ func (t *http2Client) getPeer() *peer.Peer {
}
}
+// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway
+// to be the last frame loopy writes to the transport.
+func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
+ return false, err
+ }
+ return false, g.closeConn
+}
+
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
ri := credentials.RequestInfo{
@@ -781,7 +796,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
firstTry := true
var ch chan struct{}
transportDrainRequired := false
- checkForStreamQuota := func(it any) bool {
+ checkForStreamQuota := func() bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
t.waitingStreams++
@@ -793,23 +808,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
t.waitingStreams--
}
t.streamQuota--
- h := it.(*headerFrame)
- h.streamID = t.nextID
- t.nextID += 2
-
- // Drain client transport if nextID > MaxStreamID which signals gRPC that
- // the connection is closed and a new one must be created for subsequent RPCs.
- transportDrainRequired = t.nextID > MaxStreamID
- s.id = h.streamID
- s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
+
+ hdr.streamID = t.nextID
+ t.nextID += 2
+ // Drain client transport if nextID > MaxStreamID which signals gRPC that
+ // the connection is closed and a new one must be created for subsequent RPCs.
+ transportDrainRequired = t.nextID > MaxStreamID
+
+ s.id = hdr.streamID
+ s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.activeStreams[s.id] = s
t.mu.Unlock()
+
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
case t.streamsQuotaAvailable <- struct{}{}:
@@ -819,13 +835,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
var hdrListSizeErr error
- checkForHeaderListSize := func(it any) bool {
+ checkForHeaderListSize := func() bool {
if t.maxSendHeaderListSize == nil {
return true
}
- hdrFrame := it.(*headerFrame)
var sz int64
- for _, f := range hdrFrame.hf {
+ for _, f := range hdr.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
return false
@@ -834,8 +849,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
for {
- success, err := t.controlBuf.executeAndPut(func(it any) bool {
- return checkForHeaderListSize(it) && checkForStreamQuota(it)
+ success, err := t.controlBuf.executeAndPut(func() bool {
+ return checkForHeaderListSize() && checkForStreamQuota()
}, hdr)
if err != nil {
// Connection closed.
@@ -946,7 +961,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
rst: rst,
rstCode: rstCode,
}
- addBackStreamQuota := func(any) bool {
+ addBackStreamQuota := func() bool {
t.streamQuota++
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
@@ -966,7 +981,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
-// accessed any more.
+// accessed anymore.
func (t *http2Client) Close(err error) {
t.mu.Lock()
// Make sure we only close once.
@@ -991,7 +1006,10 @@ func (t *http2Client) Close(err error) {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
- t.controlBuf.finish()
+ // Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
+ // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY.
+ t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
+ <-t.writerDone
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelz.ID)
@@ -1099,7 +1117,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
- updateIWS := func(any) bool {
+ updateIWS := func() bool {
t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
@@ -1252,7 +1270,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
}
updateFuncs = append(updateFuncs, updateStreamQuota)
}
- t.controlBuf.executeAndPut(func(any) bool {
+ t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index d582e0471..b7091165b 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -25,6 +25,7 @@ import (
"fmt"
"io"
"math"
+ "math/rand"
"net"
"net/http"
"strconv"
@@ -43,7 +44,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -330,8 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
- t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
- t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
+ t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
err := t.loopy.run()
close(t.loopyWriterDone)
if !isIOError(err) {
@@ -860,7 +859,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
return nil
})
- t.controlBuf.executeAndPut(func(any) bool {
+ t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}
@@ -1014,12 +1013,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
- success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
+ hf := &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
onWrite: t.setResetPingStrikes,
- })
+ }
+ success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
if !success {
if err != nil {
return err
@@ -1208,7 +1208,7 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
- t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
+ t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
return
}
if !outstandingPing {
@@ -1440,7 +1440,7 @@ func getJitter(v time.Duration) time.Duration {
}
// Generate a jitter between +/- 10% of the value.
r := int64(v / 10)
- j := grpcrand.Int63n(2*r) - r
+ j := rand.Int63n(2*r) - r
return time.Duration(j)
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 0d2a6e47f..4b39c0ade 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -304,7 +304,7 @@ func (s *Stream) isHeaderSent() bool {
}
// updateHeaderSent updates headerSent and returns true
-// if it was alreay set. It is valid only on server-side.
+// if it was already set. It is valid only on server-side.
func (s *Stream) updateHeaderSent() bool {
return atomic.SwapUint32(&s.headerSent, 1) == 1
}