1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
|
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package delegatingresolver implements a resolver capable of resolving both
// target URIs and proxy addresses.
package delegatingresolver
import (
"fmt"
"net/http"
"net/url"
"sync"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
var (
logger = grpclog.Component("delegating-resolver")
// HTTPSProxyFromEnvironment will be overwritten in the tests
HTTPSProxyFromEnvironment = http.ProxyFromEnvironment
)
// delegatingResolver manages both target URI and proxy address resolution by
// delegating these tasks to separate child resolvers. Essentially, it acts as
// an intermediary between the gRPC ClientConn and the child resolvers.
//
// It implements the [resolver.Resolver] interface.
type delegatingResolver struct {
target resolver.Target // parsed target URI to be resolved
cc resolver.ClientConn // gRPC ClientConn
proxyURL *url.URL // proxy URL, derived from proxy environment and target
// We do not hold both mu and childMu in the same goroutine. Avoid holding
// both locks when calling into the child, as the child resolver may
// synchronously callback into the channel.
mu sync.Mutex // protects all the fields below
targetResolverState *resolver.State // state of the target resolver
proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured
// childMu serializes calls into child resolvers. It also protects access to
// the following fields.
childMu sync.Mutex
targetResolver resolver.Resolver // resolver for the target URI, based on its scheme
proxyResolver resolver.Resolver // resolver for the proxy URI; nil if no proxy is configured
}
// nopResolver is a resolver that does nothing.
type nopResolver struct{}
func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (nopResolver) Close() {}
// proxyURLForTarget determines the proxy URL for the given address based on the
// environment. It can return the following:
// - nil URL, nil error: No proxy is configured or the address is excluded
// using the `NO_PROXY` environment variable or if req.URL.Host is
// "localhost" (with or without // a port number)
// - nil URL, non-nil error: An error occurred while retrieving the proxy URL.
// - non-nil URL, nil error: A proxy is configured, and the proxy URL was
// retrieved successfully without any errors.
func proxyURLForTarget(address string) (*url.URL, error) {
req := &http.Request{URL: &url.URL{
Scheme: "https",
Host: address,
}}
return HTTPSProxyFromEnvironment(req)
}
// New creates a new delegating resolver that can create up to two child
// resolvers:
// - one to resolve the proxy address specified using the supported
// environment variables. This uses the registered resolver for the "dns"
// scheme. It is lazily built when a target resolver update contains at least
// one TCP address.
// - one to resolve the target URI using the resolver specified by the scheme
// in the target URI or specified by the user using the WithResolvers dial
// option. As a special case, if the target URI's scheme is "dns" and a
// proxy is specified using the supported environment variables, the target
// URI's path portion is used as the resolved address unless target
// resolution is enabled using the dial option.
func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) {
r := &delegatingResolver{
target: target,
cc: cc,
proxyResolver: nopResolver{},
targetResolver: nopResolver{},
}
var err error
r.proxyURL, err = proxyURLForTarget(target.Endpoint())
if err != nil {
return nil, fmt.Errorf("delegating_resolver: failed to determine proxy URL for target %s: %v", target, err)
}
// proxy is not configured or proxy address excluded using `NO_PROXY` env
// var, so only target resolver is used.
if r.proxyURL == nil {
return targetResolverBuilder.Build(target, cc, opts)
}
if logger.V(2) {
logger.Infof("Proxy URL detected : %s", r.proxyURL)
}
// Resolver updates from one child may trigger calls into the other. Block
// updates until the children are initialized.
r.childMu.Lock()
defer r.childMu.Unlock()
// When the scheme is 'dns' and target resolution on client is not enabled,
// resolution should be handled by the proxy, not the client. Therefore, we
// bypass the target resolver and store the unresolved target address.
if target.URL.Scheme == "dns" && !targetResolutionEnabled {
r.targetResolverState = &resolver.State{
Addresses: []resolver.Address{{Addr: target.Endpoint()}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}},
}
r.updateTargetResolverState(*r.targetResolverState)
return r, nil
}
wcc := &wrappingClientConn{
stateListener: r.updateTargetResolverState,
parent: r,
}
if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil {
return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err)
}
return r, nil
}
// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns"
// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds
// a resolver with a wrappingClientConn to capture resolved addresses.
func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) {
proxyBuilder := resolver.Get("dns")
if proxyBuilder == nil {
panic("delegating_resolver: resolver for proxy not found for scheme dns")
}
url := *r.proxyURL
url.Scheme = "dns"
url.Path = "/" + r.proxyURL.Host
url.Host = "" // Clear the Host field to conform to the "dns:///" format
proxyTarget := resolver.Target{URL: url}
wcc := &wrappingClientConn{
stateListener: r.updateProxyResolverState,
parent: r,
}
return proxyBuilder.Build(proxyTarget, wcc, opts)
}
func (r *delegatingResolver) ResolveNow(o resolver.ResolveNowOptions) {
r.childMu.Lock()
defer r.childMu.Unlock()
r.targetResolver.ResolveNow(o)
r.proxyResolver.ResolveNow(o)
}
func (r *delegatingResolver) Close() {
r.childMu.Lock()
defer r.childMu.Unlock()
r.targetResolver.Close()
r.targetResolver = nil
r.proxyResolver.Close()
r.proxyResolver = nil
}
func networkTypeFromAddr(addr resolver.Address) string {
networkType, ok := networktype.Get(addr)
if !ok {
networkType, _ = transport.ParseDialTarget(addr.Addr)
}
return networkType
}
func isTCPAddressPresent(state *resolver.State) bool {
for _, addr := range state.Addresses {
if networkType := networkTypeFromAddr(addr); networkType == "tcp" {
return true
}
}
for _, endpoint := range state.Endpoints {
for _, addr := range endpoint.Addresses {
if networktype := networkTypeFromAddr(addr); networktype == "tcp" {
return true
}
}
}
return false
}
// updateClientConnStateLocked constructs a combined list of addresses by
// pairing each proxy address with every target address of type TCP. For each
// pair, it creates a new [resolver.Address] using the proxy address and
// attaches the corresponding target address and user info as attributes. Target
// addresses that are not of type TCP are appended to the list as-is. The
// function returns nil if either resolver has not yet provided an update, and
// returns the result of ClientConn.UpdateState once both resolvers have
// provided at least one update.
func (r *delegatingResolver) updateClientConnStateLocked() error {
if r.targetResolverState == nil || r.proxyAddrs == nil {
return nil
}
// If multiple resolved proxy addresses are present, we send only the
// unresolved proxy host and let net.Dial handle the proxy host name
// resolution when creating the transport. Sending all resolved addresses
// would increase the number of addresses passed to the ClientConn and
// subsequently to load balancing (LB) policies like Round Robin, leading
// to additional TCP connections. However, if there's only one resolved
// proxy address, we send it directly, as it doesn't affect the address
// count returned by the target resolver and the address count sent to the
// ClientConn.
var proxyAddr resolver.Address
if len(r.proxyAddrs) == 1 {
proxyAddr = r.proxyAddrs[0]
} else {
proxyAddr = resolver.Address{Addr: r.proxyURL.Host}
}
var addresses []resolver.Address
for _, targetAddr := range (*r.targetResolverState).Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addresses = append(addresses, targetAddr)
continue
}
addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}
// For each target endpoint, construct a new [resolver.Endpoint] that
// includes all addresses from all proxy endpoints and the addresses from
// that target endpoint, preserving the number of target endpoints.
var endpoints []resolver.Endpoint
for _, endpt := range (*r.targetResolverState).Endpoints {
var addrs []resolver.Address
for _, targetAddr := range endpt.Addresses {
// Avoid proxy when network is not tcp.
if networkType := networkTypeFromAddr(targetAddr); networkType != "tcp" {
addrs = append(addrs, targetAddr)
continue
}
for _, proxyAddr := range r.proxyAddrs {
addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{
User: r.proxyURL.User,
ConnectAddr: targetAddr.Addr,
}))
}
}
endpoints = append(endpoints, resolver.Endpoint{Addresses: addrs})
}
// Use the targetResolverState for its service config and attributes
// contents. The state update is only sent after both the target and proxy
// resolvers have sent their updates, and curState has been updated with the
// combined addresses.
curState := *r.targetResolverState
curState.Addresses = addresses
curState.Endpoints = endpoints
return r.cc.UpdateState(curState)
}
// updateProxyResolverState updates the proxy resolver state by storing proxy
// addresses and endpoints, marking the resolver as ready, and triggering a
// state update if both proxy and target resolvers are ready. If the ClientConn
// returns a non-nil error, it calls `ResolveNow()` on the target resolver. It
// is a StateListener function of wrappingClientConn passed to the proxy
// resolver.
func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
if logger.V(2) {
logger.Infof("Addresses received from proxy resolver: %s", state.Addresses)
}
if len(state.Endpoints) > 0 {
// We expect exactly one address per endpoint because the proxy resolver
// uses "dns" resolution.
r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints))
for _, endpoint := range state.Endpoints {
r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...)
}
} else if state.Addresses != nil {
r.proxyAddrs = state.Addresses
} else {
r.proxyAddrs = []resolver.Address{} // ensure proxyAddrs is non-nil to indicate an update has been received
}
err := r.updateClientConnStateLocked()
// Another possible approach was to block until updates are received from
// both resolvers. But this is not used because calling `New()` triggers
// `Build()` for the first resolver, which calls `UpdateState()`. And the
// second resolver hasn't sent an update yet, so it would cause `New()` to
// block indefinitely.
if err != nil {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if r.targetResolver != nil {
r.targetResolver.ResolveNow(resolver.ResolveNowOptions{})
}
}()
}
return err
}
// updateTargetResolverState is the StateListener function provided to the
// target resolver via wrappingClientConn. It updates the resolver state and
// marks the target resolver as ready. If the update includes at least one TCP
// address and the proxy resolver has not yet been constructed, it initializes
// the proxy resolver. A combined state update is triggered once both resolvers
// are ready. If all addresses are non-TCP, it proceeds without waiting for the
// proxy resolver. If ClientConn.UpdateState returns a non-nil error,
// ResolveNow() is called on the proxy resolver.
func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error {
r.mu.Lock()
defer r.mu.Unlock()
if logger.V(2) {
logger.Infof("Addresses received from target resolver: %v", state.Addresses)
}
r.targetResolverState = &state
// If no addresses returned by resolver have network type as tcp , do not
// wait for proxy update.
if !isTCPAddressPresent(r.targetResolverState) {
return r.cc.UpdateState(*r.targetResolverState)
}
// The proxy resolver may be rebuilt multiple times, specifically each time
// the target resolver sends an update, even if the target resolver is built
// successfully but building the proxy resolver fails.
if len(r.proxyAddrs) == 0 {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if _, ok := r.proxyResolver.(nopResolver); !ok {
return
}
proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{})
if err != nil {
r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err))
return
}
r.proxyResolver = proxyResolver
}()
}
err := r.updateClientConnStateLocked()
if err != nil {
go func() {
r.childMu.Lock()
defer r.childMu.Unlock()
if r.proxyResolver != nil {
r.proxyResolver.ResolveNow(resolver.ResolveNowOptions{})
}
}()
}
return nil
}
// wrappingClientConn serves as an intermediary between the parent ClientConn
// and the child resolvers created here. It implements the resolver.ClientConn
// interface and is passed in that capacity to the child resolvers.
type wrappingClientConn struct {
// Callback to deliver resolver state updates
stateListener func(state resolver.State) error
parent *delegatingResolver
}
// UpdateState receives resolver state updates and forwards them to the
// appropriate listener function (either for the proxy or target resolver).
func (wcc *wrappingClientConn) UpdateState(state resolver.State) error {
return wcc.stateListener(state)
}
// ReportError intercepts errors from the child resolvers and passes them to
// ClientConn.
func (wcc *wrappingClientConn) ReportError(err error) {
wcc.parent.cc.ReportError(err)
}
// NewAddress intercepts the new resolved address from the child resolvers and
// passes them to ClientConn.
func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) {
wcc.UpdateState(resolver.State{Addresses: addrs})
}
// ParseServiceConfig parses the provided service config and returns an object
// that provides the parsed config.
func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON)
}
|