summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go17
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go9
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go25
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go53
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/logging.go40
6 files changed, 99 insertions, 47 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index c343c23a5..be5a9c81e 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -30,6 +30,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/status"
)
@@ -488,12 +489,13 @@ type loopyWriter struct {
bdpEst *bdpEstimator
draining bool
conn net.Conn
+ logger *grpclog.PrefixLogger
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}
-func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn) *loopyWriter {
+func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
@@ -507,6 +509,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
+ logger: logger,
}
return l
}
@@ -536,8 +539,8 @@ const minBatchSize = 1000
// left open to allow the I/O error to be encountered by the reader instead.
func (l *loopyWriter) run() (err error) {
defer func() {
- if logger.V(logLevel) {
- logger.Infof("transport: loopyWriter exiting with error: %v", err)
+ if l.logger.V(logLevel) {
+ l.logger.Infof("loopyWriter exiting with error: %v", err)
}
if !isIOError(err) {
l.framer.writer.Flush()
@@ -636,8 +639,8 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
if l.side == serverSide {
str, ok := l.estdStreams[h.streamID]
if !ok {
- if logger.V(logLevel) {
- logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
+ if l.logger.V(logLevel) {
+ l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
}
return nil
}
@@ -692,8 +695,8 @@ func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.He
l.hBuf.Reset()
for _, f := range hf {
if err := l.hEnc.WriteField(f); err != nil {
- if logger.V(logLevel) {
- logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
+ if l.logger.V(logLevel) {
+ l.logger.Warningf("Encountered error while encoding headers: %v", err)
}
}
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index e6626bf96..fbee581b8 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -39,6 +39,7 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -83,6 +84,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
contentSubtype: contentSubtype,
stats: stats,
}
+ st.logger = prefixLoggerForServerHandlerTransport(st)
if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v)
@@ -150,13 +152,14 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string
- stats []stats.Handler
+ stats []stats.Handler
+ logger *grpclog.PrefixLogger
}
func (ht *serverHandlerTransport) Close(err error) {
ht.closeOnce.Do(func() {
- if logger.V(logLevel) {
- logger.Infof("Closing serverHandlerTransport: %v", err)
+ if ht.logger.V(logLevel) {
+ ht.logger.Infof("Closing: %v", err)
}
close(ht.closedCh)
})
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 9826feb8c..5216998a8 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
+ "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
@@ -145,6 +146,7 @@ type http2Client struct {
bufferPool *bufferPool
connectionID uint64
+ logger *grpclog.PrefixLogger
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
@@ -244,7 +246,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if err := connectCtx.Err(); err != nil {
// connectCtx expired before exiting the function. Hard close the connection.
if logger.V(logLevel) {
- logger.Infof("newClientTransport: aborting due to connectCtx: %v", err)
+ logger.Infof("Aborting due to connect deadline expiring: %v", err)
}
conn.Close()
}
@@ -346,6 +348,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
bufferPool: newBufferPool(),
onClose: onClose,
}
+ t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@@ -444,7 +447,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return nil, err
}
go func() {
- t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
+ t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.run()
close(t.writerDone)
}()
@@ -782,7 +785,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
- if t.activeStreams == nil { // Can be niled from Close().
+ if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
@@ -859,8 +862,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
}
}
if transportDrainRequired {
- if logger.V(logLevel) {
- logger.Infof("transport: t.nextID > MaxStreamID. Draining")
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
}
t.GracefulClose()
}
@@ -952,8 +955,8 @@ func (t *http2Client) Close(err error) {
t.mu.Unlock()
return
}
- if logger.V(logLevel) {
- logger.Infof("transport: closing: %v", err)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Closing: %v", err)
}
// Call t.onClose ASAP to prevent the client from attempting to create new
// streams.
@@ -1009,8 +1012,8 @@ func (t *http2Client) GracefulClose() {
t.mu.Unlock()
return
}
- if logger.V(logLevel) {
- logger.Infof("transport: GracefulClose called")
+ if t.logger.V(logLevel) {
+ t.logger.Infof("GracefulClose called")
}
t.onClose(GoAwayInvalid)
t.state = draining
@@ -1174,8 +1177,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
}
statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
- if logger.V(logLevel) {
- logger.Warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error: %v", f.ErrCode)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
}
statusCode = codes.Unknown
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 99ae1a737..4b406b8cb 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -35,7 +35,9 @@ import (
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
+ "google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/codes"
@@ -129,6 +131,8 @@ type http2Server struct {
// This lock may not be taken if mu is already held.
maxStreamMu sync.Mutex
maxStreamID uint32 // max stream ID ever seen
+
+ logger *grpclog.PrefixLogger
}
// NewServerTransport creates a http2 transport with conn and configuration
@@ -267,6 +271,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
+ t.logger = prefixLoggerForServerTransport(t)
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@@ -331,7 +336,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
- t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn)
+ t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy.run()
close(t.writerDone)
@@ -425,8 +430,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// "Transports must consider requests containing the Connection header
// as malformed." - A41
case "connection":
- if logger.V(logLevel) {
- logger.Errorf("transport: http2Server.operateHeaders parsed a :connection header which makes a request malformed as per the HTTP/2 spec")
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
}
protocolError = true
default:
@@ -436,7 +441,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
- logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
+ t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
@@ -450,8 +455,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// error, this takes precedence over a client not speaking gRPC.
if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
- if logger.V(logLevel) {
- logger.Errorf("transport: %v", errMsg)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Aborting the stream early: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusBadRequest,
@@ -545,9 +550,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
if httpMethod != http.MethodPost {
t.mu.Unlock()
- errMsg := fmt.Sprintf("http2Server.operateHeaders parsed a :method field: %v which should be POST", httpMethod)
- if logger.V(logLevel) {
- logger.Infof("transport: %v", errMsg)
+ errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Aborting the stream early: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: 405,
@@ -563,8 +568,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
var err error
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
t.mu.Unlock()
- if logger.V(logLevel) {
- logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
}
stat, ok := status.FromError(err)
if !ok {
@@ -638,8 +643,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
- if logger.V(logLevel) {
- logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
+ if t.logger.V(logLevel) {
+ t.logger.Warningf("Encountered http2.StreamError: %v", se)
}
t.mu.Lock()
s := t.activeStreams[se.StreamID]
@@ -682,8 +687,8 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
- if logger.V(logLevel) {
- logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Received unsupported frame type %T", frame)
}
}
}
@@ -942,8 +947,8 @@ func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
var sz int64
for _, f := range hdrFrame.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
- if logger.V(logLevel) {
- logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
}
return false
}
@@ -1056,7 +1061,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
- logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
+ t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
} else {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
@@ -1171,8 +1176,8 @@ func (t *http2Server) keepalive() {
select {
case <-ageTimer.C:
// Close the connection after grace period.
- if logger.V(logLevel) {
- logger.Infof("transport: closing server transport due to maximum connection age.")
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Closing server transport due to maximum connection age")
}
t.controlBuf.put(closeConnection{})
case <-t.done:
@@ -1223,8 +1228,8 @@ func (t *http2Server) Close(err error) {
t.mu.Unlock()
return
}
- if logger.V(logLevel) {
- logger.Infof("transport: closing: %v", err)
+ if t.logger.V(logLevel) {
+ t.logger.Infof("Closing: %v", err)
}
t.state = closing
streams := t.activeStreams
@@ -1232,8 +1237,8 @@ func (t *http2Server) Close(err error) {
t.mu.Unlock()
t.controlBuf.finish()
close(t.done)
- if err := t.conn.Close(); err != nil && logger.V(logLevel) {
- logger.Infof("transport: error closing conn during Close: %v", err)
+ if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
+ t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
}
channelz.RemoveEntry(t.channelzID)
// Cancel all active streams.
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 8fcae4f4d..19cbb18f5 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -38,7 +38,6 @@ import (
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
- "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/status"
)
@@ -86,7 +85,6 @@ var (
// 504 Gateway timeout - UNAVAILABLE.
http.StatusGatewayTimeout: codes.Unavailable,
}
- logger = grpclog.Component("transport")
)
// isReservedHeader checks whether hdr belongs to HTTP2 headers
diff --git a/vendor/google.golang.org/grpc/internal/transport/logging.go b/vendor/google.golang.org/grpc/internal/transport/logging.go
new file mode 100644
index 000000000..42ed2b07a
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/transport/logging.go
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package transport
+
+import (
+ "fmt"
+
+ "google.golang.org/grpc/grpclog"
+ internalgrpclog "google.golang.org/grpc/internal/grpclog"
+)
+
+var logger = grpclog.Component("transport")
+
+func prefixLoggerForServerTransport(p *http2Server) *internalgrpclog.PrefixLogger {
+ return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-transport %p] ", p))
+}
+
+func prefixLoggerForServerHandlerTransport(p *serverHandlerTransport) *internalgrpclog.PrefixLogger {
+ return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[server-handler-transport %p] ", p))
+}
+
+func prefixLoggerForClientTransport(p *http2Client) *internalgrpclog.PrefixLogger {
+ return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[client-transport %p] ", p))
+}