diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal')
20 files changed, 551 insertions, 373 deletions
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 4399c3df4..11f91668a 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -18,7 +18,10 @@  // Package buffer provides an implementation of an unbounded buffer.  package buffer -import "sync" +import ( +	"errors" +	"sync" +)  // Unbounded is an implementation of an unbounded buffer which does not use  // extra goroutines. This is typically used for passing updates from one entity @@ -36,6 +39,7 @@ import "sync"  type Unbounded struct {  	c       chan any  	closed  bool +	closing bool  	mu      sync.Mutex  	backlog []any  } @@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded {  	return &Unbounded{c: make(chan any, 1)}  } +var errBufferClosed = errors.New("Put called on closed buffer.Unbounded") +  // Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t any) { +func (b *Unbounded) Put(t any) error {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return +	if b.closing { +		return errBufferClosed  	}  	if len(b.backlog) == 0 {  		select {  		case b.c <- t: -			return +			return nil  		default:  		}  	}  	b.backlog = append(b.backlog, t) +	return nil  } -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a +// Load sends the earliest buffered data, if any, onto the read channel returned +// by Get(). Users are expected to call this every time they successfully read a  // value from the read channel.  func (b *Unbounded) Load() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return -	}  	if len(b.backlog) > 0 {  		select {  		case b.c <- b.backlog[0]: @@ -78,6 +82,8 @@ func (b *Unbounded) Load() {  			b.backlog = b.backlog[1:]  		default:  		} +	} else if b.closing && !b.closed { +		close(b.c)  	}  } @@ -88,18 +94,23 @@ func (b *Unbounded) Load() {  // send the next buffered value onto the channel if there is any.  //  // If the unbounded buffer is closed, the read channel returned by this method -// is closed. +// is closed after all data is drained.  func (b *Unbounded) Get() <-chan any {  	return b.c  } -// Close closes the unbounded buffer. +// Close closes the unbounded buffer. No subsequent data may be Put(), and the +// channel returned from Get() will be closed after all the data is read and +// Load() is called for the final time.  func (b *Unbounded) Close() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { +	if b.closing {  		return  	} -	b.closed = true -	close(b.c) +	b.closing = true +	if len(b.backlog) == 0 { +		b.closed = true +		close(b.c) +	}  } diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go index 5395e7752..fc094f344 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go +++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go @@ -31,6 +31,7 @@ import (  	"time"  	"google.golang.org/grpc/grpclog" +	"google.golang.org/grpc/internal"  )  const ( @@ -58,6 +59,12 @@ func TurnOn() {  	}  } +func init() { +	internal.ChannelzTurnOffForTesting = func() { +		atomic.StoreInt32(&curState, 0) +	} +} +  // IsOn returns whether channelz data collection is on.  func IsOn() bool {  	return atomic.LoadInt32(&curState) == 1 diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 3cf10ddfb..685a3cb41 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -36,9 +36,6 @@ var (  	// "GRPC_RING_HASH_CAP".  This does not override the default bounds  	// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).  	RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024) -	// PickFirstLBConfig is set if we should support configuration of the -	// pick_first LB policy. -	PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true)  	// LeastRequestLB is set if we should support the least_request_experimental  	// LB policy, which can be enabled by setting the environment variable  	// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true". diff --git a/vendor/google.golang.org/grpc/internal/envconfig/xds.go b/vendor/google.golang.org/grpc/internal/envconfig/xds.go index 02b4b6a1c..29f234acb 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/xds.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/xds.go @@ -50,46 +50,7 @@ var (  	//  	// When both bootstrap FileName and FileContent are set, FileName is used.  	XDSBootstrapFileContent = os.Getenv(XDSBootstrapFileContentEnv) -	// XDSRingHash indicates whether ring hash support is enabled, which can be -	// disabled by setting the environment variable -	// "GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH" to "false". -	XDSRingHash = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH", true) -	// XDSClientSideSecurity is used to control processing of security -	// configuration on the client-side. -	// -	// Note that there is no env var protection for the server-side because we -	// have a brand new API on the server-side and users explicitly need to use -	// the new API to get security integration on the server. -	XDSClientSideSecurity = boolFromEnv("GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT", true) -	// XDSAggregateAndDNS indicates whether processing of aggregated cluster and -	// DNS cluster is enabled, which can be disabled by setting the environment -	// variable "GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER" -	// to "false". -	XDSAggregateAndDNS = boolFromEnv("GRPC_XDS_EXPERIMENTAL_ENABLE_AGGREGATE_AND_LOGICAL_DNS_CLUSTER", true) - -	// XDSRBAC indicates whether xDS configured RBAC HTTP Filter is enabled, -	// which can be disabled by setting the environment variable -	// "GRPC_XDS_EXPERIMENTAL_RBAC" to "false". -	XDSRBAC = boolFromEnv("GRPC_XDS_EXPERIMENTAL_RBAC", true) -	// XDSOutlierDetection indicates whether outlier detection support is -	// enabled, which can be disabled by setting the environment variable -	// "GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION" to "false". -	XDSOutlierDetection = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION", true) -	// XDSFederation indicates whether federation support is enabled, which can -	// be enabled by setting the environment variable -	// "GRPC_EXPERIMENTAL_XDS_FEDERATION" to "true". -	XDSFederation = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FEDERATION", true) - -	// XDSRLS indicates whether processing of Cluster Specifier plugins and -	// support for the RLS CLuster Specifier is enabled, which can be disabled by -	// setting the environment variable "GRPC_EXPERIMENTAL_XDS_RLS_LB" to -	// "false". -	XDSRLS = boolFromEnv("GRPC_EXPERIMENTAL_XDS_RLS_LB", true)  	// C2PResolverTestOnlyTrafficDirectorURI is the TD URI for testing.  	C2PResolverTestOnlyTrafficDirectorURI = os.Getenv("GRPC_TEST_ONLY_GOOGLE_C2P_RESOLVER_TRAFFIC_DIRECTOR_URI") -	// XDSCustomLBPolicy indicates whether Custom LB Policies are enabled, which -	// can be disabled by setting the environment variable -	// "GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG" to "false". -	XDSCustomLBPolicy = boolFromEnv("GRPC_EXPERIMENTAL_XDS_CUSTOM_LB_CONFIG", true)  ) diff --git a/vendor/google.golang.org/grpc/internal/experimental.go b/vendor/google.golang.org/grpc/internal/experimental.go new file mode 100644 index 000000000..7f7044e17 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/experimental.go @@ -0,0 +1,28 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +var ( +	// WithRecvBufferPool is implemented by the grpc package and returns a dial +	// option to configure a shared buffer pool for a grpc.ClientConn. +	WithRecvBufferPool any // func (grpc.SharedBufferPool) grpc.DialOption + +	// RecvBufferPool is implemented by the grpc package and returns a server +	// option to configure a shared buffer pool for a grpc.Server. +	RecvBufferPool any // func (grpc.SharedBufferPool) grpc.ServerOption +) diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go index 900917dbe..f7f40a16a 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go @@ -20,7 +20,6 @@ package grpcsync  import (  	"context" -	"sync"  	"google.golang.org/grpc/internal/buffer"  ) @@ -38,8 +37,6 @@ type CallbackSerializer struct {  	done chan struct{}  	callbacks *buffer.Unbounded -	closedMu  sync.Mutex -	closed    bool  }  // NewCallbackSerializer returns a new CallbackSerializer instance. The provided @@ -65,56 +62,34 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {  // callbacks to be executed by the serializer. It is not possible to add  // callbacks once the context passed to NewCallbackSerializer is cancelled.  func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { -	cs.closedMu.Lock() -	defer cs.closedMu.Unlock() - -	if cs.closed { -		return false -	} -	cs.callbacks.Put(f) -	return true +	return cs.callbacks.Put(f) == nil  }  func (cs *CallbackSerializer) run(ctx context.Context) { -	var backlog []func(context.Context) -  	defer close(cs.done) + +	// TODO: when Go 1.21 is the oldest supported version, this loop and Close +	// can be replaced with: +	// +	// context.AfterFunc(ctx, cs.callbacks.Close)  	for ctx.Err() == nil {  		select {  		case <-ctx.Done():  			// Do nothing here. Next iteration of the for loop will not happen,  			// since ctx.Err() would be non-nil. -		case callback, ok := <-cs.callbacks.Get(): -			if !ok { -				return -			} +		case cb := <-cs.callbacks.Get():  			cs.callbacks.Load() -			callback.(func(ctx context.Context))(ctx) +			cb.(func(context.Context))(ctx)  		}  	} -	// Fetch pending callbacks if any, and execute them before returning from -	// this method and closing cs.done. -	cs.closedMu.Lock() -	cs.closed = true -	backlog = cs.fetchPendingCallbacks() +	// Close the buffer to prevent new callbacks from being added.  	cs.callbacks.Close() -	cs.closedMu.Unlock() -	for _, b := range backlog { -		b(ctx) -	} -} -func (cs *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { -	var backlog []func(context.Context) -	for { -		select { -		case b := <-cs.callbacks.Get(): -			backlog = append(backlog, b.(func(context.Context))) -			cs.callbacks.Load() -		default: -			return backlog -		} +	// Run all pending callbacks. +	for cb := range cs.callbacks.Get() { +		cs.callbacks.Load() +		cb.(func(context.Context))(ctx)  	}  } diff --git a/vendor/google.golang.org/grpc/internal/idle/idle.go b/vendor/google.golang.org/grpc/internal/idle/idle.go index 6c272476e..fe49cb74c 100644 --- a/vendor/google.golang.org/grpc/internal/idle/idle.go +++ b/vendor/google.golang.org/grpc/internal/idle/idle.go @@ -26,8 +26,6 @@ import (  	"sync"  	"sync/atomic"  	"time" - -	"google.golang.org/grpc/grpclog"  )  // For overriding in unit tests. @@ -39,27 +37,12 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {  // and exit from idle mode.  type Enforcer interface {  	ExitIdleMode() error -	EnterIdleMode() error -} - -// Manager defines the functionality required to track RPC activity on a -// channel. -type Manager interface { -	OnCallBegin() error -	OnCallEnd() -	Close() +	EnterIdleMode()  } -type noopManager struct{} - -func (noopManager) OnCallBegin() error { return nil } -func (noopManager) OnCallEnd()         {} -func (noopManager) Close()             {} - -// manager implements the Manager interface. It uses atomic operations to -// synchronize access to shared state and a mutex to guarantee mutual exclusion -// in a critical section. -type manager struct { +// Manager implements idleness detection and calls the configured Enforcer to +// enter/exit idle mode when appropriate.  Must be created by NewManager. +type Manager struct {  	// State accessed atomically.  	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.  	activeCallsCount          int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. @@ -69,8 +52,7 @@ type manager struct {  	// Can be accessed without atomics or mutex since these are set at creation  	// time and read-only after that.  	enforcer Enforcer // Functionality provided by grpc.ClientConn. -	timeout  int64    // Idle timeout duration nanos stored as an int64. -	logger   grpclog.LoggerV2 +	timeout  time.Duration  	// idleMu is used to guarantee mutual exclusion in two scenarios:  	// - Opposing intentions: @@ -88,57 +70,48 @@ type manager struct {  	timer        *time.Timer  } -// ManagerOptions is a collection of options used by -// NewManager. -type ManagerOptions struct { -	Enforcer Enforcer -	Timeout  time.Duration -	Logger   grpclog.LoggerV2 +// NewManager creates a new idleness manager implementation for the +// given idle timeout.  It begins in idle mode. +func NewManager(enforcer Enforcer, timeout time.Duration) *Manager { +	return &Manager{ +		enforcer:         enforcer, +		timeout:          timeout, +		actuallyIdle:     true, +		activeCallsCount: -math.MaxInt32, +	}  } -// NewManager creates a new idleness manager implementation for the -// given idle timeout. -func NewManager(opts ManagerOptions) Manager { -	if opts.Timeout == 0 { -		return noopManager{} +// resetIdleTimerLocked resets the idle timer to the given duration.  Called +// when exiting idle mode or when the timer fires and we need to reset it. +func (m *Manager) resetIdleTimerLocked(d time.Duration) { +	if m.isClosed() || m.timeout == 0 || m.actuallyIdle { +		return  	} -	m := &manager{ -		enforcer: opts.Enforcer, -		timeout:  int64(opts.Timeout), -		logger:   opts.Logger, +	// It is safe to ignore the return value from Reset() because this method is +	// only ever called from the timer callback or when exiting idle mode. +	if m.timer != nil { +		m.timer.Stop()  	} -	m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout) -	return m +	m.timer = timeAfterFunc(d, m.handleIdleTimeout)  } -// resetIdleTimer resets the idle timer to the given duration. This method -// should only be called from the timer callback. -func (m *manager) resetIdleTimer(d time.Duration) { +func (m *Manager) resetIdleTimer(d time.Duration) {  	m.idleMu.Lock()  	defer m.idleMu.Unlock() - -	if m.timer == nil { -		// Only close sets timer to nil. We are done. -		return -	} - -	// It is safe to ignore the return value from Reset() because this method is -	// only ever called from the timer callback, which means the timer has -	// already fired. -	m.timer.Reset(d) +	m.resetIdleTimerLocked(d)  }  // handleIdleTimeout is the timer callback that is invoked upon expiry of the  // configured idle timeout. The channel is considered inactive if there are no  // ongoing calls and no RPC activity since the last time the timer fired. -func (m *manager) handleIdleTimeout() { +func (m *Manager) handleIdleTimeout() {  	if m.isClosed() {  		return  	}  	if atomic.LoadInt32(&m.activeCallsCount) > 0 { -		m.resetIdleTimer(time.Duration(m.timeout)) +		m.resetIdleTimer(m.timeout)  		return  	} @@ -148,24 +121,12 @@ func (m *manager) handleIdleTimeout() {  		// Set the timer to fire after a duration of idle timeout, calculated  		// from the time the most recent RPC completed.  		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) -		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano())) +		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime)-time.Now().UnixNano()) + m.timeout)  		return  	} -	// This CAS operation is extremely likely to succeed given that there has -	// been no activity since the last time we were here.  Setting the -	// activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the -	// channel is either in idle mode or is trying to get there. -	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { -		// This CAS operation can fail if an RPC started after we checked for -		// activity at the top of this method, or one was ongoing from before -		// the last time we were here. In both case, reset the timer and return. -		m.resetIdleTimer(time.Duration(m.timeout)) -		return -	} - -	// Now that we've set the active calls count to -math.MaxInt32, it's time to -	// actually move to idle mode. +	// Now that we've checked that there has been no activity, attempt to enter +	// idle mode, which is very likely to succeed.  	if m.tryEnterIdleMode() {  		// Successfully entered idle mode. No timer needed until we exit idle.  		return @@ -174,8 +135,7 @@ func (m *manager) handleIdleTimeout() {  	// Failed to enter idle mode due to a concurrent RPC that kept the channel  	// active, or because of an error from the channel. Undo the attempt to  	// enter idle, and reset the timer to try again later. -	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) -	m.resetIdleTimer(time.Duration(m.timeout)) +	m.resetIdleTimer(m.timeout)  }  // tryEnterIdleMode instructs the channel to enter idle mode. But before @@ -185,36 +145,49 @@ func (m *manager) handleIdleTimeout() {  // Return value indicates whether or not the channel moved to idle mode.  //  // Holds idleMu which ensures mutual exclusion with exitIdleMode. -func (m *manager) tryEnterIdleMode() bool { +func (m *Manager) tryEnterIdleMode() bool { +	// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() +	// that the channel is either in idle mode or is trying to get there. +	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) { +		// This CAS operation can fail if an RPC started after we checked for +		// activity in the timer handler, or one was ongoing from before the +		// last time the timer fired, or if a test is attempting to enter idle +		// mode without checking.  In all cases, abort going into idle mode. +		return false +	} +	// N.B. if we fail to enter idle mode after this, we must re-add +	// math.MaxInt32 to m.activeCallsCount. +  	m.idleMu.Lock()  	defer m.idleMu.Unlock()  	if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {  		// We raced and lost to a new RPC. Very rare, but stop entering idle. +		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)  		return false  	}  	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 { -		// An very short RPC could have come in (and also finished) after we +		// A very short RPC could have come in (and also finished) after we  		// checked for calls count and activity in handleIdleTimeout(), but  		// before the CAS operation. So, we need to check for activity again. +		atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)  		return false  	} -	// No new RPCs have come in since we last set the active calls count value -	// -math.MaxInt32 in the timer callback. And since we have the lock, it is -	// safe to enter idle mode now. -	if err := m.enforcer.EnterIdleMode(); err != nil { -		m.logger.Errorf("Failed to enter idle mode: %v", err) -		return false -	} - -	// Successfully entered idle mode. +	// No new RPCs have come in since we set the active calls count value to +	// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode +	// unconditionally now. +	m.enforcer.EnterIdleMode()  	m.actuallyIdle = true  	return true  } +func (m *Manager) EnterIdleModeForTesting() { +	m.tryEnterIdleMode() +} +  // OnCallBegin is invoked at the start of every RPC. -func (m *manager) OnCallBegin() error { +func (m *Manager) OnCallBegin() error {  	if m.isClosed() {  		return nil  	} @@ -227,7 +200,7 @@ func (m *manager) OnCallBegin() error {  	// Channel is either in idle mode or is in the process of moving to idle  	// mode. Attempt to exit idle mode to allow this RPC. -	if err := m.exitIdleMode(); err != nil { +	if err := m.ExitIdleMode(); err != nil {  		// Undo the increment to calls count, and return an error causing the  		// RPC to fail.  		atomic.AddInt32(&m.activeCallsCount, -1) @@ -238,28 +211,30 @@ func (m *manager) OnCallBegin() error {  	return nil  } -// exitIdleMode instructs the channel to exit idle mode. -// -// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. -func (m *manager) exitIdleMode() error { +// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's +// internal state. +func (m *Manager) ExitIdleMode() error { +	// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.  	m.idleMu.Lock()  	defer m.idleMu.Unlock() -	if !m.actuallyIdle { -		// This can happen in two scenarios: +	if m.isClosed() || !m.actuallyIdle { +		// This can happen in three scenarios:  		// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called  		//   tryEnterIdleMode(). But before the latter could grab the lock, an RPC  		//   came in and OnCallBegin() noticed that the calls count is negative.  		// - Channel is in idle mode, and multiple new RPCs come in at the same  		//   time, all of them notice a negative calls count in OnCallBegin and get  		//   here. The first one to get the lock would got the channel to exit idle. +		// - Channel is not in idle mode, and the user calls Connect which calls +		//   m.ExitIdleMode.  		// -		// Either way, nothing to do here. +		// In any case, there is nothing to do here.  		return nil  	}  	if err := m.enforcer.ExitIdleMode(); err != nil { -		return fmt.Errorf("channel failed to exit idle mode: %v", err) +		return fmt.Errorf("failed to exit idle mode: %w", err)  	}  	// Undo the idle entry process. This also respects any new RPC attempts. @@ -267,12 +242,12 @@ func (m *manager) exitIdleMode() error {  	m.actuallyIdle = false  	// Start a new timer to fire after the configured idle timeout. -	m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout) +	m.resetIdleTimerLocked(m.timeout)  	return nil  }  // OnCallEnd is invoked at the end of every RPC. -func (m *manager) OnCallEnd() { +func (m *Manager) OnCallEnd() {  	if m.isClosed() {  		return  	} @@ -287,15 +262,17 @@ func (m *manager) OnCallEnd() {  	atomic.AddInt32(&m.activeCallsCount, -1)  } -func (m *manager) isClosed() bool { +func (m *Manager) isClosed() bool {  	return atomic.LoadInt32(&m.closed) == 1  } -func (m *manager) Close() { +func (m *Manager) Close() {  	atomic.StoreInt32(&m.closed, 1)  	m.idleMu.Lock() -	m.timer.Stop() -	m.timer = nil +	if m.timer != nil { +		m.timer.Stop() +		m.timer = nil +	}  	m.idleMu.Unlock()  } diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 0d94c63e0..6c7ea6a53 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -57,7 +57,7 @@ var (  	// GetXDSHandshakeInfoForTesting returns a pointer to the xds.HandshakeInfo  	// stored in the passed in attributes. This is set by  	// credentials/xds/xds.go. -	GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *xds.HandshakeInfo +	GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *unsafe.Pointer  	// GetServerCredentials returns the transport credentials configured on a  	// gRPC server. An xDS-enabled server needs to know what type of credentials  	// is configured on the underlying gRPC server. This is set by server.go. @@ -68,11 +68,11 @@ var (  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed.  	CanonicalString any // func (codes.Code) string -	// DrainServerTransports initiates a graceful close of existing connections -	// on a gRPC server accepted on the provided listener address. An -	// xDS-enabled server invokes this method on a grpc.Server when a particular -	// listener moves to "not-serving" mode. -	DrainServerTransports any // func(*grpc.Server, string) +	// IsRegisteredMethod returns whether the passed in method is registered as +	// a method on the server. +	IsRegisteredMethod any // func(*grpc.Server, string) bool +	// ServerFromContext returns the server from the context. +	ServerFromContext any // func(context.Context) *grpc.Server  	// AddGlobalServerOptions adds an array of ServerOption that will be  	// effective globally for newly created servers. The priority will be: 1.  	// user-provided; 2. this method; 3. default values. @@ -177,10 +177,25 @@ var (  	GRPCResolverSchemeExtraMetadata string = "xds"  	// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode. -	EnterIdleModeForTesting any // func(*grpc.ClientConn) error +	EnterIdleModeForTesting any // func(*grpc.ClientConn)  	// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.  	ExitIdleModeForTesting any // func(*grpc.ClientConn) error + +	ChannelzTurnOffForTesting func() + +	// TriggerXDSResourceNameNotFoundForTesting triggers the resource-not-found +	// error for a given resource type and name. This is usually triggered when +	// the associated watch timer fires. For testing purposes, having this +	// function makes events more predictable than relying on timer events. +	TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error + +	// TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton +	// to invoke resource not found for a resource type name and resource name. +	TriggerXDSResourceNameNotFoundClient any // func(string, string) error + +	// FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD. +	FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool)  )  // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go index 99e1e5b36..b66dcb213 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go @@ -23,7 +23,6 @@ package dns  import (  	"context"  	"encoding/json" -	"errors"  	"fmt"  	"net"  	"os" @@ -37,6 +36,7 @@ import (  	"google.golang.org/grpc/internal/backoff"  	"google.golang.org/grpc/internal/envconfig"  	"google.golang.org/grpc/internal/grpcrand" +	"google.golang.org/grpc/internal/resolver/dns/internal"  	"google.golang.org/grpc/resolver"  	"google.golang.org/grpc/serviceconfig"  ) @@ -47,15 +47,11 @@ var EnableSRVLookups = false  var logger = grpclog.Component("dns") -// Globals to stub out in tests. TODO: Perhaps these two can be combined into a -// single variable for testing the resolver? -var ( -	newTimer           = time.NewTimer -	newTimerDNSResRate = time.NewTimer -) -  func init() {  	resolver.Register(NewBuilder()) +	internal.TimeAfterFunc = time.After +	internal.NewNetResolver = newNetResolver +	internal.AddressDialer = addressDialer  }  const ( @@ -70,23 +66,6 @@ const (  	txtAttribute = "grpc_config="  ) -var ( -	errMissingAddr = errors.New("dns resolver: missing address") - -	// Addresses ending with a colon that is supposed to be the separator -	// between host and port is not allowed.  E.g. "::" is a valid address as -	// it is an IPv6 address (host only) and "[::]:" is invalid as it ends with -	// a colon as the host and port separator -	errEndsWithColon = errors.New("dns resolver: missing port after port-separator colon") -) - -var ( -	defaultResolver netResolver = net.DefaultResolver -	// To prevent excessive re-resolution, we enforce a rate limit on DNS -	// resolution requests. -	minDNSResRate = 30 * time.Second -) -  var addressDialer = func(address string) func(context.Context, string, string) (net.Conn, error) {  	return func(ctx context.Context, network, _ string) (net.Conn, error) {  		var dialer net.Dialer @@ -94,7 +73,11 @@ var addressDialer = func(address string) func(context.Context, string, string) (  	}  } -var newNetResolver = func(authority string) (netResolver, error) { +var newNetResolver = func(authority string) (internal.NetResolver, error) { +	if authority == "" { +		return net.DefaultResolver, nil +	} +  	host, port, err := parseTarget(authority, defaultDNSSvrPort)  	if err != nil {  		return nil, err @@ -104,7 +87,7 @@ var newNetResolver = func(authority string) (netResolver, error) {  	return &net.Resolver{  		PreferGo: true, -		Dial:     addressDialer(authorityWithPort), +		Dial:     internal.AddressDialer(authorityWithPort),  	}, nil  } @@ -142,13 +125,9 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts  		disableServiceConfig: opts.DisableServiceConfig,  	} -	if target.URL.Host == "" { -		d.resolver = defaultResolver -	} else { -		d.resolver, err = newNetResolver(target.URL.Host) -		if err != nil { -			return nil, err -		} +	d.resolver, err = internal.NewNetResolver(target.URL.Host) +	if err != nil { +		return nil, err  	}  	d.wg.Add(1) @@ -161,12 +140,6 @@ func (b *dnsBuilder) Scheme() string {  	return "dns"  } -type netResolver interface { -	LookupHost(ctx context.Context, host string) (addrs []string, err error) -	LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error) -	LookupTXT(ctx context.Context, name string) (txts []string, err error) -} -  // deadResolver is a resolver that does nothing.  type deadResolver struct{} @@ -178,7 +151,7 @@ func (deadResolver) Close() {}  type dnsResolver struct {  	host     string  	port     string -	resolver netResolver +	resolver internal.NetResolver  	ctx      context.Context  	cancel   context.CancelFunc  	cc       resolver.ClientConn @@ -223,29 +196,27 @@ func (d *dnsResolver) watcher() {  			err = d.cc.UpdateState(*state)  		} -		var timer *time.Timer +		var waitTime time.Duration  		if err == nil {  			// Success resolving, wait for the next ResolveNow. However, also wait 30  			// seconds at the very least to prevent constantly re-resolving.  			backoffIndex = 1 -			timer = newTimerDNSResRate(minDNSResRate) +			waitTime = internal.MinResolutionRate  			select {  			case <-d.ctx.Done(): -				timer.Stop()  				return  			case <-d.rn:  			}  		} else {  			// Poll on an error found in DNS Resolver or an error received from  			// ClientConn. -			timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex)) +			waitTime = backoff.DefaultExponential.Backoff(backoffIndex)  			backoffIndex++  		}  		select {  		case <-d.ctx.Done(): -			timer.Stop()  			return -		case <-timer.C: +		case <-internal.TimeAfterFunc(waitTime):  		}  	}  } @@ -387,7 +358,7 @@ func formatIP(addr string) (addrIP string, ok bool) {  // target: ":80" defaultPort: "443" returns host: "localhost", port: "80"  func parseTarget(target, defaultPort string) (host, port string, err error) {  	if target == "" { -		return "", "", errMissingAddr +		return "", "", internal.ErrMissingAddr  	}  	if ip := net.ParseIP(target); ip != nil {  		// target is an IPv4 or IPv6(without brackets) address @@ -397,7 +368,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {  		if port == "" {  			// If the port field is empty (target ends with colon), e.g. "[::1]:",  			// this is an error. -			return "", "", errEndsWithColon +			return "", "", internal.ErrEndsWithColon  		}  		// target has port, i.e ipv4-host:port, [ipv6-host]:port, host-name:port  		if host == "" { diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go b/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go new file mode 100644 index 000000000..c7fc557d0 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/internal/internal.go @@ -0,0 +1,70 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package internal contains functionality internal to the dns resolver package. +package internal + +import ( +	"context" +	"errors" +	"net" +	"time" +) + +// NetResolver groups the methods on net.Resolver that are used by the DNS +// resolver implementation. This allows the default net.Resolver instance to be +// overidden from tests. +type NetResolver interface { +	LookupHost(ctx context.Context, host string) (addrs []string, err error) +	LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error) +	LookupTXT(ctx context.Context, name string) (txts []string, err error) +} + +var ( +	// ErrMissingAddr is the error returned when building a DNS resolver when +	// the provided target name is empty. +	ErrMissingAddr = errors.New("dns resolver: missing address") + +	// ErrEndsWithColon is the error returned when building a DNS resolver when +	// the provided target name ends with a colon that is supposed to be the +	// separator between host and port.  E.g. "::" is a valid address as it is +	// an IPv6 address (host only) and "[::]:" is invalid as it ends with a +	// colon as the host and port separator +	ErrEndsWithColon = errors.New("dns resolver: missing port after port-separator colon") +) + +// The following vars are overridden from tests. +var ( +	// MinResolutionRate is the minimum rate at which re-resolutions are +	// allowed. This helps to prevent excessive re-resolution. +	MinResolutionRate = 30 * time.Second + +	// TimeAfterFunc is used by the DNS resolver to wait for the given duration +	// to elapse. In non-test code, this is implemented by time.After.  In test +	// code, this can be used to control the amount of time the resolver is +	// blocked waiting for the duration to elapse. +	TimeAfterFunc func(time.Duration) <-chan time.Time + +	// NewNetResolver returns the net.Resolver instance for the given target. +	NewNetResolver func(string) (NetResolver, error) + +	// AddressDialer is the dialer used to dial the DNS server. It accepts the +	// Host portion of the URL corresponding to the user's dial target and +	// returns a dial function. +	AddressDialer func(address string) func(context.Context, string, string) (net.Conn, error) +) diff --git a/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go index 160911687..27cd81af9 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go +++ b/vendor/google.golang.org/grpc/internal/resolver/unix/unix.go @@ -61,6 +61,10 @@ func (b *builder) Scheme() string {  	return b.scheme  } +func (b *builder) OverrideAuthority(resolver.Target) string { +	return "localhost" +} +  type nopResolver struct {  } diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go new file mode 100644 index 000000000..4f347edd4 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_others.go @@ -0,0 +1,29 @@ +//go:build !unix && !windows + +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +import ( +	"net" +) + +// NetDialerWithTCPKeepalive returns a vanilla net.Dialer on non-unix platforms. +func NetDialerWithTCPKeepalive() *net.Dialer { +	return &net.Dialer{} +} diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go new file mode 100644 index 000000000..078137b7f --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go @@ -0,0 +1,54 @@ +//go:build unix + +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +import ( +	"net" +	"syscall" +	"time" + +	"golang.org/x/sys/unix" +) + +// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on +// the underlying connection with OS default values for keepalive parameters. +// +// TODO: Once https://github.com/golang/go/issues/62254 lands, and the +// appropriate Go version becomes less than our least supported Go version, we +// should look into using the new API to make things more straightforward. +func NetDialerWithTCPKeepalive() *net.Dialer { +	return &net.Dialer{ +		// Setting a negative value here prevents the Go stdlib from overriding +		// the values of TCP keepalive time and interval. It also prevents the +		// Go stdlib from enabling TCP keepalives by default. +		KeepAlive: time.Duration(-1), +		// This method is called after the underlying network socket is created, +		// but before dialing the socket (or calling its connect() method). The +		// combination of unconditionally enabling TCP keepalives here, and +		// disabling the overriding of TCP keepalive parameters by setting the +		// KeepAlive field to a negative value above, results in OS defaults for +		// the TCP keealive interval and time parameters. +		Control: func(_, _ string, c syscall.RawConn) error { +			return c.Control(func(fd uintptr) { +				unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1) +			}) +		}, +	} +} diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go new file mode 100644 index 000000000..fd7d43a89 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go @@ -0,0 +1,54 @@ +//go:build windows + +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package internal + +import ( +	"net" +	"syscall" +	"time" + +	"golang.org/x/sys/windows" +) + +// NetDialerWithTCPKeepalive returns a net.Dialer that enables TCP keepalives on +// the underlying connection with OS default values for keepalive parameters. +// +// TODO: Once https://github.com/golang/go/issues/62254 lands, and the +// appropriate Go version becomes less than our least supported Go version, we +// should look into using the new API to make things more straightforward. +func NetDialerWithTCPKeepalive() *net.Dialer { +	return &net.Dialer{ +		// Setting a negative value here prevents the Go stdlib from overriding +		// the values of TCP keepalive time and interval. It also prevents the +		// Go stdlib from enabling TCP keepalives by default. +		KeepAlive: time.Duration(-1), +		// This method is called after the underlying network socket is created, +		// but before dialing the socket (or calling its connect() method). The +		// combination of unconditionally enabling TCP keepalives here, and +		// disabling the overriding of TCP keepalive parameters by setting the +		// KeepAlive field to a negative value above, results in OS defaults for +		// the TCP keealive interval and time parameters. +		Control: func(_, _ string, c syscall.RawConn) error { +			return c.Control(func(fd uintptr) { +				windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_KEEPALIVE, 1) +			}) +		}, +	} +} diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go index b330ccedc..83c382982 100644 --- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go @@ -535,8 +535,8 @@ const minBatchSize = 1000  // size is too low to give stream goroutines a chance to fill it up.  //  // Upon exiting, if the error causing the exit is not an I/O error, run() -// flushes and closes the underlying connection.  Otherwise, the connection is -// left open to allow the I/O error to be encountered by the reader instead. +// flushes the underlying connection.  The connection is always left open to +// allow different closing behavior on the client and server.  func (l *loopyWriter) run() (err error) {  	defer func() {  		if l.logger.V(logLevel) { @@ -544,7 +544,6 @@ func (l *loopyWriter) run() (err error) {  		}  		if !isIOError(err) {  			l.framer.writer.Flush() -			l.conn.Close()  		}  		l.cbuf.finish()  	}() diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index 17f7a21b5..a9d70e2a1 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -75,11 +75,25 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s  		return nil, errors.New(msg)  	} +	var localAddr net.Addr +	if la := r.Context().Value(http.LocalAddrContextKey); la != nil { +		localAddr, _ = la.(net.Addr) +	} +	var authInfo credentials.AuthInfo +	if r.TLS != nil { +		authInfo = credentials.TLSInfo{State: *r.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}} +	} +	p := peer.Peer{ +		Addr:      strAddr(r.RemoteAddr), +		LocalAddr: localAddr, +		AuthInfo:  authInfo, +	}  	st := &serverHandlerTransport{  		rw:             w,  		req:            r,  		closedCh:       make(chan struct{}),  		writes:         make(chan func()), +		peer:           p,  		contentType:    contentType,  		contentSubtype: contentSubtype,  		stats:          stats, @@ -134,6 +148,8 @@ type serverHandlerTransport struct {  	headerMD metadata.MD +	peer peer.Peer +  	closeOnce sync.Once  	closedCh  chan struct{} // closed on Close @@ -165,7 +181,13 @@ func (ht *serverHandlerTransport) Close(err error) {  	})  } -func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } +func (ht *serverHandlerTransport) Peer() *peer.Peer { +	return &peer.Peer{ +		Addr:      ht.peer.Addr, +		LocalAddr: ht.peer.LocalAddr, +		AuthInfo:  ht.peer.AuthInfo, +	} +}  // strAddr is a net.Addr backed by either a TCP "ip:port" string, or  // the empty string if unknown. @@ -347,10 +369,8 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {  	return err  } -func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) { +func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {  	// With this transport type there will be exactly 1 stream: this HTTP request. - -	ctx := ht.req.Context()  	var cancel context.CancelFunc  	if ht.timeoutSet {  		ctx, cancel = context.WithTimeout(ctx, ht.timeout) @@ -370,34 +390,19 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {  		ht.Close(errors.New("request is done processing"))  	}() +	ctx = metadata.NewIncomingContext(ctx, ht.headerMD)  	req := ht.req -  	s := &Stream{ -		id:             0, // irrelevant -		requestRead:    func(int) {}, -		cancel:         cancel, -		buf:            newRecvBuffer(), -		st:             ht, -		method:         req.URL.Path, -		recvCompress:   req.Header.Get("grpc-encoding"), -		contentSubtype: ht.contentSubtype, -	} -	pr := &peer.Peer{ -		Addr: ht.RemoteAddr(), -	} -	if req.TLS != nil { -		pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}} -	} -	ctx = metadata.NewIncomingContext(ctx, ht.headerMD) -	s.ctx = peer.NewContext(ctx, pr) -	for _, sh := range ht.stats { -		s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) -		inHeader := &stats.InHeader{ -			FullMethod:  s.method, -			RemoteAddr:  ht.RemoteAddr(), -			Compression: s.recvCompress, -		} -		sh.HandleRPC(s.ctx, inHeader) +		id:               0, // irrelevant +		ctx:              ctx, +		requestRead:      func(int) {}, +		cancel:           cancel, +		buf:              newRecvBuffer(), +		st:               ht, +		method:           req.URL.Path, +		recvCompress:     req.Header.Get("grpc-encoding"), +		contentSubtype:   ht.contentSubtype, +		headerWireLength: 0, // won't have access to header wire length until golang/go#18997.  	}  	s.trReader = &transportReader{  		reader:        &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}}, diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index d6f5c4935..eff879964 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -36,6 +36,7 @@ import (  	"golang.org/x/net/http2/hpack"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/credentials" +	"google.golang.org/grpc/internal"  	"google.golang.org/grpc/internal/channelz"  	icredentials "google.golang.org/grpc/internal/credentials"  	"google.golang.org/grpc/internal/grpclog" @@ -43,7 +44,7 @@ import (  	"google.golang.org/grpc/internal/grpcutil"  	imetadata "google.golang.org/grpc/internal/metadata"  	istatus "google.golang.org/grpc/internal/status" -	"google.golang.org/grpc/internal/syscall" +	isyscall "google.golang.org/grpc/internal/syscall"  	"google.golang.org/grpc/internal/transport/networktype"  	"google.golang.org/grpc/keepalive"  	"google.golang.org/grpc/metadata" @@ -58,6 +59,8 @@ import (  // atomically.  var clientConnectionCounter uint64 +var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool)) +  // http2Client implements the ClientTransport interface with HTTP2.  type http2Client struct {  	lastRead  int64 // Keep this field 64-bit aligned. Accessed atomically. @@ -176,7 +179,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error  	if networkType == "tcp" && useProxy {  		return proxyDial(ctx, address, grpcUA)  	} -	return (&net.Dialer{}).DialContext(ctx, networkType, address) +	return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)  }  func isTemporary(err error) bool { @@ -262,7 +265,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  	}  	keepaliveEnabled := false  	if kp.Time != infinity { -		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil { +		if err = isyscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {  			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)  		}  		keepaliveEnabled = true @@ -448,7 +451,13 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  	}  	go func() {  		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger) -		t.loopy.run() +		if err := t.loopy.run(); !isIOError(err) { +			// Immediately close the connection, as the loopy writer returns +			// when there are no more active streams and we were draining (the +			// server sent a GOAWAY).  For I/O errors, the reader will hit it +			// after draining any remaining incoming data. +			t.conn.Close() +		}  		close(t.writerDone)  	}()  	return t, nil @@ -493,8 +502,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {  func (t *http2Client) getPeer() *peer.Peer {  	return &peer.Peer{ -		Addr:     t.remoteAddr, -		AuthInfo: t.authInfo, // Can be nil +		Addr:      t.remoteAddr, +		AuthInfo:  t.authInfo, // Can be nil +		LocalAddr: t.localAddr,  	}  } @@ -566,7 +576,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})  	} -	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { +	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {  		var k string  		for k, vv := range md {  			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. @@ -1321,10 +1331,8 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  	for streamID, stream := range t.activeStreams {  		if streamID > id && streamID <= upperLimit {  			// The stream was unprocessed by the server. -			if streamID > id && streamID <= upperLimit { -				atomic.StoreUint32(&stream.unprocessed, 1) -				streamsToClose = append(streamsToClose, stream) -			} +			atomic.StoreUint32(&stream.unprocessed, 1) +			streamsToClose = append(streamsToClose, stream)  		}  	}  	t.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index 6fa1eb419..a206e2eef 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -68,18 +68,15 @@ var serverConnectionCounter uint64  // http2Server implements the ServerTransport interface with HTTP2.  type http2Server struct { -	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically. -	ctx         context.Context -	done        chan struct{} -	conn        net.Conn -	loopy       *loopyWriter -	readerDone  chan struct{} // sync point to enable testing. -	writerDone  chan struct{} // sync point to enable testing. -	remoteAddr  net.Addr -	localAddr   net.Addr -	authInfo    credentials.AuthInfo // auth info about the connection -	inTapHandle tap.ServerInHandle -	framer      *framer +	lastRead        int64 // Keep this field 64-bit aligned. Accessed atomically. +	done            chan struct{} +	conn            net.Conn +	loopy           *loopyWriter +	readerDone      chan struct{} // sync point to enable testing. +	loopyWriterDone chan struct{} +	peer            peer.Peer +	inTapHandle     tap.ServerInHandle +	framer          *framer  	// The max number of concurrent streams.  	maxStreams uint32  	// controlBuf delivers all the control related tasks (e.g., window @@ -243,16 +240,18 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  	}  	done := make(chan struct{}) +	peer := peer.Peer{ +		Addr:      conn.RemoteAddr(), +		LocalAddr: conn.LocalAddr(), +		AuthInfo:  authInfo, +	}  	t := &http2Server{ -		ctx:               setConnection(context.Background(), rawConn),  		done:              done,  		conn:              conn, -		remoteAddr:        conn.RemoteAddr(), -		localAddr:         conn.LocalAddr(), -		authInfo:          authInfo, +		peer:              peer,  		framer:            framer,  		readerDone:        make(chan struct{}), -		writerDone:        make(chan struct{}), +		loopyWriterDone:   make(chan struct{}),  		maxStreams:        config.MaxStreams,  		inTapHandle:       config.InTapHandle,  		fc:                &trInFlow{limit: uint32(icwz)}, @@ -267,8 +266,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  		bufferPool:        newBufferPool(),  	}  	t.logger = prefixLoggerForServerTransport(t) -	// Add peer information to the http2server context. -	t.ctx = peer.NewContext(t.ctx, t.getPeer())  	t.controlBuf = newControlBuffer(t.done)  	if dynamicWindow { @@ -277,15 +274,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  			updateFlowControl: t.updateFlowControl,  		}  	} -	for _, sh := range t.stats { -		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{ -			RemoteAddr: t.remoteAddr, -			LocalAddr:  t.localAddr, -		}) -		connBegin := &stats.ConnBegin{} -		sh.HandleConn(t.ctx, connBegin) -	} -	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) +	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))  	if err != nil {  		return nil, err  	} @@ -333,8 +322,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  	go func() {  		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)  		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler -		t.loopy.run() -		close(t.writerDone) +		err := t.loopy.run() +		close(t.loopyWriterDone) +		if !isIOError(err) { +			// Close the connection if a non-I/O error occurs (for I/O errors +			// the reader will also encounter the error and close).  Wait 1 +			// second before closing the connection, or when the reader is done +			// (i.e. the client already closed the connection or a connection +			// error occurred).  This avoids the potential problem where there +			// is unread data on the receive side of the connection, which, if +			// closed, would lead to a TCP RST instead of FIN, and the client +			// encountering errors.  For more info: +			// https://github.com/grpc/grpc-go/issues/5358 +			select { +			case <-t.readerDone: +			case <-time.After(time.Second): +			} +			t.conn.Close() +		}  	}()  	go t.keepalive()  	return t, nil @@ -342,7 +347,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  // operateHeaders takes action on the decoded headers. Returns an error if fatal  // error encountered and transport needs to close, otherwise returns nil. -func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error { +func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {  	// Acquire max stream ID lock for entire duration  	t.maxStreamMu.Lock()  	defer t.maxStreamMu.Unlock() @@ -369,10 +374,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  	buf := newRecvBuffer()  	s := &Stream{ -		id:  streamID, -		st:  t, -		buf: buf, -		fc:  &inFlow{limit: uint32(t.initialWindowSize)}, +		id:               streamID, +		st:               t, +		buf:              buf, +		fc:               &inFlow{limit: uint32(t.initialWindowSize)}, +		headerWireLength: int(frame.Header().Length),  	}  	var (  		// if false, content-type was missing or invalid @@ -511,9 +517,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  		s.state = streamReadDone  	}  	if timeoutSet { -		s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout) +		s.ctx, s.cancel = context.WithTimeout(ctx, timeout)  	} else { -		s.ctx, s.cancel = context.WithCancel(t.ctx) +		s.ctx, s.cancel = context.WithCancel(ctx)  	}  	// Attach the received metadata to the context. @@ -592,18 +598,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  	s.requestRead = func(n int) {  		t.adjustWindow(s, uint32(n))  	} -	for _, sh := range t.stats { -		s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) -		inHeader := &stats.InHeader{ -			FullMethod:  s.method, -			RemoteAddr:  t.remoteAddr, -			LocalAddr:   t.localAddr, -			Compression: s.recvCompress, -			WireLength:  int(frame.Header().Length), -			Header:      mdata.Copy(), -		} -		sh.HandleRPC(s.ctx, inHeader) -	}  	s.ctxDone = s.ctx.Done()  	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)  	s.trReader = &transportReader{ @@ -629,8 +623,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(  // HandleStreams receives incoming streams using the given handler. This is  // typically run in a separate goroutine.  // traceCtx attaches trace to ctx and returns the new context. -func (t *http2Server) HandleStreams(handle func(*Stream)) { -	defer close(t.readerDone) +func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) { +	defer func() { +		close(t.readerDone) +		<-t.loopyWriterDone +	}()  	for {  		t.controlBuf.throttle()  		frame, err := t.framer.fr.ReadFrame() @@ -664,7 +661,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {  		}  		switch frame := frame.(type) {  		case *http2.MetaHeadersFrame: -			if err := t.operateHeaders(frame, handle); err != nil { +			if err := t.operateHeaders(ctx, frame, handle); err != nil {  				t.Close(err)  				break  			} @@ -979,7 +976,12 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {  		}  	}  	if err := t.writeHeaderLocked(s); err != nil { -		return status.Convert(err).Err() +		switch e := err.(type) { +		case ConnectionError: +			return status.Error(codes.Unavailable, e.Desc) +		default: +			return status.Convert(err).Err() +		}  	}  	return nil  } @@ -1242,10 +1244,6 @@ func (t *http2Server) Close(err error) {  	for _, s := range streams {  		s.cancel()  	} -	for _, sh := range t.stats { -		connEnd := &stats.ConnEnd{} -		sh.HandleConn(t.ctx, connEnd) -	}  }  // deleteStream deletes the stream s from transport's active streams. @@ -1311,10 +1309,6 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo  	})  } -func (t *http2Server) RemoteAddr() net.Addr { -	return t.remoteAddr -} -  func (t *http2Server) Drain(debugData string) {  	t.mu.Lock()  	defer t.mu.Unlock() @@ -1351,6 +1345,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {  			return false, err  		} +		t.framer.writer.Flush()  		if retErr != nil {  			return false, retErr  		} @@ -1371,7 +1366,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  		return false, err  	}  	go func() { -		timer := time.NewTimer(time.Minute) +		timer := time.NewTimer(5 * time.Second)  		defer timer.Stop()  		select {  		case <-t.drainEvent.Done(): @@ -1397,11 +1392,11 @@ func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {  		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),  		LocalFlowControlWindow:           int64(t.fc.getSize()),  		SocketOptions:                    channelz.GetSocketOption(t.conn), -		LocalAddr:                        t.localAddr, -		RemoteAddr:                       t.remoteAddr, +		LocalAddr:                        t.peer.LocalAddr, +		RemoteAddr:                       t.peer.Addr,  		// RemoteName :  	} -	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { +	if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {  		s.Security = au.GetSecurityValue()  	}  	s.RemoteFlowControlWindow = t.getOutFlowWindow() @@ -1433,10 +1428,12 @@ func (t *http2Server) getOutFlowWindow() int64 {  	}  } -func (t *http2Server) getPeer() *peer.Peer { +// Peer returns the peer of the transport. +func (t *http2Server) Peer() *peer.Peer {  	return &peer.Peer{ -		Addr:     t.remoteAddr, -		AuthInfo: t.authInfo, // Can be nil +		Addr:      t.peer.Addr, +		LocalAddr: t.peer.LocalAddr, +		AuthInfo:  t.peer.AuthInfo, // Can be nil  	}  } @@ -1461,6 +1458,6 @@ func GetConnection(ctx context.Context) net.Conn {  // SetConnection adds the connection to the context to be able to get  // information about the destination ip and port for an incoming RPC. This also  // allows any unary or streaming interceptors to see the connection. -func setConnection(ctx context.Context, conn net.Conn) context.Context { +func SetConnection(ctx context.Context, conn net.Conn) context.Context {  	return context.WithValue(ctx, connectionKey{}, conn)  } diff --git a/vendor/google.golang.org/grpc/internal/transport/proxy.go b/vendor/google.golang.org/grpc/internal/transport/proxy.go index 415961987..24fa10325 100644 --- a/vendor/google.golang.org/grpc/internal/transport/proxy.go +++ b/vendor/google.golang.org/grpc/internal/transport/proxy.go @@ -28,6 +28,8 @@ import (  	"net/http"  	"net/http/httputil"  	"net/url" + +	"google.golang.org/grpc/internal"  )  const proxyAuthHeaderKey = "Proxy-Authorization" @@ -112,7 +114,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri  // proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy  // is necessary, dials, does the HTTP CONNECT handshake, and returns the  // connection. -func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) { +func proxyDial(ctx context.Context, addr string, grpcUA string) (net.Conn, error) {  	newAddr := addr  	proxyURL, err := mapAddress(addr)  	if err != nil { @@ -122,15 +124,15 @@ func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn,  		newAddr = proxyURL.Host  	} -	conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr) +	conn, err := internal.NetDialerWithTCPKeepalive().DialContext(ctx, "tcp", newAddr)  	if err != nil { -		return +		return nil, err  	} -	if proxyURL != nil { +	if proxyURL == nil {  		// proxy is disabled if proxyURL is nil. -		conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA) +		return conn, err  	} -	return +	return doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)  }  func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error { diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index aac056e72..b7b8fec18 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -37,6 +37,7 @@ import (  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/keepalive"  	"google.golang.org/grpc/metadata" +	"google.golang.org/grpc/peer"  	"google.golang.org/grpc/resolver"  	"google.golang.org/grpc/stats"  	"google.golang.org/grpc/status" @@ -265,7 +266,8 @@ type Stream struct {  	// headerValid indicates whether a valid header was received.  Only  	// meaningful after headerChan is closed (always call waitOnHeader() before  	// reading its value).  Not valid on server side. -	headerValid bool +	headerValid      bool +	headerWireLength int // Only set on server side.  	// hdrMu protects header and trailer metadata on the server-side.  	hdrMu sync.Mutex @@ -425,6 +427,12 @@ func (s *Stream) Context() context.Context {  	return s.ctx  } +// SetContext sets the context of the stream. This will be deleted once the +// stats handler callouts all move to gRPC layer. +func (s *Stream) SetContext(ctx context.Context) { +	s.ctx = ctx +} +  // Method returns the method for the stream.  func (s *Stream) Method() string {  	return s.method @@ -437,6 +445,12 @@ func (s *Stream) Status() *status.Status {  	return s.status  } +// HeaderWireLength returns the size of the headers of the stream as received +// from the wire. Valid only on the server. +func (s *Stream) HeaderWireLength() int { +	return s.headerWireLength +} +  // SetHeader sets the header metadata. This can be called multiple times.  // Server side only.  // This should not be called in parallel to other data writes. @@ -698,7 +712,7 @@ type ClientTransport interface {  // Write methods for a given Stream will be called serially.  type ServerTransport interface {  	// HandleStreams receives incoming streams using the given handler. -	HandleStreams(func(*Stream)) +	HandleStreams(context.Context, func(*Stream))  	// WriteHeader sends the header metadata for the given stream.  	// WriteHeader may not be called on all streams. @@ -717,8 +731,8 @@ type ServerTransport interface {  	// handlers will be terminated asynchronously.  	Close(err error) -	// RemoteAddr returns the remote network address. -	RemoteAddr() net.Addr +	// Peer returns the peer of the server transport. +	Peer() *peer.Peer  	// Drain notifies the client this ServerTransport stops accepting new RPCs.  	Drain(debugData string)  | 
