diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_wrapper.go')
| -rw-r--r-- | vendor/google.golang.org/grpc/balancer_wrapper.go | 82 |
1 files changed, 75 insertions, 7 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go index 905817b5f..948a21ef6 100644 --- a/vendor/google.golang.org/grpc/balancer_wrapper.go +++ b/vendor/google.golang.org/grpc/balancer_wrapper.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/channelz" @@ -34,7 +35,15 @@ import ( "google.golang.org/grpc/status" ) -var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) +var ( + setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) + // noOpRegisterHealthListenerFn is used when client side health checking is + // disabled. It sends a single READY update on the registered listener. + noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() { + listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + return func() {} + } +) // ccBalancerWrapper sits between the ClientConn and the Balancer. // @@ -51,6 +60,7 @@ var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnSt // It uses the gracefulswitch.Balancer internally to ensure that balancer // switches happen in a graceful manner. type ccBalancerWrapper struct { + internal.EnforceClientConnEmbedding // The following fields are initialized when the wrapper is created and are // read-only afterwards, and therefore can be accessed without a mutex. cc *ClientConn @@ -84,7 +94,6 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper { CustomUserAgent: cc.dopts.copts.UserAgent, ChannelzParent: cc.channelz, Target: cc.parsedTarget, - MetricsRecorder: cc.metricsRecorderList, }, serializer: grpcsync.NewCallbackSerializer(ctx), serializerCancel: cancel, @@ -93,6 +102,10 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper { return ccb } +func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder { + return ccb.cc.metricsRecorderList +} + // updateClientConnState is invoked by grpc to push a ClientConnState update to // the underlying balancer. This is always executed from the serializer, so // it is safe to call into the balancer here. @@ -277,10 +290,17 @@ type healthData struct { // to the LB policy. This is stored to avoid sending updates when the // SubConn has already exited connectivity state READY. connectivityState connectivity.State + // closeHealthProducer stores function to close the ref counted health + // producer. The health producer is automatically closed when the SubConn + // state changes. + closeHealthProducer func() } func newHealthData(s connectivity.State) *healthData { - return &healthData{connectivityState: s} + return &healthData{ + connectivityState: s, + closeHealthProducer: func() {}, + } } // updateState is invoked by grpc to push a subConn state update to the @@ -400,7 +420,7 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) ( } acbw.producersMu.Unlock() } - return pData.producer, grpcsync.OnceFunc(unref) + return pData.producer, sync.OnceFunc(unref) } func (acbw *acBalancerWrapper) closeProducers() { @@ -413,6 +433,37 @@ func (acbw *acBalancerWrapper) closeProducers() { } } +// healthProducerRegisterFn is a type alias for the health producer's function +// for registering listeners. +type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func() + +// healthListenerRegFn returns a function to register a listener for health +// updates. If client side health checks are disabled, the registered listener +// will get a single READY (raw connectivity state) update. +// +// Client side health checking is enabled when all the following +// conditions are satisfied: +// 1. Health checking is not disabled using the dial option. +// 2. The health package is imported. +// 3. The health check config is present in the service config. +func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() { + if acbw.ccb.cc.dopts.disableHealthCheck { + return noOpRegisterHealthListenerFn + } + regHealthLisFn := internal.RegisterClientHealthCheckListener + if regHealthLisFn == nil { + // The health package is not imported. + return noOpRegisterHealthListenerFn + } + cfg := acbw.ac.cc.healthCheckConfig() + if cfg == nil { + return noOpRegisterHealthListenerFn + } + return func(ctx context.Context, listener func(balancer.SubConnState)) func() { + return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener) + } +} + // RegisterHealthListener accepts a health listener from the LB policy. It sends // updates to the health listener as long as the SubConn's connectivity state // doesn't change and a new health listener is not registered. To invalidate @@ -421,6 +472,7 @@ func (acbw *acBalancerWrapper) closeProducers() { func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { acbw.healthMu.Lock() defer acbw.healthMu.Unlock() + acbw.healthData.closeHealthProducer() // listeners should not be registered when the connectivity state // isn't Ready. This may happen when the balancer registers a listener // after the connectivityState is updated, but before it is notified @@ -436,6 +488,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub return } + registerFn := acbw.healthListenerRegFn() acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { if ctx.Err() != nil || acbw.ccb.balancer == nil { return @@ -443,10 +496,25 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub // Don't send updates if a new listener is registered. acbw.healthMu.Lock() defer acbw.healthMu.Unlock() - curHD := acbw.healthData - if curHD != hd { + if acbw.healthData != hd { return } - listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + // Serialize the health updates from the health producer with + // other calls into the LB policy. + listenerWrapper := func(scs balancer.SubConnState) { + acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { + if ctx.Err() != nil || acbw.ccb.balancer == nil { + return + } + acbw.healthMu.Lock() + defer acbw.healthMu.Unlock() + if acbw.healthData != hd { + return + } + listener(scs) + }) + } + + hd.closeHealthProducer = registerFn(ctx, listenerWrapper) }) } |
