summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
authorLibravatar dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2023-09-18 13:47:28 +0100
committerLibravatar GitHub <noreply@github.com>2023-09-18 13:47:28 +0100
commitc6fdcd52fabb6984de280f763ec5dc2023613054 (patch)
tree939de6cc265fb0c73ef40c2129c8eb298fd93b0c /vendor/google.golang.org/grpc/clientconn.go
parent[chore]: Bump github.com/miekg/dns from 1.1.55 to 1.1.56 (#2204) (diff)
downloadgotosocial-c6fdcd52fabb6984de280f763ec5dc2023613054.tar.xz
[chore]: Bump go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp from 1.17.0 to 1.18.0 (#2207)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go107
1 files changed, 73 insertions, 34 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index bfd7555a8..d53d91d5d 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -34,9 +34,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/internal/idle"
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
@@ -54,8 +56,6 @@ import (
const (
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
- // must match grpclbName in grpclb/grpclb.go
- grpclbName = "grpclb"
)
var (
@@ -138,7 +138,6 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
- csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
czData: new(channelzData),
@@ -191,6 +190,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// Register ClientConn with channelz.
cc.channelzRegistration(target)
+ cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)
+
if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
@@ -266,7 +267,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
// Configure idleness support with configured idle timeout or default idle
// timeout duration. Idleness can be explicitly disabled by the user, by
// setting the dial option to 0.
- cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout)
+ cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger})
// Return early for non-blocking dials.
if !cc.dopts.block {
@@ -317,6 +318,16 @@ func (cc *ClientConn) addTraceEvent(msg string) {
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
}
+type idler ClientConn
+
+func (i *idler) EnterIdleMode() error {
+ return (*ClientConn)(i).enterIdleMode()
+}
+
+func (i *idler) ExitIdleMode() error {
+ return (*ClientConn)(i).exitIdleMode()
+}
+
// exitIdleMode moves the channel out of idle mode by recreating the name
// resolver and load balancer.
func (cc *ClientConn) exitIdleMode() error {
@@ -327,7 +338,7 @@ func (cc *ClientConn) exitIdleMode() error {
}
if cc.idlenessState != ccIdlenessStateIdle {
cc.mu.Unlock()
- logger.Info("ClientConn asked to exit idle mode when not in idle mode")
+ channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState)
return nil
}
@@ -350,7 +361,7 @@ func (cc *ClientConn) exitIdleMode() error {
cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
- cc.blockingpicker = newPickerWrapper()
+ cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
@@ -398,7 +409,8 @@ func (cc *ClientConn) enterIdleMode() error {
return ErrClientConnClosing
}
if cc.idlenessState != ccIdlenessStateActive {
- logger.Error("ClientConn asked to enter idle mode when not active")
+ channelz.Errorf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState)
+ cc.mu.Unlock()
return nil
}
@@ -475,7 +487,6 @@ func (cc *ClientConn) validateTransportCredentials() error {
func (cc *ClientConn) channelzRegistration(target string) {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
cc.addTraceEvent("created")
- cc.csMgr.channelzID = cc.channelzID
}
// chainUnaryClientInterceptors chains all unary client interceptors into one.
@@ -492,7 +503,7 @@ func chainUnaryClientInterceptors(cc *ClientConn) {
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
- chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
+ chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
}
}
@@ -504,7 +515,7 @@ func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, final
if curr == len(interceptors)-1 {
return finalInvoker
}
- return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
+ return func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
}
}
@@ -540,13 +551,27 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr
}
}
+// newConnectivityStateManager creates an connectivityStateManager with
+// the specified id.
+func newConnectivityStateManager(ctx context.Context, id *channelz.Identifier) *connectivityStateManager {
+ return &connectivityStateManager{
+ channelzID: id,
+ pubSub: grpcsync.NewPubSub(ctx),
+ }
+}
+
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
+//
+// TODO: If possible, get rid of the `connectivityStateManager` type, and
+// provide this functionality using the `PubSub`, to avoid keeping track of
+// the connectivity state at two places.
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID *channelz.Identifier
+ pubSub *grpcsync.PubSub
}
// updateState updates the connectivity.State of ClientConn.
@@ -562,6 +587,8 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
return
}
csm.state = state
+ csm.pubSub.Publish(state)
+
channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
@@ -591,7 +618,7 @@ func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
type ClientConnInterface interface {
// Invoke performs a unary RPC and returns after the response is received
// into reply.
- Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
+ Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
// NewStream begins a streaming RPC.
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
@@ -623,7 +650,7 @@ type ClientConn struct {
channelzID *channelz.Identifier // Channelz identifier for the channel.
resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.
- idlenessMgr idlenessManager
+ idlenessMgr idle.Manager
// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
@@ -669,6 +696,19 @@ const (
ccIdlenessStateExitingIdle
)
+func (s ccIdlenessState) String() string {
+ switch s {
+ case ccIdlenessStateActive:
+ return "active"
+ case ccIdlenessStateIdle:
+ return "idle"
+ case ccIdlenessStateExitingIdle:
+ return "exitingIdle"
+ default:
+ return "unknown"
+ }
+}
+
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
//
@@ -760,6 +800,10 @@ func init() {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
}
emptyServiceConfig = cfg.Config.(*ServiceConfig)
+
+ internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
+ return cc.csMgr.pubSub.Subscribe(s)
+ }
}
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
@@ -1153,23 +1197,13 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
}
var newBalancerName string
- if cc.sc != nil && cc.sc.lbConfig != nil {
+ if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) {
+ // No service config or no LB policy specified in config.
+ newBalancerName = PickFirstBalancerName
+ } else if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
- } else {
- var isGRPCLB bool
- for _, a := range addrs {
- if a.Type == resolver.GRPCLB {
- isGRPCLB = true
- break
- }
- }
- if isGRPCLB {
- newBalancerName = grpclbName
- } else if cc.sc != nil && cc.sc.LB != nil {
- newBalancerName = *cc.sc.LB
- } else {
- newBalancerName = PickFirstBalancerName
- }
+ } else { // cc.sc.LB != nil
+ newBalancerName = *cc.sc.LB
}
cc.balancerWrapper.switchTo(newBalancerName)
}
@@ -1208,7 +1242,10 @@ func (cc *ClientConn) ResetConnectBackoff() {
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
- defer cc.cancel()
+ defer func() {
+ cc.cancel()
+ <-cc.csMgr.pubSub.Done()
+ }()
cc.mu.Lock()
if cc.conns == nil {
@@ -1242,7 +1279,7 @@ func (cc *ClientConn) Close() error {
rWrapper.close()
}
if idlenessMgr != nil {
- idlenessMgr.close()
+ idlenessMgr.Close()
}
for ac := range conns {
@@ -1352,12 +1389,14 @@ func (ac *addrConn) resetTransport() {
if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
ac.cc.resolveNow(resolver.ResolveNowOptions{})
- // After exhausting all addresses, the addrConn enters
- // TRANSIENT_FAILURE.
+ ac.mu.Lock()
if acCtx.Err() != nil {
+ // addrConn was torn down.
+ ac.mu.Unlock()
return
}
- ac.mu.Lock()
+ // After exhausting all addresses, the addrConn enters
+ // TRANSIENT_FAILURE.
ac.updateConnectivityState(connectivity.TransientFailure, err)
// Backoff.
@@ -1553,7 +1592,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {
// Set up the health check helper functions.
currentTr := ac.transport
- newStream := func(method string) (interface{}, error) {
+ newStream := func(method string) (any, error) {
ac.mu.Lock()
if ac.transport != currentTr {
ac.mu.Unlock()