summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go59
1 files changed, 30 insertions, 29 deletions
diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
index 08666f62a..3c594e6e4 100644
--- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
+++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
@@ -200,8 +200,8 @@ func (gsb *Balancer) ExitIdle() {
}
}
-// UpdateSubConnState forwards the update to the appropriate child.
-func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+// updateSubConnState forwards the update to the appropriate child.
+func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
gsb.currentMu.Lock()
defer gsb.currentMu.Unlock()
gsb.mu.Lock()
@@ -214,13 +214,26 @@ func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubC
} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
balToUpdate = gsb.balancerPending
}
- gsb.mu.Unlock()
if balToUpdate == nil {
// SubConn belonged to a stale lb policy that has not yet fully closed,
// or the balancer was already closed.
+ gsb.mu.Unlock()
return
}
- balToUpdate.UpdateSubConnState(sc, state)
+ if state.ConnectivityState == connectivity.Shutdown {
+ delete(balToUpdate.subconns, sc)
+ }
+ gsb.mu.Unlock()
+ if cb != nil {
+ cb(state)
+ } else {
+ balToUpdate.UpdateSubConnState(sc, state)
+ }
+}
+
+// UpdateSubConnState forwards the update to the appropriate child.
+func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
+ gsb.updateSubConnState(sc, state, nil)
}
// Close closes any active child balancers.
@@ -242,7 +255,7 @@ func (gsb *Balancer) Close() {
//
// It implements the balancer.ClientConn interface and is passed down in that
// capacity to the wrapped balancer. It maintains a set of subConns created by
-// the wrapped balancer and calls from the latter to create/update/remove
+// the wrapped balancer and calls from the latter to create/update/shutdown
// SubConns update this set before being forwarded to the parent ClientConn.
// State updates from the wrapped balancer can result in invocation of the
// graceful switch logic.
@@ -254,21 +267,10 @@ type balancerWrapper struct {
subconns map[balancer.SubConn]bool // subconns created by this balancer
}
-func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
- if state.ConnectivityState == connectivity.Shutdown {
- bw.gsb.mu.Lock()
- delete(bw.subconns, sc)
- bw.gsb.mu.Unlock()
- }
- // There is no need to protect this read with a mutex, as the write to the
- // Balancer field happens in SwitchTo, which completes before this can be
- // called.
- bw.Balancer.UpdateSubConnState(sc, state)
-}
-
-// Close closes the underlying LB policy and removes the subconns it created. bw
-// must not be referenced via balancerCurrent or balancerPending in gsb when
-// called. gsb.mu must not be held. Does not panic with a nil receiver.
+// Close closes the underlying LB policy and shuts down the subconns it
+// created. bw must not be referenced via balancerCurrent or balancerPending in
+// gsb when called. gsb.mu must not be held. Does not panic with a nil
+// receiver.
func (bw *balancerWrapper) Close() {
// before Close is called.
if bw == nil {
@@ -281,7 +283,7 @@ func (bw *balancerWrapper) Close() {
bw.Balancer.Close()
bw.gsb.mu.Lock()
for sc := range bw.subconns {
- bw.gsb.cc.RemoveSubConn(sc)
+ sc.Shutdown()
}
bw.gsb.mu.Unlock()
}
@@ -335,13 +337,16 @@ func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.Ne
}
bw.gsb.mu.Unlock()
+ var sc balancer.SubConn
+ oldListener := opts.StateListener
+ opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }
sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
if err != nil {
return nil, err
}
bw.gsb.mu.Lock()
if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
- bw.gsb.cc.RemoveSubConn(sc)
+ sc.Shutdown()
bw.gsb.mu.Unlock()
return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
}
@@ -360,13 +365,9 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
}
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- bw.gsb.mu.Lock()
- if !bw.gsb.balancerCurrentOrPending(bw) {
- bw.gsb.mu.Unlock()
- return
- }
- bw.gsb.mu.Unlock()
- bw.gsb.cc.RemoveSubConn(sc)
+ // Note: existing third party balancers may call this, so it must remain
+ // until RemoveSubConn is fully removed.
+ sc.Shutdown()
}
func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {