diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal')
16 files changed, 481 insertions, 56 deletions
| diff --git a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go index af03a40d9..755fdebc1 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/binarylog.go @@ -32,6 +32,9 @@ var grpclogLogger = grpclog.Component("binarylog")  // Logger specifies MethodLoggers for method names with a Log call that  // takes a context. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  type Logger interface {  	GetMethodLogger(methodName string) MethodLogger  } diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go index 56fcf008d..6c3f63221 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -49,6 +49,9 @@ func (g *callIDGenerator) reset() {  var idGen callIDGenerator  // MethodLogger is the sub-logger for each method. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  type MethodLogger interface {  	Log(context.Context, LogEntryConfig)  } @@ -65,6 +68,9 @@ type TruncatingMethodLogger struct {  }  // NewTruncatingMethodLogger returns a new truncating method logger. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {  	return &TruncatingMethodLogger{  		headerMaxLen:  h, @@ -145,6 +151,9 @@ func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (trun  }  // LogEntryConfig represents the configuration for binary log entry. +// +// This is used in the 1.0 release of gcp/observability, and thus must not be +// deleted or changed.  type LogEntryConfig interface {  	toProto() *binlogpb.GrpcLogEntry  } diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 9f6a0c120..81c2f5fd7 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -35,6 +35,7 @@ import "sync"  // internal/transport/transport.go for an example of this.  type Unbounded struct {  	c       chan interface{} +	closed  bool  	mu      sync.Mutex  	backlog []interface{}  } @@ -47,16 +48,18 @@ func NewUnbounded() *Unbounded {  // Put adds t to the unbounded buffer.  func (b *Unbounded) Put(t interface{}) {  	b.mu.Lock() +	defer b.mu.Unlock() +	if b.closed { +		return +	}  	if len(b.backlog) == 0 {  		select {  		case b.c <- t: -			b.mu.Unlock()  			return  		default:  		}  	}  	b.backlog = append(b.backlog, t) -	b.mu.Unlock()  }  // Load sends the earliest buffered data, if any, onto the read channel @@ -64,6 +67,10 @@ func (b *Unbounded) Put(t interface{}) {  // value from the read channel.  func (b *Unbounded) Load() {  	b.mu.Lock() +	defer b.mu.Unlock() +	if b.closed { +		return +	}  	if len(b.backlog) > 0 {  		select {  		case b.c <- b.backlog[0]: @@ -72,7 +79,6 @@ func (b *Unbounded) Load() {  		default:  		}  	} -	b.mu.Unlock()  }  // Get returns a read channel on which values added to the buffer, via Put(), @@ -80,6 +86,20 @@ func (b *Unbounded) Load() {  //  // Upon reading a value from this channel, users are expected to call Load() to  // send the next buffered value onto the channel if there is any. +// +// If the unbounded buffer is closed, the read channel returned by this method +// is closed.  func (b *Unbounded) Get() <-chan interface{} {  	return b.c  } + +// Close closes the unbounded buffer. +func (b *Unbounded) Close() { +	b.mu.Lock() +	defer b.mu.Unlock() +	if b.closed { +		return +	} +	b.closed = true +	close(b.c) +} diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 5ba9d94d4..77c2c0b89 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -36,6 +36,13 @@ var (  	// "GRPC_RING_HASH_CAP".  This does not override the default bounds  	// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).  	RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024) +	// PickFirstLBConfig is set if we should support configuration of the +	// pick_first LB policy, which can be enabled by setting the environment +	// variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true". +	PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false) +	// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS +	// handshakes that can be performed. +	ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)  )  func boolFromEnv(envVar string, def bool) bool { diff --git a/vendor/google.golang.org/grpc/internal/envconfig/observability.go b/vendor/google.golang.org/grpc/internal/envconfig/observability.go index 821dd0a7c..dd314cfb1 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/observability.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/observability.go @@ -28,9 +28,15 @@ const (  var (  	// ObservabilityConfig is the json configuration for the gcp/observability  	// package specified directly in the envObservabilityConfig env var. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ObservabilityConfig = os.Getenv(envObservabilityConfig)  	// ObservabilityConfigFile is the json configuration for the  	// gcp/observability specified in a file with the location specified in  	// envObservabilityConfigFile env var. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ObservabilityConfigFile = os.Getenv(envObservabilityConfigFile)  ) diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go index 3b17705ba..02b4b6a1c 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go @@ -61,11 +61,10 @@ var (  	// have a brand new API on the server-side and users explicitly need to use  	// the new API to get security integration on the server.  	XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true) -	// XDSAggregateAndDNS indicates whether processing of aggregated cluster -	// and DNS cluster is enabled, which can be enabled by setting the -	// environment variable -	// "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" to -	// "true". +	// XDSAggregateAndDNS indicates whether processing of aggregated cluster and +	// DNS cluster is enabled, which can be disabled by setting the environment +	// variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" +	// to "false".  	XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true)  	// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled, @@ -82,11 +81,15 @@ var (  	XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", true)  	// XDSRLS indicates whether processing of Cluster Specifier plugins and -	// support for the RLS CLuster Specifier is enabled, which can be enabled by +	// support for the RLS CLuster Specifier is enabled, which can be disabled by  	// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to -	// "true". -	XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", false) +	// "false". +	XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", true)  	// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.  	C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI") +	// XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which +	// can be disabled by setting the environment variable +	// "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "false". +	XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", true)  ) diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go index 517ea7064..aa97273e7 100644 --- a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go +++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go @@ -72,3 +72,24 @@ func Uint64() uint64 {  	defer mu.Unlock()  	return r.Uint64()  } + +// Uint32 implements rand.Uint32 on the grpcrand global source. +func Uint32() uint32 { +	mu.Lock() +	defer mu.Unlock() +	return r.Uint32() +} + +// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source. +func ExpFloat64() float64 { +	mu.Lock() +	defer mu.Unlock() +	return r.ExpFloat64() +} + +// Shuffle implements rand.Shuffle on the grpcrand global source. +var Shuffle = func(n int, f func(int, int)) { +	mu.Lock() +	defer mu.Unlock() +	r.Shuffle(n, f) +} diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go index 79993d343..37b8d4117 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go @@ -20,6 +20,7 @@ package grpcsync  import (  	"context" +	"sync"  	"google.golang.org/grpc/internal/buffer"  ) @@ -31,15 +32,26 @@ import (  //  // This type is safe for concurrent access.  type CallbackSerializer struct { +	// Done is closed once the serializer is shut down completely, i.e all +	// scheduled callbacks are executed and the serializer has deallocated all +	// its resources. +	Done chan struct{} +  	callbacks *buffer.Unbounded +	closedMu  sync.Mutex +	closed    bool  }  // NewCallbackSerializer returns a new CallbackSerializer instance. The provided  // context will be passed to the scheduled callbacks. Users should cancel the  // provided context to shutdown the CallbackSerializer. It is guaranteed that no -// callbacks will be executed once this context is canceled. +// callbacks will be added once this context is canceled, and any pending un-run +// callbacks will be executed before the serializer is shut down.  func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { -	t := &CallbackSerializer{callbacks: buffer.NewUnbounded()} +	t := &CallbackSerializer{ +		Done:      make(chan struct{}), +		callbacks: buffer.NewUnbounded(), +	}  	go t.run(ctx)  	return t  } @@ -48,18 +60,60 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {  //  // Callbacks are expected to honor the context when performing any blocking  // operations, and should return early when the context is canceled. -func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) { +// +// Return value indicates if the callback was successfully added to the list of +// callbacks to be executed by the serializer. It is not possible to add +// callbacks once the context passed to NewCallbackSerializer is cancelled. +func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { +	t.closedMu.Lock() +	defer t.closedMu.Unlock() + +	if t.closed { +		return false +	}  	t.callbacks.Put(f) +	return true  }  func (t *CallbackSerializer) run(ctx context.Context) { +	var backlog []func(context.Context) + +	defer close(t.Done)  	for ctx.Err() == nil {  		select {  		case <-ctx.Done(): -			return -		case callback := <-t.callbacks.Get(): +			// Do nothing here. Next iteration of the for loop will not happen, +			// since ctx.Err() would be non-nil. +		case callback, ok := <-t.callbacks.Get(): +			if !ok { +				return +			}  			t.callbacks.Load()  			callback.(func(ctx context.Context))(ctx)  		}  	} + +	// Fetch pending callbacks if any, and execute them before returning from +	// this method and closing t.Done. +	t.closedMu.Lock() +	t.closed = true +	backlog = t.fetchPendingCallbacks() +	t.callbacks.Close() +	t.closedMu.Unlock() +	for _, b := range backlog { +		b(ctx) +	} +} + +func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { +	var backlog []func(context.Context) +	for { +		select { +		case b := <-t.callbacks.Get(): +			backlog = append(backlog, b.(func(context.Context))) +			t.callbacks.Load() +		default: +			return backlog +		} +	}  } diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go new file mode 100644 index 000000000..f58b5ffa6 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go @@ -0,0 +1,136 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcsync + +import ( +	"context" +	"sync" +) + +// Subscriber represents an entity that is subscribed to messages published on +// a PubSub. It wraps the callback to be invoked by the PubSub when a new +// message is published. +type Subscriber interface { +	// OnMessage is invoked when a new message is published. Implementations +	// must not block in this method. +	OnMessage(msg interface{}) +} + +// PubSub is a simple one-to-many publish-subscribe system that supports +// messages of arbitrary type. It guarantees that messages are delivered in +// the same order in which they were published. +// +// Publisher invokes the Publish() method to publish new messages, while +// subscribers interested in receiving these messages register a callback +// via the Subscribe() method. +// +// Once a PubSub is stopped, no more messages can be published, and +// it is guaranteed that no more subscriber callback will be invoked. +type PubSub struct { +	cs     *CallbackSerializer +	cancel context.CancelFunc + +	// Access to the below fields are guarded by this mutex. +	mu          sync.Mutex +	msg         interface{} +	subscribers map[Subscriber]bool +	stopped     bool +} + +// NewPubSub returns a new PubSub instance. +func NewPubSub() *PubSub { +	ctx, cancel := context.WithCancel(context.Background()) +	return &PubSub{ +		cs:          NewCallbackSerializer(ctx), +		cancel:      cancel, +		subscribers: map[Subscriber]bool{}, +	} +} + +// Subscribe registers the provided Subscriber to the PubSub. +// +// If the PubSub contains a previously published message, the Subscriber's +// OnMessage() callback will be invoked asynchronously with the existing +// message to begin with, and subsequently for every newly published message. +// +// The caller is responsible for invoking the returned cancel function to +// unsubscribe itself from the PubSub. +func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) { +	ps.mu.Lock() +	defer ps.mu.Unlock() + +	if ps.stopped { +		return func() {} +	} + +	ps.subscribers[sub] = true + +	if ps.msg != nil { +		msg := ps.msg +		ps.cs.Schedule(func(context.Context) { +			ps.mu.Lock() +			defer ps.mu.Unlock() +			if !ps.subscribers[sub] { +				return +			} +			sub.OnMessage(msg) +		}) +	} + +	return func() { +		ps.mu.Lock() +		defer ps.mu.Unlock() +		delete(ps.subscribers, sub) +	} +} + +// Publish publishes the provided message to the PubSub, and invokes +// callbacks registered by subscribers asynchronously. +func (ps *PubSub) Publish(msg interface{}) { +	ps.mu.Lock() +	defer ps.mu.Unlock() + +	if ps.stopped { +		return +	} + +	ps.msg = msg +	for sub := range ps.subscribers { +		s := sub +		ps.cs.Schedule(func(context.Context) { +			ps.mu.Lock() +			defer ps.mu.Unlock() +			if !ps.subscribers[s] { +				return +			} +			s.OnMessage(msg) +		}) +	} +} + +// Stop shuts down the PubSub and releases any resources allocated by it. +// It is guaranteed that no subscriber callbacks would be invoked once this +// method returns. +func (ps *PubSub) Stop() { +	ps.mu.Lock() +	defer ps.mu.Unlock() +	ps.stopped = true + +	ps.cancel() +} diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 836b6a3b3..42ff39c84 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -60,6 +60,9 @@ var (  	GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials  	// CanonicalString returns the canonical string of the code defined here:  	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	CanonicalString interface{} // func (codes.Code) string  	// DrainServerTransports initiates a graceful close of existing connections  	// on a gRPC server accepted on the provided listener address. An @@ -69,20 +72,35 @@ var (  	// AddGlobalServerOptions adds an array of ServerOption that will be  	// effective globally for newly created servers. The priority will be: 1.  	// user-provided; 2. this method; 3. default values. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	AddGlobalServerOptions interface{} // func(opt ...ServerOption)  	// ClearGlobalServerOptions clears the array of extra ServerOption. This  	// method is useful in testing and benchmarking. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ClearGlobalServerOptions func()  	// AddGlobalDialOptions adds an array of DialOption that will be effective  	// globally for newly created client channels. The priority will be: 1.  	// user-provided; 2. this method; 3. default values. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	AddGlobalDialOptions interface{} // func(opt ...DialOption)  	// DisableGlobalDialOptions returns a DialOption that prevents the  	// ClientConn from applying the global DialOptions (set via  	// AddGlobalDialOptions). +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	DisableGlobalDialOptions interface{} // func() grpc.DialOption  	// ClearGlobalDialOptions clears the array of extra DialOption. This  	// method is useful in testing and benchmarking. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	ClearGlobalDialOptions func()  	// JoinDialOptions combines the dial options passed as arguments into a  	// single dial option. @@ -93,9 +111,15 @@ var (  	// WithBinaryLogger returns a DialOption that specifies the binary logger  	// for a ClientConn. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption  	// BinaryLogger returns a ServerOption that can set the binary logger for a  	// server. +	// +	// This is used in the 1.0 release of gcp/observability, and thus must not be +	// deleted or changed.  	BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption  	// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go index 09a667f33..99e1e5b36 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go @@ -62,7 +62,8 @@ const (  	defaultPort       = "443"  	defaultDNSSvrPort = "53"  	golang            = "GO" -	// txtPrefix is the prefix string to be prepended to the host name for txt record lookup. +	// txtPrefix is the prefix string to be prepended to the host name for txt +	// record lookup.  	txtPrefix = "_grpc_config."  	// In DNS, service config is encoded in a TXT record via the mechanism  	// described in RFC-1464 using the attribute name grpc_config. @@ -86,14 +87,14 @@ var (  	minDNSResRate = 30 * time.Second  ) -var customAuthorityDialler = func(authority string) func(ctx context.Context, network, address string) (net.Conn, error) { -	return func(ctx context.Context, network, address string) (net.Conn, error) { +var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) { +	return func(ctx context.Context, network, _ string) (net.Conn, error) {  		var dialer net.Dialer -		return dialer.DialContext(ctx, network, authority) +		return dialer.DialContext(ctx, network, address)  	}  } -var customAuthorityResolver = func(authority string) (netResolver, error) { +var newNetResolver = func(authority string) (netResolver, error) {  	host, port, err := parseTarget(authority, defaultDNSSvrPort)  	if err != nil {  		return nil, err @@ -103,7 +104,7 @@ var customAuthorityResolver = func(authority string) (netResolver, error) {  	return &net.Resolver{  		PreferGo: true, -		Dial:     customAuthorityDialler(authorityWithPort), +		Dial:     addressDialer(authorityWithPort),  	}, nil  } @@ -114,7 +115,8 @@ func NewBuilder() resolver.Builder {  type dnsBuilder struct{} -// Build creates and starts a DNS resolver that watches the name resolution of the target. +// Build creates and starts a DNS resolver that watches the name resolution of +// the target.  func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {  	host, port, err := parseTarget(target.Endpoint(), defaultPort)  	if err != nil { @@ -143,7 +145,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts  	if target.URL.Host == "" {  		d.resolver = defaultResolver  	} else { -		d.resolver, err = customAuthorityResolver(target.URL.Host) +		d.resolver, err = newNetResolver(target.URL.Host)  		if err != nil {  			return nil, err  		} @@ -180,19 +182,22 @@ type dnsResolver struct {  	ctx      context.Context  	cancel   context.CancelFunc  	cc       resolver.ClientConn -	// rn channel is used by ResolveNow() to force an immediate resolution of the target. +	// rn channel is used by ResolveNow() to force an immediate resolution of the +	// target.  	rn chan struct{} -	// wg is used to enforce Close() to return after the watcher() goroutine has finished. -	// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we -	// replace the real lookup functions with mocked ones to facilitate testing. -	// If Close() doesn't wait for watcher() goroutine finishes, race detector sometimes -	// will warns lookup (READ the lookup function pointers) inside watcher() goroutine -	// has data race with replaceNetFunc (WRITE the lookup function pointers). +	// wg is used to enforce Close() to return after the watcher() goroutine has +	// finished. Otherwise, data race will be possible. [Race Example] in +	// dns_resolver_test we replace the real lookup functions with mocked ones to +	// facilitate testing. If Close() doesn't wait for watcher() goroutine +	// finishes, race detector sometimes will warns lookup (READ the lookup +	// function pointers) inside watcher() goroutine has data race with +	// replaceNetFunc (WRITE the lookup function pointers).  	wg                   sync.WaitGroup  	disableServiceConfig bool  } -// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches. +// ResolveNow invoke an immediate resolution of the target that this +// dnsResolver watches.  func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {  	select {  	case d.rn <- struct{}{}: @@ -220,8 +225,8 @@ func (d *dnsResolver) watcher() {  		var timer *time.Timer  		if err == nil { -			// Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least -			// to prevent constantly re-resolving. +			// Success resolving, wait for the next ResolveNow. However, also wait 30 +			// seconds at the very least to prevent constantly re-resolving.  			backoffIndex = 1  			timer = newTimerDNSResRate(minDNSResRate)  			select { @@ -231,7 +236,8 @@ func (d *dnsResolver) watcher() {  			case <-d.rn:  			}  		} else { -			// Poll on an error found in DNS Resolver or an error received from ClientConn. +			// Poll on an error found in DNS Resolver or an error received from +			// ClientConn.  			timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))  			backoffIndex++  		} @@ -278,7 +284,8 @@ func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {  }  func handleDNSError(err error, lookupType string) error { -	if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary { +	dnsErr, ok := err.(*net.DNSError) +	if ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {  		// Timeouts and temporary errors should be communicated to gRPC to  		// attempt another DNS query (with backoff).  Other errors should be  		// suppressed (they may represent the absence of a TXT record). @@ -307,10 +314,12 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {  		res += s  	} -	// TXT record must have "grpc_config=" attribute in order to be used as service config. +	// TXT record must have "grpc_config=" attribute in order to be used as +	// service config.  	if !strings.HasPrefix(res, txtAttribute) {  		logger.Warningf("dns: TXT record %v missing %v attribute", res, txtAttribute) -		// This is not an error; it is the equivalent of not having a service config. +		// This is not an error; it is the equivalent of not having a service +		// config.  		return nil  	}  	sc := canaryingSC(strings.TrimPrefix(res, txtAttribute)) @@ -352,9 +361,10 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {  	return &state, nil  } -// formatIP returns ok = false if addr is not a valid textual representation of an IP address. -// If addr is an IPv4 address, return the addr and ok = true. -// If addr is an IPv6 address, return the addr enclosed in square brackets and ok = true. +// formatIP returns ok = false if addr is not a valid textual representation of +// an IP address. If addr is an IPv4 address, return the addr and ok = true. +// If addr is an IPv6 address, return the addr enclosed in square brackets and +// ok = true.  func formatIP(addr string) (addrIP string, ok bool) {  	ip := net.ParseIP(addr)  	if ip == nil { @@ -366,10 +376,10 @@ func formatIP(addr string) (addrIP string, ok bool) {  	return "[" + addr + "]", true  } -// parseTarget takes the user input target string and default port, returns formatted host and port info. -// If target doesn't specify a port, set the port to be the defaultPort. -// If target is in IPv6 format and host-name is enclosed in square brackets, brackets -// are stripped when setting the host. +// parseTarget takes the user input target string and default port, returns +// formatted host and port info. If target doesn't specify a port, set the port +// to be the defaultPort. If target is in IPv6 format and host-name is enclosed +// in square brackets, brackets are stripped when setting the host.  // examples:  // target: "www.google.com" defaultPort: "443" returns host: "www.google.com", port: "443"  // target: "ipv4-host:80" defaultPort: "443" returns host: "ipv4-host", port: "80" @@ -385,12 +395,14 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {  	}  	if host, port, err = net.SplitHostPort(target); err == nil {  		if port == "" { -			// If the port field is empty (target ends with colon), e.g. "[::1]:", this is an error. +			// If the port field is empty (target ends with colon), e.g. "[::1]:", +			// this is an error.  			return "", "", errEndsWithColon  		}  		// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port  		if host == "" { -			// Keep consistent with net.Dial(): If the host is empty, as in ":80", the local system is assumed. +			// Keep consistent with net.Dial(): If the host is empty, as in ":80", +			// the local system is assumed.  			host = "localhost"  		}  		return host, port, nil diff --git a/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go b/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go new file mode 100644 index 000000000..11d82afcc --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/serviceconfig/duration.go @@ -0,0 +1,130 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package serviceconfig + +import ( +	"encoding/json" +	"fmt" +	"math" +	"strconv" +	"strings" +	"time" +) + +// Duration defines JSON marshal and unmarshal methods to conform to the +// protobuf JSON spec defined [here]. +// +// [here]: https://protobuf.dev/reference/protobuf/google.protobuf/#duration +type Duration time.Duration + +func (d Duration) String() string { +	return fmt.Sprint(time.Duration(d)) +} + +// MarshalJSON converts from d to a JSON string output. +func (d Duration) MarshalJSON() ([]byte, error) { +	ns := time.Duration(d).Nanoseconds() +	sec := ns / int64(time.Second) +	ns = ns % int64(time.Second) + +	var sign string +	if sec < 0 || ns < 0 { +		sign, sec, ns = "-", -1*sec, -1*ns +	} + +	// Generated output always contains 0, 3, 6, or 9 fractional digits, +	// depending on required precision. +	str := fmt.Sprintf("%s%d.%09d", sign, sec, ns) +	str = strings.TrimSuffix(str, "000") +	str = strings.TrimSuffix(str, "000") +	str = strings.TrimSuffix(str, ".000") +	return []byte(fmt.Sprintf("\"%ss\"", str)), nil +} + +// UnmarshalJSON unmarshals b as a duration JSON string into d. +func (d *Duration) UnmarshalJSON(b []byte) error { +	var s string +	if err := json.Unmarshal(b, &s); err != nil { +		return err +	} +	if !strings.HasSuffix(s, "s") { +		return fmt.Errorf("malformed duration %q: missing seconds unit", s) +	} +	neg := false +	if s[0] == '-' { +		neg = true +		s = s[1:] +	} +	ss := strings.SplitN(s[:len(s)-1], ".", 3) +	if len(ss) > 2 { +		return fmt.Errorf("malformed duration %q: too many decimals", s) +	} +	// hasDigits is set if either the whole or fractional part of the number is +	// present, since both are optional but one is required. +	hasDigits := false +	var sec, ns int64 +	if len(ss[0]) > 0 { +		var err error +		if sec, err = strconv.ParseInt(ss[0], 10, 64); err != nil { +			return fmt.Errorf("malformed duration %q: %v", s, err) +		} +		// Maximum seconds value per the durationpb spec. +		const maxProtoSeconds = 315_576_000_000 +		if sec > maxProtoSeconds { +			return fmt.Errorf("out of range: %q", s) +		} +		hasDigits = true +	} +	if len(ss) == 2 && len(ss[1]) > 0 { +		if len(ss[1]) > 9 { +			return fmt.Errorf("malformed duration %q: too many digits after decimal", s) +		} +		var err error +		if ns, err = strconv.ParseInt(ss[1], 10, 64); err != nil { +			return fmt.Errorf("malformed duration %q: %v", s, err) +		} +		for i := 9; i > len(ss[1]); i-- { +			ns *= 10 +		} +		hasDigits = true +	} +	if !hasDigits { +		return fmt.Errorf("malformed duration %q: contains no numbers", s) +	} + +	if neg { +		sec *= -1 +		ns *= -1 +	} + +	// Maximum/minimum seconds/nanoseconds representable by Go's time.Duration. +	const maxSeconds = math.MaxInt64 / int64(time.Second) +	const maxNanosAtMaxSeconds = math.MaxInt64 % int64(time.Second) +	const minSeconds = math.MinInt64 / int64(time.Second) +	const minNanosAtMinSeconds = math.MinInt64 % int64(time.Second) + +	if sec > maxSeconds || (sec == maxSeconds && ns >= maxNanosAtMaxSeconds) { +		*d = Duration(math.MaxInt64) +	} else if sec < minSeconds || (sec == minSeconds && ns <= minNanosAtMinSeconds) { +		*d = Duration(math.MinInt64) +	} else { +		*d = Duration(sec*int64(time.Second) + ns) +	} +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index fbee581b8..98f80e3fa 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -453,7 +453,7 @@ func (ht *serverHandlerTransport) IncrMsgSent() {}  func (ht *serverHandlerTransport) IncrMsgRecv() {} -func (ht *serverHandlerTransport) Drain() { +func (ht *serverHandlerTransport) Drain(debugData string) {  	panic("Drain() is not implemented")  } diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index 5216998a8..326bf0848 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -1337,7 +1337,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  // setGoAwayReason sets the value of t.goAwayReason based  // on the GoAway frame received. -// It expects a lock on transport's mutext to be held by +// It expects a lock on transport's mutex to be held by  // the caller.  func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {  	t.goAwayReason = GoAwayNoReason diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 4b406b8cb..f96064012 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -238,7 +238,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  		kp.Timeout = defaultServerKeepaliveTimeout  	}  	if kp.Time != infinity { -		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil { +		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {  			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)  		}  	} @@ -1166,12 +1166,12 @@ func (t *http2Server) keepalive() {  			if val <= 0 {  				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.  				// Gracefully close the connection. -				t.Drain() +				t.Drain("max_idle")  				return  			}  			idleTimer.Reset(val)  		case <-ageTimer.C: -			t.Drain() +			t.Drain("max_age")  			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)  			select {  			case <-ageTimer.C: @@ -1318,14 +1318,14 @@ func (t *http2Server) RemoteAddr() net.Addr {  	return t.remoteAddr  } -func (t *http2Server) Drain() { +func (t *http2Server) Drain(debugData string) {  	t.mu.Lock()  	defer t.mu.Unlock()  	if t.drainEvent != nil {  		return  	}  	t.drainEvent = grpcsync.NewEvent() -	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte{}, headsUp: true}) +	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})  }  var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} @@ -1367,7 +1367,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  	// originated before the GoAway reaches the client.  	// After getting the ack or timer expiration send out another GoAway this  	// time with an ID of the max stream server intends to process. -	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { +	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {  		return false, err  	}  	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index 1b7d7fabc..aa1c89659 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -726,7 +726,7 @@ type ServerTransport interface {  	RemoteAddr() net.Addr  	// Drain notifies the client this ServerTransport stops accepting new RPCs. -	Drain() +	Drain(debugData string)  	// IncrMsgSent increments the number of message sent through this transport.  	IncrMsgSent() | 
