summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport/http2_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/http2_client.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go72
1 files changed, 32 insertions, 40 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index eff879964..deba0c4d9 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -140,9 +140,7 @@ type http2Client struct {
// variable.
kpDormant bool
- // Fields below are for channelz metric collection.
- channelzID *channelz.Identifier
- czData *channelzData
+ channelz *channelz.Socket
onClose func(GoAwayReason)
@@ -319,6 +317,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if opts.MaxHeaderListSize != nil {
maxHeaderListSize = *opts.MaxHeaderListSize
}
+
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
@@ -346,11 +345,25 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
- czData: new(channelzData),
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
}
+ var czSecurity credentials.ChannelzSecurityValue
+ if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
+ czSecurity = au.GetSecurityValue()
+ }
+ t.channelz = channelz.RegisterSocket(
+ &channelz.Socket{
+ SocketType: channelz.SocketTypeNormal,
+ Parent: opts.ChannelzParent,
+ SocketMetrics: channelz.SocketMetrics{},
+ EphemeralMetrics: t.socketMetrics,
+ LocalAddr: t.localAddr,
+ RemoteAddr: t.remoteAddr,
+ SocketOptions: channelz.GetSocketOption(t.conn),
+ Security: czSecurity,
+ })
t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@@ -381,10 +394,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
sh.HandleConn(t.ctx, connBegin)
}
- t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
- if err != nil {
- return nil, err
- }
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
@@ -756,8 +765,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return ErrConnClosing
}
if channelz.IsOn() {
- atomic.AddInt64(&t.czData.streamsStarted, 1)
- atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.StreamsStarted.Add(1)
+ t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
// If the keepalive goroutine has gone dormant, wake it up.
if t.kpDormant {
@@ -928,9 +937,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.mu.Unlock()
if channelz.IsOn() {
if eosReceived {
- atomic.AddInt64(&t.czData.streamsSucceeded, 1)
+ t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
- atomic.AddInt64(&t.czData.streamsFailed, 1)
+ t.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
},
@@ -985,7 +994,7 @@ func (t *http2Client) Close(err error) {
t.controlBuf.finish()
t.cancel()
t.conn.Close()
- channelz.RemoveEntry(t.channelzID)
+ channelz.RemoveEntry(t.channelz.ID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
@@ -1708,7 +1717,7 @@ func (t *http2Client) keepalive() {
// keepalive timer expired. In both cases, we need to send a ping.
if !outstandingPing {
if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
+ t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
@@ -1738,40 +1747,23 @@ func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
-func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
- s := channelz.SocketInternalMetric{
- StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
- StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
- StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
- MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
- MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
- KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
- LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
- LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
- LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
- LocalFlowControlWindow: int64(t.fc.getSize()),
- SocketOptions: channelz.GetSocketOption(t.conn),
- LocalAddr: t.localAddr,
- RemoteAddr: t.remoteAddr,
- // RemoteName :
- }
- if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
- s.Security = au.GetSecurityValue()
- }
- s.RemoteFlowControlWindow = t.getOutFlowWindow()
- return &s
+func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
+ return &channelz.EphemeralSocketMetrics{
+ LocalFlowControlWindow: int64(t.fc.getSize()),
+ RemoteFlowControlWindow: t.getOutFlowWindow(),
+ }
}
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
func (t *http2Client) IncrMsgSent() {
- atomic.AddInt64(&t.czData.msgSent, 1)
- atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.MessagesSent.Add(1)
+ t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
- atomic.AddInt64(&t.czData.msgRecv, 1)
- atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.MessagesReceived.Add(1)
+ t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {