diff options
Diffstat (limited to 'vendor/google.golang.org/grpc')
36 files changed, 2628 insertions, 695 deletions
| diff --git a/vendor/google.golang.org/grpc/attributes/attributes.go b/vendor/google.golang.org/grpc/attributes/attributes.go index 02f5dc531..49712aca3 100644 --- a/vendor/google.golang.org/grpc/attributes/attributes.go +++ b/vendor/google.golang.org/grpc/attributes/attributes.go @@ -25,6 +25,11 @@  // later release.  package attributes +import ( +	"fmt" +	"strings" +) +  // Attributes is an immutable struct for storing and retrieving generic  // key/value pairs.  Keys must be hashable, and users should define their own  // types for keys.  Values should not be modified after they are added to an @@ -99,3 +104,39 @@ func (a *Attributes) Equal(o *Attributes) bool {  	}  	return true  } + +// String prints the attribute map. If any key or values throughout the map +// implement fmt.Stringer, it calls that method and appends. +func (a *Attributes) String() string { +	var sb strings.Builder +	sb.WriteString("{") +	first := true +	for k, v := range a.m { +		if !first { +			sb.WriteString(", ") +		} +		sb.WriteString(fmt.Sprintf("%q: %q ", str(k), str(v))) +		first = false +	} +	sb.WriteString("}") +	return sb.String() +} + +func str(x interface{}) string { +	if v, ok := x.(fmt.Stringer); ok { +		return v.String() +	} else if v, ok := x.(string); ok { +		return v +	} +	return fmt.Sprintf("<%p>", x) +} + +// MarshalJSON helps implement the json.Marshaler interface, thereby rendering +// the Attributes correctly when printing (via pretty.JSON) structs containing +// Attributes as fields. +// +// Is it impossible to unmarshal attributes from a JSON representation and this +// method is meant only for debugging purposes. +func (a *Attributes) MarshalJSON() ([]byte, error) { +	return []byte(a.String()), nil +} diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go index 09d61dd1b..8f00523c0 100644 --- a/vendor/google.golang.org/grpc/balancer/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/balancer.go @@ -286,7 +286,7 @@ type PickResult struct {  	//  	// LB policies with child policies are responsible for propagating metadata  	// injected by their children to the ClientConn, as part of Pick(). -	Metatada metadata.MD +	Metadata metadata.MD  }  // TransientFailureError returns e.  It exists for backward compatibility and diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go index 0359956d3..04b9ad411 100644 --- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go +++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go @@ -25,14 +25,20 @@ import (  	"sync"  	"google.golang.org/grpc/balancer" -	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/connectivity"  	"google.golang.org/grpc/internal/balancer/gracefulswitch" -	"google.golang.org/grpc/internal/buffer"  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/internal/grpcsync"  	"google.golang.org/grpc/resolver" -	"google.golang.org/grpc/status" +) + +type ccbMode int + +const ( +	ccbModeActive = iota +	ccbModeIdle +	ccbModeClosed +	ccbModeExitingIdle  )  // ccBalancerWrapper sits between the ClientConn and the Balancer. @@ -49,192 +55,101 @@ import (  // It uses the gracefulswitch.Balancer internally to ensure that balancer  // switches happen in a graceful manner.  type ccBalancerWrapper struct { -	cc *ClientConn - -	// Since these fields are accessed only from handleXxx() methods which are -	// synchronized by the watcher goroutine, we do not need a mutex to protect -	// these fields. +	// The following fields are initialized when the wrapper is created and are +	// read-only afterwards, and therefore can be accessed without a mutex. +	cc   *ClientConn +	opts balancer.BuildOptions + +	// Outgoing (gRPC --> balancer) calls are guaranteed to execute in a +	// mutually exclusive manner as they are scheduled in the serializer. Fields +	// accessed *only* in these serializer callbacks, can therefore be accessed +	// without a mutex.  	balancer        *gracefulswitch.Balancer  	curBalancerName string -	updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher(). -	resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here. -	closed   *grpcsync.Event   // Indicates if close has been called. -	done     *grpcsync.Event   // Indicates if close has completed its work. +	// mu guards access to the below fields. Access to the serializer and its +	// cancel function needs to be mutex protected because they are overwritten +	// when the wrapper exits idle mode. +	mu               sync.Mutex +	serializer       *grpcsync.CallbackSerializer // To serialize all outoing calls. +	serializerCancel context.CancelFunc           // To close the seralizer at close/enterIdle time. +	mode             ccbMode                      // Tracks the current mode of the wrapper.  }  // newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer  // is not created until the switchTo() method is invoked.  func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { +	ctx, cancel := context.WithCancel(context.Background())  	ccb := &ccBalancerWrapper{ -		cc:       cc, -		updateCh: buffer.NewUnbounded(), -		resultCh: buffer.NewUnbounded(), -		closed:   grpcsync.NewEvent(), -		done:     grpcsync.NewEvent(), +		cc:               cc, +		opts:             bopts, +		serializer:       grpcsync.NewCallbackSerializer(ctx), +		serializerCancel: cancel,  	} -	go ccb.watcher()  	ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)  	return ccb  } -// The following xxxUpdate structs wrap the arguments received as part of the -// corresponding update. The watcher goroutine uses the 'type' of the update to -// invoke the appropriate handler routine to handle the update. - -type ccStateUpdate struct { -	ccs *balancer.ClientConnState -} - -type scStateUpdate struct { -	sc    balancer.SubConn -	state connectivity.State -	err   error -} - -type exitIdleUpdate struct{} - -type resolverErrorUpdate struct { -	err error -} - -type switchToUpdate struct { -	name string -} - -type subConnUpdate struct { -	acbw *acBalancerWrapper -} - -// watcher is a long-running goroutine which reads updates from a channel and -// invokes corresponding methods on the underlying balancer. It ensures that -// these methods are invoked in a synchronous fashion. It also ensures that -// these methods are invoked in the order in which the updates were received. -func (ccb *ccBalancerWrapper) watcher() { -	for { -		select { -		case u := <-ccb.updateCh.Get(): -			ccb.updateCh.Load() -			if ccb.closed.HasFired() { -				break -			} -			switch update := u.(type) { -			case *ccStateUpdate: -				ccb.handleClientConnStateChange(update.ccs) -			case *scStateUpdate: -				ccb.handleSubConnStateChange(update) -			case *exitIdleUpdate: -				ccb.handleExitIdle() -			case *resolverErrorUpdate: -				ccb.handleResolverError(update.err) -			case *switchToUpdate: -				ccb.handleSwitchTo(update.name) -			case *subConnUpdate: -				ccb.handleRemoveSubConn(update.acbw) -			default: -				logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update) -			} -		case <-ccb.closed.Done(): -		} - -		if ccb.closed.HasFired() { -			ccb.handleClose() -			return -		} -	} -} -  // updateClientConnState is invoked by grpc to push a ClientConnState update to  // the underlying balancer. -// -// Unlike other methods invoked by grpc to push updates to the underlying -// balancer, this method cannot simply push the update onto the update channel -// and return. It needs to return the error returned by the underlying balancer -// back to grpc which propagates that to the resolver.  func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { -	ccb.updateCh.Put(&ccStateUpdate{ccs: ccs}) - -	var res interface{} -	select { -	case res = <-ccb.resultCh.Get(): -		ccb.resultCh.Load() -	case <-ccb.closed.Done(): -		// Return early if the balancer wrapper is closed while we are waiting for -		// the underlying balancer to process a ClientConnState update. -		return nil -	} -	// If the returned error is nil, attempting to type assert to error leads to -	// panic. So, this needs to handled separately. -	if res == nil { -		return nil -	} -	return res.(error) -} - -// handleClientConnStateChange handles a ClientConnState update from the update -// channel and invokes the appropriate method on the underlying balancer. -// -// If the addresses specified in the update contain addresses of type "grpclb" -// and the selected LB policy is not "grpclb", these addresses will be filtered -// out and ccs will be modified with the updated address list. -func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) { -	if ccb.curBalancerName != grpclbName { -		// Filter any grpclb addresses since we don't have the grpclb balancer. -		var addrs []resolver.Address -		for _, addr := range ccs.ResolverState.Addresses { -			if addr.Type == resolver.GRPCLB { -				continue +	ccb.mu.Lock() +	errCh := make(chan error, 1) +	// Here and everywhere else where Schedule() is called, it is done with the +	// lock held. But the lock guards only the scheduling part. The actual +	// callback is called asynchronously without the lock being held. +	ok := ccb.serializer.Schedule(func(_ context.Context) { +		// If the addresses specified in the update contain addresses of type +		// "grpclb" and the selected LB policy is not "grpclb", these addresses +		// will be filtered out and ccs will be modified with the updated +		// address list. +		if ccb.curBalancerName != grpclbName { +			var addrs []resolver.Address +			for _, addr := range ccs.ResolverState.Addresses { +				if addr.Type == resolver.GRPCLB { +					continue +				} +				addrs = append(addrs, addr)  			} -			addrs = append(addrs, addr) +			ccs.ResolverState.Addresses = addrs  		} -		ccs.ResolverState.Addresses = addrs +		errCh <- ccb.balancer.UpdateClientConnState(*ccs) +	}) +	if !ok { +		// If we are unable to schedule a function with the serializer, it +		// indicates that it has been closed. A serializer is only closed when +		// the wrapper is closed or is in idle. +		ccb.mu.Unlock() +		return fmt.Errorf("grpc: cannot send state update to a closed or idle balancer")  	} -	ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs)) +	ccb.mu.Unlock() + +	// We get here only if the above call to Schedule succeeds, in which case it +	// is guaranteed that the scheduled function will run. Therefore it is safe +	// to block on this channel. +	err := <-errCh +	if logger.V(2) && err != nil { +		logger.Infof("error from balancer.UpdateClientConnState: %v", err) +	} +	return err  }  // updateSubConnState is invoked by grpc to push a subConn state update to the  // underlying balancer.  func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { -	// When updating addresses for a SubConn, if the address in use is not in -	// the new addresses, the old ac will be tearDown() and a new ac will be -	// created. tearDown() generates a state change with Shutdown state, we -	// don't want the balancer to receive this state change. So before -	// tearDown() on the old ac, ac.acbw (acWrapper) will be set to nil, and -	// this function will be called with (nil, Shutdown). We don't need to call -	// balancer method in this case. -	if sc == nil { -		return -	} -	ccb.updateCh.Put(&scStateUpdate{ -		sc:    sc, -		state: s, -		err:   err, +	ccb.mu.Lock() +	ccb.serializer.Schedule(func(_ context.Context) { +		ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})  	}) -} - -// handleSubConnStateChange handles a SubConnState update from the update -// channel and invokes the appropriate method on the underlying balancer. -func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) { -	ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err}) -} - -func (ccb *ccBalancerWrapper) exitIdle() { -	ccb.updateCh.Put(&exitIdleUpdate{}) -} - -func (ccb *ccBalancerWrapper) handleExitIdle() { -	if ccb.cc.GetState() != connectivity.Idle { -		return -	} -	ccb.balancer.ExitIdle() +	ccb.mu.Unlock()  }  func (ccb *ccBalancerWrapper) resolverError(err error) { -	ccb.updateCh.Put(&resolverErrorUpdate{err: err}) -} - -func (ccb *ccBalancerWrapper) handleResolverError(err error) { -	ccb.balancer.ResolverError(err) +	ccb.mu.Lock() +	ccb.serializer.Schedule(func(_ context.Context) { +		ccb.balancer.ResolverError(err) +	}) +	ccb.mu.Unlock()  }  // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the @@ -248,24 +163,27 @@ func (ccb *ccBalancerWrapper) handleResolverError(err error) {  // the ccBalancerWrapper keeps track of the current LB policy name, and skips  // the graceful balancer switching process if the name does not change.  func (ccb *ccBalancerWrapper) switchTo(name string) { -	ccb.updateCh.Put(&switchToUpdate{name: name}) +	ccb.mu.Lock() +	ccb.serializer.Schedule(func(_ context.Context) { +		// TODO: Other languages use case-sensitive balancer registries. We should +		// switch as well. See: https://github.com/grpc/grpc-go/issues/5288. +		if strings.EqualFold(ccb.curBalancerName, name) { +			return +		} +		ccb.buildLoadBalancingPolicy(name) +	}) +	ccb.mu.Unlock()  } -// handleSwitchTo handles a balancer switch update from the update channel. It -// calls the SwitchTo() method on the gracefulswitch.Balancer with a -// balancer.Builder corresponding to name. If no balancer.Builder is registered -// for the given name, it uses the default LB policy which is "pick_first". -func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { -	// TODO: Other languages use case-insensitive balancer registries. We should -	// switch as well. See: https://github.com/grpc/grpc-go/issues/5288. -	if strings.EqualFold(ccb.curBalancerName, name) { -		return -	} - -	// TODO: Ensure that name is a registered LB policy when we get here. -	// We currently only validate the `loadBalancingConfig` field. We need to do -	// the same for the `loadBalancingPolicy` field and reject the service config -	// if the specified policy is not registered. +// buildLoadBalancingPolicy performs the following: +//   - retrieve a balancer builder for the given name. Use the default LB +//     policy, pick_first, if no LB policy with name is found in the registry. +//   - instruct the gracefulswitch balancer to switch to the above builder. This +//     will actually build the new balancer. +//   - update the `curBalancerName` field +// +// Must be called from a serializer callback. +func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) {  	builder := balancer.Get(name)  	if builder == nil {  		channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) @@ -281,26 +199,114 @@ func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {  	ccb.curBalancerName = builder.Name()  } -// handleRemoveSucConn handles a request from the underlying balancer to remove -// a subConn. -// -// See comments in RemoveSubConn() for more details. -func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) { -	ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) +func (ccb *ccBalancerWrapper) close() { +	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") +	ccb.closeBalancer(ccbModeClosed)  } -func (ccb *ccBalancerWrapper) close() { -	ccb.closed.Fire() -	<-ccb.done.Done() +// enterIdleMode is invoked by grpc when the channel enters idle mode upon +// expiry of idle_timeout. This call blocks until the balancer is closed. +func (ccb *ccBalancerWrapper) enterIdleMode() { +	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: entering idle mode") +	ccb.closeBalancer(ccbModeIdle) +} + +// closeBalancer is invoked when the channel is being closed or when it enters +// idle mode upon expiry of idle_timeout. +func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { +	ccb.mu.Lock() +	if ccb.mode == ccbModeClosed || ccb.mode == ccbModeIdle { +		ccb.mu.Unlock() +		return +	} + +	ccb.mode = m +	done := ccb.serializer.Done +	b := ccb.balancer +	ok := ccb.serializer.Schedule(func(_ context.Context) { +		// Close the serializer to ensure that no more calls from gRPC are sent +		// to the balancer. +		ccb.serializerCancel() +		// Empty the current balancer name because we don't have a balancer +		// anymore and also so that we act on the next call to switchTo by +		// creating a new balancer specified by the new resolver. +		ccb.curBalancerName = "" +	}) +	if !ok { +		ccb.mu.Unlock() +		return +	} +	ccb.mu.Unlock() + +	// Give enqueued callbacks a chance to finish. +	<-done +	// Spawn a goroutine to close the balancer (since it may block trying to +	// cleanup all allocated resources) and return early. +	go b.Close()  } -func (ccb *ccBalancerWrapper) handleClose() { -	ccb.balancer.Close() -	ccb.done.Fire() +// exitIdleMode is invoked by grpc when the channel exits idle mode either +// because of an RPC or because of an invocation of the Connect() API. This +// recreates the balancer that was closed previously when entering idle mode. +// +// If the channel is not in idle mode, we know for a fact that we are here as a +// result of the user calling the Connect() method on the ClientConn. In this +// case, we can simply forward the call to the underlying balancer, instructing +// it to reconnect to the backends. +func (ccb *ccBalancerWrapper) exitIdleMode() { +	ccb.mu.Lock() +	if ccb.mode == ccbModeClosed { +		// Request to exit idle is a no-op when wrapper is already closed. +		ccb.mu.Unlock() +		return +	} + +	if ccb.mode == ccbModeIdle { +		// Recreate the serializer which was closed when we entered idle. +		ctx, cancel := context.WithCancel(context.Background()) +		ccb.serializer = grpcsync.NewCallbackSerializer(ctx) +		ccb.serializerCancel = cancel +	} + +	// The ClientConn guarantees that mutual exclusion between close() and +	// exitIdleMode(), and since we just created a new serializer, we can be +	// sure that the below function will be scheduled. +	done := make(chan struct{}) +	ccb.serializer.Schedule(func(_ context.Context) { +		defer close(done) + +		ccb.mu.Lock() +		defer ccb.mu.Unlock() + +		if ccb.mode != ccbModeIdle { +			ccb.balancer.ExitIdle() +			return +		} + +		// Gracefulswitch balancer does not support a switchTo operation after +		// being closed. Hence we need to create a new one here. +		ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) +		ccb.mode = ccbModeActive +		channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") + +	}) +	ccb.mu.Unlock() + +	<-done +} + +func (ccb *ccBalancerWrapper) isIdleOrClosed() bool { +	ccb.mu.Lock() +	defer ccb.mu.Unlock() +	return ccb.mode == ccbModeIdle || ccb.mode == ccbModeClosed  }  func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { -	if len(addrs) <= 0 { +	if ccb.isIdleOrClosed() { +		return nil, fmt.Errorf("grpc: cannot create SubConn when balancer is closed or idle") +	} + +	if len(addrs) == 0 {  		return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")  	}  	ac, err := ccb.cc.newAddrConn(addrs, opts) @@ -309,31 +315,35 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer  		return nil, err  	}  	acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} -	acbw.ac.mu.Lock()  	ac.acbw = acbw -	acbw.ac.mu.Unlock()  	return acbw, nil  }  func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { -	// Before we switched the ccBalancerWrapper to use gracefulswitch.Balancer, it -	// was required to handle the RemoveSubConn() method asynchronously by pushing -	// the update onto the update channel. This was done to avoid a deadlock as -	// switchBalancer() was holding cc.mu when calling Close() on the old -	// balancer, which would in turn call RemoveSubConn(). -	// -	// With the use of gracefulswitch.Balancer in ccBalancerWrapper, handling this -	// asynchronously is probably not required anymore since the switchTo() method -	// handles the balancer switch by pushing the update onto the channel. -	// TODO(easwars): Handle this inline. +	if ccb.isIdleOrClosed() { +		// It it safe to ignore this call when the balancer is closed or in idle +		// because the ClientConn takes care of closing the connections. +		// +		// Not returning early from here when the balancer is closed or in idle +		// leads to a deadlock though, because of the following sequence of +		// calls when holding cc.mu: +		// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> +		// ccb.RemoveAddrConn --> cc.removeAddrConn +		return +	} +  	acbw, ok := sc.(*acBalancerWrapper)  	if !ok {  		return  	} -	ccb.updateCh.Put(&subConnUpdate{acbw: acbw}) +	ccb.cc.removeAddrConn(acbw.ac, errConnDrain)  }  func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { +	if ccb.isIdleOrClosed() { +		return +	} +  	acbw, ok := sc.(*acBalancerWrapper)  	if !ok {  		return @@ -342,6 +352,10 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol  }  func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { +	if ccb.isIdleOrClosed() { +		return +	} +  	// Update picker before updating state.  Even though the ordering here does  	// not matter, it can lead to multiple calls of Pick in the common start-up  	// case where we wait for ready and then perform an RPC.  If the picker is @@ -352,6 +366,10 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {  }  func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOptions) { +	if ccb.isIdleOrClosed() { +		return +	} +  	ccb.cc.resolveNow(o)  } @@ -362,71 +380,31 @@ func (ccb *ccBalancerWrapper) Target() string {  // acBalancerWrapper is a wrapper on top of ac for balancers.  // It implements balancer.SubConn interface.  type acBalancerWrapper struct { +	ac *addrConn // read-only +  	mu        sync.Mutex -	ac        *addrConn  	producers map[balancer.ProducerBuilder]*refCountedProducer  } -func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { -	acbw.mu.Lock() -	defer acbw.mu.Unlock() -	if len(addrs) <= 0 { -		acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) -		return -	} -	if !acbw.ac.tryUpdateAddrs(addrs) { -		cc := acbw.ac.cc -		opts := acbw.ac.scopts -		acbw.ac.mu.Lock() -		// Set old ac.acbw to nil so the Shutdown state update will be ignored -		// by balancer. -		// -		// TODO(bar) the state transition could be wrong when tearDown() old ac -		// and creating new ac, fix the transition. -		acbw.ac.acbw = nil -		acbw.ac.mu.Unlock() -		acState := acbw.ac.getState() -		acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain) - -		if acState == connectivity.Shutdown { -			return -		} +func (acbw *acBalancerWrapper) String() string { +	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int()) +} -		newAC, err := cc.newAddrConn(addrs, opts) -		if err != nil { -			channelz.Warningf(logger, acbw.ac.channelzID, "acBalancerWrapper: UpdateAddresses: failed to newAddrConn: %v", err) -			return -		} -		acbw.ac = newAC -		newAC.mu.Lock() -		newAC.acbw = acbw -		newAC.mu.Unlock() -		if acState != connectivity.Idle { -			go newAC.connect() -		} -	} +func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { +	acbw.ac.updateAddrs(addrs)  }  func (acbw *acBalancerWrapper) Connect() { -	acbw.mu.Lock() -	defer acbw.mu.Unlock()  	go acbw.ac.connect()  } -func (acbw *acBalancerWrapper) getAddrConn() *addrConn { -	acbw.mu.Lock() -	defer acbw.mu.Unlock() -	return acbw.ac -} - -var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected") -  // NewStream begins a streaming RPC on the addrConn.  If the addrConn is not -// ready, returns errSubConnNotReady. +// ready, blocks until it is or ctx expires.  Returns an error when the context +// expires or the addrConn is shut down.  func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { -	transport := acbw.ac.getReadyTransport() -	if transport == nil { -		return nil, errSubConnNotReady +	transport, err := acbw.ac.getTransport(ctx) +	if err != nil { +		return nil, err  	}  	return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)  } diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index 9e20e4d38..e6a1dc5d7 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -27,6 +27,11 @@ import (  //  // All errors returned by Invoke are compatible with the status package.  func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { +	if err := cc.idlenessMgr.onCallBegin(); err != nil { +		return err +	} +	defer cc.idlenessMgr.onCallEnd() +  	// allow interceptor to see all applicable call options, which means those  	// configured as defaults from dial option as well as per-call options  	opts = combine(cc.dopts.callOptions, opts) diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 3a7614242..bfd7555a8 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -24,7 +24,6 @@ import (  	"fmt"  	"math"  	"net/url" -	"reflect"  	"strings"  	"sync"  	"sync/atomic" @@ -38,6 +37,7 @@ import (  	"google.golang.org/grpc/internal/backoff"  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/internal/grpcsync" +	"google.golang.org/grpc/internal/pretty"  	iresolver "google.golang.org/grpc/internal/resolver"  	"google.golang.org/grpc/internal/transport"  	"google.golang.org/grpc/keepalive" @@ -69,6 +69,9 @@ var (  	errConnDrain = errors.New("grpc: the connection is drained")  	// errConnClosing indicates that the connection is closing.  	errConnClosing = errors.New("grpc: the connection is closing") +	// errConnIdling indicates the the connection is being closed as the channel +	// is moving to an idle mode due to inactivity. +	errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")  	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default  	// service config.  	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" @@ -134,17 +137,29 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires  // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.  func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {  	cc := &ClientConn{ -		target:            target, -		csMgr:             &connectivityStateManager{}, -		conns:             make(map[*addrConn]struct{}), -		dopts:             defaultDialOptions(), -		blockingpicker:    newPickerWrapper(), -		czData:            new(channelzData), -		firstResolveEvent: grpcsync.NewEvent(), -	} +		target: target, +		csMgr:  &connectivityStateManager{}, +		conns:  make(map[*addrConn]struct{}), +		dopts:  defaultDialOptions(), +		czData: new(channelzData), +	} + +	// We start the channel off in idle mode, but kick it out of idle at the end +	// of this method, instead of waiting for the first RPC. Other gRPC +	// implementations do wait for the first RPC to kick the channel out of +	// idle. But doing so would be a major behavior change for our users who are +	// used to seeing the channel active after Dial. +	// +	// Taking this approach of kicking it out of idle at the end of this method +	// allows us to share the code between channel creation and exiting idle +	// mode. This will also make it easy for us to switch to starting the +	// channel off in idle, if at all we ever get to do that. +	cc.idlenessState = ccIdlenessStateIdle +  	cc.retryThrottler.Store((*retryThrottler)(nil))  	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})  	cc.ctx, cc.cancel = context.WithCancel(context.Background()) +	cc.exitIdleCond = sync.NewCond(&cc.mu)  	disableGlobalOpts := false  	for _, opt := range opts { @@ -173,40 +188,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  		}  	}() -	pid := cc.dopts.channelzParentID -	cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target) -	ted := &channelz.TraceEventDesc{ -		Desc:     "Channel created", -		Severity: channelz.CtInfo, -	} -	if cc.dopts.channelzParentID != nil { -		ted.Parent = &channelz.TraceEventDesc{ -			Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()), -			Severity: channelz.CtInfo, -		} -	} -	channelz.AddTraceEvent(logger, cc.channelzID, 1, ted) -	cc.csMgr.channelzID = cc.channelzID +	// Register ClientConn with channelz. +	cc.channelzRegistration(target) -	if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { -		return nil, errNoTransportSecurity -	} -	if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil { -		return nil, errTransportCredsAndBundle -	} -	if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil { -		return nil, errNoTransportCredsInBundle -	} -	transportCreds := cc.dopts.copts.TransportCredentials -	if transportCreds == nil { -		transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials() -	} -	if transportCreds.Info().SecurityProtocol == "insecure" { -		for _, cd := range cc.dopts.copts.PerRPCCredentials { -			if cd.RequireTransportSecurity() { -				return nil, errTransportCredentialsMissing -			} -		} +	if err := cc.validateTransportCredentials(); err != nil { +		return nil, err  	}  	if cc.dopts.defaultServiceConfigRawJSON != nil { @@ -249,15 +235,12 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  	}  	// Determine the resolver to use. -	resolverBuilder, err := cc.parseTargetAndFindResolver() -	if err != nil { +	if err := cc.parseTargetAndFindResolver(); err != nil {  		return nil, err  	} -	cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint(), cc.target, cc.dopts) -	if err != nil { +	if err = cc.determineAuthority(); err != nil {  		return nil, err  	} -	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority)  	if cc.dopts.scChan != nil {  		// Blocking wait for the initial service config. @@ -275,57 +258,224 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  		go cc.scWatcher()  	} +	// This creates the name resolver, load balancer, blocking picker etc. +	if err := cc.exitIdleMode(); err != nil { +		return nil, err +	} + +	// Configure idleness support with configured idle timeout or default idle +	// timeout duration. Idleness can be explicitly disabled by the user, by +	// setting the dial option to 0. +	cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout) + +	// Return early for non-blocking dials. +	if !cc.dopts.block { +		return cc, nil +	} + +	// A blocking dial blocks until the clientConn is ready. +	for { +		s := cc.GetState() +		if s == connectivity.Idle { +			cc.Connect() +		} +		if s == connectivity.Ready { +			return cc, nil +		} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { +			if err = cc.connectionError(); err != nil { +				terr, ok := err.(interface { +					Temporary() bool +				}) +				if ok && !terr.Temporary() { +					return nil, err +				} +			} +		} +		if !cc.WaitForStateChange(ctx, s) { +			// ctx got timeout or canceled. +			if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { +				return nil, err +			} +			return nil, ctx.Err() +		} +	} +} + +// addTraceEvent is a helper method to add a trace event on the channel. If the +// channel is a nested one, the same event is also added on the parent channel. +func (cc *ClientConn) addTraceEvent(msg string) { +	ted := &channelz.TraceEventDesc{ +		Desc:     fmt.Sprintf("Channel %s", msg), +		Severity: channelz.CtInfo, +	} +	if cc.dopts.channelzParentID != nil { +		ted.Parent = &channelz.TraceEventDesc{ +			Desc:     fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg), +			Severity: channelz.CtInfo, +		} +	} +	channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) +} + +// exitIdleMode moves the channel out of idle mode by recreating the name +// resolver and load balancer. +func (cc *ClientConn) exitIdleMode() error { +	cc.mu.Lock() +	if cc.conns == nil { +		cc.mu.Unlock() +		return errConnClosing +	} +	if cc.idlenessState != ccIdlenessStateIdle { +		cc.mu.Unlock() +		logger.Info("ClientConn asked to exit idle mode when not in idle mode") +		return nil +	} + +	defer func() { +		// When Close() and exitIdleMode() race against each other, one of the +		// following two can happen: +		// - Close() wins the race and runs first. exitIdleMode() runs after, and +		//   sees that the ClientConn is already closed and hence returns early. +		// - exitIdleMode() wins the race and runs first and recreates the balancer +		//   and releases the lock before recreating the resolver. If Close() runs +		//   in this window, it will wait for exitIdleMode to complete. +		// +		// We achieve this synchronization using the below condition variable. +		cc.mu.Lock() +		cc.idlenessState = ccIdlenessStateActive +		cc.exitIdleCond.Signal() +		cc.mu.Unlock() +	}() + +	cc.idlenessState = ccIdlenessStateExitingIdle +	exitedIdle := false +	if cc.blockingpicker == nil { +		cc.blockingpicker = newPickerWrapper() +	} else { +		cc.blockingpicker.exitIdleMode() +		exitedIdle = true +	} +  	var credsClone credentials.TransportCredentials  	if creds := cc.dopts.copts.TransportCredentials; creds != nil {  		credsClone = creds.Clone()  	} -	cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ -		DialCreds:        credsClone, -		CredsBundle:      cc.dopts.copts.CredsBundle, -		Dialer:           cc.dopts.copts.Dialer, -		Authority:        cc.authority, -		CustomUserAgent:  cc.dopts.copts.UserAgent, -		ChannelzParentID: cc.channelzID, -		Target:           cc.parsedTarget, -	}) +	if cc.balancerWrapper == nil { +		cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ +			DialCreds:        credsClone, +			CredsBundle:      cc.dopts.copts.CredsBundle, +			Dialer:           cc.dopts.copts.Dialer, +			Authority:        cc.authority, +			CustomUserAgent:  cc.dopts.copts.UserAgent, +			ChannelzParentID: cc.channelzID, +			Target:           cc.parsedTarget, +		}) +	} else { +		cc.balancerWrapper.exitIdleMode() +	} +	cc.firstResolveEvent = grpcsync.NewEvent() +	cc.mu.Unlock() -	// Build the resolver. -	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) -	if err != nil { -		return nil, fmt.Errorf("failed to build resolver: %v", err) +	// This needs to be called without cc.mu because this builds a new resolver +	// which might update state or report error inline which needs to be handled +	// by cc.updateResolverState() which also grabs cc.mu. +	if err := cc.initResolverWrapper(credsClone); err != nil { +		return err +	} + +	if exitedIdle { +		cc.addTraceEvent("exiting idle mode")  	} +	return nil +} + +// enterIdleMode puts the channel in idle mode, and as part of it shuts down the +// name resolver, load balancer and any subchannels. +func (cc *ClientConn) enterIdleMode() error {  	cc.mu.Lock() -	cc.resolverWrapper = rWrapper +	if cc.conns == nil { +		cc.mu.Unlock() +		return ErrClientConnClosing +	} +	if cc.idlenessState != ccIdlenessStateActive { +		logger.Error("ClientConn asked to enter idle mode when not active") +		return nil +	} + +	// cc.conns == nil is a proxy for the ClientConn being closed. So, instead +	// of setting it to nil here, we recreate the map. This also means that we +	// don't have to do this when exiting idle mode. +	conns := cc.conns +	cc.conns = make(map[*addrConn]struct{}) + +	// TODO: Currently, we close the resolver wrapper upon entering idle mode +	// and create a new one upon exiting idle mode. This means that the +	// `cc.resolverWrapper` field would be overwritten everytime we exit idle +	// mode. While this means that we need to hold `cc.mu` when accessing +	// `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should +	// try to do the same for the balancer and picker wrappers too. +	cc.resolverWrapper.close() +	cc.blockingpicker.enterIdleMode() +	cc.balancerWrapper.enterIdleMode() +	cc.csMgr.updateState(connectivity.Idle) +	cc.idlenessState = ccIdlenessStateIdle  	cc.mu.Unlock() -	// A blocking dial blocks until the clientConn is ready. -	if cc.dopts.block { -		for { -			cc.Connect() -			s := cc.GetState() -			if s == connectivity.Ready { -				break -			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { -				if err = cc.connectionError(); err != nil { -					terr, ok := err.(interface { -						Temporary() bool -					}) -					if ok && !terr.Temporary() { -						return nil, err -					} -				} -			} -			if !cc.WaitForStateChange(ctx, s) { -				// ctx got timeout or canceled. -				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError { -					return nil, err -				} -				return nil, ctx.Err() +	go func() { +		cc.addTraceEvent("entering idle mode") +		for ac := range conns { +			ac.tearDown(errConnIdling) +		} +	}() +	return nil +} + +// validateTransportCredentials performs a series of checks on the configured +// transport credentials. It returns a non-nil error if any of these conditions +// are met: +//   - no transport creds and no creds bundle is configured +//   - both transport creds and creds bundle are configured +//   - creds bundle is configured, but it lacks a transport credentials +//   - insecure transport creds configured alongside call creds that require +//     transport level security +// +// If none of the above conditions are met, the configured credentials are +// deemed valid and a nil error is returned. +func (cc *ClientConn) validateTransportCredentials() error { +	if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { +		return errNoTransportSecurity +	} +	if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil { +		return errTransportCredsAndBundle +	} +	if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil { +		return errNoTransportCredsInBundle +	} +	transportCreds := cc.dopts.copts.TransportCredentials +	if transportCreds == nil { +		transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials() +	} +	if transportCreds.Info().SecurityProtocol == "insecure" { +		for _, cd := range cc.dopts.copts.PerRPCCredentials { +			if cd.RequireTransportSecurity() { +				return errTransportCredentialsMissing  			}  		}  	} +	return nil +} -	return cc, nil +// channelzRegistration registers the newly created ClientConn with channelz and +// stores the returned identifier in `cc.channelzID` and `cc.csMgr.channelzID`. +// A channelz trace event is emitted for ClientConn creation. If the newly +// created ClientConn is a nested one, i.e a valid parent ClientConn ID is +// specified via a dial option, the trace event is also added to the parent. +// +// Doesn't grab cc.mu as this method is expected to be called only at Dial time. +func (cc *ClientConn) channelzRegistration(target string) { +	cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) +	cc.addTraceEvent("created") +	cc.csMgr.channelzID = cc.channelzID  }  // chainUnaryClientInterceptors chains all unary client interceptors into one. @@ -471,7 +621,9 @@ type ClientConn struct {  	authority       string               // See determineAuthority().  	dopts           dialOptions          // Default and user specified dial options.  	channelzID      *channelz.Identifier // Channelz identifier for the channel. +	resolverBuilder resolver.Builder     // See parseTargetAndFindResolver().  	balancerWrapper *ccBalancerWrapper   // Uses gracefulswitch.balancer underneath. +	idlenessMgr     idlenessManager  	// The following provide their own synchronization, and therefore don't  	// require cc.mu to be held to access them. @@ -492,11 +644,31 @@ type ClientConn struct {  	sc              *ServiceConfig             // Latest service config received from the resolver.  	conns           map[*addrConn]struct{}     // Set to nil on close.  	mkp             keepalive.ClientParameters // May be updated upon receipt of a GoAway. +	idlenessState   ccIdlenessState            // Tracks idleness state of the channel. +	exitIdleCond    *sync.Cond                 // Signalled when channel exits idle.  	lceMu               sync.Mutex // protects lastConnectionError  	lastConnectionError error  } +// ccIdlenessState tracks the idleness state of the channel. +// +// Channels start off in `active` and move to `idle` after a period of +// inactivity. When moving back to `active` upon an incoming RPC, they +// transition through `exiting_idle`. This state is useful for synchronization +// with Close(). +// +// This state tracking is mostly for self-protection. The idlenessManager is +// expected to keep track of the state as well, and is expected not to call into +// the ClientConn unnecessarily. +type ccIdlenessState int8 + +const ( +	ccIdlenessStateActive ccIdlenessState = iota +	ccIdlenessStateIdle +	ccIdlenessStateExitingIdle +) +  // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or  // ctx expires. A true value is returned in former case and false in latter.  // @@ -536,7 +708,10 @@ func (cc *ClientConn) GetState() connectivity.State {  // Notice: This API is EXPERIMENTAL and may be changed or removed in a later  // release.  func (cc *ClientConn) Connect() { -	cc.balancerWrapper.exitIdle() +	cc.exitIdleMode() +	// If the ClientConn was not in idle mode, we need to call ExitIdle on the +	// LB policy so that connections can be created. +	cc.balancerWrapper.exitIdleMode()  }  func (cc *ClientConn) scWatcher() { @@ -693,6 +868,20 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi  	cc.balancerWrapper.updateSubConnState(sc, s, err)  } +// Makes a copy of the input addresses slice and clears out the balancer +// attributes field. Addresses are passed during subconn creation and address +// update operations. In both cases, we will clear the balancer attributes by +// calling this function, and therefore we will be able to use the Equal method +// provided by the resolver.Address type for comparison. +func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address { +	out := make([]resolver.Address, len(in)) +	for i := range in { +		out[i] = in[i] +		out[i].BalancerAttributes = nil +	} +	return out +} +  // newAddrConn creates an addrConn for addrs and adds it to cc.conns.  //  // Caller needs to make sure len(addrs) > 0. @@ -700,11 +889,12 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub  	ac := &addrConn{  		state:        connectivity.Idle,  		cc:           cc, -		addrs:        addrs, +		addrs:        copyAddressesWithoutBalancerAttributes(addrs),  		scopts:       opts,  		dopts:        cc.dopts,  		czData:       new(channelzData),  		resetBackoff: make(chan struct{}), +		stateChan:    make(chan struct{}),  	}  	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)  	// Track ac in cc. This needs to be done before any getTransport(...) is called. @@ -798,9 +988,6 @@ func (ac *addrConn) connect() error {  		ac.mu.Unlock()  		return nil  	} -	// Update connectivity state within the lock to prevent subsequent or -	// concurrent calls from resetting the transport more than once. -	ac.updateConnectivityState(connectivity.Connecting, nil)  	ac.mu.Unlock()  	ac.resetTransport() @@ -819,58 +1006,63 @@ func equalAddresses(a, b []resolver.Address) bool {  	return true  } -// tryUpdateAddrs tries to update ac.addrs with the new addresses list. -// -// If ac is TransientFailure, it updates ac.addrs and returns true. The updated -// addresses will be picked up by retry in the next iteration after backoff. -// -// If ac is Shutdown or Idle, it updates ac.addrs and returns true. -// -// If the addresses is the same as the old list, it does nothing and returns -// true. -// -// If ac is Connecting, it returns false. The caller should tear down the ac and -// create a new one. Note that the backoff will be reset when this happens. -// -// If ac is Ready, it checks whether current connected address of ac is in the -// new addrs list. -//   - If true, it updates ac.addrs and returns true. The ac will keep using -//     the existing connection. -//   - If false, it does nothing and returns false. -func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { +// updateAddrs updates ac.addrs with the new addresses list and handles active +// connections or connection attempts. +func (ac *addrConn) updateAddrs(addrs []resolver.Address) {  	ac.mu.Lock() -	defer ac.mu.Unlock() -	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) +	channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs)) + +	addrs = copyAddressesWithoutBalancerAttributes(addrs) +	if equalAddresses(ac.addrs, addrs) { +		ac.mu.Unlock() +		return +	} + +	ac.addrs = addrs +  	if ac.state == connectivity.Shutdown ||  		ac.state == connectivity.TransientFailure ||  		ac.state == connectivity.Idle { -		ac.addrs = addrs -		return true +		// We were not connecting, so do nothing but update the addresses. +		ac.mu.Unlock() +		return  	} -	if equalAddresses(ac.addrs, addrs) { -		return true +	if ac.state == connectivity.Ready { +		// Try to find the connected address. +		for _, a := range addrs { +			a.ServerName = ac.cc.getServerName(a) +			if a.Equal(ac.curAddr) { +				// We are connected to a valid address, so do nothing but +				// update the addresses. +				ac.mu.Unlock() +				return +			} +		}  	} -	if ac.state == connectivity.Connecting { -		return false -	} +	// We are either connected to the wrong address or currently connecting. +	// Stop the current iteration and restart. -	// ac.state is Ready, try to find the connected address. -	var curAddrFound bool -	for _, a := range addrs { -		a.ServerName = ac.cc.getServerName(a) -		if reflect.DeepEqual(ac.curAddr, a) { -			curAddrFound = true -			break -		} +	ac.cancel() +	ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx) + +	// We have to defer here because GracefulClose => Close => onClose, which +	// requires locking ac.mu. +	if ac.transport != nil { +		defer ac.transport.GracefulClose() +		ac.transport = nil  	} -	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) -	if curAddrFound { -		ac.addrs = addrs + +	if len(addrs) == 0 { +		ac.updateConnectivityState(connectivity.Idle, nil)  	} -	return curAddrFound +	ac.mu.Unlock() + +	// Since we were connecting/connected, we should start a new connection +	// attempt. +	go ac.resetTransport()  }  // getServerName determines the serverName to be used in the connection @@ -1023,39 +1215,40 @@ func (cc *ClientConn) Close() error {  		cc.mu.Unlock()  		return ErrClientConnClosing  	} + +	for cc.idlenessState == ccIdlenessStateExitingIdle { +		cc.exitIdleCond.Wait() +	} +  	conns := cc.conns  	cc.conns = nil  	cc.csMgr.updateState(connectivity.Shutdown) +	pWrapper := cc.blockingpicker  	rWrapper := cc.resolverWrapper -	cc.resolverWrapper = nil  	bWrapper := cc.balancerWrapper +	idlenessMgr := cc.idlenessMgr  	cc.mu.Unlock()  	// The order of closing matters here since the balancer wrapper assumes the  	// picker is closed before it is closed. -	cc.blockingpicker.close() +	if pWrapper != nil { +		pWrapper.close() +	}  	if bWrapper != nil {  		bWrapper.close()  	}  	if rWrapper != nil {  		rWrapper.close()  	} +	if idlenessMgr != nil { +		idlenessMgr.close() +	}  	for ac := range conns {  		ac.tearDown(ErrClientConnClosing)  	} -	ted := &channelz.TraceEventDesc{ -		Desc:     "Channel deleted", -		Severity: channelz.CtInfo, -	} -	if cc.dopts.channelzParentID != nil { -		ted.Parent = &channelz.TraceEventDesc{ -			Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()), -			Severity: channelz.CtInfo, -		} -	} -	channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) +	cc.addTraceEvent("deleted")  	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add  	// trace reference to the entity being deleted, and thus prevent it from being  	// deleted right away. @@ -1085,7 +1278,8 @@ type addrConn struct {  	addrs   []resolver.Address // All addresses that the resolver resolved to.  	// Use updateConnectivityState for updating addrConn's connectivity state. -	state connectivity.State +	state     connectivity.State +	stateChan chan struct{} // closed and recreated on every state change.  	backoffIdx   int // Needs to be stateful for resetConnectBackoff.  	resetBackoff chan struct{} @@ -1099,6 +1293,9 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)  	if ac.state == s {  		return  	} +	// When changing states, reset the state change channel. +	close(ac.stateChan) +	ac.stateChan = make(chan struct{})  	ac.state = s  	if lastErr == nil {  		channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s) @@ -1124,7 +1321,8 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {  func (ac *addrConn) resetTransport() {  	ac.mu.Lock() -	if ac.state == connectivity.Shutdown { +	acCtx := ac.ctx +	if acCtx.Err() != nil {  		ac.mu.Unlock()  		return  	} @@ -1152,15 +1350,14 @@ func (ac *addrConn) resetTransport() {  	ac.updateConnectivityState(connectivity.Connecting, nil)  	ac.mu.Unlock() -	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil { +	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {  		ac.cc.resolveNow(resolver.ResolveNowOptions{})  		// After exhausting all addresses, the addrConn enters  		// TRANSIENT_FAILURE. -		ac.mu.Lock() -		if ac.state == connectivity.Shutdown { -			ac.mu.Unlock() +		if acCtx.Err() != nil {  			return  		} +		ac.mu.Lock()  		ac.updateConnectivityState(connectivity.TransientFailure, err)  		// Backoff. @@ -1175,13 +1372,13 @@ func (ac *addrConn) resetTransport() {  			ac.mu.Unlock()  		case <-b:  			timer.Stop() -		case <-ac.ctx.Done(): +		case <-acCtx.Done():  			timer.Stop()  			return  		}  		ac.mu.Lock() -		if ac.state != connectivity.Shutdown { +		if acCtx.Err() == nil {  			ac.updateConnectivityState(connectivity.Idle, err)  		}  		ac.mu.Unlock() @@ -1196,14 +1393,13 @@ func (ac *addrConn) resetTransport() {  // tryAllAddrs tries to creates a connection to the addresses, and stop when at  // the first successful one. It returns an error if no address was successfully  // connected, or updates ac appropriately with the new transport. -func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error { +func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {  	var firstConnErr error  	for _, addr := range addrs { -		ac.mu.Lock() -		if ac.state == connectivity.Shutdown { -			ac.mu.Unlock() +		if ctx.Err() != nil {  			return errConnClosing  		} +		ac.mu.Lock()  		ac.cc.mu.RLock()  		ac.dopts.copts.KeepaliveParams = ac.cc.mkp @@ -1217,7 +1413,7 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T  		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr) -		err := ac.createTransport(addr, copts, connectDeadline) +		err := ac.createTransport(ctx, addr, copts, connectDeadline)  		if err == nil {  			return nil  		} @@ -1234,19 +1430,20 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.T  // createTransport creates a connection to addr. It returns an error if the  // address was not successfully connected, or updates ac appropriately with the  // new transport. -func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { +func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {  	addr.ServerName = ac.cc.getServerName(addr) -	hctx, hcancel := context.WithCancel(ac.ctx) +	hctx, hcancel := context.WithCancel(ctx)  	onClose := func(r transport.GoAwayReason) {  		ac.mu.Lock()  		defer ac.mu.Unlock()  		// adjust params based on GoAwayReason  		ac.adjustParams(r) -		if ac.state == connectivity.Shutdown { -			// Already shut down.  tearDown() already cleared the transport and -			// canceled hctx via ac.ctx, and we expected this connection to be -			// closed, so do nothing here. +		if ctx.Err() != nil { +			// Already shut down or connection attempt canceled.  tearDown() or +			// updateAddrs() already cleared the transport and canceled hctx +			// via ac.ctx, and we expected this connection to be closed, so do +			// nothing here.  			return  		}  		hcancel() @@ -1265,7 +1462,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne  		ac.updateConnectivityState(connectivity.Idle, nil)  	} -	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) +	connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)  	defer cancel()  	copts.ChannelzParentID = ac.channelzID @@ -1282,7 +1479,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne  	ac.mu.Lock()  	defer ac.mu.Unlock() -	if ac.state == connectivity.Shutdown { +	if ctx.Err() != nil {  		// This can happen if the subConn was removed while in `Connecting`  		// state. tearDown() would have set the state to `Shutdown`, but  		// would not have closed the transport since ac.transport would not @@ -1294,6 +1491,9 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne  		// The error we pass to Close() is immaterial since there are no open  		// streams at this point, so no trailers with error details will be sent  		// out. We just need to pass a non-nil error. +		// +		// This can also happen when updateAddrs is called during a connection +		// attempt.  		go newTr.Close(transport.ErrConnClosing)  		return nil  	} @@ -1401,6 +1601,29 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {  	return nil  } +// getTransport waits until the addrconn is ready and returns the transport. +// If the context expires first, returns an appropriate status.  If the +// addrConn is stopped first, returns an Unavailable status error. +func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) { +	for ctx.Err() == nil { +		ac.mu.Lock() +		t, state, sc := ac.transport, ac.state, ac.stateChan +		ac.mu.Unlock() +		if state == connectivity.Ready { +			return t, nil +		} +		if state == connectivity.Shutdown { +			return nil, status.Errorf(codes.Unavailable, "SubConn shutting down") +		} + +		select { +		case <-ctx.Done(): +		case <-sc: +		} +	} +	return nil, status.FromContextError(ctx.Err()).Err() +} +  // tearDown starts to tear down the addrConn.  //  // Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct @@ -1552,7 +1775,14 @@ func (cc *ClientConn) connectionError() error {  	return cc.lastConnectionError  } -func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) { +// parseTargetAndFindResolver parses the user's dial target and stores the +// parsed target in `cc.parsedTarget`. +// +// The resolver to use is determined based on the scheme in the parsed target +// and the same is stored in `cc.resolverBuilder`. +// +// Doesn't grab cc.mu as this method is expected to be called only at Dial time. +func (cc *ClientConn) parseTargetAndFindResolver() error {  	channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)  	var rb resolver.Builder @@ -1564,7 +1794,8 @@ func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {  		rb = cc.getResolver(parsedTarget.URL.Scheme)  		if rb != nil {  			cc.parsedTarget = parsedTarget -			return rb, nil +			cc.resolverBuilder = rb +			return nil  		}  	} @@ -1579,38 +1810,98 @@ func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {  	parsedTarget, err = parseTarget(canonicalTarget)  	if err != nil {  		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err) -		return nil, err +		return err  	}  	channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)  	rb = cc.getResolver(parsedTarget.URL.Scheme)  	if rb == nil { -		return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme) +		return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)  	}  	cc.parsedTarget = parsedTarget -	return rb, nil +	cc.resolverBuilder = rb +	return nil  }  // parseTarget uses RFC 3986 semantics to parse the given target into a -// resolver.Target struct containing scheme, authority and url. Query -// params are stripped from the endpoint. +// resolver.Target struct containing url. Query params are stripped from the +// endpoint.  func parseTarget(target string) (resolver.Target, error) {  	u, err := url.Parse(target)  	if err != nil {  		return resolver.Target{}, err  	} -	return resolver.Target{ -		Scheme:    u.Scheme, -		Authority: u.Host, -		URL:       *u, -	}, nil +	return resolver.Target{URL: *u}, nil +} + +func encodeAuthority(authority string) string { +	const upperhex = "0123456789ABCDEF" + +	// Return for characters that must be escaped as per +	// Valid chars are mentioned here: +	// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2 +	shouldEscape := func(c byte) bool { +		// Alphanum are always allowed. +		if 'a' <= c && c <= 'z' || 'A' <= c && c <= 'Z' || '0' <= c && c <= '9' { +			return false +		} +		switch c { +		case '-', '_', '.', '~': // Unreserved characters +			return false +		case '!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=': // Subdelim characters +			return false +		case ':', '[', ']', '@': // Authority related delimeters +			return false +		} +		// Everything else must be escaped. +		return true +	} + +	hexCount := 0 +	for i := 0; i < len(authority); i++ { +		c := authority[i] +		if shouldEscape(c) { +			hexCount++ +		} +	} + +	if hexCount == 0 { +		return authority +	} + +	required := len(authority) + 2*hexCount +	t := make([]byte, required) + +	j := 0 +	// This logic is a barebones version of escape in the go net/url library. +	for i := 0; i < len(authority); i++ { +		switch c := authority[i]; { +		case shouldEscape(c): +			t[j] = '%' +			t[j+1] = upperhex[c>>4] +			t[j+2] = upperhex[c&15] +			j += 3 +		default: +			t[j] = authority[i] +			j++ +		} +	} +	return string(t)  }  // Determine channel authority. The order of precedence is as follows:  // - user specified authority override using `WithAuthority` dial option  // - creds' notion of server name for the authentication handshake  // - endpoint from dial target of the form "scheme://[authority]/endpoint" -func determineAuthority(endpoint, target string, dopts dialOptions) (string, error) { +// +// Stores the determined authority in `cc.authority`. +// +// Returns a non-nil error if the authority returned by the transport +// credentials do not match the authority configured through the dial option. +// +// Doesn't grab cc.mu as this method is expected to be called only at Dial time. +func (cc *ClientConn) determineAuthority() error { +	dopts := cc.dopts  	// Historically, we had two options for users to specify the serverName or  	// authority for a channel. One was through the transport credentials  	// (either in its constructor, or through the OverrideServerName() method). @@ -1627,25 +1918,62 @@ func determineAuthority(endpoint, target string, dopts dialOptions) (string, err  	}  	authorityFromDialOption := dopts.authority  	if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption { -		return "", fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption) +		return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)  	} +	endpoint := cc.parsedTarget.Endpoint() +	target := cc.target  	switch {  	case authorityFromDialOption != "": -		return authorityFromDialOption, nil +		cc.authority = authorityFromDialOption  	case authorityFromCreds != "": -		return authorityFromCreds, nil +		cc.authority = authorityFromCreds  	case strings.HasPrefix(target, "unix:") || strings.HasPrefix(target, "unix-abstract:"):  		// TODO: remove when the unix resolver implements optional interface to  		// return channel authority. -		return "localhost", nil +		cc.authority = "localhost"  	case strings.HasPrefix(endpoint, ":"): -		return "localhost" + endpoint, nil +		cc.authority = "localhost" + endpoint  	default:  		// TODO: Define an optional interface on the resolver builder to return  		// the channel authority given the user's dial target. For resolvers  		// which don't implement this interface, we will use the endpoint from  		// "scheme://authority/endpoint" as the default authority. -		return endpoint, nil +		// Escape the endpoint to handle use cases where the endpoint +		// might not be a valid authority by default. +		// For example an endpoint which has multiple paths like +		// 'a/b/c', which is not a valid authority by default. +		cc.authority = encodeAuthority(endpoint)  	} +	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority) +	return nil +} + +// initResolverWrapper creates a ccResolverWrapper, which builds the name +// resolver. This method grabs the lock to assign the newly built resolver +// wrapper to the cc.resolverWrapper field. +func (cc *ClientConn) initResolverWrapper(creds credentials.TransportCredentials) error { +	rw, err := newCCResolverWrapper(cc, ccResolverWrapperOpts{ +		target:  cc.parsedTarget, +		builder: cc.resolverBuilder, +		bOpts: resolver.BuildOptions{ +			DisableServiceConfig: cc.dopts.disableServiceConfig, +			DialCreds:            creds, +			CredsBundle:          cc.dopts.copts.CredsBundle, +			Dialer:               cc.dopts.copts.Dialer, +		}, +		channelzID: cc.channelzID, +	}) +	if err != nil { +		return fmt.Errorf("failed to build resolver: %v", err) +	} +	// Resolver implementations may report state update or error inline when +	// built (or right after), and this is handled in cc.updateResolverState. +	// Also, an error from the resolver might lead to a re-resolution request +	// from the balancer, which is handled in resolveNow() where +	// `cc.resolverWrapper` is accessed. Hence, we need to hold the lock here. +	cc.mu.Lock() +	cc.resolverWrapper = rw +	cc.mu.Unlock() +	return nil  } diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index cdc8263bd..23ea95237 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -77,6 +77,8 @@ type dialOptions struct {  	defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.  	defaultServiceConfigRawJSON *string  	resolvers                   []resolver.Builder +	idleTimeout                 time.Duration +	recvBufferPool              SharedBufferPool  }  // DialOption configures how we set up the connection. @@ -627,6 +629,7 @@ func defaultDialOptions() dialOptions {  			ReadBufferSize:  defaultReadBufSize,  			UseProxy:        true,  		}, +		recvBufferPool: nopBufferPool{},  	}  } @@ -655,3 +658,44 @@ func WithResolvers(rs ...resolver.Builder) DialOption {  		o.resolvers = append(o.resolvers, rs...)  	})  } + +// WithIdleTimeout returns a DialOption that configures an idle timeout for the +// channel. If the channel is idle for the configured timeout, i.e there are no +// ongoing RPCs and no new RPCs are initiated, the channel will enter idle mode +// and as a result the name resolver and load balancer will be shut down. The +// channel will exit idle mode when the Connect() method is called or when an +// RPC is initiated. +// +// By default this feature is disabled, which can also be explicitly configured +// by passing zero to this function. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WithIdleTimeout(d time.Duration) DialOption { +	return newFuncDialOption(func(o *dialOptions) { +		o.idleTimeout = d +	}) +} + +// WithRecvBufferPool returns a DialOption that configures the ClientConn +// to use the provided shared buffer pool for parsing incoming messages. Depending +// on the application's workload, this could result in reduced memory allocation. +// +// If you are unsure about how to implement a memory pool but want to utilize one, +// begin with grpc.NewSharedBufferPool. +// +// Note: The shared buffer pool feature will not be active if any of the following +// options are used: WithStatsHandler, EnableTracing, or binary logging. In such +// cases, the shared buffer pool will be ignored. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption { +	return newFuncDialOption(func(o *dialOptions) { +		o.recvBufferPool = bufferPool +	}) +} diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go new file mode 100644 index 000000000..142d35f75 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -0,0 +1,308 @@ +// Copyright 2015 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +//     http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The canonical version of this proto can be found at +// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// 	protoc-gen-go v1.30.0 +// 	protoc        v4.22.0 +// source: grpc/health/v1/health.proto + +package grpc_health_v1 + +import ( +	protoreflect "google.golang.org/protobuf/reflect/protoreflect" +	protoimpl "google.golang.org/protobuf/runtime/protoimpl" +	reflect "reflect" +	sync "sync" +) + +const ( +	// Verify that this generated code is sufficiently up-to-date. +	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) +	// Verify that runtime/protoimpl is sufficiently up-to-date. +	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type HealthCheckResponse_ServingStatus int32 + +const ( +	HealthCheckResponse_UNKNOWN         HealthCheckResponse_ServingStatus = 0 +	HealthCheckResponse_SERVING         HealthCheckResponse_ServingStatus = 1 +	HealthCheckResponse_NOT_SERVING     HealthCheckResponse_ServingStatus = 2 +	HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3 // Used only by the Watch method. +) + +// Enum value maps for HealthCheckResponse_ServingStatus. +var ( +	HealthCheckResponse_ServingStatus_name = map[int32]string{ +		0: "UNKNOWN", +		1: "SERVING", +		2: "NOT_SERVING", +		3: "SERVICE_UNKNOWN", +	} +	HealthCheckResponse_ServingStatus_value = map[string]int32{ +		"UNKNOWN":         0, +		"SERVING":         1, +		"NOT_SERVING":     2, +		"SERVICE_UNKNOWN": 3, +	} +) + +func (x HealthCheckResponse_ServingStatus) Enum() *HealthCheckResponse_ServingStatus { +	p := new(HealthCheckResponse_ServingStatus) +	*p = x +	return p +} + +func (x HealthCheckResponse_ServingStatus) String() string { +	return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (HealthCheckResponse_ServingStatus) Descriptor() protoreflect.EnumDescriptor { +	return file_grpc_health_v1_health_proto_enumTypes[0].Descriptor() +} + +func (HealthCheckResponse_ServingStatus) Type() protoreflect.EnumType { +	return &file_grpc_health_v1_health_proto_enumTypes[0] +} + +func (x HealthCheckResponse_ServingStatus) Number() protoreflect.EnumNumber { +	return protoreflect.EnumNumber(x) +} + +// Deprecated: Use HealthCheckResponse_ServingStatus.Descriptor instead. +func (HealthCheckResponse_ServingStatus) EnumDescriptor() ([]byte, []int) { +	return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{1, 0} +} + +type HealthCheckRequest struct { +	state         protoimpl.MessageState +	sizeCache     protoimpl.SizeCache +	unknownFields protoimpl.UnknownFields + +	Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` +} + +func (x *HealthCheckRequest) Reset() { +	*x = HealthCheckRequest{} +	if protoimpl.UnsafeEnabled { +		mi := &file_grpc_health_v1_health_proto_msgTypes[0] +		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +		ms.StoreMessageInfo(mi) +	} +} + +func (x *HealthCheckRequest) String() string { +	return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckRequest) ProtoMessage() {} + +func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message { +	mi := &file_grpc_health_v1_health_proto_msgTypes[0] +	if protoimpl.UnsafeEnabled && x != nil { +		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +		if ms.LoadMessageInfo() == nil { +			ms.StoreMessageInfo(mi) +		} +		return ms +	} +	return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckRequest.ProtoReflect.Descriptor instead. +func (*HealthCheckRequest) Descriptor() ([]byte, []int) { +	return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{0} +} + +func (x *HealthCheckRequest) GetService() string { +	if x != nil { +		return x.Service +	} +	return "" +} + +type HealthCheckResponse struct { +	state         protoimpl.MessageState +	sizeCache     protoimpl.SizeCache +	unknownFields protoimpl.UnknownFields + +	Status HealthCheckResponse_ServingStatus `protobuf:"varint,1,opt,name=status,proto3,enum=grpc.health.v1.HealthCheckResponse_ServingStatus" json:"status,omitempty"` +} + +func (x *HealthCheckResponse) Reset() { +	*x = HealthCheckResponse{} +	if protoimpl.UnsafeEnabled { +		mi := &file_grpc_health_v1_health_proto_msgTypes[1] +		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +		ms.StoreMessageInfo(mi) +	} +} + +func (x *HealthCheckResponse) String() string { +	return protoimpl.X.MessageStringOf(x) +} + +func (*HealthCheckResponse) ProtoMessage() {} + +func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message { +	mi := &file_grpc_health_v1_health_proto_msgTypes[1] +	if protoimpl.UnsafeEnabled && x != nil { +		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +		if ms.LoadMessageInfo() == nil { +			ms.StoreMessageInfo(mi) +		} +		return ms +	} +	return mi.MessageOf(x) +} + +// Deprecated: Use HealthCheckResponse.ProtoReflect.Descriptor instead. +func (*HealthCheckResponse) Descriptor() ([]byte, []int) { +	return file_grpc_health_v1_health_proto_rawDescGZIP(), []int{1} +} + +func (x *HealthCheckResponse) GetStatus() HealthCheckResponse_ServingStatus { +	if x != nil { +		return x.Status +	} +	return HealthCheckResponse_UNKNOWN +} + +var File_grpc_health_v1_health_proto protoreflect.FileDescriptor + +var file_grpc_health_v1_health_proto_rawDesc = []byte{ +	0x0a, 0x1b, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x76, 0x31, +	0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x67, +	0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x22, 0x2e, 0x0a, +	0x12, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, +	0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, +	0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x22, 0xb1, 0x01, +	0x0a, 0x13, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, +	0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x49, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, +	0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x31, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, +	0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, +	0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, +	0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, +	0x22, 0x4f, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x75, +	0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, +	0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, +	0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, +	0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, +	0x03, 0x32, 0xae, 0x01, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x12, 0x50, 0x0a, 0x05, +	0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, +	0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, +	0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, 0x70, 0x63, +	0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, +	0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x52, +	0x0a, 0x05, 0x57, 0x61, 0x74, 0x63, 0x68, 0x12, 0x22, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, +	0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, +	0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x67, 0x72, +	0x70, 0x63, 0x2e, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x65, 0x61, +	0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, +	0x30, 0x01, 0x42, 0x61, 0x0a, 0x11, 0x69, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x68, 0x65, +	0x61, 0x6c, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x42, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x50, +	0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x67, +	0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x68, +	0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x68, 0x65, 0x61, 0x6c, 0x74, +	0x68, 0x5f, 0x76, 0x31, 0xaa, 0x02, 0x0e, 0x47, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x6c, +	0x74, 0x68, 0x2e, 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( +	file_grpc_health_v1_health_proto_rawDescOnce sync.Once +	file_grpc_health_v1_health_proto_rawDescData = file_grpc_health_v1_health_proto_rawDesc +) + +func file_grpc_health_v1_health_proto_rawDescGZIP() []byte { +	file_grpc_health_v1_health_proto_rawDescOnce.Do(func() { +		file_grpc_health_v1_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_health_v1_health_proto_rawDescData) +	}) +	return file_grpc_health_v1_health_proto_rawDescData +} + +var file_grpc_health_v1_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_grpc_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_grpc_health_v1_health_proto_goTypes = []interface{}{ +	(HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus +	(*HealthCheckRequest)(nil),             // 1: grpc.health.v1.HealthCheckRequest +	(*HealthCheckResponse)(nil),            // 2: grpc.health.v1.HealthCheckResponse +} +var file_grpc_health_v1_health_proto_depIdxs = []int32{ +	0, // 0: grpc.health.v1.HealthCheckResponse.status:type_name -> grpc.health.v1.HealthCheckResponse.ServingStatus +	1, // 1: grpc.health.v1.Health.Check:input_type -> grpc.health.v1.HealthCheckRequest +	1, // 2: grpc.health.v1.Health.Watch:input_type -> grpc.health.v1.HealthCheckRequest +	2, // 3: grpc.health.v1.Health.Check:output_type -> grpc.health.v1.HealthCheckResponse +	2, // 4: grpc.health.v1.Health.Watch:output_type -> grpc.health.v1.HealthCheckResponse +	3, // [3:5] is the sub-list for method output_type +	1, // [1:3] is the sub-list for method input_type +	1, // [1:1] is the sub-list for extension type_name +	1, // [1:1] is the sub-list for extension extendee +	0, // [0:1] is the sub-list for field type_name +} + +func init() { file_grpc_health_v1_health_proto_init() } +func file_grpc_health_v1_health_proto_init() { +	if File_grpc_health_v1_health_proto != nil { +		return +	} +	if !protoimpl.UnsafeEnabled { +		file_grpc_health_v1_health_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { +			switch v := v.(*HealthCheckRequest); i { +			case 0: +				return &v.state +			case 1: +				return &v.sizeCache +			case 2: +				return &v.unknownFields +			default: +				return nil +			} +		} +		file_grpc_health_v1_health_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { +			switch v := v.(*HealthCheckResponse); i { +			case 0: +				return &v.state +			case 1: +				return &v.sizeCache +			case 2: +				return &v.unknownFields +			default: +				return nil +			} +		} +	} +	type x struct{} +	out := protoimpl.TypeBuilder{ +		File: protoimpl.DescBuilder{ +			GoPackagePath: reflect.TypeOf(x{}).PkgPath(), +			RawDescriptor: file_grpc_health_v1_health_proto_rawDesc, +			NumEnums:      1, +			NumMessages:   2, +			NumExtensions: 0, +			NumServices:   1, +		}, +		GoTypes:           file_grpc_health_v1_health_proto_goTypes, +		DependencyIndexes: file_grpc_health_v1_health_proto_depIdxs, +		EnumInfos:         file_grpc_health_v1_health_proto_enumTypes, +		MessageInfos:      file_grpc_health_v1_health_proto_msgTypes, +	}.Build() +	File_grpc_health_v1_health_proto = out.File +	file_grpc_health_v1_health_proto_rawDesc = nil +	file_grpc_health_v1_health_proto_goTypes = nil +	file_grpc_health_v1_health_proto_depIdxs = nil +} diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go new file mode 100644 index 000000000..a01a1b4d5 --- /dev/null +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go @@ -0,0 +1,223 @@ +// Copyright 2015 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +//     http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The canonical version of this proto can be found at +// https://github.com/grpc/grpc-proto/blob/master/grpc/health/v1/health.proto + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc             v4.22.0 +// source: grpc/health/v1/health.proto + +package grpc_health_v1 + +import ( +	context "context" +	grpc "google.golang.org/grpc" +	codes "google.golang.org/grpc/codes" +	status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( +	Health_Check_FullMethodName = "/grpc.health.v1.Health/Check" +	Health_Watch_FullMethodName = "/grpc.health.v1.Health/Watch" +) + +// HealthClient is the client API for Health service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HealthClient interface { +	// If the requested service is unknown, the call will fail with status +	// NOT_FOUND. +	Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) +	// Performs a watch for the serving status of the requested service. +	// The server will immediately send back a message indicating the current +	// serving status.  It will then subsequently send a new message whenever +	// the service's serving status changes. +	// +	// If the requested service is unknown when the call is received, the +	// server will send a message setting the serving status to +	// SERVICE_UNKNOWN but will *not* terminate the call.  If at some +	// future point, the serving status of the service becomes known, the +	// server will send a new message with the service's serving status. +	// +	// If the call terminates with status UNIMPLEMENTED, then clients +	// should assume this method is not supported and should not retry the +	// call.  If the call terminates with any other status (including OK), +	// clients should retry the call with appropriate exponential backoff. +	Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) +} + +type healthClient struct { +	cc grpc.ClientConnInterface +} + +func NewHealthClient(cc grpc.ClientConnInterface) HealthClient { +	return &healthClient{cc} +} + +func (c *healthClient) Check(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) { +	out := new(HealthCheckResponse) +	err := c.cc.Invoke(ctx, Health_Check_FullMethodName, in, out, opts...) +	if err != nil { +		return nil, err +	} +	return out, nil +} + +func (c *healthClient) Watch(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (Health_WatchClient, error) { +	stream, err := c.cc.NewStream(ctx, &Health_ServiceDesc.Streams[0], Health_Watch_FullMethodName, opts...) +	if err != nil { +		return nil, err +	} +	x := &healthWatchClient{stream} +	if err := x.ClientStream.SendMsg(in); err != nil { +		return nil, err +	} +	if err := x.ClientStream.CloseSend(); err != nil { +		return nil, err +	} +	return x, nil +} + +type Health_WatchClient interface { +	Recv() (*HealthCheckResponse, error) +	grpc.ClientStream +} + +type healthWatchClient struct { +	grpc.ClientStream +} + +func (x *healthWatchClient) Recv() (*HealthCheckResponse, error) { +	m := new(HealthCheckResponse) +	if err := x.ClientStream.RecvMsg(m); err != nil { +		return nil, err +	} +	return m, nil +} + +// HealthServer is the server API for Health service. +// All implementations should embed UnimplementedHealthServer +// for forward compatibility +type HealthServer interface { +	// If the requested service is unknown, the call will fail with status +	// NOT_FOUND. +	Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) +	// Performs a watch for the serving status of the requested service. +	// The server will immediately send back a message indicating the current +	// serving status.  It will then subsequently send a new message whenever +	// the service's serving status changes. +	// +	// If the requested service is unknown when the call is received, the +	// server will send a message setting the serving status to +	// SERVICE_UNKNOWN but will *not* terminate the call.  If at some +	// future point, the serving status of the service becomes known, the +	// server will send a new message with the service's serving status. +	// +	// If the call terminates with status UNIMPLEMENTED, then clients +	// should assume this method is not supported and should not retry the +	// call.  If the call terminates with any other status (including OK), +	// clients should retry the call with appropriate exponential backoff. +	Watch(*HealthCheckRequest, Health_WatchServer) error +} + +// UnimplementedHealthServer should be embedded to have forward compatible implementations. +type UnimplementedHealthServer struct { +} + +func (UnimplementedHealthServer) Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error) { +	return nil, status.Errorf(codes.Unimplemented, "method Check not implemented") +} +func (UnimplementedHealthServer) Watch(*HealthCheckRequest, Health_WatchServer) error { +	return status.Errorf(codes.Unimplemented, "method Watch not implemented") +} + +// UnsafeHealthServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HealthServer will +// result in compilation errors. +type UnsafeHealthServer interface { +	mustEmbedUnimplementedHealthServer() +} + +func RegisterHealthServer(s grpc.ServiceRegistrar, srv HealthServer) { +	s.RegisterService(&Health_ServiceDesc, srv) +} + +func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +	in := new(HealthCheckRequest) +	if err := dec(in); err != nil { +		return nil, err +	} +	if interceptor == nil { +		return srv.(HealthServer).Check(ctx, in) +	} +	info := &grpc.UnaryServerInfo{ +		Server:     srv, +		FullMethod: Health_Check_FullMethodName, +	} +	handler := func(ctx context.Context, req interface{}) (interface{}, error) { +		return srv.(HealthServer).Check(ctx, req.(*HealthCheckRequest)) +	} +	return interceptor(ctx, in, info, handler) +} + +func _Health_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { +	m := new(HealthCheckRequest) +	if err := stream.RecvMsg(m); err != nil { +		return err +	} +	return srv.(HealthServer).Watch(m, &healthWatchServer{stream}) +} + +type Health_WatchServer interface { +	Send(*HealthCheckResponse) error +	grpc.ServerStream +} + +type healthWatchServer struct { +	grpc.ServerStream +} + +func (x *healthWatchServer) Send(m *HealthCheckResponse) error { +	return x.ServerStream.SendMsg(m) +} + +// Health_ServiceDesc is the grpc.ServiceDesc for Health service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Health_ServiceDesc = grpc.ServiceDesc{ +	ServiceName: "grpc.health.v1.Health", +	HandlerType: (*HealthServer)(nil), +	Methods: []grpc.MethodDesc{ +		{ +			MethodName: "Check", +			Handler:    _Health_Check_Handler, +		}, +	}, +	Streams: []grpc.StreamDesc{ +		{ +			StreamName:    "Watch", +			Handler:       _Health_Watch_Handler, +			ServerStreams: true, +		}, +	}, +	Metadata: "grpc/health/v1/health.proto", +} diff --git a/vendor/google.golang.org/grpc/idle.go b/vendor/google.golang.org/grpc/idle.go new file mode 100644 index 000000000..dc3dc72f6 --- /dev/null +++ b/vendor/google.golang.org/grpc/idle.go @@ -0,0 +1,287 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( +	"fmt" +	"math" +	"sync" +	"sync/atomic" +	"time" +) + +// For overriding in unit tests. +var timeAfterFunc = func(d time.Duration, f func()) *time.Timer { +	return time.AfterFunc(d, f) +} + +// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter +// and exit from idle mode. +type idlenessEnforcer interface { +	exitIdleMode() error +	enterIdleMode() error +} + +// idlenessManager defines the functionality required to track RPC activity on a +// channel. +type idlenessManager interface { +	onCallBegin() error +	onCallEnd() +	close() +} + +type noopIdlenessManager struct{} + +func (noopIdlenessManager) onCallBegin() error { return nil } +func (noopIdlenessManager) onCallEnd()         {} +func (noopIdlenessManager) close()             {} + +// idlenessManagerImpl implements the idlenessManager interface. It uses atomic +// operations to synchronize access to shared state and a mutex to guarantee +// mutual exclusion in a critical section. +type idlenessManagerImpl struct { +	// State accessed atomically. +	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed. +	activeCallsCount          int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. +	activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback. +	closed                    int32 // Boolean; True when the manager is closed. + +	// Can be accessed without atomics or mutex since these are set at creation +	// time and read-only after that. +	enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn. +	timeout  int64            // Idle timeout duration nanos stored as an int64. + +	// idleMu is used to guarantee mutual exclusion in two scenarios: +	// - Opposing intentions: +	//   - a: Idle timeout has fired and handleIdleTimeout() is trying to put +	//     the channel in idle mode because the channel has been inactive. +	//   - b: At the same time an RPC is made on the channel, and onCallBegin() +	//     is trying to prevent the channel from going idle. +	// - Competing intentions: +	//   - The channel is in idle mode and there are multiple RPCs starting at +	//     the same time, all trying to move the channel out of idle. Only one +	//     of them should succeed in doing so, while the other RPCs should +	//     piggyback on the first one and be successfully handled. +	idleMu       sync.RWMutex +	actuallyIdle bool +	timer        *time.Timer +} + +// newIdlenessManager creates a new idleness manager implementation for the +// given idle timeout. +func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager { +	if idleTimeout == 0 { +		return noopIdlenessManager{} +	} + +	i := &idlenessManagerImpl{ +		enforcer: enforcer, +		timeout:  int64(idleTimeout), +	} +	i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout) +	return i +} + +// resetIdleTimer resets the idle timer to the given duration. This method +// should only be called from the timer callback. +func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) { +	i.idleMu.Lock() +	defer i.idleMu.Unlock() + +	if i.timer == nil { +		// Only close sets timer to nil. We are done. +		return +	} + +	// It is safe to ignore the return value from Reset() because this method is +	// only ever called from the timer callback, which means the timer has +	// already fired. +	i.timer.Reset(d) +} + +// handleIdleTimeout is the timer callback that is invoked upon expiry of the +// configured idle timeout. The channel is considered inactive if there are no +// ongoing calls and no RPC activity since the last time the timer fired. +func (i *idlenessManagerImpl) handleIdleTimeout() { +	if i.isClosed() { +		return +	} + +	if atomic.LoadInt32(&i.activeCallsCount) > 0 { +		i.resetIdleTimer(time.Duration(i.timeout)) +		return +	} + +	// There has been activity on the channel since we last got here. Reset the +	// timer and return. +	if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 { +		// Set the timer to fire after a duration of idle timeout, calculated +		// from the time the most recent RPC completed. +		atomic.StoreInt32(&i.activeSinceLastTimerCheck, 0) +		i.resetIdleTimer(time.Duration(atomic.LoadInt64(&i.lastCallEndTime) + i.timeout - time.Now().UnixNano())) +		return +	} + +	// This CAS operation is extremely likely to succeed given that there has +	// been no activity since the last time we were here.  Setting the +	// activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the +	// channel is either in idle mode or is trying to get there. +	if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) { +		// This CAS operation can fail if an RPC started after we checked for +		// activity at the top of this method, or one was ongoing from before +		// the last time we were here. In both case, reset the timer and return. +		i.resetIdleTimer(time.Duration(i.timeout)) +		return +	} + +	// Now that we've set the active calls count to -math.MaxInt32, it's time to +	// actually move to idle mode. +	if i.tryEnterIdleMode() { +		// Successfully entered idle mode. No timer needed until we exit idle. +		return +	} + +	// Failed to enter idle mode due to a concurrent RPC that kept the channel +	// active, or because of an error from the channel. Undo the attempt to +	// enter idle, and reset the timer to try again later. +	atomic.AddInt32(&i.activeCallsCount, math.MaxInt32) +	i.resetIdleTimer(time.Duration(i.timeout)) +} + +// tryEnterIdleMode instructs the channel to enter idle mode. But before +// that, it performs a last minute check to ensure that no new RPC has come in, +// making the channel active. +// +// Return value indicates whether or not the channel moved to idle mode. +// +// Holds idleMu which ensures mutual exclusion with exitIdleMode. +func (i *idlenessManagerImpl) tryEnterIdleMode() bool { +	i.idleMu.Lock() +	defer i.idleMu.Unlock() + +	if atomic.LoadInt32(&i.activeCallsCount) != -math.MaxInt32 { +		// We raced and lost to a new RPC. Very rare, but stop entering idle. +		return false +	} +	if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 { +		// An very short RPC could have come in (and also finished) after we +		// checked for calls count and activity in handleIdleTimeout(), but +		// before the CAS operation. So, we need to check for activity again. +		return false +	} + +	// No new RPCs have come in since we last set the active calls count value +	// -math.MaxInt32 in the timer callback. And since we have the lock, it is +	// safe to enter idle mode now. +	if err := i.enforcer.enterIdleMode(); err != nil { +		logger.Errorf("Failed to enter idle mode: %v", err) +		return false +	} + +	// Successfully entered idle mode. +	i.actuallyIdle = true +	return true +} + +// onCallBegin is invoked at the start of every RPC. +func (i *idlenessManagerImpl) onCallBegin() error { +	if i.isClosed() { +		return nil +	} + +	if atomic.AddInt32(&i.activeCallsCount, 1) > 0 { +		// Channel is not idle now. Set the activity bit and allow the call. +		atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1) +		return nil +	} + +	// Channel is either in idle mode or is in the process of moving to idle +	// mode. Attempt to exit idle mode to allow this RPC. +	if err := i.exitIdleMode(); err != nil { +		// Undo the increment to calls count, and return an error causing the +		// RPC to fail. +		atomic.AddInt32(&i.activeCallsCount, -1) +		return err +	} + +	atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1) +	return nil +} + +// exitIdleMode instructs the channel to exit idle mode. +// +// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. +func (i *idlenessManagerImpl) exitIdleMode() error { +	i.idleMu.Lock() +	defer i.idleMu.Unlock() + +	if !i.actuallyIdle { +		// This can happen in two scenarios: +		// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called +		//   tryEnterIdleMode(). But before the latter could grab the lock, an RPC +		//   came in and onCallBegin() noticed that the calls count is negative. +		// - Channel is in idle mode, and multiple new RPCs come in at the same +		//   time, all of them notice a negative calls count in onCallBegin and get +		//   here. The first one to get the lock would got the channel to exit idle. +		// +		// Either way, nothing to do here. +		return nil +	} + +	if err := i.enforcer.exitIdleMode(); err != nil { +		return fmt.Errorf("channel failed to exit idle mode: %v", err) +	} + +	// Undo the idle entry process. This also respects any new RPC attempts. +	atomic.AddInt32(&i.activeCallsCount, math.MaxInt32) +	i.actuallyIdle = false + +	// Start a new timer to fire after the configured idle timeout. +	i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout) +	return nil +} + +// onCallEnd is invoked at the end of every RPC. +func (i *idlenessManagerImpl) onCallEnd() { +	if i.isClosed() { +		return +	} + +	// Record the time at which the most recent call finished. +	atomic.StoreInt64(&i.lastCallEndTime, time.Now().UnixNano()) + +	// Decrement the active calls count. This count can temporarily go negative +	// when the timer callback is in the process of moving the channel to idle +	// mode, but one or more RPCs come in and complete before the timer callback +	// can get done with the process of moving to idle mode. +	atomic.AddInt32(&i.activeCallsCount, -1) +} + +func (i *idlenessManagerImpl) isClosed() bool { +	return atomic.LoadInt32(&i.closed) == 1 +} + +func (i *idlenessManagerImpl) close() { +	atomic.StoreInt32(&i.closed, 1) + +	i.idleMu.Lock() +	i.timer.Stop() +	i.timer = nil +	i.idleMu.Unlock() +} diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go index af03a40d9..755fdebc1 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go @@ -32,6 +32,9 @@ var grpclogLogger = grpclog.Component("binarylog")  // Logger specifies MethodLoggers for method names with a Log call that  // takes a context. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  type Logger interface {  	GetMethodLogger(methodName string) MethodLogger  } diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go index 56fcf008d..6c3f63221 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -49,6 +49,9 @@ func (g *callIDGenerator) reset() {  var idGen callIDGenerator  // MethodLogger is the sub-logger for each method. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  type MethodLogger interface {  	Log(context.Context, LogEntryConfig)  } @@ -65,6 +68,9 @@ type TruncatingMethodLogger struct {  }  // NewTruncatingMethodLogger returns a new truncating method logger. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {  	return &TruncatingMethodLogger{  		headerMaxLen:  h, @@ -145,6 +151,9 @@ func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (trun  }  // LogEntryConfig represents the configuration for binary log entry. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  type LogEntryConfig interface {  	toProto() *binlogpb.GrpcLogEntry  } diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 9f6a0c120..81c2f5fd7 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -35,6 +35,7 @@ import "sync"  // internal/transport/transport.go for an example of this.  type Unbounded struct {  	c       chan interface{} +	closed  bool  	mu      sync.Mutex  	backlog []interface{}  } @@ -47,16 +48,18 @@ func NewUnbounded() *Unbounded {  // Put adds t to the unbounded buffer.  func (b *Unbounded) Put(t interface{}) {  	b.mu.Lock() +	defer b.mu.Unlock() +	if b.closed { +		return +	}  	if len(b.backlog) == 0 {  		select {  		case b.c <- t: -			b.mu.Unlock()  			return  		default:  		}  	}  	b.backlog = append(b.backlog, t) -	b.mu.Unlock()  }  // Load sends the earliest buffered data, if any, onto the read channel @@ -64,6 +67,10 @@ func (b *Unbounded) Put(t interface{}) {  // value from the read channel.  func (b *Unbounded) Load() {  	b.mu.Lock() +	defer b.mu.Unlock() +	if b.closed { +		return +	}  	if len(b.backlog) > 0 {  		select {  		case b.c <- b.backlog[0]: @@ -72,7 +79,6 @@ func (b *Unbounded) Load() {  		default:  		}  	} -	b.mu.Unlock()  }  // Get returns a read channel on which values added to the buffer, via Put(), @@ -80,6 +86,20 @@ func (b *Unbounded) Load() {  //  // Upon reading a value from this channel, users are expected to call Load() to  // send the next buffered value onto the channel if there is any. +// +// If the unbounded buffer is closed, the read channel returned by this method +// is closed.  func (b *Unbounded) Get() <-chan interface{} {  	return b.c  } + +// Close closes the unbounded buffer. +func (b *Unbounded) Close() { +	b.mu.Lock() +	defer b.mu.Unlock() +	if b.closed { +		return +	} +	b.closed = true +	close(b.c) +} diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 5ba9d94d4..77c2c0b89 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -36,6 +36,13 @@ var (  	// "GRPC_RING_HASH_CAP".  This does not override the default bounds  	// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).  	RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024) +	// PickFirstLBConfig is set if we should support configuration of the +	// pick_first LB policy, which can be enabled by setting the environment +	// variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true". +	PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false) +	// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS +	// handshakes that can be performed. +	ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)  )  func boolFromEnv(envVar string, def bool) bool { diff --git a/vendor/google.golang.org/grpc/internal/envconfig/observability.go b/vendor/google.golang.org/grpc/internal/envconfig/observability.go index 821dd0a7c..dd314cfb1 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/observability.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/observability.go @@ -28,9 +28,15 @@ const (  var (  	// ObservabilityConfig is the json configuration for the gcp/observability  	// package specified directly in the envObservabilityConfig env var. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ObservabilityConfig = os.Getenv(envObservabilityConfig)  	// ObservabilityConfigFile is the json configuration for the  	// gcp/observability specified in a file with the location specified in  	// envObservabilityConfigFile env var. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)  ) diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go index 3b17705ba..02b4b6a1c 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go @@ -61,11 +61,10 @@ var (  	// have a brand new API on the server-side and users explicitly need to use  	// the new API to get security integration on the server.  	XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true) -	// XDSAggregateAndDNS indicates whether processing of aggregated cluster -	// and DNS cluster is enabled, which can be enabled by setting the -	// environment variable -	// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to -	// "true". +	// XDSAggregateAndDNS indicates whether processing of aggregated cluster and +	// DNS cluster is enabled, which can be disabled by setting the environment +	// variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" +	// to "false".  	XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)  	// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled, @@ -82,11 +81,15 @@ var (  	XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", true)  	// XDSRLS indicates whether processing of Cluster Specifier plugins and -	// support for the RLS CLuster Specifier is enabled, which can be enabled by +	// support for the RLS CLuster Specifier is enabled, which can be disabled by  	// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to -	// "true". -	XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", false) +	// "false". +	XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", true)  	// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.  	C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI") +	// XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which +	// can be disabled by setting the environment variable +	// "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "false". +	XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", true)  ) diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go index 517ea7064..aa97273e7 100644 --- a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go +++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go @@ -72,3 +72,24 @@ func Uint64() uint64 {  	defer mu.Unlock()  	return r.Uint64()  } + +// Uint32 implements rand.Uint32 on the grpcrand global source. +func Uint32() uint32 { +	mu.Lock() +	defer mu.Unlock() +	return r.Uint32() +} + +// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source. +func ExpFloat64() float64 { +	mu.Lock() +	defer mu.Unlock() +	return r.ExpFloat64() +} + +// Shuffle implements rand.Shuffle on the grpcrand global source. +var Shuffle = func(n int, f func(int, int)) { +	mu.Lock() +	defer mu.Unlock() +	r.Shuffle(n, f) +} diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go index 79993d343..37b8d4117 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go @@ -20,6 +20,7 @@ package grpcsync  import (  	"context" +	"sync"  	"google.golang.org/grpc/internal/buffer"  ) @@ -31,15 +32,26 @@ import (  //  // This type is safe for concurrent access.  type CallbackSerializer struct { +	// Done is closed once the serializer is shut down completely, i.e all +	// scheduled callbacks are executed and the serializer has deallocated all +	// its resources. +	Done chan struct{} +  	callbacks *buffer.Unbounded +	closedMu  sync.Mutex +	closed    bool  }  // NewCallbackSerializer returns a new CallbackSerializer instance. The provided  // context will be passed to the scheduled callbacks. Users should cancel the  // provided context to shutdown the CallbackSerializer. It is guaranteed that no -// callbacks will be executed once this context is canceled. +// callbacks will be added once this context is canceled, and any pending un-run +// callbacks will be executed before the serializer is shut down.  func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { -	t := &CallbackSerializer{callbacks: buffer.NewUnbounded()} +	t := &CallbackSerializer{ +		Done:      make(chan struct{}), +		callbacks: buffer.NewUnbounded(), +	}  	go t.run(ctx)  	return t  } @@ -48,18 +60,60 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {  //  // Callbacks are expected to honor the context when performing any blocking  // operations, and should return early when the context is canceled. -func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) { +// +// Return value indicates if the callback was successfully added to the list of +// callbacks to be executed by the serializer. It is not possible to add +// callbacks once the context passed to NewCallbackSerializer is cancelled. +func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { +	t.closedMu.Lock() +	defer t.closedMu.Unlock() + +	if t.closed { +		return false +	}  	t.callbacks.Put(f) +	return true  }  func (t *CallbackSerializer) run(ctx context.Context) { +	var backlog []func(context.Context) + +	defer close(t.Done)  	for ctx.Err() == nil {  		select {  		case <-ctx.Done(): -			return -		case callback := <-t.callbacks.Get(): +			// Do nothing here. Next iteration of the for loop will not happen, +			// since ctx.Err() would be non-nil. +		case callback, ok := <-t.callbacks.Get(): +			if !ok { +				return +			}  			t.callbacks.Load()  			callback.(func(ctx context.Context))(ctx)  		}  	} + +	// Fetch pending callbacks if any, and execute them before returning from +	// this method and closing t.Done. +	t.closedMu.Lock() +	t.closed = true +	backlog = t.fetchPendingCallbacks() +	t.callbacks.Close() +	t.closedMu.Unlock() +	for _, b := range backlog { +		b(ctx) +	} +} + +func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { +	var backlog []func(context.Context) +	for { +		select { +		case b := <-t.callbacks.Get(): +			backlog = append(backlog, b.(func(context.Context))) +			t.callbacks.Load() +		default: +			return backlog +		} +	}  } diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go new file mode 100644 index 000000000..f58b5ffa6 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go @@ -0,0 +1,136 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcsync + +import ( +	"context" +	"sync" +) + +// Subscriber represents an entity that is subscribed to messages published on +// a PubSub. It wraps the callback to be invoked by the PubSub when a new +// message is published. +type Subscriber interface { +	// OnMessage is invoked when a new message is published. Implementations +	// must not block in this method. +	OnMessage(msg interface{}) +} + +// PubSub is a simple one-to-many publish-subscribe system that supports +// messages of arbitrary type. It guarantees that messages are delivered in +// the same order in which they were published. +// +// Publisher invokes the Publish() method to publish new messages, while +// subscribers interested in receiving these messages register a callback +// via the Subscribe() method. +// +// Once a PubSub is stopped, no more messages can be published, and +// it is guaranteed that no more subscriber callback will be invoked. +type PubSub struct { +	cs     *CallbackSerializer +	cancel context.CancelFunc + +	// Access to the below fields are guarded by this mutex. +	mu          sync.Mutex +	msg         interface{} +	subscribers map[Subscriber]bool +	stopped     bool +} + +// NewPubSub returns a new PubSub instance. +func NewPubSub() *PubSub { +	ctx, cancel := context.WithCancel(context.Background()) +	return &PubSub{ +		cs:          NewCallbackSerializer(ctx), +		cancel:      cancel, +		subscribers: map[Subscriber]bool{}, +	} +} + +// Subscribe registers the provided Subscriber to the PubSub. +// +// If the PubSub contains a previously published message, the Subscriber's +// OnMessage() callback will be invoked asynchronously with the existing +// message to begin with, and subsequently for every newly published message. +// +// The caller is responsible for invoking the returned cancel function to +// unsubscribe itself from the PubSub. +func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) { +	ps.mu.Lock() +	defer ps.mu.Unlock() + +	if ps.stopped { +		return func() {} +	} + +	ps.subscribers[sub] = true + +	if ps.msg != nil { +		msg := ps.msg +		ps.cs.Schedule(func(context.Context) { +			ps.mu.Lock() +			defer ps.mu.Unlock() +			if !ps.subscribers[sub] { +				return +			} +			sub.OnMessage(msg) +		}) +	} + +	return func() { +		ps.mu.Lock() +		defer ps.mu.Unlock() +		delete(ps.subscribers, sub) +	} +} + +// Publish publishes the provided message to the PubSub, and invokes +// callbacks registered by subscribers asynchronously. +func (ps *PubSub) Publish(msg interface{}) { +	ps.mu.Lock() +	defer ps.mu.Unlock() + +	if ps.stopped { +		return +	} + +	ps.msg = msg +	for sub := range ps.subscribers { +		s := sub +		ps.cs.Schedule(func(context.Context) { +			ps.mu.Lock() +			defer ps.mu.Unlock() +			if !ps.subscribers[s] { +				return +			} +			s.OnMessage(msg) +		}) +	} +} + +// Stop shuts down the PubSub and releases any resources allocated by it. +// It is guaranteed that no subscriber callbacks would be invoked once this +// method returns. +func (ps *PubSub) Stop() { +	ps.mu.Lock() +	defer ps.mu.Unlock() +	ps.stopped = true + +	ps.cancel() +} diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 836b6a3b3..42ff39c84 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -60,6 +60,9 @@ var (  	GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials  	// CanonicalString returns the canonical string of the code defined here:  	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	CanonicalString interface{} // func (codes.Code) string  	// DrainServerTransports initiates a graceful close of existing connections  	// on a gRPC server accepted on the provided listener address. An @@ -69,20 +72,35 @@ var (  	// AddGlobalServerOptions adds an array of ServerOption that will be  	// effective globally for newly created servers. The priority will be: 1.  	// user-provided; 2. this method; 3. default values. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	AddGlobalServerOptions interface{} // func(opt ...ServerOption)  	// ClearGlobalServerOptions clears the array of extra ServerOption. This  	// method is useful in testing and benchmarking. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ClearGlobalServerOptions func()  	// AddGlobalDialOptions adds an array of DialOption that will be effective  	// globally for newly created client channels. The priority will be: 1.  	// user-provided; 2. this method; 3. default values. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	AddGlobalDialOptions interface{} // func(opt ...DialOption)  	// DisableGlobalDialOptions returns a DialOption that prevents the  	// ClientConn from applying the global DialOptions (set via  	// AddGlobalDialOptions). +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	DisableGlobalDialOptions interface{} // func() grpc.DialOption  	// ClearGlobalDialOptions clears the array of extra DialOption. This  	// method is useful in testing and benchmarking. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ClearGlobalDialOptions func()  	// JoinDialOptions combines the dial options passed as arguments into a  	// single dial option. @@ -93,9 +111,15 @@ var (  	// WithBinaryLogger returns a DialOption that specifies the binary logger  	// for a ClientConn. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption  	// BinaryLogger returns a ServerOption that can set the binary logger for a  	// server. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption  	// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go index 09a667f33..99e1e5b36 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go @@ -62,7 +62,8 @@ const (  	defaultPort       = "443"  	defaultDNSSvrPort = "53"  	golang            = "GO" -	// txtPrefix is the prefix string to be prepended to the host name for txt record lookup. +	// txtPrefix is the prefix string to be prepended to the host name for txt +	// record lookup.  	txtPrefix = "_grpc_config."  	// In DNS, service config is encoded in a TXT record via the mechanism  	// described in RFC-1464 using the attribute name grpc_config. @@ -86,14 +87,14 @@ var (  	minDNSResRate = 30 * time.Second  ) -var customAuthorityDialler = func(authority string) func(ctx context.Context, network, address string) (net.Conn, error) { -	return func(ctx context.Context, network, address string) (net.Conn, error) { +var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) { +	return func(ctx context.Context, network, _ string) (net.Conn, error) {  		var dialer net.Dialer -		return dialer.DialContext(ctx, network, authority) +		return dialer.DialContext(ctx, network, address)  	}  } -var customAuthorityResolver = func(authority string) (netResolver, error) { +var newNetResolver = func(authority string) (netResolver, error) {  	host, port, err := parseTarget(authority, defaultDNSSvrPort)  	if err != nil {  		return nil, err @@ -103,7 +104,7 @@ var customAuthorityResolver = func(authority string) (netResolver, error) {  	return &net.Resolver{  		PreferGo: true, -		Dial:     customAuthorityDialler(authorityWithPort), +		Dial:     addressDialer(authorityWithPort),  	}, nil  } @@ -114,7 +115,8 @@ func NewBuilder() resolver.Builder {  type dnsBuilder struct{} -// Build creates and starts a DNS resolver that watches the name resolution of the target. +// Build creates and starts a DNS resolver that watches the name resolution of +// the target.  func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {  	host, port, err := parseTarget(target.Endpoint(), defaultPort)  	if err != nil { @@ -143,7 +145,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts  	if target.URL.Host == "" {  		d.resolver = defaultResolver  	} else { -		d.resolver, err = customAuthorityResolver(target.URL.Host) +		d.resolver, err = newNetResolver(target.URL.Host)  		if err != nil {  			return nil, err  		} @@ -180,19 +182,22 @@ type dnsResolver struct {  	ctx      context.Context  	cancel   context.CancelFunc  	cc       resolver.ClientConn -	// rn channel is used by ResolveNow() to force an immediate resolution of the target. +	// rn channel is used by ResolveNow() to force an immediate resolution of the +	// target.  	rn chan struct{} -	// wg is used to enforce Close() to return after the watcher() goroutine has finished. -	// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we -	// replace the real lookup functions with mocked ones to facilitate testing. -	// If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes -	// will warns lookup (READ the lookup function pointers) inside watcher() goroutine -	// has data race with replaceNetFunc (WRITE the lookup function pointers). +	// wg is used to enforce Close() to return after the watcher() goroutine has +	// finished. Otherwise, data race will be possible. [Race Example] in +	// dns_resolver_test we replace the real lookup functions with mocked ones to +	// facilitate testing. If Close() doesn't wait for watcher() goroutine +	// finishes, race detector sometimes will warns lookup (READ the lookup +	// function pointers) inside watcher() goroutine has data race with +	// replaceNetFunc (WRITE the lookup function pointers).  	wg                   sync.WaitGroup  	disableServiceConfig bool  } -// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches. +// ResolveNow invoke an immediate resolution of the target that this +// dnsResolver watches.  func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {  	select {  	case d.rn <- struct{}{}: @@ -220,8 +225,8 @@ func (d *dnsResolver) watcher() {  		var timer *time.Timer  		if err == nil { -			// Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least -			// to prevent constantly re-resolving. +			// Success resolving, wait for the next ResolveNow. However, also wait 30 +			// seconds at the very least to prevent constantly re-resolving.  			backoffIndex = 1  			timer = newTimerDNSResRate(minDNSResRate)  			select { @@ -231,7 +236,8 @@ func (d *dnsResolver) watcher() {  			case <-d.rn:  			}  		} else { -			// Poll on an error found in DNS Resolver or an error received from ClientConn. +			// Poll on an error found in DNS Resolver or an error received from +			// ClientConn.  			timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))  			backoffIndex++  		} @@ -278,7 +284,8 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {  }  func handleDNSError(err error, lookupType string) error { -	if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary { +	dnsErr, ok := err.(*net.DNSError) +	if ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {  		// Timeouts and temporary errors should be communicated to gRPC to  		// attempt another DNS query (with backoff).  Other errors should be  		// suppressed (they may represent the absence of a TXT record). @@ -307,10 +314,12 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {  		res += s  	} -	// TXT record must have "grpc_config=" attribute in order to be used as service config. +	// TXT record must have "grpc_config=" attribute in order to be used as +	// service config.  	if !strings.HasPrefix(res, txtAttribute) {  		logger.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute) -		// This is not an error; it is the equivalent of not having a service config. +		// This is not an error; it is the equivalent of not having a service +		// config.  		return nil  	}  	sc := canaryingSC(strings.TrimPrefix(res, txtAttribute)) @@ -352,9 +361,10 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {  	return &state, nil  } -// formatIP returns ok = false if addr is not a valid textual representation of an IP address. -// If addr is an IPv4 address, return the addr and ok = true. -// If addr is an IPv6 address, return the addr enclosed in square brackets and ok = true. +// formatIP returns ok = false if addr is not a valid textual representation of +// an IP address. If addr is an IPv4 address, return the addr and ok = true. +// If addr is an IPv6 address, return the addr enclosed in square brackets and +// ok = true.  func formatIP(addr string) (addrIP string, ok bool) {  	ip := net.ParseIP(addr)  	if ip == nil { @@ -366,10 +376,10 @@ func formatIP(addr string) (addrIP string, ok bool) {  	return "[" + addr + "]", true  } -// parseTarget takes the user input target string and default port, returns formatted host and port info. -// If target doesn't specify a port, set the port to be the defaultPort. -// If target is in IPv6 format and host-name is enclosed in square brackets, brackets -// are stripped when setting the host. +// parseTarget takes the user input target string and default port, returns +// formatted host and port info. If target doesn't specify a port, set the port +// to be the defaultPort. If target is in IPv6 format and host-name is enclosed +// in square brackets, brackets are stripped when setting the host.  // examples:  // target: "www.google.com" defaultPort: "443" returns host: "www.google.com", port: "443"  // target: "ipv4-host:80" defaultPort: "443" returns host: "ipv4-host", port: "80" @@ -385,12 +395,14 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {  	}  	if host, port, err = net.SplitHostPort(target); err == nil {  		if port == "" { -			// If the port field is empty (target ends with colon), e.g. "[::1]:", this is an error. +			// If the port field is empty (target ends with colon), e.g. "[::1]:", +			// this is an error.  			return "", "", errEndsWithColon  		}  		// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port  		if host == "" { -			// Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed. +			// Keep consistent with net.Dial(): If the host is empty, as in ":80", +			// the local system is assumed.  			host = "localhost"  		}  		return host, port, nil diff --git a/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go b/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go new file mode 100644 index 000000000..11d82afcc --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go @@ -0,0 +1,130 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package serviceconfig + +import ( +	"encoding/json" +	"fmt" +	"math" +	"strconv" +	"strings" +	"time" +) + +// Duration defines JSON marshal and unmarshal methods to conform to the +// protobuf JSON spec defined [here]. +// +// [here]: https://protobuf.dev/reference/protobuf/google.protobuf/#duration +type Duration time.Duration + +func (d Duration) String() string { +	return fmt.Sprint(time.Duration(d)) +} + +// MarshalJSON converts from d to a JSON string output. +func (d Duration) MarshalJSON() ([]byte, error) { +	ns := time.Duration(d).Nanoseconds() +	sec := ns / int64(time.Second) +	ns = ns % int64(time.Second) + +	var sign string +	if sec < 0 || ns < 0 { +		sign, sec, ns = "-", -1*sec, -1*ns +	} + +	// Generated output always contains 0, 3, 6, or 9 fractional digits, +	// depending on required precision. +	str := fmt.Sprintf("%s%d.%09d", sign, sec, ns) +	str = strings.TrimSuffix(str, "000") +	str = strings.TrimSuffix(str, "000") +	str = strings.TrimSuffix(str, ".000") +	return []byte(fmt.Sprintf("\"%ss\"", str)), nil +} + +// UnmarshalJSON unmarshals b as a duration JSON string into d. +func (d *Duration) UnmarshalJSON(b []byte) error { +	var s string +	if err := json.Unmarshal(b, &s); err != nil { +		return err +	} +	if !strings.HasSuffix(s, "s") { +		return fmt.Errorf("malformed duration %q: missing seconds unit", s) +	} +	neg := false +	if s[0] == '-' { +		neg = true +		s = s[1:] +	} +	ss := strings.SplitN(s[:len(s)-1], ".", 3) +	if len(ss) > 2 { +		return fmt.Errorf("malformed duration %q: too many decimals", s) +	} +	// hasDigits is set if either the whole or fractional part of the number is +	// present, since both are optional but one is required. +	hasDigits := false +	var sec, ns int64 +	if len(ss[0]) > 0 { +		var err error +		if sec, err = strconv.ParseInt(ss[0], 10, 64); err != nil { +			return fmt.Errorf("malformed duration %q: %v", s, err) +		} +		// Maximum seconds value per the durationpb spec. +		const maxProtoSeconds = 315_576_000_000 +		if sec > maxProtoSeconds { +			return fmt.Errorf("out of range: %q", s) +		} +		hasDigits = true +	} +	if len(ss) == 2 && len(ss[1]) > 0 { +		if len(ss[1]) > 9 { +			return fmt.Errorf("malformed duration %q: too many digits after decimal", s) +		} +		var err error +		if ns, err = strconv.ParseInt(ss[1], 10, 64); err != nil { +			return fmt.Errorf("malformed duration %q: %v", s, err) +		} +		for i := 9; i > len(ss[1]); i-- { +			ns *= 10 +		} +		hasDigits = true +	} +	if !hasDigits { +		return fmt.Errorf("malformed duration %q: contains no numbers", s) +	} + +	if neg { +		sec *= -1 +		ns *= -1 +	} + +	// Maximum/minimum seconds/nanoseconds representable by Go's time.Duration. +	const maxSeconds = math.MaxInt64 / int64(time.Second) +	const maxNanosAtMaxSeconds = math.MaxInt64 % int64(time.Second) +	const minSeconds = math.MinInt64 / int64(time.Second) +	const minNanosAtMinSeconds = math.MinInt64 % int64(time.Second) + +	if sec > maxSeconds || (sec == maxSeconds && ns >= maxNanosAtMaxSeconds) { +		*d = Duration(math.MaxInt64) +	} else if sec < minSeconds || (sec == minSeconds && ns <= minNanosAtMinSeconds) { +		*d = Duration(math.MinInt64) +	} else { +		*d = Duration(sec*int64(time.Second) + ns) +	} +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index fbee581b8..98f80e3fa 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -453,7 +453,7 @@ func (ht *serverHandlerTransport) IncrMsgSent() {}  func (ht *serverHandlerTransport) IncrMsgRecv() {} -func (ht *serverHandlerTransport) Drain() { +func (ht *serverHandlerTransport) Drain(debugData string) {  	panic("Drain() is not implemented")  } diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index 5216998a8..326bf0848 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -1337,7 +1337,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  // setGoAwayReason sets the value of t.goAwayReason based  // on the GoAway frame received. -// It expects a lock on transport's mutext to be held by +// It expects a lock on transport's mutex to be held by  // the caller.  func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {  	t.goAwayReason = GoAwayNoReason diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 4b406b8cb..f96064012 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -238,7 +238,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  		kp.Timeout = defaultServerKeepaliveTimeout  	}  	if kp.Time != infinity { -		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil { +		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {  			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)  		}  	} @@ -1166,12 +1166,12 @@ func (t *http2Server) keepalive() {  			if val <= 0 {  				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.  				// Gracefully close the connection. -				t.Drain() +				t.Drain("max_idle")  				return  			}  			idleTimer.Reset(val)  		case <-ageTimer.C: -			t.Drain() +			t.Drain("max_age")  			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)  			select {  			case <-ageTimer.C: @@ -1318,14 +1318,14 @@ func (t *http2Server) RemoteAddr() net.Addr {  	return t.remoteAddr  } -func (t *http2Server) Drain() { +func (t *http2Server) Drain(debugData string) {  	t.mu.Lock()  	defer t.mu.Unlock()  	if t.drainEvent != nil {  		return  	}  	t.drainEvent = grpcsync.NewEvent() -	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true}) +	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})  }  var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} @@ -1367,7 +1367,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  	// originated before the GoAway reaches the client.  	// After getting the ack or timer expiration send out another GoAway this  	// time with an ID of the max stream server intends to process. -	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { +	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {  		return false, err  	}  	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index 1b7d7fabc..aa1c89659 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -726,7 +726,7 @@ type ServerTransport interface {  	RemoteAddr() net.Addr  	// Drain notifies the client this ServerTransport stops accepting new RPCs. -	Drain() +	Drain(debugData string)  	// IncrMsgSent increments the number of message sent through this transport.  	IncrMsgSent() diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go index c525dc070..02f975951 100644 --- a/vendor/google.golang.org/grpc/picker_wrapper.go +++ b/vendor/google.golang.org/grpc/picker_wrapper.go @@ -36,6 +36,7 @@ import (  type pickerWrapper struct {  	mu         sync.Mutex  	done       bool +	idle       bool  	blockingCh chan struct{}  	picker     balancer.Picker  } @@ -47,7 +48,11 @@ func newPickerWrapper() *pickerWrapper {  // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.  func (pw *pickerWrapper) updatePicker(p balancer.Picker) {  	pw.mu.Lock() -	if pw.done { +	if pw.done || pw.idle { +		// There is a small window where a picker update from the LB policy can +		// race with the channel going to idle mode. If the picker is idle here, +		// it is because the channel asked it to do so, and therefore it is sage +		// to ignore the update from the LB policy.  		pw.mu.Unlock()  		return  	} @@ -63,10 +68,8 @@ func (pw *pickerWrapper) updatePicker(p balancer.Picker) {  //   - wraps the done function in the passed in result to increment the calls  //     failed or calls succeeded channelz counter before invoking the actual  //     done function. -func doneChannelzWrapper(acw *acBalancerWrapper, result *balancer.PickResult) { -	acw.mu.Lock() -	ac := acw.ac -	acw.mu.Unlock() +func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) { +	ac := acbw.ac  	ac.incrCallsStarted()  	done := result.Done  	result.Done = func(b balancer.DoneInfo) { @@ -152,14 +155,14 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.  			return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())  		} -		acw, ok := pickResult.SubConn.(*acBalancerWrapper) +		acbw, ok := pickResult.SubConn.(*acBalancerWrapper)  		if !ok {  			logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)  			continue  		} -		if t := acw.getAddrConn().getReadyTransport(); t != nil { +		if t := acbw.ac.getReadyTransport(); t != nil {  			if channelz.IsOn() { -				doneChannelzWrapper(acw, &pickResult) +				doneChannelzWrapper(acbw, &pickResult)  				return t, pickResult, nil  			}  			return t, pickResult, nil @@ -187,6 +190,25 @@ func (pw *pickerWrapper) close() {  	close(pw.blockingCh)  } +func (pw *pickerWrapper) enterIdleMode() { +	pw.mu.Lock() +	defer pw.mu.Unlock() +	if pw.done { +		return +	} +	pw.idle = true +} + +func (pw *pickerWrapper) exitIdleMode() { +	pw.mu.Lock() +	defer pw.mu.Unlock() +	if pw.done { +		return +	} +	pw.blockingCh = make(chan struct{}) +	pw.idle = false +} +  // dropError is a wrapper error that indicates the LB policy wishes to drop the  // RPC and not retry it.  type dropError struct { diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go index fc91b4d26..abe266b02 100644 --- a/vendor/google.golang.org/grpc/pickfirst.go +++ b/vendor/google.golang.org/grpc/pickfirst.go @@ -19,11 +19,15 @@  package grpc  import ( +	"encoding/json"  	"errors"  	"fmt"  	"google.golang.org/grpc/balancer"  	"google.golang.org/grpc/connectivity" +	"google.golang.org/grpc/internal/envconfig" +	"google.golang.org/grpc/internal/grpcrand" +	"google.golang.org/grpc/serviceconfig"  )  // PickFirstBalancerName is the name of the pick_first balancer. @@ -43,10 +47,28 @@ func (*pickfirstBuilder) Name() string {  	return PickFirstBalancerName  } +type pfConfig struct { +	serviceconfig.LoadBalancingConfig `json:"-"` + +	// If set to true, instructs the LB policy to shuffle the order of the list +	// of addresses received from the name resolver before attempting to +	// connect to them. +	ShuffleAddressList bool `json:"shuffleAddressList"` +} + +func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { +	cfg := &pfConfig{} +	if err := json.Unmarshal(js, cfg); err != nil { +		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err) +	} +	return cfg, nil +} +  type pickfirstBalancer struct {  	state   connectivity.State  	cc      balancer.ClientConn  	subConn balancer.SubConn +	cfg     *pfConfig  }  func (b *pickfirstBalancer) ResolverError(err error) { @@ -69,7 +91,8 @@ func (b *pickfirstBalancer) ResolverError(err error) {  }  func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { -	if len(state.ResolverState.Addresses) == 0 { +	addrs := state.ResolverState.Addresses +	if len(addrs) == 0 {  		// The resolver reported an empty address list. Treat it like an error by  		// calling b.ResolverError.  		if b.subConn != nil { @@ -82,12 +105,23 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState  		return balancer.ErrBadResolverState  	} +	if state.BalancerConfig != nil { +		cfg, ok := state.BalancerConfig.(*pfConfig) +		if !ok { +			return fmt.Errorf("pickfirstBalancer: received nil or illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig) +		} +		b.cfg = cfg +	} + +	if envconfig.PickFirstLBConfig && b.cfg != nil && b.cfg.ShuffleAddressList { +		grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) +	}  	if b.subConn != nil { -		b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses) +		b.cc.UpdateAddresses(b.subConn, addrs)  		return nil  	} -	subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{}) +	subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})  	if err != nil {  		if logger.V(2) {  			logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) @@ -119,7 +153,6 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b  		}  		return  	} -	b.state = state.ConnectivityState  	if state.ConnectivityState == connectivity.Shutdown {  		b.subConn = nil  		return @@ -132,11 +165,21 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b  			Picker:            &picker{result: balancer.PickResult{SubConn: subConn}},  		})  	case connectivity.Connecting: +		if b.state == connectivity.TransientFailure { +			// We stay in TransientFailure until we are Ready. See A62. +			return +		}  		b.cc.UpdateState(balancer.State{  			ConnectivityState: state.ConnectivityState,  			Picker:            &picker{err: balancer.ErrNoSubConnAvailable},  		})  	case connectivity.Idle: +		if b.state == connectivity.TransientFailure { +			// We stay in TransientFailure until we are Ready. Also kick the +			// subConn out of Idle into Connecting. See A62. +			b.subConn.Connect() +			return +		}  		b.cc.UpdateState(balancer.State{  			ConnectivityState: state.ConnectivityState,  			Picker:            &idlePicker{subConn: subConn}, @@ -147,6 +190,7 @@ func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state b  			Picker:            &picker{err: state.ConnectionError},  		})  	} +	b.state = state.ConnectivityState  }  func (b *pickfirstBalancer) Close() { diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go index 6215e5ef2..d8db6f5d3 100644 --- a/vendor/google.golang.org/grpc/resolver/resolver.go +++ b/vendor/google.golang.org/grpc/resolver/resolver.go @@ -22,13 +22,13 @@ package resolver  import (  	"context" +	"fmt"  	"net"  	"net/url"  	"strings"  	"google.golang.org/grpc/attributes"  	"google.golang.org/grpc/credentials" -	"google.golang.org/grpc/internal/pretty"  	"google.golang.org/grpc/serviceconfig"  ) @@ -124,7 +124,7 @@ type Address struct {  	Attributes *attributes.Attributes  	// BalancerAttributes contains arbitrary data about this address intended -	// for consumption by the LB policy.  These attribes do not affect SubConn +	// for consumption by the LB policy.  These attributes do not affect SubConn  	// creation, connection establishment, handshaking, etc.  	BalancerAttributes *attributes.Attributes @@ -142,6 +142,10 @@ type Address struct {  // Equal returns whether a and o are identical.  Metadata is compared directly,  // not with any recursive introspection. +// +// This method compares all fields of the address. When used to tell apart +// addresses during subchannel creation or connection establishment, it might be +// more appropriate for the caller to implement custom equality logic.  func (a Address) Equal(o Address) bool {  	return a.Addr == o.Addr && a.ServerName == o.ServerName &&  		a.Attributes.Equal(o.Attributes) && @@ -151,7 +155,17 @@ func (a Address) Equal(o Address) bool {  // String returns JSON formatted string representation of the address.  func (a Address) String() string { -	return pretty.ToJSON(a) +	var sb strings.Builder +	sb.WriteString(fmt.Sprintf("{Addr: %q, ", a.Addr)) +	sb.WriteString(fmt.Sprintf("ServerName: %q, ", a.ServerName)) +	if a.Attributes != nil { +		sb.WriteString(fmt.Sprintf("Attributes: %v, ", a.Attributes.String())) +	} +	if a.BalancerAttributes != nil { +		sb.WriteString(fmt.Sprintf("BalancerAttributes: %v", a.BalancerAttributes.String())) +	} +	sb.WriteString("}") +	return sb.String()  }  // BuildOptions includes additional information for the builder to create @@ -254,10 +268,6 @@ type ClientConn interface {  //   - "unknown_scheme://authority/endpoint"  //     Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"}  type Target struct { -	// Deprecated: use URL.Scheme instead. -	Scheme string -	// Deprecated: use URL.Host instead. -	Authority string  	// URL contains the parsed dial target with an optional default scheme added  	// to it if the original dial target contained no scheme or contained an  	// unregistered scheme. Any query params specified in the original dial diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go index 05a9d4e0b..b408b3688 100644 --- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go +++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go @@ -19,11 +19,11 @@  package grpc  import ( +	"context"  	"strings"  	"sync"  	"google.golang.org/grpc/balancer" -	"google.golang.org/grpc/credentials"  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/internal/grpcsync"  	"google.golang.org/grpc/internal/pretty" @@ -31,129 +31,192 @@ import (  	"google.golang.org/grpc/serviceconfig"  ) +// resolverStateUpdater wraps the single method used by ccResolverWrapper to +// report a state update from the actual resolver implementation. +type resolverStateUpdater interface { +	updateResolverState(s resolver.State, err error) error +} +  // ccResolverWrapper is a wrapper on top of cc for resolvers.  // It implements resolver.ClientConn interface.  type ccResolverWrapper struct { -	cc         *ClientConn -	resolverMu sync.Mutex -	resolver   resolver.Resolver -	done       *grpcsync.Event -	curState   resolver.State +	// The following fields are initialized when the wrapper is created and are +	// read-only afterwards, and therefore can be accessed without a mutex. +	cc                  resolverStateUpdater +	channelzID          *channelz.Identifier +	ignoreServiceConfig bool +	opts                ccResolverWrapperOpts +	serializer          *grpcsync.CallbackSerializer // To serialize all incoming calls. +	serializerCancel    context.CancelFunc           // To close the serializer, accessed only from close(). + +	// All incoming (resolver --> gRPC) calls are guaranteed to execute in a +	// mutually exclusive manner as they are scheduled on the serializer. +	// Fields accessed *only* in these serializer callbacks, can therefore be +	// accessed without a mutex. +	curState resolver.State + +	// mu guards access to the below fields. +	mu       sync.Mutex +	closed   bool +	resolver resolver.Resolver // Accessed only from outgoing calls. +} -	incomingMu sync.Mutex // Synchronizes all the incoming calls. +// ccResolverWrapperOpts wraps the arguments to be passed when creating a new +// ccResolverWrapper. +type ccResolverWrapperOpts struct { +	target     resolver.Target       // User specified dial target to resolve. +	builder    resolver.Builder      // Resolver builder to use. +	bOpts      resolver.BuildOptions // Resolver build options to use. +	channelzID *channelz.Identifier  // Channelz identifier for the channel.  }  // newCCResolverWrapper uses the resolver.Builder to build a Resolver and  // returns a ccResolverWrapper object which wraps the newly built resolver. -func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) { +func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) { +	ctx, cancel := context.WithCancel(context.Background())  	ccr := &ccResolverWrapper{ -		cc:   cc, -		done: grpcsync.NewEvent(), -	} - -	var credsClone credentials.TransportCredentials -	if creds := cc.dopts.copts.TransportCredentials; creds != nil { -		credsClone = creds.Clone() -	} -	rbo := resolver.BuildOptions{ -		DisableServiceConfig: cc.dopts.disableServiceConfig, -		DialCreds:            credsClone, -		CredsBundle:          cc.dopts.copts.CredsBundle, -		Dialer:               cc.dopts.copts.Dialer, -	} - -	var err error -	// We need to hold the lock here while we assign to the ccr.resolver field -	// to guard against a data race caused by the following code path, -	// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up -	// accessing ccr.resolver which is being assigned here. -	ccr.resolverMu.Lock() -	defer ccr.resolverMu.Unlock() -	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo) +		cc:                  cc, +		channelzID:          opts.channelzID, +		ignoreServiceConfig: opts.bOpts.DisableServiceConfig, +		opts:                opts, +		serializer:          grpcsync.NewCallbackSerializer(ctx), +		serializerCancel:    cancel, +	} + +	// Cannot hold the lock at build time because the resolver can send an +	// update or error inline and these incoming calls grab the lock to schedule +	// a callback in the serializer. +	r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)  	if err != nil { +		cancel()  		return nil, err  	} + +	// Any error reported by the resolver at build time that leads to a +	// re-resolution request from the balancer is dropped by grpc until we +	// return from this function. So, we don't have to handle pending resolveNow +	// requests here. +	ccr.mu.Lock() +	ccr.resolver = r +	ccr.mu.Unlock() +  	return ccr, nil  }  func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) { -	ccr.resolverMu.Lock() -	if !ccr.done.HasFired() { -		ccr.resolver.ResolveNow(o) +	ccr.mu.Lock() +	defer ccr.mu.Unlock() + +	// ccr.resolver field is set only after the call to Build() returns. But in +	// the process of building, the resolver may send an error update which when +	// propagated to the balancer may result in a re-resolution request. +	if ccr.closed || ccr.resolver == nil { +		return  	} -	ccr.resolverMu.Unlock() +	ccr.resolver.ResolveNow(o)  }  func (ccr *ccResolverWrapper) close() { -	ccr.resolverMu.Lock() -	ccr.resolver.Close() -	ccr.done.Fire() -	ccr.resolverMu.Unlock() +	ccr.mu.Lock() +	if ccr.closed { +		ccr.mu.Unlock() +		return +	} + +	channelz.Info(logger, ccr.channelzID, "Closing the name resolver") + +	// Close the serializer to ensure that no more calls from the resolver are +	// handled, before actually closing the resolver. +	ccr.serializerCancel() +	ccr.closed = true +	r := ccr.resolver +	ccr.mu.Unlock() + +	// Give enqueued callbacks a chance to finish. +	<-ccr.serializer.Done + +	// Spawn a goroutine to close the resolver (since it may block trying to +	// cleanup all allocated resources) and return early. +	go r.Close() +} + +// serializerScheduleLocked is a convenience method to schedule a function to be +// run on the serializer while holding ccr.mu. +func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) { +	ccr.mu.Lock() +	ccr.serializer.Schedule(f) +	ccr.mu.Unlock()  } +// UpdateState is called by resolver implementations to report new state to gRPC +// which includes addresses and service config.  func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { -	ccr.incomingMu.Lock() -	defer ccr.incomingMu.Unlock() -	if ccr.done.HasFired() { +	errCh := make(chan error, 1) +	ok := ccr.serializer.Schedule(func(context.Context) { +		ccr.addChannelzTraceEvent(s) +		ccr.curState = s +		if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { +			errCh <- balancer.ErrBadResolverState +			return +		} +		errCh <- nil +	}) +	if !ok { +		// The only time when Schedule() fail to add the callback to the +		// serializer is when the serializer is closed, and this happens only +		// when the resolver wrapper is closed.  		return nil  	} -	ccr.addChannelzTraceEvent(s) -	ccr.curState = s -	if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { -		return balancer.ErrBadResolverState -	} -	return nil +	return <-errCh  } +// ReportError is called by resolver implementations to report errors +// encountered during name resolution to gRPC.  func (ccr *ccResolverWrapper) ReportError(err error) { -	ccr.incomingMu.Lock() -	defer ccr.incomingMu.Unlock() -	if ccr.done.HasFired() { -		return -	} -	channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) -	ccr.cc.updateResolverState(resolver.State{}, err) +	ccr.serializerScheduleLocked(func(_ context.Context) { +		channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) +		ccr.cc.updateResolverState(resolver.State{}, err) +	})  } -// NewAddress is called by the resolver implementation to send addresses to gRPC. +// NewAddress is called by the resolver implementation to send addresses to +// gRPC.  func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { -	ccr.incomingMu.Lock() -	defer ccr.incomingMu.Unlock() -	if ccr.done.HasFired() { -		return -	} -	ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) -	ccr.curState.Addresses = addrs -	ccr.cc.updateResolverState(ccr.curState, nil) +	ccr.serializerScheduleLocked(func(_ context.Context) { +		ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) +		ccr.curState.Addresses = addrs +		ccr.cc.updateResolverState(ccr.curState, nil) +	})  }  // NewServiceConfig is called by the resolver implementation to send service  // configs to gRPC.  func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { -	ccr.incomingMu.Lock() -	defer ccr.incomingMu.Unlock() -	if ccr.done.HasFired() { -		return -	} -	channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc) -	if ccr.cc.dopts.disableServiceConfig { -		channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config") -		return -	} -	scpr := parseServiceConfig(sc) -	if scpr.Err != nil { -		channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err) -		return -	} -	ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) -	ccr.curState.ServiceConfig = scpr -	ccr.cc.updateResolverState(ccr.curState, nil) +	ccr.serializerScheduleLocked(func(_ context.Context) { +		channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc) +		if ccr.ignoreServiceConfig { +			channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config") +			return +		} +		scpr := parseServiceConfig(sc) +		if scpr.Err != nil { +			channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err) +			return +		} +		ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) +		ccr.curState.ServiceConfig = scpr +		ccr.cc.updateResolverState(ccr.curState, nil) +	})  } +// ParseServiceConfig is called by resolver implementations to parse a JSON +// representation of the service config.  func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {  	return parseServiceConfig(scJSON)  } +// addChannelzTraceEvent adds a channelz trace event containing the new +// state received from resolver implementations.  func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {  	var updates []string  	var oldSC, newSC *ServiceConfig @@ -172,5 +235,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {  	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {  		updates = append(updates, "resolver returned new addresses")  	} -	channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) +	channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))  } diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index 2030736a3..a844d28f4 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -577,6 +577,9 @@ type parser struct {  	// The header of a gRPC message. Find more detail at  	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md  	header [5]byte + +	// recvBufferPool is the pool of shared receive buffers. +	recvBufferPool SharedBufferPool  }  // recvMsg reads a complete gRPC message from the stream. @@ -610,9 +613,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt  	if int(length) > maxReceiveMessageSize {  		return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)  	} -	// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead -	// of making it for each message: -	msg = make([]byte, int(length)) +	msg = p.recvBufferPool.Get(int(length))  	if _, err := p.r.Read(msg); err != nil {  		if err == io.EOF {  			err = io.ErrUnexpectedEOF @@ -726,12 +727,12 @@ type payloadInfo struct {  }  func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) { -	pf, d, err := p.recvMsg(maxReceiveMessageSize) +	pf, buf, err := p.recvMsg(maxReceiveMessageSize)  	if err != nil {  		return nil, err  	}  	if payInfo != nil { -		payInfo.compressedLength = len(d) +		payInfo.compressedLength = len(buf)  	}  	if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil { @@ -743,10 +744,10 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei  		// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,  		// use this decompressor as the default.  		if dc != nil { -			d, err = dc.Do(bytes.NewReader(d)) -			size = len(d) +			buf, err = dc.Do(bytes.NewReader(buf)) +			size = len(buf)  		} else { -			d, size, err = decompress(compressor, d, maxReceiveMessageSize) +			buf, size, err = decompress(compressor, buf, maxReceiveMessageSize)  		}  		if err != nil {  			return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err) @@ -757,7 +758,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei  			return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)  		}  	} -	return d, nil +	return buf, nil  }  // Using compressor, decompress d, returning data and size. @@ -792,15 +793,17 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize  // dc takes precedence over compressor.  // TODO(dfawley): wrap the old compressor/decompressor using the new API?  func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { -	d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) +	buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)  	if err != nil {  		return err  	} -	if err := c.Unmarshal(d, m); err != nil { +	if err := c.Unmarshal(buf, m); err != nil {  		return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)  	}  	if payInfo != nil { -		payInfo.uncompressedBytes = d +		payInfo.uncompressedBytes = buf +	} else { +		p.recvBufferPool.Put(&buf)  	}  	return nil  } diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 76d152a69..e076ec714 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -174,6 +174,7 @@ type serverOptions struct {  	maxHeaderListSize     *uint32  	headerTableSize       *uint32  	numServerWorkers      uint32 +	recvBufferPool        SharedBufferPool  }  var defaultServerOptions = serverOptions{ @@ -182,6 +183,7 @@ var defaultServerOptions = serverOptions{  	connectionTimeout:     120 * time.Second,  	writeBufferSize:       defaultWriteBufSize,  	readBufferSize:        defaultReadBufSize, +	recvBufferPool:        nopBufferPool{},  }  var globalServerOptions []ServerOption @@ -552,6 +554,27 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {  	})  } +// RecvBufferPool returns a ServerOption that configures the server +// to use the provided shared buffer pool for parsing incoming messages. Depending +// on the application's workload, this could result in reduced memory allocation. +// +// If you are unsure about how to implement a memory pool but want to utilize one, +// begin with grpc.NewSharedBufferPool. +// +// Note: The shared buffer pool feature will not be active if any of the following +// options are used: StatsHandler, EnableTracing, or binary logging. In such +// cases, the shared buffer pool will be ignored. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func RecvBufferPool(bufferPool SharedBufferPool) ServerOption { +	return newFuncServerOption(func(o *serverOptions) { +		o.recvBufferPool = bufferPool +	}) +} +  // serverWorkerResetThreshold defines how often the stack must be reset. Every  // N requests, by spawning a new goroutine in its place, a worker can reset its  // stack so that large stacks don't live in memory forever. 2^16 should allow @@ -895,7 +918,7 @@ func (s *Server) drainServerTransports(addr string) {  	s.mu.Lock()  	conns := s.conns[addr]  	for st := range conns { -		st.Drain() +		st.Drain("")  	}  	s.mu.Unlock()  } @@ -1046,7 +1069,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {  	if s.drain {  		// Transport added after we drained our existing conns: drain it  		// immediately. -		st.Drain() +		st.Drain("")  	}  	if s.conns[addr] == nil { @@ -1296,7 +1319,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.  	if len(shs) != 0 || len(binlogs) != 0 {  		payInfo = &payloadInfo{}  	} -	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) +	d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)  	if err != nil {  		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {  			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) @@ -1506,7 +1529,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp  		ctx:                   ctx,  		t:                     t,  		s:                     stream, -		p:                     &parser{r: stream}, +		p:                     &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},  		codec:                 s.getCodec(stream.ContentSubtype()),  		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,  		maxSendMessageSize:    s.opts.maxSendMessageSize, @@ -1856,7 +1879,7 @@ func (s *Server) GracefulStop() {  	if !s.drain {  		for _, conns := range s.conns {  			for st := range conns { -				st.Drain() +				st.Drain("graceful_stop")  			}  		}  		s.drain = true diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go index f22acace4..0df11fc09 100644 --- a/vendor/google.golang.org/grpc/service_config.go +++ b/vendor/google.golang.org/grpc/service_config.go @@ -23,8 +23,6 @@ import (  	"errors"  	"fmt"  	"reflect" -	"strconv" -	"strings"  	"time"  	"google.golang.org/grpc/codes" @@ -106,8 +104,8 @@ type healthCheckConfig struct {  type jsonRetryPolicy struct {  	MaxAttempts          int -	InitialBackoff       string -	MaxBackoff           string +	InitialBackoff       internalserviceconfig.Duration +	MaxBackoff           internalserviceconfig.Duration  	BackoffMultiplier    float64  	RetryableStatusCodes []codes.Code  } @@ -129,50 +127,6 @@ type retryThrottlingPolicy struct {  	TokenRatio float64  } -func parseDuration(s *string) (*time.Duration, error) { -	if s == nil { -		return nil, nil -	} -	if !strings.HasSuffix(*s, "s") { -		return nil, fmt.Errorf("malformed duration %q", *s) -	} -	ss := strings.SplitN((*s)[:len(*s)-1], ".", 3) -	if len(ss) > 2 { -		return nil, fmt.Errorf("malformed duration %q", *s) -	} -	// hasDigits is set if either the whole or fractional part of the number is -	// present, since both are optional but one is required. -	hasDigits := false -	var d time.Duration -	if len(ss[0]) > 0 { -		i, err := strconv.ParseInt(ss[0], 10, 32) -		if err != nil { -			return nil, fmt.Errorf("malformed duration %q: %v", *s, err) -		} -		d = time.Duration(i) * time.Second -		hasDigits = true -	} -	if len(ss) == 2 && len(ss[1]) > 0 { -		if len(ss[1]) > 9 { -			return nil, fmt.Errorf("malformed duration %q", *s) -		} -		f, err := strconv.ParseInt(ss[1], 10, 64) -		if err != nil { -			return nil, fmt.Errorf("malformed duration %q: %v", *s, err) -		} -		for i := 9; i > len(ss[1]); i-- { -			f *= 10 -		} -		d += time.Duration(f) -		hasDigits = true -	} -	if !hasDigits { -		return nil, fmt.Errorf("malformed duration %q", *s) -	} - -	return &d, nil -} -  type jsonName struct {  	Service string  	Method  string @@ -201,7 +155,7 @@ func (j jsonName) generatePath() (string, error) {  type jsonMC struct {  	Name                    *[]jsonName  	WaitForReady            *bool -	Timeout                 *string +	Timeout                 *internalserviceconfig.Duration  	MaxRequestMessageBytes  *int64  	MaxResponseMessageBytes *int64  	RetryPolicy             *jsonRetryPolicy @@ -252,15 +206,10 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {  		if m.Name == nil {  			continue  		} -		d, err := parseDuration(m.Timeout) -		if err != nil { -			logger.Warningf("grpc: unmarshaling service config %s: %v", js, err) -			return &serviceconfig.ParseResult{Err: err} -		}  		mc := MethodConfig{  			WaitForReady: m.WaitForReady, -			Timeout:      d, +			Timeout:      (*time.Duration)(m.Timeout),  		}  		if mc.RetryPolicy, err = convertRetryPolicy(m.RetryPolicy); err != nil {  			logger.Warningf("grpc: unmarshaling service config %s: %v", js, err) @@ -312,18 +261,10 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPol  	if jrp == nil {  		return nil, nil  	} -	ib, err := parseDuration(&jrp.InitialBackoff) -	if err != nil { -		return nil, err -	} -	mb, err := parseDuration(&jrp.MaxBackoff) -	if err != nil { -		return nil, err -	}  	if jrp.MaxAttempts <= 1 || -		*ib <= 0 || -		*mb <= 0 || +		jrp.InitialBackoff <= 0 || +		jrp.MaxBackoff <= 0 ||  		jrp.BackoffMultiplier <= 0 ||  		len(jrp.RetryableStatusCodes) == 0 {  		logger.Warningf("grpc: ignoring retry policy %v due to illegal configuration", jrp) @@ -332,8 +273,8 @@ func convertRetryPolicy(jrp *jsonRetryPolicy) (p *internalserviceconfig.RetryPol  	rp := &internalserviceconfig.RetryPolicy{  		MaxAttempts:          jrp.MaxAttempts, -		InitialBackoff:       *ib, -		MaxBackoff:           *mb, +		InitialBackoff:       time.Duration(jrp.InitialBackoff), +		MaxBackoff:           time.Duration(jrp.MaxBackoff),  		BackoffMultiplier:    jrp.BackoffMultiplier,  		RetryableStatusCodes: make(map[codes.Code]bool),  	} diff --git a/vendor/google.golang.org/grpc/shared_buffer_pool.go b/vendor/google.golang.org/grpc/shared_buffer_pool.go new file mode 100644 index 000000000..c3a5a9ac1 --- /dev/null +++ b/vendor/google.golang.org/grpc/shared_buffer_pool.go @@ -0,0 +1,154 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import "sync" + +// SharedBufferPool is a pool of buffers that can be shared, resulting in +// decreased memory allocation. Currently, in gRPC-go, it is only utilized +// for parsing incoming messages. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +type SharedBufferPool interface { +	// Get returns a buffer with specified length from the pool. +	// +	// The returned byte slice may be not zero initialized. +	Get(length int) []byte + +	// Put returns a buffer to the pool. +	Put(*[]byte) +} + +// NewSharedBufferPool creates a simple SharedBufferPool with buckets +// of different sizes to optimize memory usage. This prevents the pool from +// wasting large amounts of memory, even when handling messages of varying sizes. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func NewSharedBufferPool() SharedBufferPool { +	return &simpleSharedBufferPool{ +		pools: [poolArraySize]simpleSharedBufferChildPool{ +			newBytesPool(level0PoolMaxSize), +			newBytesPool(level1PoolMaxSize), +			newBytesPool(level2PoolMaxSize), +			newBytesPool(level3PoolMaxSize), +			newBytesPool(level4PoolMaxSize), +			newBytesPool(0), +		}, +	} +} + +// simpleSharedBufferPool is a simple implementation of SharedBufferPool. +type simpleSharedBufferPool struct { +	pools [poolArraySize]simpleSharedBufferChildPool +} + +func (p *simpleSharedBufferPool) Get(size int) []byte { +	return p.pools[p.poolIdx(size)].Get(size) +} + +func (p *simpleSharedBufferPool) Put(bs *[]byte) { +	p.pools[p.poolIdx(cap(*bs))].Put(bs) +} + +func (p *simpleSharedBufferPool) poolIdx(size int) int { +	switch { +	case size <= level0PoolMaxSize: +		return level0PoolIdx +	case size <= level1PoolMaxSize: +		return level1PoolIdx +	case size <= level2PoolMaxSize: +		return level2PoolIdx +	case size <= level3PoolMaxSize: +		return level3PoolIdx +	case size <= level4PoolMaxSize: +		return level4PoolIdx +	default: +		return levelMaxPoolIdx +	} +} + +const ( +	level0PoolMaxSize = 16                     //  16  B +	level1PoolMaxSize = level0PoolMaxSize * 16 // 256  B +	level2PoolMaxSize = level1PoolMaxSize * 16 //   4 KB +	level3PoolMaxSize = level2PoolMaxSize * 16 //  64 KB +	level4PoolMaxSize = level3PoolMaxSize * 16 //   1 MB +) + +const ( +	level0PoolIdx = iota +	level1PoolIdx +	level2PoolIdx +	level3PoolIdx +	level4PoolIdx +	levelMaxPoolIdx +	poolArraySize +) + +type simpleSharedBufferChildPool interface { +	Get(size int) []byte +	Put(interface{}) +} + +type bufferPool struct { +	sync.Pool + +	defaultSize int +} + +func (p *bufferPool) Get(size int) []byte { +	bs := p.Pool.Get().(*[]byte) + +	if cap(*bs) < size { +		p.Pool.Put(bs) + +		return make([]byte, size) +	} + +	return (*bs)[:size] +} + +func newBytesPool(size int) simpleSharedBufferChildPool { +	return &bufferPool{ +		Pool: sync.Pool{ +			New: func() interface{} { +				bs := make([]byte, size) +				return &bs +			}, +		}, +		defaultSize: size, +	} +} + +// nopBufferPool is a buffer pool just makes new buffer without pooling. +type nopBufferPool struct { +} + +func (nopBufferPool) Get(length int) []byte { +	return make([]byte, length) +} + +func (nopBufferPool) Put(*[]byte) { +} diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go index 53910fb7c..bcf2e4d81 100644 --- a/vendor/google.golang.org/grpc/status/status.go +++ b/vendor/google.golang.org/grpc/status/status.go @@ -77,11 +77,18 @@ func FromProto(s *spb.Status) *Status {  // FromError returns a Status representation of err.  //  //   - If err was produced by this package or implements the method `GRPCStatus() -//     *Status`, or if err wraps a type satisfying this, the appropriate Status is -//     returned.  For wrapped errors, the message returned contains the entire -//     err.Error() text and not just the wrapped status. +//     *Status` and `GRPCStatus()` does not return nil, or if err wraps a type +//     satisfying this, the Status from `GRPCStatus()` is returned.  For wrapped +//     errors, the message returned contains the entire err.Error() text and not +//     just the wrapped status. In that case, ok is true.  // -//   - If err is nil, a Status is returned with codes.OK and no message. +//   - If err is nil, a Status is returned with codes.OK and no message, and ok +//     is true. +// +//   - If err implements the method `GRPCStatus() *Status` and `GRPCStatus()` +//     returns nil (which maps to Codes.OK), or if err wraps a type +//     satisfying this, a Status is returned with codes.Unknown and err's +//     Error() message, and ok is false.  //  //   - Otherwise, err is an error not compatible with this package.  In this  //     case, a Status is returned with codes.Unknown and err's Error() message, @@ -92,10 +99,24 @@ func FromError(err error) (s *Status, ok bool) {  	}  	type grpcstatus interface{ GRPCStatus() *Status }  	if gs, ok := err.(grpcstatus); ok { +		if gs.GRPCStatus() == nil { +			// Error has status nil, which maps to codes.OK. There +			// is no sensible behavior for this, so we turn it into +			// an error with codes.Unknown and discard the existing +			// status. +			return New(codes.Unknown, err.Error()), false +		}  		return gs.GRPCStatus(), true  	}  	var gs grpcstatus  	if errors.As(err, &gs) { +		if gs.GRPCStatus() == nil { +			// Error wraps an error that has status nil, which maps +			// to codes.OK.  There is no sensible behavior for this, +			// so we turn it into an error with codes.Unknown and +			// discard the existing status. +			return New(codes.Unknown, err.Error()), false +		}  		p := gs.GRPCStatus().Proto()  		p.Message = err.Error()  		return status.FromProto(p), true diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index d1226a412..de32a7597 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -123,6 +123,9 @@ type ClientStream interface {  	// calling RecvMsg on the same stream at the same time, but it is not safe  	// to call SendMsg on the same stream in different goroutines. It is also  	// not safe to call CloseSend concurrently with SendMsg. +	// +	// It is not safe to modify the message after calling SendMsg. Tracing +	// libraries and stats handlers may use the message lazily.  	SendMsg(m interface{}) error  	// RecvMsg blocks until it receives a message into m or the stream is  	// done. It returns io.EOF when the stream completes successfully. On @@ -152,6 +155,11 @@ type ClientStream interface {  // If none of the above happen, a goroutine and a context will be leaked, and grpc  // will not call the optionally-configured stats handler with a stats.End message.  func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { +	if err := cc.idlenessMgr.onCallBegin(); err != nil { +		return nil, err +	} +	defer cc.idlenessMgr.onCallEnd() +  	// allow interceptor to see all applicable call options, which means those  	// configured as defaults from dial option as well as per-call options  	opts = combine(cc.dopts.callOptions, opts) @@ -469,7 +477,7 @@ func (a *csAttempt) newStream() error {  	// It is safe to overwrite the csAttempt's context here, since all state  	// maintained in it are local to the attempt. When the attempt has to be  	// retried, a new instance of csAttempt will be created. -	if a.pickResult.Metatada != nil { +	if a.pickResult.Metadata != nil {  		// We currently do not have a function it the metadata package which  		// merges given metadata with existing metadata in a context. Existing  		// function `AppendToOutgoingContext()` takes a variadic argument of key @@ -479,7 +487,7 @@ func (a *csAttempt) newStream() error {  		// in a form passable to AppendToOutgoingContext(), or create a version  		// of AppendToOutgoingContext() that accepts a metadata.MD.  		md, _ := metadata.FromOutgoingContext(a.ctx) -		md = metadata.Join(md, a.pickResult.Metatada) +		md = metadata.Join(md, a.pickResult.Metadata)  		a.ctx = metadata.NewOutgoingContext(a.ctx, md)  	} @@ -499,7 +507,7 @@ func (a *csAttempt) newStream() error {  		return toRPCErr(nse.Err)  	}  	a.s = s -	a.p = &parser{r: s} +	a.p = &parser{r: s, recvBufferPool: a.cs.cc.dopts.recvBufferPool}  	return nil  } @@ -1262,17 +1270,22 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin  		return nil, err  	}  	as.s = s -	as.p = &parser{r: s} +	as.p = &parser{r: s, recvBufferPool: ac.dopts.recvBufferPool}  	ac.incrCallsStarted()  	if desc != unaryStreamDesc { -		// Listen on cc and stream contexts to cleanup when the user closes the -		// ClientConn or cancels the stream context.  In all other cases, an error -		// should already be injected into the recv buffer by the transport, which -		// the client will eventually receive, and then we will cancel the stream's -		// context in clientStream.finish. +		// Listen on stream context to cleanup when the stream context is +		// canceled.  Also listen for the addrConn's context in case the +		// addrConn is closed or reconnects to a different address.  In all +		// other cases, an error should already be injected into the recv +		// buffer by the transport, which the client will eventually receive, +		// and then we will cancel the stream's context in +		// addrConnStream.finish.  		go func() { +			ac.mu.Lock() +			acCtx := ac.ctx +			ac.mu.Unlock()  			select { -			case <-ac.ctx.Done(): +			case <-acCtx.Done():  				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))  			case <-ctx.Done():  				as.finish(toRPCErr(ctx.Err())) diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 853ce0e30..353cfd528 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@  package grpc  // Version is the current grpc version. -const Version = "1.55.0" +const Version = "1.57.0" | 
