summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal')
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/binarylog.go8
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/method_logger.go5
-rw-r--r--vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go12
-rw-r--r--vendor/google.golang.org/grpc/internal/internal.go10
-rw-r--r--vendor/google.golang.org/grpc/internal/metadata/metadata.go62
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go82
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go21
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go27
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go24
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go23
10 files changed, 177 insertions, 97 deletions
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
index 809d73cca..af03a40d9 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go
@@ -28,8 +28,10 @@ import (
"google.golang.org/grpc/internal/grpcutil"
)
-// Logger is the global binary logger. It can be used to get binary logger for
-// each method.
+var grpclogLogger = grpclog.Component("binarylog")
+
+// Logger specifies MethodLoggers for method names with a Log call that
+// takes a context.
type Logger interface {
GetMethodLogger(methodName string) MethodLogger
}
@@ -40,8 +42,6 @@ type Logger interface {
// It is used to get a MethodLogger for each individual method.
var binLogger Logger
-var grpclogLogger = grpclog.Component("binarylog")
-
// SetLogger sets the binary logger.
//
// Only call this at init time.
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
index d71e44177..56fcf008d 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
@@ -19,6 +19,7 @@
package binarylog
import (
+ "context"
"net"
"strings"
"sync/atomic"
@@ -49,7 +50,7 @@ var idGen callIDGenerator
// MethodLogger is the sub-logger for each method.
type MethodLogger interface {
- Log(LogEntryConfig)
+ Log(context.Context, LogEntryConfig)
}
// TruncatingMethodLogger is a method logger that truncates headers and messages
@@ -98,7 +99,7 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry
}
// Log creates a proto binary log entry, and logs it to the sink.
-func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
+func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
ml.sink.Write(ml.Build(c))
}
diff --git a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go b/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go
index 82af70e96..02224b42c 100644
--- a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go
+++ b/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go
@@ -63,6 +63,9 @@ func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {
// Debugf does info logging at verbose level 2.
func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
+ // TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
+ // rewrite PrefixLogger a little to ensure that we don't use the global
+ // `Logger` here, and instead use the `logger` field.
if !Logger.V(2) {
return
}
@@ -73,6 +76,15 @@ func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
return
}
InfoDepth(1, fmt.Sprintf(format, args...))
+
+}
+
+// V reports whether verbosity level l is at least the requested verbose level.
+func (pl *PrefixLogger) V(l int) bool {
+ // TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
+ // rewrite PrefixLogger a little to ensure that we don't use the global
+ // `Logger` here, and instead use the `logger` field.
+ return Logger.V(l)
}
// NewPrefixLogger creates a prefix logger with the given prefix.
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 0a76d9de6..836b6a3b3 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -58,6 +58,9 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
+ // CanonicalString returns the canonical string of the code defined here:
+ // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
+ CanonicalString interface{} // func (codes.Code) string
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
@@ -74,6 +77,10 @@ var (
// globally for newly created client channels. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
AddGlobalDialOptions interface{} // func(opt ...DialOption)
+ // DisableGlobalDialOptions returns a DialOption that prevents the
+ // ClientConn from applying the global DialOptions (set via
+ // AddGlobalDialOptions).
+ DisableGlobalDialOptions interface{} // func() grpc.DialOption
// ClearGlobalDialOptions clears the array of extra DialOption. This
// method is useful in testing and benchmarking.
ClearGlobalDialOptions func()
@@ -130,6 +137,9 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()
+
+ // ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
+ ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/internal/metadata/metadata.go b/vendor/google.golang.org/grpc/internal/metadata/metadata.go
index b2980f8ac..c82e608e0 100644
--- a/vendor/google.golang.org/grpc/internal/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/internal/metadata/metadata.go
@@ -76,33 +76,11 @@ func Set(addr resolver.Address, md metadata.MD) resolver.Address {
return addr
}
-// Validate returns an error if the input md contains invalid keys or values.
-//
-// If the header is not a pseudo-header, the following items are checked:
-// - header names must contain one or more characters from this set [0-9 a-z _ - .].
-// - if the header-name ends with a "-bin" suffix, no validation of the header value is performed.
-// - otherwise, the header value must contain one or more characters from the set [%x20-%x7E].
+// Validate validates every pair in md with ValidatePair.
func Validate(md metadata.MD) error {
for k, vals := range md {
- // pseudo-header will be ignored
- if k[0] == ':' {
- continue
- }
- // check key, for i that saving a conversion if not using for range
- for i := 0; i < len(k); i++ {
- r := k[i]
- if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
- return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", k)
- }
- }
- if strings.HasSuffix(k, "-bin") {
- continue
- }
- // check value
- for _, val := range vals {
- if hasNotPrintable(val) {
- return fmt.Errorf("header key %q contains value with non-printable ASCII characters", k)
- }
+ if err := ValidatePair(k, vals...); err != nil {
+ return err
}
}
return nil
@@ -118,3 +96,37 @@ func hasNotPrintable(msg string) bool {
}
return false
}
+
+// ValidatePair validate a key-value pair with the following rules (the pseudo-header will be skipped) :
+//
+// - key must contain one or more characters.
+// - the characters in the key must be contained in [0-9 a-z _ - .].
+// - if the key ends with a "-bin" suffix, no validation of the corresponding value is performed.
+// - the characters in the every value must be printable (in [%x20-%x7E]).
+func ValidatePair(key string, vals ...string) error {
+ // key should not be empty
+ if key == "" {
+ return fmt.Errorf("there is an empty key in the header")
+ }
+ // pseudo-header will be ignored
+ if key[0] == ':' {
+ return nil
+ }
+ // check key, for i that saving a conversion if not using for range
+ for i := 0; i < len(key); i++ {
+ r := key[i]
+ if !(r >= 'a' && r <= 'z') && !(r >= '0' && r <= '9') && r != '.' && r != '-' && r != '_' {
+ return fmt.Errorf("header key %q contains illegal characters not in [0-9a-z-_.]", key)
+ }
+ }
+ if strings.HasSuffix(key, "-bin") {
+ return nil
+ }
+ // check value
+ for _, val := range vals {
+ if hasNotPrintable(val) {
+ return fmt.Errorf("header key %q contains value with non-printable ASCII characters", key)
+ }
+ }
+ return nil
+}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 9097385e1..c343c23a5 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -22,6 +22,7 @@ import (
"bytes"
"errors"
"fmt"
+ "net"
"runtime"
"strconv"
"sync"
@@ -486,12 +487,13 @@ type loopyWriter struct {
hEnc *hpack.Encoder // HPACK encoder.
bdpEst *bdpEstimator
draining bool
+ conn net.Conn
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}
-func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
+func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
@@ -504,6 +506,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
+ conn: conn,
}
return l
}
@@ -521,15 +524,27 @@ const minBatchSize = 1000
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
-// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
-// This results in writing of HTTP2 frames into an underlying write buffer.
-// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
-// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
-// if the batch size is too low to give stream goroutines a chance to fill it up.
+// frame, loopy calls processData, which processes one node from the
+// activeStreams linked-list. This results in writing of HTTP2 frames into an
+// underlying write buffer. When there's no more control frames to read from
+// controlBuf, loopy flushes the write buffer. As an optimization, to increase
+// the batch size for each flush, loopy yields the processor, once if the batch
+// size is too low to give stream goroutines a chance to fill it up.
+//
+// Upon exiting, if the error causing the exit is not an I/O error, run()
+// flushes and closes the underlying connection. Otherwise, the connection is
+// left open to allow the I/O error to be encountered by the reader instead.
func (l *loopyWriter) run() (err error) {
- // Always flush the writer before exiting in case there are pending frames
- // to be sent.
- defer l.framer.writer.Flush()
+ defer func() {
+ if logger.V(logLevel) {
+ logger.Infof("transport: loopyWriter exiting with error: %v", err)
+ }
+ if !isIOError(err) {
+ l.framer.writer.Flush()
+ l.conn.Close()
+ }
+ l.cbuf.finish()
+ }()
for {
it, err := l.cbuf.get(true)
if err != nil {
@@ -581,11 +596,11 @@ func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error
return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}
-func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
+func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
// Otherwise update the quota.
if w.streamID == 0 {
l.sendQuota += w.increment
- return nil
+ return
}
// Find the stream and update it.
if str, ok := l.estdStreams[w.streamID]; ok {
@@ -593,10 +608,9 @@ func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
str.state = active
l.activeStreams.enqueue(str)
- return nil
+ return
}
}
- return nil
}
func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
@@ -604,13 +618,11 @@ func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
}
func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
- if err := l.applySettings(s.ss); err != nil {
- return err
- }
+ l.applySettings(s.ss)
return l.framer.fr.WriteSettingsAck()
}
-func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
+func (l *loopyWriter) registerStreamHandler(h *registerStream) {
str := &outStream{
id: h.streamID,
state: empty,
@@ -618,7 +630,6 @@ func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
wq: h.wq,
}
l.estdStreams[h.streamID] = str
- return nil
}
func (l *loopyWriter) headerHandler(h *headerFrame) error {
@@ -720,10 +731,10 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
return nil
}
-func (l *loopyWriter) preprocessData(df *dataFrame) error {
+func (l *loopyWriter) preprocessData(df *dataFrame) {
str, ok := l.estdStreams[df.streamID]
if !ok {
- return nil
+ return
}
// If we got data for a stream it means that
// stream was originated and the headers were sent out.
@@ -732,7 +743,6 @@ func (l *loopyWriter) preprocessData(df *dataFrame) error {
str.state = active
l.activeStreams.enqueue(str)
}
- return nil
}
func (l *loopyWriter) pingHandler(p *ping) error {
@@ -743,9 +753,8 @@ func (l *loopyWriter) pingHandler(p *ping) error {
}
-func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
+func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
o.resp <- l.sendQuota
- return nil
}
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
@@ -763,6 +772,7 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
}
}
if l.draining && len(l.estdStreams) == 0 {
+ // Flush and close the connection; we are done with it.
return errors.New("finished processing active streams while in draining mode")
}
return nil
@@ -798,6 +808,7 @@ func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide {
l.draining = true
if len(l.estdStreams) == 0 {
+ // Flush and close the connection; we are done with it.
return errors.New("received GOAWAY with no active streams")
}
}
@@ -816,17 +827,10 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {
return nil
}
-func (l *loopyWriter) closeConnectionHandler() error {
- // Exit loopyWriter entirely by returning an error here. This will lead to
- // the transport closing the connection, and, ultimately, transport
- // closure.
- return ErrConnClosing
-}
-
func (l *loopyWriter) handle(i interface{}) error {
switch i := i.(type) {
case *incomingWindowUpdate:
- return l.incomingWindowUpdateHandler(i)
+ l.incomingWindowUpdateHandler(i)
case *outgoingWindowUpdate:
return l.outgoingWindowUpdateHandler(i)
case *incomingSettings:
@@ -836,7 +840,7 @@ func (l *loopyWriter) handle(i interface{}) error {
case *headerFrame:
return l.headerHandler(i)
case *registerStream:
- return l.registerStreamHandler(i)
+ l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
case *earlyAbortStream:
@@ -844,21 +848,24 @@ func (l *loopyWriter) handle(i interface{}) error {
case *incomingGoAway:
return l.incomingGoAwayHandler(i)
case *dataFrame:
- return l.preprocessData(i)
+ l.preprocessData(i)
case *ping:
return l.pingHandler(i)
case *goAway:
return l.goAwayHandler(i)
case *outFlowControlSizeRequest:
- return l.outFlowControlSizeRequestHandler(i)
+ l.outFlowControlSizeRequestHandler(i)
case closeConnection:
- return l.closeConnectionHandler()
+ // Just return a non-I/O error and run() will flush and close the
+ // connection.
+ return ErrConnClosing
default:
return fmt.Errorf("transport: unknown control message type %T", i)
}
+ return nil
}
-func (l *loopyWriter) applySettings(ss []http2.Setting) error {
+func (l *loopyWriter) applySettings(ss []http2.Setting) {
for _, s := range ss {
switch s.ID {
case http2.SettingInitialWindowSize:
@@ -877,7 +884,6 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
updateHeaderTblSize(l.hEnc, s.Val)
}
}
- return nil
}
// processData removes the first stream from active streams, writes out at most 16KB
@@ -911,7 +917,7 @@ func (l *loopyWriter) processData() (bool, error) {
return false, err
}
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
- return false, nil
+ return false, err
}
} else {
l.activeStreams.enqueue(str)
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 79ee8aea0..9826feb8c 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -444,15 +444,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return nil, err
}
go func() {
- t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
- err := t.loopy.run()
- if logger.V(logLevel) {
- logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
- }
- // Do not close the transport. Let reader goroutine handle it since
- // there might be data in the buffers.
- t.conn.Close()
- t.controlBuf.finish()
+ t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
+ t.loopy.run()
close(t.writerDone)
}()
return t, nil
@@ -1264,10 +1257,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Unlock()
return
}
- if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
- if logger.V(logLevel) {
- logger.Infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
- }
+ if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
+ // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
+ // data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
+ // enabled by default and double the configure KEEPALIVE_TIME used for new connections
+ // on that channel.
+ logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
}
id := f.LastStreamID
if id > 0 && id%2 == 0 {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index bc3da7067..99ae1a737 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -331,14 +331,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
- t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
+ t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
- err := t.loopy.run()
- if logger.V(logLevel) {
- logger.Infof("transport: loopyWriter exited. Closing connection. Err: %v", err)
- }
- t.conn.Close()
- t.controlBuf.finish()
+ t.loopy.run()
close(t.writerDone)
}()
go t.keepalive()
@@ -383,7 +378,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// if false, content-type was missing or invalid
isGRPC = false
contentType = ""
- mdata = make(map[string][]string)
+ mdata = make(metadata.MD, len(frame.Fields))
httpMethod string
// these are set if an error is encountered while parsing the headers
protocolError bool
@@ -404,6 +399,17 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
s.contentSubtype = contentSubtype
isGRPC = true
+
+ case "grpc-accept-encoding":
+ mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
+ if hf.Value == "" {
+ continue
+ }
+ compressors := hf.Value
+ if s.clientAdvertisedCompressors != "" {
+ compressors = s.clientAdvertisedCompressors + "," + compressors
+ }
+ s.clientAdvertisedCompressors = compressors
case "grpc-encoding":
s.recvCompress = hf.Value
case ":method":
@@ -595,7 +601,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
- Header: metadata.MD(mdata).Copy(),
+ Header: mdata.Copy(),
}
sh.HandleRPC(s.ctx, inHeader)
}
@@ -1344,9 +1350,6 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
if retErr != nil {
- // Abruptly close the connection following the GoAway (via
- // loopywriter). But flush out what's inside the buffer first.
- t.framer.writer.Flush()
return false, retErr
}
return true, nil
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 2c601a864..8fcae4f4d 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -21,6 +21,7 @@ package transport
import (
"bufio"
"encoding/base64"
+ "errors"
"fmt"
"io"
"math"
@@ -330,7 +331,8 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
- return w.conn.Write(b)
+ n, err = w.conn.Write(b)
+ return n, toIOError(err)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
@@ -352,10 +354,30 @@ func (w *bufWriter) Flush() error {
return nil
}
_, w.err = w.conn.Write(w.buf[:w.offset])
+ w.err = toIOError(w.err)
w.offset = 0
return w.err
}
+type ioError struct {
+ error
+}
+
+func (i ioError) Unwrap() error {
+ return i.error
+}
+
+func isIOError(err error) bool {
+ return errors.As(err, &ioError{})
+}
+
+func toIOError(err error) error {
+ if err == nil {
+ return nil
+ }
+ return ioError{error: err}
+}
+
type framer struct {
writer *bufWriter
fr *http2.Framer
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 0ac77ea4f..1b7d7fabc 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -257,6 +257,9 @@ type Stream struct {
fc *inFlow
wq *writeQuota
+ // Holds compressor names passed in grpc-accept-encoding metadata from the
+ // client. This is empty for the client side stream.
+ clientAdvertisedCompressors string
// Callback to state application's intentions to read data. This
// is used to adjust flow control, if needed.
requestRead func(int)
@@ -345,8 +348,24 @@ func (s *Stream) RecvCompress() string {
}
// SetSendCompress sets the compression algorithm to the stream.
-func (s *Stream) SetSendCompress(str string) {
- s.sendCompress = str
+func (s *Stream) SetSendCompress(name string) error {
+ if s.isHeaderSent() || s.getState() == streamDone {
+ return errors.New("transport: set send compressor called after headers sent or stream done")
+ }
+
+ s.sendCompress = name
+ return nil
+}
+
+// SendCompress returns the send compressor name.
+func (s *Stream) SendCompress() string {
+ return s.sendCompress
+}
+
+// ClientAdvertisedCompressors returns the compressor names advertised by the
+// client via grpc-accept-encoding header.
+func (s *Stream) ClientAdvertisedCompressors() string {
+ return s.clientAdvertisedCompressors
}
// Done returns a channel which is closed when it receives the final status