summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/client_stream.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go8
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go23
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go4
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/server_stream.go6
5 files changed, 32 insertions, 11 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/client_stream.go b/vendor/google.golang.org/grpc/internal/transport/client_stream.go
index 8ed347c54..ccc0e017e 100644
--- a/vendor/google.golang.org/grpc/internal/transport/client_stream.go
+++ b/vendor/google.golang.org/grpc/internal/transport/client_stream.go
@@ -59,7 +59,7 @@ func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
return b, err
}
-// Close closes the stream and popagates err to any readers.
+// Close closes the stream and propagates err to any readers.
func (s *ClientStream) Close(err error) {
var (
rst bool
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 513dbb93d..171e690a3 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -176,7 +176,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
return fn(ctx, address)
}
if !ok {
- networkType, address = parseDialTarget(address)
+ networkType, address = ParseDialTarget(address)
}
if opts, present := proxyattributes.Get(addr); present {
return proxyDial(ctx, addr, grpcUA, opts)
@@ -1242,7 +1242,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
statusCode = codes.DeadlineExceeded
}
}
- t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
+ st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)
+ t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false)
}
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
@@ -1390,8 +1391,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayNoReason
- switch f.ErrCode {
- case http2.ErrCodeEnhanceYourCalm:
+ if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
if string(f.DebugData()) == "too_many_pings" {
t.goAwayReason = GoAwayTooManyPings
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 997b0a59b..7e53eb173 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -35,6 +35,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/pretty"
@@ -598,6 +599,22 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
+ // Start a timer to close the stream on reaching the deadline.
+ if timeoutSet {
+ // We need to wait for s.cancel to be updated before calling
+ // t.closeStream to avoid data races.
+ cancelUpdated := make(chan struct{})
+ timer := internal.TimeAfterFunc(timeout, func() {
+ <-cancelUpdated
+ t.closeStream(s, true, http2.ErrCodeCancel, false)
+ })
+ oldCancel := s.cancel
+ s.cancel = func() {
+ oldCancel()
+ timer.Stop()
+ }
+ close(cancelUpdated)
+ }
t.mu.Unlock()
if channelz.IsOn() {
t.channelz.SocketMetrics.StreamsStarted.Add(1)
@@ -1274,7 +1291,6 @@ func (t *http2Server) Close(err error) {
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
-
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
delete(t.activeStreams, s.id)
@@ -1324,7 +1340,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
// called to interrupt the potential blocking on other goroutines.
s.cancel()
- s.swapState(streamDone)
+ oldState := s.swapState(streamDone)
+ if oldState == streamDone {
+ return
+ }
t.deleteStream(s, eosReceived)
t.controlBuf.put(&cleanupStream{
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 3613d7b64..f997f9fdb 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -439,8 +439,8 @@ func getWriteBufferPool(size int) *sync.Pool {
return pool
}
-// parseDialTarget returns the network and address to pass to dialer.
-func parseDialTarget(target string) (string, string) {
+// ParseDialTarget returns the network and address to pass to dialer.
+func ParseDialTarget(target string) (string, string) {
net := "tcp"
m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")
diff --git a/vendor/google.golang.org/grpc/internal/transport/server_stream.go b/vendor/google.golang.org/grpc/internal/transport/server_stream.go
index a22a90151..cf8da0b52 100644
--- a/vendor/google.golang.org/grpc/internal/transport/server_stream.go
+++ b/vendor/google.golang.org/grpc/internal/transport/server_stream.go
@@ -35,8 +35,10 @@ type ServerStream struct {
*Stream // Embed for common stream functionality.
st internalServerTransport
- ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
- cancel context.CancelFunc // invoked at the end of stream to cancel ctx.
+ ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
+ // cancel is invoked at the end of stream to cancel ctx. It also stops the
+ // timer for monitoring the rpc deadline if configured.
+ cancel func()
// Holds compressor names passed in grpc-accept-encoding metadata from the
// client.