summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/resolver/delegatingresolver/delegatingresolver.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/resolver/delegatingresolver/delegatingresolver.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/delegatingresolver/delegatingresolver.go213
1 files changed, 148 insertions, 65 deletions
diff --git a/vendor/google.golang.org/grpc/internal/resolver/delegatingresolver/delegatingresolver.go b/vendor/google.golang.org/grpc/internal/resolver/delegatingresolver/delegatingresolver.go
index a6c647013..c0e227577 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/delegatingresolver/delegatingresolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/delegatingresolver/delegatingresolver.go
@@ -28,6 +28,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/proxyattributes"
+ "google.golang.org/grpc/internal/transport"
+ "google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@@ -40,19 +42,26 @@ var (
// delegatingResolver manages both target URI and proxy address resolution by
// delegating these tasks to separate child resolvers. Essentially, it acts as
-// a intermediary between the gRPC ClientConn and the child resolvers.
+// an intermediary between the gRPC ClientConn and the child resolvers.
//
// It implements the [resolver.Resolver] interface.
type delegatingResolver struct {
- target resolver.Target // parsed target URI to be resolved
- cc resolver.ClientConn // gRPC ClientConn
- targetResolver resolver.Resolver // resolver for the target URI, based on its scheme
- proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured
- proxyURL *url.URL // proxy URL, derived from proxy environment and target
+ target resolver.Target // parsed target URI to be resolved
+ cc resolver.ClientConn // gRPC ClientConn
+ proxyURL *url.URL // proxy URL, derived from proxy environment and target
+ // We do not hold both mu and childMu in the same goroutine. Avoid holding
+ // both locks when calling into the child, as the child resolver may
+ // synchronously callback into the channel.
mu sync.Mutex // protects all the fields below
targetResolverState *resolver.State // state of the target resolver
proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured
+
+ // childMu serializes calls into child resolvers. It also protects access to
+ // the following fields.
+ childMu sync.Mutex
+ targetResolver resolver.Resolver // resolver for the target URI, based on its scheme
+ proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured
}
// nopResolver is a resolver that does nothing.
@@ -62,8 +71,8 @@ func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (nopResolver) Close() {}
-// proxyURLForTarget determines the proxy URL for the given address based on
-// the environment. It can return the following:
+// proxyURLForTarget determines the proxy URL for the given address based on the
+// environment. It can return the following:
// - nil URL, nil error: No proxy is configured or the address is excluded
// using the `NO_PROXY` environment variable or if req.URL.Host is
// "localhost" (with or without // a port number)
@@ -82,7 +91,8 @@ func proxyURLForTarget(address string) (*url.URL, error) {
// resolvers:
// - one to resolve the proxy address specified using the supported
// environment variables. This uses the registered resolver for the "dns"
-// scheme.
+// scheme. It is lazily built when a target resolver update contains at least
+// one TCP address.
// - one to resolve the target URI using the resolver specified by the scheme
// in the target URI or specified by the user using the WithResolvers dial
// option. As a special case, if the target URI's scheme is "dns" and a
@@ -91,8 +101,10 @@ func proxyURLForTarget(address string) (*url.URL, error) {
// resolution is enabled using the dial option.
func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) {
r := &delegatingResolver{
- target: target,
- cc: cc,
+ target: target,
+ cc: cc,
+ proxyResolver: nopResolver{},
+ targetResolver: nopResolver{},
}
var err error
@@ -111,41 +123,34 @@ func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOpti
logger.Infof("Proxy URL detected : %s", r.proxyURL)
}
+ // Resolver updates from one child may trigger calls into the other. Block
+ // updates until the children are initialized.
+ r.childMu.Lock()
+ defer r.childMu.Unlock()
// When the scheme is 'dns' and target resolution on client is not enabled,
// resolution should be handled by the proxy, not the client. Therefore, we
// bypass the target resolver and store the unresolved target address.
if target.URL.Scheme == "dns" && !targetResolutionEnabled {
- state := resolver.State{
+ r.targetResolverState = &resolver.State{
Addresses: []resolver.Address{{Addr: target.Endpoint()}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}},
}
- r.targetResolverState = &state
- } else {
- wcc := &wrappingClientConn{
- stateListener: r.updateTargetResolverState,
- parent: r,
- }
- if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
- return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
- }
+ r.updateTargetResolverState(*r.targetResolverState)
+ return r, nil
}
-
- if r.proxyResolver, err = r.proxyURIResolver(opts); err != nil {
- return nil, fmt.Errorf("delegating_resolver: failed to build resolver for proxy URL %q: %v", r.proxyURL, err)
- }
-
- if r.targetResolver == nil {
- r.targetResolver = nopResolver{}
+ wcc := &wrappingClientConn{
+ stateListener: r.updateTargetResolverState,
+ parent: r,
}
- if r.proxyResolver == nil {
- r.proxyResolver = nopResolver{}
+ if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
+ return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
}
return r, nil
}
-// proxyURIResolver creates a resolver for resolving proxy URIs using the
-// "dns" scheme. It adjusts the proxyURL to conform to the "dns:///" format and
-// builds a resolver with a wrappingClientConn to capture resolved addresses.
+// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns"
+// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds
+// a resolver with a wrappingClientConn to capture resolved addresses.
func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) {
proxyBuilder := resolver.Get("dns")
if proxyBuilder == nil {
@@ -165,11 +170,15 @@ func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resol
}
func (r *delegatingResolver) ResolveNow(o resolver.ResolveNowOptions) {
+ r.childMu.Lock()
+ defer r.childMu.Unlock()
r.targetResolver.ResolveNow(o)
r.proxyResolver.ResolveNow(o)
}
func (r *delegatingResolver) Close() {
+ r.childMu.Lock()
+ defer r.childMu.Unlock()
r.targetResolver.Close()
r.targetResolver = nil
@@ -177,18 +186,43 @@ func (r *delegatingResolver) Close() {
r.proxyResolver = nil
}
-// updateClientConnStateLocked creates a list of combined addresses by
-// pairing each proxy address with every target address. For each pair, it
-// generates a new [resolver.Address] using the proxy address, and adding the
-// target address as the attribute along with user info. It returns nil if
-// either resolver has not sent update even once and returns the error from
-// ClientConn update once both resolvers have sent update atleast once.
+func networkTypeFromAddr(addr resolver.Address) string {
+ networkType, ok := networktype.Get(addr)
+ if !ok {
+ networkType, _ = transport.ParseDialTarget(addr.Addr)
+ }
+ return networkType
+}
+
+func isTCPAddressPresent(state *resolver.State) bool {
+ for _, addr := range state.Addresses {
+ if networkType := networkTypeFromAddr(addr); networkType == "tcp" {
+ return true
+ }
+ }
+ for _, endpoint := range state.Endpoints {
+ for _, addr := range endpoint.Addresses {
+ if networktype := networkTypeFromAddr(addr); networktype == "tcp" {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+// updateClientConnStateLocked constructs a combined list of addresses by
+// pairing each proxy address with every target address of type TCP. For each
+// pair, it creates a new [resolver.Address] using the proxy address and
+// attaches the corresponding target address and user info as attributes. Target
+// addresses that are not of type TCP are appended to the list as-is. The
+// function returns nil if either resolver has not yet provided an update, and
+// returns the result of ClientConn.UpdateState once both resolvers have
+// provided at least one update.
func (r *delegatingResolver) updateClientConnStateLocked() error {
if r.targetResolverState == nil || r.proxyAddrs == nil {
return nil
}
- curState := *r.targetResolverState
// If multiple resolved proxy addresses are present, we send only the
// unresolved proxy host and let net.Dial handle the proxy host name
// resolution when creating the transport. Sending all resolved addresses
@@ -206,24 +240,30 @@ func (r *delegatingResolver) updateClientConnStateLocked() error {
}
var addresses []resolver.Address
for _, targetAddr := range (*r.targetResolverState).Addresses {
+ // Avoid proxy when network is not tcp.
+ if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
+ addresses = append(addresses, targetAddr)
+ continue
+ }
addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}
- // Create a list of combined endpoints by pairing all proxy endpoints
- // with every target endpoint. Each time, it constructs a new
- // [resolver.Endpoint] using the all addresses from all the proxy endpoint
- // and the target addresses from one endpoint. The target address and user
- // information from the proxy URL are added as attributes to the proxy
- // address.The resulting list of addresses is then grouped into endpoints,
- // covering all combinations of proxy and target endpoints.
+ // For each target endpoint, construct a new [resolver.Endpoint] that
+ // includes all addresses from all proxy endpoints and the addresses from
+ // that target endpoint, preserving the number of target endpoints.
var endpoints []resolver.Endpoint
for _, endpt := range (*r.targetResolverState).Endpoints {
var addrs []resolver.Address
- for _, proxyAddr := range r.proxyAddrs {
- for _, targetAddr := range endpt.Addresses {
+ for _, targetAddr := range endpt.Addresses {
+ // Avoid proxy when network is not tcp.
+ if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
+ addrs = append(addrs, targetAddr)
+ continue
+ }
+ for _, proxyAddr := range r.proxyAddrs {
addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
@@ -234,8 +274,9 @@ func (r *delegatingResolver) updateClientConnStateLocked() error {
}
// Use the targetResolverState for its service config and attributes
// contents. The state update is only sent after both the target and proxy
- // resolvers have sent their updates, and curState has been updated with
- // the combined addresses.
+ // resolvers have sent their updates, and curState has been updated with the
+ // combined addresses.
+ curState := *r.targetResolverState
curState.Addresses = addresses
curState.Endpoints = endpoints
return r.cc.UpdateState(curState)
@@ -245,7 +286,8 @@ func (r *delegatingResolver) updateClientConnStateLocked() error {
// addresses and endpoints, marking the resolver as ready, and triggering a
// state update if both proxy and target resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the target resolver. It
-// is a StateListener function of wrappingClientConn passed to the proxy resolver.
+// is a StateListener function of wrappingClientConn passed to the proxy
+// resolver.
func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
@@ -253,8 +295,8 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro
logger.Infof("Addresses received from proxy resolver: %s", state.Addresses)
}
if len(state.Endpoints) > 0 {
- // We expect exactly one address per endpoint because the proxy
- // resolver uses "dns" resolution.
+ // We expect exactly one address per endpoint because the proxy resolver
+ // uses "dns" resolution.
r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints))
for _, endpoint := range state.Endpoints {
r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...)
@@ -267,20 +309,29 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro
err := r.updateClientConnStateLocked()
// Another possible approach was to block until updates are received from
// both resolvers. But this is not used because calling `New()` triggers
- // `Build()` for the first resolver, which calls `UpdateState()`. And the
+ // `Build()` for the first resolver, which calls `UpdateState()`. And the
// second resolver hasn't sent an update yet, so it would cause `New()` to
// block indefinitely.
if err != nil {
- r.targetResolver.ResolveNow(resolver.ResolveNowOptions{})
+ go func() {
+ r.childMu.Lock()
+ defer r.childMu.Unlock()
+ if r.targetResolver != nil {
+ r.targetResolver.ResolveNow(resolver.ResolveNowOptions{})
+ }
+ }()
}
return err
}
-// updateTargetResolverState updates the target resolver state by storing target
-// addresses, endpoints, and service config, marking the resolver as ready, and
-// triggering a state update if both resolvers are ready. If the ClientConn
-// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It
-// is a StateListener function of wrappingClientConn passed to the target resolver.
+// updateTargetResolverState is the StateListener function provided to the
+// target resolver via wrappingClientConn. It updates the resolver state and
+// marks the target resolver as ready. If the update includes at least one TCP
+// address and the proxy resolver has not yet been constructed, it initializes
+// the proxy resolver. A combined state update is triggered once both resolvers
+// are ready. If all addresses are non-TCP, it proceeds without waiting for the
+// proxy resolver. If ClientConn.UpdateState returns a non-nil error,
+// ResolveNow() is called on the proxy resolver.
func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
@@ -289,9 +340,40 @@ func (r *delegatingResolver) updateTargetResolverState(state resolver.State) err
logger.Infof("Addresses received from target resolver: %v", state.Addresses)
}
r.targetResolverState = &state
+ // If no addresses returned by resolver have network type as tcp , do not
+ // wait for proxy update.
+ if !isTCPAddressPresent(r.targetResolverState) {
+ return r.cc.UpdateState(*r.targetResolverState)
+ }
+
+ // The proxy resolver may be rebuilt multiple times, specifically each time
+ // the target resolver sends an update, even if the target resolver is built
+ // successfully but building the proxy resolver fails.
+ if len(r.proxyAddrs) == 0 {
+ go func() {
+ r.childMu.Lock()
+ defer r.childMu.Unlock()
+ if _, ok := r.proxyResolver.(nopResolver); !ok {
+ return
+ }
+ proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{})
+ if err != nil {
+ r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err))
+ return
+ }
+ r.proxyResolver = proxyResolver
+ }()
+ }
+
err := r.updateClientConnStateLocked()
if err != nil {
- r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{})
+ go func() {
+ r.childMu.Lock()
+ defer r.childMu.Unlock()
+ if r.proxyResolver != nil {
+ r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{})
+ }
+ }()
}
return nil
}
@@ -311,7 +393,8 @@ func (wcc *wrappingClientConn) UpdateState(state resolver.State) error {
return wcc.stateListener(state)
}
-// ReportError intercepts errors from the child resolvers and passes them to ClientConn.
+// ReportError intercepts errors from the child resolvers and passes them to
+// ClientConn.
func (wcc *wrappingClientConn) ReportError(err error) {
wcc.parent.cc.ReportError(err)
}
@@ -322,8 +405,8 @@ func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) {
wcc.UpdateState(resolver.State{Addresses: addrs})
}
-// ParseServiceConfig parses the provided service config and returns an
-// object that provides the parsed config.
+// ParseServiceConfig parses the provided service config and returns an object
+// that provides the parsed config.
func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON)
}