summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer_wrapper.go
diff options
context:
space:
mode:
authorLibravatar Dominik Süß <dominik@suess.wtf>2025-02-06 12:14:37 +0100
committerLibravatar GitHub <noreply@github.com>2025-02-06 12:14:37 +0100
commitdd094e401282e135989f57c0ca3dee7dea3f5207 (patch)
tree74cb77830f621840273255a17565ced73b4fa997 /vendor/google.golang.org/grpc/balancer_wrapper.go
parent[feature] Use `X-Robots-Tag` headers to instruct scrapers/crawlers (#3737) (diff)
downloadgotosocial-dd094e401282e135989f57c0ca3dee7dea3f5207.tar.xz
[chore] update otel libraries (#3740)
* chore: update otel dependencies * refactor: combine tracing & metrics in observability package * chore: update example tracing compose file
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_wrapper.go')
-rw-r--r--vendor/google.golang.org/grpc/balancer_wrapper.go131
1 files changed, 109 insertions, 22 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go
index 6561b769e..905817b5f 100644
--- a/vendor/google.golang.org/grpc/balancer_wrapper.go
+++ b/vendor/google.golang.org/grpc/balancer_wrapper.go
@@ -24,12 +24,14 @@ import (
"sync"
"google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/status"
)
var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
@@ -187,12 +189,13 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
ac: ac,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: opts.StateListener,
+ healthData: newHealthData(connectivity.Idle),
}
ac.acbw = acbw
return acbw, nil
}
-func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
+func (ccb *ccBalancerWrapper) RemoveSubConn(balancer.SubConn) {
// The graceful switch balancer will never call this.
logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
}
@@ -252,12 +255,32 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
+ internal.EnforceSubConnEmbedding
ac *addrConn // read-only
ccb *ccBalancerWrapper // read-only
stateListener func(balancer.SubConnState)
- mu sync.Mutex
- producers map[balancer.ProducerBuilder]*refCountedProducer
+ producersMu sync.Mutex
+ producers map[balancer.ProducerBuilder]*refCountedProducer
+
+ // Access to healthData is protected by healthMu.
+ healthMu sync.Mutex
+ // healthData is stored as a pointer to detect when the health listener is
+ // dropped or updated. This is required as closures can't be compared for
+ // equality.
+ healthData *healthData
+}
+
+// healthData holds data related to health state reporting.
+type healthData struct {
+ // connectivityState stores the most recent connectivity state delivered
+ // to the LB policy. This is stored to avoid sending updates when the
+ // SubConn has already exited connectivity state READY.
+ connectivityState connectivity.State
+}
+
+func newHealthData(s connectivity.State) *healthData {
+ return &healthData{connectivityState: s}
}
// updateState is invoked by grpc to push a subConn state update to the
@@ -267,6 +290,9 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
+ // Invalidate all producers on any state change.
+ acbw.closeProducers()
+
// Even though it is optional for balancers, gracefulswitch ensures
// opts.StateListener is set, so this cannot ever be nil.
// TODO: delete this comment when UpdateSubConnState is removed.
@@ -274,17 +300,25 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
if s == connectivity.Ready {
setConnectedAddress(&scs, curAddr)
}
+ // Invalidate the health listener by updating the healthData.
+ acbw.healthMu.Lock()
+ // A race may occur if a health listener is registered soon after the
+ // connectivity state is set but before the stateListener is called.
+ // Two cases may arise:
+ // 1. The new state is not READY: RegisterHealthListener has checks to
+ // ensure no updates are sent when the connectivity state is not
+ // READY.
+ // 2. The new state is READY: This means that the old state wasn't Ready.
+ // The RegisterHealthListener API mentions that a health listener
+ // must not be registered when a SubConn is not ready to avoid such
+ // races. When this happens, the LB policy would get health updates
+ // on the old listener. When the LB policy registers a new listener
+ // on receiving the connectivity update, the health updates will be
+ // sent to the new health listener.
+ acbw.healthData = newHealthData(scs.ConnectivityState)
+ acbw.healthMu.Unlock()
+
acbw.stateListener(scs)
- acbw.ac.mu.Lock()
- defer acbw.ac.mu.Unlock()
- if s == connectivity.Ready {
- // When changing states to READY, reset stateReadyChan. Wait until
- // after we notify the LB policy's listener(s) in order to prevent
- // ac.getTransport() from unblocking before the LB policy starts
- // tracking the subchannel as READY.
- close(acbw.ac.stateReadyChan)
- acbw.ac.stateReadyChan = make(chan struct{})
- }
})
}
@@ -301,6 +335,7 @@ func (acbw *acBalancerWrapper) Connect() {
}
func (acbw *acBalancerWrapper) Shutdown() {
+ acbw.closeProducers()
acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
@@ -308,9 +343,10 @@ func (acbw *acBalancerWrapper) Shutdown() {
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
- transport, err := acbw.ac.getTransport(ctx)
- if err != nil {
- return nil, err
+ transport := acbw.ac.getReadyTransport()
+ if transport == nil {
+ return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
+
}
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}
@@ -335,15 +371,15 @@ type refCountedProducer struct {
}
func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
- acbw.mu.Lock()
- defer acbw.mu.Unlock()
+ acbw.producersMu.Lock()
+ defer acbw.producersMu.Unlock()
// Look up existing producer from this builder.
pData := acbw.producers[pb]
if pData == nil {
// Not found; create a new one and add it to the producers map.
- p, close := pb.Build(acbw)
- pData = &refCountedProducer{producer: p, close: close}
+ p, closeFn := pb.Build(acbw)
+ pData = &refCountedProducer{producer: p, close: closeFn}
acbw.producers[pb] = pData
}
// Account for this new reference.
@@ -353,13 +389,64 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
// and delete the refCountedProducer from the map if the total reference
// count goes to zero.
unref := func() {
- acbw.mu.Lock()
+ acbw.producersMu.Lock()
+ // If closeProducers has already closed this producer instance, refs is
+ // set to 0, so the check after decrementing will never pass, and the
+ // producer will not be double-closed.
pData.refs--
if pData.refs == 0 {
defer pData.close() // Run outside the acbw mutex
delete(acbw.producers, pb)
}
- acbw.mu.Unlock()
+ acbw.producersMu.Unlock()
}
return pData.producer, grpcsync.OnceFunc(unref)
}
+
+func (acbw *acBalancerWrapper) closeProducers() {
+ acbw.producersMu.Lock()
+ defer acbw.producersMu.Unlock()
+ for pb, pData := range acbw.producers {
+ pData.refs = 0
+ pData.close()
+ delete(acbw.producers, pb)
+ }
+}
+
+// RegisterHealthListener accepts a health listener from the LB policy. It sends
+// updates to the health listener as long as the SubConn's connectivity state
+// doesn't change and a new health listener is not registered. To invalidate
+// the currently registered health listener, acbw updates the healthData. If a
+// nil listener is registered, the active health listener is dropped.
+func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) {
+ acbw.healthMu.Lock()
+ defer acbw.healthMu.Unlock()
+ // listeners should not be registered when the connectivity state
+ // isn't Ready. This may happen when the balancer registers a listener
+ // after the connectivityState is updated, but before it is notified
+ // of the update.
+ if acbw.healthData.connectivityState != connectivity.Ready {
+ return
+ }
+ // Replace the health data to stop sending updates to any previously
+ // registered health listeners.
+ hd := newHealthData(connectivity.Ready)
+ acbw.healthData = hd
+ if listener == nil {
+ return
+ }
+
+ acbw.ccb.serializer.TrySchedule(func(ctx context.Context) {
+ if ctx.Err() != nil || acbw.ccb.balancer == nil {
+ return
+ }
+ // Don't send updates if a new listener is registered.
+ acbw.healthMu.Lock()
+ defer acbw.healthMu.Unlock()
+ curHD := acbw.healthData
+ if curHD != hd {
+ return
+ }
+ listener(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+ })
+}