summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
diff options
context:
space:
mode:
authorLibravatar dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2023-09-18 13:47:28 +0100
committerLibravatar GitHub <noreply@github.com>2023-09-18 13:47:28 +0100
commitc6fdcd52fabb6984de280f763ec5dc2023613054 (patch)
tree939de6cc265fb0c73ef40c2129c8eb298fd93b0c /vendor/google.golang.org/grpc/balancer_conn_wrappers.go
parent[chore]: Bump github.com/miekg/dns from 1.1.55 to 1.1.56 (#2204) (diff)
downloadgotosocial-c6fdcd52fabb6984de280f763ec5dc2023613054.tar.xz
[chore]: Bump go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp from 1.17.0 to 1.18.0 (#2207)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_conn_wrappers.go')
-rw-r--r--vendor/google.golang.org/grpc/balancer_conn_wrappers.go75
1 files changed, 35 insertions, 40 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index 04b9ad411..a4411c22b 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -99,20 +99,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
// lock held. But the lock guards only the scheduling part. The actual
// callback is called asynchronously without the lock being held.
ok := ccb.serializer.Schedule(func(_ context.Context) {
- // If the addresses specified in the update contain addresses of type
- // "grpclb" and the selected LB policy is not "grpclb", these addresses
- // will be filtered out and ccs will be modified with the updated
- // address list.
- if ccb.curBalancerName != grpclbName {
- var addrs []resolver.Address
- for _, addr := range ccs.ResolverState.Addresses {
- if addr.Type == resolver.GRPCLB {
- continue
- }
- addrs = append(addrs, addr)
- }
- ccs.ResolverState.Addresses = addrs
- }
errCh <- ccb.balancer.UpdateClientConnState(*ccs)
})
if !ok {
@@ -139,7 +125,9 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
ccb.mu.Lock()
ccb.serializer.Schedule(func(_ context.Context) {
- ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
+ // Even though it is optional for balancers, gracefulswitch ensures
+ // opts.StateListener is set, so this cannot ever be nil.
+ sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})
})
ccb.mu.Unlock()
}
@@ -221,7 +209,7 @@ func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
}
ccb.mode = m
- done := ccb.serializer.Done
+ done := ccb.serializer.Done()
b := ccb.balancer
ok := ccb.serializer.Schedule(func(_ context.Context) {
// Close the serializer to ensure that no more calls from gRPC are sent
@@ -238,11 +226,9 @@ func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {
}
ccb.mu.Unlock()
- // Give enqueued callbacks a chance to finish.
+ // Give enqueued callbacks a chance to finish before closing the balancer.
<-done
- // Spawn a goroutine to close the balancer (since it may block trying to
- // cleanup all allocated resources) and return early.
- go b.Close()
+ b.Close()
}
// exitIdleMode is invoked by grpc when the channel exits idle mode either
@@ -314,29 +300,19 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
- acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
+ acbw := &acBalancerWrapper{
+ ccb: ccb,
+ ac: ac,
+ producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
+ stateListener: opts.StateListener,
+ }
ac.acbw = acbw
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- if ccb.isIdleOrClosed() {
- // It it safe to ignore this call when the balancer is closed or in idle
- // because the ClientConn takes care of closing the connections.
- //
- // Not returning early from here when the balancer is closed or in idle
- // leads to a deadlock though, because of the following sequence of
- // calls when holding cc.mu:
- // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
- // ccb.RemoveAddrConn --> cc.removeAddrConn
- return
- }
-
- acbw, ok := sc.(*acBalancerWrapper)
- if !ok {
- return
- }
- ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
+ // The graceful switch balancer will never call this.
+ logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
@@ -380,7 +356,9 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
- ac *addrConn // read-only
+ ac *addrConn // read-only
+ ccb *ccBalancerWrapper // read-only
+ stateListener func(balancer.SubConnState)
mu sync.Mutex
producers map[balancer.ProducerBuilder]*refCountedProducer
@@ -398,6 +376,23 @@ func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
}
+func (acbw *acBalancerWrapper) Shutdown() {
+ ccb := acbw.ccb
+ if ccb.isIdleOrClosed() {
+ // It it safe to ignore this call when the balancer is closed or in idle
+ // because the ClientConn takes care of closing the connections.
+ //
+ // Not returning early from here when the balancer is closed or in idle
+ // leads to a deadlock though, because of the following sequence of
+ // calls when holding cc.mu:
+ // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
+ // ccb.RemoveAddrConn --> cc.removeAddrConn
+ return
+ }
+
+ ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
+}
+
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
@@ -411,7 +406,7 @@ func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc,
// Invoke performs a unary RPC. If the addrConn is not ready, returns
// errSubConnNotReady.
-func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error {
+func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error {
cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
if err != nil {
return err