diff options
author | 2023-09-18 13:47:28 +0100 | |
---|---|---|
committer | 2023-09-18 13:47:28 +0100 | |
commit | c6fdcd52fabb6984de280f763ec5dc2023613054 (patch) | |
tree | 939de6cc265fb0c73ef40c2129c8eb298fd93b0c /vendor/google.golang.org/grpc/balancer_conn_wrappers.go | |
parent | [chore]: Bump github.com/miekg/dns from 1.1.55 to 1.1.56 (#2204) (diff) | |
download | gotosocial-c6fdcd52fabb6984de280f763ec5dc2023613054.tar.xz |
[chore]: Bump go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp from 1.17.0 to 1.18.0 (#2207)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_conn_wrappers.go')
-rw-r--r-- | vendor/google.golang.org/grpc/balancer_conn_wrappers.go | 75 |
1 files changed, 35 insertions, 40 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go index 04b9ad411..a4411c22b 100644 --- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go +++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go @@ -99,20 +99,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat // lock held. But the lock guards only the scheduling part. The actual // callback is called asynchronously without the lock being held. ok := ccb.serializer.Schedule(func(_ context.Context) { - // If the addresses specified in the update contain addresses of type - // "grpclb" and the selected LB policy is not "grpclb", these addresses - // will be filtered out and ccs will be modified with the updated - // address list. - if ccb.curBalancerName != grpclbName { - var addrs []resolver.Address - for _, addr := range ccs.ResolverState.Addresses { - if addr.Type == resolver.GRPCLB { - continue - } - addrs = append(addrs, addr) - } - ccs.ResolverState.Addresses = addrs - } errCh <- ccb.balancer.UpdateClientConnState(*ccs) }) if !ok { @@ -139,7 +125,9 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { ccb.mu.Lock() ccb.serializer.Schedule(func(_ context.Context) { - ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) + // Even though it is optional for balancers, gracefulswitch ensures + // opts.StateListener is set, so this cannot ever be nil. + sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) }) ccb.mu.Unlock() } @@ -221,7 +209,7 @@ func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { } ccb.mode = m - done := ccb.serializer.Done + done := ccb.serializer.Done() b := ccb.balancer ok := ccb.serializer.Schedule(func(_ context.Context) { // Close the serializer to ensure that no more calls from gRPC are sent @@ -238,11 +226,9 @@ func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) { } ccb.mu.Unlock() - // Give enqueued callbacks a chance to finish. + // Give enqueued callbacks a chance to finish before closing the balancer. <-done - // Spawn a goroutine to close the balancer (since it may block trying to - // cleanup all allocated resources) and return early. - go b.Close() + b.Close() } // exitIdleMode is invoked by grpc when the channel exits idle mode either @@ -314,29 +300,19 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err } - acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} + acbw := &acBalancerWrapper{ + ccb: ccb, + ac: ac, + producers: make(map[balancer.ProducerBuilder]*refCountedProducer), + stateListener: opts.StateListener, + } ac.acbw = acbw return acbw, nil } func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { - if ccb.isIdleOrClosed() { - // It it safe to ignore this call when the balancer is closed or in idle - // because the ClientConn takes care of closing the connections. - // - // Not returning early from here when the balancer is closed or in idle - // leads to a deadlock though, because of the following sequence of - // calls when holding cc.mu: - // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> - // ccb.RemoveAddrConn --> cc.removeAddrConn - return - } - - acbw, ok := sc.(*acBalancerWrapper) - if !ok { - return - } - ccb.cc.removeAddrConn(acbw.ac, errConnDrain) + // The graceful switch balancer will never call this. + logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc") } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { @@ -380,7 +356,9 @@ func (ccb *ccBalancerWrapper) Target() string { // acBalancerWrapper is a wrapper on top of ac for balancers. // It implements balancer.SubConn interface. type acBalancerWrapper struct { - ac *addrConn // read-only + ac *addrConn // read-only + ccb *ccBalancerWrapper // read-only + stateListener func(balancer.SubConnState) mu sync.Mutex producers map[balancer.ProducerBuilder]*refCountedProducer @@ -398,6 +376,23 @@ func (acbw *acBalancerWrapper) Connect() { go acbw.ac.connect() } +func (acbw *acBalancerWrapper) Shutdown() { + ccb := acbw.ccb + if ccb.isIdleOrClosed() { + // It it safe to ignore this call when the balancer is closed or in idle + // because the ClientConn takes care of closing the connections. + // + // Not returning early from here when the balancer is closed or in idle + // leads to a deadlock though, because of the following sequence of + // calls when holding cc.mu: + // cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> + // ccb.RemoveAddrConn --> cc.removeAddrConn + return + } + + ccb.cc.removeAddrConn(acbw.ac, errConnDrain) +} + // NewStream begins a streaming RPC on the addrConn. If the addrConn is not // ready, blocks until it is or ctx expires. Returns an error when the context // expires or the addrConn is shut down. @@ -411,7 +406,7 @@ func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, // Invoke performs a unary RPC. If the addrConn is not ready, returns // errSubConnNotReady. -func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error { +func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error { cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...) if err != nil { return err |