path: root/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
author     Daenney <daenney@users.noreply.github.com>  2023-09-07 13:20:37 +0200
committer  GitHub <noreply@github.com>  2023-09-07 13:20:37 +0200
commit    14ef09809942800db57de87fe3963770a56b585b (patch)
tree      89e95f21145bc7ad5745f77be1d998faa9c09695 /vendor/google.golang.org/grpc/resolver_conn_wrapper.go
parent    [bugfix] fix checks for deref the same status descendants / ascendants (#2181) (diff)
download  gotosocial-14ef09809942800db57de87fe3963770a56b585b.tar.xz
[feature] Support OTLP HTTP, drop Jaeger (#2184)
* [feature] Add http trace exporter, drop Jaeger

  Jaeger supports ingesting traces using the OpenTelemetry gRPC or HTTP
  methods. The Jaeger project has deprecated the old jaeger transport.

* Add support for submitting traces over HTTP
* Drop support for the old Jaeger protocol
* Upgrade the trace libraries to v1.17

Fixes: #2176
Fixes: #2179
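For context, a minimal sketch of the kind of OTLP HTTP trace exporter setup this commit enables, using the OpenTelemetry Go SDK. The collector endpoint and the initTracing helper name are illustrative assumptions, not GoToSocial's actual tracing code:

package tracing

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// initTracing is a hypothetical helper showing the OTLP/HTTP path that
// replaces the deprecated Jaeger exporter. It returns a shutdown func.
func initTracing(ctx context.Context) (func(context.Context) error, error) {
	// The collector endpoint is an assumption; adjust it for your deployment.
	exp, err := otlptracehttp.New(ctx,
		otlptracehttp.WithEndpoint("localhost:4318"),
		otlptracehttp.WithInsecure(),
	)
	if err != nil {
		return nil, err
	}
	// Batch spans and install the provider globally.
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp))
	otel.SetTracerProvider(tp)
	return tp.Shutdown, nil
}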
Diffstat (limited to 'vendor/google.golang.org/grpc/resolver_conn_wrapper.go')
-rw-r--r--  vendor/google.golang.org/grpc/resolver_conn_wrapper.go  229
1 file changed, 146 insertions(+), 83 deletions(-)
diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
index 05a9d4e0b..b408b3688 100644
--- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
+++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go
@@ -19,11 +19,11 @@
package grpc
import (
+ "context"
"strings"
"sync"
"google.golang.org/grpc/balancer"
- "google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
@@ -31,129 +31,192 @@ import (
"google.golang.org/grpc/serviceconfig"
)
+// resolverStateUpdater wraps the single method used by ccResolverWrapper to
+// report a state update from the actual resolver implementation.
+type resolverStateUpdater interface {
+ updateResolverState(s resolver.State, err error) error
+}
+
// ccResolverWrapper is a wrapper on top of cc for resolvers.
// It implements resolver.ClientConn interface.
type ccResolverWrapper struct {
- cc *ClientConn
- resolverMu sync.Mutex
- resolver resolver.Resolver
- done *grpcsync.Event
- curState resolver.State
+ // The following fields are initialized when the wrapper is created and are
+ // read-only afterwards, and therefore can be accessed without a mutex.
+ cc resolverStateUpdater
+ channelzID *channelz.Identifier
+ ignoreServiceConfig bool
+ opts ccResolverWrapperOpts
+ serializer *grpcsync.CallbackSerializer // To serialize all incoming calls.
+ serializerCancel context.CancelFunc // To close the serializer, accessed only from close().
+
+ // All incoming (resolver --> gRPC) calls are guaranteed to execute in a
+ // mutually exclusive manner as they are scheduled on the serializer.
+ // Fields accessed *only* in these serializer callbacks can therefore be
+ // accessed without a mutex.
+ curState resolver.State
+
+ // mu guards access to the below fields.
+ mu sync.Mutex
+ closed bool
+ resolver resolver.Resolver // Accessed only from outgoing calls.
+}
- incomingMu sync.Mutex // Synchronizes all the incoming calls.
+// ccResolverWrapperOpts wraps the arguments to be passed when creating a new
+// ccResolverWrapper.
+type ccResolverWrapperOpts struct {
+ target resolver.Target // User specified dial target to resolve.
+ builder resolver.Builder // Resolver builder to use.
+ bOpts resolver.BuildOptions // Resolver build options to use.
+ channelzID *channelz.Identifier // Channelz identifier for the channel.
}
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
-func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
+func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) {
+ ctx, cancel := context.WithCancel(context.Background())
ccr := &ccResolverWrapper{
- cc: cc,
- done: grpcsync.NewEvent(),
- }
-
- var credsClone credentials.TransportCredentials
- if creds := cc.dopts.copts.TransportCredentials; creds != nil {
- credsClone = creds.Clone()
- }
- rbo := resolver.BuildOptions{
- DisableServiceConfig: cc.dopts.disableServiceConfig,
- DialCreds: credsClone,
- CredsBundle: cc.dopts.copts.CredsBundle,
- Dialer: cc.dopts.copts.Dialer,
- }
-
- var err error
- // We need to hold the lock here while we assign to the ccr.resolver field
- // to guard against a data race caused by the following code path,
- // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
- // accessing ccr.resolver which is being assigned here.
- ccr.resolverMu.Lock()
- defer ccr.resolverMu.Unlock()
- ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
+ cc: cc,
+ channelzID: opts.channelzID,
+ ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
+ opts: opts,
+ serializer: grpcsync.NewCallbackSerializer(ctx),
+ serializerCancel: cancel,
+ }
+
+ // Cannot hold the lock at build time because the resolver can send an
+ // update or error inline and these incoming calls grab the lock to schedule
+ // a callback in the serializer.
+ r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
if err != nil {
+ cancel()
return nil, err
}
+
+ // Any error reported by the resolver at build time that leads to a
+ // re-resolution request from the balancer is dropped by grpc until we
+ // return from this function. So, we don't have to handle pending resolveNow
+ // requests here.
+ ccr.mu.Lock()
+ ccr.resolver = r
+ ccr.mu.Unlock()
+
return ccr, nil
}
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
- ccr.resolverMu.Lock()
- if !ccr.done.HasFired() {
- ccr.resolver.ResolveNow(o)
+ ccr.mu.Lock()
+ defer ccr.mu.Unlock()
+
+ // ccr.resolver field is set only after the call to Build() returns. But in
+ // the process of building, the resolver may send an error update which when
+ // propagated to the balancer may result in a re-resolution request.
+ if ccr.closed || ccr.resolver == nil {
+ return
}
- ccr.resolverMu.Unlock()
+ ccr.resolver.ResolveNow(o)
}
func (ccr *ccResolverWrapper) close() {
- ccr.resolverMu.Lock()
- ccr.resolver.Close()
- ccr.done.Fire()
- ccr.resolverMu.Unlock()
+ ccr.mu.Lock()
+ if ccr.closed {
+ ccr.mu.Unlock()
+ return
+ }
+
+ channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
+
+ // Close the serializer to ensure that no more calls from the resolver are
+ // handled, before actually closing the resolver.
+ ccr.serializerCancel()
+ ccr.closed = true
+ r := ccr.resolver
+ ccr.mu.Unlock()
+
+ // Give enqueued callbacks a chance to finish.
+ <-ccr.serializer.Done
+
+ // Spawn a goroutine to close the resolver (since it may block trying to
+ // cleanup all allocated resources) and return early.
+ go r.Close()
+}
+
+// serializerScheduleLocked is a convenience method to schedule a function to be
+// run on the serializer while holding ccr.mu.
+func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
+ ccr.mu.Lock()
+ ccr.serializer.Schedule(f)
+ ccr.mu.Unlock()
}
+// UpdateState is called by resolver implementations to report new state to gRPC
+// which includes addresses and service config.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
+ errCh := make(chan error, 1)
+ ok := ccr.serializer.Schedule(func(context.Context) {
+ ccr.addChannelzTraceEvent(s)
+ ccr.curState = s
+ if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
+ errCh <- balancer.ErrBadResolverState
+ return
+ }
+ errCh <- nil
+ })
+ if !ok {
+ // The only time when Schedule() fails to add the callback to the
+ // serializer is when the serializer is closed, and this happens only
+ // when the resolver wrapper is closed.
return nil
}
- ccr.addChannelzTraceEvent(s)
- ccr.curState = s
- if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
- return balancer.ErrBadResolverState
- }
- return nil
+ return <-errCh
}
+// ReportError is called by resolver implementations to report errors
+// encountered during name resolution to gRPC.
func (ccr *ccResolverWrapper) ReportError(err error) {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
- return
- }
- channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
- ccr.cc.updateResolverState(resolver.State{}, err)
+ ccr.serializerScheduleLocked(func(_ context.Context) {
+ channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
+ ccr.cc.updateResolverState(resolver.State{}, err)
+ })
}
-// NewAddress is called by the resolver implementation to send addresses to gRPC.
+// NewAddress is called by the resolver implementation to send addresses to
+// gRPC.
func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
- return
- }
- ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
- ccr.curState.Addresses = addrs
- ccr.cc.updateResolverState(ccr.curState, nil)
+ ccr.serializerScheduleLocked(func(_ context.Context) {
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
+ ccr.curState.Addresses = addrs
+ ccr.cc.updateResolverState(ccr.curState, nil)
+ })
}
// NewServiceConfig is called by the resolver implementation to send service
// configs to gRPC.
func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
- ccr.incomingMu.Lock()
- defer ccr.incomingMu.Unlock()
- if ccr.done.HasFired() {
- return
- }
- channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc)
- if ccr.cc.dopts.disableServiceConfig {
- channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
- return
- }
- scpr := parseServiceConfig(sc)
- if scpr.Err != nil {
- channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
- return
- }
- ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
- ccr.curState.ServiceConfig = scpr
- ccr.cc.updateResolverState(ccr.curState, nil)
+ ccr.serializerScheduleLocked(func(_ context.Context) {
+ channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc)
+ if ccr.ignoreServiceConfig {
+ channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config")
+ return
+ }
+ scpr := parseServiceConfig(sc)
+ if scpr.Err != nil {
+ channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
+ return
+ }
+ ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
+ ccr.curState.ServiceConfig = scpr
+ ccr.cc.updateResolverState(ccr.curState, nil)
+ })
}
+// ParseServiceConfig is called by resolver implementations to parse a JSON
+// representation of the service config.
func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
return parseServiceConfig(scJSON)
}
+// addChannelzTraceEvent adds a channelz trace event containing the new
+// state received from resolver implementations.
func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
var updates []string
var oldSC, newSC *ServiceConfig
@@ -172,5 +235,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
updates = append(updates, "resolver returned new addresses")
}
- channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
+ channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
}
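The vendored change above replaces the wrapper's per-direction mutexes and done event with grpc's internal callback serializer: all resolver-to-gRPC calls are queued onto one worker, so fields touched only inside those callbacks need no lock, and close() cancels the serializer, waits for it to drain, then closes the resolver in a goroutine. A rough standalone sketch of that pattern (a simplified stand-in, not the vendored grpcsync.CallbackSerializer) might look like this:

package callbackdemo

import "context"

// serializer runs queued callbacks one at a time on a single goroutine.
// It is an illustrative toy, not grpc's internal implementation.
type serializer struct {
	callbacks chan func(context.Context)
	done      chan struct{}
}

func newSerializer(ctx context.Context) *serializer {
	s := &serializer{
		callbacks: make(chan func(context.Context), 64),
		done:      make(chan struct{}),
	}
	go func() {
		defer close(s.done)
		for {
			select {
			case cb := <-s.callbacks:
				cb(ctx) // callbacks execute mutually exclusively, in order
			case <-ctx.Done():
				return // cancelling the context stops the worker
			}
		}
	}()
	return s
}

// schedule queues a callback; it reports false once the worker has stopped.
// Callbacks still queued at shutdown are simply dropped in this sketch.
func (s *serializer) schedule(cb func(context.Context)) bool {
	select {
	case s.callbacks <- cb:
		return true
	case <-s.done:
		return false
	}
}

A caller that needs the callback's result, as UpdateState does in the diff, can pass a buffered channel into the scheduled function and block on it after schedule returns true.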