summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go96
1 files changed, 54 insertions, 42 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index c7f260711..423be7b43 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -31,13 +31,13 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
+ "google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
- "google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@@ -73,6 +73,8 @@ var (
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
+ // PickFirstBalancerName is the name of the pick_first balancer.
+ PickFirstBalancerName = pickfirst.Name
)
// The following errors are returned from Dial and DialContext
@@ -121,8 +123,9 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires
// https://github.com/grpc/grpc/blob/master/doc/naming.md. e.g. to use dns
// resolver, a "dns:///" prefix should be applied to the target.
//
-// The DialOptions returned by WithBlock, WithTimeout, and
-// WithReturnConnectionError are ignored by this function.
+// The DialOptions returned by WithBlock, WithTimeout,
+// WithReturnConnectionError, and FailOnNonTempDialError are ignored by this
+// function.
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
@@ -152,6 +155,16 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
for _, opt := range opts {
opt.apply(&cc.dopts)
}
+
+ // Determine the resolver to use.
+ if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
+ return nil, err
+ }
+
+ for _, opt := range globalPerTargetDialOptions {
+ opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
+ }
+
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
@@ -160,7 +173,7 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
}
if cc.dopts.defaultServiceConfigRawJSON != nil {
- scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
+ scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
@@ -168,25 +181,16 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
- // Register ClientConn with channelz.
- cc.channelzRegistration(target)
-
- // TODO: Ideally it should be impossible to error from this function after
- // channelz registration. This will require removing some channelz logs
- // from the following functions that can error. Errors can be returned to
- // the user, and successful logs can be emitted here, after the checks have
- // passed and channelz is subsequently registered.
-
- // Determine the resolver to use.
- if err := cc.parseTargetAndFindResolver(); err != nil {
- channelz.RemoveEntry(cc.channelz.ID)
- return nil, err
- }
- if err = cc.determineAuthority(); err != nil {
- channelz.RemoveEntry(cc.channelz.ID)
+ if err = cc.initAuthority(); err != nil {
return nil, err
}
+ // Register ClientConn with channelz. Note that this is only done after
+ // channel creation cannot fail.
+ cc.channelzRegistration(target)
+ channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
+ channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
+
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
@@ -196,6 +200,8 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
}
// Dial calls DialContext(context.Background(), target, opts...).
+//
+// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
@@ -209,6 +215,8 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
// "passthrough" for backward compatibility. This distinction should not matter
// to most users, but could matter to legacy users that specify a custom dialer
// and expect it to receive the target string directly.
+//
+// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
// At the end of this method, we kick the channel out of idle, rather than
// waiting for the first rpc.
@@ -583,11 +591,11 @@ type ClientConn struct {
// The following are initialized at dial time, and are read-only after that.
target string // User's dial target.
- parsedTarget resolver.Target // See parseTargetAndFindResolver().
- authority string // See determineAuthority().
+ parsedTarget resolver.Target // See initParsedTargetAndResolverBuilder().
+ authority string // See initAuthority().
dopts dialOptions // Default and user specified dial options.
channelz *channelz.Channel // Channelz object.
- resolverBuilder resolver.Builder // See parseTargetAndFindResolver().
+ resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
idlenessMgr *idle.Manager
// The following provide their own synchronization, and therefore don't
@@ -688,8 +696,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
var emptyServiceConfig *ServiceConfig
func init() {
- balancer.Register(pickfirstBuilder{})
- cfg := parseServiceConfig("{}")
+ cfg := parseServiceConfig("{}", defaultMaxCallAttempts)
if cfg.Err != nil {
panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
}
@@ -838,6 +845,9 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
stateChan: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
+ // Start with our address set to the first address; this may be updated if
+ // we connect to different addresses.
+ ac.channelz.ChannelMetrics.Target.Store(&addrs[0].Addr)
channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
Desc: "Subchannel created",
@@ -929,10 +939,14 @@ func equalAddresses(a, b []resolver.Address) bool {
// updateAddrs updates ac.addrs with the new addresses list and handles active
// connections or connection attempts.
func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
- ac.mu.Lock()
- channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))
-
addrs = copyAddressesWithoutBalancerAttributes(addrs)
+ limit := len(addrs)
+ if limit > 5 {
+ limit = 5
+ }
+ channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])
+
+ ac.mu.Lock()
if equalAddresses(ac.addrs, addrs) {
ac.mu.Unlock()
return
@@ -1167,6 +1181,10 @@ type addrConn struct {
// is received, transport is closed, ac has been torn down).
transport transport.ClientTransport // The current transport.
+ // This mutex is used on the RPC path, so its usage should be minimized as
+ // much as possible.
+ // TODO: Find a lock-free way to retrieve the transport and state from the
+ // addrConn.
mu sync.Mutex
curAddr resolver.Address // The current address.
addrs []resolver.Address // All addresses that the resolver resolved to.
@@ -1292,6 +1310,7 @@ func (ac *addrConn) resetTransport() {
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
var firstConnErr error
for _, addr := range addrs {
+ ac.channelz.ChannelMetrics.Target.Store(&addr.Addr)
if ctx.Err() != nil {
return errConnClosing
}
@@ -1657,22 +1676,19 @@ func (cc *ClientConn) connectionError() error {
return cc.lastConnectionError
}
-// parseTargetAndFindResolver parses the user's dial target and stores the
-// parsed target in `cc.parsedTarget`.
+// initParsedTargetAndResolverBuilder parses the user's dial target and stores
+// the parsed target in `cc.parsedTarget`.
//
// The resolver to use is determined based on the scheme in the parsed target
// and the same is stored in `cc.resolverBuilder`.
//
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
-func (cc *ClientConn) parseTargetAndFindResolver() error {
- channelz.Infof(logger, cc.channelz, "original dial target is: %q", cc.target)
+func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
+ logger.Infof("original dial target is: %q", cc.target)
var rb resolver.Builder
parsedTarget, err := parseTarget(cc.target)
- if err != nil {
- channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", cc.target, err)
- } else {
- channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", parsedTarget)
+ if err == nil {
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
@@ -1691,15 +1707,12 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {
defScheme = resolver.GetDefaultScheme()
}
- channelz.Infof(logger, cc.channelz, "fallback to scheme %q", defScheme)
canonicalTarget := defScheme + ":///" + cc.target
parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
- channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", canonicalTarget, err)
return err
}
- channelz.Infof(logger, cc.channelz, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb == nil {
return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
@@ -1739,7 +1752,7 @@ func encodeAuthority(authority string) string {
return false
case '!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=': // Subdelim characters
return false
- case ':', '[', ']', '@': // Authority related delimeters
+ case ':', '[', ']', '@': // Authority related delimiters
return false
}
// Everything else must be escaped.
@@ -1789,7 +1802,7 @@ func encodeAuthority(authority string) string {
// credentials do not match the authority configured through the dial option.
//
// Doesn't grab cc.mu as this method is expected to be called only at Dial time.
-func (cc *ClientConn) determineAuthority() error {
+func (cc *ClientConn) initAuthority() error {
dopts := cc.dopts
// Historically, we had two options for users to specify the serverName or
// authority for a channel. One was through the transport credentials
@@ -1822,6 +1835,5 @@ func (cc *ClientConn) determineAuthority() error {
} else {
cc.authority = encodeAuthority(endpoint)
}
- channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
return nil
}