diff options
| author | 2024-03-11 15:34:34 +0100 | |
|---|---|---|
| committer | 2024-03-11 15:34:34 +0100 | |
| commit | 5e871e81a87a638b07d540c15d1b95608843255d (patch) | |
| tree | 62db65c7de651bac3d8894f4f70e0fe8de853a5e /vendor/google.golang.org/grpc | |
| parent | [chore]: Bump github.com/minio/minio-go/v7 from 7.0.67 to 7.0.69 (#2748) (diff) | |
| download | gotosocial-5e871e81a87a638b07d540c15d1b95608843255d.tar.xz | |
[chore] Update usage of OTEL libraries (#2725)
* otel to 1.24
* prometheus exporter to 0.46
* bunotel to 1.1.17
Also:
* Use schemaless URL for metrics
* Add software version to tracing schema
Diffstat (limited to 'vendor/google.golang.org/grpc')
40 files changed, 1571 insertions, 1433 deletions
| diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_wrapper.go index a4411c22b..b5e30cff0 100644 --- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go +++ b/vendor/google.golang.org/grpc/balancer_wrapper.go @@ -32,21 +32,13 @@ import (  	"google.golang.org/grpc/resolver"  ) -type ccbMode int - -const ( -	ccbModeActive = iota -	ccbModeIdle -	ccbModeClosed -	ccbModeExitingIdle -) -  // ccBalancerWrapper sits between the ClientConn and the Balancer.  //  // ccBalancerWrapper implements methods corresponding to the ones on the  // balancer.Balancer interface. The ClientConn is free to call these methods  // concurrently and the ccBalancerWrapper ensures that calls from the ClientConn -// to the Balancer happen synchronously and in order. +// to the Balancer happen in order by performing them in the serializer, without +// any mutexes held.  //  // ccBalancerWrapper also implements the balancer.ClientConn interface and is  // passed to the Balancer implementations. It invokes unexported methods on the @@ -57,87 +49,75 @@ const (  type ccBalancerWrapper struct {  	// The following fields are initialized when the wrapper is created and are  	// read-only afterwards, and therefore can be accessed without a mutex. -	cc   *ClientConn -	opts balancer.BuildOptions +	cc               *ClientConn +	opts             balancer.BuildOptions +	serializer       *grpcsync.CallbackSerializer +	serializerCancel context.CancelFunc -	// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a -	// mutually exclusive manner as they are scheduled in the serializer. Fields -	// accessed *only* in these serializer callbacks, can therefore be accessed -	// without a mutex. -	balancer        *gracefulswitch.Balancer +	// The following fields are only accessed within the serializer or during +	// initialization.  	curBalancerName string +	balancer        *gracefulswitch.Balancer -	// mu guards access to the below fields. Access to the serializer and its -	// cancel function needs to be mutex protected because they are overwritten -	// when the wrapper exits idle mode. -	mu               sync.Mutex -	serializer       *grpcsync.CallbackSerializer // To serialize all outoing calls. -	serializerCancel context.CancelFunc           // To close the seralizer at close/enterIdle time. -	mode             ccbMode                      // Tracks the current mode of the wrapper. +	// The following field is protected by mu.  Caller must take cc.mu before +	// taking mu. +	mu     sync.Mutex +	closed bool  } -// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer -// is not created until the switchTo() method is invoked. -func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { -	ctx, cancel := context.WithCancel(context.Background()) +// newCCBalancerWrapper creates a new balancer wrapper in idle state. The +// underlying balancer is not created until the switchTo() method is invoked. +func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper { +	ctx, cancel := context.WithCancel(cc.ctx)  	ccb := &ccBalancerWrapper{ -		cc:               cc, -		opts:             bopts, +		cc: cc, +		opts: balancer.BuildOptions{ +			DialCreds:        cc.dopts.copts.TransportCredentials, +			CredsBundle:      cc.dopts.copts.CredsBundle, +			Dialer:           cc.dopts.copts.Dialer, +			Authority:        cc.authority, +			CustomUserAgent:  cc.dopts.copts.UserAgent, +			ChannelzParentID: cc.channelzID, +			Target:           cc.parsedTarget, +		},  		serializer:       grpcsync.NewCallbackSerializer(ctx),  		serializerCancel: cancel,  	} -	ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) +	ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)  	return ccb  }  // updateClientConnState is invoked by grpc to push a ClientConnState update to -// the underlying balancer. +// the underlying balancer.  This is always executed from the serializer, so +// it is safe to call into the balancer here.  func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { -	ccb.mu.Lock() -	errCh := make(chan error, 1) -	// Here and everywhere else where Schedule() is called, it is done with the -	// lock held. But the lock guards only the scheduling part. The actual -	// callback is called asynchronously without the lock being held. -	ok := ccb.serializer.Schedule(func(_ context.Context) { -		errCh <- ccb.balancer.UpdateClientConnState(*ccs) +	errCh := make(chan error) +	ok := ccb.serializer.Schedule(func(ctx context.Context) { +		defer close(errCh) +		if ctx.Err() != nil || ccb.balancer == nil { +			return +		} +		err := ccb.balancer.UpdateClientConnState(*ccs) +		if logger.V(2) && err != nil { +			logger.Infof("error from balancer.UpdateClientConnState: %v", err) +		} +		errCh <- err  	})  	if !ok { -		// If we are unable to schedule a function with the serializer, it -		// indicates that it has been closed. A serializer is only closed when -		// the wrapper is closed or is in idle. -		ccb.mu.Unlock() -		return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer") -	} -	ccb.mu.Unlock() - -	// We get here only if the above call to Schedule succeeds, in which case it -	// is guaranteed that the scheduled function will run. Therefore it is safe -	// to block on this channel. -	err := <-errCh -	if logger.V(2) && err != nil { -		logger.Infof("error from balancer.UpdateClientConnState: %v", err) +		return nil  	} -	return err -} - -// updateSubConnState is invoked by grpc to push a subConn state update to the -// underlying balancer. -func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { -	ccb.mu.Lock() -	ccb.serializer.Schedule(func(_ context.Context) { -		// Even though it is optional for balancers, gracefulswitch ensures -		// opts.StateListener is set, so this cannot ever be nil. -		sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) -	}) -	ccb.mu.Unlock() +	return <-errCh  } +// resolverError is invoked by grpc to push a resolver error to the underlying +// balancer.  The call to the balancer is executed from the serializer.  func (ccb *ccBalancerWrapper) resolverError(err error) { -	ccb.mu.Lock() -	ccb.serializer.Schedule(func(_ context.Context) { +	ccb.serializer.Schedule(func(ctx context.Context) { +		if ctx.Err() != nil || ccb.balancer == nil { +			return +		}  		ccb.balancer.ResolverError(err)  	}) -	ccb.mu.Unlock()  }  // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the @@ -151,8 +131,10 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {  // the ccBalancerWrapper keeps track of the current LB policy name, and skips  // the graceful balancer switching process if the name does not change.  func (ccb *ccBalancerWrapper) switchTo(name string) { -	ccb.mu.Lock() -	ccb.serializer.Schedule(func(_ context.Context) { +	ccb.serializer.Schedule(func(ctx context.Context) { +		if ctx.Err() != nil || ccb.balancer == nil { +			return +		}  		// TODO: Other languages use case-sensitive balancer registries. We should  		// switch as well. See: https://github.com/grpc/grpc-go/issues/5288.  		if strings.EqualFold(ccb.curBalancerName, name) { @@ -160,7 +142,6 @@ func (ccb *ccBalancerWrapper) switchTo(name string) {  		}  		ccb.buildLoadBalancingPolicy(name)  	}) -	ccb.mu.Unlock()  }  // buildLoadBalancingPolicy performs the following: @@ -187,115 +168,49 @@ func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {  	ccb.curBalancerName = builder.Name()  } +// close initiates async shutdown of the wrapper.  cc.mu must be held when +// calling this function.  To determine the wrapper has finished shutting down, +// the channel should block on ccb.serializer.Done() without cc.mu held.  func (ccb *ccBalancerWrapper) close() { -	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") -	ccb.closeBalancer(ccbModeClosed) -} - -// enterIdleMode is invoked by grpc when the channel enters idle mode upon -// expiry of idle_timeout. This call blocks until the balancer is closed. -func (ccb *ccBalancerWrapper) enterIdleMode() { -	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") -	ccb.closeBalancer(ccbModeIdle) -} - -// closeBalancer is invoked when the channel is being closed or when it enters -// idle mode upon expiry of idle_timeout. -func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {  	ccb.mu.Lock() -	if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { -		ccb.mu.Unlock() -		return -	} - -	ccb.mode = m -	done := ccb.serializer.Done() -	b := ccb.balancer -	ok := ccb.serializer.Schedule(func(_ context.Context) { -		// Close the serializer to ensure that no more calls from gRPC are sent -		// to the balancer. -		ccb.serializerCancel() -		// Empty the current balancer name because we don't have a balancer -		// anymore and also so that we act on the next call to switchTo by -		// creating a new balancer specified by the new resolver. -		ccb.curBalancerName = "" -	}) -	if !ok { -		ccb.mu.Unlock() -		return -	} +	ccb.closed = true  	ccb.mu.Unlock() - -	// Give enqueued callbacks a chance to finish before closing the balancer. -	<-done -	b.Close() -} - -// exitIdleMode is invoked by grpc when the channel exits idle mode either -// because of an RPC or because of an invocation of the Connect() API. This -// recreates the balancer that was closed previously when entering idle mode. -// -// If the channel is not in idle mode, we know for a fact that we are here as a -// result of the user calling the Connect() method on the ClientConn. In this -// case, we can simply forward the call to the underlying balancer, instructing -// it to reconnect to the backends. -func (ccb *ccBalancerWrapper) exitIdleMode() { -	ccb.mu.Lock() -	if ccb.mode == ccbModeClosed { -		// Request to exit idle is a no-op when wrapper is already closed. -		ccb.mu.Unlock() -		return -	} - -	if ccb.mode == ccbModeIdle { -		// Recreate the serializer which was closed when we entered idle. -		ctx, cancel := context.WithCancel(context.Background()) -		ccb.serializer = grpcsync.NewCallbackSerializer(ctx) -		ccb.serializerCancel = cancel -	} - -	// The ClientConn guarantees that mutual exclusion between close() and -	// exitIdleMode(), and since we just created a new serializer, we can be -	// sure that the below function will be scheduled. -	done := make(chan struct{}) -	ccb.serializer.Schedule(func(_ context.Context) { -		defer close(done) - -		ccb.mu.Lock() -		defer ccb.mu.Unlock() - -		if ccb.mode != ccbModeIdle { -			ccb.balancer.ExitIdle() +	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") +	ccb.serializer.Schedule(func(context.Context) { +		if ccb.balancer == nil {  			return  		} - -		// Gracefulswitch balancer does not support a switchTo operation after -		// being closed. Hence we need to create a new one here. -		ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) -		ccb.mode = ccbModeActive -		channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") - +		ccb.balancer.Close() +		ccb.balancer = nil  	}) -	ccb.mu.Unlock() - -	<-done +	ccb.serializerCancel()  } -func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { -	ccb.mu.Lock() -	defer ccb.mu.Unlock() -	return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed +// exitIdle invokes the balancer's exitIdle method in the serializer. +func (ccb *ccBalancerWrapper) exitIdle() { +	ccb.serializer.Schedule(func(ctx context.Context) { +		if ctx.Err() != nil || ccb.balancer == nil { +			return +		} +		ccb.balancer.ExitIdle() +	})  }  func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { -	if ccb.isIdleOrClosed() { -		return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") +	ccb.cc.mu.Lock() +	defer ccb.cc.mu.Unlock() + +	ccb.mu.Lock() +	if ccb.closed { +		ccb.mu.Unlock() +		return nil, fmt.Errorf("balancer is being closed; no new SubConns allowed")  	} +	ccb.mu.Unlock()  	if len(addrs) == 0 {  		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")  	} -	ac, err := ccb.cc.newAddrConn(addrs, opts) +	ac, err := ccb.cc.newAddrConnLocked(addrs, opts)  	if err != nil {  		channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)  		return nil, err @@ -316,10 +231,6 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {  }  func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { -	if ccb.isIdleOrClosed() { -		return -	} -  	acbw, ok := sc.(*acBalancerWrapper)  	if !ok {  		return @@ -328,25 +239,39 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol  }  func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { -	if ccb.isIdleOrClosed() { +	ccb.cc.mu.Lock() +	defer ccb.cc.mu.Unlock() + +	ccb.mu.Lock() +	if ccb.closed { +		ccb.mu.Unlock()  		return  	} - +	ccb.mu.Unlock()  	// Update picker before updating state.  Even though the ordering here does  	// not matter, it can lead to multiple calls of Pick in the common start-up  	// case where we wait for ready and then perform an RPC.  If the picker is  	// updated later, we could call the "connecting" picker when the state is  	// updated, and then call the "ready" picker after the picker gets updated. -	ccb.cc.blockingpicker.updatePicker(s.Picker) + +	// Note that there is no need to check if the balancer wrapper was closed, +	// as we know the graceful switch LB policy will not call cc if it has been +	// closed. +	ccb.cc.pickerWrapper.updatePicker(s.Picker)  	ccb.cc.csMgr.updateState(s.ConnectivityState)  }  func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { -	if ccb.isIdleOrClosed() { +	ccb.cc.mu.RLock() +	defer ccb.cc.mu.RUnlock() + +	ccb.mu.Lock() +	if ccb.closed { +		ccb.mu.Unlock()  		return  	} - -	ccb.cc.resolveNow(o) +	ccb.mu.Unlock() +	ccb.cc.resolveNowLocked(o)  }  func (ccb *ccBalancerWrapper) Target() string { @@ -364,6 +289,20 @@ type acBalancerWrapper struct {  	producers map[balancer.ProducerBuilder]*refCountedProducer  } +// updateState is invoked by grpc to push a subConn state update to the +// underlying balancer. +func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) { +	acbw.ccb.serializer.Schedule(func(ctx context.Context) { +		if ctx.Err() != nil || acbw.ccb.balancer == nil { +			return +		} +		// Even though it is optional for balancers, gracefulswitch ensures +		// opts.StateListener is set, so this cannot ever be nil. +		// TODO: delete this comment when UpdateSubConnState is removed. +		acbw.stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) +	}) +} +  func (acbw *acBalancerWrapper) String() string {  	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int())  } @@ -377,20 +316,7 @@ func (acbw *acBalancerWrapper) Connect() {  }  func (acbw *acBalancerWrapper) Shutdown() { -	ccb := acbw.ccb -	if ccb.isIdleOrClosed() { -		// It it safe to ignore this call when the balancer is closed or in idle -		// because the ClientConn takes care of closing the connections. -		// -		// Not returning early from here when the balancer is closed or in idle -		// leads to a deadlock though, because of the following sequence of -		// calls when holding cc.mu: -		// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> -		// ccb.RemoveAddrConn --> cc.removeAddrConn -		return -	} - -	ccb.cc.removeAddrConn(acbw.ac, errConnDrain) +	acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)  }  // NewStream begins a streaming RPC on the addrConn.  If the addrConn is not diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go index 595480112..e9e97d451 100644 --- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go +++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go @@ -430,7 +430,7 @@ type ClientHeader struct {  	MethodName string `protobuf:"bytes,2,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"`  	// A single process may be used to run multiple virtual  	// servers with different identities. -	// The authority is the name of such a server identitiy. +	// The authority is the name of such a server identity.  	// It is typically a portion of the URI in the form of  	// <host> or <host>:<port> .  	Authority string `protobuf:"bytes,3,opt,name=authority,proto3" json:"authority,omitempty"` diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 429c389e4..f6e815e6b 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -33,9 +33,7 @@ import (  	"google.golang.org/grpc/balancer/base"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/connectivity" -	"google.golang.org/grpc/credentials"  	"google.golang.org/grpc/internal" -	"google.golang.org/grpc/internal/backoff"  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/internal/grpcsync"  	"google.golang.org/grpc/internal/idle" @@ -48,9 +46,9 @@ import (  	"google.golang.org/grpc/status"  	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin. -	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.  	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.  	_ "google.golang.org/grpc/internal/resolver/unix"        // To register unix resolver. +	_ "google.golang.org/grpc/resolver/dns"                  // To register dns resolver.  )  const ( @@ -119,23 +117,8 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires  	}, nil  } -// DialContext creates a client connection to the given target. By default, it's -// a non-blocking dial (the function won't wait for connections to be -// established, and connecting happens in the background). To make it a blocking -// dial, use WithBlock() dial option. -// -// In the non-blocking case, the ctx does not act against the connection. It -// only controls the setup steps. -// -// In the blocking case, ctx can be used to cancel or expire the pending -// connection. Once this function returns, the cancellation and expiration of -// ctx will be noop. Users should call ClientConn.Close to terminate all the -// pending operations after this function returns. -// -// The target name syntax is defined in -// https://github.com/grpc/grpc/blob/master/doc/naming.md. -// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. -func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { +// newClient returns a new client in idle mode. +func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) {  	cc := &ClientConn{  		target: target,  		conns:  make(map[*addrConn]struct{}), @@ -143,23 +126,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  		czData: new(channelzData),  	} -	// We start the channel off in idle mode, but kick it out of idle at the end -	// of this method, instead of waiting for the first RPC. Other gRPC -	// implementations do wait for the first RPC to kick the channel out of -	// idle. But doing so would be a major behavior change for our users who are -	// used to seeing the channel active after Dial. -	// -	// Taking this approach of kicking it out of idle at the end of this method -	// allows us to share the code between channel creation and exiting idle -	// mode. This will also make it easy for us to switch to starting the -	// channel off in idle, if at all we ever get to do that. -	cc.idlenessState = ccIdlenessStateIdle -  	cc.retryThrottler.Store((*retryThrottler)(nil))  	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})  	cc.ctx, cc.cancel = context.WithCancel(context.Background()) -	cc.exitIdleCond = sync.NewCond(&cc.mu) +	// Apply dial options.  	disableGlobalOpts := false  	for _, opt := range opts {  		if _, ok := opt.(*disableGlobalDialOptions); ok { @@ -177,21 +148,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  	for _, opt := range opts {  		opt.apply(&cc.dopts)  	} -  	chainUnaryClientInterceptors(cc)  	chainStreamClientInterceptors(cc) -	defer func() { -		if err != nil { -			cc.Close() -		} -	}() - -	// Register ClientConn with channelz. -	cc.channelzRegistration(target) - -	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) -  	if err := cc.validateTransportCredentials(); err != nil {  		return nil, err  	} @@ -205,10 +164,80 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  	}  	cc.mkp = cc.dopts.copts.KeepaliveParams -	if cc.dopts.copts.UserAgent != "" { -		cc.dopts.copts.UserAgent += " " + grpcUA -	} else { -		cc.dopts.copts.UserAgent = grpcUA +	// Register ClientConn with channelz. +	cc.channelzRegistration(target) + +	// TODO: Ideally it should be impossible to error from this function after +	// channelz registration.  This will require removing some channelz logs +	// from the following functions that can error.  Errors can be returned to +	// the user, and successful logs can be emitted here, after the checks have +	// passed and channelz is subsequently registered. + +	// Determine the resolver to use. +	if err := cc.parseTargetAndFindResolver(); err != nil { +		channelz.RemoveEntry(cc.channelzID) +		return nil, err +	} +	if err = cc.determineAuthority(); err != nil { +		channelz.RemoveEntry(cc.channelzID) +		return nil, err +	} + +	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) +	cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) + +	cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc. +	cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout) +	return cc, nil +} + +// DialContext creates a client connection to the given target. By default, it's +// a non-blocking dial (the function won't wait for connections to be +// established, and connecting happens in the background). To make it a blocking +// dial, use WithBlock() dial option. +// +// In the non-blocking case, the ctx does not act against the connection. It +// only controls the setup steps. +// +// In the blocking case, ctx can be used to cancel or expire the pending +// connection. Once this function returns, the cancellation and expiration of +// ctx will be noop. Users should call ClientConn.Close to terminate all the +// pending operations after this function returns. +// +// The target name syntax is defined in +// https://github.com/grpc/grpc/blob/master/doc/naming.md. +// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. +func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { +	cc, err := newClient(target, opts...) +	if err != nil { +		return nil, err +	} + +	// We start the channel off in idle mode, but kick it out of idle now, +	// instead of waiting for the first RPC. Other gRPC implementations do wait +	// for the first RPC to kick the channel out of idle. But doing so would be +	// a major behavior change for our users who are used to seeing the channel +	// active after Dial. +	// +	// Taking this approach of kicking it out of idle at the end of this method +	// allows us to share the code between channel creation and exiting idle +	// mode. This will also make it easy for us to switch to starting the +	// channel off in idle, i.e. by making newClient exported. + +	defer func() { +		if err != nil { +			cc.Close() +		} +	}() + +	// This creates the name resolver, load balancer, etc. +	if err := cc.idlenessMgr.ExitIdleMode(); err != nil { +		return nil, err +	} + +	// Return now for non-blocking dials. +	if !cc.dopts.block { +		return cc, nil  	}  	if cc.dopts.timeout > 0 { @@ -231,49 +260,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  		}  	}() -	if cc.dopts.bs == nil { -		cc.dopts.bs = backoff.DefaultExponential -	} - -	// Determine the resolver to use. -	if err := cc.parseTargetAndFindResolver(); err != nil { -		return nil, err -	} -	if err = cc.determineAuthority(); err != nil { -		return nil, err -	} - -	if cc.dopts.scChan != nil { -		// Blocking wait for the initial service config. -		select { -		case sc, ok := <-cc.dopts.scChan: -			if ok { -				cc.sc = &sc -				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) -			} -		case <-ctx.Done(): -			return nil, ctx.Err() -		} -	} -	if cc.dopts.scChan != nil { -		go cc.scWatcher() -	} - -	// This creates the name resolver, load balancer, blocking picker etc. -	if err := cc.exitIdleMode(); err != nil { -		return nil, err -	} - -	// Configure idleness support with configured idle timeout or default idle -	// timeout duration. Idleness can be explicitly disabled by the user, by -	// setting the dial option to 0. -	cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger}) - -	// Return early for non-blocking dials. -	if !cc.dopts.block { -		return cc, nil -	} -  	// A blocking dial blocks until the clientConn is ready.  	for {  		s := cc.GetState() @@ -320,8 +306,8 @@ func (cc *ClientConn) addTraceEvent(msg string) {  type idler ClientConn -func (i *idler) EnterIdleMode() error { -	return (*ClientConn)(i).enterIdleMode() +func (i *idler) EnterIdleMode() { +	(*ClientConn)(i).enterIdleMode()  }  func (i *idler) ExitIdleMode() error { @@ -329,117 +315,71 @@ func (i *idler) ExitIdleMode() error {  }  // exitIdleMode moves the channel out of idle mode by recreating the name -// resolver and load balancer. -func (cc *ClientConn) exitIdleMode() error { +// resolver and load balancer.  This should never be called directly; use +// cc.idlenessMgr.ExitIdleMode instead. +func (cc *ClientConn) exitIdleMode() (err error) {  	cc.mu.Lock()  	if cc.conns == nil {  		cc.mu.Unlock()  		return errConnClosing  	} -	if cc.idlenessState != ccIdlenessStateIdle { -		channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState) -		cc.mu.Unlock() -		return nil -	} - -	defer func() { -		// When Close() and exitIdleMode() race against each other, one of the -		// following two can happen: -		// - Close() wins the race and runs first. exitIdleMode() runs after, and -		//   sees that the ClientConn is already closed and hence returns early. -		// - exitIdleMode() wins the race and runs first and recreates the balancer -		//   and releases the lock before recreating the resolver. If Close() runs -		//   in this window, it will wait for exitIdleMode to complete. -		// -		// We achieve this synchronization using the below condition variable. -		cc.mu.Lock() -		cc.idlenessState = ccIdlenessStateActive -		cc.exitIdleCond.Signal() -		cc.mu.Unlock() -	}() - -	cc.idlenessState = ccIdlenessStateExitingIdle -	exitedIdle := false -	if cc.blockingpicker == nil { -		cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) -	} else { -		cc.blockingpicker.exitIdleMode() -		exitedIdle = true -	} - -	var credsClone credentials.TransportCredentials -	if creds := cc.dopts.copts.TransportCredentials; creds != nil { -		credsClone = creds.Clone() -	} -	if cc.balancerWrapper == nil { -		cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ -			DialCreds:        credsClone, -			CredsBundle:      cc.dopts.copts.CredsBundle, -			Dialer:           cc.dopts.copts.Dialer, -			Authority:        cc.authority, -			CustomUserAgent:  cc.dopts.copts.UserAgent, -			ChannelzParentID: cc.channelzID, -			Target:           cc.parsedTarget, -		}) -	} else { -		cc.balancerWrapper.exitIdleMode() -	} -	cc.firstResolveEvent = grpcsync.NewEvent()  	cc.mu.Unlock()  	// This needs to be called without cc.mu because this builds a new resolver -	// which might update state or report error inline which needs to be handled -	// by cc.updateResolverState() which also grabs cc.mu. -	if err := cc.initResolverWrapper(credsClone); err != nil { +	// which might update state or report error inline, which would then need to +	// acquire cc.mu. +	if err := cc.resolverWrapper.start(); err != nil {  		return err  	} -	if exitedIdle { -		cc.addTraceEvent("exiting idle mode") -	} +	cc.addTraceEvent("exiting idle mode")  	return nil  } +// initIdleStateLocked initializes common state to how it should be while idle. +func (cc *ClientConn) initIdleStateLocked() { +	cc.resolverWrapper = newCCResolverWrapper(cc) +	cc.balancerWrapper = newCCBalancerWrapper(cc) +	cc.firstResolveEvent = grpcsync.NewEvent() +	// cc.conns == nil is a proxy for the ClientConn being closed. So, instead +	// of setting it to nil here, we recreate the map. This also means that we +	// don't have to do this when exiting idle mode. +	cc.conns = make(map[*addrConn]struct{}) +} +  // enterIdleMode puts the channel in idle mode, and as part of it shuts down the -// name resolver, load balancer and any subchannels. -func (cc *ClientConn) enterIdleMode() error { +// name resolver, load balancer, and any subchannels.  This should never be +// called directly; use cc.idlenessMgr.EnterIdleMode instead. +func (cc *ClientConn) enterIdleMode() {  	cc.mu.Lock() -	defer cc.mu.Unlock()  	if cc.conns == nil { -		return ErrClientConnClosing -	} -	if cc.idlenessState != ccIdlenessStateActive { -		channelz.Warningf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) -		return nil +		cc.mu.Unlock() +		return  	} -	// cc.conns == nil is a proxy for the ClientConn being closed. So, instead -	// of setting it to nil here, we recreate the map. This also means that we -	// don't have to do this when exiting idle mode.  	conns := cc.conns -	cc.conns = make(map[*addrConn]struct{}) -	// TODO: Currently, we close the resolver wrapper upon entering idle mode -	// and create a new one upon exiting idle mode. This means that the -	// `cc.resolverWrapper` field would be overwritten everytime we exit idle -	// mode. While this means that we need to hold `cc.mu` when accessing -	// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should -	// try to do the same for the balancer and picker wrappers too. -	cc.resolverWrapper.close() -	cc.blockingpicker.enterIdleMode() -	cc.balancerWrapper.enterIdleMode() +	rWrapper := cc.resolverWrapper +	rWrapper.close() +	cc.pickerWrapper.reset() +	bWrapper := cc.balancerWrapper +	bWrapper.close()  	cc.csMgr.updateState(connectivity.Idle) -	cc.idlenessState = ccIdlenessStateIdle  	cc.addTraceEvent("entering idle mode") -	go func() { -		for ac := range conns { -			ac.tearDown(errConnIdling) -		} -	}() +	cc.initIdleStateLocked() -	return nil +	cc.mu.Unlock() + +	// Block until the name resolver and LB policy are closed. +	<-rWrapper.serializer.Done() +	<-bWrapper.serializer.Done() + +	// Close all subchannels after the LB policy is closed. +	for ac := range conns { +		ac.tearDown(errConnIdling) +	}  }  // validateTransportCredentials performs a series of checks on the configured @@ -649,66 +589,35 @@ type ClientConn struct {  	dopts           dialOptions          // Default and user specified dial options.  	channelzID      *channelz.Identifier // Channelz identifier for the channel.  	resolverBuilder resolver.Builder     // See parseTargetAndFindResolver(). -	balancerWrapper *ccBalancerWrapper   // Uses gracefulswitch.balancer underneath. -	idlenessMgr     idle.Manager +	idlenessMgr     *idle.Manager  	// The following provide their own synchronization, and therefore don't  	// require cc.mu to be held to access them.  	csMgr              *connectivityStateManager -	blockingpicker     *pickerWrapper +	pickerWrapper      *pickerWrapper  	safeConfigSelector iresolver.SafeConfigSelector  	czData             *channelzData  	retryThrottler     atomic.Value // Updated from service config. -	// firstResolveEvent is used to track whether the name resolver sent us at -	// least one update. RPCs block on this event. -	firstResolveEvent *grpcsync.Event -  	// mu protects the following fields.  	// TODO: split mu so the same mutex isn't used for everything.  	mu              sync.RWMutex -	resolverWrapper *ccResolverWrapper         // Initialized in Dial; cleared in Close. +	resolverWrapper *ccResolverWrapper         // Always recreated whenever entering idle to simplify Close. +	balancerWrapper *ccBalancerWrapper         // Always recreated whenever entering idle to simplify Close.  	sc              *ServiceConfig             // Latest service config received from the resolver.  	conns           map[*addrConn]struct{}     // Set to nil on close.  	mkp             keepalive.ClientParameters // May be updated upon receipt of a GoAway. -	idlenessState   ccIdlenessState            // Tracks idleness state of the channel. -	exitIdleCond    *sync.Cond                 // Signalled when channel exits idle. +	// firstResolveEvent is used to track whether the name resolver sent us at +	// least one update. RPCs block on this event.  May be accessed without mu +	// if we know we cannot be asked to enter idle mode while accessing it (e.g. +	// when the idle manager has already been closed, or if we are already +	// entering idle mode). +	firstResolveEvent *grpcsync.Event  	lceMu               sync.Mutex // protects lastConnectionError  	lastConnectionError error  } -// ccIdlenessState tracks the idleness state of the channel. -// -// Channels start off in `active` and move to `idle` after a period of -// inactivity. When moving back to `active` upon an incoming RPC, they -// transition through `exiting_idle`. This state is useful for synchronization -// with Close(). -// -// This state tracking is mostly for self-protection. The idlenessManager is -// expected to keep track of the state as well, and is expected not to call into -// the ClientConn unnecessarily. -type ccIdlenessState int8 - -const ( -	ccIdlenessStateActive ccIdlenessState = iota -	ccIdlenessStateIdle -	ccIdlenessStateExitingIdle -) - -func (s ccIdlenessState) String() string { -	switch s { -	case ccIdlenessStateActive: -		return "active" -	case ccIdlenessStateIdle: -		return "idle" -	case ccIdlenessStateExitingIdle: -		return "exitingIdle" -	default: -		return "unknown" -	} -} -  // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or  // ctx expires. A true value is returned in former case and false in latter.  // @@ -748,29 +657,15 @@ func (cc *ClientConn) GetState() connectivity.State {  // Notice: This API is EXPERIMENTAL and may be changed or removed in a later  // release.  func (cc *ClientConn) Connect() { -	cc.exitIdleMode() +	if err := cc.idlenessMgr.ExitIdleMode(); err != nil { +		cc.addTraceEvent(err.Error()) +		return +	}  	// If the ClientConn was not in idle mode, we need to call ExitIdle on the  	// LB policy so that connections can be created. -	cc.balancerWrapper.exitIdleMode() -} - -func (cc *ClientConn) scWatcher() { -	for { -		select { -		case sc, ok := <-cc.dopts.scChan: -			if !ok { -				return -			} -			cc.mu.Lock() -			// TODO: load balance policy runtime change is ignored. -			// We may revisit this decision in the future. -			cc.sc = &sc -			cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) -			cc.mu.Unlock() -		case <-cc.ctx.Done(): -			return -		} -	} +	cc.mu.Lock() +	cc.balancerWrapper.exitIdle() +	cc.mu.Unlock()  }  // waitForResolvedAddrs blocks until the resolver has provided addresses or the @@ -804,11 +699,11 @@ func init() {  	internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {  		return cc.csMgr.pubSub.Subscribe(s)  	} -	internal.EnterIdleModeForTesting = func(cc *ClientConn) error { -		return cc.enterIdleMode() +	internal.EnterIdleModeForTesting = func(cc *ClientConn) { +		cc.idlenessMgr.EnterIdleModeForTesting()  	}  	internal.ExitIdleModeForTesting = func(cc *ClientConn) error { -		return cc.exitIdleMode() +		return cc.idlenessMgr.ExitIdleMode()  	}  } @@ -824,9 +719,8 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {  	}  } -func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { +func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {  	defer cc.firstResolveEvent.Fire() -	cc.mu.Lock()  	// Check if the ClientConn is already closed. Some fields (e.g.  	// balancerWrapper) are set to nil when closing the ClientConn, and could  	// cause nil pointer panic if we don't have this check. @@ -872,7 +766,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {  			if cc.sc == nil {  				// Apply the failing LB only if we haven't received valid service config  				// from the name resolver in the past. -				cc.applyFailingLB(s.ServiceConfig) +				cc.applyFailingLBLocked(s.ServiceConfig)  				cc.mu.Unlock()  				return ret  			} @@ -894,15 +788,13 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {  	return ret  } -// applyFailingLB is akin to configuring an LB policy on the channel which +// applyFailingLBLocked is akin to configuring an LB policy on the channel which  // always fails RPCs. Here, an actual LB policy is not configured, but an always  // erroring picker is configured, which returns errors with information about  // what was invalid in the received service config. A config selector with no  // service config is configured, and the connectivity state of the channel is  // set to TransientFailure. -// -// Caller must hold cc.mu. -func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { +func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {  	var err error  	if sc.Err != nil {  		err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err) @@ -910,14 +802,10 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {  		err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)  	}  	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) -	cc.blockingpicker.updatePicker(base.NewErrPicker(err)) +	cc.pickerWrapper.updatePicker(base.NewErrPicker(err))  	cc.csMgr.updateState(connectivity.TransientFailure)  } -func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { -	cc.balancerWrapper.updateSubConnState(sc, s, err) -} -  // Makes a copy of the input addresses slice and clears out the balancer  // attributes field. Addresses are passed during subconn creation and address  // update operations. In both cases, we will clear the balancer attributes by @@ -932,10 +820,14 @@ func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Ad  	return out  } -// newAddrConn creates an addrConn for addrs and adds it to cc.conns. +// newAddrConnLocked creates an addrConn for addrs and adds it to cc.conns.  //  // Caller needs to make sure len(addrs) > 0. -func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { +func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { +	if cc.conns == nil { +		return nil, ErrClientConnClosing +	} +  	ac := &addrConn{  		state:        connectivity.Idle,  		cc:           cc, @@ -947,12 +839,6 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub  		stateChan:    make(chan struct{}),  	}  	ac.ctx, ac.cancel = context.WithCancel(cc.ctx) -	// Track ac in cc. This needs to be done before any getTransport(...) is called. -	cc.mu.Lock() -	defer cc.mu.Unlock() -	if cc.conns == nil { -		return nil, ErrClientConnClosing -	}  	var err error  	ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "") @@ -968,6 +854,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub  		},  	}) +	// Track ac in cc. This needs to be done before any getTransport(...) is called.  	cc.conns[ac] = struct{}{}  	return ac, nil  } @@ -1174,7 +1061,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {  }  func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) { -	return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ +	return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{  		Ctx:            ctx,  		FullMethodName: method,  	}) @@ -1216,12 +1103,12 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel  func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {  	cc.mu.RLock() -	r := cc.resolverWrapper +	cc.resolverWrapper.resolveNow(o)  	cc.mu.RUnlock() -	if r == nil { -		return -	} -	go r.resolveNow(o) +} + +func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) { +	cc.resolverWrapper.resolveNow(o)  }  // ResetConnectBackoff wakes up all subchannels in transient failure and causes @@ -1253,40 +1140,32 @@ func (cc *ClientConn) Close() error {  		<-cc.csMgr.pubSub.Done()  	}() +	// Prevent calls to enter/exit idle immediately, and ensure we are not +	// currently entering/exiting idle mode. +	cc.idlenessMgr.Close() +  	cc.mu.Lock()  	if cc.conns == nil {  		cc.mu.Unlock()  		return ErrClientConnClosing  	} -	for cc.idlenessState == ccIdlenessStateExitingIdle { -		cc.exitIdleCond.Wait() -	} -  	conns := cc.conns  	cc.conns = nil  	cc.csMgr.updateState(connectivity.Shutdown) -	pWrapper := cc.blockingpicker -	rWrapper := cc.resolverWrapper -	bWrapper := cc.balancerWrapper -	idlenessMgr := cc.idlenessMgr +	// We can safely unlock and continue to access all fields now as +	// cc.conns==nil, preventing any further operations on cc.  	cc.mu.Unlock() +	cc.resolverWrapper.close()  	// The order of closing matters here since the balancer wrapper assumes the  	// picker is closed before it is closed. -	if pWrapper != nil { -		pWrapper.close() -	} -	if bWrapper != nil { -		bWrapper.close() -	} -	if rWrapper != nil { -		rWrapper.close() -	} -	if idlenessMgr != nil { -		idlenessMgr.Close() -	} +	cc.pickerWrapper.close() +	cc.balancerWrapper.close() + +	<-cc.resolverWrapper.serializer.Done() +	<-cc.balancerWrapper.serializer.Done()  	for ac := range conns {  		ac.tearDown(ErrClientConnClosing) @@ -1307,7 +1186,7 @@ type addrConn struct {  	cc     *ClientConn  	dopts  dialOptions -	acbw   balancer.SubConn +	acbw   *acBalancerWrapper  	scopts balancer.NewSubConnOptions  	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel @@ -1345,7 +1224,7 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)  	} else {  		channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)  	} -	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) +	ac.acbw.updateState(s, lastErr)  }  // adjustParams updates parameters used to create transports upon @@ -1849,7 +1728,7 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {  	if err != nil {  		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)  	} else { -		channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) +		channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget)  		rb = cc.getResolver(parsedTarget.URL.Scheme)  		if rb != nil {  			cc.parsedTarget = parsedTarget @@ -1981,58 +1860,17 @@ func (cc *ClientConn) determineAuthority() error {  	}  	endpoint := cc.parsedTarget.Endpoint() -	target := cc.target -	switch { -	case authorityFromDialOption != "": +	if authorityFromDialOption != "" {  		cc.authority = authorityFromDialOption -	case authorityFromCreds != "": +	} else if authorityFromCreds != "" {  		cc.authority = authorityFromCreds -	case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"): -		// TODO: remove when the unix resolver implements optional interface to -		// return channel authority. -		cc.authority = "localhost" -	case strings.HasPrefix(endpoint, ":"): +	} else if auth, ok := cc.resolverBuilder.(resolver.AuthorityOverrider); ok { +		cc.authority = auth.OverrideAuthority(cc.parsedTarget) +	} else if strings.HasPrefix(endpoint, ":") {  		cc.authority = "localhost" + endpoint -	default: -		// TODO: Define an optional interface on the resolver builder to return -		// the channel authority given the user's dial target. For resolvers -		// which don't implement this interface, we will use the endpoint from -		// "scheme://authority/endpoint" as the default authority. -		// Escape the endpoint to handle use cases where the endpoint -		// might not be a valid authority by default. -		// For example an endpoint which has multiple paths like -		// 'a/b/c', which is not a valid authority by default. +	} else {  		cc.authority = encodeAuthority(endpoint)  	}  	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)  	return nil  } - -// initResolverWrapper creates a ccResolverWrapper, which builds the name -// resolver. This method grabs the lock to assign the newly built resolver -// wrapper to the cc.resolverWrapper field. -func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error { -	rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ -		target:  cc.parsedTarget, -		builder: cc.resolverBuilder, -		bOpts: resolver.BuildOptions{ -			DisableServiceConfig: cc.dopts.disableServiceConfig, -			DialCreds:            creds, -			CredsBundle:          cc.dopts.copts.CredsBundle, -			Dialer:               cc.dopts.copts.Dialer, -		}, -		channelzID: cc.channelzID, -	}) -	if err != nil { -		return fmt.Errorf("failed to build resolver: %v", err) -	} -	// Resolver implementations may report state update or error inline when -	// built (or right after), and this is handled in cc.updateResolverState. -	// Also, an error from the resolver might lead to a re-resolution request -	// from the balancer, which is handled in resolveNow() where -	// `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here. -	cc.mu.Lock() -	cc.resolverWrapper = rw -	cc.mu.Unlock() -	return nil -} diff --git a/vendor/google.golang.org/grpc/codes/codes.go b/vendor/google.golang.org/grpc/codes/codes.go index 11b106182..08476ad1f 100644 --- a/vendor/google.golang.org/grpc/codes/codes.go +++ b/vendor/google.golang.org/grpc/codes/codes.go @@ -25,7 +25,13 @@ import (  	"strconv"  ) -// A Code is an unsigned 32-bit error code as defined in the gRPC spec. +// A Code is a status code defined according to the [gRPC documentation]. +// +// Only the codes defined as consts in this package are valid codes. Do not use +// other code values.  Behavior of other codes is implementation-specific and +// interoperability between implementations is not guaranteed. +// +// [gRPC documentation]: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md  type Code uint32  const ( diff --git a/vendor/google.golang.org/grpc/credentials/tls.go b/vendor/google.golang.org/grpc/credentials/tls.go index 877b7cd21..5dafd34ed 100644 --- a/vendor/google.golang.org/grpc/credentials/tls.go +++ b/vendor/google.golang.org/grpc/credentials/tls.go @@ -44,10 +44,25 @@ func (t TLSInfo) AuthType() string {  	return "tls"  } +// cipherSuiteLookup returns the string version of a TLS cipher suite ID. +func cipherSuiteLookup(cipherSuiteID uint16) string { +	for _, s := range tls.CipherSuites() { +		if s.ID == cipherSuiteID { +			return s.Name +		} +	} +	for _, s := range tls.InsecureCipherSuites() { +		if s.ID == cipherSuiteID { +			return s.Name +		} +	} +	return fmt.Sprintf("unknown ID: %v", cipherSuiteID) +} +  // GetSecurityValue returns security info requested by channelz.  func (t TLSInfo) GetSecurityValue() ChannelzSecurityValue {  	v := &TLSChannelzSecurityValue{ -		StandardName: cipherSuiteLookup[t.State.CipherSuite], +		StandardName: cipherSuiteLookup(t.State.CipherSuite),  	}  	// Currently there's no way to get LocalCertificate info from tls package.  	if len(t.State.PeerCertificates) > 0 { @@ -138,10 +153,39 @@ func (c *tlsCreds) OverrideServerName(serverNameOverride string) error {  	return nil  } +// The following cipher suites are forbidden for use with HTTP/2 by +// https://datatracker.ietf.org/doc/html/rfc7540#appendix-A +var tls12ForbiddenCipherSuites = map[uint16]struct{}{ +	tls.TLS_RSA_WITH_AES_128_CBC_SHA:         {}, +	tls.TLS_RSA_WITH_AES_256_CBC_SHA:         {}, +	tls.TLS_RSA_WITH_AES_128_GCM_SHA256:      {}, +	tls.TLS_RSA_WITH_AES_256_GCM_SHA384:      {}, +	tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA: {}, +	tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA: {}, +	tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA:   {}, +	tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:   {}, +} +  // NewTLS uses c to construct a TransportCredentials based on TLS.  func NewTLS(c *tls.Config) TransportCredentials {  	tc := &tlsCreds{credinternal.CloneTLSConfig(c)}  	tc.config.NextProtos = credinternal.AppendH2ToNextProtos(tc.config.NextProtos) +	// If the user did not configure a MinVersion and did not configure a +	// MaxVersion < 1.2, use MinVersion=1.2, which is required by +	// https://datatracker.ietf.org/doc/html/rfc7540#section-9.2 +	if tc.config.MinVersion == 0 && (tc.config.MaxVersion == 0 || tc.config.MaxVersion >= tls.VersionTLS12) { +		tc.config.MinVersion = tls.VersionTLS12 +	} +	// If the user did not configure CipherSuites, use all "secure" cipher +	// suites reported by the TLS package, but remove some explicitly forbidden +	// by https://datatracker.ietf.org/doc/html/rfc7540#appendix-A +	if tc.config.CipherSuites == nil { +		for _, cs := range tls.CipherSuites() { +			if _, ok := tls12ForbiddenCipherSuites[cs.ID]; !ok { +				tc.config.CipherSuites = append(tc.config.CipherSuites, cs.ID) +			} +		} +	}  	return tc  } @@ -205,32 +249,3 @@ type TLSChannelzSecurityValue struct {  	LocalCertificate  []byte  	RemoteCertificate []byte  } - -var cipherSuiteLookup = map[uint16]string{ -	tls.TLS_RSA_WITH_RC4_128_SHA:                "TLS_RSA_WITH_RC4_128_SHA", -	tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA:           "TLS_RSA_WITH_3DES_EDE_CBC_SHA", -	tls.TLS_RSA_WITH_AES_128_CBC_SHA:            "TLS_RSA_WITH_AES_128_CBC_SHA", -	tls.TLS_RSA_WITH_AES_256_CBC_SHA:            "TLS_RSA_WITH_AES_256_CBC_SHA", -	tls.TLS_RSA_WITH_AES_128_GCM_SHA256:         "TLS_RSA_WITH_AES_128_GCM_SHA256", -	tls.TLS_RSA_WITH_AES_256_GCM_SHA384:         "TLS_RSA_WITH_AES_256_GCM_SHA384", -	tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA:        "TLS_ECDHE_ECDSA_WITH_RC4_128_SHA", -	tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA:    "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA", -	tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA:    "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA", -	tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA:          "TLS_ECDHE_RSA_WITH_RC4_128_SHA", -	tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA:     "TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA", -	tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA:      "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", -	tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:      "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA", -	tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256:   "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", -	tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", -	tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384:   "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", -	tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384: "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", -	tls.TLS_FALLBACK_SCSV:                       "TLS_FALLBACK_SCSV", -	tls.TLS_RSA_WITH_AES_128_CBC_SHA256:         "TLS_RSA_WITH_AES_128_CBC_SHA256", -	tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256: "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256", -	tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256:   "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", -	tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305:    "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305", -	tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305:  "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305", -	tls.TLS_AES_128_GCM_SHA256:                  "TLS_AES_128_GCM_SHA256", -	tls.TLS_AES_256_GCM_SHA384:                  "TLS_AES_256_GCM_SHA384", -	tls.TLS_CHACHA20_POLY1305_SHA256:            "TLS_CHACHA20_POLY1305_SHA256", -} diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index cfc9fd85e..ba2426180 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -46,6 +46,7 @@ func init() {  	internal.WithBinaryLogger = withBinaryLogger  	internal.JoinDialOptions = newJoinDialOption  	internal.DisableGlobalDialOptions = newDisableGlobalDialOptions +	internal.WithRecvBufferPool = withRecvBufferPool  }  // dialOptions configure a Dial call. dialOptions are set by the DialOption @@ -63,7 +64,6 @@ type dialOptions struct {  	block                       bool  	returnLastError             bool  	timeout                     time.Duration -	scChan                      <-chan ServiceConfig  	authority                   string  	binaryLogger                binarylog.Logger  	copts                       transport.ConnectOptions @@ -250,19 +250,6 @@ func WithDecompressor(dc Decompressor) DialOption {  	})  } -// WithServiceConfig returns a DialOption which has a channel to read the -// service configuration. -// -// Deprecated: service config should be received through name resolver or via -// WithDefaultServiceConfig, as specified at -// https://github.com/grpc/grpc/blob/master/doc/service_config.md.  Will be -// removed in a future 1.x release. -func WithServiceConfig(c <-chan ServiceConfig) DialOption { -	return newFuncDialOption(func(o *dialOptions) { -		o.scChan = c -	}) -} -  // WithConnectParams configures the ClientConn to use the provided ConnectParams  // for creating and maintaining connections to servers.  // @@ -413,6 +400,17 @@ func WithTimeout(d time.Duration) DialOption {  // connections. If FailOnNonTempDialError() is set to true, and an error is  // returned by f, gRPC checks the error's Temporary() method to decide if it  // should try to reconnect to the network address. +// +// Note: All supported releases of Go (as of December 2023) override the OS +// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive +// with OS defaults for keepalive time and interval, use a net.Dialer that sets +// the KeepAlive field to a negative value, and sets the SO_KEEPALIVE socket +// option to true from the Control field. For a concrete example of how to do +// this, see internal.NetDialerWithTCPKeepalive(). +// +// For more information, please see [issue 23459] in the Go github repo. +// +// [issue 23459]: https://github.com/golang/go/issues/23459  func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {  	return newFuncDialOption(func(o *dialOptions) {  		o.copts.Dialer = f @@ -487,7 +485,7 @@ func FailOnNonTempDialError(f bool) DialOption {  // the RPCs.  func WithUserAgent(s string) DialOption {  	return newFuncDialOption(func(o *dialOptions) { -		o.copts.UserAgent = s +		o.copts.UserAgent = s + " " + grpcUA  	})  } @@ -637,14 +635,16 @@ func withHealthCheckFunc(f internal.HealthChecker) DialOption {  func defaultDialOptions() dialOptions {  	return dialOptions{ -		healthCheckFunc: internal.HealthCheckFunc,  		copts: transport.ConnectOptions{ -			WriteBufferSize: defaultWriteBufSize,  			ReadBufferSize:  defaultReadBufSize, +			WriteBufferSize: defaultWriteBufSize,  			UseProxy:        true, +			UserAgent:       grpcUA,  		}, -		recvBufferPool: nopBufferPool{}, -		idleTimeout:    30 * time.Minute, +		bs:              internalbackoff.DefaultExponential, +		healthCheckFunc: internal.HealthCheckFunc, +		idleTimeout:     30 * time.Minute, +		recvBufferPool:  nopBufferPool{},  	}  } @@ -705,11 +705,13 @@ func WithIdleTimeout(d time.Duration) DialOption {  // options are used: WithStatsHandler, EnableTracing, or binary logging. In such  // cases, the shared buffer pool will be ignored.  // -// # Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. +// Deprecated: use experimental.WithRecvBufferPool instead.  Will be deleted in +// v1.60.0 or later.  func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption { +	return withRecvBufferPool(bufferPool) +} + +func withRecvBufferPool(bufferPool SharedBufferPool) DialOption {  	return newFuncDialOption(func(o *dialOptions) {  		o.recvBufferPool = bufferPool  	}) diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 4399c3df4..11f91668a 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -18,7 +18,10 @@  // Package buffer provides an implementation of an unbounded buffer.  package buffer -import "sync" +import ( +	"errors" +	"sync" +)  // Unbounded is an implementation of an unbounded buffer which does not use  // extra goroutines. This is typically used for passing updates from one entity @@ -36,6 +39,7 @@ import "sync"  type Unbounded struct {  	c       chan any  	closed  bool +	closing bool  	mu      sync.Mutex  	backlog []any  } @@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded {  	return &Unbounded{c: make(chan any, 1)}  } +var errBufferClosed = errors.New("Put called on closed buffer.Unbounded") +  // Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t any) { +func (b *Unbounded) Put(t any) error {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return +	if b.closing { +		return errBufferClosed  	}  	if len(b.backlog) == 0 {  		select {  		case b.c <- t: -			return +			return nil  		default:  		}  	}  	b.backlog = append(b.backlog, t) +	return nil  } -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a +// Load sends the earliest buffered data, if any, onto the read channel returned +// by Get(). Users are expected to call this every time they successfully read a  // value from the read channel.  func (b *Unbounded) Load() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return -	}  	if len(b.backlog) > 0 {  		select {  		case b.c <- b.backlog[0]: @@ -78,6 +82,8 @@ func (b *Unbounded) Load() {  			b.backlog = b.backlog[1:]  		default:  		} +	} else if b.closing && !b.closed { +		close(b.c)  	}  } @@ -88,18 +94,23 @@ func (b *Unbounded) Load() {  // send the next buffered value onto the channel if there is any.  //  // If the unbounded buffer is closed, the read channel returned by this method -// is closed. +// is closed after all data is drained.  func (b *Unbounded) Get() <-chan any {  	return b.c  } -// Close closes the unbounded buffer. +// Close closes the unbounded buffer. No subsequent data may be Put(), and the +// channel returned from Get() will be closed after all the data is read and +// Load() is called for the final time.  func (b *Unbounded) Close() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { +	if b.closing {  		return  	} -	b.closed = true -	close(b.c) +	b.closing = true +	if len(b.backlog) == 0 { +		b.closed = true +		close(b.c) +	}  } diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go index 5395e7752..fc094f344 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go +++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go @@ -31,6 +31,7 @@ import (  	"time"  	"google.golang.org/grpc/grpclog" +	"google.golang.org/grpc/internal"  )  const ( @@ -58,6 +59,12 @@ func TurnOn() {  	}  } +func init() { +	internal.ChannelzTurnOffForTesting = func() { +		atomic.StoreInt32(&curState, 0) +	} +} +  // IsOn returns whether channelz data collection is on.  func IsOn() bool {  	return atomic.LoadInt32(&curState) == 1 diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 3cf10ddfb..685a3cb41 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -36,9 +36,6 @@ var (  	// "GRPC_RING_HASH_CAP".  This does not override the default bounds  	// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).  	RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024) -	// PickFirstLBConfig is set if we should support configuration of the -	// pick_first LB policy. -	PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true)  	// LeastRequestLB is set if we should support the least_request_experimental  	// LB policy, which can be enabled by setting the environment variable  	// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true". diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go index 02b4b6a1c..29f234acb 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go @@ -50,46 +50,7 @@ var (  	//  	// When both bootstrap FileName and FileContent are set, FileName is used.  	XDSBootstrapFileContent = os.Getenv(XDSBootstrapFileContentEnv) -	// XDSRingHash indicates whether ring hash support is enabled, which can be -	// disabled by setting the environment variable -	// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false". -	XDSRingHash = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", true) -	// XDSClientSideSecurity is used to control processing of security -	// configuration on the client-side. -	// -	// Note that there is no env var protection for the server-side because we -	// have a brand new API on the server-side and users explicitly need to use -	// the new API to get security integration on the server. -	XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true) -	// XDSAggregateAndDNS indicates whether processing of aggregated cluster and -	// DNS cluster is enabled, which can be disabled by setting the environment -	// variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" -	// to "false". -	XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true) - -	// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled, -	// which can be disabled by setting the environment variable -	// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false". -	XDSRBAC = boolFromEnv("GRPC_XDS_EXPERIMENTAL_RBAC", true) -	// XDSOutlierDetection indicates whether outlier detection support is -	// enabled, which can be disabled by setting the environment variable -	// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false". -	XDSOutlierDetection = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION", true) -	// XDSFederation indicates whether federation support is enabled, which can -	// be enabled by setting the environment variable -	// "GRPC_EXPERIMENTAL_XDS_FEDERATION" to "true". -	XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", true) - -	// XDSRLS indicates whether processing of Cluster Specifier plugins and -	// support for the RLS CLuster Specifier is enabled, which can be disabled by -	// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to -	// "false". -	XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", true)  	// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.  	C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI") -	// XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which -	// can be disabled by setting the environment variable -	// "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "false". -	XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", true)  ) diff --git a/vendor/google.golang.org/grpc/internal/experimental.go b/vendor/google.golang.org/grpc/internal/experimental.go new file mode 100644 index 000000000..7f7044e17 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/experimental.go @@ -0,0 +1,28 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +var ( +	// WithRecvBufferPool is implemented by the grpc package and returns a dial +	// option to configure a shared buffer pool for a grpc.ClientConn. +	WithRecvBufferPool any // func (grpc.SharedBufferPool) grpc.DialOption + +	// RecvBufferPool is implemented by the grpc package and returns a server +	// option to configure a shared buffer pool for a grpc.Server. +	RecvBufferPool any // func (grpc.SharedBufferPool) grpc.ServerOption +) diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go index 900917dbe..f7f40a16a 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go @@ -20,7 +20,6 @@ package grpcsync  import (  	"context" -	"sync"  	"google.golang.org/grpc/internal/buffer"  ) @@ -38,8 +37,6 @@ type CallbackSerializer struct {  	done chan struct{}  	callbacks *buffer.Unbounded -	closedMu  sync.Mutex -	closed    bool  }  // NewCallbackSerializer returns a new CallbackSerializer instance. The provided @@ -65,56 +62,34 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {  // callbacks to be executed by the serializer. It is not possible to add  // callbacks once the context passed to NewCallbackSerializer is cancelled.  func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { -	cs.closedMu.Lock() -	defer cs.closedMu.Unlock() - -	if cs.closed { -		return false -	} -	cs.callbacks.Put(f) -	return true +	return cs.callbacks.Put(f) == nil  }  func (cs *CallbackSerializer) run(ctx context.Context) { -	var backlog []func(context.Context) -  	defer close(cs.done) + +	// TODO: when Go 1.21 is the oldest supported version, this loop and Close +	// can be replaced with: +	// +	// context.AfterFunc(ctx, cs.callbacks.Close)  	for ctx.Err() == nil {  		select {  		case <-ctx.Done():  			// Do nothing here. Next iteration of the for loop will not happen,  			// since ctx.Err() would be non-nil. -		case callback, ok := <-cs.callbacks.Get(): -			if !ok { -				return -			} +		case cb := <-cs.callbacks.Get():  			cs.callbacks.Load() -			callback.(func(ctx context.Context))(ctx) +			cb.(func(context.Context))(ctx)  		}  	} -	// Fetch pending callbacks if any, and execute them before returning from -	// this method and closing cs.done. -	cs.closedMu.Lock() -	cs.closed = true -	backlog = cs.fetchPendingCallbacks() +	// Close the buffer to prevent new callbacks from being added.  	cs.callbacks.Close() -	cs.closedMu.Unlock() -	for _, b := range backlog { -		b(ctx) -	} -} -func (cs *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { -	var backlog []func(context.Context) -	for { -		select { -		case b := <-cs.callbacks.Get(): -			backlog = append(backlog, b.(func(context.Context))) -			cs.callbacks.Load() -		default: -			return backlog -		} +	// Run all pending callbacks. +	for cb := range cs.callbacks.Get() { +		cs.callbacks.Load() +		cb.(func(context.Context))(ctx)  	}  } diff --git a/vendor/google.golang.org/grpc/internal/idle/idle.go b/vendor/google.golang.org/grpc/internal/idle/idle.go index 6c272476e..fe49cb74c 100644 --- a/vendor/google.golang.org/grpc/internal/idle/idle.go +++ b/vendor/google.golang.org/grpc/internal/idle/idle.go @@ -26,8 +26,6 @@ import (  	"sync"  	"sync/atomic"  	"time" - -	"google.golang.org/grpc/grpclog"  )  // For overriding in unit tests. @@ -39,27 +37,12 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {  // and exit from idle mode.  type Enforcer interface {  	ExitIdleMode() error -	EnterIdleMode() error -} - -// Manager defines the functionality required to track RPC activity on a -// channel. -type Manager interface { -	OnCallBegin() error -	OnCallEnd() -	Close() +	EnterIdleMode()  } -type noopManager struct{} - -func (noopManager) OnCallBegin() error { return nil } -func (noopManager) OnCallEnd()         {} -func (noopManager) Close()             {} - -// manager implements the Manager interface. It uses atomic operations to -// synchronize access to shared state and a mutex to guarantee mutual exclusion -// in a critical section. -type manager struct { +// Manager implements idleness detection and calls the configured Enforcer to +// enter/exit idle mode when appropriate.  Must be created by NewManager. +type Manager struct {  	// State accessed atomically.  	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.  	activeCallsCount          int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. @@ -69,8 +52,7 @@ type manager struct {  	// Can be accessed without atomics or mutex since these are set at creation  	// time and read-only after that.  	enforcer Enforcer // Functionality provided by grpc.ClientConn. -	timeout  int64    // Idle timeout duration nanos stored as an int64. -	logger   grpclog.LoggerV2 +	timeout  time.Duration  	// idleMu is used to guarantee mutual exclusion in two scenarios:  	// - Opposing intentions: @@ -88,57 +70,48 @@ type manager struct {  	timer        *time.Timer  } -// ManagerOptions is a collection of options used by -// NewManager. -type ManagerOptions struct { -	Enforcer Enforcer -	Timeout  time.Duration -	Logger   grpclog.LoggerV2 +// NewManager creates a new idleness manager implementation for the +// given idle timeout.  It begins in idle mode. +func NewManager(enforcer Enforcer, timeout time.Duration) *Manager { +	return &Manager{ +		enforcer:         enforcer, +		timeout:          timeout, +		actuallyIdle:     true, +		activeCallsCount: -math.MaxInt32, +	}  } -// NewManager creates a new idleness manager implementation for the -// given idle timeout. -func NewManager(opts ManagerOptions) Manager { -	if opts.Timeout == 0 { -		return noopManager{} +// resetIdleTimerLocked resets the idle timer to the given duration.  Called +// when exiting idle mode or when the timer fires and we need to reset it. +func (m *Manager) resetIdleTimerLocked(d time.Duration) { +	if m.isClosed() || m.timeout == 0 || m.actuallyIdle { +		return  	} -	m := &manager{ -		enforcer: opts.Enforcer, -		timeout:  int64(opts.Timeout), -		logger:   opts.Logger, +	// It is safe to ignore the return value from Reset() because this method is +	// only ever called from the timer callback or when exiting idle mode. +	if m.timer != nil { +		m.timer.Stop()  	} -	m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout) -	return m +	m.timer = timeAfterFunc(d, m.handleIdleTimeout)  } -// resetIdleTimer resets the idle timer to the given duration. This method -// should only be called from the timer callback. -func (m *manager) resetIdleTimer(d time.Duration) { +func (m *Manager) resetIdleTimer(d time.Duration) {  	m.idleMu.Lock()  	defer m.idleMu.Unlock() - -	if m.timer == nil { -		// Only close sets timer to nil. We are done. -		return -	} - -	// It is safe to ignore the return value from Reset() because this method is -	// only ever called from the timer callback, which means the timer has -	// already fired. -	m.timer.Reset(d) +	m.resetIdleTimerLocked(d)  }  // handleIdleTimeout is the timer callback that is invoked upon expiry of the  // configured idle timeout. The channel is considered inactive if there are no  // ongoing calls and no RPC activity since the last time the timer fired. -func (m *manager) handleIdleTimeout() { +func (m *Manager) handleIdleTimeout() {  	if m.isClosed() {  		return  	}  	if atomic.LoadInt32(&m.activeCallsCount) > 0 { -		m.resetIdleTimer(time.Duration(m.timeout)) +		m.resetIdleTimer(m.timeout)  		return  	} @@ -148,24 +121,12 @@ func (m *manager) handleIdleTimeout() {  		// Set the timer to fire after a duration of idle timeout, calculated  		// from the time the most recent RPC completed.  		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) -		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano())) +		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)  		return  	} -	// This CAS operation is extremely likely to succeed given that there has -	// been no activity since the last time we were here.  Setting the -	// activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the -	// channel is either in idle mode or is trying to get there. -	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { -		// This CAS operation can fail if an RPC started after we checked for -		// activity at the top of this method, or one was ongoing from before -		// the last time we were here. In both case, reset the timer and return. -		m.resetIdleTimer(time.Duration(m.timeout)) -		return -	} - -	// Now that we've set the active calls count to -math.MaxInt32, it's time to -	// actually move to idle mode. +	// Now that we've checked that there has been no activity, attempt to enter +	// idle mode, which is very likely to succeed.  	if m.tryEnterIdleMode() {  		// Successfully entered idle mode. No timer needed until we exit idle.  		return @@ -174,8 +135,7 @@ func (m *manager) handleIdleTimeout() {  	// Failed to enter idle mode due to a concurrent RPC that kept the channel  	// active, or because of an error from the channel. Undo the attempt to  	// enter idle, and reset the timer to try again later. -	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) -	m.resetIdleTimer(time.Duration(m.timeout)) +	m.resetIdleTimer(m.timeout)  }  // tryEnterIdleMode instructs the channel to enter idle mode. But before @@ -185,36 +145,49 @@ func (m *manager) handleIdleTimeout() {  // Return value indicates whether or not the channel moved to idle mode.  //  // Holds idleMu which ensures mutual exclusion with exitIdleMode. -func (m *manager) tryEnterIdleMode() bool { +func (m *Manager) tryEnterIdleMode() bool { +	// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() +	// that the channel is either in idle mode or is trying to get there. +	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { +		// This CAS operation can fail if an RPC started after we checked for +		// activity in the timer handler, or one was ongoing from before the +		// last time the timer fired, or if a test is attempting to enter idle +		// mode without checking.  In all cases, abort going into idle mode. +		return false +	} +	// N.B. if we fail to enter idle mode after this, we must re-add +	// math.MaxInt32 to m.activeCallsCount. +  	m.idleMu.Lock()  	defer m.idleMu.Unlock()  	if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {  		// We raced and lost to a new RPC. Very rare, but stop entering idle. +		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)  		return false  	}  	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 { -		// An very short RPC could have come in (and also finished) after we +		// A very short RPC could have come in (and also finished) after we  		// checked for calls count and activity in handleIdleTimeout(), but  		// before the CAS operation. So, we need to check for activity again. +		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)  		return false  	} -	// No new RPCs have come in since we last set the active calls count value -	// -math.MaxInt32 in the timer callback. And since we have the lock, it is -	// safe to enter idle mode now. -	if err := m.enforcer.EnterIdleMode(); err != nil { -		m.logger.Errorf("Failed to enter idle mode: %v", err) -		return false -	} - -	// Successfully entered idle mode. +	// No new RPCs have come in since we set the active calls count value to +	// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode +	// unconditionally now. +	m.enforcer.EnterIdleMode()  	m.actuallyIdle = true  	return true  } +func (m *Manager) EnterIdleModeForTesting() { +	m.tryEnterIdleMode() +} +  // OnCallBegin is invoked at the start of every RPC. -func (m *manager) OnCallBegin() error { +func (m *Manager) OnCallBegin() error {  	if m.isClosed() {  		return nil  	} @@ -227,7 +200,7 @@ func (m *manager) OnCallBegin() error {  	// Channel is either in idle mode or is in the process of moving to idle  	// mode. Attempt to exit idle mode to allow this RPC. -	if err := m.exitIdleMode(); err != nil { +	if err := m.ExitIdleMode(); err != nil {  		// Undo the increment to calls count, and return an error causing the  		// RPC to fail.  		atomic.AddInt32(&m.activeCallsCount, -1) @@ -238,28 +211,30 @@ func (m *manager) OnCallBegin() error {  	return nil  } -// exitIdleMode instructs the channel to exit idle mode. -// -// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. -func (m *manager) exitIdleMode() error { +// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's +// internal state. +func (m *Manager) ExitIdleMode() error { +	// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.  	m.idleMu.Lock()  	defer m.idleMu.Unlock() -	if !m.actuallyIdle { -		// This can happen in two scenarios: +	if m.isClosed() || !m.actuallyIdle { +		// This can happen in three scenarios:  		// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called  		//   tryEnterIdleMode(). But before the latter could grab the lock, an RPC  		//   came in and OnCallBegin() noticed that the calls count is negative.  		// - Channel is in idle mode, and multiple new RPCs come in at the same  		//   time, all of them notice a negative calls count in OnCallBegin and get  		//   here. The first one to get the lock would got the channel to exit idle. +		// - Channel is not in idle mode, and the user calls Connect which calls +		//   m.ExitIdleMode.  		// -		// Either way, nothing to do here. +		// In any case, there is nothing to do here.  		return nil  	}  	if err := m.enforcer.ExitIdleMode(); err != nil { -		return fmt.Errorf("channel failed to exit idle mode: %v", err) +		return fmt.Errorf("failed to exit idle mode: %w", err)  	}  	// Undo the idle entry process. This also respects any new RPC attempts. @@ -267,12 +242,12 @@ func (m *manager) exitIdleMode() error {  	m.actuallyIdle = false  	// Start a new timer to fire after the configured idle timeout. -	m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout) +	m.resetIdleTimerLocked(m.timeout)  	return nil  }  // OnCallEnd is invoked at the end of every RPC. -func (m *manager) OnCallEnd() { +func (m *Manager) OnCallEnd() {  	if m.isClosed() {  		return  	} @@ -287,15 +262,17 @@ func (m *manager) OnCallEnd() {  	atomic.AddInt32(&m.activeCallsCount, -1)  } -func (m *manager) isClosed() bool { +func (m *Manager) isClosed() bool {  	return atomic.LoadInt32(&m.closed) == 1  } -func (m *manager) Close() { +func (m *Manager) Close() {  	atomic.StoreInt32(&m.closed, 1)  	m.idleMu.Lock() -	m.timer.Stop() -	m.timer = nil +	if m.timer != nil { +		m.timer.Stop() +		m.timer = nil +	}  	m.idleMu.Unlock()  } diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 0d94c63e0..6c7ea6a53 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -57,7 +57,7 @@ var (  	// GetXDSHandshakeInfoForTesting returns a pointer to the xds.HandshakeInfo  	// stored in the passed in attributes. This is set by  	// credentials/xds/xds.go. -	GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *xds.HandshakeInfo +	GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *unsafe.Pointer  	// GetServerCredentials returns the transport credentials configured on a  	// gRPC server. An xDS-enabled server needs to know what type of credentials  	// is configured on the underlying gRPC server. This is set by server.go. @@ -68,11 +68,11 @@ var (  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed.  	CanonicalString any // func (codes.Code) string -	// DrainServerTransports initiates a graceful close of existing connections -	// on a gRPC server accepted on the provided listener address. An -	// xDS-enabled server invokes this method on a grpc.Server when a particular -	// listener moves to "not-serving" mode. -	DrainServerTransports any // func(*grpc.Server, string) +	// IsRegisteredMethod returns whether the passed in method is registered as +	// a method on the server. +	IsRegisteredMethod any // func(*grpc.Server, string) bool +	// ServerFromContext returns the server from the context. +	ServerFromContext any // func(context.Context) *grpc.Server  	// AddGlobalServerOptions adds an array of ServerOption that will be  	// effective globally for newly created servers. The priority will be: 1.  	// user-provided; 2. this method; 3. default values. @@ -177,10 +177,25 @@ var (  	GRPCResolverSchemeExtraMetadata string = "xds"  	// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode. -	EnterIdleModeForTesting any // func(*grpc.ClientConn) error +	EnterIdleModeForTesting any // func(*grpc.ClientConn)  	// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.  	ExitIdleModeForTesting any // func(*grpc.ClientConn) error + +	ChannelzTurnOffForTesting func() + +	// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found +	// error for a given resource type and name. This is usually triggered when +	// the associated watch timer fires. For testing purposes, having this +	// function makes events more predictable than relying on timer events. +	TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error + +	// TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton +	// to invoke resource not found for a resource type name and resource name. +	TriggerXDSResourceNameNotFoundClient any // func(string, string) error + +	// FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD. +	FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool)  )  // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go index 99e1e5b36..b66dcb213 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go @@ -23,7 +23,6 @@ package dns  import (  	"context"  	"encoding/json" -	"errors"  	"fmt"  	"net"  	"os" @@ -37,6 +36,7 @@ import (  	"google.golang.org/grpc/internal/backoff"  	"google.golang.org/grpc/internal/envconfig"  	"google.golang.org/grpc/internal/grpcrand" +	"google.golang.org/grpc/internal/resolver/dns/internal"  	"google.golang.org/grpc/resolver"  	"google.golang.org/grpc/serviceconfig"  ) @@ -47,15 +47,11 @@ var EnableSRVLookups = false  var logger = grpclog.Component("dns") -// Globals to stub out in tests. TODO: Perhaps these two can be combined into a -// single variable for testing the resolver? -var ( -	newTimer           = time.NewTimer -	newTimerDNSResRate = time.NewTimer -) -  func init() {  	resolver.Register(NewBuilder()) +	internal.TimeAfterFunc = time.After +	internal.NewNetResolver = newNetResolver +	internal.AddressDialer = addressDialer  }  const ( @@ -70,23 +66,6 @@ const (  	txtAttribute = "grpc_config="  ) -var ( -	errMissingAddr = errors.New("dns resolver: missing address") - -	// Addresses ending with a colon that is supposed to be the separator -	// between host and port is not allowed.  E.g. "::" is a valid address as -	// it is an IPv6 address (host only) and "[::]:" is invalid as it ends with -	// a colon as the host and port separator -	errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon") -) - -var ( -	defaultResolver netResolver = net.DefaultResolver -	// To prevent excessive re-resolution, we enforce a rate limit on DNS -	// resolution requests. -	minDNSResRate = 30 * time.Second -) -  var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {  	return func(ctx context.Context, network, _ string) (net.Conn, error) {  		var dialer net.Dialer @@ -94,7 +73,11 @@ var addressDialer = func(address string) func(context.Context, string, string) (  	}  } -var newNetResolver = func(authority string) (netResolver, error) { +var newNetResolver = func(authority string) (internal.NetResolver, error) { +	if authority == "" { +		return net.DefaultResolver, nil +	} +  	host, port, err := parseTarget(authority, defaultDNSSvrPort)  	if err != nil {  		return nil, err @@ -104,7 +87,7 @@ var newNetResolver = func(authority string) (netResolver, error) {  	return &net.Resolver{  		PreferGo: true, -		Dial:     addressDialer(authorityWithPort), +		Dial:     internal.AddressDialer(authorityWithPort),  	}, nil  } @@ -142,13 +125,9 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts  		disableServiceConfig: opts.DisableServiceConfig,  	} -	if target.URL.Host == "" { -		d.resolver = defaultResolver -	} else { -		d.resolver, err = newNetResolver(target.URL.Host) -		if err != nil { -			return nil, err -		} +	d.resolver, err = internal.NewNetResolver(target.URL.Host) +	if err != nil { +		return nil, err  	}  	d.wg.Add(1) @@ -161,12 +140,6 @@ func (b *dnsBuilder) Scheme() string {  	return "dns"  } -type netResolver interface { -	LookupHost(ctx context.Context, host string) (addrs []string, err error) -	LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error) -	LookupTXT(ctx context.Context, name string) (txts []string, err error) -} -  // deadResolver is a resolver that does nothing.  type deadResolver struct{} @@ -178,7 +151,7 @@ func (deadResolver) Close() {}  type dnsResolver struct {  	host     string  	port     string -	resolver netResolver +	resolver internal.NetResolver  	ctx      context.Context  	cancel   context.CancelFunc  	cc       resolver.ClientConn @@ -223,29 +196,27 @@ func (d *dnsResolver) watcher() {  			err = d.cc.UpdateState(*state)  		} -		var timer *time.Timer +		var waitTime time.Duration  		if err == nil {  			// Success resolving, wait for the next ResolveNow. However, also wait 30  			// seconds at the very least to prevent constantly re-resolving.  			backoffIndex = 1 -			timer = newTimerDNSResRate(minDNSResRate) +			waitTime = internal.MinResolutionRate  			select {  			case <-d.ctx.Done(): -				timer.Stop()  				return  			case <-d.rn:  			}  		} else {  			// Poll on an error found in DNS Resolver or an error received from  			// ClientConn. -			timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex)) +			waitTime = backoff.DefaultExponential.Backoff(backoffIndex)  			backoffIndex++  		}  		select {  		case <-d.ctx.Done(): -			timer.Stop()  			return -		case <-timer.C: +		case <-internal.TimeAfterFunc(waitTime):  		}  	}  } @@ -387,7 +358,7 @@ func formatIP(addr string) (addrIP string, ok bool) {  // target: ":80" defaultPort: "443" returns host: "localhost", port: "80"  func parseTarget(target, defaultPort string) (host, port string, err error) {  	if target == "" { -		return "", "", errMissingAddr +		return "", "", internal.ErrMissingAddr  	}  	if ip := net.ParseIP(target); ip != nil {  		// target is an IPv4 or IPv6(without brackets) address @@ -397,7 +368,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {  		if port == "" {  			// If the port field is empty (target ends with colon), e.g. "[::1]:",  			// this is an error. -			return "", "", errEndsWithColon +			return "", "", internal.ErrEndsWithColon  		}  		// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port  		if host == "" { diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go b/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go new file mode 100644 index 000000000..c7fc557d0 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go @@ -0,0 +1,70 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package internal contains functionality internal to the dns resolver package. +package internal + +import ( +	"context" +	"errors" +	"net" +	"time" +) + +// NetResolver groups the methods on net.Resolver that are used by the DNS +// resolver implementation. This allows the default net.Resolver instance to be +// overidden from tests. +type NetResolver interface { +	LookupHost(ctx context.Context, host string) (addrs []string, err error) +	LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error) +	LookupTXT(ctx context.Context, name string) (txts []string, err error) +} + +var ( +	// ErrMissingAddr is the error returned when building a DNS resolver when +	// the provided target name is empty. +	ErrMissingAddr = errors.New("dns resolver: missing address") + +	// ErrEndsWithColon is the error returned when building a DNS resolver when +	// the provided target name ends with a colon that is supposed to be the +	// separator between host and port.  E.g. "::" is a valid address as it is +	// an IPv6 address (host only) and "[::]:" is invalid as it ends with a +	// colon as the host and port separator +	ErrEndsWithColon = errors.New("dns resolver: missing port after port-separator colon") +) + +// The following vars are overridden from tests. +var ( +	// MinResolutionRate is the minimum rate at which re-resolutions are +	// allowed. This helps to prevent excessive re-resolution. +	MinResolutionRate = 30 * time.Second + +	// TimeAfterFunc is used by the DNS resolver to wait for the given duration +	// to elapse. In non-test code, this is implemented by time.After.  In test +	// code, this can be used to control the amount of time the resolver is +	// blocked waiting for the duration to elapse. +	TimeAfterFunc func(time.Duration) <-chan time.Time + +	// NewNetResolver returns the net.Resolver instance for the given target. +	NewNetResolver func(string) (NetResolver, error) + +	// AddressDialer is the dialer used to dial the DNS server. It accepts the +	// Host portion of the URL corresponding to the user's dial target and +	// returns a dial function. +	AddressDialer func(address string) func(context.Context, string, string) (net.Conn, error) +) diff --git a/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go index 160911687..27cd81af9 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go +++ b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go @@ -61,6 +61,10 @@ func (b *builder) Scheme() string {  	return b.scheme  } +func (b *builder) OverrideAuthority(resolver.Target) string { +	return "localhost" +} +  type nopResolver struct {  } diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go new file mode 100644 index 000000000..4f347edd4 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go @@ -0,0 +1,29 @@ +//go:build !unix && !windows + +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +import ( +	"net" +) + +// NetDialerWithTCPKeepalive returns a vanilla net.Dialer on non-unix platforms. +func NetDialerWithTCPKeepalive() *net.Dialer { +	return &net.Dialer{} +} diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go new file mode 100644 index 000000000..078137b7f --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go @@ -0,0 +1,54 @@ +//go:build unix + +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +import ( +	"net" +	"syscall" +	"time" + +	"golang.org/x/sys/unix" +) + +// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on +// the underlying connection with OS default values for keepalive parameters. +// +// TODO: Once https://github.com/golang/go/issues/62254 lands, and the +// appropriate Go version becomes less than our least supported Go version, we +// should look into using the new API to make things more straightforward. +func NetDialerWithTCPKeepalive() *net.Dialer { +	return &net.Dialer{ +		// Setting a negative value here prevents the Go stdlib from overriding +		// the values of TCP keepalive time and interval. It also prevents the +		// Go stdlib from enabling TCP keepalives by default. +		KeepAlive: time.Duration(-1), +		// This method is called after the underlying network socket is created, +		// but before dialing the socket (or calling its connect() method). The +		// combination of unconditionally enabling TCP keepalives here, and +		// disabling the overriding of TCP keepalive parameters by setting the +		// KeepAlive field to a negative value above, results in OS defaults for +		// the TCP keealive interval and time parameters. +		Control: func(_, _ string, c syscall.RawConn) error { +			return c.Control(func(fd uintptr) { +				unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) +			}) +		}, +	} +} diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go new file mode 100644 index 000000000..fd7d43a89 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go @@ -0,0 +1,54 @@ +//go:build windows + +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +import ( +	"net" +	"syscall" +	"time" + +	"golang.org/x/sys/windows" +) + +// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on +// the underlying connection with OS default values for keepalive parameters. +// +// TODO: Once https://github.com/golang/go/issues/62254 lands, and the +// appropriate Go version becomes less than our least supported Go version, we +// should look into using the new API to make things more straightforward. +func NetDialerWithTCPKeepalive() *net.Dialer { +	return &net.Dialer{ +		// Setting a negative value here prevents the Go stdlib from overriding +		// the values of TCP keepalive time and interval. It also prevents the +		// Go stdlib from enabling TCP keepalives by default. +		KeepAlive: time.Duration(-1), +		// This method is called after the underlying network socket is created, +		// but before dialing the socket (or calling its connect() method). The +		// combination of unconditionally enabling TCP keepalives here, and +		// disabling the overriding of TCP keepalive parameters by setting the +		// KeepAlive field to a negative value above, results in OS defaults for +		// the TCP keealive interval and time parameters. +		Control: func(_, _ string, c syscall.RawConn) error { +			return c.Control(func(fd uintptr) { +				windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_KEEPALIVE, 1) +			}) +		}, +	} +} diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go index b330ccedc..83c382982 100644 --- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go @@ -535,8 +535,8 @@ const minBatchSize = 1000  // size is too low to give stream goroutines a chance to fill it up.  //  // Upon exiting, if the error causing the exit is not an I/O error, run() -// flushes and closes the underlying connection.  Otherwise, the connection is -// left open to allow the I/O error to be encountered by the reader instead. +// flushes the underlying connection.  The connection is always left open to +// allow different closing behavior on the client and server.  func (l *loopyWriter) run() (err error) {  	defer func() {  		if l.logger.V(logLevel) { @@ -544,7 +544,6 @@ func (l *loopyWriter) run() (err error) {  		}  		if !isIOError(err) {  			l.framer.writer.Flush() -			l.conn.Close()  		}  		l.cbuf.finish()  	}() diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index 17f7a21b5..a9d70e2a1 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -75,11 +75,25 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s  		return nil, errors.New(msg)  	} +	var localAddr net.Addr +	if la := r.Context().Value(http.LocalAddrContextKey); la != nil { +		localAddr, _ = la.(net.Addr) +	} +	var authInfo credentials.AuthInfo +	if r.TLS != nil { +		authInfo = credentials.TLSInfo{State: *r.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}} +	} +	p := peer.Peer{ +		Addr:      strAddr(r.RemoteAddr), +		LocalAddr: localAddr, +		AuthInfo:  authInfo, +	}  	st := &serverHandlerTransport{  		rw:             w,  		req:            r,  		closedCh:       make(chan struct{}),  		writes:         make(chan func()), +		peer:           p,  		contentType:    contentType,  		contentSubtype: contentSubtype,  		stats:          stats, @@ -134,6 +148,8 @@ type serverHandlerTransport struct {  	headerMD metadata.MD +	peer peer.Peer +  	closeOnce sync.Once  	closedCh  chan struct{} // closed on Close @@ -165,7 +181,13 @@ func (ht *serverHandlerTransport) Close(err error) {  	})  } -func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } +func (ht *serverHandlerTransport) Peer() *peer.Peer { +	return &peer.Peer{ +		Addr:      ht.peer.Addr, +		LocalAddr: ht.peer.LocalAddr, +		AuthInfo:  ht.peer.AuthInfo, +	} +}  // strAddr is a net.Addr backed by either a TCP "ip:port" string, or  // the empty string if unknown. @@ -347,10 +369,8 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {  	return err  } -func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) { +func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {  	// With this transport type there will be exactly 1 stream: this HTTP request. - -	ctx := ht.req.Context()  	var cancel context.CancelFunc  	if ht.timeoutSet {  		ctx, cancel = context.WithTimeout(ctx, ht.timeout) @@ -370,34 +390,19 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {  		ht.Close(errors.New("request is done processing"))  	}() +	ctx = metadata.NewIncomingContext(ctx, ht.headerMD)  	req := ht.req -  	s := &Stream{ -		id:             0, // irrelevant -		requestRead:    func(int) {}, -		cancel:         cancel, -		buf:            newRecvBuffer(), -		st:             ht, -		method:         req.URL.Path, -		recvCompress:   req.Header.Get("grpc-encoding"), -		contentSubtype: ht.contentSubtype, -	} -	pr := &peer.Peer{ -		Addr: ht.RemoteAddr(), -	} -	if req.TLS != nil { -		pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}} -	} -	ctx = metadata.NewIncomingContext(ctx, ht.headerMD) -	s.ctx = peer.NewContext(ctx, pr) -	for _, sh := range ht.stats { -		s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) -		inHeader := &stats.InHeader{ -			FullMethod:  s.method, -			RemoteAddr:  ht.RemoteAddr(), -			Compression: s.recvCompress, -		} -		sh.HandleRPC(s.ctx, inHeader) +		id:               0, // irrelevant +		ctx:              ctx, +		requestRead:      func(int) {}, +		cancel:           cancel, +		buf:              newRecvBuffer(), +		st:               ht, +		method:           req.URL.Path, +		recvCompress:     req.Header.Get("grpc-encoding"), +		contentSubtype:   ht.contentSubtype, +		headerWireLength: 0, // won't have access to header wire length until golang/go#18997.  	}  	s.trReader = &transportReader{  		reader:        &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}}, diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index d6f5c4935..eff879964 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -36,6 +36,7 @@ import (  	"golang.org/x/net/http2/hpack"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/credentials" +	"google.golang.org/grpc/internal"  	"google.golang.org/grpc/internal/channelz"  	icredentials "google.golang.org/grpc/internal/credentials"  	"google.golang.org/grpc/internal/grpclog" @@ -43,7 +44,7 @@ import (  	"google.golang.org/grpc/internal/grpcutil"  	imetadata "google.golang.org/grpc/internal/metadata"  	istatus "google.golang.org/grpc/internal/status" -	"google.golang.org/grpc/internal/syscall" +	isyscall "google.golang.org/grpc/internal/syscall"  	"google.golang.org/grpc/internal/transport/networktype"  	"google.golang.org/grpc/keepalive"  	"google.golang.org/grpc/metadata" @@ -58,6 +59,8 @@ import (  // atomically.  var clientConnectionCounter uint64 +var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool)) +  // http2Client implements the ClientTransport interface with HTTP2.  type http2Client struct {  	lastRead  int64 // Keep this field 64-bit aligned. Accessed atomically. @@ -176,7 +179,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error  	if networkType == "tcp" && useProxy {  		return proxyDial(ctx, address, grpcUA)  	} -	return (&net.Dialer{}).DialContext(ctx, networkType, address) +	return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)  }  func isTemporary(err error) bool { @@ -262,7 +265,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  	}  	keepaliveEnabled := false  	if kp.Time != infinity { -		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil { +		if err = isyscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {  			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)  		}  		keepaliveEnabled = true @@ -448,7 +451,13 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  	}  	go func() {  		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger) -		t.loopy.run() +		if err := t.loopy.run(); !isIOError(err) { +			// Immediately close the connection, as the loopy writer returns +			// when there are no more active streams and we were draining (the +			// server sent a GOAWAY).  For I/O errors, the reader will hit it +			// after draining any remaining incoming data. +			t.conn.Close() +		}  		close(t.writerDone)  	}()  	return t, nil @@ -493,8 +502,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {  func (t *http2Client) getPeer() *peer.Peer {  	return &peer.Peer{ -		Addr:     t.remoteAddr, -		AuthInfo: t.authInfo, // Can be nil +		Addr:      t.remoteAddr, +		AuthInfo:  t.authInfo, // Can be nil +		LocalAddr: t.localAddr,  	}  } @@ -566,7 +576,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})  	} -	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { +	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {  		var k string  		for k, vv := range md {  			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. @@ -1321,10 +1331,8 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  	for streamID, stream := range t.activeStreams {  		if streamID > id && streamID <= upperLimit {  			// The stream was unprocessed by the server. -			if streamID > id && streamID <= upperLimit { -				atomic.StoreUint32(&stream.unprocessed, 1) -				streamsToClose = append(streamsToClose, stream) -			} +			atomic.StoreUint32(&stream.unprocessed, 1) +			streamsToClose = append(streamsToClose, stream)  		}  	}  	t.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 6fa1eb419..a206e2eef 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -68,18 +68,15 @@ var serverConnectionCounter uint64  // http2Server implements the ServerTransport interface with HTTP2.  type http2Server struct { -	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically. -	ctx         context.Context -	done        chan struct{} -	conn        net.Conn -	loopy       *loopyWriter -	readerDone  chan struct{} // sync point to enable testing. -	writerDone  chan struct{} // sync point to enable testing. -	remoteAddr  net.Addr -	localAddr   net.Addr -	authInfo    credentials.AuthInfo // auth info about the connection -	inTapHandle tap.ServerInHandle -	framer      *framer +	lastRead        int64 // Keep this field 64-bit aligned. Accessed atomically. +	done            chan struct{} +	conn            net.Conn +	loopy           *loopyWriter +	readerDone      chan struct{} // sync point to enable testing. +	loopyWriterDone chan struct{} +	peer            peer.Peer +	inTapHandle     tap.ServerInHandle +	framer          *framer  	// The max number of concurrent streams.  	maxStreams uint32  	// controlBuf delivers all the control related tasks (e.g., window @@ -243,16 +240,18 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  	}  	done := make(chan struct{}) +	peer := peer.Peer{ +		Addr:      conn.RemoteAddr(), +		LocalAddr: conn.LocalAddr(), +		AuthInfo:  authInfo, +	}  	t := &http2Server{ -		ctx:               setConnection(context.Background(), rawConn),  		done:              done,  		conn:              conn, -		remoteAddr:        conn.RemoteAddr(), -		localAddr:         conn.LocalAddr(), -		authInfo:          authInfo, +		peer:              peer,  		framer:            framer,  		readerDone:        make(chan struct{}), -		writerDone:        make(chan struct{}), +		loopyWriterDone:   make(chan struct{}),  		maxStreams:        config.MaxStreams,  		inTapHandle:       config.InTapHandle,  		fc:                &trInFlow{limit: uint32(icwz)}, @@ -267,8 +266,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  		bufferPool:        newBufferPool(),  	}  	t.logger = prefixLoggerForServerTransport(t) -	// Add peer information to the http2server context. -	t.ctx = peer.NewContext(t.ctx, t.getPeer())  	t.controlBuf = newControlBuffer(t.done)  	if dynamicWindow { @@ -277,15 +274,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  			updateFlowControl: t.updateFlowControl,  		}  	} -	for _, sh := range t.stats { -		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{ -			RemoteAddr: t.remoteAddr, -			LocalAddr:  t.localAddr, -		}) -		connBegin := &stats.ConnBegin{} -		sh.HandleConn(t.ctx, connBegin) -	} -	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) +	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))  	if err != nil {  		return nil, err  	} @@ -333,8 +322,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  	go func() {  		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)  		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler -		t.loopy.run() -		close(t.writerDone) +		err := t.loopy.run() +		close(t.loopyWriterDone) +		if !isIOError(err) { +			// Close the connection if a non-I/O error occurs (for I/O errors +			// the reader will also encounter the error and close).  Wait 1 +			// second before closing the connection, or when the reader is done +			// (i.e. the client already closed the connection or a connection +			// error occurred).  This avoids the potential problem where there +			// is unread data on the receive side of the connection, which, if +			// closed, would lead to a TCP RST instead of FIN, and the client +			// encountering errors.  For more info: +			// https://github.com/grpc/grpc-go/issues/5358 +			select { +			case <-t.readerDone: +			case <-time.After(time.Second): +			} +			t.conn.Close() +		}  	}()  	go t.keepalive()  	return t, nil @@ -342,7 +347,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  // operateHeaders takes action on the decoded headers. Returns an error if fatal  // error encountered and transport needs to close, otherwise returns nil. -func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error { +func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {  	// Acquire max stream ID lock for entire duration  	t.maxStreamMu.Lock()  	defer t.maxStreamMu.Unlock() @@ -369,10 +374,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  	buf := newRecvBuffer()  	s := &Stream{ -		id:  streamID, -		st:  t, -		buf: buf, -		fc:  &inFlow{limit: uint32(t.initialWindowSize)}, +		id:               streamID, +		st:               t, +		buf:              buf, +		fc:               &inFlow{limit: uint32(t.initialWindowSize)}, +		headerWireLength: int(frame.Header().Length),  	}  	var (  		// if false, content-type was missing or invalid @@ -511,9 +517,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  		s.state = streamReadDone  	}  	if timeoutSet { -		s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout) +		s.ctx, s.cancel = context.WithTimeout(ctx, timeout)  	} else { -		s.ctx, s.cancel = context.WithCancel(t.ctx) +		s.ctx, s.cancel = context.WithCancel(ctx)  	}  	// Attach the received metadata to the context. @@ -592,18 +598,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  	s.requestRead = func(n int) {  		t.adjustWindow(s, uint32(n))  	} -	for _, sh := range t.stats { -		s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) -		inHeader := &stats.InHeader{ -			FullMethod:  s.method, -			RemoteAddr:  t.remoteAddr, -			LocalAddr:   t.localAddr, -			Compression: s.recvCompress, -			WireLength:  int(frame.Header().Length), -			Header:      mdata.Copy(), -		} -		sh.HandleRPC(s.ctx, inHeader) -	}  	s.ctxDone = s.ctx.Done()  	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)  	s.trReader = &transportReader{ @@ -629,8 +623,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  // HandleStreams receives incoming streams using the given handler. This is  // typically run in a separate goroutine.  // traceCtx attaches trace to ctx and returns the new context. -func (t *http2Server) HandleStreams(handle func(*Stream)) { -	defer close(t.readerDone) +func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) { +	defer func() { +		close(t.readerDone) +		<-t.loopyWriterDone +	}()  	for {  		t.controlBuf.throttle()  		frame, err := t.framer.fr.ReadFrame() @@ -664,7 +661,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {  		}  		switch frame := frame.(type) {  		case *http2.MetaHeadersFrame: -			if err := t.operateHeaders(frame, handle); err != nil { +			if err := t.operateHeaders(ctx, frame, handle); err != nil {  				t.Close(err)  				break  			} @@ -979,7 +976,12 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {  		}  	}  	if err := t.writeHeaderLocked(s); err != nil { -		return status.Convert(err).Err() +		switch e := err.(type) { +		case ConnectionError: +			return status.Error(codes.Unavailable, e.Desc) +		default: +			return status.Convert(err).Err() +		}  	}  	return nil  } @@ -1242,10 +1244,6 @@ func (t *http2Server) Close(err error) {  	for _, s := range streams {  		s.cancel()  	} -	for _, sh := range t.stats { -		connEnd := &stats.ConnEnd{} -		sh.HandleConn(t.ctx, connEnd) -	}  }  // deleteStream deletes the stream s from transport's active streams. @@ -1311,10 +1309,6 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo  	})  } -func (t *http2Server) RemoteAddr() net.Addr { -	return t.remoteAddr -} -  func (t *http2Server) Drain(debugData string) {  	t.mu.Lock()  	defer t.mu.Unlock() @@ -1351,6 +1345,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {  			return false, err  		} +		t.framer.writer.Flush()  		if retErr != nil {  			return false, retErr  		} @@ -1371,7 +1366,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  		return false, err  	}  	go func() { -		timer := time.NewTimer(time.Minute) +		timer := time.NewTimer(5 * time.Second)  		defer timer.Stop()  		select {  		case <-t.drainEvent.Done(): @@ -1397,11 +1392,11 @@ func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {  		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),  		LocalFlowControlWindow:           int64(t.fc.getSize()),  		SocketOptions:                    channelz.GetSocketOption(t.conn), -		LocalAddr:                        t.localAddr, -		RemoteAddr:                       t.remoteAddr, +		LocalAddr:                        t.peer.LocalAddr, +		RemoteAddr:                       t.peer.Addr,  		// RemoteName :  	} -	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { +	if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {  		s.Security = au.GetSecurityValue()  	}  	s.RemoteFlowControlWindow = t.getOutFlowWindow() @@ -1433,10 +1428,12 @@ func (t *http2Server) getOutFlowWindow() int64 {  	}  } -func (t *http2Server) getPeer() *peer.Peer { +// Peer returns the peer of the transport. +func (t *http2Server) Peer() *peer.Peer {  	return &peer.Peer{ -		Addr:     t.remoteAddr, -		AuthInfo: t.authInfo, // Can be nil +		Addr:      t.peer.Addr, +		LocalAddr: t.peer.LocalAddr, +		AuthInfo:  t.peer.AuthInfo, // Can be nil  	}  } @@ -1461,6 +1458,6 @@ func GetConnection(ctx context.Context) net.Conn {  // SetConnection adds the connection to the context to be able to get  // information about the destination ip and port for an incoming RPC. This also  // allows any unary or streaming interceptors to see the connection. -func setConnection(ctx context.Context, conn net.Conn) context.Context { +func SetConnection(ctx context.Context, conn net.Conn) context.Context {  	return context.WithValue(ctx, connectionKey{}, conn)  } diff --git a/vendor/google.golang.org/grpc/internal/transport/proxy.go b/vendor/google.golang.org/grpc/internal/transport/proxy.go index 415961987..24fa10325 100644 --- a/vendor/google.golang.org/grpc/internal/transport/proxy.go +++ b/vendor/google.golang.org/grpc/internal/transport/proxy.go @@ -28,6 +28,8 @@ import (  	"net/http"  	"net/http/httputil"  	"net/url" + +	"google.golang.org/grpc/internal"  )  const proxyAuthHeaderKey = "Proxy-Authorization" @@ -112,7 +114,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri  // proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy  // is necessary, dials, does the HTTP CONNECT handshake, and returns the  // connection. -func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) { +func proxyDial(ctx context.Context, addr string, grpcUA string) (net.Conn, error) {  	newAddr := addr  	proxyURL, err := mapAddress(addr)  	if err != nil { @@ -122,15 +124,15 @@ func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn,  		newAddr = proxyURL.Host  	} -	conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr) +	conn, err := internal.NetDialerWithTCPKeepalive().DialContext(ctx, "tcp", newAddr)  	if err != nil { -		return +		return nil, err  	} -	if proxyURL != nil { +	if proxyURL == nil {  		// proxy is disabled if proxyURL is nil. -		conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA) +		return conn, err  	} -	return +	return doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)  }  func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error { diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index aac056e72..b7b8fec18 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -37,6 +37,7 @@ import (  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/keepalive"  	"google.golang.org/grpc/metadata" +	"google.golang.org/grpc/peer"  	"google.golang.org/grpc/resolver"  	"google.golang.org/grpc/stats"  	"google.golang.org/grpc/status" @@ -265,7 +266,8 @@ type Stream struct {  	// headerValid indicates whether a valid header was received.  Only  	// meaningful after headerChan is closed (always call waitOnHeader() before  	// reading its value).  Not valid on server side. -	headerValid bool +	headerValid      bool +	headerWireLength int // Only set on server side.  	// hdrMu protects header and trailer metadata on the server-side.  	hdrMu sync.Mutex @@ -425,6 +427,12 @@ func (s *Stream) Context() context.Context {  	return s.ctx  } +// SetContext sets the context of the stream. This will be deleted once the +// stats handler callouts all move to gRPC layer. +func (s *Stream) SetContext(ctx context.Context) { +	s.ctx = ctx +} +  // Method returns the method for the stream.  func (s *Stream) Method() string {  	return s.method @@ -437,6 +445,12 @@ func (s *Stream) Status() *status.Status {  	return s.status  } +// HeaderWireLength returns the size of the headers of the stream as received +// from the wire. Valid only on the server. +func (s *Stream) HeaderWireLength() int { +	return s.headerWireLength +} +  // SetHeader sets the header metadata. This can be called multiple times.  // Server side only.  // This should not be called in parallel to other data writes. @@ -698,7 +712,7 @@ type ClientTransport interface {  // Write methods for a given Stream will be called serially.  type ServerTransport interface {  	// HandleStreams receives incoming streams using the given handler. -	HandleStreams(func(*Stream)) +	HandleStreams(context.Context, func(*Stream))  	// WriteHeader sends the header metadata for the given stream.  	// WriteHeader may not be called on all streams. @@ -717,8 +731,8 @@ type ServerTransport interface {  	// handlers will be terminated asynchronously.  	Close(err error) -	// RemoteAddr returns the remote network address. -	RemoteAddr() net.Addr +	// Peer returns the peer of the server transport. +	Peer() *peer.Peer  	// Drain notifies the client this ServerTransport stops accepting new RPCs.  	Drain(debugData string) diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go index a2cdcaf12..1e9485fd6 100644 --- a/vendor/google.golang.org/grpc/metadata/metadata.go +++ b/vendor/google.golang.org/grpc/metadata/metadata.go @@ -25,8 +25,14 @@ import (  	"context"  	"fmt"  	"strings" + +	"google.golang.org/grpc/internal"  ) +func init() { +	internal.FromOutgoingContextRaw = fromOutgoingContextRaw +} +  // DecodeKeyValue returns k, v, nil.  //  // Deprecated: use k and v directly instead. @@ -153,14 +159,16 @@ func Join(mds ...MD) MD {  type mdIncomingKey struct{}  type mdOutgoingKey struct{} -// NewIncomingContext creates a new context with incoming md attached. +// NewIncomingContext creates a new context with incoming md attached. md must +// not be modified after calling this function.  func NewIncomingContext(ctx context.Context, md MD) context.Context {  	return context.WithValue(ctx, mdIncomingKey{}, md)  }  // NewOutgoingContext creates a new context with outgoing md attached. If used  // in conjunction with AppendToOutgoingContext, NewOutgoingContext will -// overwrite any previously-appended metadata. +// overwrite any previously-appended metadata. md must not be modified after +// calling this function.  func NewOutgoingContext(ctx context.Context, md MD) context.Context {  	return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})  } @@ -203,7 +211,8 @@ func FromIncomingContext(ctx context.Context) (MD, bool) {  }  // ValueFromIncomingContext returns the metadata value corresponding to the metadata -// key from the incoming metadata if it exists. Key must be lower-case. +// key from the incoming metadata if it exists. Keys are matched in a case insensitive +// manner.  //  // # Experimental  // @@ -219,33 +228,29 @@ func ValueFromIncomingContext(ctx context.Context, key string) []string {  		return copyOf(v)  	}  	for k, v := range md { -		// We need to manually convert all keys to lower case, because MD is a -		// map, and there's no guarantee that the MD attached to the context is -		// created using our helper functions. -		if strings.ToLower(k) == key { +		// Case insenitive comparison: MD is a map, and there's no guarantee +		// that the MD attached to the context is created using our helper +		// functions. +		if strings.EqualFold(k, key) {  			return copyOf(v)  		}  	}  	return nil  } -// the returned slice must not be modified in place  func copyOf(v []string) []string {  	vals := make([]string, len(v))  	copy(vals, v)  	return vals  } -// FromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD. +// fromOutgoingContextRaw returns the un-merged, intermediary contents of rawMD.  //  // Remember to perform strings.ToLower on the keys, for both the returned MD (MD  // is a map, there's no guarantee it's created using our helper functions) and  // the extra kv pairs (AppendToOutgoingContext doesn't turn them into  // lowercase). -// -// This is intended for gRPC-internal use ONLY. Users should use -// FromOutgoingContext instead. -func FromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) { +func fromOutgoingContextRaw(ctx context.Context) (MD, [][]string, bool) {  	raw, ok := ctx.Value(mdOutgoingKey{}).(rawMD)  	if !ok {  		return nil, nil, false diff --git a/vendor/google.golang.org/grpc/peer/peer.go b/vendor/google.golang.org/grpc/peer/peer.go index e01d219ff..a821ff9b2 100644 --- a/vendor/google.golang.org/grpc/peer/peer.go +++ b/vendor/google.golang.org/grpc/peer/peer.go @@ -32,6 +32,8 @@ import (  type Peer struct {  	// Addr is the peer address.  	Addr net.Addr +	// LocalAddr is the local address. +	LocalAddr net.Addr  	// AuthInfo is the authentication information of the transport.  	// It is nil if there is no transport security being used.  	AuthInfo credentials.AuthInfo diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go index 236837f41..bf56faa76 100644 --- a/vendor/google.golang.org/grpc/picker_wrapper.go +++ b/vendor/google.golang.org/grpc/picker_wrapper.go @@ -37,7 +37,6 @@ import (  type pickerWrapper struct {  	mu            sync.Mutex  	done          bool -	idle          bool  	blockingCh    chan struct{}  	picker        balancer.Picker  	statsHandlers []stats.Handler // to record blocking picker calls @@ -53,11 +52,7 @@ func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {  // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.  func (pw *pickerWrapper) updatePicker(p balancer.Picker) {  	pw.mu.Lock() -	if pw.done || pw.idle { -		// There is a small window where a picker update from the LB policy can -		// race with the channel going to idle mode. If the picker is idle here, -		// it is because the channel asked it to do so, and therefore it is sage -		// to ignore the update from the LB policy. +	if pw.done {  		pw.mu.Unlock()  		return  	} @@ -210,23 +205,15 @@ func (pw *pickerWrapper) close() {  	close(pw.blockingCh)  } -func (pw *pickerWrapper) enterIdleMode() { -	pw.mu.Lock() -	defer pw.mu.Unlock() -	if pw.done { -		return -	} -	pw.idle = true -} - -func (pw *pickerWrapper) exitIdleMode() { +// reset clears the pickerWrapper and prepares it for being used again when idle +// mode is exited. +func (pw *pickerWrapper) reset() {  	pw.mu.Lock()  	defer pw.mu.Unlock()  	if pw.done {  		return  	}  	pw.blockingCh = make(chan struct{}) -	pw.idle = false  }  // dropError is a wrapper error that indicates the LB policy wishes to drop the diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go index 2e9cf66b4..5128f9364 100644 --- a/vendor/google.golang.org/grpc/pickfirst.go +++ b/vendor/google.golang.org/grpc/pickfirst.go @@ -25,7 +25,6 @@ import (  	"google.golang.org/grpc/balancer"  	"google.golang.org/grpc/connectivity" -	"google.golang.org/grpc/internal/envconfig"  	internalgrpclog "google.golang.org/grpc/internal/grpclog"  	"google.golang.org/grpc/internal/grpcrand"  	"google.golang.org/grpc/internal/pretty" @@ -65,19 +64,6 @@ type pfConfig struct {  }  func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { -	if !envconfig.PickFirstLBConfig { -		// Prior to supporting loadbalancing configuration, the pick_first LB -		// policy did not implement the balancer.ConfigParser interface. This -		// meant that if a non-empty configuration was passed to it, the service -		// config unmarshaling code would throw a warning log, but would -		// continue using the pick_first LB policy. The code below ensures the -		// same behavior is retained if the env var is not set. -		if string(js) != "{}" { -			logger.Warningf("Ignoring non-empty balancer configuration %q for the pick_first LB policy", string(js)) -		} -		return nil, nil -	} -  	var cfg pfConfig  	if err := json.Unmarshal(js, &cfg); err != nil {  		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err) diff --git a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go new file mode 100644 index 000000000..14aa6f20a --- /dev/null +++ b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go @@ -0,0 +1,36 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package dns implements a dns resolver to be installed as the default resolver +// in grpc. +// +// Deprecated: this package is imported by grpc and should not need to be +// imported directly by users. +package dns + +import ( +	"google.golang.org/grpc/internal/resolver/dns" +	"google.golang.org/grpc/resolver" +) + +// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers. +// +// Deprecated: import grpc and use resolver.Get("dns") instead. +func NewBuilder() resolver.Builder { +	return dns.NewBuilder() +} diff --git a/vendor/google.golang.org/grpc/resolver/map.go b/vendor/google.golang.org/grpc/resolver/map.go index 804be887d..ada5b9bb7 100644 --- a/vendor/google.golang.org/grpc/resolver/map.go +++ b/vendor/google.golang.org/grpc/resolver/map.go @@ -136,3 +136,116 @@ func (a *AddressMap) Values() []any {  	}  	return ret  } + +type endpointNode struct { +	addrs map[string]struct{} +} + +// Equal returns whether the unordered set of addrs are the same between the +// endpoint nodes. +func (en *endpointNode) Equal(en2 *endpointNode) bool { +	if len(en.addrs) != len(en2.addrs) { +		return false +	} +	for addr := range en.addrs { +		if _, ok := en2.addrs[addr]; !ok { +			return false +		} +	} +	return true +} + +func toEndpointNode(endpoint Endpoint) endpointNode { +	en := make(map[string]struct{}) +	for _, addr := range endpoint.Addresses { +		en[addr.Addr] = struct{}{} +	} +	return endpointNode{ +		addrs: en, +	} +} + +// EndpointMap is a map of endpoints to arbitrary values keyed on only the +// unordered set of address strings within an endpoint. This map is not thread +// safe, thus it is unsafe to access concurrently. Must be created via +// NewEndpointMap; do not construct directly. +type EndpointMap struct { +	endpoints map[*endpointNode]any +} + +// NewEndpointMap creates a new EndpointMap. +func NewEndpointMap() *EndpointMap { +	return &EndpointMap{ +		endpoints: make(map[*endpointNode]any), +	} +} + +// Get returns the value for the address in the map, if present. +func (em *EndpointMap) Get(e Endpoint) (value any, ok bool) { +	en := toEndpointNode(e) +	if endpoint := em.find(en); endpoint != nil { +		return em.endpoints[endpoint], true +	} +	return nil, false +} + +// Set updates or adds the value to the address in the map. +func (em *EndpointMap) Set(e Endpoint, value any) { +	en := toEndpointNode(e) +	if endpoint := em.find(en); endpoint != nil { +		em.endpoints[endpoint] = value +		return +	} +	em.endpoints[&en] = value +} + +// Len returns the number of entries in the map. +func (em *EndpointMap) Len() int { +	return len(em.endpoints) +} + +// Keys returns a slice of all current map keys, as endpoints specifying the +// addresses present in the endpoint keys, in which uniqueness is determined by +// the unordered set of addresses. Thus, endpoint information returned is not +// the full endpoint data (drops duplicated addresses and attributes) but can be +// used for EndpointMap accesses. +func (em *EndpointMap) Keys() []Endpoint { +	ret := make([]Endpoint, 0, len(em.endpoints)) +	for en := range em.endpoints { +		var endpoint Endpoint +		for addr := range en.addrs { +			endpoint.Addresses = append(endpoint.Addresses, Address{Addr: addr}) +		} +		ret = append(ret, endpoint) +	} +	return ret +} + +// Values returns a slice of all current map values. +func (em *EndpointMap) Values() []any { +	ret := make([]any, 0, len(em.endpoints)) +	for _, val := range em.endpoints { +		ret = append(ret, val) +	} +	return ret +} + +// find returns a pointer to the endpoint node in em if the endpoint node is +// already present. If not found, nil is returned. The comparisons are done on +// the unordered set of addresses within an endpoint. +func (em EndpointMap) find(e endpointNode) *endpointNode { +	for endpoint := range em.endpoints { +		if e.Equal(endpoint) { +			return endpoint +		} +	} +	return nil +} + +// Delete removes the specified endpoint from the map. +func (em *EndpointMap) Delete(e Endpoint) { +	en := toEndpointNode(e) +	if entry := em.find(en); entry != nil { +		delete(em.endpoints, entry) +	} +} diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go index 11384e228..adf89dd9c 100644 --- a/vendor/google.golang.org/grpc/resolver/resolver.go +++ b/vendor/google.golang.org/grpc/resolver/resolver.go @@ -240,11 +240,6 @@ type ClientConn interface {  	//  	// Deprecated: Use UpdateState instead.  	NewAddress(addresses []Address) -	// NewServiceConfig is called by resolver to notify ClientConn a new -	// service config. The service config should be provided as a json string. -	// -	// Deprecated: Use UpdateState instead. -	NewServiceConfig(serviceConfig string)  	// ParseServiceConfig parses the provided service config and returns an  	// object that provides the parsed config.  	ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult @@ -286,6 +281,11 @@ func (t Target) Endpoint() string {  	return strings.TrimPrefix(endpoint, "/")  } +// String returns a string representation of Target. +func (t Target) String() string { +	return t.URL.String() +} +  // Builder creates a resolver that will be used to watch name resolution updates.  type Builder interface {  	// Build creates a new resolver for the given target. @@ -314,3 +314,13 @@ type Resolver interface {  	// Close closes the resolver.  	Close()  } + +// AuthorityOverrider is implemented by Builders that wish to override the +// default authority for the ClientConn. +// By default, the authority used is target.Endpoint(). +type AuthorityOverrider interface { +	// OverrideAuthority returns the authority to use for a ClientConn with the +	// given target. The implementation must generate it without blocking, +	// typically in line, and must keep it unchanged. +	OverrideAuthority(Target) string +} diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go deleted file mode 100644 index d68330560..000000000 --- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go +++ /dev/null @@ -1,247 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package grpc - -import ( -	"context" -	"strings" -	"sync" - -	"google.golang.org/grpc/balancer" -	"google.golang.org/grpc/internal/channelz" -	"google.golang.org/grpc/internal/grpcsync" -	"google.golang.org/grpc/internal/pretty" -	"google.golang.org/grpc/resolver" -	"google.golang.org/grpc/serviceconfig" -) - -// resolverStateUpdater wraps the single method used by ccResolverWrapper to -// report a state update from the actual resolver implementation. -type resolverStateUpdater interface { -	updateResolverState(s resolver.State, err error) error -} - -// ccResolverWrapper is a wrapper on top of cc for resolvers. -// It implements resolver.ClientConn interface. -type ccResolverWrapper struct { -	// The following fields are initialized when the wrapper is created and are -	// read-only afterwards, and therefore can be accessed without a mutex. -	cc                  resolverStateUpdater -	channelzID          *channelz.Identifier -	ignoreServiceConfig bool -	opts                ccResolverWrapperOpts -	serializer          *grpcsync.CallbackSerializer // To serialize all incoming calls. -	serializerCancel    context.CancelFunc           // To close the serializer, accessed only from close(). - -	// All incoming (resolver --> gRPC) calls are guaranteed to execute in a -	// mutually exclusive manner as they are scheduled on the serializer. -	// Fields accessed *only* in these serializer callbacks, can therefore be -	// accessed without a mutex. -	curState resolver.State - -	// mu guards access to the below fields. -	mu       sync.Mutex -	closed   bool -	resolver resolver.Resolver // Accessed only from outgoing calls. -} - -// ccResolverWrapperOpts wraps the arguments to be passed when creating a new -// ccResolverWrapper. -type ccResolverWrapperOpts struct { -	target     resolver.Target       // User specified dial target to resolve. -	builder    resolver.Builder      // Resolver builder to use. -	bOpts      resolver.BuildOptions // Resolver build options to use. -	channelzID *channelz.Identifier  // Channelz identifier for the channel. -} - -// newCCResolverWrapper uses the resolver.Builder to build a Resolver and -// returns a ccResolverWrapper object which wraps the newly built resolver. -func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) { -	ctx, cancel := context.WithCancel(context.Background()) -	ccr := &ccResolverWrapper{ -		cc:                  cc, -		channelzID:          opts.channelzID, -		ignoreServiceConfig: opts.bOpts.DisableServiceConfig, -		opts:                opts, -		serializer:          grpcsync.NewCallbackSerializer(ctx), -		serializerCancel:    cancel, -	} - -	// Cannot hold the lock at build time because the resolver can send an -	// update or error inline and these incoming calls grab the lock to schedule -	// a callback in the serializer. -	r, err := opts.builder.Build(opts.target, ccr, opts.bOpts) -	if err != nil { -		cancel() -		return nil, err -	} - -	// Any error reported by the resolver at build time that leads to a -	// re-resolution request from the balancer is dropped by grpc until we -	// return from this function. So, we don't have to handle pending resolveNow -	// requests here. -	ccr.mu.Lock() -	ccr.resolver = r -	ccr.mu.Unlock() - -	return ccr, nil -} - -func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { -	ccr.mu.Lock() -	defer ccr.mu.Unlock() - -	// ccr.resolver field is set only after the call to Build() returns. But in -	// the process of building, the resolver may send an error update which when -	// propagated to the balancer may result in a re-resolution request. -	if ccr.closed || ccr.resolver == nil { -		return -	} -	ccr.resolver.ResolveNow(o) -} - -func (ccr *ccResolverWrapper) close() { -	ccr.mu.Lock() -	if ccr.closed { -		ccr.mu.Unlock() -		return -	} - -	channelz.Info(logger, ccr.channelzID, "Closing the name resolver") - -	// Close the serializer to ensure that no more calls from the resolver are -	// handled, before actually closing the resolver. -	ccr.serializerCancel() -	ccr.closed = true -	r := ccr.resolver -	ccr.mu.Unlock() - -	// Give enqueued callbacks a chance to finish. -	<-ccr.serializer.Done() - -	// Spawn a goroutine to close the resolver (since it may block trying to -	// cleanup all allocated resources) and return early. -	go r.Close() -} - -// serializerScheduleLocked is a convenience method to schedule a function to be -// run on the serializer while holding ccr.mu. -func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) { -	ccr.mu.Lock() -	ccr.serializer.Schedule(f) -	ccr.mu.Unlock() -} - -// UpdateState is called by resolver implementations to report new state to gRPC -// which includes addresses and service config. -func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { -	errCh := make(chan error, 1) -	if s.Endpoints == nil { -		s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses)) -		for _, a := range s.Addresses { -			ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes} -			ep.Addresses[0].BalancerAttributes = nil -			s.Endpoints = append(s.Endpoints, ep) -		} -	} -	ok := ccr.serializer.Schedule(func(context.Context) { -		ccr.addChannelzTraceEvent(s) -		ccr.curState = s -		if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { -			errCh <- balancer.ErrBadResolverState -			return -		} -		errCh <- nil -	}) -	if !ok { -		// The only time when Schedule() fail to add the callback to the -		// serializer is when the serializer is closed, and this happens only -		// when the resolver wrapper is closed. -		return nil -	} -	return <-errCh -} - -// ReportError is called by resolver implementations to report errors -// encountered during name resolution to gRPC. -func (ccr *ccResolverWrapper) ReportError(err error) { -	ccr.serializerScheduleLocked(func(_ context.Context) { -		channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) -		ccr.cc.updateResolverState(resolver.State{}, err) -	}) -} - -// NewAddress is called by the resolver implementation to send addresses to -// gRPC. -func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { -	ccr.serializerScheduleLocked(func(_ context.Context) { -		ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) -		ccr.curState.Addresses = addrs -		ccr.cc.updateResolverState(ccr.curState, nil) -	}) -} - -// NewServiceConfig is called by the resolver implementation to send service -// configs to gRPC. -func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { -	ccr.serializerScheduleLocked(func(_ context.Context) { -		channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc) -		if ccr.ignoreServiceConfig { -			channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config") -			return -		} -		scpr := parseServiceConfig(sc) -		if scpr.Err != nil { -			channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err) -			return -		} -		ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) -		ccr.curState.ServiceConfig = scpr -		ccr.cc.updateResolverState(ccr.curState, nil) -	}) -} - -// ParseServiceConfig is called by resolver implementations to parse a JSON -// representation of the service config. -func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult { -	return parseServiceConfig(scJSON) -} - -// addChannelzTraceEvent adds a channelz trace event containing the new -// state received from resolver implementations. -func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { -	var updates []string -	var oldSC, newSC *ServiceConfig -	var oldOK, newOK bool -	if ccr.curState.ServiceConfig != nil { -		oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig) -	} -	if s.ServiceConfig != nil { -		newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig) -	} -	if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) { -		updates = append(updates, "service config updated") -	} -	if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 { -		updates = append(updates, "resolver returned an empty address list") -	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { -		updates = append(updates, "resolver returned new addresses") -	} -	channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) -} diff --git a/vendor/google.golang.org/grpc/resolver_wrapper.go b/vendor/google.golang.org/grpc/resolver_wrapper.go new file mode 100644 index 000000000..c79bab121 --- /dev/null +++ b/vendor/google.golang.org/grpc/resolver_wrapper.go @@ -0,0 +1,197 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( +	"context" +	"strings" +	"sync" + +	"google.golang.org/grpc/internal/channelz" +	"google.golang.org/grpc/internal/grpcsync" +	"google.golang.org/grpc/internal/pretty" +	"google.golang.org/grpc/resolver" +	"google.golang.org/grpc/serviceconfig" +) + +// ccResolverWrapper is a wrapper on top of cc for resolvers. +// It implements resolver.ClientConn interface. +type ccResolverWrapper struct { +	// The following fields are initialized when the wrapper is created and are +	// read-only afterwards, and therefore can be accessed without a mutex. +	cc                  *ClientConn +	ignoreServiceConfig bool +	serializer          *grpcsync.CallbackSerializer +	serializerCancel    context.CancelFunc + +	resolver resolver.Resolver // only accessed within the serializer + +	// The following fields are protected by mu.  Caller must take cc.mu before +	// taking mu. +	mu       sync.Mutex +	curState resolver.State +	closed   bool +} + +// newCCResolverWrapper initializes the ccResolverWrapper.  It can only be used +// after calling start, which builds the resolver. +func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper { +	ctx, cancel := context.WithCancel(cc.ctx) +	return &ccResolverWrapper{ +		cc:                  cc, +		ignoreServiceConfig: cc.dopts.disableServiceConfig, +		serializer:          grpcsync.NewCallbackSerializer(ctx), +		serializerCancel:    cancel, +	} +} + +// start builds the name resolver using the resolver.Builder in cc and returns +// any error encountered.  It must always be the first operation performed on +// any newly created ccResolverWrapper, except that close may be called instead. +func (ccr *ccResolverWrapper) start() error { +	errCh := make(chan error) +	ccr.serializer.Schedule(func(ctx context.Context) { +		if ctx.Err() != nil { +			return +		} +		opts := resolver.BuildOptions{ +			DisableServiceConfig: ccr.cc.dopts.disableServiceConfig, +			DialCreds:            ccr.cc.dopts.copts.TransportCredentials, +			CredsBundle:          ccr.cc.dopts.copts.CredsBundle, +			Dialer:               ccr.cc.dopts.copts.Dialer, +		} +		var err error +		ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts) +		errCh <- err +	}) +	return <-errCh +} + +func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { +	ccr.serializer.Schedule(func(ctx context.Context) { +		if ctx.Err() != nil || ccr.resolver == nil { +			return +		} +		ccr.resolver.ResolveNow(o) +	}) +} + +// close initiates async shutdown of the wrapper.  To determine the wrapper has +// finished shutting down, the channel should block on ccr.serializer.Done() +// without cc.mu held. +func (ccr *ccResolverWrapper) close() { +	channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver") +	ccr.mu.Lock() +	ccr.closed = true +	ccr.mu.Unlock() + +	ccr.serializer.Schedule(func(context.Context) { +		if ccr.resolver == nil { +			return +		} +		ccr.resolver.Close() +		ccr.resolver = nil +	}) +	ccr.serializerCancel() +} + +// UpdateState is called by resolver implementations to report new state to gRPC +// which includes addresses and service config. +func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { +	ccr.cc.mu.Lock() +	ccr.mu.Lock() +	if ccr.closed { +		ccr.mu.Unlock() +		ccr.cc.mu.Unlock() +		return nil +	} +	if s.Endpoints == nil { +		s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses)) +		for _, a := range s.Addresses { +			ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes} +			ep.Addresses[0].BalancerAttributes = nil +			s.Endpoints = append(s.Endpoints, ep) +		} +	} +	ccr.addChannelzTraceEvent(s) +	ccr.curState = s +	ccr.mu.Unlock() +	return ccr.cc.updateResolverStateAndUnlock(s, nil) +} + +// ReportError is called by resolver implementations to report errors +// encountered during name resolution to gRPC. +func (ccr *ccResolverWrapper) ReportError(err error) { +	ccr.cc.mu.Lock() +	ccr.mu.Lock() +	if ccr.closed { +		ccr.mu.Unlock() +		ccr.cc.mu.Unlock() +		return +	} +	ccr.mu.Unlock() +	channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) +	ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err) +} + +// NewAddress is called by the resolver implementation to send addresses to +// gRPC. +func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { +	ccr.cc.mu.Lock() +	ccr.mu.Lock() +	if ccr.closed { +		ccr.mu.Unlock() +		ccr.cc.mu.Unlock() +		return +	} +	s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig} +	ccr.addChannelzTraceEvent(s) +	ccr.curState = s +	ccr.mu.Unlock() +	ccr.cc.updateResolverStateAndUnlock(s, nil) +} + +// ParseServiceConfig is called by resolver implementations to parse a JSON +// representation of the service config. +func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult { +	return parseServiceConfig(scJSON) +} + +// addChannelzTraceEvent adds a channelz trace event containing the new +// state received from resolver implementations. +func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) { +	var updates []string +	var oldSC, newSC *ServiceConfig +	var oldOK, newOK bool +	if ccr.curState.ServiceConfig != nil { +		oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig) +	} +	if s.ServiceConfig != nil { +		newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig) +	} +	if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) { +		updates = append(updates, "service config updated") +	} +	if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 { +		updates = append(updates, "resolver returned an empty address list") +	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 { +		updates = append(updates, "resolver returned new addresses") +	} +	channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) +} diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index b7723aa09..a4b6bc687 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -640,14 +640,18 @@ func encode(c baseCodec, msg any) ([]byte, error) {  	return b, nil  } -// compress returns the input bytes compressed by compressor or cp.  If both -// compressors are nil, returns nil. +// compress returns the input bytes compressed by compressor or cp. +// If both compressors are nil, or if the message has zero length, returns nil, +// indicating no compression was done.  //  // TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.  func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) {  	if compressor == nil && cp == nil {  		return nil, nil  	} +	if len(in) == 0 { +		return nil, nil +	}  	wrapErr := func(err error) error {  		return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())  	} diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 8f60d4214..e89c5ac61 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -70,9 +70,10 @@ func init() {  	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {  		return srv.opts.creds  	} -	internal.DrainServerTransports = func(srv *Server, addr string) { -		srv.drainServerTransports(addr) +	internal.IsRegisteredMethod = func(srv *Server, method string) bool { +		return srv.isRegisteredMethod(method)  	} +	internal.ServerFromContext = serverFromContext  	internal.AddGlobalServerOptions = func(opt ...ServerOption) {  		globalServerOptions = append(globalServerOptions, opt...)  	} @@ -81,6 +82,7 @@ func init() {  	}  	internal.BinaryLogger = binaryLogger  	internal.JoinServerOptions = newJoinServerOption +	internal.RecvBufferPool = recvBufferPool  }  var statusOK = status.New(codes.OK, "") @@ -134,12 +136,14 @@ type Server struct {  	quit               *grpcsync.Event  	done               *grpcsync.Event  	channelzRemoveOnce sync.Once -	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop +	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop +	handlersWG         sync.WaitGroup // counts active method handler goroutines  	channelzID *channelz.Identifier  	czData     *channelzData -	serverWorkerChannel chan func() +	serverWorkerChannel      chan func() +	serverWorkerChannelClose func()  }  type serverOptions struct { @@ -170,6 +174,7 @@ type serverOptions struct {  	headerTableSize       *uint32  	numServerWorkers      uint32  	recvBufferPool        SharedBufferPool +	waitForHandlers       bool  }  var defaultServerOptions = serverOptions{ @@ -567,6 +572,21 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {  	})  } +// WaitForHandlers cause Stop to wait until all outstanding method handlers have +// exited before returning.  If false, Stop will return as soon as all +// connections have closed, but method handlers may still be running. By +// default, Stop does not wait for method handlers to return. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WaitForHandlers(w bool) ServerOption { +	return newFuncServerOption(func(o *serverOptions) { +		o.waitForHandlers = w +	}) +} +  // RecvBufferPool returns a ServerOption that configures the server  // to use the provided shared buffer pool for parsing incoming messages. Depending  // on the application's workload, this could result in reduced memory allocation. @@ -578,11 +598,13 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {  // options are used: StatsHandler, EnableTracing, or binary logging. In such  // cases, the shared buffer pool will be ignored.  // -// # Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. +// Deprecated: use experimental.WithRecvBufferPool instead.  Will be deleted in +// v1.60.0 or later.  func RecvBufferPool(bufferPool SharedBufferPool) ServerOption { +	return recvBufferPool(bufferPool) +} + +func recvBufferPool(bufferPool SharedBufferPool) ServerOption {  	return newFuncServerOption(func(o *serverOptions) {  		o.recvBufferPool = bufferPool  	}) @@ -616,15 +638,14 @@ func (s *Server) serverWorker() {  // connections to reduce the time spent overall on runtime.morestack.  func (s *Server) initServerWorkers() {  	s.serverWorkerChannel = make(chan func()) +	s.serverWorkerChannelClose = grpcsync.OnceFunc(func() { +		close(s.serverWorkerChannel) +	})  	for i := uint32(0); i < s.opts.numServerWorkers; i++ {  		go s.serverWorker()  	}  } -func (s *Server) stopServerWorkers() { -	close(s.serverWorkerChannel) -} -  // NewServer creates a gRPC server which has no service registered and has not  // started to accept requests yet.  func NewServer(opt ...ServerOption) *Server { @@ -806,6 +827,18 @@ func (l *listenSocket) Close() error {  // Serve returns when lis.Accept fails with fatal errors.  lis will be closed when  // this method returns.  // Serve will return a non-nil error unless Stop or GracefulStop is called. +// +// Note: All supported releases of Go (as of December 2023) override the OS +// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive +// with OS defaults for keepalive time and interval, callers need to do the +// following two things: +//   - pass a net.Listener created by calling the Listen method on a +//     net.ListenConfig with the `KeepAlive` field set to a negative value. This +//     will result in the Go standard library not overriding OS defaults for TCP +//     keepalive interval and time. But this will also result in the Go standard +//     library not enabling TCP keepalives by default. +//   - override the Accept method on the passed in net.Listener and set the +//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.  func (s *Server) Serve(lis net.Listener) error {  	s.mu.Lock()  	s.printf("serving") @@ -913,24 +946,21 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {  		return  	} +	if cc, ok := rawConn.(interface { +		PassServerTransport(transport.ServerTransport) +	}); ok { +		cc.PassServerTransport(st) +	} +  	if !s.addConn(lisAddr, st) {  		return  	}  	go func() { -		s.serveStreams(st) +		s.serveStreams(context.Background(), st, rawConn)  		s.removeConn(lisAddr, st)  	}()  } -func (s *Server) drainServerTransports(addr string) { -	s.mu.Lock() -	conns := s.conns[addr] -	for st := range conns { -		st.Drain("") -	} -	s.mu.Unlock() -} -  // newHTTP2Transport sets up a http/2 transport (using the  // gRPC http2 server transport in transport/http2_server.go).  func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { @@ -971,18 +1001,31 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {  	return st  } -func (s *Server) serveStreams(st transport.ServerTransport) { -	defer st.Close(errors.New("finished serving streams for the server transport")) -	var wg sync.WaitGroup +func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) { +	ctx = transport.SetConnection(ctx, rawConn) +	ctx = peer.NewContext(ctx, st.Peer()) +	for _, sh := range s.opts.statsHandlers { +		ctx = sh.TagConn(ctx, &stats.ConnTagInfo{ +			RemoteAddr: st.Peer().Addr, +			LocalAddr:  st.Peer().LocalAddr, +		}) +		sh.HandleConn(ctx, &stats.ConnBegin{}) +	} -	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) -	st.HandleStreams(func(stream *transport.Stream) { -		wg.Add(1) +	defer func() { +		st.Close(errors.New("finished serving streams for the server transport")) +		for _, sh := range s.opts.statsHandlers { +			sh.HandleConn(ctx, &stats.ConnEnd{}) +		} +	}() +	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) +	st.HandleStreams(ctx, func(stream *transport.Stream) { +		s.handlersWG.Add(1)  		streamQuota.acquire()  		f := func() {  			defer streamQuota.release() -			defer wg.Done() +			defer s.handlersWG.Done()  			s.handleStream(st, stream)  		} @@ -996,7 +1039,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {  		}  		go f()  	}) -	wg.Wait()  }  var _ http.Handler = (*Server)(nil) @@ -1040,7 +1082,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {  		return  	}  	defer s.removeConn(listenerAddressForServeHTTP, st) -	s.serveStreams(st) +	s.serveStreams(r.Context(), st, nil)  }  func (s *Server) addConn(addr string, st transport.ServerTransport) bool { @@ -1689,6 +1731,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTran  func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {  	ctx := stream.Context() +	ctx = contextWithServer(ctx, s)  	var ti *traceInfo  	if EnableTracing {  		tr := trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) @@ -1697,7 +1740,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  			tr: tr,  			firstLine: firstLine{  				client:     false, -				remoteAddr: t.RemoteAddr(), +				remoteAddr: t.Peer().Addr,  			},  		}  		if dl, ok := ctx.Deadline(); ok { @@ -1731,6 +1774,22 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  	service := sm[:pos]  	method := sm[pos+1:] +	md, _ := metadata.FromIncomingContext(ctx) +	for _, sh := range s.opts.statsHandlers { +		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()}) +		sh.HandleRPC(ctx, &stats.InHeader{ +			FullMethod:  stream.Method(), +			RemoteAddr:  t.Peer().Addr, +			LocalAddr:   t.Peer().LocalAddr, +			Compression: stream.RecvCompress(), +			WireLength:  stream.HeaderWireLength(), +			Header:      md, +		}) +	} +	// To have calls in stream callouts work. Will delete once all stats handler +	// calls come from the gRPC layer. +	stream.SetContext(ctx) +  	srv, knownService := s.services[service]  	if knownService {  		if md, ok := srv.methods[method]; ok { @@ -1820,62 +1879,72 @@ func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream  // pending RPCs on the client side will get notified by connection  // errors.  func (s *Server) Stop() { -	s.quit.Fire() +	s.stop(false) +} -	defer func() { -		s.serveWG.Wait() -		s.done.Fire() -	}() +// GracefulStop stops the gRPC server gracefully. It stops the server from +// accepting new connections and RPCs and blocks until all the pending RPCs are +// finished. +func (s *Server) GracefulStop() { +	s.stop(true) +} + +func (s *Server) stop(graceful bool) { +	s.quit.Fire() +	defer s.done.Fire()  	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })  	s.mu.Lock() -	listeners := s.lis -	s.lis = nil -	conns := s.conns -	s.conns = nil -	// interrupt GracefulStop if Stop and GracefulStop are called concurrently. -	s.cv.Broadcast() +	s.closeListenersLocked() +	// Wait for serving threads to be ready to exit.  Only then can we be sure no +	// new conns will be created.  	s.mu.Unlock() +	s.serveWG.Wait() -	for lis := range listeners { -		lis.Close() +	s.mu.Lock() +	defer s.mu.Unlock() + +	if graceful { +		s.drainAllServerTransportsLocked() +	} else { +		s.closeServerTransportsLocked()  	} -	for _, cs := range conns { -		for st := range cs { -			st.Close(errors.New("Server.Stop called")) -		} + +	for len(s.conns) != 0 { +		s.cv.Wait()  	} +	s.conns = nil +  	if s.opts.numServerWorkers > 0 { -		s.stopServerWorkers() +		// Closing the channel (only once, via grpcsync.OnceFunc) after all the +		// connections have been closed above ensures that there are no +		// goroutines executing the callback passed to st.HandleStreams (where +		// the channel is written to). +		s.serverWorkerChannelClose() +	} + +	if graceful || s.opts.waitForHandlers { +		s.handlersWG.Wait()  	} -	s.mu.Lock()  	if s.events != nil {  		s.events.Finish()  		s.events = nil  	} -	s.mu.Unlock()  } -// GracefulStop stops the gRPC server gracefully. It stops the server from -// accepting new connections and RPCs and blocks until all the pending RPCs are -// finished. -func (s *Server) GracefulStop() { -	s.quit.Fire() -	defer s.done.Fire() - -	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) -	s.mu.Lock() -	if s.conns == nil { -		s.mu.Unlock() -		return +// s.mu must be held by the caller. +func (s *Server) closeServerTransportsLocked() { +	for _, conns := range s.conns { +		for st := range conns { +			st.Close(errors.New("Server.Stop called")) +		}  	} +} -	for lis := range s.lis { -		lis.Close() -	} -	s.lis = nil +// s.mu must be held by the caller. +func (s *Server) drainAllServerTransportsLocked() {  	if !s.drain {  		for _, conns := range s.conns {  			for st := range conns { @@ -1884,22 +1953,14 @@ func (s *Server) GracefulStop() {  		}  		s.drain = true  	} +} -	// Wait for serving threads to be ready to exit.  Only then can we be sure no -	// new conns will be created. -	s.mu.Unlock() -	s.serveWG.Wait() -	s.mu.Lock() - -	for len(s.conns) != 0 { -		s.cv.Wait() -	} -	s.conns = nil -	if s.events != nil { -		s.events.Finish() -		s.events = nil +// s.mu must be held by the caller. +func (s *Server) closeListenersLocked() { +	for lis := range s.lis { +		lis.Close()  	} -	s.mu.Unlock() +	s.lis = nil  }  // contentSubtype must be lowercase @@ -1913,11 +1974,50 @@ func (s *Server) getCodec(contentSubtype string) baseCodec {  	}  	codec := encoding.GetCodec(contentSubtype)  	if codec == nil { +		logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)  		return encoding.GetCodec(proto.Name)  	}  	return codec  } +type serverKey struct{} + +// serverFromContext gets the Server from the context. +func serverFromContext(ctx context.Context) *Server { +	s, _ := ctx.Value(serverKey{}).(*Server) +	return s +} + +// contextWithServer sets the Server in the context. +func contextWithServer(ctx context.Context, server *Server) context.Context { +	return context.WithValue(ctx, serverKey{}, server) +} + +// isRegisteredMethod returns whether the passed in method is registered as a +// method on the server. /service/method and service/method will match if the +// service and method are registered on the server. +func (s *Server) isRegisteredMethod(serviceMethod string) bool { +	if serviceMethod != "" && serviceMethod[0] == '/' { +		serviceMethod = serviceMethod[1:] +	} +	pos := strings.LastIndex(serviceMethod, "/") +	if pos == -1 { // Invalid method name syntax. +		return false +	} +	service := serviceMethod[:pos] +	method := serviceMethod[pos+1:] +	srv, knownService := s.services[service] +	if knownService { +		if _, ok := srv.methods[method]; ok { +			return true +		} +		if _, ok := srv.streams[method]; ok { +			return true +		} +	} +	return false +} +  // SetHeader sets the header metadata to be sent from the server to the client.  // The context provided must be the context passed to the server's handler.  // diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index b14b2fbea..d621f52b1 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -48,6 +48,8 @@ import (  	"google.golang.org/grpc/status"  ) +var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool)) +  // StreamHandler defines the handler called by gRPC server to complete the  // execution of a streaming RPC.  // @@ -184,7 +186,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth  	// when the RPC completes.  	opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...) -	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { +	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {  		// validate md  		if err := imetadata.Validate(md); err != nil {  			return nil, status.Error(codes.Internal, err.Error()) diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 6d2cadd79..f1aec4c0a 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@  package grpc  // Version is the current grpc version. -const Version = "1.59.0" +const Version = "1.61.1" diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh index bb480f1f9..5da38a409 100644 --- a/vendor/google.golang.org/grpc/vet.sh +++ b/vendor/google.golang.org/grpc/vet.sh @@ -35,7 +35,6 @@ if [[ "$1" = "-install" ]]; then    # Install the pinned versions as defined in module tools.    pushd ./test/tools    go install \ -    golang.org/x/lint/golint \      golang.org/x/tools/cmd/goimports \      honnef.co/go/tools/cmd/staticcheck \      github.com/client9/misspell/cmd/misspell @@ -77,15 +76,19 @@ fi  not grep 'func Test[^(]' *_test.go  not grep 'func Test[^(]' test/*.go +# - Check for typos in test function names +git grep 'func (s) ' -- "*_test.go" | not grep -v 'func (s) Test' +git grep 'func [A-Z]' -- "*_test.go" | not grep -v 'func Test\|Benchmark\|Example' +  # - Do not import x/net/context.  not git grep -l 'x/net/context' -- "*.go"  # - Do not import math/rand for real library code.  Use internal/grpcrand for  #   thread safety. -git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^stress\|grpcrand\|^benchmark\|wrr_test' +git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^interop/stress\|grpcrand\|^benchmark\|wrr_test'  # - Do not use "interface{}"; use "any" instead. -git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc' +git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc\|grpc_testing_not_regenerate'  # - Do not call grpclog directly. Use grpclog.Component instead.  git grep -l -e 'grpclog.I' --or -e 'grpclog.W' --or -e 'grpclog.E' --or -e 'grpclog.F' --or -e 'grpclog.V' -- "*.go" | not grep -v '^grpclog/component.go\|^internal/grpctest/tlogger_test.go' @@ -94,15 +97,14 @@ git grep -l -e 'grpclog.I' --or -e 'grpclog.W' --or -e 'grpclog.E' --or -e 'grpc  not git grep "\(import \|^\s*\)\"github.com/golang/protobuf/ptypes/" -- "*.go"  # - Ensure all usages of grpc_testing package are renamed when importing. -not git grep "\(import \|^\s*\)\"google.golang.org/grpc/interop/grpc_testing" -- "*.go"  +not git grep "\(import \|^\s*\)\"google.golang.org/grpc/interop/grpc_testing" -- "*.go"  # - Ensure all xds proto imports are renamed to *pb or *grpc.  git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.pb.go' | not grep -v 'pb "\|grpc "'  misspell -error . -# - gofmt, goimports, golint (with exceptions for generated code), go vet, -# go mod tidy. +# - gofmt, goimports, go vet, go mod tidy.  # Perform these checks on each module inside gRPC.  for MOD_FILE in $(find . -name 'go.mod'); do    MOD_DIR=$(dirname ${MOD_FILE}) @@ -110,7 +112,6 @@ for MOD_FILE in $(find . -name 'go.mod'); do    go vet -all ./... | fail_on_output    gofmt -s -d -l . 2>&1 | fail_on_output    goimports -l . 2>&1 | not grep -vE "\.pb\.go" -  golint ./... 2>&1 | not grep -vE "/grpc_testing_not_regenerate/.*\.pb\.go:"    go mod tidy -compat=1.19    git status --porcelain 2>&1 | fail_on_output || \ @@ -119,94 +120,71 @@ for MOD_FILE in $(find . -name 'go.mod'); do  done  # - Collection of static analysis checks -# -# TODO(dfawley): don't use deprecated functions in examples or first-party -# plugins. -# TODO(dfawley): enable ST1019 (duplicate imports) but allow for protobufs.  SC_OUT="$(mktemp)" -staticcheck -go 1.19 -checks 'inherit,-ST1015,-ST1019,-SA1019' ./... > "${SC_OUT}" || true -# Error if anything other than deprecation warnings are printed. -not grep -v "is deprecated:.*SA1019" "${SC_OUT}" -# Only ignore the following deprecated types/fields/functions. -not grep -Fv '.CredsBundle -.HeaderMap -.Metadata is deprecated: use Attributes -.NewAddress -.NewServiceConfig -.Type is deprecated: use Attributes -BuildVersion is deprecated -balancer.ErrTransientFailure -balancer.Picker -extDesc.Filename is deprecated -github.com/golang/protobuf/jsonpb is deprecated -grpc.CallCustomCodec -grpc.Code -grpc.Compressor -grpc.CustomCodec -grpc.Decompressor -grpc.MaxMsgSize -grpc.MethodConfig -grpc.NewGZIPCompressor -grpc.NewGZIPDecompressor -grpc.RPCCompressor -grpc.RPCDecompressor -grpc.ServiceConfig -grpc.WithCompressor -grpc.WithDecompressor -grpc.WithDialer -grpc.WithMaxMsgSize -grpc.WithServiceConfig -grpc.WithTimeout -http.CloseNotifier -info.SecurityVersion -proto is deprecated -proto.InternalMessageInfo is deprecated -proto.EnumName is deprecated -proto.ErrInternalBadWireType is deprecated -proto.FileDescriptor is deprecated -proto.Marshaler is deprecated -proto.MessageType is deprecated -proto.RegisterEnum is deprecated -proto.RegisterFile is deprecated -proto.RegisterType is deprecated -proto.RegisterExtension is deprecated -proto.RegisteredExtension is deprecated -proto.RegisteredExtensions is deprecated -proto.RegisterMapType is deprecated -proto.Unmarshaler is deprecated +staticcheck -go 1.19 -checks 'all' ./... > "${SC_OUT}" || true + +# Error for anything other than checks that need exclusions. +grep -v "(ST1000)" "${SC_OUT}" | grep -v "(SA1019)" | grep -v "(ST1003)" | not grep -v "(ST1019)\|\(other import of\)" + +# Exclude underscore checks for generated code. +grep "(ST1003)" "${SC_OUT}" | not grep -v '\(.pb.go:\)\|\(code_string_test.go:\)\|\(grpc_testing_not_regenerate\)' + +# Error for duplicate imports not including grpc protos. +grep "(ST1019)\|\(other import of\)" "${SC_OUT}" | not grep -Fv 'XXXXX PleaseIgnoreUnused +channelz/grpc_channelz_v1" +go-control-plane/envoy +grpclb/grpc_lb_v1" +health/grpc_health_v1" +interop/grpc_testing" +orca/v3" +proto/grpc_gcp" +proto/grpc_lookup_v1" +reflection/grpc_reflection_v1" +reflection/grpc_reflection_v1alpha" +XXXXX PleaseIgnoreUnused' + +# Error for any package comments not in generated code. +grep "(ST1000)" "${SC_OUT}" | not grep -v "\.pb\.go:" + +# Only ignore the following deprecated types/fields/functions and exclude +# generated code. +grep "(SA1019)" "${SC_OUT}" | not grep -Fv 'XXXXX PleaseIgnoreUnused +XXXXX Protobuf related deprecation errors: +"github.com/golang/protobuf +.pb.go: +grpc_testing_not_regenerate +: ptypes. +proto.RegisterType +XXXXX gRPC internal usage deprecation errors: +"google.golang.org/grpc +: grpc. +: v1alpha. +: v1alphareflectionpb. +BalancerAttributes is deprecated: +CredsBundle is deprecated: +Metadata is deprecated: use Attributes instead. +NewSubConn is deprecated: +OverrideServerName is deprecated: +RemoveSubConn is deprecated: +SecurityVersion is deprecated:  Target is deprecated: Use the Target field in the BuildOptions instead. -xxx_messageInfo_ -' "${SC_OUT}" - -# - special golint on package comments. -lint_package_comment_per_package() { -  # Number of files in this go package. -  fileCount=$(go list -f '{{len .GoFiles}}' $1) -  if [ ${fileCount} -eq 0 ]; then -    return 0 -  fi -  # Number of package errors generated by golint. -  lintPackageCommentErrorsCount=$(golint --min_confidence 0 $1 | grep -c "should have a package comment") -  # golint complains about every file that's missing the package comment. If the -  # number of files for this package is greater than the number of errors, there's -  # at least one file with package comment, good. Otherwise, fail. -  if [ ${fileCount} -le ${lintPackageCommentErrorsCount} ]; then -    echo "Package $1 (with ${fileCount} files) is missing package comment" -    return 1 -  fi -} -lint_package_comment() { -  set +ex - -  count=0 -  for i in $(go list ./...); do -    lint_package_comment_per_package "$i" -    ((count += $?)) -  done - -  set -ex -  return $count -} -lint_package_comment +UpdateAddresses is deprecated: +UpdateSubConnState is deprecated: +balancer.ErrTransientFailure is deprecated: +grpc/reflection/v1alpha/reflection.proto +XXXXX xDS deprecated fields we support +.ExactMatch +.PrefixMatch +.SafeRegexMatch +.SuffixMatch +GetContainsMatch +GetExactMatch +GetMatchSubjectAltNames +GetPrefixMatch +GetSafeRegexMatch +GetSuffixMatch +GetTlsCertificateCertificateProviderInstance +GetValidationContextCertificateProviderInstance +XXXXX PleaseIgnoreUnused'  echo SUCCESS | 
