summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer/endpointsharding
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer/endpointsharding')
-rw-r--r--vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go389
1 files changed, 0 insertions, 389 deletions
diff --git a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go b/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go
deleted file mode 100644
index 360db08eb..000000000
--- a/vendor/google.golang.org/grpc/balancer/endpointsharding/endpointsharding.go
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- *
- * Copyright 2024 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Package endpointsharding implements a load balancing policy that manages
-// homogeneous child policies each owning a single endpoint.
-//
-// # Experimental
-//
-// Notice: This package is EXPERIMENTAL and may be changed or removed in a
-// later release.
-package endpointsharding
-
-import (
- "errors"
- rand "math/rand/v2"
- "sync"
- "sync/atomic"
-
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/balancer/base"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/resolver"
-)
-
-var randIntN = rand.IntN
-
-// ChildState is the balancer state of a child along with the endpoint which
-// identifies the child balancer.
-type ChildState struct {
- Endpoint resolver.Endpoint
- State balancer.State
-
- // Balancer exposes only the ExitIdler interface of the child LB policy.
- // Other methods of the child policy are called only by endpointsharding.
- Balancer ExitIdler
-}
-
-// ExitIdler provides access to only the ExitIdle method of the child balancer.
-type ExitIdler interface {
- // ExitIdle instructs the LB policy to reconnect to backends / exit the
- // IDLE state, if appropriate and possible. Note that SubConns that enter
- // the IDLE state will not reconnect until SubConn.Connect is called.
- ExitIdle()
-}
-
-// Options are the options to configure the behaviour of the
-// endpointsharding balancer.
-type Options struct {
- // DisableAutoReconnect allows the balancer to keep child balancer in the
- // IDLE state until they are explicitly triggered to exit using the
- // ChildState obtained from the endpointsharding picker. When set to false,
- // the endpointsharding balancer will automatically call ExitIdle on child
- // connections that report IDLE.
- DisableAutoReconnect bool
-}
-
-// ChildBuilderFunc creates a new balancer with the ClientConn. It has the same
-// type as the balancer.Builder.Build method.
-type ChildBuilderFunc func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer
-
-// NewBalancer returns a load balancing policy that manages homogeneous child
-// policies each owning a single endpoint. The endpointsharding balancer
-// forwards the LoadBalancingConfig in ClientConn state updates to its children.
-func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions, childBuilder ChildBuilderFunc, esOpts Options) balancer.Balancer {
- es := &endpointSharding{
- cc: cc,
- bOpts: opts,
- esOpts: esOpts,
- childBuilder: childBuilder,
- }
- es.children.Store(resolver.NewEndpointMap[*balancerWrapper]())
- return es
-}
-
-// endpointSharding is a balancer that wraps child balancers. It creates a child
-// balancer with child config for every unique Endpoint received. It updates the
-// child states on any update from parent or child.
-type endpointSharding struct {
- cc balancer.ClientConn
- bOpts balancer.BuildOptions
- esOpts Options
- childBuilder ChildBuilderFunc
-
- // childMu synchronizes calls to any single child. It must be held for all
- // calls into a child. To avoid deadlocks, do not acquire childMu while
- // holding mu.
- childMu sync.Mutex
- children atomic.Pointer[resolver.EndpointMap[*balancerWrapper]]
-
- // inhibitChildUpdates is set during UpdateClientConnState/ResolverError
- // calls (calls to children will each produce an update, only want one
- // update).
- inhibitChildUpdates atomic.Bool
-
- // mu synchronizes access to the state stored in balancerWrappers in the
- // children field. mu must not be held during calls into a child since
- // synchronous calls back from the child may require taking mu, causing a
- // deadlock. To avoid deadlocks, do not acquire childMu while holding mu.
- mu sync.Mutex
-}
-
-// rotateEndpoints returns a slice of all the input endpoints rotated a random
-// amount.
-func rotateEndpoints(es []resolver.Endpoint) []resolver.Endpoint {
- les := len(es)
- if les == 0 {
- return es
- }
- r := randIntN(les)
- // Make a copy to avoid mutating data beyond the end of es.
- ret := make([]resolver.Endpoint, les)
- copy(ret, es[r:])
- copy(ret[les-r:], es[:r])
- return ret
-}
-
-// UpdateClientConnState creates a child for new endpoints and deletes children
-// for endpoints that are no longer present. It also updates all the children,
-// and sends a single synchronous update of the childrens' aggregated state at
-// the end of the UpdateClientConnState operation. If any endpoint has no
-// addresses it will ignore that endpoint. Otherwise, returns first error found
-// from a child, but fully processes the new update.
-func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
- es.childMu.Lock()
- defer es.childMu.Unlock()
-
- es.inhibitChildUpdates.Store(true)
- defer func() {
- es.inhibitChildUpdates.Store(false)
- es.updateState()
- }()
- var ret error
-
- children := es.children.Load()
- newChildren := resolver.NewEndpointMap[*balancerWrapper]()
-
- // Update/Create new children.
- for _, endpoint := range rotateEndpoints(state.ResolverState.Endpoints) {
- if _, ok := newChildren.Get(endpoint); ok {
- // Endpoint child was already created, continue to avoid duplicate
- // update.
- continue
- }
- childBalancer, ok := children.Get(endpoint)
- if ok {
- // Endpoint attributes may have changed, update the stored endpoint.
- es.mu.Lock()
- childBalancer.childState.Endpoint = endpoint
- es.mu.Unlock()
- } else {
- childBalancer = &balancerWrapper{
- childState: ChildState{Endpoint: endpoint},
- ClientConn: es.cc,
- es: es,
- }
- childBalancer.childState.Balancer = childBalancer
- childBalancer.child = es.childBuilder(childBalancer, es.bOpts)
- }
- newChildren.Set(endpoint, childBalancer)
- if err := childBalancer.updateClientConnStateLocked(balancer.ClientConnState{
- BalancerConfig: state.BalancerConfig,
- ResolverState: resolver.State{
- Endpoints: []resolver.Endpoint{endpoint},
- Attributes: state.ResolverState.Attributes,
- },
- }); err != nil && ret == nil {
- // Return first error found, and always commit full processing of
- // updating children. If desired to process more specific errors
- // across all endpoints, caller should make these specific
- // validations, this is a current limitation for simplicity sake.
- ret = err
- }
- }
- // Delete old children that are no longer present.
- for _, e := range children.Keys() {
- child, _ := children.Get(e)
- if _, ok := newChildren.Get(e); !ok {
- child.closeLocked()
- }
- }
- es.children.Store(newChildren)
- if newChildren.Len() == 0 {
- return balancer.ErrBadResolverState
- }
- return ret
-}
-
-// ResolverError forwards the resolver error to all of the endpointSharding's
-// children and sends a single synchronous update of the childStates at the end
-// of the ResolverError operation.
-func (es *endpointSharding) ResolverError(err error) {
- es.childMu.Lock()
- defer es.childMu.Unlock()
- es.inhibitChildUpdates.Store(true)
- defer func() {
- es.inhibitChildUpdates.Store(false)
- es.updateState()
- }()
- children := es.children.Load()
- for _, child := range children.Values() {
- child.resolverErrorLocked(err)
- }
-}
-
-func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
- // UpdateSubConnState is deprecated.
-}
-
-func (es *endpointSharding) Close() {
- es.childMu.Lock()
- defer es.childMu.Unlock()
- children := es.children.Load()
- for _, child := range children.Values() {
- child.closeLocked()
- }
-}
-
-func (es *endpointSharding) ExitIdle() {
- es.childMu.Lock()
- defer es.childMu.Unlock()
- for _, bw := range es.children.Load().Values() {
- if !bw.isClosed {
- bw.child.ExitIdle()
- }
- }
-}
-
-// updateState updates this component's state. It sends the aggregated state,
-// and a picker with round robin behavior with all the child states present if
-// needed.
-func (es *endpointSharding) updateState() {
- if es.inhibitChildUpdates.Load() {
- return
- }
- var readyPickers, connectingPickers, idlePickers, transientFailurePickers []balancer.Picker
-
- es.mu.Lock()
- defer es.mu.Unlock()
-
- children := es.children.Load()
- childStates := make([]ChildState, 0, children.Len())
-
- for _, child := range children.Values() {
- childState := child.childState
- childStates = append(childStates, childState)
- childPicker := childState.State.Picker
- switch childState.State.ConnectivityState {
- case connectivity.Ready:
- readyPickers = append(readyPickers, childPicker)
- case connectivity.Connecting:
- connectingPickers = append(connectingPickers, childPicker)
- case connectivity.Idle:
- idlePickers = append(idlePickers, childPicker)
- case connectivity.TransientFailure:
- transientFailurePickers = append(transientFailurePickers, childPicker)
- // connectivity.Shutdown shouldn't appear.
- }
- }
-
- // Construct the round robin picker based off the aggregated state. Whatever
- // the aggregated state, use the pickers present that are currently in that
- // state only.
- var aggState connectivity.State
- var pickers []balancer.Picker
- if len(readyPickers) >= 1 {
- aggState = connectivity.Ready
- pickers = readyPickers
- } else if len(connectingPickers) >= 1 {
- aggState = connectivity.Connecting
- pickers = connectingPickers
- } else if len(idlePickers) >= 1 {
- aggState = connectivity.Idle
- pickers = idlePickers
- } else if len(transientFailurePickers) >= 1 {
- aggState = connectivity.TransientFailure
- pickers = transientFailurePickers
- } else {
- aggState = connectivity.TransientFailure
- pickers = []balancer.Picker{base.NewErrPicker(errors.New("no children to pick from"))}
- } // No children (resolver error before valid update).
- p := &pickerWithChildStates{
- pickers: pickers,
- childStates: childStates,
- next: uint32(randIntN(len(pickers))),
- }
- es.cc.UpdateState(balancer.State{
- ConnectivityState: aggState,
- Picker: p,
- })
-}
-
-// pickerWithChildStates delegates to the pickers it holds in a round robin
-// fashion. It also contains the childStates of all the endpointSharding's
-// children.
-type pickerWithChildStates struct {
- pickers []balancer.Picker
- childStates []ChildState
- next uint32
-}
-
-func (p *pickerWithChildStates) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
- nextIndex := atomic.AddUint32(&p.next, 1)
- picker := p.pickers[nextIndex%uint32(len(p.pickers))]
- return picker.Pick(info)
-}
-
-// ChildStatesFromPicker returns the state of all the children managed by the
-// endpoint sharding balancer that created this picker.
-func ChildStatesFromPicker(picker balancer.Picker) []ChildState {
- p, ok := picker.(*pickerWithChildStates)
- if !ok {
- return nil
- }
- return p.childStates
-}
-
-// balancerWrapper is a wrapper of a balancer. It ID's a child balancer by
-// endpoint, and persists recent child balancer state.
-type balancerWrapper struct {
- // The following fields are initialized at build time and read-only after
- // that and therefore do not need to be guarded by a mutex.
-
- // child contains the wrapped balancer. Access its methods only through
- // methods on balancerWrapper to ensure proper synchronization
- child balancer.Balancer
- balancer.ClientConn // embed to intercept UpdateState, doesn't deal with SubConns
-
- es *endpointSharding
-
- // Access to the following fields is guarded by es.mu.
-
- childState ChildState
- isClosed bool
-}
-
-func (bw *balancerWrapper) UpdateState(state balancer.State) {
- bw.es.mu.Lock()
- bw.childState.State = state
- bw.es.mu.Unlock()
- if state.ConnectivityState == connectivity.Idle && !bw.es.esOpts.DisableAutoReconnect {
- bw.ExitIdle()
- }
- bw.es.updateState()
-}
-
-// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
-// avoid deadlocks due to synchronous balancer state updates.
-func (bw *balancerWrapper) ExitIdle() {
- go func() {
- bw.es.childMu.Lock()
- if !bw.isClosed {
- bw.child.ExitIdle()
- }
- bw.es.childMu.Unlock()
- }()
-}
-
-// updateClientConnStateLocked delivers the ClientConnState to the child
-// balancer. Callers must hold the child mutex of the parent endpointsharding
-// balancer.
-func (bw *balancerWrapper) updateClientConnStateLocked(ccs balancer.ClientConnState) error {
- return bw.child.UpdateClientConnState(ccs)
-}
-
-// closeLocked closes the child balancer. Callers must hold the child mutext of
-// the parent endpointsharding balancer.
-func (bw *balancerWrapper) closeLocked() {
- bw.child.Close()
- bw.isClosed = true
-}
-
-func (bw *balancerWrapper) resolverErrorLocked(err error) {
- bw.child.ResolverError(err)
-}