diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer/endpointsharding')
| -rw-r--r-- | vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go | 389 |
1 files changed, 0 insertions, 389 deletions
diff --git a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go deleted file mode 100644 index 360db08eb..000000000 --- a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go +++ /dev/null @@ -1,389 +0,0 @@ -/* - * - * Copyright 2024 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package endpointsharding implements a load balancing policy that manages -// homogeneous child policies each owning a single endpoint. -// -// # Experimental -// -// Notice: This package is EXPERIMENTAL and may be changed or removed in a -// later release. -package endpointsharding - -import ( - "errors" - rand "math/rand/v2" - "sync" - "sync/atomic" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/resolver" -) - -var randIntN = rand.IntN - -// ChildState is the balancer state of a child along with the endpoint which -// identifies the child balancer. -type ChildState struct { - Endpoint resolver.Endpoint - State balancer.State - - // Balancer exposes only the ExitIdler interface of the child LB policy. - // Other methods of the child policy are called only by endpointsharding. - Balancer ExitIdler -} - -// ExitIdler provides access to only the ExitIdle method of the child balancer. -type ExitIdler interface { - // ExitIdle instructs the LB policy to reconnect to backends / exit the - // IDLE state, if appropriate and possible. Note that SubConns that enter - // the IDLE state will not reconnect until SubConn.Connect is called. - ExitIdle() -} - -// Options are the options to configure the behaviour of the -// endpointsharding balancer. -type Options struct { - // DisableAutoReconnect allows the balancer to keep child balancer in the - // IDLE state until they are explicitly triggered to exit using the - // ChildState obtained from the endpointsharding picker. When set to false, - // the endpointsharding balancer will automatically call ExitIdle on child - // connections that report IDLE. - DisableAutoReconnect bool -} - -// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same -// type as the balancer.Builder.Build method. -type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer - -// NewBalancer returns a load balancing policy that manages homogeneous child -// policies each owning a single endpoint. The endpointsharding balancer -// forwards the LoadBalancingConfig in ClientConn state updates to its children. -func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer { - es := &endpointSharding{ - cc: cc, - bOpts: opts, - esOpts: esOpts, - childBuilder: childBuilder, - } - es.children.Store(resolver.NewEndpointMap[*balancerWrapper]()) - return es -} - -// endpointSharding is a balancer that wraps child balancers. It creates a child -// balancer with child config for every unique Endpoint received. It updates the -// child states on any update from parent or child. -type endpointSharding struct { - cc balancer.ClientConn - bOpts balancer.BuildOptions - esOpts Options - childBuilder ChildBuilderFunc - - // childMu synchronizes calls to any single child. It must be held for all - // calls into a child. To avoid deadlocks, do not acquire childMu while - // holding mu. - childMu sync.Mutex - children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]] - - // inhibitChildUpdates is set during UpdateClientConnState/ResolverError - // calls (calls to children will each produce an update, only want one - // update). - inhibitChildUpdates atomic.Bool - - // mu synchronizes access to the state stored in balancerWrappers in the - // children field. mu must not be held during calls into a child since - // synchronous calls back from the child may require taking mu, causing a - // deadlock. To avoid deadlocks, do not acquire childMu while holding mu. - mu sync.Mutex -} - -// rotateEndpoints returns a slice of all the input endpoints rotated a random -// amount. -func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint { - les := len(es) - if les == 0 { - return es - } - r := randIntN(les) - // Make a copy to avoid mutating data beyond the end of es. - ret := make([]resolver.Endpoint, les) - copy(ret, es[r:]) - copy(ret[les-r:], es[:r]) - return ret -} - -// UpdateClientConnState creates a child for new endpoints and deletes children -// for endpoints that are no longer present. It also updates all the children, -// and sends a single synchronous update of the childrens' aggregated state at -// the end of the UpdateClientConnState operation. If any endpoint has no -// addresses it will ignore that endpoint. Otherwise, returns first error found -// from a child, but fully processes the new update. -func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error { - es.childMu.Lock() - defer es.childMu.Unlock() - - es.inhibitChildUpdates.Store(true) - defer func() { - es.inhibitChildUpdates.Store(false) - es.updateState() - }() - var ret error - - children := es.children.Load() - newChildren := resolver.NewEndpointMap[*balancerWrapper]() - - // Update/Create new children. - for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) { - if _, ok := newChildren.Get(endpoint); ok { - // Endpoint child was already created, continue to avoid duplicate - // update. - continue - } - childBalancer, ok := children.Get(endpoint) - if ok { - // Endpoint attributes may have changed, update the stored endpoint. - es.mu.Lock() - childBalancer.childState.Endpoint = endpoint - es.mu.Unlock() - } else { - childBalancer = &balancerWrapper{ - childState: ChildState{Endpoint: endpoint}, - ClientConn: es.cc, - es: es, - } - childBalancer.childState.Balancer = childBalancer - childBalancer.child = es.childBuilder(childBalancer, es.bOpts) - } - newChildren.Set(endpoint, childBalancer) - if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{ - BalancerConfig: state.BalancerConfig, - ResolverState: resolver.State{ - Endpoints: []resolver.Endpoint{endpoint}, - Attributes: state.ResolverState.Attributes, - }, - }); err != nil && ret == nil { - // Return first error found, and always commit full processing of - // updating children. If desired to process more specific errors - // across all endpoints, caller should make these specific - // validations, this is a current limitation for simplicity sake. - ret = err - } - } - // Delete old children that are no longer present. - for _, e := range children.Keys() { - child, _ := children.Get(e) - if _, ok := newChildren.Get(e); !ok { - child.closeLocked() - } - } - es.children.Store(newChildren) - if newChildren.Len() == 0 { - return balancer.ErrBadResolverState - } - return ret -} - -// ResolverError forwards the resolver error to all of the endpointSharding's -// children and sends a single synchronous update of the childStates at the end -// of the ResolverError operation. -func (es *endpointSharding) ResolverError(err error) { - es.childMu.Lock() - defer es.childMu.Unlock() - es.inhibitChildUpdates.Store(true) - defer func() { - es.inhibitChildUpdates.Store(false) - es.updateState() - }() - children := es.children.Load() - for _, child := range children.Values() { - child.resolverErrorLocked(err) - } -} - -func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) { - // UpdateSubConnState is deprecated. -} - -func (es *endpointSharding) Close() { - es.childMu.Lock() - defer es.childMu.Unlock() - children := es.children.Load() - for _, child := range children.Values() { - child.closeLocked() - } -} - -func (es *endpointSharding) ExitIdle() { - es.childMu.Lock() - defer es.childMu.Unlock() - for _, bw := range es.children.Load().Values() { - if !bw.isClosed { - bw.child.ExitIdle() - } - } -} - -// updateState updates this component's state. It sends the aggregated state, -// and a picker with round robin behavior with all the child states present if -// needed. -func (es *endpointSharding) updateState() { - if es.inhibitChildUpdates.Load() { - return - } - var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker - - es.mu.Lock() - defer es.mu.Unlock() - - children := es.children.Load() - childStates := make([]ChildState, 0, children.Len()) - - for _, child := range children.Values() { - childState := child.childState - childStates = append(childStates, childState) - childPicker := childState.State.Picker - switch childState.State.ConnectivityState { - case connectivity.Ready: - readyPickers = append(readyPickers, childPicker) - case connectivity.Connecting: - connectingPickers = append(connectingPickers, childPicker) - case connectivity.Idle: - idlePickers = append(idlePickers, childPicker) - case connectivity.TransientFailure: - transientFailurePickers = append(transientFailurePickers, childPicker) - // connectivity.Shutdown shouldn't appear. - } - } - - // Construct the round robin picker based off the aggregated state. Whatever - // the aggregated state, use the pickers present that are currently in that - // state only. - var aggState connectivity.State - var pickers []balancer.Picker - if len(readyPickers) >= 1 { - aggState = connectivity.Ready - pickers = readyPickers - } else if len(connectingPickers) >= 1 { - aggState = connectivity.Connecting - pickers = connectingPickers - } else if len(idlePickers) >= 1 { - aggState = connectivity.Idle - pickers = idlePickers - } else if len(transientFailurePickers) >= 1 { - aggState = connectivity.TransientFailure - pickers = transientFailurePickers - } else { - aggState = connectivity.TransientFailure - pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))} - } // No children (resolver error before valid update). - p := &pickerWithChildStates{ - pickers: pickers, - childStates: childStates, - next: uint32(randIntN(len(pickers))), - } - es.cc.UpdateState(balancer.State{ - ConnectivityState: aggState, - Picker: p, - }) -} - -// pickerWithChildStates delegates to the pickers it holds in a round robin -// fashion. It also contains the childStates of all the endpointSharding's -// children. -type pickerWithChildStates struct { - pickers []balancer.Picker - childStates []ChildState - next uint32 -} - -func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - nextIndex := atomic.AddUint32(&p.next, 1) - picker := p.pickers[nextIndex%uint32(len(p.pickers))] - return picker.Pick(info) -} - -// ChildStatesFromPicker returns the state of all the children managed by the -// endpoint sharding balancer that created this picker. -func ChildStatesFromPicker(picker balancer.Picker) []ChildState { - p, ok := picker.(*pickerWithChildStates) - if !ok { - return nil - } - return p.childStates -} - -// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by -// endpoint, and persists recent child balancer state. -type balancerWrapper struct { - // The following fields are initialized at build time and read-only after - // that and therefore do not need to be guarded by a mutex. - - // child contains the wrapped balancer. Access its methods only through - // methods on balancerWrapper to ensure proper synchronization - child balancer.Balancer - balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns - - es *endpointSharding - - // Access to the following fields is guarded by es.mu. - - childState ChildState - isClosed bool -} - -func (bw *balancerWrapper) UpdateState(state balancer.State) { - bw.es.mu.Lock() - bw.childState.State = state - bw.es.mu.Unlock() - if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect { - bw.ExitIdle() - } - bw.es.updateState() -} - -// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to -// avoid deadlocks due to synchronous balancer state updates. -func (bw *balancerWrapper) ExitIdle() { - go func() { - bw.es.childMu.Lock() - if !bw.isClosed { - bw.child.ExitIdle() - } - bw.es.childMu.Unlock() - }() -} - -// updateClientConnStateLocked delivers the ClientConnState to the child -// balancer. Callers must hold the child mutex of the parent endpointsharding -// balancer. -func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error { - return bw.child.UpdateClientConnState(ccs) -} - -// closeLocked closes the child balancer. Callers must hold the child mutext of -// the parent endpointsharding balancer. -func (bw *balancerWrapper) closeLocked() { - bw.child.Close() - bw.isClosed = true -} - -func (bw *balancerWrapper) resolverErrorLocked(err error) { - bw.child.ResolverError(err) -} |
