summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer_wrapper.go
diff options
context:
space:
mode:
authorLibravatar dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2025-03-10 09:23:45 +0000
committerLibravatar GitHub <noreply@github.com>2025-03-10 09:23:45 +0000
commit252a17a691a316af8a1b0d960436c32257b4f66b (patch)
tree3768a2a373f3a0e462b33ef389e30ce761dce5cb /vendor/google.golang.org/grpc/balancer_wrapper.go
parent[chore]: Bump github.com/prometheus/client_golang from 1.21.0 to 1.21.1 (#3890) (diff)
downloadgotosocial-252a17a691a316af8a1b0d960436c32257b4f66b.tar.xz
[chore]: Bump go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc (#3888)
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_wrapper.go')
-rw-r--r--vendor/google.golang.org/grpc/balancer_wrapper.go82
1 files changed, 75 insertions, 7 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go
index 905817b5f..948a21ef6 100644
--- a/vendor/google.golang.org/grpc/balancer_wrapper.go
+++ b/vendor/google.golang.org/grpc/balancer_wrapper.go
@@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
@@ -34,7 +35,15 @@ import (
"google.golang.org/grpc/status"
)
-var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
+var (
+ setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
+ // noOpRegisterHealthListenerFn is used when client side health checking is
+ // disabled. It sends a single READY update on the registered listener.
+ noOpRegisterHealthListenerFn = func(_ context.Context, listener func(balancer.SubConnState)) func() {
+ listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+ return func() {}
+ }
+)
// ccBalancerWrapper sits between the ClientConn and the Balancer.
//
@@ -51,6 +60,7 @@ var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnSt
// It uses the gracefulswitch.Balancer internally to ensure that balancer
// switches happen in a graceful manner.
type ccBalancerWrapper struct {
+ internal.EnforceClientConnEmbedding
// The following fields are initialized when the wrapper is created and are
// read-only afterwards, and therefore can be accessed without a mutex.
cc *ClientConn
@@ -84,7 +94,6 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParent: cc.channelz,
Target: cc.parsedTarget,
- MetricsRecorder: cc.metricsRecorderList,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
@@ -93,6 +102,10 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
return ccb
}
+func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
+ return ccb.cc.metricsRecorderList
+}
+
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer. This is always executed from the serializer, so
// it is safe to call into the balancer here.
@@ -277,10 +290,17 @@ type healthData struct {
// to the LB policy. This is stored to avoid sending updates when the
// SubConn has already exited connectivity state READY.
connectivityState connectivity.State
+ // closeHealthProducer stores function to close the ref counted health
+ // producer. The health producer is automatically closed when the SubConn
+ // state changes.
+ closeHealthProducer func()
}
func newHealthData(s connectivity.State) *healthData {
- return &healthData{connectivityState: s}
+ return &healthData{
+ connectivityState: s,
+ closeHealthProducer: func() {},
+ }
}
// updateState is invoked by grpc to push a subConn state update to the
@@ -400,7 +420,7 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
}
acbw.producersMu.Unlock()
}
- return pData.producer, grpcsync.OnceFunc(unref)
+ return pData.producer, sync.OnceFunc(unref)
}
func (acbw *acBalancerWrapper) closeProducers() {
@@ -413,6 +433,37 @@ func (acbw *acBalancerWrapper) closeProducers() {
}
}
+// healthProducerRegisterFn is a type alias for the health producer's function
+// for registering listeners.
+type healthProducerRegisterFn = func(context.Context, balancer.SubConn, string, func(balancer.SubConnState)) func()
+
+// healthListenerRegFn returns a function to register a listener for health
+// updates. If client side health checks are disabled, the registered listener
+// will get a single READY (raw connectivity state) update.
+//
+// Client side health checking is enabled when all the following
+// conditions are satisfied:
+// 1. Health checking is not disabled using the dial option.
+// 2. The health package is imported.
+// 3. The health check config is present in the service config.
+func (acbw *acBalancerWrapper) healthListenerRegFn() func(context.Context, func(balancer.SubConnState)) func() {
+ if acbw.ccb.cc.dopts.disableHealthCheck {
+ return noOpRegisterHealthListenerFn
+ }
+ regHealthLisFn := internal.RegisterClientHealthCheckListener
+ if regHealthLisFn == nil {
+ // The health package is not imported.
+ return noOpRegisterHealthListenerFn
+ }
+ cfg := acbw.ac.cc.healthCheckConfig()
+ if cfg == nil {
+ return noOpRegisterHealthListenerFn
+ }
+ return func(ctx context.Context, listener func(balancer.SubConnState)) func() {
+ return regHealthLisFn.(healthProducerRegisterFn)(ctx, acbw, cfg.ServiceName, listener)
+ }
+}
+
// RegisterHealthListener accepts a health listener from the LB policy. It sends
// updates to the health listener as long as the SubConn's connectivity state
// doesn't change and a new health listener is not registered. To invalidate
@@ -421,6 +472,7 @@ func (acbw *acBalancerWrapper) closeProducers() {
func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
+ acbw.healthData.closeHealthProducer()
// listeners should not be registered when the connectivity state
// isn't Ready. This may happen when the balancer registers a listener
// after the connectivityState is updated, but before it is notified
@@ -436,6 +488,7 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
return
}
+ registerFn := acbw.healthListenerRegFn()
acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
@@ -443,10 +496,25 @@ func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.Sub
// Don't send updates if a new listener is registered.
acbw.healthMu.Lock()
defer acbw.healthMu.Unlock()
- curHD := acbw.healthData
- if curHD != hd {
+ if acbw.healthData != hd {
return
}
- listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+ // Serialize the health updates from the health producer with
+ // other calls into the LB policy.
+ listenerWrapper := func(scs balancer.SubConnState) {
+ acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
+ if ctx.Err() != nil || acbw.ccb.balancer == nil {
+ return
+ }
+ acbw.healthMu.Lock()
+ defer acbw.healthMu.Unlock()
+ if acbw.healthData != hd {
+ return
+ }
+ listener(scs)
+ })
+ }
+
+ hd.closeHealthProducer = registerFn(ctx, listenerWrapper)
})
}