summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org')
-rw-r--r--vendor/google.golang.org/grpc/balancer_wrapper.go (renamed from vendor/google.golang.org/grpc/balancer_conn_wrappers.go)304
-rw-r--r--vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go2
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go518
-rw-r--r--vendor/google.golang.org/grpc/codes/codes.go8
-rw-r--r--vendor/google.golang.org/grpc/credentials/tls.go75
-rw-r--r--vendor/google.golang.org/grpc/dialoptions.go48
-rw-r--r--vendor/google.golang.org/grpc/internal/buffer/unbounded.go41
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/funcs.go7
-rw-r--r--vendor/google.golang.org/grpc/internal/envconfig/envconfig.go3
-rw-r--r--vendor/google.golang.org/grpc/internal/envconfig/xds.go39
-rw-r--r--vendor/google.golang.org/grpc/internal/experimental.go28
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go51
-rw-r--r--vendor/google.golang.org/grpc/internal/idle/idle.go175
-rw-r--r--vendor/google.golang.org/grpc/internal/internal.go29
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go69
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go70
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/unix/unix.go4
-rw-r--r--vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go29
-rw-r--r--vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go54
-rw-r--r--vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go54
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/controlbuf.go5
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go65
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go30
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go135
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/proxy.go14
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go22
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go31
-rw-r--r--vendor/google.golang.org/grpc/peer/peer.go2
-rw-r--r--vendor/google.golang.org/grpc/picker_wrapper.go21
-rw-r--r--vendor/google.golang.org/grpc/pickfirst.go14
-rw-r--r--vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go36
-rw-r--r--vendor/google.golang.org/grpc/resolver/map.go113
-rw-r--r--vendor/google.golang.org/grpc/resolver/resolver.go20
-rw-r--r--vendor/google.golang.org/grpc/resolver_conn_wrapper.go247
-rw-r--r--vendor/google.golang.org/grpc/resolver_wrapper.go197
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go8
-rw-r--r--vendor/google.golang.org/grpc/server.go264
-rw-r--r--vendor/google.golang.org/grpc/stream.go4
-rw-r--r--vendor/google.golang.org/grpc/version.go2
-rw-r--r--vendor/google.golang.org/grpc/vet.sh166
40 files changed, 1571 insertions, 1433 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_wrapper.go
index a4411c22b..b5e30cff0 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_wrapper.go
@@ -32,21 +32,13 @@ import (
"google.golang.org/grpc/resolver"
)
-type ccbMode int
-
-const (
- ccbModeActive = iota
- ccbModeIdle
- ccbModeClosed
- ccbModeExitingIdle
-)
-
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
// ccBalancerWrapper implements methods corresponding to the ones on the
// balancer.Balancer interface. The ClientConn is free to call these methods
// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn
-// to the Balancer happen synchronously and in order.
+// to the Balancer happen in order by performing them in the serializer, without
+// any mutexes held.
//
// ccBalancerWrapper also implements the balancer.ClientConn interface and is
// passed to the Balancer implementations. It invokes unexported methods on the
@@ -57,87 +49,75 @@ const (
type ccBalancerWrapper struct {
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
- cc *ClientConn
- opts balancer.BuildOptions
+ cc *ClientConn
+ opts balancer.BuildOptions
+ serializer *grpcsync.CallbackSerializer
+ serializerCancel context.CancelFunc
- // Outgoing (gRPC --> balancer) calls are guaranteed to execute in a
- // mutually exclusive manner as they are scheduled in the serializer. Fields
- // accessed *only* in these serializer callbacks, can therefore be accessed
- // without a mutex.
- balancer *gracefulswitch.Balancer
+ // The following fields are only accessed within the serializer or during
+ // initialization.
curBalancerName string
+ balancer *gracefulswitch.Balancer
- // mu guards access to the below fields. Access to the serializer and its
- // cancel function needs to be mutex protected because they are overwritten
- // when the wrapper exits idle mode.
- mu sync.Mutex
- serializer *grpcsync.CallbackSerializer // To serialize all outoing calls.
- serializerCancel context.CancelFunc // To close the seralizer at close/enterIdle time.
- mode ccbMode // Tracks the current mode of the wrapper.
+ // The following field is protected by mu. Caller must take cc.mu before
+ // taking mu.
+ mu sync.Mutex
+ closed bool
}
-// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer
-// is not created until the switchTo() method is invoked.
-func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
- ctx, cancel := context.WithCancel(context.Background())
+// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
+// underlying balancer is not created until the switchTo() method is invoked.
+func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
+ ctx, cancel := context.WithCancel(cc.ctx)
ccb := &ccBalancerWrapper{
- cc: cc,
- opts: bopts,
+ cc: cc,
+ opts: balancer.BuildOptions{
+ DialCreds: cc.dopts.copts.TransportCredentials,
+ CredsBundle: cc.dopts.copts.CredsBundle,
+ Dialer: cc.dopts.copts.Dialer,
+ Authority: cc.authority,
+ CustomUserAgent: cc.dopts.copts.UserAgent,
+ ChannelzParentID: cc.channelzID,
+ Target: cc.parsedTarget,
+ },
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
- ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
+ ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
return ccb
}
// updateClientConnState is invoked by grpc to push a ClientConnState update to
-// the underlying balancer.
+// the underlying balancer. This is always executed from the serializer, so
+// it is safe to call into the balancer here.
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
- ccb.mu.Lock()
- errCh := make(chan error, 1)
- // Here and everywhere else where Schedule() is called, it is done with the
- // lock held. But the lock guards only the scheduling part. The actual
- // callback is called asynchronously without the lock being held.
- ok := ccb.serializer.Schedule(func(_ context.Context) {
- errCh <- ccb.balancer.UpdateClientConnState(*ccs)
+ errCh := make(chan error)
+ ok := ccb.serializer.Schedule(func(ctx context.Context) {
+ defer close(errCh)
+ if ctx.Err() != nil || ccb.balancer == nil {
+ return
+ }
+ err := ccb.balancer.UpdateClientConnState(*ccs)
+ if logger.V(2) && err != nil {
+ logger.Infof("error from balancer.UpdateClientConnState: %v", err)
+ }
+ errCh <- err
})
if !ok {
- // If we are unable to schedule a function with the serializer, it
- // indicates that it has been closed. A serializer is only closed when
- // the wrapper is closed or is in idle.
- ccb.mu.Unlock()
- return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")
- }
- ccb.mu.Unlock()
-
- // We get here only if the above call to Schedule succeeds, in which case it
- // is guaranteed that the scheduled function will run. Therefore it is safe
- // to block on this channel.
- err := <-errCh
- if logger.V(2) && err != nil {
- logger.Infof("error from balancer.UpdateClientConnState: %v", err)
+ return nil
}
- return err
-}
-
-// updateSubConnState is invoked by grpc to push a subConn state update to the
-// underlying balancer.
-func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
- ccb.mu.Lock()
- ccb.serializer.Schedule(func(_ context.Context) {
- // Even though it is optional for balancers, gracefulswitch ensures
- // opts.StateListener is set, so this cannot ever be nil.
- sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
- })
- ccb.mu.Unlock()
+ return <-errCh
}
+// resolverError is invoked by grpc to push a resolver error to the underlying
+// balancer. The call to the balancer is executed from the serializer.
func (ccb *ccBalancerWrapper) resolverError(err error) {
- ccb.mu.Lock()
- ccb.serializer.Schedule(func(_ context.Context) {
+ ccb.serializer.Schedule(func(ctx context.Context) {
+ if ctx.Err() != nil || ccb.balancer == nil {
+ return
+ }
ccb.balancer.ResolverError(err)
})
- ccb.mu.Unlock()
}
// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the
@@ -151,8 +131,10 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {
// the ccBalancerWrapper keeps track of the current LB policy name, and skips
// the graceful balancer switching process if the name does not change.
func (ccb *ccBalancerWrapper) switchTo(name string) {
- ccb.mu.Lock()
- ccb.serializer.Schedule(func(_ context.Context) {
+ ccb.serializer.Schedule(func(ctx context.Context) {
+ if ctx.Err() != nil || ccb.balancer == nil {
+ return
+ }
// TODO: Other languages use case-sensitive balancer registries. We should
// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.
if strings.EqualFold(ccb.curBalancerName, name) {
@@ -160,7 +142,6 @@ func (ccb *ccBalancerWrapper) switchTo(name string) {
}
ccb.buildLoadBalancingPolicy(name)
})
- ccb.mu.Unlock()
}
// buildLoadBalancingPolicy performs the following:
@@ -187,115 +168,49 @@ func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {
ccb.curBalancerName = builder.Name()
}
+// close initiates async shutdown of the wrapper. cc.mu must be held when
+// calling this function. To determine the wrapper has finished shutting down,
+// the channel should block on ccb.serializer.Done() without cc.mu held.
func (ccb *ccBalancerWrapper) close() {
- channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
- ccb.closeBalancer(ccbModeClosed)
-}
-
-// enterIdleMode is invoked by grpc when the channel enters idle mode upon
-// expiry of idle_timeout. This call blocks until the balancer is closed.
-func (ccb *ccBalancerWrapper) enterIdleMode() {
- channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode")
- ccb.closeBalancer(ccbModeIdle)
-}
-
-// closeBalancer is invoked when the channel is being closed or when it enters
-// idle mode upon expiry of idle_timeout.
-func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
ccb.mu.Lock()
- if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle {
- ccb.mu.Unlock()
- return
- }
-
- ccb.mode = m
- done := ccb.serializer.Done()
- b := ccb.balancer
- ok := ccb.serializer.Schedule(func(_ context.Context) {
- // Close the serializer to ensure that no more calls from gRPC are sent
- // to the balancer.
- ccb.serializerCancel()
- // Empty the current balancer name because we don't have a balancer
- // anymore and also so that we act on the next call to switchTo by
- // creating a new balancer specified by the new resolver.
- ccb.curBalancerName = ""
- })
- if !ok {
- ccb.mu.Unlock()
- return
- }
+ ccb.closed = true
ccb.mu.Unlock()
-
- // Give enqueued callbacks a chance to finish before closing the balancer.
- <-done
- b.Close()
-}
-
-// exitIdleMode is invoked by grpc when the channel exits idle mode either
-// because of an RPC or because of an invocation of the Connect() API. This
-// recreates the balancer that was closed previously when entering idle mode.
-//
-// If the channel is not in idle mode, we know for a fact that we are here as a
-// result of the user calling the Connect() method on the ClientConn. In this
-// case, we can simply forward the call to the underlying balancer, instructing
-// it to reconnect to the backends.
-func (ccb *ccBalancerWrapper) exitIdleMode() {
- ccb.mu.Lock()
- if ccb.mode == ccbModeClosed {
- // Request to exit idle is a no-op when wrapper is already closed.
- ccb.mu.Unlock()
- return
- }
-
- if ccb.mode == ccbModeIdle {
- // Recreate the serializer which was closed when we entered idle.
- ctx, cancel := context.WithCancel(context.Background())
- ccb.serializer = grpcsync.NewCallbackSerializer(ctx)
- ccb.serializerCancel = cancel
- }
-
- // The ClientConn guarantees that mutual exclusion between close() and
- // exitIdleMode(), and since we just created a new serializer, we can be
- // sure that the below function will be scheduled.
- done := make(chan struct{})
- ccb.serializer.Schedule(func(_ context.Context) {
- defer close(done)
-
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
-
- if ccb.mode != ccbModeIdle {
- ccb.balancer.ExitIdle()
+ channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing")
+ ccb.serializer.Schedule(func(context.Context) {
+ if ccb.balancer == nil {
return
}
-
- // Gracefulswitch balancer does not support a switchTo operation after
- // being closed. Hence we need to create a new one here.
- ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
- ccb.mode = ccbModeActive
- channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode")
-
+ ccb.balancer.Close()
+ ccb.balancer = nil
})
- ccb.mu.Unlock()
-
- <-done
+ ccb.serializerCancel()
}
-func (ccb *ccBalancerWrapper) isIdleOrClosed() bool {
- ccb.mu.Lock()
- defer ccb.mu.Unlock()
- return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed
+// exitIdle invokes the balancer's exitIdle method in the serializer.
+func (ccb *ccBalancerWrapper) exitIdle() {
+ ccb.serializer.Schedule(func(ctx context.Context) {
+ if ctx.Err() != nil || ccb.balancer == nil {
+ return
+ }
+ ccb.balancer.ExitIdle()
+ })
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
- if ccb.isIdleOrClosed() {
- return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle")
+ ccb.cc.mu.Lock()
+ defer ccb.cc.mu.Unlock()
+
+ ccb.mu.Lock()
+ if ccb.closed {
+ ccb.mu.Unlock()
+ return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed")
}
+ ccb.mu.Unlock()
if len(addrs) == 0 {
return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
}
- ac, err := ccb.cc.newAddrConn(addrs, opts)
+ ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
if err != nil {
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
@@ -316,10 +231,6 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
- if ccb.isIdleOrClosed() {
- return
- }
-
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
@@ -328,25 +239,39 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
- if ccb.isIdleOrClosed() {
+ ccb.cc.mu.Lock()
+ defer ccb.cc.mu.Unlock()
+
+ ccb.mu.Lock()
+ if ccb.closed {
+ ccb.mu.Unlock()
return
}
-
+ ccb.mu.Unlock()
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
- ccb.cc.blockingpicker.updatePicker(s.Picker)
+
+ // Note that there is no need to check if the balancer wrapper was closed,
+ // as we know the graceful switch LB policy will not call cc if it has been
+ // closed.
+ ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}
func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) {
- if ccb.isIdleOrClosed() {
+ ccb.cc.mu.RLock()
+ defer ccb.cc.mu.RUnlock()
+
+ ccb.mu.Lock()
+ if ccb.closed {
+ ccb.mu.Unlock()
return
}
-
- ccb.cc.resolveNow(o)
+ ccb.mu.Unlock()
+ ccb.cc.resolveNowLocked(o)
}
func (ccb *ccBalancerWrapper) Target() string {
@@ -364,6 +289,20 @@ type acBalancerWrapper struct {
producers map[balancer.ProducerBuilder]*refCountedProducer
}
+// updateState is invoked by grpc to push a subConn state update to the
+// underlying balancer.
+func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) {
+ acbw.ccb.serializer.Schedule(func(ctx context.Context) {
+ if ctx.Err() != nil || acbw.ccb.balancer == nil {
+ return
+ }
+ // Even though it is optional for balancers, gracefulswitch ensures
+ // opts.StateListener is set, so this cannot ever be nil.
+ // TODO: delete this comment when UpdateSubConnState is removed.
+ acbw.stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
+ })
+}
+
func (acbw *acBalancerWrapper) String() string {
return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())
}
@@ -377,20 +316,7 @@ func (acbw *acBalancerWrapper) Connect() {
}
func (acbw *acBalancerWrapper) Shutdown() {
- ccb := acbw.ccb
- if ccb.isIdleOrClosed() {
- // It it safe to ignore this call when the balancer is closed or in idle
- // because the ClientConn takes care of closing the connections.
- //
- // Not returning early from here when the balancer is closed or in idle
- // leads to a deadlock though, because of the following sequence of
- // calls when holding cc.mu:
- // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
- // ccb.RemoveAddrConn --> cc.removeAddrConn
- return
- }
-
- ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
+ acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
index 595480112..e9e97d451 100644
--- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
+++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go
@@ -430,7 +430,7 @@ type ClientHeader struct {
MethodName string `protobuf:"bytes,2,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"`
// A single process may be used to run multiple virtual
// servers with different identities.
- // The authority is the name of such a server identitiy.
+ // The authority is the name of such a server identity.
// It is typically a portion of the URI in the form of
// <host> or <host>:<port> .
Authority string `protobuf:"bytes,3,opt,name=authority,proto3" json:"authority,omitempty"`
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 429c389e4..f6e815e6b 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -33,9 +33,7 @@ import (
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
- "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
@@ -48,9 +46,9 @@ import (
"google.golang.org/grpc/status"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
- _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
+ _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
)
const (
@@ -119,23 +117,8 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
}, nil
}
-// DialContext creates a client connection to the given target. By default, it's
-// a non-blocking dial (the function won't wait for connections to be
-// established, and connecting happens in the background). To make it a blocking
-// dial, use WithBlock() dial option.
-//
-// In the non-blocking case, the ctx does not act against the connection. It
-// only controls the setup steps.
-//
-// In the blocking case, ctx can be used to cancel or expire the pending
-// connection. Once this function returns, the cancellation and expiration of
-// ctx will be noop. Users should call ClientConn.Close to terminate all the
-// pending operations after this function returns.
-//
-// The target name syntax is defined in
-// https://github.com/grpc/grpc/blob/master/doc/naming.md.
-// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
-func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
+// newClient returns a new client in idle mode.
+func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
@@ -143,23 +126,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
czData: new(channelzData),
}
- // We start the channel off in idle mode, but kick it out of idle at the end
- // of this method, instead of waiting for the first RPC. Other gRPC
- // implementations do wait for the first RPC to kick the channel out of
- // idle. But doing so would be a major behavior change for our users who are
- // used to seeing the channel active after Dial.
- //
- // Taking this approach of kicking it out of idle at the end of this method
- // allows us to share the code between channel creation and exiting idle
- // mode. This will also make it easy for us to switch to starting the
- // channel off in idle, if at all we ever get to do that.
- cc.idlenessState = ccIdlenessStateIdle
-
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
- cc.exitIdleCond = sync.NewCond(&cc.mu)
+ // Apply dial options.
disableGlobalOpts := false
for _, opt := range opts {
if _, ok := opt.(*disableGlobalDialOptions); ok {
@@ -177,21 +148,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
for _, opt := range opts {
opt.apply(&cc.dopts)
}
-
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
- defer func() {
- if err != nil {
- cc.Close()
- }
- }()
-
- // Register ClientConn with channelz.
- cc.channelzRegistration(target)
-
- cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
-
if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
@@ -205,10 +164,80 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams
- if cc.dopts.copts.UserAgent != "" {
- cc.dopts.copts.UserAgent += " " + grpcUA
- } else {
- cc.dopts.copts.UserAgent = grpcUA
+ // Register ClientConn with channelz.
+ cc.channelzRegistration(target)
+
+ // TODO: Ideally it should be impossible to error from this function after
+ // channelz registration. This will require removing some channelz logs
+ // from the following functions that can error. Errors can be returned to
+ // the user, and successful logs can be emitted here, after the checks have
+ // passed and channelz is subsequently registered.
+
+ // Determine the resolver to use.
+ if err := cc.parseTargetAndFindResolver(); err != nil {
+ channelz.RemoveEntry(cc.channelzID)
+ return nil, err
+ }
+ if err = cc.determineAuthority(); err != nil {
+ channelz.RemoveEntry(cc.channelzID)
+ return nil, err
+ }
+
+ cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
+ cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
+
+ cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
+ cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
+ return cc, nil
+}
+
+// DialContext creates a client connection to the given target. By default, it's
+// a non-blocking dial (the function won't wait for connections to be
+// established, and connecting happens in the background). To make it a blocking
+// dial, use WithBlock() dial option.
+//
+// In the non-blocking case, the ctx does not act against the connection. It
+// only controls the setup steps.
+//
+// In the blocking case, ctx can be used to cancel or expire the pending
+// connection. Once this function returns, the cancellation and expiration of
+// ctx will be noop. Users should call ClientConn.Close to terminate all the
+// pending operations after this function returns.
+//
+// The target name syntax is defined in
+// https://github.com/grpc/grpc/blob/master/doc/naming.md.
+// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
+func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
+ cc, err := newClient(target, opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ // We start the channel off in idle mode, but kick it out of idle now,
+ // instead of waiting for the first RPC. Other gRPC implementations do wait
+ // for the first RPC to kick the channel out of idle. But doing so would be
+ // a major behavior change for our users who are used to seeing the channel
+ // active after Dial.
+ //
+ // Taking this approach of kicking it out of idle at the end of this method
+ // allows us to share the code between channel creation and exiting idle
+ // mode. This will also make it easy for us to switch to starting the
+ // channel off in idle, i.e. by making newClient exported.
+
+ defer func() {
+ if err != nil {
+ cc.Close()
+ }
+ }()
+
+ // This creates the name resolver, load balancer, etc.
+ if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
+ return nil, err
+ }
+
+ // Return now for non-blocking dials.
+ if !cc.dopts.block {
+ return cc, nil
}
if cc.dopts.timeout > 0 {
@@ -231,49 +260,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
- if cc.dopts.bs == nil {
- cc.dopts.bs = backoff.DefaultExponential
- }
-
- // Determine the resolver to use.
- if err := cc.parseTargetAndFindResolver(); err != nil {
- return nil, err
- }
- if err = cc.determineAuthority(); err != nil {
- return nil, err
- }
-
- if cc.dopts.scChan != nil {
- // Blocking wait for the initial service config.
- select {
- case sc, ok := <-cc.dopts.scChan:
- if ok {
- cc.sc = &sc
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- if cc.dopts.scChan != nil {
- go cc.scWatcher()
- }
-
- // This creates the name resolver, load balancer, blocking picker etc.
- if err := cc.exitIdleMode(); err != nil {
- return nil, err
- }
-
- // Configure idleness support with configured idle timeout or default idle
- // timeout duration. Idleness can be explicitly disabled by the user, by
- // setting the dial option to 0.
- cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger})
-
- // Return early for non-blocking dials.
- if !cc.dopts.block {
- return cc, nil
- }
-
// A blocking dial blocks until the clientConn is ready.
for {
s := cc.GetState()
@@ -320,8 +306,8 @@ func (cc *ClientConn) addTraceEvent(msg string) {
type idler ClientConn
-func (i *idler) EnterIdleMode() error {
- return (*ClientConn)(i).enterIdleMode()
+func (i *idler) EnterIdleMode() {
+ (*ClientConn)(i).enterIdleMode()
}
func (i *idler) ExitIdleMode() error {
@@ -329,117 +315,71 @@ func (i *idler) ExitIdleMode() error {
}
// exitIdleMode moves the channel out of idle mode by recreating the name
-// resolver and load balancer.
-func (cc *ClientConn) exitIdleMode() error {
+// resolver and load balancer. This should never be called directly; use
+// cc.idlenessMgr.ExitIdleMode instead.
+func (cc *ClientConn) exitIdleMode() (err error) {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return errConnClosing
}
- if cc.idlenessState != ccIdlenessStateIdle {
- channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState)
- cc.mu.Unlock()
- return nil
- }
-
- defer func() {
- // When Close() and exitIdleMode() race against each other, one of the
- // following two can happen:
- // - Close() wins the race and runs first. exitIdleMode() runs after, and
- // sees that the ClientConn is already closed and hence returns early.
- // - exitIdleMode() wins the race and runs first and recreates the balancer
- // and releases the lock before recreating the resolver. If Close() runs
- // in this window, it will wait for exitIdleMode to complete.
- //
- // We achieve this synchronization using the below condition variable.
- cc.mu.Lock()
- cc.idlenessState = ccIdlenessStateActive
- cc.exitIdleCond.Signal()
- cc.mu.Unlock()
- }()
-
- cc.idlenessState = ccIdlenessStateExitingIdle
- exitedIdle := false
- if cc.blockingpicker == nil {
- cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
- } else {
- cc.blockingpicker.exitIdleMode()
- exitedIdle = true
- }
-
- var credsClone credentials.TransportCredentials
- if creds := cc.dopts.copts.TransportCredentials; creds != nil {
- credsClone = creds.Clone()
- }
- if cc.balancerWrapper == nil {
- cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
- DialCreds: credsClone,
- CredsBundle: cc.dopts.copts.CredsBundle,
- Dialer: cc.dopts.copts.Dialer,
- Authority: cc.authority,
- CustomUserAgent: cc.dopts.copts.UserAgent,
- ChannelzParentID: cc.channelzID,
- Target: cc.parsedTarget,
- })
- } else {
- cc.balancerWrapper.exitIdleMode()
- }
- cc.firstResolveEvent = grpcsync.NewEvent()
cc.mu.Unlock()
// This needs to be called without cc.mu because this builds a new resolver
- // which might update state or report error inline which needs to be handled
- // by cc.updateResolverState() which also grabs cc.mu.
- if err := cc.initResolverWrapper(credsClone); err != nil {
+ // which might update state or report error inline, which would then need to
+ // acquire cc.mu.
+ if err := cc.resolverWrapper.start(); err != nil {
return err
}
- if exitedIdle {
- cc.addTraceEvent("exiting idle mode")
- }
+ cc.addTraceEvent("exiting idle mode")
return nil
}
+// initIdleStateLocked initializes common state to how it should be while idle.
+func (cc *ClientConn) initIdleStateLocked() {
+ cc.resolverWrapper = newCCResolverWrapper(cc)
+ cc.balancerWrapper = newCCBalancerWrapper(cc)
+ cc.firstResolveEvent = grpcsync.NewEvent()
+ // cc.conns == nil is a proxy for the ClientConn being closed. So, instead
+ // of setting it to nil here, we recreate the map. This also means that we
+ // don't have to do this when exiting idle mode.
+ cc.conns = make(map[*addrConn]struct{})
+}
+
// enterIdleMode puts the channel in idle mode, and as part of it shuts down the
-// name resolver, load balancer and any subchannels.
-func (cc *ClientConn) enterIdleMode() error {
+// name resolver, load balancer, and any subchannels. This should never be
+// called directly; use cc.idlenessMgr.EnterIdleMode instead.
+func (cc *ClientConn) enterIdleMode() {
cc.mu.Lock()
- defer cc.mu.Unlock()
if cc.conns == nil {
- return ErrClientConnClosing
- }
- if cc.idlenessState != ccIdlenessStateActive {
- channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
- return nil
+ cc.mu.Unlock()
+ return
}
- // cc.conns == nil is a proxy for the ClientConn being closed. So, instead
- // of setting it to nil here, we recreate the map. This also means that we
- // don't have to do this when exiting idle mode.
conns := cc.conns
- cc.conns = make(map[*addrConn]struct{})
- // TODO: Currently, we close the resolver wrapper upon entering idle mode
- // and create a new one upon exiting idle mode. This means that the
- // `cc.resolverWrapper` field would be overwritten everytime we exit idle
- // mode. While this means that we need to hold `cc.mu` when accessing
- // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should
- // try to do the same for the balancer and picker wrappers too.
- cc.resolverWrapper.close()
- cc.blockingpicker.enterIdleMode()
- cc.balancerWrapper.enterIdleMode()
+ rWrapper := cc.resolverWrapper
+ rWrapper.close()
+ cc.pickerWrapper.reset()
+ bWrapper := cc.balancerWrapper
+ bWrapper.close()
cc.csMgr.updateState(connectivity.Idle)
- cc.idlenessState = ccIdlenessStateIdle
cc.addTraceEvent("entering idle mode")
- go func() {
- for ac := range conns {
- ac.tearDown(errConnIdling)
- }
- }()
+ cc.initIdleStateLocked()
- return nil
+ cc.mu.Unlock()
+
+ // Block until the name resolver and LB policy are closed.
+ <-rWrapper.serializer.Done()
+ <-bWrapper.serializer.Done()
+
+ // Close all subchannels after the LB policy is closed.
+ for ac := range conns {
+ ac.tearDown(errConnIdling)
+ }
}
// validateTransportCredentials performs a series of checks on the configured
@@ -649,66 +589,35 @@ type ClientConn struct {
dopts dialOptions // Default and user specified dial options.
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
- balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
- idlenessMgr idle.Manager
+ idlenessMgr *idle.Manager
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
csMgr *connectivityStateManager
- blockingpicker *pickerWrapper
+ pickerWrapper *pickerWrapper
safeConfigSelector iresolver.SafeConfigSelector
czData *channelzData
retryThrottler atomic.Value // Updated from service config.
- // firstResolveEvent is used to track whether the name resolver sent us at
- // least one update. RPCs block on this event.
- firstResolveEvent *grpcsync.Event
-
// mu protects the following fields.
// TODO: split mu so the same mutex isn't used for everything.
mu sync.RWMutex
- resolverWrapper *ccResolverWrapper // Initialized in Dial; cleared in Close.
+ resolverWrapper *ccResolverWrapper // Always recreated whenever entering idle to simplify Close.
+ balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close.
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
- idlenessState ccIdlenessState // Tracks idleness state of the channel.
- exitIdleCond *sync.Cond // Signalled when channel exits idle.
+ // firstResolveEvent is used to track whether the name resolver sent us at
+ // least one update. RPCs block on this event. May be accessed without mu
+ // if we know we cannot be asked to enter idle mode while accessing it (e.g.
+ // when the idle manager has already been closed, or if we are already
+ // entering idle mode).
+ firstResolveEvent *grpcsync.Event
lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
}
-// ccIdlenessState tracks the idleness state of the channel.
-//
-// Channels start off in `active` and move to `idle` after a period of
-// inactivity. When moving back to `active` upon an incoming RPC, they
-// transition through `exiting_idle`. This state is useful for synchronization
-// with Close().
-//
-// This state tracking is mostly for self-protection. The idlenessManager is
-// expected to keep track of the state as well, and is expected not to call into
-// the ClientConn unnecessarily.
-type ccIdlenessState int8
-
-const (
- ccIdlenessStateActive ccIdlenessState = iota
- ccIdlenessStateIdle
- ccIdlenessStateExitingIdle
-)
-
-func (s ccIdlenessState) String() string {
- switch s {
- case ccIdlenessStateActive:
- return "active"
- case ccIdlenessStateIdle:
- return "idle"
- case ccIdlenessStateExitingIdle:
- return "exitingIdle"
- default:
- return "unknown"
- }
-}
-
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
//
@@ -748,29 +657,15 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
- cc.exitIdleMode()
+ if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
+ cc.addTraceEvent(err.Error())
+ return
+ }
// If the ClientConn was not in idle mode, we need to call ExitIdle on the
// LB policy so that connections can be created.
- cc.balancerWrapper.exitIdleMode()
-}
-
-func (cc *ClientConn) scWatcher() {
- for {
- select {
- case sc, ok := <-cc.dopts.scChan:
- if !ok {
- return
- }
- cc.mu.Lock()
- // TODO: load balance policy runtime change is ignored.
- // We may revisit this decision in the future.
- cc.sc = &sc
- cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
- cc.mu.Unlock()
- case <-cc.ctx.Done():
- return
- }
- }
+ cc.mu.Lock()
+ cc.balancerWrapper.exitIdle()
+ cc.mu.Unlock()
}
// waitForResolvedAddrs blocks until the resolver has provided addresses or the
@@ -804,11 +699,11 @@ func init() {
internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
return cc.csMgr.pubSub.Subscribe(s)
}
- internal.EnterIdleModeForTesting = func(cc *ClientConn) error {
- return cc.enterIdleMode()
+ internal.EnterIdleModeForTesting = func(cc *ClientConn) {
+ cc.idlenessMgr.EnterIdleModeForTesting()
}
internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
- return cc.exitIdleMode()
+ return cc.idlenessMgr.ExitIdleMode()
}
}
@@ -824,9 +719,8 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
}
}
-func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
+func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
defer cc.firstResolveEvent.Fire()
- cc.mu.Lock()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
@@ -872,7 +766,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
if cc.sc == nil {
// Apply the failing LB only if we haven't received valid service config
// from the name resolver in the past.
- cc.applyFailingLB(s.ServiceConfig)
+ cc.applyFailingLBLocked(s.ServiceConfig)
cc.mu.Unlock()
return ret
}
@@ -894,15 +788,13 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
return ret
}
-// applyFailingLB is akin to configuring an LB policy on the channel which
+// applyFailingLBLocked is akin to configuring an LB policy on the channel which
// always fails RPCs. Here, an actual LB policy is not configured, but an always
// erroring picker is configured, which returns errors with information about
// what was invalid in the received service config. A config selector with no
// service config is configured, and the connectivity state of the channel is
// set to TransientFailure.
-//
-// Caller must hold cc.mu.
-func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
+func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
var err error
if sc.Err != nil {
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
@@ -910,14 +802,10 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
- cc.blockingpicker.updatePicker(base.NewErrPicker(err))
+ cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
}
-func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
- cc.balancerWrapper.updateSubConnState(sc, s, err)
-}
-
// Makes a copy of the input addresses slice and clears out the balancer
// attributes field. Addresses are passed during subconn creation and address
// update operations. In both cases, we will clear the balancer attributes by
@@ -932,10 +820,14 @@ func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Ad
return out
}
-// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
+// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
-func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
+func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
+ if cc.conns == nil {
+ return nil, ErrClientConnClosing
+ }
+
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
@@ -947,12 +839,6 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
stateChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
- // Track ac in cc. This needs to be done before any getTransport(...) is called.
- cc.mu.Lock()
- defer cc.mu.Unlock()
- if cc.conns == nil {
- return nil, ErrClientConnClosing
- }
var err error
ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "")
@@ -968,6 +854,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
},
})
+ // Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.conns[ac] = struct{}{}
return ac, nil
}
@@ -1174,7 +1061,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
- return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
+ return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
@@ -1216,12 +1103,12 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
cc.mu.RLock()
- r := cc.resolverWrapper
+ cc.resolverWrapper.resolveNow(o)
cc.mu.RUnlock()
- if r == nil {
- return
- }
- go r.resolveNow(o)
+}
+
+func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) {
+ cc.resolverWrapper.resolveNow(o)
}
// ResetConnectBackoff wakes up all subchannels in transient failure and causes
@@ -1253,40 +1140,32 @@ func (cc *ClientConn) Close() error {
<-cc.csMgr.pubSub.Done()
}()
+ // Prevent calls to enter/exit idle immediately, and ensure we are not
+ // currently entering/exiting idle mode.
+ cc.idlenessMgr.Close()
+
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
- for cc.idlenessState == ccIdlenessStateExitingIdle {
- cc.exitIdleCond.Wait()
- }
-
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
- pWrapper := cc.blockingpicker
- rWrapper := cc.resolverWrapper
- bWrapper := cc.balancerWrapper
- idlenessMgr := cc.idlenessMgr
+ // We can safely unlock and continue to access all fields now as
+ // cc.conns==nil, preventing any further operations on cc.
cc.mu.Unlock()
+ cc.resolverWrapper.close()
// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
- if pWrapper != nil {
- pWrapper.close()
- }
- if bWrapper != nil {
- bWrapper.close()
- }
- if rWrapper != nil {
- rWrapper.close()
- }
- if idlenessMgr != nil {
- idlenessMgr.Close()
- }
+ cc.pickerWrapper.close()
+ cc.balancerWrapper.close()
+
+ <-cc.resolverWrapper.serializer.Done()
+ <-cc.balancerWrapper.serializer.Done()
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
@@ -1307,7 +1186,7 @@ type addrConn struct {
cc *ClientConn
dopts dialOptions
- acbw balancer.SubConn
+ acbw *acBalancerWrapper
scopts balancer.NewSubConnOptions
// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
@@ -1345,7 +1224,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
} else {
channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
}
- ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
+ ac.acbw.updateState(s, lastErr)
}
// adjustParams updates parameters used to create transports upon
@@ -1849,7 +1728,7 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
} else {
- channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
+ channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget)
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
@@ -1981,58 +1860,17 @@ func (cc *ClientConn) determineAuthority() error {
}
endpoint := cc.parsedTarget.Endpoint()
- target := cc.target
- switch {
- case authorityFromDialOption != "":
+ if authorityFromDialOption != "" {
cc.authority = authorityFromDialOption
- case authorityFromCreds != "":
+ } else if authorityFromCreds != "" {
cc.authority = authorityFromCreds
- case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):
- // TODO: remove when the unix resolver implements optional interface to
- // return channel authority.
- cc.authority = "localhost"
- case strings.HasPrefix(endpoint, ":"):
+ } else if auth, ok := cc.resolverBuilder.(resolver.AuthorityOverrider); ok {
+ cc.authority = auth.OverrideAuthority(cc.parsedTarget)
+ } else if strings.HasPrefix(endpoint, ":") {
cc.authority = "localhost" + endpoint
- default:
- // TODO: Define an optional interface on the resolver builder to return
- // the channel authority given the user's dial target. For resolvers
- // which don't implement this interface, we will use the endpoint from
- // "scheme://authority/endpoint" as the default authority.
- // Escape the endpoint to handle use cases where the endpoint
- // might not be a valid authority by default.
- // For example an endpoint which has multiple paths like
- // 'a/b/c', which is not a valid authority by default.
+ } else {
cc.authority = encodeAuthority(endpoint)
}
channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)
return nil
}
-
-// initResolverWrapper creates a ccResolverWrapper, which builds the name
-// resolver. This method grabs the lock to assign the newly built resolver
-// wrapper to the cc.resolverWrapper field.
-func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error {
- rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{
- target: cc.parsedTarget,
- builder: cc.resolverBuilder,
- bOpts: resolver.BuildOptions{
- DisableServiceConfig: cc.dopts.disableServiceConfig,
- DialCreds: creds,
- CredsBundle: cc.dopts.copts.CredsBundle,
- Dialer: cc.dopts.copts.Dialer,
- },
- channelzID: cc.channelzID,
- })
- if err != nil {
- return fmt.Errorf("failed to build resolver: %v", err)
- }
- // Resolver implementations may report state update or error inline when
- // built (or right after), and this is handled in cc.updateResolverState.
- // Also, an error from the resolver might lead to a re-resolution request
- // from the balancer, which is handled in resolveNow() where
- // `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here.
- cc.mu.Lock()
- cc.resolverWrapper = rw
- cc.mu.Unlock()
- return nil
-}
diff --git a/vendor/google.golang.org/grpc/codes/codes.go b/vendor/google.golang.org/grpc/codes/codes.go
index 11b106182..08476ad1f 100644
--- a/vendor/google.golang.org/grpc/codes/codes.go
+++ b/vendor/google.golang.org/grpc/codes/codes.go
@@ -25,7 +25,13 @@ import (
"strconv"
)
-// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
+// A Code is a status code defined according to the [gRPC documentation].
+//
+// Only the codes defined as consts in this package are valid codes. Do not use
+// other code values. Behavior of other codes is implementation-specific and
+// interoperability between implementations is not guaranteed.
+//
+// [gRPC documentation]: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
type Code uint32
const (
diff --git a/vendor/google.golang.org/grpc/credentials/tls.go b/vendor/google.golang.org/grpc/credentials/tls.go
index 877b7cd21..5dafd34ed 100644
--- a/vendor/google.golang.org/grpc/credentials/tls.go
+++ b/vendor/google.golang.org/grpc/credentials/tls.go
@@ -44,10 +44,25 @@ func (t TLSInfo) AuthType() string {
return "tls"
}
+// cipherSuiteLookup returns the string version of a TLS cipher suite ID.
+func cipherSuiteLookup(cipherSuiteID uint16) string {
+ for _, s := range tls.CipherSuites() {
+ if s.ID == cipherSuiteID {
+ return s.Name
+ }
+ }
+ for _, s := range tls.InsecureCipherSuites() {
+ if s.ID == cipherSuiteID {
+ return s.Name
+ }
+ }
+ return fmt.Sprintf("unknown ID: %v", cipherSuiteID)
+}
+
// GetSecurityValue returns security info requested by channelz.
func (t TLSInfo) GetSecurityValue() ChannelzSecurityValue {
v := &TLSChannelzSecurityValue{
- StandardName: cipherSuiteLookup[t.State.CipherSuite],
+ StandardName: cipherSuiteLookup(t.State.CipherSuite),
}
// Currently there's no way to get LocalCertificate info from tls package.
if len(t.State.PeerCertificates) > 0 {
@@ -138,10 +153,39 @@ func (c *tlsCreds) OverrideServerName(serverNameOverride string) error {
return nil
}
+// The following cipher suites are forbidden for use with HTTP/2 by
+// https://datatracker.ietf.org/doc/html/rfc7540#appendix-A
+var tls12ForbiddenCipherSuites = map[uint16]struct{}{
+ tls.TLS_RSA_WITH_AES_128_CBC_SHA: {},
+ tls.TLS_RSA_WITH_AES_256_CBC_SHA: {},
+ tls.TLS_RSA_WITH_AES_128_GCM_SHA256: {},
+ tls.TLS_RSA_WITH_AES_256_GCM_SHA384: {},
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA: {},
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA: {},
+ tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA: {},
+ tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA: {},
+}
+
// NewTLS uses c to construct a TransportCredentials based on TLS.
func NewTLS(c *tls.Config) TransportCredentials {
tc := &tlsCreds{credinternal.CloneTLSConfig(c)}
tc.config.NextProtos = credinternal.AppendH2ToNextProtos(tc.config.NextProtos)
+ // If the user did not configure a MinVersion and did not configure a
+ // MaxVersion < 1.2, use MinVersion=1.2, which is required by
+ // https://datatracker.ietf.org/doc/html/rfc7540#section-9.2
+ if tc.config.MinVersion == 0 && (tc.config.MaxVersion == 0 || tc.config.MaxVersion >= tls.VersionTLS12) {
+ tc.config.MinVersion = tls.VersionTLS12
+ }
+ // If the user did not configure CipherSuites, use all "secure" cipher
+ // suites reported by the TLS package, but remove some explicitly forbidden
+ // by https://datatracker.ietf.org/doc/html/rfc7540#appendix-A
+ if tc.config.CipherSuites == nil {
+ for _, cs := range tls.CipherSuites() {
+ if _, ok := tls12ForbiddenCipherSuites[cs.ID]; !ok {
+ tc.config.CipherSuites = append(tc.config.CipherSuites, cs.ID)
+ }
+ }
+ }
return tc
}
@@ -205,32 +249,3 @@ type TLSChannelzSecurityValue struct {
LocalCertificate []byte
RemoteCertificate []byte
}
-
-var cipherSuiteLookup = map[uint16]string{
- tls.TLS_RSA_WITH_RC4_128_SHA: "TLS_RSA_WITH_RC4_128_SHA",
- tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA: "TLS_RSA_WITH_3DES_EDE_CBC_SHA",
- tls.TLS_RSA_WITH_AES_128_CBC_SHA: "TLS_RSA_WITH_AES_128_CBC_SHA",
- tls.TLS_RSA_WITH_AES_256_CBC_SHA: "TLS_RSA_WITH_AES_256_CBC_SHA",
- tls.TLS_RSA_WITH_AES_128_GCM_SHA256: "TLS_RSA_WITH_AES_128_GCM_SHA256",
- tls.TLS_RSA_WITH_AES_256_GCM_SHA384: "TLS_RSA_WITH_AES_256_GCM_SHA384",
- tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA: "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
- tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
- tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA: "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
- tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA: "TLS_ECDHE_RSA_WITH_RC4_128_SHA",
- tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA: "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
- tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
- tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA: "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
- tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
- tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
- tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384: "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
- tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384: "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",
- tls.TLS_FALLBACK_SCSV: "TLS_FALLBACK_SCSV",
- tls.TLS_RSA_WITH_AES_128_CBC_SHA256: "TLS_RSA_WITH_AES_128_CBC_SHA256",
- tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
- tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256: "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",
- tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305: "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
- tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305: "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",
- tls.TLS_AES_128_GCM_SHA256: "TLS_AES_128_GCM_SHA256",
- tls.TLS_AES_256_GCM_SHA384: "TLS_AES_256_GCM_SHA384",
- tls.TLS_CHACHA20_POLY1305_SHA256: "TLS_CHACHA20_POLY1305_SHA256",
-}
diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go
index cfc9fd85e..ba2426180 100644
--- a/vendor/google.golang.org/grpc/dialoptions.go
+++ b/vendor/google.golang.org/grpc/dialoptions.go
@@ -46,6 +46,7 @@ func init() {
internal.WithBinaryLogger = withBinaryLogger
internal.JoinDialOptions = newJoinDialOption
internal.DisableGlobalDialOptions = newDisableGlobalDialOptions
+ internal.WithRecvBufferPool = withRecvBufferPool
}
// dialOptions configure a Dial call. dialOptions are set by the DialOption
@@ -63,7 +64,6 @@ type dialOptions struct {
block bool
returnLastError bool
timeout time.Duration
- scChan <-chan ServiceConfig
authority string
binaryLogger binarylog.Logger
copts transport.ConnectOptions
@@ -250,19 +250,6 @@ func WithDecompressor(dc Decompressor) DialOption {
})
}
-// WithServiceConfig returns a DialOption which has a channel to read the
-// service configuration.
-//
-// Deprecated: service config should be received through name resolver or via
-// WithDefaultServiceConfig, as specified at
-// https://github.com/grpc/grpc/blob/master/doc/service_config.md. Will be
-// removed in a future 1.x release.
-func WithServiceConfig(c <-chan ServiceConfig) DialOption {
- return newFuncDialOption(func(o *dialOptions) {
- o.scChan = c
- })
-}
-
// WithConnectParams configures the ClientConn to use the provided ConnectParams
// for creating and maintaining connections to servers.
//
@@ -413,6 +400,17 @@ func WithTimeout(d time.Duration) DialOption {
// connections. If FailOnNonTempDialError() is set to true, and an error is
// returned by f, gRPC checks the error's Temporary() method to decide if it
// should try to reconnect to the network address.
+//
+// Note: All supported releases of Go (as of December 2023) override the OS
+// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
+// with OS defaults for keepalive time and interval, use a net.Dialer that sets
+// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket
+// option to true from the Control field. For a concrete example of how to do
+// this, see internal.NetDialerWithTCPKeepalive().
+//
+// For more information, please see [issue 23459] in the Go github repo.
+//
+// [issue 23459]: https://github.com/golang/go/issues/23459
func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.Dialer = f
@@ -487,7 +485,7 @@ func FailOnNonTempDialError(f bool) DialOption {
// the RPCs.
func WithUserAgent(s string) DialOption {
return newFuncDialOption(func(o *dialOptions) {
- o.copts.UserAgent = s
+ o.copts.UserAgent = s + " " + grpcUA
})
}
@@ -637,14 +635,16 @@ func withHealthCheckFunc(f internal.HealthChecker) DialOption {
func defaultDialOptions() dialOptions {
return dialOptions{
- healthCheckFunc: internal.HealthCheckFunc,
copts: transport.ConnectOptions{
- WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize,
+ WriteBufferSize: defaultWriteBufSize,
UseProxy: true,
+ UserAgent: grpcUA,
},
- recvBufferPool: nopBufferPool{},
- idleTimeout: 30 * time.Minute,
+ bs: internalbackoff.DefaultExponential,
+ healthCheckFunc: internal.HealthCheckFunc,
+ idleTimeout: 30 * time.Minute,
+ recvBufferPool: nopBufferPool{},
}
}
@@ -705,11 +705,13 @@ func WithIdleTimeout(d time.Duration) DialOption {
// options are used: WithStatsHandler, EnableTracing, or binary logging. In such
// cases, the shared buffer pool will be ignored.
//
-// # Experimental
-//
-// Notice: This API is EXPERIMENTAL and may be changed or removed in a
-// later release.
+// Deprecated: use experimental.WithRecvBufferPool instead. Will be deleted in
+// v1.60.0 or later.
func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption {
+ return withRecvBufferPool(bufferPool)
+}
+
+func withRecvBufferPool(bufferPool SharedBufferPool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.recvBufferPool = bufferPool
})
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
index 4399c3df4..11f91668a 100644
--- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -18,7 +18,10 @@
// Package buffer provides an implementation of an unbounded buffer.
package buffer
-import "sync"
+import (
+ "errors"
+ "sync"
+)
// Unbounded is an implementation of an unbounded buffer which does not use
// extra goroutines. This is typically used for passing updates from one entity
@@ -36,6 +39,7 @@ import "sync"
type Unbounded struct {
c chan any
closed bool
+ closing bool
mu sync.Mutex
backlog []any
}
@@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded {
return &Unbounded{c: make(chan any, 1)}
}
+var errBufferClosed = errors.New("Put called on closed buffer.Unbounded")
+
// Put adds t to the unbounded buffer.
-func (b *Unbounded) Put(t any) {
+func (b *Unbounded) Put(t any) error {
b.mu.Lock()
defer b.mu.Unlock()
- if b.closed {
- return
+ if b.closing {
+ return errBufferClosed
}
if len(b.backlog) == 0 {
select {
case b.c <- t:
- return
+ return nil
default:
}
}
b.backlog = append(b.backlog, t)
+ return nil
}
-// Load sends the earliest buffered data, if any, onto the read channel
-// returned by Get(). Users are expected to call this every time they read a
+// Load sends the earliest buffered data, if any, onto the read channel returned
+// by Get(). Users are expected to call this every time they successfully read a
// value from the read channel.
func (b *Unbounded) Load() {
b.mu.Lock()
defer b.mu.Unlock()
- if b.closed {
- return
- }
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
@@ -78,6 +82,8 @@ func (b *Unbounded) Load() {
b.backlog = b.backlog[1:]
default:
}
+ } else if b.closing && !b.closed {
+ close(b.c)
}
}
@@ -88,18 +94,23 @@ func (b *Unbounded) Load() {
// send the next buffered value onto the channel if there is any.
//
// If the unbounded buffer is closed, the read channel returned by this method
-// is closed.
+// is closed after all data is drained.
func (b *Unbounded) Get() <-chan any {
return b.c
}
-// Close closes the unbounded buffer.
+// Close closes the unbounded buffer. No subsequent data may be Put(), and the
+// channel returned from Get() will be closed after all the data is read and
+// Load() is called for the final time.
func (b *Unbounded) Close() {
b.mu.Lock()
defer b.mu.Unlock()
- if b.closed {
+ if b.closing {
return
}
- b.closed = true
- close(b.c)
+ b.closing = true
+ if len(b.backlog) == 0 {
+ b.closed = true
+ close(b.c)
+ }
}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index 5395e7752..fc094f344 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -31,6 +31,7 @@ import (
"time"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
)
const (
@@ -58,6 +59,12 @@ func TurnOn() {
}
}
+func init() {
+ internal.ChannelzTurnOffForTesting = func() {
+ atomic.StoreInt32(&curState, 0)
+ }
+}
+
// IsOn returns whether channelz data collection is on.
func IsOn() bool {
return atomic.LoadInt32(&curState) == 1
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index 3cf10ddfb..685a3cb41 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -36,9 +36,6 @@ var (
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
- // PickFirstLBConfig is set if we should support configuration of the
- // pick_first LB policy.
- PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true)
// LeastRequestLB is set if we should support the least_request_experimental
// LB policy, which can be enabled by setting the environment variable
// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true".
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
index 02b4b6a1c..29f234acb 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go
@@ -50,46 +50,7 @@ var (
//
// When both bootstrap FileName and FileContent are set, FileName is used.
XDSBootstrapFileContent = os.Getenv(XDSBootstrapFileContentEnv)
- // XDSRingHash indicates whether ring hash support is enabled, which can be
- // disabled by setting the environment variable
- // "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false".
- XDSRingHash = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", true)
- // XDSClientSideSecurity is used to control processing of security
- // configuration on the client-side.
- //
- // Note that there is no env var protection for the server-side because we
- // have a brand new API on the server-side and users explicitly need to use
- // the new API to get security integration on the server.
- XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true)
- // XDSAggregateAndDNS indicates whether processing of aggregated cluster and
- // DNS cluster is enabled, which can be disabled by setting the environment
- // variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER"
- // to "false".
- XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)
-
- // XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled,
- // which can be disabled by setting the environment variable
- // "GRPC_XDS_EXPERIMENTAL_RBAC" to "false".
- XDSRBAC = boolFromEnv("GRPC_XDS_EXPERIMENTAL_RBAC", true)
- // XDSOutlierDetection indicates whether outlier detection support is
- // enabled, which can be disabled by setting the environment variable
- // "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false".
- XDSOutlierDetection = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION", true)
- // XDSFederation indicates whether federation support is enabled, which can
- // be enabled by setting the environment variable
- // "GRPC_EXPERIMENTAL_XDS_FEDERATION" to "true".
- XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", true)
-
- // XDSRLS indicates whether processing of Cluster Specifier plugins and
- // support for the RLS CLuster Specifier is enabled, which can be disabled by
- // setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to
- // "false".
- XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", true)
// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.
C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI")
- // XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which
- // can be disabled by setting the environment variable
- // "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "false".
- XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", true)
)
diff --git a/vendor/google.golang.org/grpc/internal/experimental.go b/vendor/google.golang.org/grpc/internal/experimental.go
new file mode 100644
index 000000000..7f7044e17
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/experimental.go
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package internal
+
+var (
+ // WithRecvBufferPool is implemented by the grpc package and returns a dial
+ // option to configure a shared buffer pool for a grpc.ClientConn.
+ WithRecvBufferPool any // func (grpc.SharedBufferPool) grpc.DialOption
+
+ // RecvBufferPool is implemented by the grpc package and returns a server
+ // option to configure a shared buffer pool for a grpc.Server.
+ RecvBufferPool any // func (grpc.SharedBufferPool) grpc.ServerOption
+)
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
index 900917dbe..f7f40a16a 100644
--- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
@@ -20,7 +20,6 @@ package grpcsync
import (
"context"
- "sync"
"google.golang.org/grpc/internal/buffer"
)
@@ -38,8 +37,6 @@ type CallbackSerializer struct {
done chan struct{}
callbacks *buffer.Unbounded
- closedMu sync.Mutex
- closed bool
}
// NewCallbackSerializer returns a new CallbackSerializer instance. The provided
@@ -65,56 +62,34 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
// callbacks to be executed by the serializer. It is not possible to add
// callbacks once the context passed to NewCallbackSerializer is cancelled.
func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
- cs.closedMu.Lock()
- defer cs.closedMu.Unlock()
-
- if cs.closed {
- return false
- }
- cs.callbacks.Put(f)
- return true
+ return cs.callbacks.Put(f) == nil
}
func (cs *CallbackSerializer) run(ctx context.Context) {
- var backlog []func(context.Context)
-
defer close(cs.done)
+
+ // TODO: when Go 1.21 is the oldest supported version, this loop and Close
+ // can be replaced with:
+ //
+ // context.AfterFunc(ctx, cs.callbacks.Close)
for ctx.Err() == nil {
select {
case <-ctx.Done():
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
- case callback, ok := <-cs.callbacks.Get():
- if !ok {
- return
- }
+ case cb := <-cs.callbacks.Get():
cs.callbacks.Load()
- callback.(func(ctx context.Context))(ctx)
+ cb.(func(context.Context))(ctx)
}
}
- // Fetch pending callbacks if any, and execute them before returning from
- // this method and closing cs.done.
- cs.closedMu.Lock()
- cs.closed = true
- backlog = cs.fetchPendingCallbacks()
+ // Close the buffer to prevent new callbacks from being added.
cs.callbacks.Close()
- cs.closedMu.Unlock()
- for _, b := range backlog {
- b(ctx)
- }
-}
-func (cs *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
- var backlog []func(context.Context)
- for {
- select {
- case b := <-cs.callbacks.Get():
- backlog = append(backlog, b.(func(context.Context)))
- cs.callbacks.Load()
- default:
- return backlog
- }
+ // Run all pending callbacks.
+ for cb := range cs.callbacks.Get() {
+ cs.callbacks.Load()
+ cb.(func(context.Context))(ctx)
}
}
diff --git a/vendor/google.golang.org/grpc/internal/idle/idle.go b/vendor/google.golang.org/grpc/internal/idle/idle.go
index 6c272476e..fe49cb74c 100644
--- a/vendor/google.golang.org/grpc/internal/idle/idle.go
+++ b/vendor/google.golang.org/grpc/internal/idle/idle.go
@@ -26,8 +26,6 @@ import (
"sync"
"sync/atomic"
"time"
-
- "google.golang.org/grpc/grpclog"
)
// For overriding in unit tests.
@@ -39,27 +37,12 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
// and exit from idle mode.
type Enforcer interface {
ExitIdleMode() error
- EnterIdleMode() error
-}
-
-// Manager defines the functionality required to track RPC activity on a
-// channel.
-type Manager interface {
- OnCallBegin() error
- OnCallEnd()
- Close()
+ EnterIdleMode()
}
-type noopManager struct{}
-
-func (noopManager) OnCallBegin() error { return nil }
-func (noopManager) OnCallEnd() {}
-func (noopManager) Close() {}
-
-// manager implements the Manager interface. It uses atomic operations to
-// synchronize access to shared state and a mutex to guarantee mutual exclusion
-// in a critical section.
-type manager struct {
+// Manager implements idleness detection and calls the configured Enforcer to
+// enter/exit idle mode when appropriate. Must be created by NewManager.
+type Manager struct {
// State accessed atomically.
lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
@@ -69,8 +52,7 @@ type manager struct {
// Can be accessed without atomics or mutex since these are set at creation
// time and read-only after that.
enforcer Enforcer // Functionality provided by grpc.ClientConn.
- timeout int64 // Idle timeout duration nanos stored as an int64.
- logger grpclog.LoggerV2
+ timeout time.Duration
// idleMu is used to guarantee mutual exclusion in two scenarios:
// - Opposing intentions:
@@ -88,57 +70,48 @@ type manager struct {
timer *time.Timer
}
-// ManagerOptions is a collection of options used by
-// NewManager.
-type ManagerOptions struct {
- Enforcer Enforcer
- Timeout time.Duration
- Logger grpclog.LoggerV2
+// NewManager creates a new idleness manager implementation for the
+// given idle timeout. It begins in idle mode.
+func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
+ return &Manager{
+ enforcer: enforcer,
+ timeout: timeout,
+ actuallyIdle: true,
+ activeCallsCount: -math.MaxInt32,
+ }
}
-// NewManager creates a new idleness manager implementation for the
-// given idle timeout.
-func NewManager(opts ManagerOptions) Manager {
- if opts.Timeout == 0 {
- return noopManager{}
+// resetIdleTimerLocked resets the idle timer to the given duration. Called
+// when exiting idle mode or when the timer fires and we need to reset it.
+func (m *Manager) resetIdleTimerLocked(d time.Duration) {
+ if m.isClosed() || m.timeout == 0 || m.actuallyIdle {
+ return
}
- m := &manager{
- enforcer: opts.Enforcer,
- timeout: int64(opts.Timeout),
- logger: opts.Logger,
+ // It is safe to ignore the return value from Reset() because this method is
+ // only ever called from the timer callback or when exiting idle mode.
+ if m.timer != nil {
+ m.timer.Stop()
}
- m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout)
- return m
+ m.timer = timeAfterFunc(d, m.handleIdleTimeout)
}
-// resetIdleTimer resets the idle timer to the given duration. This method
-// should only be called from the timer callback.
-func (m *manager) resetIdleTimer(d time.Duration) {
+func (m *Manager) resetIdleTimer(d time.Duration) {
m.idleMu.Lock()
defer m.idleMu.Unlock()
-
- if m.timer == nil {
- // Only close sets timer to nil. We are done.
- return
- }
-
- // It is safe to ignore the return value from Reset() because this method is
- // only ever called from the timer callback, which means the timer has
- // already fired.
- m.timer.Reset(d)
+ m.resetIdleTimerLocked(d)
}
// handleIdleTimeout is the timer callback that is invoked upon expiry of the
// configured idle timeout. The channel is considered inactive if there are no
// ongoing calls and no RPC activity since the last time the timer fired.
-func (m *manager) handleIdleTimeout() {
+func (m *Manager) handleIdleTimeout() {
if m.isClosed() {
return
}
if atomic.LoadInt32(&m.activeCallsCount) > 0 {
- m.resetIdleTimer(time.Duration(m.timeout))
+ m.resetIdleTimer(m.timeout)
return
}
@@ -148,24 +121,12 @@ func (m *manager) handleIdleTimeout() {
// Set the timer to fire after a duration of idle timeout, calculated
// from the time the most recent RPC completed.
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0)
- m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano()))
+ m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)
return
}
- // This CAS operation is extremely likely to succeed given that there has
- // been no activity since the last time we were here. Setting the
- // activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the
- // channel is either in idle mode or is trying to get there.
- if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
- // This CAS operation can fail if an RPC started after we checked for
- // activity at the top of this method, or one was ongoing from before
- // the last time we were here. In both case, reset the timer and return.
- m.resetIdleTimer(time.Duration(m.timeout))
- return
- }
-
- // Now that we've set the active calls count to -math.MaxInt32, it's time to
- // actually move to idle mode.
+ // Now that we've checked that there has been no activity, attempt to enter
+ // idle mode, which is very likely to succeed.
if m.tryEnterIdleMode() {
// Successfully entered idle mode. No timer needed until we exit idle.
return
@@ -174,8 +135,7 @@ func (m *manager) handleIdleTimeout() {
// Failed to enter idle mode due to a concurrent RPC that kept the channel
// active, or because of an error from the channel. Undo the attempt to
// enter idle, and reset the timer to try again later.
- atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
- m.resetIdleTimer(time.Duration(m.timeout))
+ m.resetIdleTimer(m.timeout)
}
// tryEnterIdleMode instructs the channel to enter idle mode. But before
@@ -185,36 +145,49 @@ func (m *manager) handleIdleTimeout() {
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with exitIdleMode.
-func (m *manager) tryEnterIdleMode() bool {
+func (m *Manager) tryEnterIdleMode() bool {
+ // Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
+ // that the channel is either in idle mode or is trying to get there.
+ if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
+ // This CAS operation can fail if an RPC started after we checked for
+ // activity in the timer handler, or one was ongoing from before the
+ // last time the timer fired, or if a test is attempting to enter idle
+ // mode without checking. In all cases, abort going into idle mode.
+ return false
+ }
+ // N.B. if we fail to enter idle mode after this, we must re-add
+ // math.MaxInt32 to m.activeCallsCount.
+
m.idleMu.Lock()
defer m.idleMu.Unlock()
if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {
// We raced and lost to a new RPC. Very rare, but stop entering idle.
+ atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
return false
}
if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
- // An very short RPC could have come in (and also finished) after we
+ // A very short RPC could have come in (and also finished) after we
// checked for calls count and activity in handleIdleTimeout(), but
// before the CAS operation. So, we need to check for activity again.
+ atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
return false
}
- // No new RPCs have come in since we last set the active calls count value
- // -math.MaxInt32 in the timer callback. And since we have the lock, it is
- // safe to enter idle mode now.
- if err := m.enforcer.EnterIdleMode(); err != nil {
- m.logger.Errorf("Failed to enter idle mode: %v", err)
- return false
- }
-
- // Successfully entered idle mode.
+ // No new RPCs have come in since we set the active calls count value to
+ // -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
+ // unconditionally now.
+ m.enforcer.EnterIdleMode()
m.actuallyIdle = true
return true
}
+func (m *Manager) EnterIdleModeForTesting() {
+ m.tryEnterIdleMode()
+}
+
// OnCallBegin is invoked at the start of every RPC.
-func (m *manager) OnCallBegin() error {
+func (m *Manager) OnCallBegin() error {
if m.isClosed() {
return nil
}
@@ -227,7 +200,7 @@ func (m *manager) OnCallBegin() error {
// Channel is either in idle mode or is in the process of moving to idle
// mode. Attempt to exit idle mode to allow this RPC.
- if err := m.exitIdleMode(); err != nil {
+ if err := m.ExitIdleMode(); err != nil {
// Undo the increment to calls count, and return an error causing the
// RPC to fail.
atomic.AddInt32(&m.activeCallsCount, -1)
@@ -238,28 +211,30 @@ func (m *manager) OnCallBegin() error {
return nil
}
-// exitIdleMode instructs the channel to exit idle mode.
-//
-// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
-func (m *manager) exitIdleMode() error {
+// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
+// internal state.
+func (m *Manager) ExitIdleMode() error {
+ // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
m.idleMu.Lock()
defer m.idleMu.Unlock()
- if !m.actuallyIdle {
- // This can happen in two scenarios:
+ if m.isClosed() || !m.actuallyIdle {
+ // This can happen in three scenarios:
// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
// tryEnterIdleMode(). But before the latter could grab the lock, an RPC
// came in and OnCallBegin() noticed that the calls count is negative.
// - Channel is in idle mode, and multiple new RPCs come in at the same
// time, all of them notice a negative calls count in OnCallBegin and get
// here. The first one to get the lock would got the channel to exit idle.
+ // - Channel is not in idle mode, and the user calls Connect which calls
+ // m.ExitIdleMode.
//
- // Either way, nothing to do here.
+ // In any case, there is nothing to do here.
return nil
}
if err := m.enforcer.ExitIdleMode(); err != nil {
- return fmt.Errorf("channel failed to exit idle mode: %v", err)
+ return fmt.Errorf("failed to exit idle mode: %w", err)
}
// Undo the idle entry process. This also respects any new RPC attempts.
@@ -267,12 +242,12 @@ func (m *manager) exitIdleMode() error {
m.actuallyIdle = false
// Start a new timer to fire after the configured idle timeout.
- m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout)
+ m.resetIdleTimerLocked(m.timeout)
return nil
}
// OnCallEnd is invoked at the end of every RPC.
-func (m *manager) OnCallEnd() {
+func (m *Manager) OnCallEnd() {
if m.isClosed() {
return
}
@@ -287,15 +262,17 @@ func (m *manager) OnCallEnd() {
atomic.AddInt32(&m.activeCallsCount, -1)
}
-func (m *manager) isClosed() bool {
+func (m *Manager) isClosed() bool {
return atomic.LoadInt32(&m.closed) == 1
}
-func (m *manager) Close() {
+func (m *Manager) Close() {
atomic.StoreInt32(&m.closed, 1)
m.idleMu.Lock()
- m.timer.Stop()
- m.timer = nil
+ if m.timer != nil {
+ m.timer.Stop()
+ m.timer = nil
+ }
m.idleMu.Unlock()
}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 0d94c63e0..6c7ea6a53 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -57,7 +57,7 @@ var (
// GetXDSHandshakeInfoForTesting returns a pointer to the xds.HandshakeInfo
// stored in the passed in attributes. This is set by
// credentials/xds/xds.go.
- GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *xds.HandshakeInfo
+ GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *unsafe.Pointer
// GetServerCredentials returns the transport credentials configured on a
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
@@ -68,11 +68,11 @@ var (
// This is used in the 1.0 release of gcp/observability, and thus must not be
// deleted or changed.
CanonicalString any // func (codes.Code) string
- // DrainServerTransports initiates a graceful close of existing connections
- // on a gRPC server accepted on the provided listener address. An
- // xDS-enabled server invokes this method on a grpc.Server when a particular
- // listener moves to "not-serving" mode.
- DrainServerTransports any // func(*grpc.Server, string)
+ // IsRegisteredMethod returns whether the passed in method is registered as
+ // a method on the server.
+ IsRegisteredMethod any // func(*grpc.Server, string) bool
+ // ServerFromContext returns the server from the context.
+ ServerFromContext any // func(context.Context) *grpc.Server
// AddGlobalServerOptions adds an array of ServerOption that will be
// effective globally for newly created servers. The priority will be: 1.
// user-provided; 2. this method; 3. default values.
@@ -177,10 +177,25 @@ var (
GRPCResolverSchemeExtraMetadata string = "xds"
// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode.
- EnterIdleModeForTesting any // func(*grpc.ClientConn) error
+ EnterIdleModeForTesting any // func(*grpc.ClientConn)
// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.
ExitIdleModeForTesting any // func(*grpc.ClientConn) error
+
+ ChannelzTurnOffForTesting func()
+
+ // TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found
+ // error for a given resource type and name. This is usually triggered when
+ // the associated watch timer fires. For testing purposes, having this
+ // function makes events more predictable than relying on timer events.
+ TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error
+
+ // TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton
+ // to invoke resource not found for a resource type name and resource name.
+ TriggerXDSResourceNameNotFoundClient any // func(string, string) error
+
+ // FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD.
+ FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool)
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index 99e1e5b36..b66dcb213 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -23,7 +23,6 @@ package dns
import (
"context"
"encoding/json"
- "errors"
"fmt"
"net"
"os"
@@ -37,6 +36,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
+ "google.golang.org/grpc/internal/resolver/dns/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@@ -47,15 +47,11 @@ var EnableSRVLookups = false
var logger = grpclog.Component("dns")
-// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
-// single variable for testing the resolver?
-var (
- newTimer = time.NewTimer
- newTimerDNSResRate = time.NewTimer
-)
-
func init() {
resolver.Register(NewBuilder())
+ internal.TimeAfterFunc = time.After
+ internal.NewNetResolver = newNetResolver
+ internal.AddressDialer = addressDialer
}
const (
@@ -70,23 +66,6 @@ const (
txtAttribute = "grpc_config="
)
-var (
- errMissingAddr = errors.New("dns resolver: missing address")
-
- // Addresses ending with a colon that is supposed to be the separator
- // between host and port is not allowed. E.g. "::" is a valid address as
- // it is an IPv6 address (host only) and "[::]:" is invalid as it ends with
- // a colon as the host and port separator
- errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
-)
-
-var (
- defaultResolver netResolver = net.DefaultResolver
- // To prevent excessive re-resolution, we enforce a rate limit on DNS
- // resolution requests.
- minDNSResRate = 30 * time.Second
-)
-
var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) {
var dialer net.Dialer
@@ -94,7 +73,11 @@ var addressDialer = func(address string) func(context.Context, string, string) (
}
}
-var newNetResolver = func(authority string) (netResolver, error) {
+var newNetResolver = func(authority string) (internal.NetResolver, error) {
+ if authority == "" {
+ return net.DefaultResolver, nil
+ }
+
host, port, err := parseTarget(authority, defaultDNSSvrPort)
if err != nil {
return nil, err
@@ -104,7 +87,7 @@ var newNetResolver = func(authority string) (netResolver, error) {
return &net.Resolver{
PreferGo: true,
- Dial: addressDialer(authorityWithPort),
+ Dial: internal.AddressDialer(authorityWithPort),
}, nil
}
@@ -142,13 +125,9 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
disableServiceConfig: opts.DisableServiceConfig,
}
- if target.URL.Host == "" {
- d.resolver = defaultResolver
- } else {
- d.resolver, err = newNetResolver(target.URL.Host)
- if err != nil {
- return nil, err
- }
+ d.resolver, err = internal.NewNetResolver(target.URL.Host)
+ if err != nil {
+ return nil, err
}
d.wg.Add(1)
@@ -161,12 +140,6 @@ func (b *dnsBuilder) Scheme() string {
return "dns"
}
-type netResolver interface {
- LookupHost(ctx context.Context, host string) (addrs []string, err error)
- LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
- LookupTXT(ctx context.Context, name string) (txts []string, err error)
-}
-
// deadResolver is a resolver that does nothing.
type deadResolver struct{}
@@ -178,7 +151,7 @@ func (deadResolver) Close() {}
type dnsResolver struct {
host string
port string
- resolver netResolver
+ resolver internal.NetResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
@@ -223,29 +196,27 @@ func (d *dnsResolver) watcher() {
err = d.cc.UpdateState(*state)
}
- var timer *time.Timer
+ var waitTime time.Duration
if err == nil {
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
- timer = newTimerDNSResRate(minDNSResRate)
+ waitTime = internal.MinResolutionRate
select {
case <-d.ctx.Done():
- timer.Stop()
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from
// ClientConn.
- timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
+ waitTime = backoff.DefaultExponential.Backoff(backoffIndex)
backoffIndex++
}
select {
case <-d.ctx.Done():
- timer.Stop()
return
- case <-timer.C:
+ case <-internal.TimeAfterFunc(waitTime):
}
}
}
@@ -387,7 +358,7 @@ func formatIP(addr string) (addrIP string, ok bool) {
// target: ":80" defaultPort: "443" returns host: "localhost", port: "80"
func parseTarget(target, defaultPort string) (host, port string, err error) {
if target == "" {
- return "", "", errMissingAddr
+ return "", "", internal.ErrMissingAddr
}
if ip := net.ParseIP(target); ip != nil {
// target is an IPv4 or IPv6(without brackets) address
@@ -397,7 +368,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {
if port == "" {
// If the port field is empty (target ends with colon), e.g. "[::1]:",
// this is an error.
- return "", "", errEndsWithColon
+ return "", "", internal.ErrEndsWithColon
}
// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port
if host == "" {
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go b/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go
new file mode 100644
index 000000000..c7fc557d0
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package internal contains functionality internal to the dns resolver package.
+package internal
+
+import (
+ "context"
+ "errors"
+ "net"
+ "time"
+)
+
+// NetResolver groups the methods on net.Resolver that are used by the DNS
+// resolver implementation. This allows the default net.Resolver instance to be
+// overidden from tests.
+type NetResolver interface {
+ LookupHost(ctx context.Context, host string) (addrs []string, err error)
+ LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
+ LookupTXT(ctx context.Context, name string) (txts []string, err error)
+}
+
+var (
+ // ErrMissingAddr is the error returned when building a DNS resolver when
+ // the provided target name is empty.
+ ErrMissingAddr = errors.New("dns resolver: missing address")
+
+ // ErrEndsWithColon is the error returned when building a DNS resolver when
+ // the provided target name ends with a colon that is supposed to be the
+ // separator between host and port. E.g. "::" is a valid address as it is
+ // an IPv6 address (host only) and "[::]:" is invalid as it ends with a
+ // colon as the host and port separator
+ ErrEndsWithColon = errors.New("dns resolver: missing port after port-separator colon")
+)
+
+// The following vars are overridden from tests.
+var (
+ // MinResolutionRate is the minimum rate at which re-resolutions are
+ // allowed. This helps to prevent excessive re-resolution.
+ MinResolutionRate = 30 * time.Second
+
+ // TimeAfterFunc is used by the DNS resolver to wait for the given duration
+ // to elapse. In non-test code, this is implemented by time.After. In test
+ // code, this can be used to control the amount of time the resolver is
+ // blocked waiting for the duration to elapse.
+ TimeAfterFunc func(time.Duration) <-chan time.Time
+
+ // NewNetResolver returns the net.Resolver instance for the given target.
+ NewNetResolver func(string) (NetResolver, error)
+
+ // AddressDialer is the dialer used to dial the DNS server. It accepts the
+ // Host portion of the URL corresponding to the user's dial target and
+ // returns a dial function.
+ AddressDialer func(address string) func(context.Context, string, string) (net.Conn, error)
+)
diff --git a/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
index 160911687..27cd81af9 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go
@@ -61,6 +61,10 @@ func (b *builder) Scheme() string {
return b.scheme
}
+func (b *builder) OverrideAuthority(resolver.Target) string {
+ return "localhost"
+}
+
type nopResolver struct {
}
diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go
new file mode 100644
index 000000000..4f347edd4
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go
@@ -0,0 +1,29 @@
+//go:build !unix && !windows
+
+/*
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package internal
+
+import (
+ "net"
+)
+
+// NetDialerWithTCPKeepalive returns a vanilla net.Dialer on non-unix platforms.
+func NetDialerWithTCPKeepalive() *net.Dialer {
+ return &net.Dialer{}
+}
diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go
new file mode 100644
index 000000000..078137b7f
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go
@@ -0,0 +1,54 @@
+//go:build unix
+
+/*
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package internal
+
+import (
+ "net"
+ "syscall"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on
+// the underlying connection with OS default values for keepalive parameters.
+//
+// TODO: Once https://github.com/golang/go/issues/62254 lands, and the
+// appropriate Go version becomes less than our least supported Go version, we
+// should look into using the new API to make things more straightforward.
+func NetDialerWithTCPKeepalive() *net.Dialer {
+ return &net.Dialer{
+ // Setting a negative value here prevents the Go stdlib from overriding
+ // the values of TCP keepalive time and interval. It also prevents the
+ // Go stdlib from enabling TCP keepalives by default.
+ KeepAlive: time.Duration(-1),
+ // This method is called after the underlying network socket is created,
+ // but before dialing the socket (or calling its connect() method). The
+ // combination of unconditionally enabling TCP keepalives here, and
+ // disabling the overriding of TCP keepalive parameters by setting the
+ // KeepAlive field to a negative value above, results in OS defaults for
+ // the TCP keealive interval and time parameters.
+ Control: func(_, _ string, c syscall.RawConn) error {
+ return c.Control(func(fd uintptr) {
+ unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
+ })
+ },
+ }
+}
diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go
new file mode 100644
index 000000000..fd7d43a89
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go
@@ -0,0 +1,54 @@
+//go:build windows
+
+/*
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package internal
+
+import (
+ "net"
+ "syscall"
+ "time"
+
+ "golang.org/x/sys/windows"
+)
+
+// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on
+// the underlying connection with OS default values for keepalive parameters.
+//
+// TODO: Once https://github.com/golang/go/issues/62254 lands, and the
+// appropriate Go version becomes less than our least supported Go version, we
+// should look into using the new API to make things more straightforward.
+func NetDialerWithTCPKeepalive() *net.Dialer {
+ return &net.Dialer{
+ // Setting a negative value here prevents the Go stdlib from overriding
+ // the values of TCP keepalive time and interval. It also prevents the
+ // Go stdlib from enabling TCP keepalives by default.
+ KeepAlive: time.Duration(-1),
+ // This method is called after the underlying network socket is created,
+ // but before dialing the socket (or calling its connect() method). The
+ // combination of unconditionally enabling TCP keepalives here, and
+ // disabling the overriding of TCP keepalive parameters by setting the
+ // KeepAlive field to a negative value above, results in OS defaults for
+ // the TCP keealive interval and time parameters.
+ Control: func(_, _ string, c syscall.RawConn) error {
+ return c.Control(func(fd uintptr) {
+ windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_KEEPALIVE, 1)
+ })
+ },
+ }
+}
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index b330ccedc..83c382982 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -535,8 +535,8 @@ const minBatchSize = 1000
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
-// flushes and closes the underlying connection. Otherwise, the connection is
-// left open to allow the I/O error to be encountered by the reader instead.
+// flushes the underlying connection. The connection is always left open to
+// allow different closing behavior on the client and server.
func (l *loopyWriter) run() (err error) {
defer func() {
if l.logger.V(logLevel) {
@@ -544,7 +544,6 @@ func (l *loopyWriter) run() (err error) {
}
if !isIOError(err) {
l.framer.writer.Flush()
- l.conn.Close()
}
l.cbuf.finish()
}()
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 17f7a21b5..a9d70e2a1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -75,11 +75,25 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
return nil, errors.New(msg)
}
+ var localAddr net.Addr
+ if la := r.Context().Value(http.LocalAddrContextKey); la != nil {
+ localAddr, _ = la.(net.Addr)
+ }
+ var authInfo credentials.AuthInfo
+ if r.TLS != nil {
+ authInfo = credentials.TLSInfo{State: *r.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
+ }
+ p := peer.Peer{
+ Addr: strAddr(r.RemoteAddr),
+ LocalAddr: localAddr,
+ AuthInfo: authInfo,
+ }
st := &serverHandlerTransport{
rw: w,
req: r,
closedCh: make(chan struct{}),
writes: make(chan func()),
+ peer: p,
contentType: contentType,
contentSubtype: contentSubtype,
stats: stats,
@@ -134,6 +148,8 @@ type serverHandlerTransport struct {
headerMD metadata.MD
+ peer peer.Peer
+
closeOnce sync.Once
closedCh chan struct{} // closed on Close
@@ -165,7 +181,13 @@ func (ht *serverHandlerTransport) Close(err error) {
})
}
-func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
+func (ht *serverHandlerTransport) Peer() *peer.Peer {
+ return &peer.Peer{
+ Addr: ht.peer.Addr,
+ LocalAddr: ht.peer.LocalAddr,
+ AuthInfo: ht.peer.AuthInfo,
+ }
+}
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
// the empty string if unknown.
@@ -347,10 +369,8 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
return err
}
-func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
+func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
// With this transport type there will be exactly 1 stream: this HTTP request.
-
- ctx := ht.req.Context()
var cancel context.CancelFunc
if ht.timeoutSet {
ctx, cancel = context.WithTimeout(ctx, ht.timeout)
@@ -370,34 +390,19 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
ht.Close(errors.New("request is done processing"))
}()
+ ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
req := ht.req
-
s := &Stream{
- id: 0, // irrelevant
- requestRead: func(int) {},
- cancel: cancel,
- buf: newRecvBuffer(),
- st: ht,
- method: req.URL.Path,
- recvCompress: req.Header.Get("grpc-encoding"),
- contentSubtype: ht.contentSubtype,
- }
- pr := &peer.Peer{
- Addr: ht.RemoteAddr(),
- }
- if req.TLS != nil {
- pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
- }
- ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
- s.ctx = peer.NewContext(ctx, pr)
- for _, sh := range ht.stats {
- s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
- inHeader := &stats.InHeader{
- FullMethod: s.method,
- RemoteAddr: ht.RemoteAddr(),
- Compression: s.recvCompress,
- }
- sh.HandleRPC(s.ctx, inHeader)
+ id: 0, // irrelevant
+ ctx: ctx,
+ requestRead: func(int) {},
+ cancel: cancel,
+ buf: newRecvBuffer(),
+ st: ht,
+ method: req.URL.Path,
+ recvCompress: req.Header.Get("grpc-encoding"),
+ contentSubtype: ht.contentSubtype,
+ headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index d6f5c4935..eff879964 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -36,6 +36,7 @@ import (
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpclog"
@@ -43,7 +44,7 @@ import (
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
istatus "google.golang.org/grpc/internal/status"
- "google.golang.org/grpc/internal/syscall"
+ isyscall "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -58,6 +59,8 @@ import (
// atomically.
var clientConnectionCounter uint64
+var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
+
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
@@ -176,7 +179,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
if networkType == "tcp" && useProxy {
return proxyDial(ctx, address, grpcUA)
}
- return (&net.Dialer{}).DialContext(ctx, networkType, address)
+ return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)
}
func isTemporary(err error) bool {
@@ -262,7 +265,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
keepaliveEnabled := false
if kp.Time != infinity {
- if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
+ if err = isyscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
}
keepaliveEnabled = true
@@ -448,7 +451,13 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
- t.loopy.run()
+ if err := t.loopy.run(); !isIOError(err) {
+ // Immediately close the connection, as the loopy writer returns
+ // when there are no more active streams and we were draining (the
+ // server sent a GOAWAY). For I/O errors, the reader will hit it
+ // after draining any remaining incoming data.
+ t.conn.Close()
+ }
close(t.writerDone)
}()
return t, nil
@@ -493,8 +502,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
func (t *http2Client) getPeer() *peer.Peer {
return &peer.Peer{
- Addr: t.remoteAddr,
- AuthInfo: t.authInfo, // Can be nil
+ Addr: t.remoteAddr,
+ AuthInfo: t.authInfo, // Can be nil
+ LocalAddr: t.localAddr,
}
}
@@ -566,7 +576,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
}
- if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
var k string
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
@@ -1321,10 +1331,8 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
- if streamID > id && streamID <= upperLimit {
- atomic.StoreUint32(&stream.unprocessed, 1)
- streamsToClose = append(streamsToClose, stream)
- }
+ atomic.StoreUint32(&stream.unprocessed, 1)
+ streamsToClose = append(streamsToClose, stream)
}
}
t.mu.Unlock()
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 6fa1eb419..a206e2eef 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -68,18 +68,15 @@ var serverConnectionCounter uint64
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
- lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
- ctx context.Context
- done chan struct{}
- conn net.Conn
- loopy *loopyWriter
- readerDone chan struct{} // sync point to enable testing.
- writerDone chan struct{} // sync point to enable testing.
- remoteAddr net.Addr
- localAddr net.Addr
- authInfo credentials.AuthInfo // auth info about the connection
- inTapHandle tap.ServerInHandle
- framer *framer
+ lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
+ done chan struct{}
+ conn net.Conn
+ loopy *loopyWriter
+ readerDone chan struct{} // sync point to enable testing.
+ loopyWriterDone chan struct{}
+ peer peer.Peer
+ inTapHandle tap.ServerInHandle
+ framer *framer
// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
@@ -243,16 +240,18 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
}
done := make(chan struct{})
+ peer := peer.Peer{
+ Addr: conn.RemoteAddr(),
+ LocalAddr: conn.LocalAddr(),
+ AuthInfo: authInfo,
+ }
t := &http2Server{
- ctx: setConnection(context.Background(), rawConn),
done: done,
conn: conn,
- remoteAddr: conn.RemoteAddr(),
- localAddr: conn.LocalAddr(),
- authInfo: authInfo,
+ peer: peer,
framer: framer,
readerDone: make(chan struct{}),
- writerDone: make(chan struct{}),
+ loopyWriterDone: make(chan struct{}),
maxStreams: config.MaxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
@@ -267,8 +266,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
bufferPool: newBufferPool(),
}
t.logger = prefixLoggerForServerTransport(t)
- // Add peer information to the http2server context.
- t.ctx = peer.NewContext(t.ctx, t.getPeer())
t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
@@ -277,15 +274,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
- for _, sh := range t.stats {
- t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
- RemoteAddr: t.remoteAddr,
- LocalAddr: t.localAddr,
- })
- connBegin := &stats.ConnBegin{}
- sh.HandleConn(t.ctx, connBegin)
- }
- t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
+ t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))
if err != nil {
return nil, err
}
@@ -333,8 +322,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
- t.loopy.run()
- close(t.writerDone)
+ err := t.loopy.run()
+ close(t.loopyWriterDone)
+ if !isIOError(err) {
+ // Close the connection if a non-I/O error occurs (for I/O errors
+ // the reader will also encounter the error and close). Wait 1
+ // second before closing the connection, or when the reader is done
+ // (i.e. the client already closed the connection or a connection
+ // error occurred). This avoids the potential problem where there
+ // is unread data on the receive side of the connection, which, if
+ // closed, would lead to a TCP RST instead of FIN, and the client
+ // encountering errors. For more info:
+ // https://github.com/grpc/grpc-go/issues/5358
+ select {
+ case <-t.readerDone:
+ case <-time.After(time.Second):
+ }
+ t.conn.Close()
+ }
}()
go t.keepalive()
return t, nil
@@ -342,7 +347,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
+func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
@@ -369,10 +374,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
buf := newRecvBuffer()
s := &Stream{
- id: streamID,
- st: t,
- buf: buf,
- fc: &inFlow{limit: uint32(t.initialWindowSize)},
+ id: streamID,
+ st: t,
+ buf: buf,
+ fc: &inFlow{limit: uint32(t.initialWindowSize)},
+ headerWireLength: int(frame.Header().Length),
}
var (
// if false, content-type was missing or invalid
@@ -511,9 +517,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.state = streamReadDone
}
if timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
+ s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
} else {
- s.ctx, s.cancel = context.WithCancel(t.ctx)
+ s.ctx, s.cancel = context.WithCancel(ctx)
}
// Attach the received metadata to the context.
@@ -592,18 +598,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
- for _, sh := range t.stats {
- s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
- inHeader := &stats.InHeader{
- FullMethod: s.method,
- RemoteAddr: t.remoteAddr,
- LocalAddr: t.localAddr,
- Compression: s.recvCompress,
- WireLength: int(frame.Header().Length),
- Header: mdata.Copy(),
- }
- sh.HandleRPC(s.ctx, inHeader)
- }
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
@@ -629,8 +623,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
-func (t *http2Server) HandleStreams(handle func(*Stream)) {
- defer close(t.readerDone)
+func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
+ defer func() {
+ close(t.readerDone)
+ <-t.loopyWriterDone
+ }()
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
@@ -664,7 +661,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
- if err := t.operateHeaders(frame, handle); err != nil {
+ if err := t.operateHeaders(ctx, frame, handle); err != nil {
t.Close(err)
break
}
@@ -979,7 +976,12 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
}
if err := t.writeHeaderLocked(s); err != nil {
- return status.Convert(err).Err()
+ switch e := err.(type) {
+ case ConnectionError:
+ return status.Error(codes.Unavailable, e.Desc)
+ default:
+ return status.Convert(err).Err()
+ }
}
return nil
}
@@ -1242,10 +1244,6 @@ func (t *http2Server) Close(err error) {
for _, s := range streams {
s.cancel()
}
- for _, sh := range t.stats {
- connEnd := &stats.ConnEnd{}
- sh.HandleConn(t.ctx, connEnd)
- }
}
// deleteStream deletes the stream s from transport's active streams.
@@ -1311,10 +1309,6 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo
})
}
-func (t *http2Server) RemoteAddr() net.Addr {
- return t.remoteAddr
-}
-
func (t *http2Server) Drain(debugData string) {
t.mu.Lock()
defer t.mu.Unlock()
@@ -1351,6 +1345,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
+ t.framer.writer.Flush()
if retErr != nil {
return false, retErr
}
@@ -1371,7 +1366,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
go func() {
- timer := time.NewTimer(time.Minute)
+ timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case <-t.drainEvent.Done():
@@ -1397,11 +1392,11 @@ func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
- LocalAddr: t.localAddr,
- RemoteAddr: t.remoteAddr,
+ LocalAddr: t.peer.LocalAddr,
+ RemoteAddr: t.peer.Addr,
// RemoteName :
}
- if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
+ if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
s.RemoteFlowControlWindow = t.getOutFlowWindow()
@@ -1433,10 +1428,12 @@ func (t *http2Server) getOutFlowWindow() int64 {
}
}
-func (t *http2Server) getPeer() *peer.Peer {
+// Peer returns the peer of the transport.
+func (t *http2Server) Peer() *peer.Peer {
return &peer.Peer{
- Addr: t.remoteAddr,
- AuthInfo: t.authInfo, // Can be nil
+ Addr: t.peer.Addr,
+ LocalAddr: t.peer.LocalAddr,
+ AuthInfo: t.peer.AuthInfo, // Can be nil
}
}
@@ -1461,6 +1458,6 @@ func GetConnection(ctx context.Context) net.Conn {
// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
-func setConnection(ctx context.Context, conn net.Conn) context.Context {
+func SetConnection(ctx context.Context, conn net.Conn) context.Context {
return context.WithValue(ctx, connectionKey{}, conn)
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/proxy.go b/vendor/google.golang.org/grpc/internal/transport/proxy.go
index 415961987..24fa10325 100644
--- a/vendor/google.golang.org/grpc/internal/transport/proxy.go
+++ b/vendor/google.golang.org/grpc/internal/transport/proxy.go
@@ -28,6 +28,8 @@ import (
"net/http"
"net/http/httputil"
"net/url"
+
+ "google.golang.org/grpc/internal"
)
const proxyAuthHeaderKey = "Proxy-Authorization"
@@ -112,7 +114,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
// is necessary, dials, does the HTTP CONNECT handshake, and returns the
// connection.
-func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
+func proxyDial(ctx context.Context, addr string, grpcUA string) (net.Conn, error) {
newAddr := addr
proxyURL, err := mapAddress(addr)
if err != nil {
@@ -122,15 +124,15 @@ func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn,
newAddr = proxyURL.Host
}
- conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr)
+ conn, err := internal.NetDialerWithTCPKeepalive().DialContext(ctx, "tcp", newAddr)
if err != nil {
- return
+ return nil, err
}
- if proxyURL != nil {
+ if proxyURL == nil {
// proxy is disabled if proxyURL is nil.
- conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
+ return conn, err
}
- return
+ return doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
}
func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index aac056e72..b7b8fec18 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
@@ -265,7 +266,8 @@ type Stream struct {
// headerValid indicates whether a valid header was received. Only
// meaningful after headerChan is closed (always call waitOnHeader() before
// reading its value). Not valid on server side.
- headerValid bool
+ headerValid bool
+ headerWireLength int // Only set on server side.
// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
@@ -425,6 +427,12 @@ func (s *Stream) Context() context.Context {
return s.ctx
}
+// SetContext sets the context of the stream. This will be deleted once the
+// stats handler callouts all move to gRPC layer.
+func (s *Stream) SetContext(ctx context.Context) {
+ s.ctx = ctx
+}
+
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
@@ -437,6 +445,12 @@ func (s *Stream) Status() *status.Status {
return s.status
}
+// HeaderWireLength returns the size of the headers of the stream as received
+// from the wire. Valid only on the server.
+func (s *Stream) HeaderWireLength() int {
+ return s.headerWireLength
+}
+
// SetHeader sets the header metadata. This can be called multiple times.
// Server side only.
// This should not be called in parallel to other data writes.
@@ -698,7 +712,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
- HandleStreams(func(*Stream))
+ HandleStreams(context.Context, func(*Stream))
// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.
@@ -717,8 +731,8 @@ type ServerTransport interface {
// handlers will be terminated asynchronously.
Close(err error)
- // RemoteAddr returns the remote network address.
- RemoteAddr() net.Addr
+ // Peer returns the peer of the server transport.
+ Peer() *peer.Peer
// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain(debugData string)
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index a2cdcaf12..1e9485fd6 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -25,8 +25,14 @@ import (
"context"
"fmt"
"strings"
+
+ "google.golang.org/grpc/internal"
)
+func init() {
+ internal.FromOutgoingContextRaw = fromOutgoingContextRaw
+}
+
// DecodeKeyValue returns k, v, nil.
//
// Deprecated: use k and v directly instead.
@@ -153,14 +159,16 @@ func Join(mds ...MD) MD {
type mdIncomingKey struct{}
type mdOutgoingKey struct{}
-// NewIncomingContext creates a new context with incoming md attached.
+// NewIncomingContext creates a new context with incoming md attached. md must
+// not be modified after calling this function.
func NewIncomingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdIncomingKey{}, md)
}
// NewOutgoingContext creates a new context with outgoing md attached. If used
// in conjunction with AppendToOutgoingContext, NewOutgoingContext will
-// overwrite any previously-appended metadata.
+// overwrite any previously-appended metadata. md must not be modified after
+// calling this function.
func NewOutgoingContext(ctx context.Context, md MD) context.Context {
return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
@@ -203,7 +211,8 @@ func FromIncomingContext(ctx context.Context) (MD, bool) {
}
// ValueFromIncomingContext returns the metadata value corresponding to the metadata
-// key from the incoming metadata if it exists. Key must be lower-case.
+// key from the incoming metadata if it exists. Keys are matched in a case insensitive
+// manner.
//
// # Experimental
//
@@ -219,33 +228,29 @@ func ValueFromIncomingContext(ctx context.Context, key string) []string {
return copyOf(v)
}
for k, v := range md {
- // We need to manually convert all keys to lower case, because MD is a
- // map, and there's no guarantee that the MD attached to the context is
- // created using our helper functions.
- if strings.ToLower(k) == key {
+ // Case insenitive comparison: MD is a map, and there's no guarantee
+ // that the MD attached to the context is created using our helper
+ // functions.
+ if strings.EqualFold(k, key) {
return copyOf(v)
}
}
return nil
}
-// the returned slice must not be modified in place
func copyOf(v []string) []string {
vals := make([]string, len(v))
copy(vals, v)
return vals
}
-// FromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD.
+// fromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD.
//
// Remember to perform strings.ToLower on the keys, for both the returned MD (MD
// is a map, there's no guarantee it's created using our helper functions) and
// the extra kv pairs (AppendToOutgoingContext doesn't turn them into
// lowercase).
-//
-// This is intended for gRPC-internal use ONLY. Users should use
-// FromOutgoingContext instead.
-func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) {
+func fromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) {
raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)
if !ok {
return nil, nil, false
diff --git a/vendor/google.golang.org/grpc/peer/peer.go b/vendor/google.golang.org/grpc/peer/peer.go
index e01d219ff..a821ff9b2 100644
--- a/vendor/google.golang.org/grpc/peer/peer.go
+++ b/vendor/google.golang.org/grpc/peer/peer.go
@@ -32,6 +32,8 @@ import (
type Peer struct {
// Addr is the peer address.
Addr net.Addr
+ // LocalAddr is the local address.
+ LocalAddr net.Addr
// AuthInfo is the authentication information of the transport.
// It is nil if there is no transport security being used.
AuthInfo credentials.AuthInfo
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index 236837f41..bf56faa76 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -37,7 +37,6 @@ import (
type pickerWrapper struct {
mu sync.Mutex
done bool
- idle bool
blockingCh chan struct{}
picker balancer.Picker
statsHandlers []stats.Handler // to record blocking picker calls
@@ -53,11 +52,7 @@ func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.mu.Lock()
- if pw.done || pw.idle {
- // There is a small window where a picker update from the LB policy can
- // race with the channel going to idle mode. If the picker is idle here,
- // it is because the channel asked it to do so, and therefore it is sage
- // to ignore the update from the LB policy.
+ if pw.done {
pw.mu.Unlock()
return
}
@@ -210,23 +205,15 @@ func (pw *pickerWrapper) close() {
close(pw.blockingCh)
}
-func (pw *pickerWrapper) enterIdleMode() {
- pw.mu.Lock()
- defer pw.mu.Unlock()
- if pw.done {
- return
- }
- pw.idle = true
-}
-
-func (pw *pickerWrapper) exitIdleMode() {
+// reset clears the pickerWrapper and prepares it for being used again when idle
+// mode is exited.
+func (pw *pickerWrapper) reset() {
pw.mu.Lock()
defer pw.mu.Unlock()
if pw.done {
return
}
pw.blockingCh = make(chan struct{})
- pw.idle = false
}
// dropError is a wrapper error that indicates the LB policy wishes to drop the
diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go
index 2e9cf66b4..5128f9364 100644
--- a/vendor/google.golang.org/grpc/pickfirst.go
+++ b/vendor/google.golang.org/grpc/pickfirst.go
@@ -25,7 +25,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/internal/envconfig"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/pretty"
@@ -65,19 +64,6 @@ type pfConfig struct {
}
func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
- if !envconfig.PickFirstLBConfig {
- // Prior to supporting loadbalancing configuration, the pick_first LB
- // policy did not implement the balancer.ConfigParser interface. This
- // meant that if a non-empty configuration was passed to it, the service
- // config unmarshaling code would throw a warning log, but would
- // continue using the pick_first LB policy. The code below ensures the
- // same behavior is retained if the env var is not set.
- if string(js) != "{}" {
- logger.Warningf("Ignoring non-empty balancer configuration %q for the pick_first LB policy", string(js))
- }
- return nil, nil
- }
-
var cfg pfConfig
if err := json.Unmarshal(js, &cfg); err != nil {
return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
diff --git a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
new file mode 100644
index 000000000..14aa6f20a
--- /dev/null
+++ b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package dns implements a dns resolver to be installed as the default resolver
+// in grpc.
+//
+// Deprecated: this package is imported by grpc and should not need to be
+// imported directly by users.
+package dns
+
+import (
+ "google.golang.org/grpc/internal/resolver/dns"
+ "google.golang.org/grpc/resolver"
+)
+
+// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
+//
+// Deprecated: import grpc and use resolver.Get("dns") instead.
+func NewBuilder() resolver.Builder {
+ return dns.NewBuilder()
+}
diff --git a/vendor/google.golang.org/grpc/resolver/map.go b/vendor/google.golang.org/grpc/resolver/map.go
index 804be887d..ada5b9bb7 100644
--- a/vendor/google.golang.org/grpc/resolver/map.go
+++ b/vendor/google.golang.org/grpc/resolver/map.go
@@ -136,3 +136,116 @@ func (a *AddressMap) Values() []any {
}
return ret
}
+
+type endpointNode struct {
+ addrs map[string]struct{}
+}
+
+// Equal returns whether the unordered set of addrs are the same between the
+// endpoint nodes.
+func (en *endpointNode) Equal(en2 *endpointNode) bool {
+ if len(en.addrs) != len(en2.addrs) {
+ return false
+ }
+ for addr := range en.addrs {
+ if _, ok := en2.addrs[addr]; !ok {
+ return false
+ }
+ }
+ return true
+}
+
+func toEndpointNode(endpoint Endpoint) endpointNode {
+ en := make(map[string]struct{})
+ for _, addr := range endpoint.Addresses {
+ en[addr.Addr] = struct{}{}
+ }
+ return endpointNode{
+ addrs: en,
+ }
+}
+
+// EndpointMap is a map of endpoints to arbitrary values keyed on only the
+// unordered set of address strings within an endpoint. This map is not thread
+// safe, thus it is unsafe to access concurrently. Must be created via
+// NewEndpointMap; do not construct directly.
+type EndpointMap struct {
+ endpoints map[*endpointNode]any
+}
+
+// NewEndpointMap creates a new EndpointMap.
+func NewEndpointMap() *EndpointMap {
+ return &EndpointMap{
+ endpoints: make(map[*endpointNode]any),
+ }
+}
+
+// Get returns the value for the address in the map, if present.
+func (em *EndpointMap) Get(e Endpoint) (value any, ok bool) {
+ en := toEndpointNode(e)
+ if endpoint := em.find(en); endpoint != nil {
+ return em.endpoints[endpoint], true
+ }
+ return nil, false
+}
+
+// Set updates or adds the value to the address in the map.
+func (em *EndpointMap) Set(e Endpoint, value any) {
+ en := toEndpointNode(e)
+ if endpoint := em.find(en); endpoint != nil {
+ em.endpoints[endpoint] = value
+ return
+ }
+ em.endpoints[&en] = value
+}
+
+// Len returns the number of entries in the map.
+func (em *EndpointMap) Len() int {
+ return len(em.endpoints)
+}
+
+// Keys returns a slice of all current map keys, as endpoints specifying the
+// addresses present in the endpoint keys, in which uniqueness is determined by
+// the unordered set of addresses. Thus, endpoint information returned is not
+// the full endpoint data (drops duplicated addresses and attributes) but can be
+// used for EndpointMap accesses.
+func (em *EndpointMap) Keys() []Endpoint {
+ ret := make([]Endpoint, 0, len(em.endpoints))
+ for en := range em.endpoints {
+ var endpoint Endpoint
+ for addr := range en.addrs {
+ endpoint.Addresses = append(endpoint.Addresses, Address{Addr: addr})
+ }
+ ret = append(ret, endpoint)
+ }
+ return ret
+}
+
+// Values returns a slice of all current map values.
+func (em *EndpointMap) Values() []any {
+ ret := make([]any, 0, len(em.endpoints))
+ for _, val := range em.endpoints {
+ ret = append(ret, val)
+ }
+ return ret
+}
+
+// find returns a pointer to the endpoint node in em if the endpoint node is
+// already present. If not found, nil is returned. The comparisons are done on
+// the unordered set of addresses within an endpoint.
+func (em EndpointMap) find(e endpointNode) *endpointNode {
+ for endpoint := range em.endpoints {
+ if e.Equal(endpoint) {
+ return endpoint
+ }
+ }
+ return nil
+}
+
+// Delete removes the specified endpoint from the map.
+func (em *EndpointMap) Delete(e Endpoint) {
+ en := toEndpointNode(e)
+ if entry := em.find(en); entry != nil {
+ delete(em.endpoints, entry)
+ }
+}
diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go
index 11384e228..adf89dd9c 100644
--- a/vendor/google.golang.org/grpc/resolver/resolver.go
+++ b/vendor/google.golang.org/grpc/resolver/resolver.go
@@ -240,11 +240,6 @@ type ClientConn interface {
//
// Deprecated: Use UpdateState instead.
NewAddress(addresses []Address)
- // NewServiceConfig is called by resolver to notify ClientConn a new
- // service config. The service config should be provided as a json string.
- //
- // Deprecated: Use UpdateState instead.
- NewServiceConfig(serviceConfig string)
// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
@@ -286,6 +281,11 @@ func (t Target) Endpoint() string {
return strings.TrimPrefix(endpoint, "/")
}
+// String returns a string representation of Target.
+func (t Target) String() string {
+ return t.URL.String()
+}
+
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
@@ -314,3 +314,13 @@ type Resolver interface {
// Close closes the resolver.
Close()
}
+
+// AuthorityOverrider is implemented by Builders that wish to override the
+// default authority for the ClientConn.
+// By default, the authority used is target.Endpoint().
+type AuthorityOverrider interface {
+ // OverrideAuthority returns the authority to use for a ClientConn with the
+ // given target. The implementation must generate it without blocking,
+ // typically in line, and must keep it unchanged.
+ OverrideAuthority(Target) string
+}
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
deleted file mode 100644
index d68330560..000000000
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpc
-
-import (
- "context"
- "strings"
- "sync"
-
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/grpcsync"
- "google.golang.org/grpc/internal/pretty"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/serviceconfig"
-)
-
-// resolverStateUpdater wraps the single method used by ccResolverWrapper to
-// report a state update from the actual resolver implementation.
-type resolverStateUpdater interface {
- updateResolverState(s resolver.State, err error) error
-}
-
-// ccResolverWrapper is a wrapper on top of cc for resolvers.
-// It implements resolver.ClientConn interface.
-type ccResolverWrapper struct {
- // The following fields are initialized when the wrapper is created and are
- // read-only afterwards, and therefore can be accessed without a mutex.
- cc resolverStateUpdater
- channelzID *channelz.Identifier
- ignoreServiceConfig bool
- opts ccResolverWrapperOpts
- serializer *grpcsync.CallbackSerializer // To serialize all incoming calls.
- serializerCancel context.CancelFunc // To close the serializer, accessed only from close().
-
- // All incoming (resolver --> gRPC) calls are guaranteed to execute in a
- // mutually exclusive manner as they are scheduled on the serializer.
- // Fields accessed *only* in these serializer callbacks, can therefore be
- // accessed without a mutex.
- curState resolver.State
-
- // mu guards access to the below fields.
- mu sync.Mutex
- closed bool
- resolver resolver.Resolver // Accessed only from outgoing calls.
-}
-
-// ccResolverWrapperOpts wraps the arguments to be passed when creating a new
-// ccResolverWrapper.
-type ccResolverWrapperOpts struct {
- target resolver.Target // User specified dial target to resolve.
- builder resolver.Builder // Resolver builder to use.
- bOpts resolver.BuildOptions // Resolver build options to use.
- channelzID *channelz.Identifier // Channelz identifier for the channel.
-}
-
-// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
-// returns a ccResolverWrapper object which wraps the newly built resolver.
-func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) {
- ctx, cancel := context.WithCancel(context.Background())
- ccr := &ccResolverWrapper{
- cc: cc,
- channelzID: opts.channelzID,
- ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
- opts: opts,
- serializer: grpcsync.NewCallbackSerializer(ctx),
- serializerCancel: cancel,
- }
-
- // Cannot hold the lock at build time because the resolver can send an
- // update or error inline and these incoming calls grab the lock to schedule
- // a callback in the serializer.
- r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
- if err != nil {
- cancel()
- return nil, err
- }
-
- // Any error reported by the resolver at build time that leads to a
- // re-resolution request from the balancer is dropped by grpc until we
- // return from this function. So, we don't have to handle pending resolveNow
- // requests here.
- ccr.mu.Lock()
- ccr.resolver = r
- ccr.mu.Unlock()
-
- return ccr, nil
-}
-
-func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
- ccr.mu.Lock()
- defer ccr.mu.Unlock()
-
- // ccr.resolver field is set only after the call to Build() returns. But in
- // the process of building, the resolver may send an error update which when
- // propagated to the balancer may result in a re-resolution request.
- if ccr.closed || ccr.resolver == nil {
- return
- }
- ccr.resolver.ResolveNow(o)
-}
-
-func (ccr *ccResolverWrapper) close() {
- ccr.mu.Lock()
- if ccr.closed {
- ccr.mu.Unlock()
- return
- }
-
- channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
-
- // Close the serializer to ensure that no more calls from the resolver are
- // handled, before actually closing the resolver.
- ccr.serializerCancel()
- ccr.closed = true
- r := ccr.resolver
- ccr.mu.Unlock()
-
- // Give enqueued callbacks a chance to finish.
- <-ccr.serializer.Done()
-
- // Spawn a goroutine to close the resolver (since it may block trying to
- // cleanup all allocated resources) and return early.
- go r.Close()
-}
-
-// serializerScheduleLocked is a convenience method to schedule a function to be
-// run on the serializer while holding ccr.mu.
-func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
- ccr.mu.Lock()
- ccr.serializer.Schedule(f)
- ccr.mu.Unlock()
-}
-
-// UpdateState is called by resolver implementations to report new state to gRPC
-// which includes addresses and service config.
-func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
- errCh := make(chan error, 1)
- if s.Endpoints == nil {
- s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
- for _, a := range s.Addresses {
- ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
- ep.Addresses[0].BalancerAttributes = nil
- s.Endpoints = append(s.Endpoints, ep)
- }
- }
- ok := ccr.serializer.Schedule(func(context.Context) {
- ccr.addChannelzTraceEvent(s)
- ccr.curState = s
- if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
- errCh <- balancer.ErrBadResolverState
- return
- }
- errCh <- nil
- })
- if !ok {
- // The only time when Schedule() fail to add the callback to the
- // serializer is when the serializer is closed, and this happens only
- // when the resolver wrapper is closed.
- return nil
- }
- return <-errCh
-}
-
-// ReportError is called by resolver implementations to report errors
-// encountered during name resolution to gRPC.
-func (ccr *ccResolverWrapper) ReportError(err error) {
- ccr.serializerScheduleLocked(func(_ context.Context) {
- channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
- ccr.cc.updateResolverState(resolver.State{}, err)
- })
-}
-
-// NewAddress is called by the resolver implementation to send addresses to
-// gRPC.
-func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
- ccr.serializerScheduleLocked(func(_ context.Context) {
- ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
- ccr.curState.Addresses = addrs
- ccr.cc.updateResolverState(ccr.curState, nil)
- })
-}
-
-// NewServiceConfig is called by the resolver implementation to send service
-// configs to gRPC.
-func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
- ccr.serializerScheduleLocked(func(_ context.Context) {
- channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc)
- if ccr.ignoreServiceConfig {
- channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config")
- return
- }
- scpr := parseServiceConfig(sc)
- if scpr.Err != nil {
- channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
- return
- }
- ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
- ccr.curState.ServiceConfig = scpr
- ccr.cc.updateResolverState(ccr.curState, nil)
- })
-}
-
-// ParseServiceConfig is called by resolver implementations to parse a JSON
-// representation of the service config.
-func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
- return parseServiceConfig(scJSON)
-}
-
-// addChannelzTraceEvent adds a channelz trace event containing the new
-// state received from resolver implementations.
-func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
- var updates []string
- var oldSC, newSC *ServiceConfig
- var oldOK, newOK bool
- if ccr.curState.ServiceConfig != nil {
- oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
- }
- if s.ServiceConfig != nil {
- newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
- }
- if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
- updates = append(updates, "service config updated")
- }
- if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
- updates = append(updates, "resolver returned an empty address list")
- } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
- updates = append(updates, "resolver returned new addresses")
- }
- channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
-}
diff --git a/vendor/google.golang.org/grpc/resolver_wrapper.go b/vendor/google.golang.org/grpc/resolver_wrapper.go
new file mode 100644
index 000000000..c79bab121
--- /dev/null
+++ b/vendor/google.golang.org/grpc/resolver_wrapper.go
@@ -0,0 +1,197 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "context"
+ "strings"
+ "sync"
+
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/internal/pretty"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+)
+
+// ccResolverWrapper is a wrapper on top of cc for resolvers.
+// It implements resolver.ClientConn interface.
+type ccResolverWrapper struct {
+ // The following fields are initialized when the wrapper is created and are
+ // read-only afterwards, and therefore can be accessed without a mutex.
+ cc *ClientConn
+ ignoreServiceConfig bool
+ serializer *grpcsync.CallbackSerializer
+ serializerCancel context.CancelFunc
+
+ resolver resolver.Resolver // only accessed within the serializer
+
+ // The following fields are protected by mu. Caller must take cc.mu before
+ // taking mu.
+ mu sync.Mutex
+ curState resolver.State
+ closed bool
+}
+
+// newCCResolverWrapper initializes the ccResolverWrapper. It can only be used
+// after calling start, which builds the resolver.
+func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
+ ctx, cancel := context.WithCancel(cc.ctx)
+ return &ccResolverWrapper{
+ cc: cc,
+ ignoreServiceConfig: cc.dopts.disableServiceConfig,
+ serializer: grpcsync.NewCallbackSerializer(ctx),
+ serializerCancel: cancel,
+ }
+}
+
+// start builds the name resolver using the resolver.Builder in cc and returns
+// any error encountered. It must always be the first operation performed on
+// any newly created ccResolverWrapper, except that close may be called instead.
+func (ccr *ccResolverWrapper) start() error {
+ errCh := make(chan error)
+ ccr.serializer.Schedule(func(ctx context.Context) {
+ if ctx.Err() != nil {
+ return
+ }
+ opts := resolver.BuildOptions{
+ DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
+ DialCreds: ccr.cc.dopts.copts.TransportCredentials,
+ CredsBundle: ccr.cc.dopts.copts.CredsBundle,
+ Dialer: ccr.cc.dopts.copts.Dialer,
+ }
+ var err error
+ ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
+ errCh <- err
+ })
+ return <-errCh
+}
+
+func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
+ ccr.serializer.Schedule(func(ctx context.Context) {
+ if ctx.Err() != nil || ccr.resolver == nil {
+ return
+ }
+ ccr.resolver.ResolveNow(o)
+ })
+}
+
+// close initiates async shutdown of the wrapper. To determine the wrapper has
+// finished shutting down, the channel should block on ccr.serializer.Done()
+// without cc.mu held.
+func (ccr *ccResolverWrapper) close() {
+ channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver")
+ ccr.mu.Lock()
+ ccr.closed = true
+ ccr.mu.Unlock()
+
+ ccr.serializer.Schedule(func(context.Context) {
+ if ccr.resolver == nil {
+ return
+ }
+ ccr.resolver.Close()
+ ccr.resolver = nil
+ })
+ ccr.serializerCancel()
+}
+
+// UpdateState is called by resolver implementations to report new state to gRPC
+// which includes addresses and service config.
+func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
+ ccr.cc.mu.Lock()
+ ccr.mu.Lock()
+ if ccr.closed {
+ ccr.mu.Unlock()
+ ccr.cc.mu.Unlock()
+ return nil
+ }
+ if s.Endpoints == nil {
+ s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
+ for _, a := range s.Addresses {
+ ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
+ ep.Addresses[0].BalancerAttributes = nil
+ s.Endpoints = append(s.Endpoints, ep)
+ }
+ }
+ ccr.addChannelzTraceEvent(s)
+ ccr.curState = s
+ ccr.mu.Unlock()
+ return ccr.cc.updateResolverStateAndUnlock(s, nil)
+}
+
+// ReportError is called by resolver implementations to report errors
+// encountered during name resolution to gRPC.
+func (ccr *ccResolverWrapper) ReportError(err error) {
+ ccr.cc.mu.Lock()
+ ccr.mu.Lock()
+ if ccr.closed {
+ ccr.mu.Unlock()
+ ccr.cc.mu.Unlock()
+ return
+ }
+ ccr.mu.Unlock()
+ channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
+ ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err)
+}
+
+// NewAddress is called by the resolver implementation to send addresses to
+// gRPC.
+func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
+ ccr.cc.mu.Lock()
+ ccr.mu.Lock()
+ if ccr.closed {
+ ccr.mu.Unlock()
+ ccr.cc.mu.Unlock()
+ return
+ }
+ s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}
+ ccr.addChannelzTraceEvent(s)
+ ccr.curState = s
+ ccr.mu.Unlock()
+ ccr.cc.updateResolverStateAndUnlock(s, nil)
+}
+
+// ParseServiceConfig is called by resolver implementations to parse a JSON
+// representation of the service config.
+func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
+ return parseServiceConfig(scJSON)
+}
+
+// addChannelzTraceEvent adds a channelz trace event containing the new
+// state received from resolver implementations.
+func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
+ var updates []string
+ var oldSC, newSC *ServiceConfig
+ var oldOK, newOK bool
+ if ccr.curState.ServiceConfig != nil {
+ oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
+ }
+ if s.ServiceConfig != nil {
+ newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
+ }
+ if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
+ updates = append(updates, "service config updated")
+ }
+ if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
+ updates = append(updates, "resolver returned an empty address list")
+ } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
+ updates = append(updates, "resolver returned new addresses")
+ }
+ channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
+}
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index b7723aa09..a4b6bc687 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -640,14 +640,18 @@ func encode(c baseCodec, msg any) ([]byte, error) {
return b, nil
}
-// compress returns the input bytes compressed by compressor or cp. If both
-// compressors are nil, returns nil.
+// compress returns the input bytes compressed by compressor or cp.
+// If both compressors are nil, or if the message has zero length, returns nil,
+// indicating no compression was done.
//
// TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) {
if compressor == nil && cp == nil {
return nil, nil
}
+ if len(in) == 0 {
+ return nil, nil
+ }
wrapErr := func(err error) error {
return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
}
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 8f60d4214..e89c5ac61 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -70,9 +70,10 @@ func init() {
internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
return srv.opts.creds
}
- internal.DrainServerTransports = func(srv *Server, addr string) {
- srv.drainServerTransports(addr)
+ internal.IsRegisteredMethod = func(srv *Server, method string) bool {
+ return srv.isRegisteredMethod(method)
}
+ internal.ServerFromContext = serverFromContext
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
globalServerOptions = append(globalServerOptions, opt...)
}
@@ -81,6 +82,7 @@ func init() {
}
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
+ internal.RecvBufferPool = recvBufferPool
}
var statusOK = status.New(codes.OK, "")
@@ -134,12 +136,14 @@ type Server struct {
quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
- serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
+ serveWG sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
+ handlersWG sync.WaitGroup // counts active method handler goroutines
channelzID *channelz.Identifier
czData *channelzData
- serverWorkerChannel chan func()
+ serverWorkerChannel chan func()
+ serverWorkerChannelClose func()
}
type serverOptions struct {
@@ -170,6 +174,7 @@ type serverOptions struct {
headerTableSize *uint32
numServerWorkers uint32
recvBufferPool SharedBufferPool
+ waitForHandlers bool
}
var defaultServerOptions = serverOptions{
@@ -567,6 +572,21 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
})
}
+// WaitForHandlers cause Stop to wait until all outstanding method handlers have
+// exited before returning. If false, Stop will return as soon as all
+// connections have closed, but method handlers may still be running. By
+// default, Stop does not wait for method handlers to return.
+//
+// # Experimental
+//
+// Notice: This API is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func WaitForHandlers(w bool) ServerOption {
+ return newFuncServerOption(func(o *serverOptions) {
+ o.waitForHandlers = w
+ })
+}
+
// RecvBufferPool returns a ServerOption that configures the server
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
@@ -578,11 +598,13 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
// options are used: StatsHandler, EnableTracing, or binary logging. In such
// cases, the shared buffer pool will be ignored.
//
-// # Experimental
-//
-// Notice: This API is EXPERIMENTAL and may be changed or removed in a
-// later release.
+// Deprecated: use experimental.WithRecvBufferPool instead. Will be deleted in
+// v1.60.0 or later.
func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
+ return recvBufferPool(bufferPool)
+}
+
+func recvBufferPool(bufferPool SharedBufferPool) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.recvBufferPool = bufferPool
})
@@ -616,15 +638,14 @@ func (s *Server) serverWorker() {
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannel = make(chan func())
+ s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
+ close(s.serverWorkerChannel)
+ })
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
}
-func (s *Server) stopServerWorkers() {
- close(s.serverWorkerChannel)
-}
-
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -806,6 +827,18 @@ func (l *listenSocket) Close() error {
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
+//
+// Note: All supported releases of Go (as of December 2023) override the OS
+// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
+// with OS defaults for keepalive time and interval, callers need to do the
+// following two things:
+// - pass a net.Listener created by calling the Listen method on a
+// net.ListenConfig with the `KeepAlive` field set to a negative value. This
+// will result in the Go standard library not overriding OS defaults for TCP
+// keepalive interval and time. But this will also result in the Go standard
+// library not enabling TCP keepalives by default.
+// - override the Accept method on the passed in net.Listener and set the
+// SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
@@ -913,24 +946,21 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
return
}
+ if cc, ok := rawConn.(interface {
+ PassServerTransport(transport.ServerTransport)
+ }); ok {
+ cc.PassServerTransport(st)
+ }
+
if !s.addConn(lisAddr, st) {
return
}
go func() {
- s.serveStreams(st)
+ s.serveStreams(context.Background(), st, rawConn)
s.removeConn(lisAddr, st)
}()
}
-func (s *Server) drainServerTransports(addr string) {
- s.mu.Lock()
- conns := s.conns[addr]
- for st := range conns {
- st.Drain("")
- }
- s.mu.Unlock()
-}
-
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
@@ -971,18 +1001,31 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
return st
}
-func (s *Server) serveStreams(st transport.ServerTransport) {
- defer st.Close(errors.New("finished serving streams for the server transport"))
- var wg sync.WaitGroup
+func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
+ ctx = transport.SetConnection(ctx, rawConn)
+ ctx = peer.NewContext(ctx, st.Peer())
+ for _, sh := range s.opts.statsHandlers {
+ ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
+ RemoteAddr: st.Peer().Addr,
+ LocalAddr: st.Peer().LocalAddr,
+ })
+ sh.HandleConn(ctx, &stats.ConnBegin{})
+ }
- streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
- st.HandleStreams(func(stream *transport.Stream) {
- wg.Add(1)
+ defer func() {
+ st.Close(errors.New("finished serving streams for the server transport"))
+ for _, sh := range s.opts.statsHandlers {
+ sh.HandleConn(ctx, &stats.ConnEnd{})
+ }
+ }()
+ streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
+ st.HandleStreams(ctx, func(stream *transport.Stream) {
+ s.handlersWG.Add(1)
streamQuota.acquire()
f := func() {
defer streamQuota.release()
- defer wg.Done()
+ defer s.handlersWG.Done()
s.handleStream(st, stream)
}
@@ -996,7 +1039,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
}
go f()
})
- wg.Wait()
}
var _ http.Handler = (*Server)(nil)
@@ -1040,7 +1082,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
defer s.removeConn(listenerAddressForServeHTTP, st)
- s.serveStreams(st)
+ s.serveStreams(r.Context(), st, nil)
}
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
@@ -1689,6 +1731,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTran
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
ctx := stream.Context()
+ ctx = contextWithServer(ctx, s)
var ti *traceInfo
if EnableTracing {
tr := trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
@@ -1697,7 +1740,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
tr: tr,
firstLine: firstLine{
client: false,
- remoteAddr: t.RemoteAddr(),
+ remoteAddr: t.Peer().Addr,
},
}
if dl, ok := ctx.Deadline(); ok {
@@ -1731,6 +1774,22 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
service := sm[:pos]
method := sm[pos+1:]
+ md, _ := metadata.FromIncomingContext(ctx)
+ for _, sh := range s.opts.statsHandlers {
+ ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
+ sh.HandleRPC(ctx, &stats.InHeader{
+ FullMethod: stream.Method(),
+ RemoteAddr: t.Peer().Addr,
+ LocalAddr: t.Peer().LocalAddr,
+ Compression: stream.RecvCompress(),
+ WireLength: stream.HeaderWireLength(),
+ Header: md,
+ })
+ }
+ // To have calls in stream callouts work. Will delete once all stats handler
+ // calls come from the gRPC layer.
+ stream.SetContext(ctx)
+
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
@@ -1820,62 +1879,72 @@ func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
- s.quit.Fire()
+ s.stop(false)
+}
- defer func() {
- s.serveWG.Wait()
- s.done.Fire()
- }()
+// GracefulStop stops the gRPC server gracefully. It stops the server from
+// accepting new connections and RPCs and blocks until all the pending RPCs are
+// finished.
+func (s *Server) GracefulStop() {
+ s.stop(true)
+}
+
+func (s *Server) stop(graceful bool) {
+ s.quit.Fire()
+ defer s.done.Fire()
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
s.mu.Lock()
- listeners := s.lis
- s.lis = nil
- conns := s.conns
- s.conns = nil
- // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
- s.cv.Broadcast()
+ s.closeListenersLocked()
+ // Wait for serving threads to be ready to exit. Only then can we be sure no
+ // new conns will be created.
s.mu.Unlock()
+ s.serveWG.Wait()
- for lis := range listeners {
- lis.Close()
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if graceful {
+ s.drainAllServerTransportsLocked()
+ } else {
+ s.closeServerTransportsLocked()
}
- for _, cs := range conns {
- for st := range cs {
- st.Close(errors.New("Server.Stop called"))
- }
+
+ for len(s.conns) != 0 {
+ s.cv.Wait()
}
+ s.conns = nil
+
if s.opts.numServerWorkers > 0 {
- s.stopServerWorkers()
+ // Closing the channel (only once, via grpcsync.OnceFunc) after all the
+ // connections have been closed above ensures that there are no
+ // goroutines executing the callback passed to st.HandleStreams (where
+ // the channel is written to).
+ s.serverWorkerChannelClose()
+ }
+
+ if graceful || s.opts.waitForHandlers {
+ s.handlersWG.Wait()
}
- s.mu.Lock()
if s.events != nil {
s.events.Finish()
s.events = nil
}
- s.mu.Unlock()
}
-// GracefulStop stops the gRPC server gracefully. It stops the server from
-// accepting new connections and RPCs and blocks until all the pending RPCs are
-// finished.
-func (s *Server) GracefulStop() {
- s.quit.Fire()
- defer s.done.Fire()
-
- s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
- s.mu.Lock()
- if s.conns == nil {
- s.mu.Unlock()
- return
+// s.mu must be held by the caller.
+func (s *Server) closeServerTransportsLocked() {
+ for _, conns := range s.conns {
+ for st := range conns {
+ st.Close(errors.New("Server.Stop called"))
+ }
}
+}
- for lis := range s.lis {
- lis.Close()
- }
- s.lis = nil
+// s.mu must be held by the caller.
+func (s *Server) drainAllServerTransportsLocked() {
if !s.drain {
for _, conns := range s.conns {
for st := range conns {
@@ -1884,22 +1953,14 @@ func (s *Server) GracefulStop() {
}
s.drain = true
}
+}
- // Wait for serving threads to be ready to exit. Only then can we be sure no
- // new conns will be created.
- s.mu.Unlock()
- s.serveWG.Wait()
- s.mu.Lock()
-
- for len(s.conns) != 0 {
- s.cv.Wait()
- }
- s.conns = nil
- if s.events != nil {
- s.events.Finish()
- s.events = nil
+// s.mu must be held by the caller.
+func (s *Server) closeListenersLocked() {
+ for lis := range s.lis {
+ lis.Close()
}
- s.mu.Unlock()
+ s.lis = nil
}
// contentSubtype must be lowercase
@@ -1913,11 +1974,50 @@ func (s *Server) getCodec(contentSubtype string) baseCodec {
}
codec := encoding.GetCodec(contentSubtype)
if codec == nil {
+ logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
return encoding.GetCodec(proto.Name)
}
return codec
}
+type serverKey struct{}
+
+// serverFromContext gets the Server from the context.
+func serverFromContext(ctx context.Context) *Server {
+ s, _ := ctx.Value(serverKey{}).(*Server)
+ return s
+}
+
+// contextWithServer sets the Server in the context.
+func contextWithServer(ctx context.Context, server *Server) context.Context {
+ return context.WithValue(ctx, serverKey{}, server)
+}
+
+// isRegisteredMethod returns whether the passed in method is registered as a
+// method on the server. /service/method and service/method will match if the
+// service and method are registered on the server.
+func (s *Server) isRegisteredMethod(serviceMethod string) bool {
+ if serviceMethod != "" && serviceMethod[0] == '/' {
+ serviceMethod = serviceMethod[1:]
+ }
+ pos := strings.LastIndex(serviceMethod, "/")
+ if pos == -1 { // Invalid method name syntax.
+ return false
+ }
+ service := serviceMethod[:pos]
+ method := serviceMethod[pos+1:]
+ srv, knownService := s.services[service]
+ if knownService {
+ if _, ok := srv.methods[method]; ok {
+ return true
+ }
+ if _, ok := srv.streams[method]; ok {
+ return true
+ }
+ }
+ return false
+}
+
// SetHeader sets the header metadata to be sent from the server to the client.
// The context provided must be the context passed to the server's handler.
//
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index b14b2fbea..d621f52b1 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -48,6 +48,8 @@ import (
"google.golang.org/grpc/status"
)
+var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
+
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC.
//
@@ -184,7 +186,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// when the RPC completes.
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
- if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
// validate md
if err := imetadata.Validate(md); err != nil {
return nil, status.Error(codes.Internal, err.Error())
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index 6d2cadd79..f1aec4c0a 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.59.0"
+const Version = "1.61.1"
diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh
index bb480f1f9..5da38a409 100644
--- a/vendor/google.golang.org/grpc/vet.sh
+++ b/vendor/google.golang.org/grpc/vet.sh
@@ -35,7 +35,6 @@ if [[ "$1" = "-install" ]]; then
# Install the pinned versions as defined in module tools.
pushd ./test/tools
go install \
- golang.org/x/lint/golint \
golang.org/x/tools/cmd/goimports \
honnef.co/go/tools/cmd/staticcheck \
github.com/client9/misspell/cmd/misspell
@@ -77,15 +76,19 @@ fi
not grep 'func Test[^(]' *_test.go
not grep 'func Test[^(]' test/*.go
+# - Check for typos in test function names
+git grep 'func (s) ' -- "*_test.go" | not grep -v 'func (s) Test'
+git grep 'func [A-Z]' -- "*_test.go" | not grep -v 'func Test\|Benchmark\|Example'
+
# - Do not import x/net/context.
not git grep -l 'x/net/context' -- "*.go"
# - Do not import math/rand for real library code. Use internal/grpcrand for
# thread safety.
-git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^stress\|grpcrand\|^benchmark\|wrr_test'
+git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^interop/stress\|grpcrand\|^benchmark\|wrr_test'
# - Do not use "interface{}"; use "any" instead.
-git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc'
+git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc\|grpc_testing_not_regenerate'
# - Do not call grpclog directly. Use grpclog.Component instead.
git grep -l -e 'grpclog.I' --or -e 'grpclog.W' --or -e 'grpclog.E' --or -e 'grpclog.F' --or -e 'grpclog.V' -- "*.go" | not grep -v '^grpclog/component.go\|^internal/grpctest/tlogger_test.go'
@@ -94,15 +97,14 @@ git grep -l -e 'grpclog.I' --or -e 'grpclog.W' --or -e 'grpclog.E' --or -e 'grpc
not git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go"
# - Ensure all usages of grpc_testing package are renamed when importing.
-not git grep "\(import \|^\s*\)\"google.golang.org/grpc/interop/grpc_testing" -- "*.go"
+not git grep "\(import \|^\s*\)\"google.golang.org/grpc/interop/grpc_testing" -- "*.go"
# - Ensure all xds proto imports are renamed to *pb or *grpc.
git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.pb.go' | not grep -v 'pb "\|grpc "'
misspell -error .
-# - gofmt, goimports, golint (with exceptions for generated code), go vet,
-# go mod tidy.
+# - gofmt, goimports, go vet, go mod tidy.
# Perform these checks on each module inside gRPC.
for MOD_FILE in $(find . -name 'go.mod'); do
MOD_DIR=$(dirname ${MOD_FILE})
@@ -110,7 +112,6 @@ for MOD_FILE in $(find . -name 'go.mod'); do
go vet -all ./... | fail_on_output
gofmt -s -d -l . 2>&1 | fail_on_output
goimports -l . 2>&1 | not grep -vE "\.pb\.go"
- golint ./... 2>&1 | not grep -vE "/grpc_testing_not_regenerate/.*\.pb\.go:"
go mod tidy -compat=1.19
git status --porcelain 2>&1 | fail_on_output || \
@@ -119,94 +120,71 @@ for MOD_FILE in $(find . -name 'go.mod'); do
done
# - Collection of static analysis checks
-#
-# TODO(dfawley): don't use deprecated functions in examples or first-party
-# plugins.
-# TODO(dfawley): enable ST1019 (duplicate imports) but allow for protobufs.
SC_OUT="$(mktemp)"
-staticcheck -go 1.19 -checks 'inherit,-ST1015,-ST1019,-SA1019' ./... > "${SC_OUT}" || true
-# Error if anything other than deprecation warnings are printed.
-not grep -v "is deprecated:.*SA1019" "${SC_OUT}"
-# Only ignore the following deprecated types/fields/functions.
-not grep -Fv '.CredsBundle
-.HeaderMap
-.Metadata is deprecated: use Attributes
-.NewAddress
-.NewServiceConfig
-.Type is deprecated: use Attributes
-BuildVersion is deprecated
-balancer.ErrTransientFailure
-balancer.Picker
-extDesc.Filename is deprecated
-github.com/golang/protobuf/jsonpb is deprecated
-grpc.CallCustomCodec
-grpc.Code
-grpc.Compressor
-grpc.CustomCodec
-grpc.Decompressor
-grpc.MaxMsgSize
-grpc.MethodConfig
-grpc.NewGZIPCompressor
-grpc.NewGZIPDecompressor
-grpc.RPCCompressor
-grpc.RPCDecompressor
-grpc.ServiceConfig
-grpc.WithCompressor
-grpc.WithDecompressor
-grpc.WithDialer
-grpc.WithMaxMsgSize
-grpc.WithServiceConfig
-grpc.WithTimeout
-http.CloseNotifier
-info.SecurityVersion
-proto is deprecated
-proto.InternalMessageInfo is deprecated
-proto.EnumName is deprecated
-proto.ErrInternalBadWireType is deprecated
-proto.FileDescriptor is deprecated
-proto.Marshaler is deprecated
-proto.MessageType is deprecated
-proto.RegisterEnum is deprecated
-proto.RegisterFile is deprecated
-proto.RegisterType is deprecated
-proto.RegisterExtension is deprecated
-proto.RegisteredExtension is deprecated
-proto.RegisteredExtensions is deprecated
-proto.RegisterMapType is deprecated
-proto.Unmarshaler is deprecated
+staticcheck -go 1.19 -checks 'all' ./... > "${SC_OUT}" || true
+
+# Error for anything other than checks that need exclusions.
+grep -v "(ST1000)" "${SC_OUT}" | grep -v "(SA1019)" | grep -v "(ST1003)" | not grep -v "(ST1019)\|\(other import of\)"
+
+# Exclude underscore checks for generated code.
+grep "(ST1003)" "${SC_OUT}" | not grep -v '\(.pb.go:\)\|\(code_string_test.go:\)\|\(grpc_testing_not_regenerate\)'
+
+# Error for duplicate imports not including grpc protos.
+grep "(ST1019)\|\(other import of\)" "${SC_OUT}" | not grep -Fv 'XXXXX PleaseIgnoreUnused
+channelz/grpc_channelz_v1"
+go-control-plane/envoy
+grpclb/grpc_lb_v1"
+health/grpc_health_v1"
+interop/grpc_testing"
+orca/v3"
+proto/grpc_gcp"
+proto/grpc_lookup_v1"
+reflection/grpc_reflection_v1"
+reflection/grpc_reflection_v1alpha"
+XXXXX PleaseIgnoreUnused'
+
+# Error for any package comments not in generated code.
+grep "(ST1000)" "${SC_OUT}" | not grep -v "\.pb\.go:"
+
+# Only ignore the following deprecated types/fields/functions and exclude
+# generated code.
+grep "(SA1019)" "${SC_OUT}" | not grep -Fv 'XXXXX PleaseIgnoreUnused
+XXXXX Protobuf related deprecation errors:
+"github.com/golang/protobuf
+.pb.go:
+grpc_testing_not_regenerate
+: ptypes.
+proto.RegisterType
+XXXXX gRPC internal usage deprecation errors:
+"google.golang.org/grpc
+: grpc.
+: v1alpha.
+: v1alphareflectionpb.
+BalancerAttributes is deprecated:
+CredsBundle is deprecated:
+Metadata is deprecated: use Attributes instead.
+NewSubConn is deprecated:
+OverrideServerName is deprecated:
+RemoveSubConn is deprecated:
+SecurityVersion is deprecated:
Target is deprecated: Use the Target field in the BuildOptions instead.
-xxx_messageInfo_
-' "${SC_OUT}"
-
-# - special golint on package comments.
-lint_package_comment_per_package() {
- # Number of files in this go package.
- fileCount=$(go list -f '{{len .GoFiles}}' $1)
- if [ ${fileCount} -eq 0 ]; then
- return 0
- fi
- # Number of package errors generated by golint.
- lintPackageCommentErrorsCount=$(golint --min_confidence 0 $1 | grep -c "should have a package comment")
- # golint complains about every file that's missing the package comment. If the
- # number of files for this package is greater than the number of errors, there's
- # at least one file with package comment, good. Otherwise, fail.
- if [ ${fileCount} -le ${lintPackageCommentErrorsCount} ]; then
- echo "Package $1 (with ${fileCount} files) is missing package comment"
- return 1
- fi
-}
-lint_package_comment() {
- set +ex
-
- count=0
- for i in $(go list ./...); do
- lint_package_comment_per_package "$i"
- ((count += $?))
- done
-
- set -ex
- return $count
-}
-lint_package_comment
+UpdateAddresses is deprecated:
+UpdateSubConnState is deprecated:
+balancer.ErrTransientFailure is deprecated:
+grpc/reflection/v1alpha/reflection.proto
+XXXXX xDS deprecated fields we support
+.ExactMatch
+.PrefixMatch
+.SafeRegexMatch
+.SuffixMatch
+GetContainsMatch
+GetExactMatch
+GetMatchSubjectAltNames
+GetPrefixMatch
+GetSafeRegexMatch
+GetSuffixMatch
+GetTlsCertificateCertificateProviderInstance
+GetValidationContextCertificateProviderInstance
+XXXXX PleaseIgnoreUnused'
echo SUCCESS