Diffstat (limited to 'vendor/google.golang.org/grpc')
63 files changed, 2495 insertions, 1213 deletions
diff --git a/vendor/google.golang.org/grpc/CONTRIBUTING.md b/vendor/google.golang.org/grpc/CONTRIBUTING.md index 0854d298e..d9bfa6e1e 100644 --- a/vendor/google.golang.org/grpc/CONTRIBUTING.md +++ b/vendor/google.golang.org/grpc/CONTRIBUTING.md @@ -4,7 +4,7 @@ We definitely welcome your patches and contributions to gRPC! Please read the gR  organization's [governance rules](https://github.com/grpc/grpc-community/blob/master/governance.md)  and [contribution guidelines](https://github.com/grpc/grpc-community/blob/master/CONTRIBUTING.md) before proceeding. -If you are new to github, please start by reading [Pull Request howto](https://help.github.com/articles/about-pull-requests/) +If you are new to GitHub, please start by reading [Pull Request howto](https://help.github.com/articles/about-pull-requests/)  ## Legal requirements @@ -25,8 +25,8 @@ How to get your contributions merged smoothly and quickly.    is a great place to start. These issues are well-documented and usually can be    resolved with a single pull request. -- If you are adding a new file, make sure it has the copyright message template  -  at the top as a comment. You can copy over the message from an existing file  +- If you are adding a new file, make sure it has the copyright message template +  at the top as a comment. You can copy over the message from an existing file    and update the year.  - The grpc package should only depend on standard Go packages and a small number @@ -39,12 +39,12 @@ How to get your contributions merged smoothly and quickly.    proposal](https://github.com/grpc/proposal).  - Provide a good **PR description** as a record of **what** change is being made -  and **why** it was made. Link to a github issue if it exists. +  and **why** it was made. Link to a GitHub issue if it exists. -- If you want to fix formatting or style, consider whether your changes are an  -  obvious improvement or might be considered a personal preference. If a style  -  change is based on preference, it likely will not be accepted. If it corrects  -  widely agreed-upon anti-patterns, then please do create a PR and explain the  +- If you want to fix formatting or style, consider whether your changes are an +  obvious improvement or might be considered a personal preference. If a style +  change is based on preference, it likely will not be accepted. If it corrects +  widely agreed-upon anti-patterns, then please do create a PR and explain the    benefits of the change.  - Unless your PR is trivial, you should expect there will be reviewer comments diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go index b181f386a..382ad6941 100644 --- a/vendor/google.golang.org/grpc/balancer/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/balancer.go @@ -73,17 +73,6 @@ func unregisterForTesting(name string) {  	delete(m, name)  } -// connectedAddress returns the connected address for a SubConnState. The -// address is only valid if the state is READY. -func connectedAddress(scs SubConnState) resolver.Address { -	return scs.connectedAddress -} - -// setConnectedAddress sets the connected address for a SubConnState. -func setConnectedAddress(scs *SubConnState, addr resolver.Address) { -	scs.connectedAddress = addr -} -  func init() {  	internal.BalancerUnregister = unregisterForTesting  	internal.ConnectedAddress = connectedAddress @@ -106,54 +95,6 @@ func Get(name string) Builder {  	return nil  } -// A SubConn represents a single connection to a gRPC backend service. 
-// -// Each SubConn contains a list of addresses. -// -// All SubConns start in IDLE, and will not try to connect. To trigger the -// connecting, Balancers must call Connect.  If a connection re-enters IDLE, -// Balancers must call Connect again to trigger a new connection attempt. -// -// gRPC will try to connect to the addresses in sequence, and stop trying the -// remainder once the first connection is successful. If an attempt to connect -// to all addresses encounters an error, the SubConn will enter -// TRANSIENT_FAILURE for a backoff period, and then transition to IDLE. -// -// Once established, if a connection is lost, the SubConn will transition -// directly to IDLE. -// -// This interface is to be implemented by gRPC. Users should not need their own -// implementation of this interface. For situations like testing, any -// implementations should embed this interface. This allows gRPC to add new -// methods to this interface. -type SubConn interface { -	// UpdateAddresses updates the addresses used in this SubConn. -	// gRPC checks if currently-connected address is still in the new list. -	// If it's in the list, the connection will be kept. -	// If it's not in the list, the connection will gracefully closed, and -	// a new connection will be created. -	// -	// This will trigger a state transition for the SubConn. -	// -	// Deprecated: this method will be removed.  Create new SubConns for new -	// addresses instead. -	UpdateAddresses([]resolver.Address) -	// Connect starts the connecting for this SubConn. -	Connect() -	// GetOrBuildProducer returns a reference to the existing Producer for this -	// ProducerBuilder in this SubConn, or, if one does not currently exist, -	// creates a new one and returns it.  Returns a close function which must -	// be called when the Producer is no longer needed. -	GetOrBuildProducer(ProducerBuilder) (p Producer, close func()) -	// Shutdown shuts down the SubConn gracefully.  Any started RPCs will be -	// allowed to complete.  No future calls should be made on the SubConn. -	// One final state update will be delivered to the StateListener (or -	// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to -	// indicate the shutdown operation.  This may be delivered before -	// in-progress RPCs are complete and the actual connection is closed. -	Shutdown() -} -  // NewSubConnOptions contains options to create new SubConn.  type NewSubConnOptions struct {  	// CredsBundle is the credentials bundle that will be used in the created @@ -421,18 +362,6 @@ type ExitIdler interface {  	ExitIdle()  } -// SubConnState describes the state of a SubConn. -type SubConnState struct { -	// ConnectivityState is the connectivity state of the SubConn. -	ConnectivityState connectivity.State -	// ConnectionError is set if the ConnectivityState is TransientFailure, -	// describing the reason the SubConn failed.  Otherwise, it is nil. -	ConnectionError error -	// connectedAddr contains the connected address when ConnectivityState is -	// Ready. Otherwise, it is indeterminate. -	connectedAddress resolver.Address -} -  // ClientConnState describes the state of a ClientConn relevant to the  // balancer.  type ClientConnState struct { @@ -445,20 +374,3 @@ type ClientConnState struct {  // ErrBadResolverState may be returned by UpdateClientConnState to indicate a  // problem with the provided name resolver data.  var ErrBadResolverState = errors.New("bad resolver state") - -// A ProducerBuilder is a simple constructor for a Producer.  
It is used by the -// SubConn to create producers when needed. -type ProducerBuilder interface { -	// Build creates a Producer.  The first parameter is always a -	// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the -	// associated SubConn), but is declared as `any` to avoid a dependency -	// cycle.  Should also return a close function that will be called when all -	// references to the Producer have been given up. -	Build(grpcClientConnInterface any) (p Producer, close func()) -} - -// A Producer is a type shared among potentially many consumers.  It is -// associated with a SubConn, and an implementation will typically contain -// other methods to provide additional functionality, e.g. configuration or -// subscription registration. -type Producer any diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go index a7f1eeec8..d5ed172ae 100644 --- a/vendor/google.golang.org/grpc/balancer/base/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go @@ -36,7 +36,7 @@ type baseBuilder struct {  	config        Config  } -func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { +func (bb *baseBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {  	bal := &baseBalancer{  		cc:            cc,  		pickerBuilder: bb.pickerBuilder, @@ -133,7 +133,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {  		}  	}  	// If resolver state contains no addresses, return an error so ClientConn -	// will trigger re-resolve. Also records this as an resolver error, so when +	// will trigger re-resolve. Also records this as a resolver error, so when  	// the overall state turns transient failure, the error message will have  	// the zero address information.  	if len(s.ResolverState.Addresses) == 0 { @@ -259,6 +259,6 @@ type errPicker struct {  	err error // Pick() always returns this err.  } -func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { +func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {  	return balancer.PickResult{}, p.err  } diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/internal/internal.go b/vendor/google.golang.org/grpc/balancer/pickfirst/internal/internal.go new file mode 100644 index 000000000..7d66cb491 --- /dev/null +++ b/vendor/google.golang.org/grpc/balancer/pickfirst/internal/internal.go @@ -0,0 +1,35 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package internal contains code internal to the pickfirst package. +package internal + +import ( +	rand "math/rand/v2" +	"time" +) + +var ( +	// RandShuffle pseudo-randomizes the order of addresses. +	RandShuffle = rand.Shuffle +	// TimeAfterFunc allows mocking the timer for testing connection delay +	// related functionality. 
+	TimeAfterFunc = func(d time.Duration, f func()) func() { +		timer := time.AfterFunc(d, f) +		return func() { timer.Stop() } +	} +) diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go index 5b592f48a..ea8899818 100644 --- a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go +++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirst.go @@ -23,21 +23,26 @@ import (  	"encoding/json"  	"errors"  	"fmt" -	"math/rand" +	rand "math/rand/v2"  	"google.golang.org/grpc/balancer" +	"google.golang.org/grpc/balancer/pickfirst/internal"  	"google.golang.org/grpc/connectivity"  	"google.golang.org/grpc/grpclog" -	"google.golang.org/grpc/internal" +	"google.golang.org/grpc/internal/envconfig"  	internalgrpclog "google.golang.org/grpc/internal/grpclog"  	"google.golang.org/grpc/internal/pretty"  	"google.golang.org/grpc/resolver"  	"google.golang.org/grpc/serviceconfig" + +	_ "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" // For automatically registering the new pickfirst if required.  )  func init() { +	if envconfig.NewPickFirstEnabled { +		return +	}  	balancer.Register(pickfirstBuilder{}) -	internal.ShuffleAddressListForTesting = func(n int, swap func(i, j int)) { rand.Shuffle(n, swap) }  }  var logger = grpclog.Component("pick-first-lb") @@ -50,7 +55,7 @@ const (  type pickfirstBuilder struct{} -func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { +func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {  	b := &pickfirstBalancer{cc: cc}  	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))  	return b @@ -103,10 +108,13 @@ func (b *pickfirstBalancer) ResolverError(err error) {  	})  } +// Shuffler is an interface for shuffling an address list.  type Shuffler interface {  	ShuffleAddressListForTesting(n int, swap func(i, j int))  } +// ShuffleAddressListForTesting pseudo-randomizes the order of addresses.  n +// is the number of elements.  swap swaps the elements with indexes i and j.  func ShuffleAddressListForTesting(n int, swap func(i, j int)) { rand.Shuffle(n, swap) }  func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { @@ -140,7 +148,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState  		// within each endpoint. - A61  		if cfg.ShuffleAddressList {  			endpoints = append([]resolver.Endpoint{}, endpoints...) -			internal.ShuffleAddressListForTesting.(func(int, func(int, int)))(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) +			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })  		}  		// "Flatten the list by concatenating the ordered list of addresses for each diff --git a/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go new file mode 100644 index 000000000..2fc0a71f9 --- /dev/null +++ b/vendor/google.golang.org/grpc/balancer/pickfirst/pickfirstleaf/pickfirstleaf.go @@ -0,0 +1,911 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package pickfirstleaf contains the pick_first load balancing policy which +// will be the universal leaf policy after dualstack changes are implemented. +// +// # Experimental +// +// Notice: This package is EXPERIMENTAL and may be changed or removed in a +// later release. +package pickfirstleaf + +import ( +	"encoding/json" +	"errors" +	"fmt" +	"net" +	"net/netip" +	"sync" +	"time" + +	"google.golang.org/grpc/balancer" +	"google.golang.org/grpc/balancer/pickfirst/internal" +	"google.golang.org/grpc/connectivity" +	expstats "google.golang.org/grpc/experimental/stats" +	"google.golang.org/grpc/grpclog" +	"google.golang.org/grpc/internal/envconfig" +	internalgrpclog "google.golang.org/grpc/internal/grpclog" +	"google.golang.org/grpc/internal/pretty" +	"google.golang.org/grpc/resolver" +	"google.golang.org/grpc/serviceconfig" +) + +func init() { +	if envconfig.NewPickFirstEnabled { +		// Register as the default pick_first balancer. +		Name = "pick_first" +	} +	balancer.Register(pickfirstBuilder{}) +} + +// enableHealthListenerKeyType is a unique key type used in resolver attributes +// to indicate whether the health listener usage is enabled. +type enableHealthListenerKeyType struct{} + +var ( +	logger = grpclog.Component("pick-first-leaf-lb") +	// Name is the name of the pick_first_leaf balancer. +	// It is changed to "pick_first" in init() if this balancer is to be +	// registered as the default pickfirst. +	Name                 = "pick_first_leaf" +	disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ +		Name:        "grpc.lb.pick_first.disconnections", +		Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.", +		Unit:        "disconnection", +		Labels:      []string{"grpc.target"}, +		Default:     false, +	}) +	connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ +		Name:        "grpc.lb.pick_first.connection_attempts_succeeded", +		Description: "EXPERIMENTAL. Number of successful connection attempts.", +		Unit:        "attempt", +		Labels:      []string{"grpc.target"}, +		Default:     false, +	}) +	connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ +		Name:        "grpc.lb.pick_first.connection_attempts_failed", +		Description: "EXPERIMENTAL. Number of failed connection attempts.", +		Unit:        "attempt", +		Labels:      []string{"grpc.target"}, +		Default:     false, +	}) +) + +const ( +	// TODO: change to pick-first when this becomes the default pick_first policy. +	logPrefix = "[pick-first-leaf-lb %p] " +	// connectionDelayInterval is the time to wait for during the happy eyeballs +	// pass before starting the next connection attempt. +	connectionDelayInterval = 250 * time.Millisecond +) + +type ipAddrFamily int + +const ( +	// ipAddrFamilyUnknown represents strings that can't be parsed as an IP +	// address. 
+	ipAddrFamilyUnknown ipAddrFamily = iota +	ipAddrFamilyV4 +	ipAddrFamilyV6 +) + +type pickfirstBuilder struct{} + +func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions) balancer.Balancer { +	b := &pickfirstBalancer{ +		cc:              cc, +		target:          bo.Target.String(), +		metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder. + +		subConns:              resolver.NewAddressMap(), +		state:                 connectivity.Connecting, +		cancelConnectionTimer: func() {}, +	} +	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) +	return b +} + +func (b pickfirstBuilder) Name() string { +	return Name +} + +func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { +	var cfg pfConfig +	if err := json.Unmarshal(js, &cfg); err != nil { +		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err) +	} +	return cfg, nil +} + +// EnableHealthListener updates the state to configure pickfirst for using a +// generic health listener. +func EnableHealthListener(state resolver.State) resolver.State { +	state.Attributes = state.Attributes.WithValue(enableHealthListenerKeyType{}, true) +	return state +} + +type pfConfig struct { +	serviceconfig.LoadBalancingConfig `json:"-"` + +	// If set to true, instructs the LB policy to shuffle the order of the list +	// of endpoints received from the name resolver before attempting to +	// connect to them. +	ShuffleAddressList bool `json:"shuffleAddressList"` +} + +// scData keeps track of the current state of the subConn. +// It is not safe for concurrent access. +type scData struct { +	// The following fields are initialized at build time and read-only after +	// that. +	subConn balancer.SubConn +	addr    resolver.Address + +	rawConnectivityState connectivity.State +	// The effective connectivity state based on raw connectivity, health state +	// and after following sticky TransientFailure behaviour defined in A62. +	effectiveState              connectivity.State +	lastErr                     error +	connectionFailedInFirstPass bool +} + +func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) { +	sd := &scData{ +		rawConnectivityState: connectivity.Idle, +		effectiveState:       connectivity.Idle, +		addr:                 addr, +	} +	sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{ +		StateListener: func(state balancer.SubConnState) { +			b.updateSubConnState(sd, state) +		}, +	}) +	if err != nil { +		return nil, err +	} +	sd.subConn = sc +	return sd, nil +} + +type pickfirstBalancer struct { +	// The following fields are initialized at build time and read-only after +	// that and therefore do not need to be guarded by a mutex. +	logger          *internalgrpclog.PrefixLogger +	cc              balancer.ClientConn +	target          string +	metricsRecorder expstats.MetricsRecorder // guaranteed to be non nil + +	// The mutex is used to ensure synchronization of updates triggered +	// from the idle picker and the already serialized resolver, +	// SubConn state updates. +	mu sync.Mutex +	// State reported to the channel based on SubConn states and resolver +	// updates. +	state connectivity.State +	// scData for active subonns mapped by address. 
+	subConns              *resolver.AddressMap +	addressList           addressList +	firstPass             bool +	numTF                 int +	cancelConnectionTimer func() +	healthCheckingEnabled bool +} + +// ResolverError is called by the ClientConn when the name resolver produces +// an error or when pickfirst determined the resolver update to be invalid. +func (b *pickfirstBalancer) ResolverError(err error) { +	b.mu.Lock() +	defer b.mu.Unlock() +	b.resolverErrorLocked(err) +} + +func (b *pickfirstBalancer) resolverErrorLocked(err error) { +	if b.logger.V(2) { +		b.logger.Infof("Received error from the name resolver: %v", err) +	} + +	// The picker will not change since the balancer does not currently +	// report an error. If the balancer hasn't received a single good resolver +	// update yet, transition to TRANSIENT_FAILURE. +	if b.state != connectivity.TransientFailure && b.addressList.size() > 0 { +		if b.logger.V(2) { +			b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.") +		} +		return +	} + +	b.updateBalancerState(balancer.State{ +		ConnectivityState: connectivity.TransientFailure, +		Picker:            &picker{err: fmt.Errorf("name resolver error: %v", err)}, +	}) +} + +func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { +	b.mu.Lock() +	defer b.mu.Unlock() +	b.cancelConnectionTimer() +	if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 { +		// Cleanup state pertaining to the previous resolver state. +		// Treat an empty address list like an error by calling b.ResolverError. +		b.closeSubConnsLocked() +		b.addressList.updateAddrs(nil) +		b.resolverErrorLocked(errors.New("produced zero addresses")) +		return balancer.ErrBadResolverState +	} +	b.healthCheckingEnabled = state.ResolverState.Attributes.Value(enableHealthListenerKeyType{}) != nil +	cfg, ok := state.BalancerConfig.(pfConfig) +	if state.BalancerConfig != nil && !ok { +		return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState) +	} + +	if b.logger.V(2) { +		b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState)) +	} + +	var newAddrs []resolver.Address +	if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 { +		// Perform the optional shuffling described in gRFC A62. The shuffling +		// will change the order of endpoints but not touch the order of the +		// addresses within each endpoint. - A61 +		if cfg.ShuffleAddressList { +			endpoints = append([]resolver.Endpoint{}, endpoints...) +			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) +		} + +		// "Flatten the list by concatenating the ordered list of addresses for +		// each of the endpoints, in order." - A61 +		for _, endpoint := range endpoints { +			newAddrs = append(newAddrs, endpoint.Addresses...) +		} +	} else { +		// Endpoints not set, process addresses until we migrate resolver +		// emissions fully to Endpoints. The top channel does wrap emitted +		// addresses with endpoints, however some balancers such as weighted +		// target do not forward the corresponding correct endpoints down/split +		// endpoints properly. Once all balancers correctly forward endpoints +		// down, can delete this else conditional. 
+		newAddrs = state.ResolverState.Addresses +		if cfg.ShuffleAddressList { +			newAddrs = append([]resolver.Address{}, newAddrs...) +			internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] }) +		} +	} + +	// If an address appears in multiple endpoints or in the same endpoint +	// multiple times, we keep it only once. We will create only one SubConn +	// for the address because an AddressMap is used to store SubConns. +	// Not de-duplicating would result in attempting to connect to the same +	// SubConn multiple times in the same pass. We don't want this. +	newAddrs = deDupAddresses(newAddrs) +	newAddrs = interleaveAddresses(newAddrs) + +	prevAddr := b.addressList.currentAddress() +	prevSCData, found := b.subConns.Get(prevAddr) +	prevAddrsCount := b.addressList.size() +	isPrevRawConnectivityStateReady := found && prevSCData.(*scData).rawConnectivityState == connectivity.Ready +	b.addressList.updateAddrs(newAddrs) + +	// If the previous ready SubConn exists in new address list, +	// keep this connection and don't create new SubConns. +	if isPrevRawConnectivityStateReady && b.addressList.seekTo(prevAddr) { +		return nil +	} + +	b.reconcileSubConnsLocked(newAddrs) +	// If it's the first resolver update or the balancer was already READY +	// (but the new address list does not contain the ready SubConn) or +	// CONNECTING, enter CONNECTING. +	// We may be in TRANSIENT_FAILURE due to a previous empty address list, +	// we should still enter CONNECTING because the sticky TF behaviour +	//  mentioned in A62 applies only when the TRANSIENT_FAILURE is reported +	// due to connectivity failures. +	if isPrevRawConnectivityStateReady || b.state == connectivity.Connecting || prevAddrsCount == 0 { +		// Start connection attempt at first address. +		b.forceUpdateConcludedStateLocked(balancer.State{ +			ConnectivityState: connectivity.Connecting, +			Picker:            &picker{err: balancer.ErrNoSubConnAvailable}, +		}) +		b.startFirstPassLocked() +	} else if b.state == connectivity.TransientFailure { +		// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until +		// we're READY. See A62. +		b.startFirstPassLocked() +	} +	return nil +} + +// UpdateSubConnState is unused as a StateListener is always registered when +// creating SubConns. +func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { +	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state) +} + +func (b *pickfirstBalancer) Close() { +	b.mu.Lock() +	defer b.mu.Unlock() +	b.closeSubConnsLocked() +	b.cancelConnectionTimer() +	b.state = connectivity.Shutdown +} + +// ExitIdle moves the balancer out of idle state. It can be called concurrently +// by the idlePicker and clientConn so access to variables should be +// synchronized. +func (b *pickfirstBalancer) ExitIdle() { +	b.mu.Lock() +	defer b.mu.Unlock() +	if b.state == connectivity.Idle { +		b.startFirstPassLocked() +	} +} + +func (b *pickfirstBalancer) startFirstPassLocked() { +	b.firstPass = true +	b.numTF = 0 +	// Reset the connection attempt record for existing SubConns. +	for _, sd := range b.subConns.Values() { +		sd.(*scData).connectionFailedInFirstPass = false +	} +	b.requestConnectionLocked() +} + +func (b *pickfirstBalancer) closeSubConnsLocked() { +	for _, sd := range b.subConns.Values() { +		sd.(*scData).subConn.Shutdown() +	} +	b.subConns = resolver.NewAddressMap() +} + +// deDupAddresses ensures that each address appears only once in the slice. 
+func deDupAddresses(addrs []resolver.Address) []resolver.Address { +	seenAddrs := resolver.NewAddressMap() +	retAddrs := []resolver.Address{} + +	for _, addr := range addrs { +		if _, ok := seenAddrs.Get(addr); ok { +			continue +		} +		retAddrs = append(retAddrs, addr) +	} +	return retAddrs +} + +// interleaveAddresses interleaves addresses of both families (IPv4 and IPv6) +// as per RFC-8305 section 4. +// Whichever address family is first in the list is followed by an address of +// the other address family; that is, if the first address in the list is IPv6, +// then the first IPv4 address should be moved up in the list to be second in +// the list. It doesn't support configuring "First Address Family Count", i.e. +// there will always be a single member of the first address family at the +// beginning of the interleaved list. +// Addresses that are neither IPv4 nor IPv6 are treated as part of a third +// "unknown" family for interleaving. +// See: https://datatracker.ietf.org/doc/html/rfc8305#autoid-6 +func interleaveAddresses(addrs []resolver.Address) []resolver.Address { +	familyAddrsMap := map[ipAddrFamily][]resolver.Address{} +	interleavingOrder := []ipAddrFamily{} +	for _, addr := range addrs { +		family := addressFamily(addr.Addr) +		if _, found := familyAddrsMap[family]; !found { +			interleavingOrder = append(interleavingOrder, family) +		} +		familyAddrsMap[family] = append(familyAddrsMap[family], addr) +	} + +	interleavedAddrs := make([]resolver.Address, 0, len(addrs)) + +	for curFamilyIdx := 0; len(interleavedAddrs) < len(addrs); curFamilyIdx = (curFamilyIdx + 1) % len(interleavingOrder) { +		// Some IP types may have fewer addresses than others, so we look for +		// the next type that has a remaining member to add to the interleaved +		// list. +		family := interleavingOrder[curFamilyIdx] +		remainingMembers := familyAddrsMap[family] +		if len(remainingMembers) > 0 { +			interleavedAddrs = append(interleavedAddrs, remainingMembers[0]) +			familyAddrsMap[family] = remainingMembers[1:] +		} +	} + +	return interleavedAddrs +} + +// addressFamily returns the ipAddrFamily after parsing the address string. +// If the address isn't of the format "ip-address:port", it returns +// ipAddrFamilyUnknown. The address may be valid even if it's not an IP when +// using a resolver like passthrough where the address may be a hostname in +// some format that the dialer can resolve. +func addressFamily(address string) ipAddrFamily { +	// Parse the IP after removing the port. +	host, _, err := net.SplitHostPort(address) +	if err != nil { +		return ipAddrFamilyUnknown +	} +	ip, err := netip.ParseAddr(host) +	if err != nil { +		return ipAddrFamilyUnknown +	} +	switch { +	case ip.Is4() || ip.Is4In6(): +		return ipAddrFamilyV4 +	case ip.Is6(): +		return ipAddrFamilyV6 +	default: +		return ipAddrFamilyUnknown +	} +} + +// reconcileSubConnsLocked updates the active subchannels based on a new address +// list from the resolver. It does this by: +//   - closing subchannels: any existing subchannels associated with addresses +//     that are no longer in the updated list are shut down. +//   - removing subchannels: entries for these closed subchannels are removed +//     from the subchannel map. +// +// This ensures that the subchannel map accurately reflects the current set of +// addresses received from the name resolver. 
+func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) { +	newAddrsMap := resolver.NewAddressMap() +	for _, addr := range newAddrs { +		newAddrsMap.Set(addr, true) +	} + +	for _, oldAddr := range b.subConns.Keys() { +		if _, ok := newAddrsMap.Get(oldAddr); ok { +			continue +		} +		val, _ := b.subConns.Get(oldAddr) +		val.(*scData).subConn.Shutdown() +		b.subConns.Delete(oldAddr) +	} +} + +// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn +// becomes ready, which means that all other subConn must be shutdown. +func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) { +	b.cancelConnectionTimer() +	for _, v := range b.subConns.Values() { +		sd := v.(*scData) +		if sd.subConn != selected.subConn { +			sd.subConn.Shutdown() +		} +	} +	b.subConns = resolver.NewAddressMap() +	b.subConns.Set(selected.addr, selected) +} + +// requestConnectionLocked starts connecting on the subchannel corresponding to +// the current address. If no subchannel exists, one is created. If the current +// subchannel is in TransientFailure, a connection to the next address is +// attempted until a subchannel is found. +func (b *pickfirstBalancer) requestConnectionLocked() { +	if !b.addressList.isValid() { +		return +	} +	var lastErr error +	for valid := true; valid; valid = b.addressList.increment() { +		curAddr := b.addressList.currentAddress() +		sd, ok := b.subConns.Get(curAddr) +		if !ok { +			var err error +			// We want to assign the new scData to sd from the outer scope, +			// hence we can't use := below. +			sd, err = b.newSCData(curAddr) +			if err != nil { +				// This should never happen, unless the clientConn is being shut +				// down. +				if b.logger.V(2) { +					b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err) +				} +				// Do nothing, the LB policy will be closed soon. +				return +			} +			b.subConns.Set(curAddr, sd) +		} + +		scd := sd.(*scData) +		switch scd.rawConnectivityState { +		case connectivity.Idle: +			scd.subConn.Connect() +			b.scheduleNextConnectionLocked() +			return +		case connectivity.TransientFailure: +			// The SubConn is being re-used and failed during a previous pass +			// over the addressList. It has not completed backoff yet. +			// Mark it as having failed and try the next address. +			scd.connectionFailedInFirstPass = true +			lastErr = scd.lastErr +			continue +		case connectivity.Connecting: +			// Wait for the connection attempt to complete or the timer to fire +			// before attempting the next address. +			b.scheduleNextConnectionLocked() +			return +		default: +			b.logger.Errorf("SubConn with unexpected state %v present in SubConns map.", scd.rawConnectivityState) +			return + +		} +	} + +	// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the +	// first pass if possible. +	b.endFirstPassIfPossibleLocked(lastErr) +} + +func (b *pickfirstBalancer) scheduleNextConnectionLocked() { +	b.cancelConnectionTimer() +	if !b.addressList.hasNext() { +		return +	} +	curAddr := b.addressList.currentAddress() +	cancelled := false // Access to this is protected by the balancer's mutex. +	closeFn := internal.TimeAfterFunc(connectionDelayInterval, func() { +		b.mu.Lock() +		defer b.mu.Unlock() +		// If the scheduled task is cancelled while acquiring the mutex, return. 
+		if cancelled { +			return +		} +		if b.logger.V(2) { +			b.logger.Infof("Happy Eyeballs timer expired while waiting for connection to %q.", curAddr.Addr) +		} +		if b.addressList.increment() { +			b.requestConnectionLocked() +		} +	}) +	// Access to the cancellation callback held by the balancer is guarded by +	// the balancer's mutex, so it's safe to set the boolean from the callback. +	b.cancelConnectionTimer = sync.OnceFunc(func() { +		cancelled = true +		closeFn() +	}) +} + +func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) { +	b.mu.Lock() +	defer b.mu.Unlock() +	oldState := sd.rawConnectivityState +	sd.rawConnectivityState = newState.ConnectivityState +	// Previously relevant SubConns can still callback with state updates. +	// To prevent pickers from returning these obsolete SubConns, this logic +	// is included to check if the current list of active SubConns includes this +	// SubConn. +	if !b.isActiveSCData(sd) { +		return +	} +	if newState.ConnectivityState == connectivity.Shutdown { +		sd.effectiveState = connectivity.Shutdown +		return +	} + +	// Record a connection attempt when exiting CONNECTING. +	if newState.ConnectivityState == connectivity.TransientFailure { +		sd.connectionFailedInFirstPass = true +		connectionAttemptsFailedMetric.Record(b.metricsRecorder, 1, b.target) +	} + +	if newState.ConnectivityState == connectivity.Ready { +		connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target) +		b.shutdownRemainingLocked(sd) +		if !b.addressList.seekTo(sd.addr) { +			// This should not fail as we should have only one SubConn after +			// entering READY. The SubConn should be present in the addressList. +			b.logger.Errorf("Address %q not found address list in  %v", sd.addr, b.addressList.addresses) +			return +		} +		if !b.healthCheckingEnabled { +			if b.logger.V(2) { +				b.logger.Infof("SubConn %p reported connectivity state READY and the health listener is disabled. Transitioning SubConn to READY.", sd.subConn) +			} + +			sd.effectiveState = connectivity.Ready +			b.updateBalancerState(balancer.State{ +				ConnectivityState: connectivity.Ready, +				Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}}, +			}) +			return +		} +		if b.logger.V(2) { +			b.logger.Infof("SubConn %p reported connectivity state READY. Registering health listener.", sd.subConn) +		} +		// Send a CONNECTING update to take the SubConn out of sticky-TF if +		// required. +		sd.effectiveState = connectivity.Connecting +		b.updateBalancerState(balancer.State{ +			ConnectivityState: connectivity.Connecting, +			Picker:            &picker{err: balancer.ErrNoSubConnAvailable}, +		}) +		sd.subConn.RegisterHealthListener(func(scs balancer.SubConnState) { +			b.updateSubConnHealthState(sd, scs) +		}) +		return +	} + +	// If the LB policy is READY, and it receives a subchannel state change, +	// it means that the READY subchannel has failed. +	// A SubConn can also transition from CONNECTING directly to IDLE when +	// a transport is successfully created, but the connection fails +	// before the SubConn can send the notification for READY. We treat +	// this as a successful connection and transition to IDLE. +	// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second +	// part of the if condition below once the issue is fixed. 
+	if oldState == connectivity.Ready || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) { +		// Once a transport fails, the balancer enters IDLE and starts from +		// the first address when the picker is used. +		b.shutdownRemainingLocked(sd) +		sd.effectiveState = newState.ConnectivityState +		// READY SubConn interspliced in between CONNECTING and IDLE, need to +		// account for that. +		if oldState == connectivity.Connecting { +			// A known issue (https://github.com/grpc/grpc-go/issues/7862) +			// causes a race that prevents the READY state change notification. +			// This works around it. +			connectionAttemptsSucceededMetric.Record(b.metricsRecorder, 1, b.target) +		} +		disconnectionsMetric.Record(b.metricsRecorder, 1, b.target) +		b.addressList.reset() +		b.updateBalancerState(balancer.State{ +			ConnectivityState: connectivity.Idle, +			Picker:            &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)}, +		}) +		return +	} + +	if b.firstPass { +		switch newState.ConnectivityState { +		case connectivity.Connecting: +			// The effective state can be in either IDLE, CONNECTING or +			// TRANSIENT_FAILURE. If it's  TRANSIENT_FAILURE, stay in +			// TRANSIENT_FAILURE until it's READY. See A62. +			if sd.effectiveState != connectivity.TransientFailure { +				sd.effectiveState = connectivity.Connecting +				b.updateBalancerState(balancer.State{ +					ConnectivityState: connectivity.Connecting, +					Picker:            &picker{err: balancer.ErrNoSubConnAvailable}, +				}) +			} +		case connectivity.TransientFailure: +			sd.lastErr = newState.ConnectionError +			sd.effectiveState = connectivity.TransientFailure +			// Since we're re-using common SubConns while handling resolver +			// updates, we could receive an out of turn TRANSIENT_FAILURE from +			// a pass over the previous address list. Happy Eyeballs will also +			// cause out of order updates to arrive. + +			if curAddr := b.addressList.currentAddress(); equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) { +				b.cancelConnectionTimer() +				if b.addressList.increment() { +					b.requestConnectionLocked() +					return +				} +			} + +			// End the first pass if we've seen a TRANSIENT_FAILURE from all +			// SubConns once. +			b.endFirstPassIfPossibleLocked(newState.ConnectionError) +		} +		return +	} + +	// We have finished the first pass, keep re-connecting failing SubConns. +	switch newState.ConnectivityState { +	case connectivity.TransientFailure: +		b.numTF = (b.numTF + 1) % b.subConns.Len() +		sd.lastErr = newState.ConnectionError +		if b.numTF%b.subConns.Len() == 0 { +			b.updateBalancerState(balancer.State{ +				ConnectivityState: connectivity.TransientFailure, +				Picker:            &picker{err: newState.ConnectionError}, +			}) +		} +		// We don't need to request re-resolution since the SubConn already +		// does that before reporting TRANSIENT_FAILURE. +		// TODO: #7534 - Move re-resolution requests from SubConn into +		// pick_first. +	case connectivity.Idle: +		sd.subConn.Connect() +	} +} + +// endFirstPassIfPossibleLocked ends the first happy-eyeballs pass if all the +// addresses are tried and their SubConns have reported a failure. +func (b *pickfirstBalancer) endFirstPassIfPossibleLocked(lastErr error) { +	// An optimization to avoid iterating over the entire SubConn map. +	if b.addressList.isValid() { +		return +	} +	// Connect() has been called on all the SubConns. The first pass can be +	// ended if all the SubConns have reported a failure. 
+	for _, v := range b.subConns.Values() { +		sd := v.(*scData) +		if !sd.connectionFailedInFirstPass { +			return +		} +	} +	b.firstPass = false +	b.updateBalancerState(balancer.State{ +		ConnectivityState: connectivity.TransientFailure, +		Picker:            &picker{err: lastErr}, +	}) +	// Start re-connecting all the SubConns that are already in IDLE. +	for _, v := range b.subConns.Values() { +		sd := v.(*scData) +		if sd.rawConnectivityState == connectivity.Idle { +			sd.subConn.Connect() +		} +	} +} + +func (b *pickfirstBalancer) isActiveSCData(sd *scData) bool { +	activeSD, found := b.subConns.Get(sd.addr) +	return found && activeSD == sd +} + +func (b *pickfirstBalancer) updateSubConnHealthState(sd *scData, state balancer.SubConnState) { +	b.mu.Lock() +	defer b.mu.Unlock() +	// Previously relevant SubConns can still callback with state updates. +	// To prevent pickers from returning these obsolete SubConns, this logic +	// is included to check if the current list of active SubConns includes +	// this SubConn. +	if !b.isActiveSCData(sd) { +		return +	} +	sd.effectiveState = state.ConnectivityState +	switch state.ConnectivityState { +	case connectivity.Ready: +		b.updateBalancerState(balancer.State{ +			ConnectivityState: connectivity.Ready, +			Picker:            &picker{result: balancer.PickResult{SubConn: sd.subConn}}, +		}) +	case connectivity.TransientFailure: +		b.updateBalancerState(balancer.State{ +			ConnectivityState: connectivity.TransientFailure, +			Picker:            &picker{err: fmt.Errorf("pickfirst: health check failure: %v", state.ConnectionError)}, +		}) +	case connectivity.Connecting: +		b.updateBalancerState(balancer.State{ +			ConnectivityState: connectivity.Connecting, +			Picker:            &picker{err: balancer.ErrNoSubConnAvailable}, +		}) +	default: +		b.logger.Errorf("Got unexpected health update for SubConn %p: %v", state) +	} +} + +// updateBalancerState stores the state reported to the channel and calls +// ClientConn.UpdateState(). As an optimization, it avoids sending duplicate +// updates to the channel. +func (b *pickfirstBalancer) updateBalancerState(newState balancer.State) { +	// In case of TransientFailures allow the picker to be updated to update +	// the connectivity error, in all other cases don't send duplicate state +	// updates. +	if newState.ConnectivityState == b.state && b.state != connectivity.TransientFailure { +		return +	} +	b.forceUpdateConcludedStateLocked(newState) +} + +// forceUpdateConcludedStateLocked stores the state reported to the channel and +// calls ClientConn.UpdateState(). +// A separate function is defined to force update the ClientConn state since the +// channel doesn't correctly assume that LB policies start in CONNECTING and +// relies on LB policy to send an initial CONNECTING update. +func (b *pickfirstBalancer) forceUpdateConcludedStateLocked(newState balancer.State) { +	b.state = newState.ConnectivityState +	b.cc.UpdateState(newState) +} + +type picker struct { +	result balancer.PickResult +	err    error +} + +func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { +	return p.result, p.err +} + +// idlePicker is used when the SubConn is IDLE and kicks the SubConn into +// CONNECTING when Pick is called. 
+type idlePicker struct { +	exitIdle func() +} + +func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { +	i.exitIdle() +	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable +} + +// addressList manages sequentially iterating over addresses present in a list +// of endpoints. It provides a 1 dimensional view of the addresses present in +// the endpoints. +// This type is not safe for concurrent access. +type addressList struct { +	addresses []resolver.Address +	idx       int +} + +func (al *addressList) isValid() bool { +	return al.idx < len(al.addresses) +} + +func (al *addressList) size() int { +	return len(al.addresses) +} + +// increment moves to the next index in the address list. +// This method returns false if it went off the list, true otherwise. +func (al *addressList) increment() bool { +	if !al.isValid() { +		return false +	} +	al.idx++ +	return al.idx < len(al.addresses) +} + +// currentAddress returns the current address pointed to in the addressList. +// If the list is in an invalid state, it returns an empty address instead. +func (al *addressList) currentAddress() resolver.Address { +	if !al.isValid() { +		return resolver.Address{} +	} +	return al.addresses[al.idx] +} + +func (al *addressList) reset() { +	al.idx = 0 +} + +func (al *addressList) updateAddrs(addrs []resolver.Address) { +	al.addresses = addrs +	al.reset() +} + +// seekTo returns false if the needle was not found and the current index was +// left unchanged. +func (al *addressList) seekTo(needle resolver.Address) bool { +	for ai, addr := range al.addresses { +		if !equalAddressIgnoringBalAttributes(&addr, &needle) { +			continue +		} +		al.idx = ai +		return true +	} +	return false +} + +// hasNext returns whether incrementing the addressList will result in moving +// past the end of the list. If the list has already moved past the end, it +// returns false. +func (al *addressList) hasNext() bool { +	if !al.isValid() { +		return false +	} +	return al.idx+1 < len(al.addresses) +} + +// equalAddressIgnoringBalAttributes returns true is a and b are considered +// equal. This is different from the Equal method on the resolver.Address type +// which considers all fields to determine equality. Here, we only consider +// fields that are meaningful to the SubConn. +func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool { +	return a.Addr == b.Addr && a.ServerName == b.ServerName && +		a.Attributes.Equal(b.Attributes) && +		a.Metadata == b.Metadata +} diff --git a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go index 260255d31..80a42d225 100644 --- a/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go +++ b/vendor/google.golang.org/grpc/balancer/roundrobin/roundrobin.go @@ -22,7 +22,7 @@  package roundrobin  import ( -	"math/rand" +	rand "math/rand/v2"  	"sync/atomic"  	"google.golang.org/grpc/balancer" @@ -60,7 +60,7 @@ func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {  		// Start at a random index, as the same RR balancer rebuilds a new  		// picker when SubConn states change, and we don't want to apply excess  		// load to the first server in the list. 
-		next: uint32(rand.Intn(len(scs))), +		next: uint32(rand.IntN(len(scs))),  	}  } diff --git a/vendor/google.golang.org/grpc/balancer/subconn.go b/vendor/google.golang.org/grpc/balancer/subconn.go new file mode 100644 index 000000000..ea27c4fa7 --- /dev/null +++ b/vendor/google.golang.org/grpc/balancer/subconn.go @@ -0,0 +1,134 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package balancer + +import ( +	"google.golang.org/grpc/connectivity" +	"google.golang.org/grpc/internal" +	"google.golang.org/grpc/resolver" +) + +// A SubConn represents a single connection to a gRPC backend service. +// +// All SubConns start in IDLE, and will not try to connect. To trigger a +// connection attempt, Balancers must call Connect. +// +// If the connection attempt fails, the SubConn will transition to +// TRANSIENT_FAILURE for a backoff period, and then return to IDLE.  If the +// connection attempt succeeds, it will transition to READY. +// +// If a READY SubConn becomes disconnected, the SubConn will transition to IDLE. +// +// If a connection re-enters IDLE, Balancers must call Connect again to trigger +// a new connection attempt. +// +// Each SubConn contains a list of addresses.  gRPC will try to connect to the +// addresses in sequence, and stop trying the remainder once the first +// connection is successful.  However, this behavior is deprecated.  SubConns +// should only use a single address. +// +// NOTICE: This interface is intended to be implemented by gRPC, or intercepted +// by custom load balancing poilices.  Users should not need their own complete +// implementation of this interface -- they should always delegate to a SubConn +// returned by ClientConn.NewSubConn() by embedding it in their implementations. +// An embedded SubConn must never be nil, or runtime panics will occur. +type SubConn interface { +	// UpdateAddresses updates the addresses used in this SubConn. +	// gRPC checks if currently-connected address is still in the new list. +	// If it's in the list, the connection will be kept. +	// If it's not in the list, the connection will gracefully close, and +	// a new connection will be created. +	// +	// This will trigger a state transition for the SubConn. +	// +	// Deprecated: this method will be removed.  Create new SubConns for new +	// addresses instead. +	UpdateAddresses([]resolver.Address) +	// Connect starts the connecting for this SubConn. +	Connect() +	// GetOrBuildProducer returns a reference to the existing Producer for this +	// ProducerBuilder in this SubConn, or, if one does not currently exist, +	// creates a new one and returns it.  Returns a close function which may be +	// called when the Producer is no longer needed.  Otherwise the producer +	// will automatically be closed upon connection loss or subchannel close. +	// Should only be called on a SubConn in state Ready.  Otherwise the +	// producer will be unable to create streams. 
+	GetOrBuildProducer(ProducerBuilder) (p Producer, close func()) +	// Shutdown shuts down the SubConn gracefully.  Any started RPCs will be +	// allowed to complete.  No future calls should be made on the SubConn. +	// One final state update will be delivered to the StateListener (or +	// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to +	// indicate the shutdown operation.  This may be delivered before +	// in-progress RPCs are complete and the actual connection is closed. +	Shutdown() +	// RegisterHealthListener registers a health listener that receives health +	// updates for a Ready SubConn. Only one health listener can be registered +	// at a time. A health listener should be registered each time the SubConn's +	// connectivity state changes to READY. Registering a health listener when +	// the connectivity state is not READY may result in undefined behaviour. +	// This method must not be called synchronously while handling an update +	// from a previously registered health listener. +	RegisterHealthListener(func(SubConnState)) +	// EnforceSubConnEmbedding is included to force implementers to embed +	// another implementation of this interface, allowing gRPC to add methods +	// without breaking users. +	internal.EnforceSubConnEmbedding +} + +// A ProducerBuilder is a simple constructor for a Producer.  It is used by the +// SubConn to create producers when needed. +type ProducerBuilder interface { +	// Build creates a Producer.  The first parameter is always a +	// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the +	// associated SubConn), but is declared as `any` to avoid a dependency +	// cycle.  Build also returns a close function that will be called when all +	// references to the Producer have been given up for a SubConn, or when a +	// connectivity state change occurs on the SubConn.  The close function +	// should always block until all asynchronous cleanup work is completed. +	Build(grpcClientConnInterface any) (p Producer, close func()) +} + +// SubConnState describes the state of a SubConn. +type SubConnState struct { +	// ConnectivityState is the connectivity state of the SubConn. +	ConnectivityState connectivity.State +	// ConnectionError is set if the ConnectivityState is TransientFailure, +	// describing the reason the SubConn failed.  Otherwise, it is nil. +	ConnectionError error +	// connectedAddr contains the connected address when ConnectivityState is +	// Ready. Otherwise, it is indeterminate. +	connectedAddress resolver.Address +} + +// connectedAddress returns the connected address for a SubConnState. The +// address is only valid if the state is READY. +func connectedAddress(scs SubConnState) resolver.Address { +	return scs.connectedAddress +} + +// setConnectedAddress sets the connected address for a SubConnState. +func setConnectedAddress(scs *SubConnState, addr resolver.Address) { +	scs.connectedAddress = addr +} + +// A Producer is a type shared among potentially many consumers.  It is +// associated with a SubConn, and an implementation will typically contain +// other methods to provide additional functionality, e.g. configuration or +// subscription registration. 
+type Producer any diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go index 6561b769e..905817b5f 100644 --- a/vendor/google.golang.org/grpc/balancer_wrapper.go +++ b/vendor/google.golang.org/grpc/balancer_wrapper.go @@ -24,12 +24,14 @@ import (  	"sync"  	"google.golang.org/grpc/balancer" +	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/connectivity"  	"google.golang.org/grpc/internal"  	"google.golang.org/grpc/internal/balancer/gracefulswitch"  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/internal/grpcsync"  	"google.golang.org/grpc/resolver" +	"google.golang.org/grpc/status"  )  var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address)) @@ -187,12 +189,13 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer  		ac:            ac,  		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer),  		stateListener: opts.StateListener, +		healthData:    newHealthData(connectivity.Idle),  	}  	ac.acbw = acbw  	return acbw, nil  } -func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { +func (ccb *ccBalancerWrapper) RemoveSubConn(balancer.SubConn) {  	// The graceful switch balancer will never call this.  	logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")  } @@ -252,12 +255,32 @@ func (ccb *ccBalancerWrapper) Target() string {  // acBalancerWrapper is a wrapper on top of ac for balancers.  // It implements balancer.SubConn interface.  type acBalancerWrapper struct { +	internal.EnforceSubConnEmbedding  	ac            *addrConn          // read-only  	ccb           *ccBalancerWrapper // read-only  	stateListener func(balancer.SubConnState) -	mu        sync.Mutex -	producers map[balancer.ProducerBuilder]*refCountedProducer +	producersMu sync.Mutex +	producers   map[balancer.ProducerBuilder]*refCountedProducer + +	// Access to healthData is protected by healthMu. +	healthMu sync.Mutex +	// healthData is stored as a pointer to detect when the health listener is +	// dropped or updated. This is required as closures can't be compared for +	// equality. +	healthData *healthData +} + +// healthData holds data related to health state reporting. +type healthData struct { +	// connectivityState stores the most recent connectivity state delivered +	// to the LB policy. This is stored to avoid sending updates when the +	// SubConn has already exited connectivity state READY. +	connectivityState connectivity.State +} + +func newHealthData(s connectivity.State) *healthData { +	return &healthData{connectivityState: s}  }  // updateState is invoked by grpc to push a subConn state update to the @@ -267,6 +290,9 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve  		if ctx.Err() != nil || acbw.ccb.balancer == nil {  			return  		} +		// Invalidate all producers on any state change. +		acbw.closeProducers() +  		// Even though it is optional for balancers, gracefulswitch ensures  		// opts.StateListener is set, so this cannot ever be nil.  		// TODO: delete this comment when UpdateSubConnState is removed. @@ -274,17 +300,25 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve  		if s == connectivity.Ready {  			setConnectedAddress(&scs, curAddr)  		} +		// Invalidate the health listener by updating the healthData. 
+		acbw.healthMu.Lock() +		// A race may occur if a health listener is registered soon after the +		// connectivity state is set but before the stateListener is called. +		// Two cases may arise: +		// 1. The new state is not READY: RegisterHealthListener has checks to +		//    ensure no updates are sent when the connectivity state is not +		//    READY. +		// 2. The new state is READY: This means that the old state wasn't Ready. +		//    The RegisterHealthListener API mentions that a health listener +		//    must not be registered when a SubConn is not ready to avoid such +		//    races. When this happens, the LB policy would get health updates +		//    on the old listener. When the LB policy registers a new listener +		//    on receiving the connectivity update, the health updates will be +		//    sent to the new health listener. +		acbw.healthData = newHealthData(scs.ConnectivityState) +		acbw.healthMu.Unlock() +  		acbw.stateListener(scs) -		acbw.ac.mu.Lock() -		defer acbw.ac.mu.Unlock() -		if s == connectivity.Ready { -			// When changing states to READY, reset stateReadyChan.  Wait until -			// after we notify the LB policy's listener(s) in order to prevent -			// ac.getTransport() from unblocking before the LB policy starts -			// tracking the subchannel as READY. -			close(acbw.ac.stateReadyChan) -			acbw.ac.stateReadyChan = make(chan struct{}) -		}  	})  } @@ -301,6 +335,7 @@ func (acbw *acBalancerWrapper) Connect() {  }  func (acbw *acBalancerWrapper) Shutdown() { +	acbw.closeProducers()  	acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)  } @@ -308,9 +343,10 @@ func (acbw *acBalancerWrapper) Shutdown() {  // ready, blocks until it is or ctx expires.  Returns an error when the context  // expires or the addrConn is shut down.  func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { -	transport, err := acbw.ac.getTransport(ctx) -	if err != nil { -		return nil, err +	transport := acbw.ac.getReadyTransport() +	if transport == nil { +		return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready") +  	}  	return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)  } @@ -335,15 +371,15 @@ type refCountedProducer struct {  }  func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) { -	acbw.mu.Lock() -	defer acbw.mu.Unlock() +	acbw.producersMu.Lock() +	defer acbw.producersMu.Unlock()  	// Look up existing producer from this builder.  	pData := acbw.producers[pb]  	if pData == nil {  		// Not found; create a new one and add it to the producers map. -		p, close := pb.Build(acbw) -		pData = &refCountedProducer{producer: p, close: close} +		p, closeFn := pb.Build(acbw) +		pData = &refCountedProducer{producer: p, close: closeFn}  		acbw.producers[pb] = pData  	}  	// Account for this new reference. @@ -353,13 +389,64 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (  	// and delete the refCountedProducer from the map if the total reference  	// count goes to zero.  	unref := func() { -		acbw.mu.Lock() +		acbw.producersMu.Lock() +		// If closeProducers has already closed this producer instance, refs is +		// set to 0, so the check after decrementing will never pass, and the +		// producer will not be double-closed.  		
pData.refs--  		if pData.refs == 0 {  			defer pData.close() // Run outside the acbw mutex  			delete(acbw.producers, pb)  		} -		acbw.mu.Unlock() +		acbw.producersMu.Unlock()  	}  	return pData.producer, grpcsync.OnceFunc(unref)  } + +func (acbw *acBalancerWrapper) closeProducers() { +	acbw.producersMu.Lock() +	defer acbw.producersMu.Unlock() +	for pb, pData := range acbw.producers { +		pData.refs = 0 +		pData.close() +		delete(acbw.producers, pb) +	} +} + +// RegisterHealthListener accepts a health listener from the LB policy. It sends +// updates to the health listener as long as the SubConn's connectivity state +// doesn't change and a new health listener is not registered. To invalidate +// the currently registered health listener, acbw updates the healthData. If a +// nil listener is registered, the active health listener is dropped. +func (acbw *acBalancerWrapper) RegisterHealthListener(listener func(balancer.SubConnState)) { +	acbw.healthMu.Lock() +	defer acbw.healthMu.Unlock() +	// listeners should not be registered when the connectivity state +	// isn't Ready. This may happen when the balancer registers a listener +	// after the connectivityState is updated, but before it is notified +	// of the update. +	if acbw.healthData.connectivityState != connectivity.Ready { +		return +	} +	// Replace the health data to stop sending updates to any previously +	// registered health listeners. +	hd := newHealthData(connectivity.Ready) +	acbw.healthData = hd +	if listener == nil { +		return +	} + +	acbw.ccb.serializer.TrySchedule(func(ctx context.Context) { +		if ctx.Err() != nil || acbw.ccb.balancer == nil { +			return +		} +		// Don't send updates if a new listener is registered. +		acbw.healthMu.Lock() +		defer acbw.healthMu.Unlock() +		curHD := acbw.healthData +		if curHD != hd { +			return +		} +		listener(balancer.SubConnState{ConnectivityState: connectivity.Ready}) +	}) +} diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go index fcd1cfe80..9e9d08069 100644 --- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go +++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go @@ -18,7 +18,7 @@  // Code generated by protoc-gen-go. DO NOT EDIT.  
// versions: -// 	protoc-gen-go v1.34.1 +// 	protoc-gen-go v1.35.1  // 	protoc        v5.27.1  // source: grpc/binlog/v1/binarylog.proto @@ -274,11 +274,9 @@ type GrpcLogEntry struct {  func (x *GrpcLogEntry) Reset() {  	*x = GrpcLogEntry{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[0] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[0] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *GrpcLogEntry) String() string { @@ -289,7 +287,7 @@ func (*GrpcLogEntry) ProtoMessage() {}  func (x *GrpcLogEntry) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[0] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -440,11 +438,9 @@ type ClientHeader struct {  func (x *ClientHeader) Reset() {  	*x = ClientHeader{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[1] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[1] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *ClientHeader) String() string { @@ -455,7 +451,7 @@ func (*ClientHeader) ProtoMessage() {}  func (x *ClientHeader) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[1] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -509,11 +505,9 @@ type ServerHeader struct {  func (x *ServerHeader) Reset() {  	*x = ServerHeader{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[2] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[2] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *ServerHeader) String() string { @@ -524,7 +518,7 @@ func (*ServerHeader) ProtoMessage() {}  func (x *ServerHeader) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[2] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -565,11 +559,9 @@ type Trailer struct {  func (x *Trailer) Reset() {  	*x = Trailer{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[3] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[3] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *Trailer) String() string { @@ -580,7 +572,7 @@ func (*Trailer) ProtoMessage() {}  func (x *Trailer) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[3] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -638,11 +630,9 @@ type Message struct {  func (x *Message) Reset() {  	*x = Message{} -	if protoimpl.UnsafeEnabled { -		mi := 
&file_grpc_binlog_v1_binarylog_proto_msgTypes[4] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[4] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *Message) String() string { @@ -653,7 +643,7 @@ func (*Message) ProtoMessage() {}  func (x *Message) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[4] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -713,11 +703,9 @@ type Metadata struct {  func (x *Metadata) Reset() {  	*x = Metadata{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[5] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[5] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *Metadata) String() string { @@ -728,7 +716,7 @@ func (*Metadata) ProtoMessage() {}  func (x *Metadata) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[5] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -762,11 +750,9 @@ type MetadataEntry struct {  func (x *MetadataEntry) Reset() {  	*x = MetadataEntry{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[6] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[6] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *MetadataEntry) String() string { @@ -777,7 +763,7 @@ func (*MetadataEntry) ProtoMessage() {}  func (x *MetadataEntry) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[6] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -820,11 +806,9 @@ type Address struct {  func (x *Address) Reset() {  	*x = Address{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[7] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[7] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *Address) String() string { @@ -835,7 +819,7 @@ func (*Address) ProtoMessage() {}  func (x *Address) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_binlog_v1_binarylog_proto_msgTypes[7] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -1015,7 +999,7 @@ func file_grpc_binlog_v1_binarylog_proto_rawDescGZIP() []byte {  var file_grpc_binlog_v1_binarylog_proto_enumTypes = make([]protoimpl.EnumInfo, 3)  var file_grpc_binlog_v1_binarylog_proto_msgTypes = make([]protoimpl.MessageInfo, 8) -var file_grpc_binlog_v1_binarylog_proto_goTypes = []interface{}{ +var file_grpc_binlog_v1_binarylog_proto_goTypes = []any{  	(GrpcLogEntry_EventType)(0),   // 0: grpc.binarylog.v1.GrpcLogEntry.EventType  
	(GrpcLogEntry_Logger)(0),      // 1: grpc.binarylog.v1.GrpcLogEntry.Logger  	(Address_Type)(0),             // 2: grpc.binarylog.v1.Address.Type @@ -1057,105 +1041,7 @@ func file_grpc_binlog_v1_binarylog_proto_init() {  	if File_grpc_binlog_v1_binarylog_proto != nil {  		return  	} -	if !protoimpl.UnsafeEnabled { -		file_grpc_binlog_v1_binarylog_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*GrpcLogEntry); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_binlog_v1_binarylog_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*ClientHeader); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_binlog_v1_binarylog_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*ServerHeader); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_binlog_v1_binarylog_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*Trailer); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_binlog_v1_binarylog_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*Message); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_binlog_v1_binarylog_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*Metadata); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_binlog_v1_binarylog_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*MetadataEntry); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_binlog_v1_binarylog_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*Address); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -	} -	file_grpc_binlog_v1_binarylog_proto_msgTypes[0].OneofWrappers = []interface{}{ +	file_grpc_binlog_v1_binarylog_proto_msgTypes[0].OneofWrappers = []any{  		(*GrpcLogEntry_ClientHeader)(nil),  		(*GrpcLogEntry_ServerHeader)(nil),  		(*GrpcLogEntry_Message)(nil), diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 9c8850e3f..4f57b5543 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -775,10 +775,7 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)  		}  	} -	var balCfg serviceconfig.LoadBalancingConfig -	if cc.sc != nil && cc.sc.lbConfig != nil { -		balCfg = cc.sc.lbConfig -	} +	balCfg := cc.sc.lbConfig  	bw := cc.balancerWrapper  	cc.mu.Unlock() @@ -825,14 +822,13 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.  	
}  	ac := &addrConn{ -		state:          connectivity.Idle, -		cc:             cc, -		addrs:          copyAddresses(addrs), -		scopts:         opts, -		dopts:          cc.dopts, -		channelz:       channelz.RegisterSubChannel(cc.channelz, ""), -		resetBackoff:   make(chan struct{}), -		stateReadyChan: make(chan struct{}), +		state:        connectivity.Idle, +		cc:           cc, +		addrs:        copyAddresses(addrs), +		scopts:       opts, +		dopts:        cc.dopts, +		channelz:     channelz.RegisterSubChannel(cc.channelz, ""), +		resetBackoff: make(chan struct{}),  	}  	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)  	// Start with our address set to the first address; this may be updated if @@ -1141,10 +1137,15 @@ func (cc *ClientConn) Close() error {  	<-cc.resolverWrapper.serializer.Done()  	<-cc.balancerWrapper.serializer.Done() - +	var wg sync.WaitGroup  	for ac := range conns { -		ac.tearDown(ErrClientConnClosing) +		wg.Add(1) +		go func(ac *addrConn) { +			defer wg.Done() +			ac.tearDown(ErrClientConnClosing) +		}(ac)  	} +	wg.Wait()  	cc.addTraceEvent("deleted")  	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add  	// trace reference to the entity being deleted, and thus prevent it from being @@ -1179,8 +1180,7 @@ type addrConn struct {  	addrs   []resolver.Address // All addresses that the resolver resolved to.  	// Use updateConnectivityState for updating addrConn's connectivity state. -	state          connectivity.State -	stateReadyChan chan struct{} // closed and recreated on every READY state change. +	state connectivity.State  	backoffIdx   int // Needs to be stateful for resetConnectBackoff.  	resetBackoff chan struct{} @@ -1251,6 +1251,8 @@ func (ac *addrConn) resetTransportAndUnlock() {  	ac.mu.Unlock()  	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil { +		// TODO: #7534 - Move re-resolution requests into the pick_first LB policy +		// to ensure one resolution request per pass instead of per subconn failure.  		ac.cc.resolveNow(resolver.ResolveNowOptions{})  		ac.mu.Lock()  		if acCtx.Err() != nil { @@ -1292,7 +1294,7 @@ func (ac *addrConn) resetTransportAndUnlock() {  	ac.mu.Unlock()  } -// tryAllAddrs tries to creates a connection to the addresses, and stop when at +// tryAllAddrs tries to create a connection to the addresses, and stop when at  // the first successful one. It returns an error if no address was successfully  // connected, or updates ac appropriately with the new transport.  func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error { @@ -1369,7 +1371,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,  	defer cancel()  	copts.ChannelzParent = ac.channelz -	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose) +	newTr, err := transport.NewHTTP2Client(connectCtx, ac.cc.ctx, addr, copts, onClose)  	if err != nil {  		if logger.V(2) {  			logger.Infof("Creating new client transport to %q: %v", addr, err) @@ -1443,7 +1445,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {  	if !ac.scopts.HealthCheckEnabled {  		return  	} -	healthCheckFunc := ac.cc.dopts.healthCheckFunc +	healthCheckFunc := internal.HealthCheckFunc  	if healthCheckFunc == nil {  		// The health package is not imported to set health check function.  		// @@ -1475,7 +1477,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {  	}  	// Start the health checking stream.  	
go func() { -		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) +		err := healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)  		if err != nil {  			if status.Code(err) == codes.Unimplemented {  				channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled") @@ -1504,29 +1506,6 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {  	return nil  } -// getTransport waits until the addrconn is ready and returns the transport. -// If the context expires first, returns an appropriate status.  If the -// addrConn is stopped first, returns an Unavailable status error. -func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) { -	for ctx.Err() == nil { -		ac.mu.Lock() -		t, state, sc := ac.transport, ac.state, ac.stateReadyChan -		ac.mu.Unlock() -		if state == connectivity.Ready { -			return t, nil -		} -		if state == connectivity.Shutdown { -			return nil, status.Errorf(codes.Unavailable, "SubConn shutting down") -		} - -		select { -		case <-ctx.Done(): -		case <-sc: -		} -	} -	return nil, status.FromContextError(ctx.Err()).Err() -} -  // tearDown starts to tear down the addrConn.  //  // Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct diff --git a/vendor/google.golang.org/grpc/codec.go b/vendor/google.golang.org/grpc/codec.go index e840858b7..959c2f99d 100644 --- a/vendor/google.golang.org/grpc/codec.go +++ b/vendor/google.golang.org/grpc/codec.go @@ -71,7 +71,7 @@ func (c codecV0Bridge) Marshal(v any) (mem.BufferSlice, error) {  	if err != nil {  		return nil, err  	} -	return mem.BufferSlice{mem.NewBuffer(&data, nil)}, nil +	return mem.BufferSlice{mem.SliceBuffer(data)}, nil  }  func (c codecV0Bridge) Unmarshal(data mem.BufferSlice, v any) (err error) { diff --git a/vendor/google.golang.org/grpc/credentials/insecure/insecure.go b/vendor/google.golang.org/grpc/credentials/insecure/insecure.go index 82bee1443..4c805c644 100644 --- a/vendor/google.golang.org/grpc/credentials/insecure/insecure.go +++ b/vendor/google.golang.org/grpc/credentials/insecure/insecure.go @@ -40,7 +40,7 @@ func NewCredentials() credentials.TransportCredentials {  // NoSecurity.  type insecureTC struct{} -func (insecureTC) ClientHandshake(ctx context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) { +func (insecureTC) ClientHandshake(_ context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {  	return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil  } diff --git a/vendor/google.golang.org/grpc/credentials/tls.go b/vendor/google.golang.org/grpc/credentials/tls.go index 411435854..e163a473d 100644 --- a/vendor/google.golang.org/grpc/credentials/tls.go +++ b/vendor/google.golang.org/grpc/credentials/tls.go @@ -200,25 +200,40 @@ var tls12ForbiddenCipherSuites = map[uint16]struct{}{  // NewTLS uses c to construct a TransportCredentials based on TLS.  
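The refactor below keeps the externally visible behaviour of NewTLS (clone the config, append the "h2" ALPN entry, default MinVersion to TLS 1.2, filter the forbidden cipher suites) and additionally applies the same defaults to any config returned from GetConfigForClient. A minimal client-side usage sketch, with a hypothetical endpoint:

package main

import (
	"crypto/tls"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

func main() {
	// NewTLS never mutates the caller's config; defaults are applied to a clone.
	creds := credentials.NewTLS(&tls.Config{ServerName: "example.com"}) // hypothetical name
	conn, err := grpc.NewClient("example.com:443", grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf("grpc.NewClient: %v", err)
	}
	defer conn.Close()
}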
func NewTLS(c *tls.Config) TransportCredentials { -	tc := &tlsCreds{credinternal.CloneTLSConfig(c)} -	tc.config.NextProtos = credinternal.AppendH2ToNextProtos(tc.config.NextProtos) +	config := applyDefaults(c) +	if config.GetConfigForClient != nil { +		oldFn := config.GetConfigForClient +		config.GetConfigForClient = func(hello *tls.ClientHelloInfo) (*tls.Config, error) { +			cfgForClient, err := oldFn(hello) +			if err != nil || cfgForClient == nil { +				return cfgForClient, err +			} +			return applyDefaults(cfgForClient), nil +		} +	} +	return &tlsCreds{config: config} +} + +func applyDefaults(c *tls.Config) *tls.Config { +	config := credinternal.CloneTLSConfig(c) +	config.NextProtos = credinternal.AppendH2ToNextProtos(config.NextProtos)  	// If the user did not configure a MinVersion and did not configure a  	// MaxVersion < 1.2, use MinVersion=1.2, which is required by  	// https://datatracker.ietf.org/doc/html/rfc7540#section-9.2 -	if tc.config.MinVersion == 0 && (tc.config.MaxVersion == 0 || tc.config.MaxVersion >= tls.VersionTLS12) { -		tc.config.MinVersion = tls.VersionTLS12 +	if config.MinVersion == 0 && (config.MaxVersion == 0 || config.MaxVersion >= tls.VersionTLS12) { +		config.MinVersion = tls.VersionTLS12  	}  	// If the user did not configure CipherSuites, use all "secure" cipher  	// suites reported by the TLS package, but remove some explicitly forbidden  	// by https://datatracker.ietf.org/doc/html/rfc7540#appendix-A -	if tc.config.CipherSuites == nil { +	if config.CipherSuites == nil {  		for _, cs := range tls.CipherSuites() {  			if _, ok := tls12ForbiddenCipherSuites[cs.ID]; !ok { -				tc.config.CipherSuites = append(tc.config.CipherSuites, cs.ID) +				config.CipherSuites = append(config.CipherSuites, cs.ID)  			}  		}  	} -	return tc +	return config  }  // NewClientTLSFromCert constructs TLS credentials from the provided root diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index 27c1b9bb6..7494ae591 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -87,7 +87,6 @@ type dialOptions struct {  	disableServiceConfig        bool  	disableRetry                bool  	disableHealthCheck          bool -	healthCheckFunc             internal.HealthChecker  	minConnectTimeout           func() time.Duration  	defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.  	defaultServiceConfigRawJSON *string @@ -436,7 +435,7 @@ func WithTimeout(d time.Duration) DialOption {  // option to true from the Control field. For a concrete example of how to do  // this, see internal.NetDialerWithTCPKeepalive().  // -// For more information, please see [issue 23459] in the Go github repo. +// For more information, please see [issue 23459] in the Go GitHub repo.  //  // [issue 23459]: https://github.com/golang/go/issues/23459  func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption { @@ -445,10 +444,6 @@ func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOp  	})  } -func init() { -	internal.WithHealthCheckFunc = withHealthCheckFunc -} -  // WithDialer returns a DialOption that specifies a function to use for dialing  // network addresses. 
If FailOnNonTempDialError() is set to true, and an error  // is returned by f, gRPC checks the error's Temporary() method to decide if it @@ -518,6 +513,8 @@ func WithUserAgent(s string) DialOption {  // WithKeepaliveParams returns a DialOption that specifies keepalive parameters  // for the client transport. +// +// Keepalive is disabled by default.  func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {  	if kp.Time < internal.KeepaliveMinPingTime {  		logger.Warningf("Adjusting keepalive ping interval to minimum period of %v", internal.KeepaliveMinPingTime) @@ -660,16 +657,6 @@ func WithDisableHealthCheck() DialOption {  	})  } -// withHealthCheckFunc replaces the default health check function with the -// provided one. It makes tests easier to change the health check function. -// -// For testing purpose only. -func withHealthCheckFunc(f internal.HealthChecker) DialOption { -	return newFuncDialOption(func(o *dialOptions) { -		o.healthCheckFunc = f -	}) -} -  func defaultDialOptions() dialOptions {  	return dialOptions{  		copts: transport.ConnectOptions{ @@ -680,7 +667,6 @@ func defaultDialOptions() dialOptions {  			BufferPool:      mem.DefaultBufferPool(),  		},  		bs:              internalbackoff.DefaultExponential, -		healthCheckFunc: internal.HealthCheckFunc,  		idleTimeout:     30 * time.Minute,  		defaultScheme:   "dns",  		maxCallAttempts: defaultMaxCallAttempts, diff --git a/vendor/google.golang.org/grpc/experimental/stats/metricregistry.go b/vendor/google.golang.org/grpc/experimental/stats/metricregistry.go index 930140f57..ad75313a1 100644 --- a/vendor/google.golang.org/grpc/experimental/stats/metricregistry.go +++ b/vendor/google.golang.org/grpc/experimental/stats/metricregistry.go @@ -20,10 +20,10 @@ package stats  import (  	"maps" -	"testing"  	"google.golang.org/grpc/grpclog"  	"google.golang.org/grpc/internal" +	"google.golang.org/grpc/stats"  )  func init() { @@ -35,7 +35,7 @@ var logger = grpclog.Component("metrics-registry")  // DefaultMetrics are the default metrics registered through global metrics  // registry. This is written to at initialization time only, and is read only  // after initialization. -var DefaultMetrics = NewMetrics() +var DefaultMetrics = stats.NewMetricSet()  // MetricDescriptor is the data for a registered metric.  type MetricDescriptor struct { @@ -43,7 +43,7 @@ type MetricDescriptor struct {  	// (including any per call metrics). See  	// https://github.com/grpc/proposal/blob/master/A79-non-per-call-metrics-architecture.md#metric-instrument-naming-conventions  	// for metric naming conventions. -	Name Metric +	Name string  	// The description of this metric.  	Description string  	// The unit (e.g. entries, seconds) of this metric. @@ -155,27 +155,27 @@ func (h *Int64GaugeHandle) Record(recorder MetricsRecorder, incr int64, labels .  }  // registeredMetrics are the registered metric descriptor names. -var registeredMetrics = make(map[Metric]bool) +var registeredMetrics = make(map[string]bool)  // metricsRegistry contains all of the registered metrics.  //  // This is written to only at init time, and read only after that. -var metricsRegistry = make(map[Metric]*MetricDescriptor) +var metricsRegistry = make(map[string]*MetricDescriptor)  // DescriptorForMetric returns the MetricDescriptor from the global registry.  //  // Returns nil if MetricDescriptor not present. 
-func DescriptorForMetric(metric Metric) *MetricDescriptor { -	return metricsRegistry[metric] +func DescriptorForMetric(metricName string) *MetricDescriptor { +	return metricsRegistry[metricName]  } -func registerMetric(name Metric, def bool) { -	if registeredMetrics[name] { -		logger.Fatalf("metric %v already registered", name) +func registerMetric(metricName string, def bool) { +	if registeredMetrics[metricName] { +		logger.Fatalf("metric %v already registered", metricName)  	} -	registeredMetrics[name] = true +	registeredMetrics[metricName] = true  	if def { -		DefaultMetrics = DefaultMetrics.Add(name) +		DefaultMetrics = DefaultMetrics.Add(metricName)  	}  } @@ -250,21 +250,21 @@ func RegisterInt64Gauge(descriptor MetricDescriptor) *Int64GaugeHandle {  }  // snapshotMetricsRegistryForTesting snapshots the global data of the metrics -// registry. Registers a cleanup function on the provided testing.T that sets -// the metrics registry to its original state. Only called in testing functions. -func snapshotMetricsRegistryForTesting(t *testing.T) { +// registry. Returns a cleanup function that sets the metrics registry to its +// original state. +func snapshotMetricsRegistryForTesting() func() {  	oldDefaultMetrics := DefaultMetrics  	oldRegisteredMetrics := registeredMetrics  	oldMetricsRegistry := metricsRegistry -	registeredMetrics = make(map[Metric]bool) -	metricsRegistry = make(map[Metric]*MetricDescriptor) +	registeredMetrics = make(map[string]bool) +	metricsRegistry = make(map[string]*MetricDescriptor)  	maps.Copy(registeredMetrics, registeredMetrics)  	maps.Copy(metricsRegistry, metricsRegistry) -	t.Cleanup(func() { +	return func() {  		DefaultMetrics = oldDefaultMetrics  		registeredMetrics = oldRegisteredMetrics  		metricsRegistry = oldMetricsRegistry -	}) +	}  } diff --git a/vendor/google.golang.org/grpc/experimental/stats/metrics.go b/vendor/google.golang.org/grpc/experimental/stats/metrics.go index 3221f7a63..ee1423605 100644 --- a/vendor/google.golang.org/grpc/experimental/stats/metrics.go +++ b/vendor/google.golang.org/grpc/experimental/stats/metrics.go @@ -19,7 +19,7 @@  // Package stats contains experimental metrics/stats API's.  package stats -import "maps" +import "google.golang.org/grpc/stats"  // MetricsRecorder records on metrics derived from metric registry.  type MetricsRecorder interface { @@ -40,75 +40,15 @@ type MetricsRecorder interface {  	RecordInt64Gauge(handle *Int64GaugeHandle, incr int64, labels ...string)  } -// Metric is an identifier for a metric. -type Metric string +// Metrics is an experimental legacy alias of the now-stable stats.MetricSet. +// Metrics will be deleted in a future release. +type Metrics = stats.MetricSet -// Metrics is a set of metrics to record. Once created, Metrics is immutable, -// however Add and Remove can make copies with specific metrics added or -// removed, respectively. -// -// Do not construct directly; use NewMetrics instead. -type Metrics struct { -	// metrics are the set of metrics to initialize. -	metrics map[Metric]bool -} +// Metric was replaced by direct usage of strings. +type Metric = string -// NewMetrics returns a Metrics containing Metrics. +// NewMetrics is an experimental legacy alias of the now-stable +// stats.NewMetricSet.  NewMetrics will be deleted in a future release.  
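Since Metrics is now only an alias for the stable stats.MetricSet (the forwarding constructor follows below), old and new call sites construct identical sets. A short sketch, using a hypothetical metric name:

package main

import (
	"fmt"

	estats "google.golang.org/grpc/experimental/stats"
	"google.golang.org/grpc/stats"
)

func main() {
	legacy := estats.NewMetrics("grpc.client.attempt.duration")   // deprecated alias
	current := stats.NewMetricSet("grpc.client.attempt.duration") // stable API
	// Both values have the same underlying type, so they expose the same
	// read-only Metrics() map.
	fmt.Println(legacy.Metrics(), current.Metrics())
}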
func NewMetrics(metrics ...Metric) *Metrics { -	newMetrics := make(map[Metric]bool) -	for _, metric := range metrics { -		newMetrics[metric] = true -	} -	return &Metrics{ -		metrics: newMetrics, -	} -} - -// Metrics returns the metrics set. The returned map is read-only and must not -// be modified. -func (m *Metrics) Metrics() map[Metric]bool { -	return m.metrics -} - -// Add adds the metrics to the metrics set and returns a new copy with the -// additional metrics. -func (m *Metrics) Add(metrics ...Metric) *Metrics { -	newMetrics := make(map[Metric]bool) -	for metric := range m.metrics { -		newMetrics[metric] = true -	} - -	for _, metric := range metrics { -		newMetrics[metric] = true -	} -	return &Metrics{ -		metrics: newMetrics, -	} -} - -// Join joins the metrics passed in with the metrics set, and returns a new copy -// with the merged metrics. -func (m *Metrics) Join(metrics *Metrics) *Metrics { -	newMetrics := make(map[Metric]bool) -	maps.Copy(newMetrics, m.metrics) -	maps.Copy(newMetrics, metrics.metrics) -	return &Metrics{ -		metrics: newMetrics, -	} -} - -// Remove removes the metrics from the metrics set and returns a new copy with -// the metrics removed. -func (m *Metrics) Remove(metrics ...Metric) *Metrics { -	newMetrics := make(map[Metric]bool) -	for metric := range m.metrics { -		newMetrics[metric] = true -	} - -	for _, metric := range metrics { -		delete(newMetrics, metric) -	} -	return &Metrics{ -		metrics: newMetrics, -	} +	return stats.NewMetricSet(metrics...)  } diff --git a/vendor/google.golang.org/grpc/grpclog/internal/logger.go b/vendor/google.golang.org/grpc/grpclog/internal/logger.go index 0d9a824ce..e524fdd40 100644 --- a/vendor/google.golang.org/grpc/grpclog/internal/logger.go +++ b/vendor/google.golang.org/grpc/grpclog/internal/logger.go @@ -81,7 +81,7 @@ func (l *LoggerWrapper) Errorf(format string, args ...any) {  }  // V reports whether verbosity level l is at least the requested verbose level. -func (*LoggerWrapper) V(l int) bool { +func (*LoggerWrapper) V(int) bool {  	// Returns true for all verbose level.  	return true  } diff --git a/vendor/google.golang.org/grpc/grpclog/internal/loggerv2.go b/vendor/google.golang.org/grpc/grpclog/internal/loggerv2.go index 07df71e98..ed90060c3 100644 --- a/vendor/google.golang.org/grpc/grpclog/internal/loggerv2.go +++ b/vendor/google.golang.org/grpc/grpclog/internal/loggerv2.go @@ -101,6 +101,22 @@ var severityName = []string{  	fatalLog:   "FATAL",  } +// sprintf is fmt.Sprintf. +// These vars exist to make it possible to test that expensive format calls aren't made unnecessarily. +var sprintf = fmt.Sprintf + +// sprint is fmt.Sprint. +// These vars exist to make it possible to test that expensive format calls aren't made unnecessarily. +var sprint = fmt.Sprint + +// sprintln is fmt.Sprintln. +// These vars exist to make it possible to test that expensive format calls aren't made unnecessarily. +var sprintln = fmt.Sprintln + +// exit is os.Exit. +// This var exists to make it possible to test functions calling os.Exit. +var exit = os.Exit +  // loggerT is the default logger used by grpclog.  
type loggerT struct {  	m          []*log.Logger @@ -111,7 +127,7 @@ type loggerT struct {  func (g *loggerT) output(severity int, s string) {  	sevStr := severityName[severity]  	if !g.jsonFormat { -		g.m[severity].Output(2, fmt.Sprintf("%v: %v", sevStr, s)) +		g.m[severity].Output(2, sevStr+": "+s)  		return  	}  	// TODO: we can also include the logging component, but that needs more @@ -123,55 +139,79 @@ func (g *loggerT) output(severity int, s string) {  	g.m[severity].Output(2, string(b))  } +func (g *loggerT) printf(severity int, format string, args ...any) { +	// Note the discard check is duplicated in each print func, rather than in +	// output, to avoid the expensive Sprint calls. +	// De-duplicating this by moving to output would be a significant performance regression! +	if lg := g.m[severity]; lg.Writer() == io.Discard { +		return +	} +	g.output(severity, sprintf(format, args...)) +} + +func (g *loggerT) print(severity int, v ...any) { +	if lg := g.m[severity]; lg.Writer() == io.Discard { +		return +	} +	g.output(severity, sprint(v...)) +} + +func (g *loggerT) println(severity int, v ...any) { +	if lg := g.m[severity]; lg.Writer() == io.Discard { +		return +	} +	g.output(severity, sprintln(v...)) +} +  func (g *loggerT) Info(args ...any) { -	g.output(infoLog, fmt.Sprint(args...)) +	g.print(infoLog, args...)  }  func (g *loggerT) Infoln(args ...any) { -	g.output(infoLog, fmt.Sprintln(args...)) +	g.println(infoLog, args...)  }  func (g *loggerT) Infof(format string, args ...any) { -	g.output(infoLog, fmt.Sprintf(format, args...)) +	g.printf(infoLog, format, args...)  }  func (g *loggerT) Warning(args ...any) { -	g.output(warningLog, fmt.Sprint(args...)) +	g.print(warningLog, args...)  }  func (g *loggerT) Warningln(args ...any) { -	g.output(warningLog, fmt.Sprintln(args...)) +	g.println(warningLog, args...)  }  func (g *loggerT) Warningf(format string, args ...any) { -	g.output(warningLog, fmt.Sprintf(format, args...)) +	g.printf(warningLog, format, args...)  }  func (g *loggerT) Error(args ...any) { -	g.output(errorLog, fmt.Sprint(args...)) +	g.print(errorLog, args...)  }  func (g *loggerT) Errorln(args ...any) { -	g.output(errorLog, fmt.Sprintln(args...)) +	g.println(errorLog, args...)  }  func (g *loggerT) Errorf(format string, args ...any) { -	g.output(errorLog, fmt.Sprintf(format, args...)) +	g.printf(errorLog, format, args...)  }  func (g *loggerT) Fatal(args ...any) { -	g.output(fatalLog, fmt.Sprint(args...)) -	os.Exit(1) +	g.print(fatalLog, args...) +	exit(1)  }  func (g *loggerT) Fatalln(args ...any) { -	g.output(fatalLog, fmt.Sprintln(args...)) -	os.Exit(1) +	g.println(fatalLog, args...) +	exit(1)  }  func (g *loggerT) Fatalf(format string, args ...any) { -	g.output(fatalLog, fmt.Sprintf(format, args...)) -	os.Exit(1) +	g.printf(fatalLog, format, args...) +	exit(1)  }  func (g *loggerT) V(l int) bool { @@ -186,19 +226,42 @@ type LoggerV2Config struct {  	FormatJSON bool  } +// combineLoggers returns a combined logger for both higher & lower severity logs, +// or only one if the other is io.Discard. +// +// This uses io.Discard instead of io.MultiWriter when all loggers +// are set to io.Discard. Both this package and the standard log package have +// significant optimizations for io.Discard, which io.MultiWriter lacks (as of +// this writing). 
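The print helpers above short-circuit before calling Sprint/Sprintf/Sprintln whenever a severity's writer is io.Discard, so discarded severities cost almost nothing; the combineLoggers helper that preserves this property when writers are merged follows below. A small sketch through the public grpclog API:

package main

import (
	"io"
	"os"

	"google.golang.org/grpc/grpclog"
)

func main() {
	// INFO and WARNING are discarded; with the fast path above their format
	// strings are never rendered. ERROR and FATAL still reach stderr.
	grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, os.Stderr))
	grpclog.Infof("never formatted: %+v", struct{ N int }{42})
	grpclog.Errorf("still logged: %v", io.EOF)
}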
+func combineLoggers(lower, higher io.Writer) io.Writer { +	if lower == io.Discard { +		return higher +	} +	if higher == io.Discard { +		return lower +	} +	return io.MultiWriter(lower, higher) +} +  // NewLoggerV2 creates a new LoggerV2 instance with the provided configuration.  // The infoW, warningW, and errorW writers are used to write log messages of  // different severity levels.  func NewLoggerV2(infoW, warningW, errorW io.Writer, c LoggerV2Config) LoggerV2 { -	var m []*log.Logger  	flag := log.LstdFlags  	if c.FormatJSON {  		flag = 0  	} -	m = append(m, log.New(infoW, "", flag)) -	m = append(m, log.New(io.MultiWriter(infoW, warningW), "", flag)) -	ew := io.MultiWriter(infoW, warningW, errorW) // ew will be used for error and fatal. -	m = append(m, log.New(ew, "", flag)) -	m = append(m, log.New(ew, "", flag)) + +	warningW = combineLoggers(infoW, warningW) +	errorW = combineLoggers(errorW, warningW) + +	fatalW := errorW + +	m := []*log.Logger{ +		log.New(infoW, "", flag), +		log.New(warningW, "", flag), +		log.New(errorW, "", flag), +		log.New(fatalW, "", flag), +	}  	return &loggerT{m: m, v: c.Verbosity, jsonFormat: c.FormatJSON}  } diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go index e65cf0ea1..26e16d919 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -17,7 +17,7 @@  // Code generated by protoc-gen-go. DO NOT EDIT.  // versions: -// 	protoc-gen-go v1.34.1 +// 	protoc-gen-go v1.35.1  // 	protoc        v5.27.1  // source: grpc/health/v1/health.proto @@ -99,11 +99,9 @@ type HealthCheckRequest struct {  func (x *HealthCheckRequest) Reset() {  	*x = HealthCheckRequest{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_health_v1_health_proto_msgTypes[0] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_health_v1_health_proto_msgTypes[0] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *HealthCheckRequest) String() string { @@ -114,7 +112,7 @@ func (*HealthCheckRequest) ProtoMessage() {}  func (x *HealthCheckRequest) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_health_v1_health_proto_msgTypes[0] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -146,11 +144,9 @@ type HealthCheckResponse struct {  func (x *HealthCheckResponse) Reset() {  	*x = HealthCheckResponse{} -	if protoimpl.UnsafeEnabled { -		mi := &file_grpc_health_v1_health_proto_msgTypes[1] -		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) -		ms.StoreMessageInfo(mi) -	} +	mi := &file_grpc_health_v1_health_proto_msgTypes[1] +	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) +	ms.StoreMessageInfo(mi)  }  func (x *HealthCheckResponse) String() string { @@ -161,7 +157,7 @@ func (*HealthCheckResponse) ProtoMessage() {}  func (x *HealthCheckResponse) ProtoReflect() protoreflect.Message {  	mi := &file_grpc_health_v1_health_proto_msgTypes[1] -	if protoimpl.UnsafeEnabled && x != nil { +	if x != nil {  		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))  		if ms.LoadMessageInfo() == nil {  			ms.StoreMessageInfo(mi) @@ -237,7 +233,7 @@ func file_grpc_health_v1_health_proto_rawDescGZIP() []byte {  var file_grpc_health_v1_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1)  
var file_grpc_health_v1_health_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_grpc_health_v1_health_proto_goTypes = []interface{}{ +var file_grpc_health_v1_health_proto_goTypes = []any{  	(HealthCheckResponse_ServingStatus)(0), // 0: grpc.health.v1.HealthCheckResponse.ServingStatus  	(*HealthCheckRequest)(nil),             // 1: grpc.health.v1.HealthCheckRequest  	(*HealthCheckResponse)(nil),            // 2: grpc.health.v1.HealthCheckResponse @@ -260,32 +256,6 @@ func file_grpc_health_v1_health_proto_init() {  	if File_grpc_health_v1_health_proto != nil {  		return  	} -	if !protoimpl.UnsafeEnabled { -		file_grpc_health_v1_health_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*HealthCheckRequest); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -		file_grpc_health_v1_health_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { -			switch v := v.(*HealthCheckResponse); i { -			case 0: -				return &v.state -			case 1: -				return &v.sizeCache -			case 2: -				return &v.unknownFields -			default: -				return nil -			} -		} -	}  	type x struct{}  	out := protoimpl.TypeBuilder{  		File: protoimpl.DescBuilder{ diff --git a/vendor/google.golang.org/grpc/internal/backoff/backoff.go b/vendor/google.golang.org/grpc/internal/backoff/backoff.go index b15cf482d..b6ae7f258 100644 --- a/vendor/google.golang.org/grpc/internal/backoff/backoff.go +++ b/vendor/google.golang.org/grpc/internal/backoff/backoff.go @@ -25,7 +25,7 @@ package backoff  import (  	"context"  	"errors" -	"math/rand" +	rand "math/rand/v2"  	"time"  	grpcbackoff "google.golang.org/grpc/backoff" diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go index 13821a926..85540f86a 100644 --- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go @@ -33,6 +33,8 @@ type lbConfig struct {  	childConfig  serviceconfig.LoadBalancingConfig  } +// ChildName returns the name of the child balancer of the gracefulswitch +// Balancer.  func ChildName(l serviceconfig.LoadBalancingConfig) string {  	return l.(*lbConfig).childBuilder.Name()  } diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go index aa4505a87..966932891 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -106,7 +106,7 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry  }  // Log creates a proto binary log entry, and logs it to the sink. -func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) { +func (ml *TruncatingMethodLogger) Log(_ context.Context, c LogEntryConfig) {  	ml.sink.Write(ml.Build(c))  } diff --git a/vendor/google.golang.org/grpc/internal/channelz/channel.go b/vendor/google.golang.org/grpc/internal/channelz/channel.go index d7e9e1d54..3ec662799 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/channel.go +++ b/vendor/google.golang.org/grpc/internal/channelz/channel.go @@ -43,6 +43,8 @@ type Channel struct {  	// Non-zero traceRefCount means the trace of this channel cannot be deleted.  	
traceRefCount int32 +	// ChannelMetrics holds connectivity state, target and call metrics for the +	// channel within channelz.  	ChannelMetrics ChannelMetrics  } @@ -50,6 +52,8 @@ type Channel struct {  // nesting.  func (c *Channel) channelzIdentifier() {} +// String returns a string representation of the Channel, including its parent +// entity and ID.  func (c *Channel) String() string {  	if c.Parent == nil {  		return fmt.Sprintf("Channel #%d", c.ID) @@ -61,24 +65,31 @@ func (c *Channel) id() int64 {  	return c.ID  } +// SubChans returns a copy of the map of sub-channels associated with the +// Channel.  func (c *Channel) SubChans() map[int64]string {  	db.mu.RLock()  	defer db.mu.RUnlock()  	return copyMap(c.subChans)  } +// NestedChans returns a copy of the map of nested channels associated with the +// Channel.  func (c *Channel) NestedChans() map[int64]string {  	db.mu.RLock()  	defer db.mu.RUnlock()  	return copyMap(c.nestedChans)  } +// Trace returns a copy of the Channel's trace data.  func (c *Channel) Trace() *ChannelTrace {  	db.mu.RLock()  	defer db.mu.RUnlock()  	return c.trace.copy()  } +// ChannelMetrics holds connectivity state, target and call metrics for the +// channel within channelz.  type ChannelMetrics struct {  	// The current connectivity state of the channel.  	State atomic.Pointer[connectivity.State] @@ -136,12 +147,16 @@ func strFromPointer(s *string) string {  	return *s  } +// String returns a string representation of the ChannelMetrics, including its +// state, target, and call metrics.  func (c *ChannelMetrics) String() string {  	return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v",  		c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(),  	)  } +// NewChannelMetricForTesting creates a new instance of ChannelMetrics with +// specified initial values for testing purposes.  func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics {  	c := &ChannelMetrics{}  	c.State.Store(&state) diff --git a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go index bb531225d..64c791953 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go +++ b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go @@ -234,13 +234,6 @@ func copyMap(m map[int64]string) map[int64]string {  	return n  } -func min(a, b int) int { -	if a < b { -		return a -	} -	return b -} -  func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {  	if maxResults <= 0 {  		maxResults = EntriesPerPage diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go index 03e24e150..078bb8123 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go +++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go @@ -33,7 +33,7 @@ var (  	// outside this package except by tests.  	IDGen IDGenerator -	db *channelMap = newChannelMap() +	db = newChannelMap()  	// EntriesPerPage defines the number of channelz entries to be shown on a web page.  	
EntriesPerPage = 50  	curState       int32 diff --git a/vendor/google.golang.org/grpc/internal/channelz/server.go b/vendor/google.golang.org/grpc/internal/channelz/server.go index cdfc49d6e..b5a824992 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/server.go +++ b/vendor/google.golang.org/grpc/internal/channelz/server.go @@ -59,6 +59,8 @@ func NewServerMetricsForTesting(started, succeeded, failed, timestamp int64) *Se  	return sm  } +// CopyFrom copies the metrics data from the provided ServerMetrics +// instance into the current instance.  func (sm *ServerMetrics) CopyFrom(o *ServerMetrics) {  	sm.CallsStarted.Store(o.CallsStarted.Load())  	sm.CallsSucceeded.Store(o.CallsSucceeded.Load()) diff --git a/vendor/google.golang.org/grpc/internal/channelz/socket.go b/vendor/google.golang.org/grpc/internal/channelz/socket.go index fa64834b2..90103847c 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/socket.go +++ b/vendor/google.golang.org/grpc/internal/channelz/socket.go @@ -70,13 +70,18 @@ type EphemeralSocketMetrics struct {  	RemoteFlowControlWindow int64  } +// SocketType represents the type of socket.  type SocketType string +// SocketType can be one of these.  const (  	SocketTypeNormal = "NormalSocket"  	SocketTypeListen = "ListenSocket"  ) +// Socket represents a socket within channelz which includes socket +// metrics and data related to socket activity and provides methods +// for managing and interacting with sockets.  type Socket struct {  	Entity  	SocketType       SocketType @@ -100,6 +105,8 @@ type Socket struct {  	Security credentials.ChannelzSecurityValue  } +// String returns a string representation of the Socket, including its parent +// entity, socket type, and ID.  func (ls *Socket) String() string {  	return fmt.Sprintf("%s %s #%d", ls.Parent, ls.SocketType, ls.ID)  } diff --git a/vendor/google.golang.org/grpc/internal/channelz/subchannel.go b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go index 3b88e4cba..b20802e6e 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/subchannel.go +++ b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go @@ -47,12 +47,14 @@ func (sc *SubChannel) id() int64 {  	return sc.ID  } +// Sockets returns a copy of the sockets map associated with the SubChannel.  func (sc *SubChannel) Sockets() map[int64]string {  	db.mu.RLock()  	defer db.mu.RUnlock()  	return copyMap(sc.sockets)  } +// Trace returns a copy of the ChannelTrace associated with the SubChannel.  func (sc *SubChannel) Trace() *ChannelTrace {  	db.mu.RLock()  	defer db.mu.RUnlock() diff --git a/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go index d1ed8df6a..0e6e18e18 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go @@ -35,13 +35,13 @@ type SocketOptionData struct {  // Getsockopt defines the function to get socket options requested by channelz.  // It is to be passed to syscall.RawConn.Control().  // Windows OS doesn't support Socket Option -func (s *SocketOptionData) Getsockopt(fd uintptr) { +func (s *SocketOptionData) Getsockopt(uintptr) {  	once.Do(func() {  		logger.Warning("Channelz: socket options are not supported on non-linux environments")  	})  }  // GetSocketOption gets the socket option info of the conn. 
-func GetSocketOption(c any) *SocketOptionData { +func GetSocketOption(any) *SocketOptionData {  	return nil  } diff --git a/vendor/google.golang.org/grpc/internal/channelz/trace.go b/vendor/google.golang.org/grpc/internal/channelz/trace.go index 36b867403..2bffe4777 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/trace.go +++ b/vendor/google.golang.org/grpc/internal/channelz/trace.go @@ -79,13 +79,21 @@ type TraceEvent struct {  	Parent   *TraceEvent  } +// ChannelTrace provides tracing information for a channel. +// It tracks various events and metadata related to the channel's lifecycle +// and operations.  type ChannelTrace struct { -	cm           *channelMap -	clearCalled  bool +	cm          *channelMap +	clearCalled bool +	// The time when the trace was created.  	CreationTime time.Time -	EventNum     int64 -	mu           sync.Mutex -	Events       []*traceEvent +	// A counter for the number of events recorded in the +	// trace. +	EventNum int64 +	mu       sync.Mutex +	// A slice of traceEvent pointers representing the events recorded for +	// this channel. +	Events []*traceEvent  }  func (c *ChannelTrace) copy() *ChannelTrace { @@ -175,6 +183,7 @@ var refChannelTypeToString = map[RefChannelType]string{  	RefNormalSocket: "NormalSocket",  } +// String returns a string representation of the RefChannelType  func (r RefChannelType) String() string {  	return refChannelTypeToString[r]  } diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 00abc7c2b..6e7dd6b77 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -45,11 +45,16 @@ var (  	// option is present for backward compatibility. This option may be overridden  	// by setting the environment variable "GRPC_ENFORCE_ALPN_ENABLED" to "true"  	// or "false". -	EnforceALPNEnabled = boolFromEnv("GRPC_ENFORCE_ALPN_ENABLED", false) +	EnforceALPNEnabled = boolFromEnv("GRPC_ENFORCE_ALPN_ENABLED", true)  	// XDSFallbackSupport is the env variable that controls whether support for  	// xDS fallback is turned on. If this is unset or is false, only the first  	// xDS server in the list of server configs will be used.  	XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", false) +	// NewPickFirstEnabled is set if the new pickfirst leaf policy is to be used +	// instead of the exiting pickfirst implementation. This can be enabled by +	// setting the environment variable "GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST" +	// to "true". +	NewPickFirstEnabled = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false)  )  func boolFromEnv(envVar string, def bool) bool { diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go index 19b9d6392..8e8e86128 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go @@ -53,7 +53,7 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {  	return cs  } -// TrySchedule tries to schedules the provided callback function f to be +// TrySchedule tries to schedule the provided callback function f to be  // executed in the order it was added. This is a best-effort operation. 
If the  // context passed to NewCallbackSerializer was canceled before this method is  // called, the callback will not be scheduled. diff --git a/vendor/google.golang.org/grpc/internal/grpcutil/method.go b/vendor/google.golang.org/grpc/internal/grpcutil/method.go index ec62b4775..683d1955c 100644 --- a/vendor/google.golang.org/grpc/internal/grpcutil/method.go +++ b/vendor/google.golang.org/grpc/internal/grpcutil/method.go @@ -39,7 +39,7 @@ func ParseMethod(methodName string) (service, method string, _ error) {  }  // baseContentType is the base content-type for gRPC.  This is a valid -// content-type on it's own, but can also include a content-subtype such as +// content-type on its own, but can also include a content-subtype such as  // "proto" as a suffix after "+" or ";".  See  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests  // for more details. diff --git a/vendor/google.golang.org/grpc/internal/idle/idle.go b/vendor/google.golang.org/grpc/internal/idle/idle.go index fe49cb74c..2c13ee9da 100644 --- a/vendor/google.golang.org/grpc/internal/idle/idle.go +++ b/vendor/google.golang.org/grpc/internal/idle/idle.go @@ -182,6 +182,7 @@ func (m *Manager) tryEnterIdleMode() bool {  	return true  } +// EnterIdleModeForTesting instructs the channel to enter idle mode.  func (m *Manager) EnterIdleModeForTesting() {  	m.tryEnterIdleMode()  } @@ -225,7 +226,7 @@ func (m *Manager) ExitIdleMode() error {  		//   came in and OnCallBegin() noticed that the calls count is negative.  		// - Channel is in idle mode, and multiple new RPCs come in at the same  		//   time, all of them notice a negative calls count in OnCallBegin and get -		//   here. The first one to get the lock would got the channel to exit idle. +		//   here. The first one to get the lock would get the channel to exit idle.  		// - Channel is not in idle mode, and the user calls Connect which calls  		//   m.ExitIdleMode.  		// @@ -266,6 +267,7 @@ func (m *Manager) isClosed() bool {  	return atomic.LoadInt32(&m.closed) == 1  } +// Close stops the timer associated with the Manager, if it exists.  func (m *Manager) Close() {  	atomic.StoreInt32(&m.closed, 1) diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 65f936a62..3afc18134 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -29,8 +29,6 @@ import (  )  var ( -	// WithHealthCheckFunc is set by dialoptions.go -	WithHealthCheckFunc any // func (HealthChecker) DialOption  	// HealthCheckFunc is used to provide client-side LB channel health checking  	HealthCheckFunc HealthChecker  	// BalancerUnregister is exported by package balancer to unregister a balancer. @@ -149,6 +147,20 @@ var (  	// other features, including the CSDS service.  	NewXDSResolverWithConfigForTesting any // func([]byte) (resolver.Builder, error) +	// NewXDSResolverWithClientForTesting creates a new xDS resolver builder +	// using the provided xDS client instead of creating a new one using the +	// bootstrap configuration specified by the supported environment variables. +	// The resolver.Builder is meant to be used in conjunction with the +	// grpc.WithResolvers DialOption. The resolver.Builder does not take +	// ownership of the provided xDS client and it is the responsibility of the +	// caller to close the client when no longer required. 
+	// +	// Testing Only +	// +	// This function should ONLY be used for testing and may not work with some +	// other features, including the CSDS service. +	NewXDSResolverWithClientForTesting any // func(xdsclient.XDSClient) (resolver.Builder, error) +  	// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster  	// Specifier Plugin for testing purposes, regardless of the XDSRLS environment  	// variable. @@ -183,7 +195,7 @@ var (  	// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra  	// metadata to RPCs. -	GRPCResolverSchemeExtraMetadata string = "xds" +	GRPCResolverSchemeExtraMetadata = "xds"  	// EnterIdleModeForTesting gets the ClientConn to enter IDLE mode.  	EnterIdleModeForTesting any // func(*grpc.ClientConn) @@ -191,6 +203,8 @@ var (  	// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.  	ExitIdleModeForTesting any // func(*grpc.ClientConn) error +	// ChannelzTurnOffForTesting disables the Channelz service for testing +	// purposes.  	ChannelzTurnOffForTesting func()  	// TriggerXDSResourceNotFoundForTesting causes the provided xDS Client to @@ -203,11 +217,7 @@ var (  	// UserSetDefaultScheme is set to true if the user has overridden the  	// default resolver scheme. -	UserSetDefaultScheme bool = false - -	// ShuffleAddressListForTesting pseudo-randomizes the order of addresses.  n -	// is the number of elements.  swap swaps the elements with indexes i and j. -	ShuffleAddressListForTesting any // func(n int, swap func(i, j int)) +	UserSetDefaultScheme = false  	// ConnectedAddress returns the connected address for a SubConnState. The  	// address is only valid if the state is READY. @@ -217,10 +227,9 @@ var (  	SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address)  	// SnapshotMetricRegistryForTesting snapshots the global data of the metric -	// registry. Registers a cleanup function on the provided testing.T that -	// sets the metric registry to its original state. Only called in testing -	// functions. -	SnapshotMetricRegistryForTesting any // func(t *testing.T) +	// registry. Returns a cleanup function that sets the metric registry to its +	// original state. Only called in testing functions. +	SnapshotMetricRegistryForTesting func() func()  	// SetDefaultBufferPoolForTesting updates the default buffer pool, for  	// testing purposes. @@ -236,7 +245,7 @@ var (  //  // The implementation is expected to create a health checking RPC stream by  // calling newStream(), watch for the health status of serviceName, and report -// it's health back by calling setConnectivityState(). +// its health back by calling setConnectivityState().  //  // The health checking protocol is defined at:  // https://github.com/grpc/grpc/blob/master/doc/health-checking.md @@ -258,3 +267,9 @@ const (  // It currently has an experimental suffix which would be removed once  // end-to-end testing of the policy is completed.  const RLSLoadBalancingPolicyName = "rls_experimental" + +// EnforceSubConnEmbedding is used to enforce proper SubConn implementation +// embedding. 
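The testing hooks above are plain exported variables; notably, SnapshotMetricRegistryForTesting now has the concrete type func() func() and hands the cleanup function back to the caller instead of registering it on a *testing.T. A rough sketch of how a test inside the grpc module might use it; the test name and body are made up, and the hook is assumed to have been initialized by the experimental stats package before the test runs:

    package example

    import (
        "testing"

        "google.golang.org/grpc/internal"
    )

    func TestWithMetricRegistrySnapshot(t *testing.T) {
        // Snapshot the global metric registry and restore it when the test ends.
        cleanup := internal.SnapshotMetricRegistryForTesting()
        defer cleanup()

        // ... register experimental metrics and exercise them here ...
    }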
+type EnforceSubConnEmbedding interface { +	enforceSubConnEmbedding() +} diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go index 4552db16b..ba5c5a95d 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go @@ -24,8 +24,9 @@ import (  	"context"  	"encoding/json"  	"fmt" -	"math/rand" +	rand "math/rand/v2"  	"net" +	"net/netip"  	"os"  	"strconv"  	"strings" @@ -122,7 +123,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts  	}  	// IP address. -	if ipAddr, ok := formatIP(host); ok { +	if ipAddr, err := formatIP(host); err == nil {  		addr := []resolver.Address{{Addr: ipAddr + ":" + port}}  		cc.UpdateState(resolver.State{Addresses: addr})  		return deadResolver{}, nil @@ -177,7 +178,7 @@ type dnsResolver struct {  	// finished. Otherwise, data race will be possible. [Race Example] in  	// dns_resolver_test we replace the real lookup functions with mocked ones to  	// facilitate testing. If Close() doesn't wait for watcher() goroutine -	// finishes, race detector sometimes will warns lookup (READ the lookup +	// finishes, race detector sometimes will warn lookup (READ the lookup  	// function pointers) inside watcher() goroutine has data race with  	// replaceNetFunc (WRITE the lookup function pointers).  	wg                   sync.WaitGroup @@ -237,7 +238,9 @@ func (d *dnsResolver) watcher() {  }  func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error) { -	if !EnableSRVLookups { +	// Skip this particular host to avoid timeouts with some versions of +	// systemd-resolved. +	if !EnableSRVLookups || d.host == "metadata.google.internal." {  		return nil, nil  	}  	var newAddrs []resolver.Address @@ -258,9 +261,9 @@ func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error)  			return nil, err  		}  		for _, a := range lbAddrs { -			ip, ok := formatIP(a) -			if !ok { -				return nil, fmt.Errorf("dns: error parsing A record IP address %v", a) +			ip, err := formatIP(a) +			if err != nil { +				return nil, fmt.Errorf("dns: error parsing A record IP address %v: %v", a, err)  			}  			addr := ip + ":" + strconv.Itoa(int(s.Port))  			newAddrs = append(newAddrs, resolver.Address{Addr: addr, ServerName: s.Target}) @@ -320,9 +323,9 @@ func (d *dnsResolver) lookupHost(ctx context.Context) ([]resolver.Address, error  	}  	newAddrs := make([]resolver.Address, 0, len(addrs))  	for _, a := range addrs { -		ip, ok := formatIP(a) -		if !ok { -			return nil, fmt.Errorf("dns: error parsing A record IP address %v", a) +		ip, err := formatIP(a) +		if err != nil { +			return nil, fmt.Errorf("dns: error parsing A record IP address %v: %v", a, err)  		}  		addr := ip + ":" + d.port  		newAddrs = append(newAddrs, resolver.Address{Addr: addr}) @@ -349,19 +352,19 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {  	return &state, nil  } -// formatIP returns ok = false if addr is not a valid textual representation of -// an IP address. If addr is an IPv4 address, return the addr and ok = true. +// formatIP returns an error if addr is not a valid textual representation of +// an IP address. If addr is an IPv4 address, return the addr and error = nil.  // If addr is an IPv6 address, return the addr enclosed in square brackets and -// ok = true. 
-func formatIP(addr string) (addrIP string, ok bool) { -	ip := net.ParseIP(addr) -	if ip == nil { -		return "", false +// error = nil. +func formatIP(addr string) (string, error) { +	ip, err := netip.ParseAddr(addr) +	if err != nil { +		return "", err  	} -	if ip.To4() != nil { -		return addr, true +	if ip.Is4() { +		return addr, nil  	} -	return "[" + addr + "]", true +	return "[" + addr + "]", nil  }  // parseTarget takes the user input target string and default port, returns @@ -377,7 +380,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {  	if target == "" {  		return "", "", internal.ErrMissingAddr  	} -	if ip := net.ParseIP(target); ip != nil { +	if _, err := netip.ParseAddr(target); err == nil {  		// target is an IPv4 or IPv6(without brackets) address  		return target, defaultPort, nil  	} @@ -425,7 +428,7 @@ func chosenByPercentage(a *int) bool {  	if a == nil {  		return true  	} -	return rand.Intn(100)+1 <= *a +	return rand.IntN(100)+1 <= *a  }  func canaryingSC(js string) string { diff --git a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go index afac56572..b901c7bac 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go +++ b/vendor/google.golang.org/grpc/internal/resolver/passthrough/passthrough.go @@ -55,7 +55,7 @@ func (r *passthroughResolver) start() {  	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}})  } -func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {} +func (*passthroughResolver) ResolveNow(resolver.ResolveNowOptions) {}  func (*passthroughResolver) Close() {} diff --git a/vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go b/vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go index be110d41f..79044657b 100644 --- a/vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go +++ b/vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go @@ -54,6 +54,8 @@ func verifyLabels(desc *estats.MetricDescriptor, labelsRecv ...string) {  	}  } +// RecordInt64Count records the measurement alongside labels on the int +// count associated with the provided handle.  func (l *MetricsRecorderList) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) {  	verifyLabels(handle.Descriptor(), labels...) @@ -62,6 +64,8 @@ func (l *MetricsRecorderList) RecordInt64Count(handle *estats.Int64CountHandle,  	}  } +// RecordFloat64Count records the measurement alongside labels on the float +// count associated with the provided handle.  func (l *MetricsRecorderList) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {  	verifyLabels(handle.Descriptor(), labels...) @@ -70,6 +74,8 @@ func (l *MetricsRecorderList) RecordFloat64Count(handle *estats.Float64CountHand  	}  } +// RecordInt64Histo records the measurement alongside labels on the int +// histo associated with the provided handle.  func (l *MetricsRecorderList) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) {  	verifyLabels(handle.Descriptor(), labels...) @@ -78,6 +84,8 @@ func (l *MetricsRecorderList) RecordInt64Histo(handle *estats.Int64HistoHandle,  	}  } +// RecordFloat64Histo records the measurement alongside labels on the float +// histo associated with the provided handle.  
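The formatIP rewrite above moves from net.ParseIP to net/netip and reports failures as errors. A standalone illustration of the resulting behaviour, mirroring the vendored logic rather than being part of the resolver: IPv4 literals pass through unchanged, IPv6 literals are wrapped in brackets so a ":port" suffix can be appended, and unparsable input yields an error.

    package main

    import (
        "fmt"
        "net/netip"
    )

    // formatIP mirrors the logic shown in the diff above.
    func formatIP(addr string) (string, error) {
        ip, err := netip.ParseAddr(addr)
        if err != nil {
            return "", err
        }
        if ip.Is4() {
            return addr, nil
        }
        return "[" + addr + "]", nil
    }

    func main() {
        for _, a := range []string{"10.0.0.1", "2001:db8::1", "not-an-ip"} {
            out, err := formatIP(a)
            fmt.Printf("%q -> %q (err: %v)\n", a, out, err)
        }
    }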
func (l *MetricsRecorderList) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) {  	verifyLabels(handle.Descriptor(), labels...) @@ -86,6 +94,8 @@ func (l *MetricsRecorderList) RecordFloat64Histo(handle *estats.Float64HistoHand  	}  } +// RecordInt64Gauge records the measurement alongside labels on the int +// gauge associated with the provided handle.  func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) {  	verifyLabels(handle.Descriptor(), labels...) diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go index c7dbc8205..1186f1e9a 100644 --- a/vendor/google.golang.org/grpc/internal/status/status.go +++ b/vendor/google.golang.org/grpc/internal/status/status.go @@ -138,17 +138,19 @@ func (s *Status) WithDetails(details ...protoadapt.MessageV1) (*Status, error) {  	// s.Code() != OK implies that s.Proto() != nil.  	p := s.Proto()  	for _, detail := range details { -		any, err := anypb.New(protoadapt.MessageV2Of(detail)) +		m, err := anypb.New(protoadapt.MessageV2Of(detail))  		if err != nil {  			return nil, err  		} -		p.Details = append(p.Details, any) +		p.Details = append(p.Details, m)  	}  	return &Status{s: p}, nil  }  // Details returns a slice of details messages attached to the status.  // If a detail cannot be decoded, the error is returned in place of the detail. +// If the detail can be decoded, the proto message returned is of the same +// type that was given to WithDetails().  func (s *Status) Details() []any {  	if s == nil || s.s == nil {  		return nil @@ -160,7 +162,38 @@ func (s *Status) Details() []any {  			details = append(details, err)  			continue  		} -		details = append(details, detail) +		// The call to MessageV1Of is required to unwrap the proto message if +		// it implemented only the MessageV1 API. The proto message would have +		// been wrapped in a V2 wrapper in Status.WithDetails. V2 messages are +		// added to a global registry used by any.UnmarshalNew(). +		// MessageV1Of has the following behaviour: +		// 1. If the given message is a wrapped MessageV1, it returns the +		//   unwrapped value. +		// 2. If the given message already implements MessageV1, it returns it +		//   as is. +		// 3. Else, it wraps the MessageV2 in a MessageV1 wrapper. +		// +		// Since the Status.WithDetails() API only accepts MessageV1, calling +		// MessageV1Of ensures we return the same type that was given to +		// WithDetails: +		// * If the given type implemented only MessageV1, the unwrapping from +		//   point 1 above will restore the type. +		// * If the given type implemented both MessageV1 and MessageV2, point 2 +		//   above will ensure no wrapping is performed. +		// * If the given type implemented only MessageV2 and was wrapped using +		//   MessageV1Of before passing to WithDetails(), it would be unwrapped +		//   in WithDetails by calling MessageV2Of(). Point 3 above will ensure +		//   that the type is wrapped in a MessageV1 wrapper again before +		//   returning. Note that protoc-gen-go doesn't generate code which +		//   implements ONLY MessageV2 at the time of writing. +		// +		// NOTE: Status details can also be added using the FromProto method. +		// This could theoretically allow passing a Detail message that only +		// implements the V2 API. In such a case the message will be wrapped in +		// a MessageV1 wrapper when fetched using Details().
+		// Since protoc-gen-go generates only code that implements both V1 and +		// V2 APIs for backward compatibility, this is not a concern. +		details = append(details, protoadapt.MessageV1Of(detail))  	}  	return details  } diff --git a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go index 999f52cd7..54c24c2ff 100644 --- a/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go +++ b/vendor/google.golang.org/grpc/internal/syscall/syscall_nonlinux.go @@ -58,20 +58,20 @@ func GetRusage() *Rusage {  // CPUTimeDiff returns the differences of user CPU time and system CPU time used  // between two Rusage structs. It a no-op function for non-linux environments. -func CPUTimeDiff(first *Rusage, latest *Rusage) (float64, float64) { +func CPUTimeDiff(*Rusage, *Rusage) (float64, float64) {  	log()  	return 0, 0  }  // SetTCPUserTimeout is a no-op function under non-linux environments. -func SetTCPUserTimeout(conn net.Conn, timeout time.Duration) error { +func SetTCPUserTimeout(net.Conn, time.Duration) error {  	log()  	return nil  }  // GetTCPUserTimeout is a no-op function under non-linux environments.  // A negative return value indicates the operation is not supported -func GetTCPUserTimeout(conn net.Conn) (int, error) { +func GetTCPUserTimeout(net.Conn) (int, error) {  	log()  	return -1, nil  } diff --git a/vendor/google.golang.org/grpc/internal/transport/client_stream.go b/vendor/google.golang.org/grpc/internal/transport/client_stream.go new file mode 100644 index 000000000..8ed347c54 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/client_stream.go @@ -0,0 +1,144 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package transport + +import ( +	"sync/atomic" + +	"golang.org/x/net/http2" +	"google.golang.org/grpc/mem" +	"google.golang.org/grpc/metadata" +	"google.golang.org/grpc/status" +) + +// ClientStream implements streaming functionality for a gRPC client. +type ClientStream struct { +	*Stream // Embed for common stream functionality. + +	ct       *http2Client +	done     chan struct{} // closed at the end of stream to unblock writers. +	doneFunc func()        // invoked at the end of stream. + +	headerChan       chan struct{} // closed to indicate the end of header metadata. +	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times. +	// headerValid indicates whether a valid header was received.  Only +	// meaningful after headerChan is closed (always call waitOnHeader() before +	// reading its value). +	headerValid bool +	header      metadata.MD // the received header metadata +	noHeaders   bool        // set if the client never received headers (set only after the stream is done). 
+ +	bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream +	unprocessed   atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream + +	status *status.Status // the status error received from the server +} + +// Read reads an n byte message from the input stream. +func (s *ClientStream) Read(n int) (mem.BufferSlice, error) { +	b, err := s.Stream.read(n) +	if err == nil { +		s.ct.incrMsgRecv() +	} +	return b, err +} + +// Close closes the stream and propagates err to any readers. +func (s *ClientStream) Close(err error) { +	var ( +		rst     bool +		rstCode http2.ErrCode +	) +	if err != nil { +		rst = true +		rstCode = http2.ErrCodeCancel +	} +	s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false) +} + +// Write writes the hdr and data bytes to the output stream. +func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error { +	return s.ct.write(s, hdr, data, opts) +} + +// BytesReceived indicates whether any bytes have been received on this stream. +func (s *ClientStream) BytesReceived() bool { +	return s.bytesReceived.Load() +} + +// Unprocessed indicates whether the server did not process this stream -- +// i.e. it sent a refused stream or GOAWAY including this stream ID. +func (s *ClientStream) Unprocessed() bool { +	return s.unprocessed.Load() +} + +func (s *ClientStream) waitOnHeader() { +	select { +	case <-s.ctx.Done(): +		// Close the stream to prevent headers/trailers from changing after +		// this function returns. +		s.Close(ContextErr(s.ctx.Err())) +		// headerChan could possibly not be closed yet if closeStream raced +		// with operateHeaders; wait until it is closed explicitly here. +		<-s.headerChan +	case <-s.headerChan: +	} +} + +// RecvCompress returns the compression algorithm applied to the inbound +// message. It is empty string if there is no compression applied. +func (s *ClientStream) RecvCompress() string { +	s.waitOnHeader() +	return s.recvCompress +} + +// Done returns a channel which is closed when it receives the final status +// from the server. +func (s *ClientStream) Done() <-chan struct{} { +	return s.done +} + +// Header returns the header metadata of the stream. Acquires the key-value +// pairs of header metadata once it is available. It blocks until i) the +// metadata is ready or ii) there is no header metadata or iii) the stream is +// canceled/expired. +func (s *ClientStream) Header() (metadata.MD, error) { +	s.waitOnHeader() + +	if !s.headerValid || s.noHeaders { +		return nil, s.status.Err() +	} + +	return s.header.Copy(), nil +} + +// TrailersOnly blocks until a header or trailers-only frame is received and +// then returns true if the stream was trailers-only.  If the stream ends +// before headers are received, returns true. +func (s *ClientStream) TrailersOnly() bool { +	s.waitOnHeader() +	return s.noHeaders +} + +// Status returns the status received from the server. +// Status can be read safely only after the stream has ended, +// that is, after Done() is closed.
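Taken together, ClientStream exposes a small lifecycle surface: Header blocks until header metadata (or a trailers-only response) arrives, Done is closed once the final status is received, and Status is safe to read only afterwards. A hedged sketch of a caller using it; the helper name is invented, and the import resolves only inside the grpc module since transport is an internal package:

    package example

    import (
        "context"

        "google.golang.org/grpc/internal/transport"
    )

    // waitForOutcome waits for header metadata, then blocks until the stream is
    // done and returns its final status. cs is assumed to come from NewStream.
    func waitForOutcome(ctx context.Context, cs *transport.ClientStream) error {
        if _, err := cs.Header(); err != nil {
            return err // the stream failed before delivering header metadata
        }
        select {
        case <-cs.Done():
            // Status may only be read after Done() is closed.
            return cs.Status().Err()
        case <-ctx.Done():
            cs.Close(ctx.Err())
            return ctx.Err()
        }
    }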
+func (s *ClientStream) Status() *status.Status { +	return s.status +} diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go index ea0633bbd..ef72fbb3a 100644 --- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go @@ -1033,10 +1033,3 @@ func (l *loopyWriter) processData() (bool, error) {  	}  	return false, nil  } - -func min(a, b int) int { -	if a < b { -		return a -	} -	return b -} diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go index 97198c515..dfc0f224e 100644 --- a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go +++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go @@ -92,14 +92,11 @@ func (f *trInFlow) newLimit(n uint32) uint32 {  func (f *trInFlow) onData(n uint32) uint32 {  	f.unacked += n -	if f.unacked >= f.limit/4 { -		w := f.unacked -		f.unacked = 0 +	if f.unacked < f.limit/4 {  		f.updateEffectiveWindowSize() -		return w +		return 0  	} -	f.updateEffectiveWindowSize() -	return 0 +	return f.reset()  }  func (f *trInFlow) reset() uint32 { diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index e1cd86b2f..d9305a65d 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -225,7 +225,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {  	}  } -func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { +func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status) error {  	ht.writeStatusMu.Lock()  	defer ht.writeStatusMu.Unlock() @@ -289,14 +289,14 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro  // writePendingHeaders sets common and custom headers on the first  // write call (Write, WriteHeader, or WriteStatus) -func (ht *serverHandlerTransport) writePendingHeaders(s *Stream) { +func (ht *serverHandlerTransport) writePendingHeaders(s *ServerStream) {  	ht.writeCommonHeaders(s)  	ht.writeCustomHeaders(s)  }  // writeCommonHeaders sets common headers on the first write  // call (Write, WriteHeader, or WriteStatus). 
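The trInFlow.onData change above keeps the existing policy of batching window updates until at least a quarter of the limit is unacknowledged, while folding the duplicated branch into reset(). A simplified standalone sketch of that accounting rule, with the effective-window bookkeeping omitted and an invented type name:

    package example

    // recvWindow is a simplified stand-in for the transport's connection-level
    // flow-control tracker.
    type recvWindow struct {
        limit   uint32 // flow-control limit for the connection
        unacked uint32 // bytes received since the last window update
    }

    // onData records n received bytes and reports the window update to send:
    // zero until a quarter of the limit has accumulated, then the full batch.
    func (w *recvWindow) onData(n uint32) uint32 {
        w.unacked += n
        if w.unacked < w.limit/4 {
            return 0 // keep batching to avoid chatty window updates
        }
        update := w.unacked
        w.unacked = 0
        return update
    }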
-func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { +func (ht *serverHandlerTransport) writeCommonHeaders(s *ServerStream) {  	h := ht.rw.Header()  	h["Date"] = nil // suppress Date to make tests happy; TODO: restore  	h.Set("Content-Type", ht.contentType) @@ -317,7 +317,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {  // writeCustomHeaders sets custom headers set on the stream via SetHeader  // on the first write call (Write, WriteHeader, or WriteStatus) -func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) { +func (ht *serverHandlerTransport) writeCustomHeaders(s *ServerStream) {  	h := ht.rw.Header()  	s.hdrMu.Lock() @@ -333,7 +333,7 @@ func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {  	s.hdrMu.Unlock()  } -func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error { +func (ht *serverHandlerTransport) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {  	// Always take a reference because otherwise there is no guarantee the data will  	// be available after this function returns. This is what callers to Write  	// expect. @@ -357,7 +357,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSl  	return nil  } -func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { +func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) error {  	if err := s.SetHeader(md); err != nil {  		return err  	} @@ -385,7 +385,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {  	return err  } -func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) { +func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) {  	// With this transport type there will be exactly 1 stream: this HTTP request.  	var cancel context.CancelFunc  	if ht.timeoutSet { @@ -408,16 +408,18 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream  	ctx = metadata.NewIncomingContext(ctx, ht.headerMD)  	req := ht.req -	s := &Stream{ -		id:               0, // irrelevant -		ctx:              ctx, -		requestRead:      func(int) {}, +	s := &ServerStream{ +		Stream: &Stream{ +			id:             0, // irrelevant +			ctx:            ctx, +			requestRead:    func(int) {}, +			buf:            newRecvBuffer(), +			method:         req.URL.Path, +			recvCompress:   req.Header.Get("grpc-encoding"), +			contentSubtype: ht.contentSubtype, +		},  		cancel:           cancel, -		buf:              newRecvBuffer(),  		st:               ht, -		method:           req.URL.Path, -		recvCompress:     req.Header.Get("grpc-encoding"), -		contentSubtype:   ht.contentSubtype,  		headerWireLength: 0, // won't have access to header wire length until golang/go#18997.  	
}  	s.trReader = &transportReader{ @@ -471,11 +473,9 @@ func (ht *serverHandlerTransport) runStream() {  	}  } -func (ht *serverHandlerTransport) IncrMsgSent() {} +func (ht *serverHandlerTransport) incrMsgRecv() {} -func (ht *serverHandlerTransport) IncrMsgRecv() {} - -func (ht *serverHandlerTransport) Drain(debugData string) { +func (ht *serverHandlerTransport) Drain(string) {  	panic("Drain() is not implemented")  } diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index f46194fdc..f323ab7f4 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -86,9 +86,9 @@ type http2Client struct {  	writerDone chan struct{} // sync point to enable testing.  	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)  	// that the server sent GoAway on this transport. -	goAway chan struct{} - -	framer *framer +	goAway        chan struct{} +	keepaliveDone chan struct{} // Closed when the keepalive goroutine exits. +	framer        *framer  	// controlBuf delivers all the control related tasks (e.g., window  	// updates, reset streams, and various settings) to the controller.  	// Do not access controlBuf with mu held. @@ -123,7 +123,7 @@ type http2Client struct {  	mu            sync.Mutex // guard the following variables  	nextID        uint32  	state         transportState -	activeStreams map[uint32]*Stream +	activeStreams map[uint32]*ClientStream  	// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.  	prevGoAwayID uint32  	// goAwayReason records the http2.ErrCode and debug data received with the @@ -199,10 +199,10 @@ func isTemporary(err error) bool {  	return true  } -// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 +// NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2  // and starts to receive messages on it. Non-nil error returns if construction  // fails. -func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) { +func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) {  	scheme := "http"  	ctx, cancel := context.WithCancel(ctx)  	defer func() { @@ -335,10 +335,11 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  		readerDone:            make(chan struct{}),  		writerDone:            make(chan struct{}),  		goAway:                make(chan struct{}), +		keepaliveDone:         make(chan struct{}),  		framer:                newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),  		fc:                    &trInFlow{limit: uint32(icwz)},  		scheme:                scheme, -		activeStreams:         make(map[uint32]*Stream), +		activeStreams:         make(map[uint32]*ClientStream),  		isSecure:              isSecure,  		perRPCCreds:           perRPCCreds,  		kp:                    kp, @@ -479,17 +480,19 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  	return t, nil  } -func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { +func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {  	// TODO(zhaoq): Handle uint32 overflow of Stream.id. 
-	s := &Stream{ -		ct:             t, -		done:           make(chan struct{}), -		method:         callHdr.Method, -		sendCompress:   callHdr.SendCompress, -		buf:            newRecvBuffer(), -		headerChan:     make(chan struct{}), -		contentSubtype: callHdr.ContentSubtype, -		doneFunc:       callHdr.DoneFunc, +	s := &ClientStream{ +		Stream: &Stream{ +			method:         callHdr.Method, +			sendCompress:   callHdr.SendCompress, +			buf:            newRecvBuffer(), +			contentSubtype: callHdr.ContentSubtype, +		}, +		ct:         t, +		done:       make(chan struct{}), +		headerChan: make(chan struct{}), +		doneFunc:   callHdr.DoneFunc,  	}  	s.wq = newWriteQuota(defaultWriteQuota, s.done)  	s.requestRead = func(n int) { @@ -505,7 +508,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {  			ctxDone: s.ctx.Done(),  			recv:    s.buf,  			closeStream: func(err error) { -				t.CloseStream(s, err) +				s.Close(err)  			},  		},  		windowHandler: func(n int) { @@ -527,8 +530,9 @@ func (t *http2Client) getPeer() *peer.Peer {  // to be the last frame loopy writes to the transport.  func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {  	t.mu.Lock() -	defer t.mu.Unlock() -	if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil { +	maxStreamID := t.nextID - 2 +	t.mu.Unlock() +	if err := t.framer.fr.WriteGoAway(maxStreamID, http2.ErrCodeNo, g.debugData); err != nil {  		return false, err  	}  	return false, g.closeConn @@ -595,12 +599,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)  	for k, v := range callAuthData {  		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})  	} -	if b := stats.OutgoingTags(ctx); b != nil { -		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) -	} -	if b := stats.OutgoingTrace(ctx); b != nil { -		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) -	}  	if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {  		var k string @@ -736,7 +734,7 @@ func (e NewStreamError) Error() string {  // NewStream creates a stream and registers it into the transport as "active"  // streams.  All non-nil errors returned will be *NewStreamError. -func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) { +func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {  	ctx = peer.NewContext(ctx, t.getPeer())  	// ServerName field of the resolver returned address takes precedence over @@ -761,7 +759,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  			return  		}  		// The stream was unprocessed by the server. -		atomic.StoreUint32(&s.unprocessed, 1) +		s.unprocessed.Store(true)  		s.write(recvMsg{err: err})  		close(s.done)  		// If headerChan isn't closed, then close it. @@ -772,7 +770,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  	hdr := &headerFrame{  		hf:        headerFields,  		endStream: false, -		initStream: func(id uint32) error { +		initStream: func(uint32) error {  			t.mu.Lock()  			// TODO: handle transport closure in loopy instead and remove this  			// initStream is never called when transport is draining. 
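The newStream changes above are part of splitting the old monolithic Stream into a shared core that ClientStream and ServerStream embed. A minimal sketch of the embedding pattern with invented names, not the actual transport types: shared state lives on the embedded core and its methods are promoted, while client-only state stays on the outer type.

    package example

    // streamCore plays the role of the shared Stream.
    type streamCore struct {
        method string
    }

    func (s *streamCore) Method() string { return s.method }

    // clientSide embeds the core by pointer, so Method is promoted, while
    // client-only state (the done channel here) lives on the outer type.
    type clientSide struct {
        *streamCore
        done chan struct{}
    }

    func newClientSide(method string) *clientSide {
        return &clientSide{
            streamCore: &streamCore{method: method},
            done:       make(chan struct{}),
        }
    }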
@@ -906,21 +904,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  	return s, nil  } -// CloseStream clears the footprint of a stream when the stream is not needed any more. -// This must not be executed in reader's goroutine. -func (t *http2Client) CloseStream(s *Stream, err error) { -	var ( -		rst     bool -		rstCode http2.ErrCode -	) -	if err != nil { -		rst = true -		rstCode = http2.ErrCodeCancel -	} -	t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false) -} - -func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { +func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {  	// Set stream status to done.  	if s.swapState(streamDone) == streamDone {  		// If it was already done, return.  If multiple closeStream calls @@ -1008,6 +992,9 @@ func (t *http2Client) Close(err error) {  		// should unblock it so that the goroutine eventually exits.  		t.kpDormancyCond.Signal()  	} +	// Append info about previous goaways if there were any, since this may be important +	// for understanding the root cause for this connection to be closed. +	goAwayDebugMessage := t.goAwayDebugMessage  	t.mu.Unlock()  	// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the @@ -1025,11 +1012,13 @@ func (t *http2Client) Close(err error) {  	}  	t.cancel()  	t.conn.Close() +	// Waits for the reader and keepalive goroutines to exit before returning to +	// ensure all resources are cleaned up before Close can return. +	<-t.readerDone +	if t.keepaliveEnabled { +		<-t.keepaliveDone +	}  	channelz.RemoveEntry(t.channelz.ID) -	// Append info about previous goaways if there were any, since this may be important -	// for understanding the root cause for this connection to be closed. -	_, goAwayDebugMessage := t.GetGoAwayReason() -  	var st *status.Status  	if len(goAwayDebugMessage) > 0 {  		st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage) @@ -1078,7 +1067,7 @@ func (t *http2Client) GracefulClose() {  // Write formats the data into HTTP2 data frame(s) and sends it out. The caller  // should proceed only if Write returns nil. -func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error { +func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {  	reader := data.Reader()  	if opts.Last { @@ -1107,10 +1096,11 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *O  		_ = reader.Close()  		return err  	} +	t.incrMsgSent()  	return nil  } -func (t *http2Client) getStream(f http2.Frame) *Stream { +func (t *http2Client) getStream(f http2.Frame) *ClientStream {  	t.mu.Lock()  	s := t.activeStreams[f.Header().StreamID]  	t.mu.Unlock() @@ -1120,7 +1110,7 @@ func (t *http2Client) getStream(f http2.Frame) *Stream {  // adjustWindow sends out extra window update over the initial window size  // of stream if the application is requesting data larger in size than  // the window. 
-func (t *http2Client) adjustWindow(s *Stream, n uint32) { +func (t *http2Client) adjustWindow(s *ClientStream, n uint32) {  	if w := s.fc.maybeAdjust(n); w > 0 {  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})  	} @@ -1129,7 +1119,7 @@ func (t *http2Client) adjustWindow(s *Stream, n uint32) {  // updateWindow adjusts the inbound quota for the stream.  // Window updates will be sent out when the cumulative quota  // exceeds the corresponding threshold. -func (t *http2Client) updateWindow(s *Stream, n uint32) { +func (t *http2Client) updateWindow(s *ClientStream, n uint32) {  	if w := s.fc.onRead(n); w > 0 {  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})  	} @@ -1235,7 +1225,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {  	}  	if f.ErrCode == http2.ErrCodeRefusedStream {  		// The stream was unprocessed by the server. -		atomic.StoreUint32(&s.unprocessed, 1) +		s.unprocessed.Store(true)  	}  	statusCode, ok := http2ErrConvTab[f.ErrCode]  	if !ok { @@ -1316,11 +1306,11 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {  	t.controlBuf.put(pingAck)  } -func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { +func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {  	t.mu.Lock()  	if t.state == closing {  		t.mu.Unlock() -		return +		return nil  	}  	if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {  		// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug @@ -1332,8 +1322,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  	id := f.LastStreamID  	if id > 0 && id%2 == 0 {  		t.mu.Unlock() -		t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id)) -		return +		return connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id)  	}  	// A client can receive multiple GoAways from the server (see  	// https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first @@ -1350,8 +1339,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  		// If there are multiple GoAways the first one should always have an ID greater than the following ones.  		if id > t.prevGoAwayID {  			t.mu.Unlock() -			t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)) -			return +			return connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)  		}  	default:  		t.setGoAwayReason(f) @@ -1375,15 +1363,14 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  	t.prevGoAwayID = id  	if len(t.activeStreams) == 0 {  		t.mu.Unlock() -		t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams")) -		return +		return connectionErrorf(true, nil, "received goaway and there are no active streams")  	} -	streamsToClose := make([]*Stream, 0) +	streamsToClose := make([]*ClientStream, 0)  	for streamID, stream := range t.activeStreams {  		if streamID > id && streamID <= upperLimit {  			// The stream was unprocessed by the server. 
-			atomic.StoreUint32(&stream.unprocessed, 1) +			stream.unprocessed.Store(true)  			streamsToClose = append(streamsToClose, stream)  		}  	} @@ -1393,6 +1380,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {  	for _, stream := range streamsToClose {  		t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)  	} +	return nil  }  // setGoAwayReason sets the value of t.goAwayReason based @@ -1434,7 +1422,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {  		return  	}  	endStream := frame.StreamEnded() -	atomic.StoreUint32(&s.bytesReceived, 1) +	s.bytesReceived.Store(true)  	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0  	if !initialHeader && !endStream { @@ -1628,7 +1616,13 @@ func (t *http2Client) readServerPreface() error {  // network connection.  If the server preface is not read successfully, an  // error is pushed to errCh; otherwise errCh is closed with no error.  func (t *http2Client) reader(errCh chan<- error) { -	defer close(t.readerDone) +	var errClose error +	defer func() { +		close(t.readerDone) +		if errClose != nil { +			t.Close(errClose) +		} +	}()  	if err := t.readServerPreface(); err != nil {  		errCh <- err @@ -1667,11 +1661,10 @@ func (t *http2Client) reader(errCh chan<- error) {  					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)  				}  				continue -			} else { -				// Transport error. -				t.Close(connectionErrorf(true, err, "error reading from server: %v", err)) -				return  			} +			// Transport error. +			errClose = connectionErrorf(true, err, "error reading from server: %v", err) +			return  		}  		switch frame := frame.(type) {  		case *http2.MetaHeadersFrame: @@ -1685,7 +1678,7 @@ func (t *http2Client) reader(errCh chan<- error) {  		case *http2.PingFrame:  			t.handlePing(frame)  		case *http2.GoAwayFrame: -			t.handleGoAway(frame) +			errClose = t.handleGoAway(frame)  		case *http2.WindowUpdateFrame:  			t.handleWindowUpdate(frame)  		default: @@ -1696,15 +1689,15 @@ func (t *http2Client) reader(errCh chan<- error) {  	}  } -func minTime(a, b time.Duration) time.Duration { -	if a < b { -		return a -	} -	return b -} -  // keepalive running in a separate goroutine makes sure the connection is alive by sending pings.  func (t *http2Client) keepalive() { +	var err error +	defer func() { +		close(t.keepaliveDone) +		if err != nil { +			t.Close(err) +		} +	}()  	p := &ping{data: [8]byte{}}  	// True iff a ping has been sent, and no data has been received since then.  	outstandingPing := false @@ -1728,7 +1721,7 @@ func (t *http2Client) keepalive() {  				continue  			}  			if outstandingPing && timeoutLeft <= 0 { -				t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")) +				err = connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")  				return  			}  			t.mu.Lock() @@ -1770,7 +1763,7 @@ func (t *http2Client) keepalive() {  			// timeoutLeft. This will ensure that we wait only for kp.Time  			// before sending out the next ping (for cases where the ping is  			// acked). 
-			sleepDuration := minTime(t.kp.Time, timeoutLeft) +			sleepDuration := min(t.kp.Time, timeoutLeft)  			timeoutLeft -= sleepDuration  			timer.Reset(sleepDuration)  		case <-t.ctx.Done(): @@ -1799,14 +1792,18 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {  func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr } -func (t *http2Client) IncrMsgSent() { -	t.channelz.SocketMetrics.MessagesSent.Add(1) -	t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano()) +func (t *http2Client) incrMsgSent() { +	if channelz.IsOn() { +		t.channelz.SocketMetrics.MessagesSent.Add(1) +		t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano()) +	}  } -func (t *http2Client) IncrMsgRecv() { -	t.channelz.SocketMetrics.MessagesReceived.Add(1) -	t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano()) +func (t *http2Client) incrMsgRecv() { +	if channelz.IsOn() { +		t.channelz.SocketMetrics.MessagesReceived.Add(1) +		t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano()) +	}  }  func (t *http2Client) getOutFlowWindow() int64 { diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index f5163f770..0055fddd7 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -25,7 +25,7 @@ import (  	"fmt"  	"io"  	"math" -	"math/rand" +	rand "math/rand/v2"  	"net"  	"net/http"  	"strconv" @@ -111,7 +111,7 @@ type http2Server struct {  	// already initialized since draining is already underway.  	drainEvent    *grpcsync.Event  	state         transportState -	activeStreams map[uint32]*Stream +	activeStreams map[uint32]*ServerStream  	// idle is the time instant when the connection went idle.  	// This is either the beginning of the connection or when the number of  	// RPCs go down to 0. @@ -256,7 +256,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  		inTapHandle:       config.InTapHandle,  		fc:                &trInFlow{limit: uint32(icwz)},  		state:             reachable, -		activeStreams:     make(map[uint32]*Stream), +		activeStreams:     make(map[uint32]*ServerStream),  		stats:             config.StatsHandlers,  		kp:                kp,  		idle:              time.Now(), @@ -359,7 +359,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  // operateHeaders takes action on the decoded headers. Returns an error if fatal  // error encountered and transport needs to close, otherwise returns nil. 
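Both keepalive loops now call Go's generic min builtin (Go 1.21+) where they previously used the local minTime helper; it applies directly to time.Duration values since they are ordered. A quick illustration, with made-up values:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        kpTime := 2 * time.Hour         // keepalive interval
        timeoutLeft := 20 * time.Second // remaining keepalive timeout
        // min replaces the removed minTime helper.
        sleepDuration := min(kpTime, timeoutLeft)
        fmt.Println(sleepDuration) // 20s
    }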
-func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error { +func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {  	// Acquire max stream ID lock for entire duration  	t.maxStreamMu.Lock()  	defer t.maxStreamMu.Unlock() @@ -385,11 +385,13 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade  	t.maxStreamID = streamID  	buf := newRecvBuffer() -	s := &Stream{ -		id:               streamID, +	s := &ServerStream{ +		Stream: &Stream{ +			id:  streamID, +			buf: buf, +			fc:  &inFlow{limit: uint32(t.initialWindowSize)}, +		},  		st:               t, -		buf:              buf, -		fc:               &inFlow{limit: uint32(t.initialWindowSize)},  		headerWireLength: int(frame.Header().Length),  	}  	var ( @@ -537,12 +539,6 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade  	// Attach the received metadata to the context.  	if len(mdata) > 0 {  		s.ctx = metadata.NewIncomingContext(s.ctx, mdata) -		if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 { -			s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1])) -		} -		if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 { -			s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1])) -		}  	}  	t.mu.Lock()  	if t.state != reachable { @@ -634,7 +630,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade  // HandleStreams receives incoming streams using the given handler. This is  // typically run in a separate goroutine.  // traceCtx attaches trace to ctx and returns the new context. -func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) { +func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {  	defer func() {  		close(t.readerDone)  		<-t.loopyWriterDone @@ -698,7 +694,7 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {  	}  } -func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { +func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {  	t.mu.Lock()  	defer t.mu.Unlock()  	if t.activeStreams == nil { @@ -716,7 +712,7 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {  // adjustWindow sends out extra window update over the initial window size  // of stream if the application is requesting data larger in size than  // the window. -func (t *http2Server) adjustWindow(s *Stream, n uint32) { +func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {  	if w := s.fc.maybeAdjust(n); w > 0 {  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})  	} @@ -726,7 +722,7 @@ func (t *http2Server) adjustWindow(s *Stream, n uint32) {  // updateWindow adjusts the inbound quota for the stream and the transport.  // Window updates will deliver to the controller for sending when  // the cumulative quota exceeds the corresponding threshold. 
-func (t *http2Server) updateWindow(s *Stream, n uint32) { +func (t *http2Server) updateWindow(s *ServerStream, n uint32) {  	if w := s.fc.onRead(n); w > 0 {  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,  			increment: w, @@ -963,7 +959,7 @@ func (t *http2Server) checkForHeaderListSize(it any) bool {  	return true  } -func (t *http2Server) streamContextErr(s *Stream) error { +func (t *http2Server) streamContextErr(s *ServerStream) error {  	select {  	case <-t.done:  		return ErrConnClosing @@ -973,7 +969,7 @@ func (t *http2Server) streamContextErr(s *Stream) error {  }  // WriteHeader sends the header metadata md back to the client. -func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { +func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {  	s.hdrMu.Lock()  	defer s.hdrMu.Unlock()  	if s.getState() == streamDone { @@ -1006,7 +1002,7 @@ func (t *http2Server) setResetPingStrikes() {  	atomic.StoreUint32(&t.resetPingStrikes, 1)  } -func (t *http2Server) writeHeaderLocked(s *Stream) error { +func (t *http2Server) writeHeaderLocked(s *ServerStream) error {  	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields  	// first and create a slice of that exact size.  	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. @@ -1046,7 +1042,7 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {  // There is no further I/O operations being able to perform on this stream.  // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early  // OK is adopted. -func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { +func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {  	s.hdrMu.Lock()  	defer s.hdrMu.Unlock() @@ -1117,11 +1113,11 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {  // Write converts the data into HTTP2 data frame and sends it out. Non-nil error  // is returns if it fails (e.g., framing error, transport error). -func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error { +func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {  	reader := data.Reader()  	if !s.isHeaderSent() { // Headers haven't been written yet. -		if err := t.WriteHeader(s, nil); err != nil { +		if err := t.writeHeader(s, nil); err != nil {  			_ = reader.Close()  			return err  		} @@ -1147,6 +1143,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *O  		_ = reader.Close()  		return err  	} +	t.incrMsgSent()  	return nil  } @@ -1238,7 +1235,7 @@ func (t *http2Server) keepalive() {  			// timeoutLeft. This will ensure that we wait only for kp.Time  			// before sending out the next ping (for cases where the ping is  			// acked). -			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft) +			sleepDuration := min(t.kp.Time, kpTimeoutLeft)  			kpTimeoutLeft -= sleepDuration  			kpTimer.Reset(sleepDuration)  		case <-t.done: @@ -1276,7 +1273,7 @@ func (t *http2Server) Close(err error) {  }  // deleteStream deletes the stream s from transport's active streams. 
-func (t *http2Server) deleteStream(s *Stream, eosReceived bool) { +func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {  	t.mu.Lock()  	if _, ok := t.activeStreams[s.id]; ok { @@ -1297,7 +1294,7 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {  }  // finishStream closes the stream and puts the trailing headerFrame into controlbuf. -func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { +func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {  	// In case stream sending and receiving are invoked in separate  	// goroutines (e.g., bi-directional streaming), cancel needs to be  	// called to interrupt the potential blocking on other goroutines. @@ -1321,7 +1318,7 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h  }  // closeStream clears the footprint of a stream when the stream is not needed any more. -func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) { +func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {  	// In case stream sending and receiving are invoked in separate  	// goroutines (e.g., bi-directional streaming), cancel needs to be  	// called to interrupt the potential blocking on other goroutines. @@ -1415,14 +1412,18 @@ func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {  	}  } -func (t *http2Server) IncrMsgSent() { -	t.channelz.SocketMetrics.MessagesSent.Add(1) -	t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1) +func (t *http2Server) incrMsgSent() { +	if channelz.IsOn() { +		t.channelz.SocketMetrics.MessagesSent.Add(1) +		t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1) +	}  } -func (t *http2Server) IncrMsgRecv() { -	t.channelz.SocketMetrics.MessagesReceived.Add(1) -	t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1) +func (t *http2Server) incrMsgRecv() { +	if channelz.IsOn() { +		t.channelz.SocketMetrics.MessagesReceived.Add(1) +		t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1) +	}  }  func (t *http2Server) getOutFlowWindow() int64 { @@ -1455,7 +1456,7 @@ func getJitter(v time.Duration) time.Duration {  	}  	// Generate a jitter between +/- 10% of the value.  	r := int64(v / 10) -	j := rand.Int63n(2*r) - r +	j := rand.Int64N(2*r) - r  	return time.Duration(j)  } diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index f609c6c66..3613d7b64 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -393,7 +393,7 @@ type framer struct {  	fr     *http2.Framer  } -var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) +var writeBufferPoolMap = make(map[int]*sync.Pool)  var writeBufferMutex sync.Mutex  func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer { diff --git a/vendor/google.golang.org/grpc/internal/transport/server_stream.go b/vendor/google.golang.org/grpc/internal/transport/server_stream.go new file mode 100644 index 000000000..a22a90151 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/server_stream.go @@ -0,0 +1,178 @@ +/* + * + * Copyright 2024 gRPC authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package transport + +import ( +	"context" +	"errors" +	"strings" +	"sync" +	"sync/atomic" + +	"google.golang.org/grpc/mem" +	"google.golang.org/grpc/metadata" +	"google.golang.org/grpc/status" +) + +// ServerStream implements streaming functionality for a gRPC server. +type ServerStream struct { +	*Stream // Embed for common stream functionality. + +	st      internalServerTransport +	ctxDone <-chan struct{}    // closed at the end of stream.  Cache of ctx.Done() (for performance) +	cancel  context.CancelFunc // invoked at the end of stream to cancel ctx. + +	// Holds compressor names passed in grpc-accept-encoding metadata from the +	// client. +	clientAdvertisedCompressors string +	headerWireLength            int + +	// hdrMu protects outgoing header and trailer metadata. +	hdrMu      sync.Mutex +	header     metadata.MD // the outgoing header metadata.  Updated by WriteHeader. +	headerSent atomic.Bool // atomically set when the headers are sent out. +} + +// Read reads an n byte message from the input stream. +func (s *ServerStream) Read(n int) (mem.BufferSlice, error) { +	b, err := s.Stream.read(n) +	if err == nil { +		s.st.incrMsgRecv() +	} +	return b, err +} + +// SendHeader sends the header metadata for the given stream. +func (s *ServerStream) SendHeader(md metadata.MD) error { +	return s.st.writeHeader(s, md) +} + +// Write writes the hdr and data bytes to the output stream. +func (s *ServerStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error { +	return s.st.write(s, hdr, data, opts) +} + +// WriteStatus sends the status of a stream to the client.  WriteStatus is +// the final call made on a stream and always occurs. +func (s *ServerStream) WriteStatus(st *status.Status) error { +	return s.st.writeStatus(s, st) +} + +// isHeaderSent indicates whether headers have been sent. +func (s *ServerStream) isHeaderSent() bool { +	return s.headerSent.Load() +} + +// updateHeaderSent updates headerSent and returns true +// if it was already set. +func (s *ServerStream) updateHeaderSent() bool { +	return s.headerSent.Swap(true) +} + +// RecvCompress returns the compression algorithm applied to the inbound +// message. It is empty string if there is no compression applied. +func (s *ServerStream) RecvCompress() string { +	return s.recvCompress +} + +// SendCompress returns the send compressor name. +func (s *ServerStream) SendCompress() string { +	return s.sendCompress +} + +// ContentSubtype returns the content-subtype for a request. For example, a +// content-subtype of "proto" will result in a content-type of +// "application/grpc+proto". This will always be lowercase.  See +// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for +// more details. +func (s *ServerStream) ContentSubtype() string { +	return s.contentSubtype +} + +// SetSendCompress sets the compression algorithm to the stream. 
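ServerStream tracks whether headers have gone out with an atomic.Bool, and updateHeaderSent relies on Swap returning the previous value so that exactly one caller performs the write. A small self-contained sketch of that idiom; the type and function names are invented:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // headerGate mimics the updateHeaderSent idiom: Swap(true) returns the
    // previous value, so only the first caller sees false and does the write.
    type headerGate struct {
        sent atomic.Bool
    }

    func (g *headerGate) writeOnce(write func()) bool {
        if g.sent.Swap(true) {
            return false // headers were already sent by an earlier call
        }
        write()
        return true
    }

    func main() {
        var g headerGate
        fmt.Println(g.writeOnce(func() { fmt.Println("writing headers") })) // true
        fmt.Println(g.writeOnce(func() { fmt.Println("writing headers") })) // false
    }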
+func (s *ServerStream) SetSendCompress(name string) error { +	if s.isHeaderSent() || s.getState() == streamDone { +		return errors.New("transport: set send compressor called after headers sent or stream done") +	} + +	s.sendCompress = name +	return nil +} + +// SetContext sets the context of the stream. This will be deleted once the +// stats handler callouts all move to gRPC layer. +func (s *ServerStream) SetContext(ctx context.Context) { +	s.ctx = ctx +} + +// ClientAdvertisedCompressors returns the compressor names advertised by the +// client via grpc-accept-encoding header. +func (s *ServerStream) ClientAdvertisedCompressors() []string { +	values := strings.Split(s.clientAdvertisedCompressors, ",") +	for i, v := range values { +		values[i] = strings.TrimSpace(v) +	} +	return values +} + +// Header returns the header metadata of the stream.  It returns the out header +// after t.WriteHeader is called.  It does not block and must not be called +// until after WriteHeader. +func (s *ServerStream) Header() (metadata.MD, error) { +	// Return the header in stream. It will be the out +	// header after t.WriteHeader is called. +	return s.header.Copy(), nil +} + +// HeaderWireLength returns the size of the headers of the stream as received +// from the wire. +func (s *ServerStream) HeaderWireLength() int { +	return s.headerWireLength +} + +// SetHeader sets the header metadata. This can be called multiple times. +// This should not be called in parallel to other data writes. +func (s *ServerStream) SetHeader(md metadata.MD) error { +	if md.Len() == 0 { +		return nil +	} +	if s.isHeaderSent() || s.getState() == streamDone { +		return ErrIllegalHeaderWrite +	} +	s.hdrMu.Lock() +	s.header = metadata.Join(s.header, md) +	s.hdrMu.Unlock() +	return nil +} + +// SetTrailer sets the trailer metadata which will be sent with the RPC status +// by the server. This can be called multiple times. +// This should not be called parallel to other data writes. 
+func (s *ServerStream) SetTrailer(md metadata.MD) error { +	if md.Len() == 0 { +		return nil +	} +	if s.getState() == streamDone { +		return ErrIllegalHeaderWrite +	} +	s.hdrMu.Lock() +	s.trailer = metadata.Join(s.trailer, md) +	s.hdrMu.Unlock() +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index fdd6fa86c..2859b8775 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -27,7 +27,6 @@ import (  	"fmt"  	"io"  	"net" -	"strings"  	"sync"  	"sync/atomic"  	"time" @@ -39,7 +38,6 @@ import (  	"google.golang.org/grpc/mem"  	"google.golang.org/grpc/metadata"  	"google.golang.org/grpc/peer" -	"google.golang.org/grpc/resolver"  	"google.golang.org/grpc/stats"  	"google.golang.org/grpc/status"  	"google.golang.org/grpc/tap" @@ -133,7 +131,7 @@ type recvBufferReader struct {  	err         error  } -func (r *recvBufferReader) ReadHeader(header []byte) (n int, err error) { +func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {  	if r.err != nil {  		return 0, r.err  	} @@ -142,9 +140,9 @@ func (r *recvBufferReader) ReadHeader(header []byte) (n int, err error) {  		return n, nil  	}  	if r.closeStream != nil { -		n, r.err = r.readHeaderClient(header) +		n, r.err = r.readMessageHeaderClient(header)  	} else { -		n, r.err = r.readHeader(header) +		n, r.err = r.readMessageHeader(header)  	}  	return n, r.err  } @@ -174,12 +172,12 @@ func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {  	return buf, r.err  } -func (r *recvBufferReader) readHeader(header []byte) (n int, err error) { +func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {  	select {  	case <-r.ctxDone:  		return 0, ContextErr(r.ctx.Err())  	case m := <-r.recv.get(): -		return r.readHeaderAdditional(m, header) +		return r.readMessageHeaderAdditional(m, header)  	}  } @@ -192,7 +190,7 @@ func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {  	}  } -func (r *recvBufferReader) readHeaderClient(header []byte) (n int, err error) { +func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {  	// If the context is canceled, then closes the stream with nil metadata.  	// closeStream writes its error parameter to r.recv as a recvMsg.  	// r.readAdditional acts on that message and returns the necessary error. @@ -213,9 +211,9 @@ func (r *recvBufferReader) readHeaderClient(header []byte) (n int, err error) {  		// faster.  		r.closeStream(ContextErr(r.ctx.Err()))  		m := <-r.recv.get() -		return r.readHeaderAdditional(m, header) +		return r.readMessageHeaderAdditional(m, header)  	case m := <-r.recv.get(): -		return r.readHeaderAdditional(m, header) +		return r.readMessageHeaderAdditional(m, header)  	}  } @@ -246,7 +244,7 @@ func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {  	}  } -func (r *recvBufferReader) readHeaderAdditional(m recvMsg, header []byte) (n int, err error) { +func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {  	r.recv.load()  	if m.err != nil {  		if m.buffer != nil { @@ -288,14 +286,8 @@ const (  // Stream represents an RPC in the transport layer.  
type Stream struct {  	id           uint32 -	st           ServerTransport    // nil for client side Stream -	ct           ClientTransport    // nil for server side Stream -	ctx          context.Context    // the associated context of the stream -	cancel       context.CancelFunc // always nil for client side Stream -	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side. -	doneFunc     func()             // invoked at the end of stream on client side. -	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance) -	method       string             // the associated RPC method of the stream +	ctx          context.Context // the associated context of the stream +	method       string          // the associated RPC method of the stream  	recvCompress string  	sendCompress string  	buf          *recvBuffer @@ -303,58 +295,17 @@ type Stream struct {  	fc           *inFlow  	wq           *writeQuota -	// Holds compressor names passed in grpc-accept-encoding metadata from the -	// client. This is empty for the client side stream. -	clientAdvertisedCompressors string  	// Callback to state application's intentions to read data. This  	// is used to adjust flow control, if needed.  	requestRead func(int) -	headerChan       chan struct{} // closed to indicate the end of header metadata. -	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times. -	// headerValid indicates whether a valid header was received.  Only -	// meaningful after headerChan is closed (always call waitOnHeader() before -	// reading its value).  Not valid on server side. -	headerValid      bool -	headerWireLength int // Only set on server side. - -	// hdrMu protects header and trailer metadata on the server-side. -	hdrMu sync.Mutex -	// On client side, header keeps the received header metadata. -	// -	// On server side, header keeps the header set by SetHeader(). The complete -	// header will merged into this after t.WriteHeader() is called. -	header  metadata.MD -	trailer metadata.MD // the key-value map of trailer metadata. - -	noHeaders bool // set if the client never received headers (set only after the stream is done). - -	// On the server-side, headerSent is atomically set to 1 when the headers are sent out. -	headerSent uint32 -  	state streamState -	// On client-side it is the status error received from the server. -	// On server-side it is unused. -	status *status.Status - -	bytesReceived uint32 // indicates whether any bytes have been received on this stream -	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream -  	// contentSubtype is the content-subtype for requests.  	// this must be lowercase or the behavior is undefined.  	contentSubtype string -} - -// isHeaderSent is only valid on the server-side. -func (s *Stream) isHeaderSent() bool { -	return atomic.LoadUint32(&s.headerSent) == 1 -} -// updateHeaderSent updates headerSent and returns true -// if it was already set. It is valid only on server-side. -func (s *Stream) updateHeaderSent() bool { -	return atomic.SwapUint32(&s.headerSent, 1) == 1 +	trailer metadata.MD // the key-value map of trailer metadata.  
}  func (s *Stream) swapState(st streamState) streamState { @@ -369,110 +320,12 @@ func (s *Stream) getState() streamState {  	return streamState(atomic.LoadUint32((*uint32)(&s.state)))  } -func (s *Stream) waitOnHeader() { -	if s.headerChan == nil { -		// On the server headerChan is always nil since a stream originates -		// only after having received headers. -		return -	} -	select { -	case <-s.ctx.Done(): -		// Close the stream to prevent headers/trailers from changing after -		// this function returns. -		s.ct.CloseStream(s, ContextErr(s.ctx.Err())) -		// headerChan could possibly not be closed yet if closeStream raced -		// with operateHeaders; wait until it is closed explicitly here. -		<-s.headerChan -	case <-s.headerChan: -	} -} - -// RecvCompress returns the compression algorithm applied to the inbound -// message. It is empty string if there is no compression applied. -func (s *Stream) RecvCompress() string { -	s.waitOnHeader() -	return s.recvCompress -} - -// SetSendCompress sets the compression algorithm to the stream. -func (s *Stream) SetSendCompress(name string) error { -	if s.isHeaderSent() || s.getState() == streamDone { -		return errors.New("transport: set send compressor called after headers sent or stream done") -	} - -	s.sendCompress = name -	return nil -} - -// SendCompress returns the send compressor name. -func (s *Stream) SendCompress() string { -	return s.sendCompress -} - -// ClientAdvertisedCompressors returns the compressor names advertised by the -// client via grpc-accept-encoding header. -func (s *Stream) ClientAdvertisedCompressors() []string { -	values := strings.Split(s.clientAdvertisedCompressors, ",") -	for i, v := range values { -		values[i] = strings.TrimSpace(v) -	} -	return values -} - -// Done returns a channel which is closed when it receives the final status -// from the server. -func (s *Stream) Done() <-chan struct{} { -	return s.done -} - -// Header returns the header metadata of the stream. -// -// On client side, it acquires the key-value pairs of header metadata once it is -// available. It blocks until i) the metadata is ready or ii) there is no header -// metadata or iii) the stream is canceled/expired. -// -// On server side, it returns the out header after t.WriteHeader is called.  It -// does not block and must not be called until after WriteHeader. -func (s *Stream) Header() (metadata.MD, error) { -	if s.headerChan == nil { -		// On server side, return the header in stream. It will be the out -		// header after t.WriteHeader is called. -		return s.header.Copy(), nil -	} -	s.waitOnHeader() - -	if !s.headerValid || s.noHeaders { -		return nil, s.status.Err() -	} - -	return s.header.Copy(), nil -} - -// TrailersOnly blocks until a header or trailers-only frame is received and -// then returns true if the stream was trailers-only.  If the stream ends -// before headers are received, returns true, nil.  Client-side only. -func (s *Stream) TrailersOnly() bool { -	s.waitOnHeader() -	return s.noHeaders -} -  // Trailer returns the cached trailer metadata. Note that if it is not called -// after the entire stream is done, it could return an empty MD. Client -// side only. +// after the entire stream is done, it could return an empty MD.  // It can be safely read only after stream has ended that is either read  // or write have returned io.EOF.  func (s *Stream) Trailer() metadata.MD { -	c := s.trailer.Copy() -	return c -} - -// ContentSubtype returns the content-subtype for a request. 
For example, a -// content-subtype of "proto" will result in a content-type of -// "application/grpc+proto". This will always be lowercase.  See -// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for -// more details. -func (s *Stream) ContentSubtype() string { -	return s.contentSubtype +	return s.trailer.Copy()  }  // Context returns the context of the stream. @@ -480,81 +333,31 @@ func (s *Stream) Context() context.Context {  	return s.ctx  } -// SetContext sets the context of the stream. This will be deleted once the -// stats handler callouts all move to gRPC layer. -func (s *Stream) SetContext(ctx context.Context) { -	s.ctx = ctx -} -  // Method returns the method for the stream.  func (s *Stream) Method() string {  	return s.method  } -// Status returns the status received from the server. -// Status can be read safely only after the stream has ended, -// that is, after Done() is closed. -func (s *Stream) Status() *status.Status { -	return s.status -} - -// HeaderWireLength returns the size of the headers of the stream as received -// from the wire. Valid only on the server. -func (s *Stream) HeaderWireLength() int { -	return s.headerWireLength -} - -// SetHeader sets the header metadata. This can be called multiple times. -// Server side only. -// This should not be called in parallel to other data writes. -func (s *Stream) SetHeader(md metadata.MD) error { -	if md.Len() == 0 { -		return nil -	} -	if s.isHeaderSent() || s.getState() == streamDone { -		return ErrIllegalHeaderWrite -	} -	s.hdrMu.Lock() -	s.header = metadata.Join(s.header, md) -	s.hdrMu.Unlock() -	return nil -} - -// SendHeader sends the given header metadata. The given metadata is -// combined with any metadata set by previous calls to SetHeader and -// then written to the transport stream. -func (s *Stream) SendHeader(md metadata.MD) error { -	return s.st.WriteHeader(s, md) -} - -// SetTrailer sets the trailer metadata which will be sent with the RPC status -// by the server. This can be called multiple times. Server side only. -// This should not be called parallel to other data writes. -func (s *Stream) SetTrailer(md metadata.MD) error { -	if md.Len() == 0 { -		return nil -	} -	if s.getState() == streamDone { -		return ErrIllegalHeaderWrite -	} -	s.hdrMu.Lock() -	s.trailer = metadata.Join(s.trailer, md) -	s.hdrMu.Unlock() -	return nil -} -  func (s *Stream) write(m recvMsg) {  	s.buf.put(m)  } -func (s *Stream) ReadHeader(header []byte) (err error) { +// ReadMessageHeader reads data into the provided header slice from the stream. +// It first checks if there was an error during a previous read operation and +// returns it if present. It then requests a read operation for the length of +// the header. It continues to read from the stream until the entire header +// slice is filled or an error occurs. If an `io.EOF` error is encountered with +// partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an +// unexpected end of the stream. The method returns any error encountered during +// the read process or nil if the header was successfully read. 
+func (s *Stream) ReadMessageHeader(header []byte) (err error) {  	// Don't request a read if there was an error earlier  	if er := s.trReader.er; er != nil {  		return er  	}  	s.requestRead(len(header))  	for len(header) != 0 { -		n, err := s.trReader.ReadHeader(header) +		n, err := s.trReader.ReadMessageHeader(header)  		header = header[n:]  		if len(header) == 0 {  			err = nil @@ -570,7 +373,7 @@ func (s *Stream) ReadHeader(header []byte) (err error) {  }  // Read reads n bytes from the wire for this stream. -func (s *Stream) Read(n int) (data mem.BufferSlice, err error) { +func (s *Stream) read(n int) (data mem.BufferSlice, err error) {  	// Don't request a read if there was an error earlier  	if er := s.trReader.er; er != nil {  		return nil, er @@ -610,13 +413,13 @@ type transportReader struct {  	er            error  } -func (t *transportReader) ReadHeader(header []byte) (int, error) { -	n, err := t.reader.ReadHeader(header) +func (t *transportReader) ReadMessageHeader(header []byte) (int, error) { +	n, err := t.reader.ReadMessageHeader(header)  	if err != nil {  		t.er = err  		return 0, err  	} -	t.windowHandler(len(header)) +	t.windowHandler(n)  	return n, nil  } @@ -630,17 +433,6 @@ func (t *transportReader) Read(n int) (mem.Buffer, error) {  	return buf, nil  } -// BytesReceived indicates whether any bytes have been received on this stream. -func (s *Stream) BytesReceived() bool { -	return atomic.LoadUint32(&s.bytesReceived) == 1 -} - -// Unprocessed indicates whether the server did not process this stream -- -// i.e. it sent a refused stream or GOAWAY including this stream ID. -func (s *Stream) Unprocessed() bool { -	return atomic.LoadUint32(&s.unprocessed) == 1 -} -  // GoString is implemented by Stream so context.String() won't  // race when printing %#v.  func (s *Stream) GoString() string { @@ -716,15 +508,9 @@ type ConnectOptions struct {  	BufferPool mem.BufferPool  } -// NewClientTransport establishes the transport with the required ConnectOptions -// and returns it to the caller. -func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) { -	return newHTTP2Client(connectCtx, ctx, addr, opts, onClose) -} - -// Options provides additional hints and information for message +// WriteOptions provides additional hints and information for message  // transmission. -type Options struct { +type WriteOptions struct {  	// Last indicates whether this write is the last piece for  	// this stream.  	Last bool @@ -773,18 +559,8 @@ type ClientTransport interface {  	// It does not block.  	GracefulClose() -	// Write sends the data for the given stream. A nil stream indicates -	// the write is to be performed on the transport as a whole. -	Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error -  	// NewStream creates a Stream for an RPC. -	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) - -	// CloseStream clears the footprint of a stream when the stream is -	// not needed any more. The err indicates the error incurred when -	// CloseStream is called. Must be called when a stream is finished -	// unless the associated transport is closing. -	CloseStream(stream *Stream, err error) +	NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)  	// Error returns a channel that is closed when some I/O error  	// happens. 
Typically the caller should have a goroutine to monitor @@ -804,12 +580,6 @@ type ClientTransport interface {  	// RemoteAddr returns the remote network address.  	RemoteAddr() net.Addr - -	// IncrMsgSent increments the number of message sent through this transport. -	IncrMsgSent() - -	// IncrMsgRecv increments the number of message received through this transport. -	IncrMsgRecv()  }  // ServerTransport is the common interface for all gRPC server-side transport @@ -819,19 +589,7 @@ type ClientTransport interface {  // Write methods for a given Stream will be called serially.  type ServerTransport interface {  	// HandleStreams receives incoming streams using the given handler. -	HandleStreams(context.Context, func(*Stream)) - -	// WriteHeader sends the header metadata for the given stream. -	// WriteHeader may not be called on all streams. -	WriteHeader(s *Stream, md metadata.MD) error - -	// Write sends the data for the given stream. -	// Write may not be called on all streams. -	Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error - -	// WriteStatus sends the status of a stream to the client.  WriteStatus is -	// the final call made on a stream and always occurs. -	WriteStatus(s *Stream, st *status.Status) error +	HandleStreams(context.Context, func(*ServerStream))  	// Close tears down the transport. Once it is called, the transport  	// should not be accessed any more. All the pending streams and their @@ -843,12 +601,14 @@ type ServerTransport interface {  	// Drain notifies the client this ServerTransport stops accepting new RPCs.  	Drain(debugData string) +} -	// IncrMsgSent increments the number of message sent through this transport. -	IncrMsgSent() - -	// IncrMsgRecv increments the number of message received through this transport. -	IncrMsgRecv() +type internalServerTransport interface { +	ServerTransport +	writeHeader(s *ServerStream, md metadata.MD) error +	write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error +	writeStatus(s *ServerStream, st *status.Status) error +	incrMsgRecv()  }  // connectionErrorf creates an ConnectionError with the specified error description. diff --git a/vendor/google.golang.org/grpc/keepalive/keepalive.go b/vendor/google.golang.org/grpc/keepalive/keepalive.go index 34d31b5e7..eb42b19fb 100644 --- a/vendor/google.golang.org/grpc/keepalive/keepalive.go +++ b/vendor/google.golang.org/grpc/keepalive/keepalive.go @@ -34,15 +34,29 @@ type ClientParameters struct {  	// After a duration of this time if the client doesn't see any activity it  	// pings the server to see if the transport is still alive.  	// If set below 10s, a minimum value of 10s will be used instead. -	Time time.Duration // The current default value is infinity. +	// +	// Note that gRPC servers have a default EnforcementPolicy.MinTime of 5 +	// minutes (which means the client shouldn't ping more frequently than every +	// 5 minutes). +	// +	// Though not ideal, it's not a strong requirement for Time to be less than +	// EnforcementPolicy.MinTime.  Time will automatically double if the server +	// disconnects due to its enforcement policy. +	// +	// For more details, see +	// https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md +	Time time.Duration  	// After having pinged for keepalive check, the client waits for a duration  	// of Timeout and if no activity is seen even after that the connection is  	// closed. -	Timeout time.Duration // The current default value is 20 seconds. 
+	// +	// If keepalive is enabled, and this value is not explicitly set, the default +	// is 20 seconds. +	Timeout time.Duration  	// If true, client sends keepalive pings even with no active RPCs. If false,  	// when there are no active RPCs, Time and Timeout will be ignored and no  	// keepalive pings will be sent. -	PermitWithoutStream bool // false by default. +	PermitWithoutStream bool  }  // ServerParameters is used to set keepalive and max-age parameters on the diff --git a/vendor/google.golang.org/grpc/mem/buffer_slice.go b/vendor/google.golang.org/grpc/mem/buffer_slice.go index d7775cea6..65002e2cc 100644 --- a/vendor/google.golang.org/grpc/mem/buffer_slice.go +++ b/vendor/google.golang.org/grpc/mem/buffer_slice.go @@ -19,10 +19,14 @@  package mem  import ( -	"compress/flate"  	"io"  ) +const ( +	// 32 KiB is what io.Copy uses. +	readAllBufSize = 32 * 1024 +) +  // BufferSlice offers a means to represent data that spans one or more Buffer  // instances. A BufferSlice is meant to be immutable after creation, and methods  // like Ref create and return copies of the slice. This is why all methods have @@ -92,9 +96,11 @@ func (s BufferSlice) Materialize() []byte {  }  // MaterializeToBuffer functions like Materialize except that it writes the data -// to a single Buffer pulled from the given BufferPool. As a special case, if the -// input BufferSlice only actually has one Buffer, this function has nothing to -// do and simply returns said Buffer. +// to a single Buffer pulled from the given BufferPool. +// +// As a special case, if the input BufferSlice only actually has one Buffer, this +// function simply increases the refcount before returning said Buffer. Freeing this +// buffer won't release it until the BufferSlice is itself released.  func (s BufferSlice) MaterializeToBuffer(pool BufferPool) Buffer {  	if len(s) == 1 {  		s[0].Ref() @@ -124,7 +130,8 @@ func (s BufferSlice) Reader() Reader {  // Remaining(), which returns the number of unread bytes remaining in the slice.  // Buffers will be freed as they are read.  type Reader interface { -	flate.Reader +	io.Reader +	io.ByteReader  	// Close frees the underlying BufferSlice and never returns an error. Subsequent  	// calls to Read will return (0, io.EOF).  	Close() error @@ -217,8 +224,58 @@ func (w *writer) Write(p []byte) (n int, err error) {  // NewWriter wraps the given BufferSlice and BufferPool to implement the  // io.Writer interface. Every call to Write copies the contents of the given -// buffer into a new Buffer pulled from the given pool and the Buffer is added to -// the given BufferSlice. +// buffer into a new Buffer pulled from the given pool and the Buffer is +// added to the given BufferSlice.  func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer {  	return &writer{buffers: buffers, pool: pool}  } + +// ReadAll reads from r until an error or EOF and returns the data it read. +// A successful call returns err == nil, not err == EOF. Because ReadAll is +// defined to read from src until EOF, it does not treat an EOF from Read +// as an error to be reported. +// +// Important: A failed call returns a non-nil error and may also return +// partially read buffers. It is the responsibility of the caller to free the +// BufferSlice returned, or its memory will not be reused. 
+func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) { +	var result BufferSlice +	if wt, ok := r.(io.WriterTo); ok { +		// This is more optimal since wt knows the size of chunks it wants to +		// write and, hence, we can allocate buffers of an optimal size to fit +		// them. E.g. might be a single big chunk, and we wouldn't chop it +		// into pieces. +		w := NewWriter(&result, pool) +		_, err := wt.WriteTo(w) +		return result, err +	} +nextBuffer: +	for { +		buf := pool.Get(readAllBufSize) +		// We asked for 32KiB but may have been given a bigger buffer. +		// Use all of it if that's the case. +		*buf = (*buf)[:cap(*buf)] +		usedCap := 0 +		for { +			n, err := r.Read((*buf)[usedCap:]) +			usedCap += n +			if err != nil { +				if usedCap == 0 { +					// Nothing in this buf, put it back +					pool.Put(buf) +				} else { +					*buf = (*buf)[:usedCap] +					result = append(result, NewBuffer(buf, pool)) +				} +				if err == io.EOF { +					err = nil +				} +				return result, err +			} +			if len(*buf) == usedCap { +				result = append(result, NewBuffer(buf, pool)) +				continue nextBuffer +			} +		} +	} +} diff --git a/vendor/google.golang.org/grpc/mem/buffers.go b/vendor/google.golang.org/grpc/mem/buffers.go index 975ceb718..ecbf0b9a7 100644 --- a/vendor/google.golang.org/grpc/mem/buffers.go +++ b/vendor/google.golang.org/grpc/mem/buffers.go @@ -65,6 +65,9 @@ var (  	refObjectPool    = sync.Pool{New: func() any { return new(atomic.Int32) }}  ) +// IsBelowBufferPoolingThreshold returns true if the given size is less than or +// equal to the threshold for buffer pooling. This is used to determine whether +// to pool buffers or allocate them directly.  func IsBelowBufferPoolingThreshold(size int) bool {  	return size <= bufferPoolingThreshold  } @@ -89,7 +92,11 @@ func newBuffer() *buffer {  //  // Note that the backing array of the given data is not copied.  func NewBuffer(data *[]byte, pool BufferPool) Buffer { -	if pool == nil || IsBelowBufferPoolingThreshold(len(*data)) { +	// Use the buffer's capacity instead of the length, otherwise buffers may +	// not be reused under certain conditions. For example, if a large buffer +	// is acquired from the pool, but fewer bytes than the buffering threshold +	// are written to it, the buffer will not be returned to the pool. +	if pool == nil || IsBelowBufferPoolingThreshold(cap(*data)) {  		return (SliceBuffer)(*data)  	}  	b := newBuffer() @@ -194,19 +201,19 @@ func (b *buffer) read(buf []byte) (int, Buffer) {  	return n, b  } -// String returns a string representation of the buffer. May be used for -// debugging purposes.  func (b *buffer) String() string {  	return fmt.Sprintf("mem.Buffer(%p, data: %p, length: %d)", b, b.ReadOnlyData(), len(b.ReadOnlyData()))  } +// ReadUnsafe reads bytes from the given Buffer into the provided slice. +// It does not perform safety checks.  func ReadUnsafe(dst []byte, buf Buffer) (int, Buffer) {  	return buf.read(dst)  }  // SplitUnsafe modifies the receiver to point to the first n bytes while it -// returns a new reference to the remaining bytes. The returned Buffer functions -// just like a normal reference acquired using Ref(). +// returns a new reference to the remaining bytes. The returned Buffer +// functions just like a normal reference acquired using Ref().  
func SplitUnsafe(buf Buffer, n int) (left, right Buffer) {  	return buf.split(n)  } @@ -224,20 +231,29 @@ func (e emptyBuffer) Len() int {  	return 0  } -func (e emptyBuffer) split(n int) (left, right Buffer) { +func (e emptyBuffer) split(int) (left, right Buffer) {  	return e, e  } -func (e emptyBuffer) read(buf []byte) (int, Buffer) { +func (e emptyBuffer) read([]byte) (int, Buffer) {  	return 0, e  } +// SliceBuffer is a Buffer implementation that wraps a byte slice. It provides +// methods for reading, splitting, and managing the byte slice.  type SliceBuffer []byte +// ReadOnlyData returns the byte slice.  func (s SliceBuffer) ReadOnlyData() []byte { return s } -func (s SliceBuffer) Ref()                 {} -func (s SliceBuffer) Free()                {} -func (s SliceBuffer) Len() int             { return len(s) } + +// Ref is a noop implementation of Ref. +func (s SliceBuffer) Ref() {} + +// Free is a noop implementation of Free. +func (s SliceBuffer) Free() {} + +// Len is a noop implementation of Len. +func (s SliceBuffer) Len() int { return len(s) }  func (s SliceBuffer) split(n int) (left, right Buffer) {  	return s[:n], s[n:] diff --git a/vendor/google.golang.org/grpc/preloader.go b/vendor/google.golang.org/grpc/preloader.go index e87a17f36..ee0ff969a 100644 --- a/vendor/google.golang.org/grpc/preloader.go +++ b/vendor/google.golang.org/grpc/preloader.go @@ -62,7 +62,7 @@ func (p *PreparedMsg) Encode(s Stream, msg any) error {  	materializedData := data.Materialize()  	data.Free() -	p.encodedData = mem.BufferSlice{mem.NewBuffer(&materializedData, nil)} +	p.encodedData = mem.BufferSlice{mem.SliceBuffer(materializedData)}  	// TODO: it should be possible to grab the bufferPool from the underlying  	//  stream implementation with a type cast to its actual type (such as @@ -76,7 +76,7 @@ func (p *PreparedMsg) Encode(s Stream, msg any) error {  	if p.pf.isCompressed() {  		materializedCompData := compData.Materialize()  		compData.Free() -		compData = mem.BufferSlice{mem.NewBuffer(&materializedCompData, nil)} +		compData = mem.BufferSlice{mem.SliceBuffer(materializedCompData)}  	}  	p.hdr, p.payload = msgHeader(p.encodedData, compData, p.pf) diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go index 202854511..8eb1cf3bc 100644 --- a/vendor/google.golang.org/grpc/resolver/resolver.go +++ b/vendor/google.golang.org/grpc/resolver/resolver.go @@ -22,6 +22,7 @@ package resolver  import (  	"context" +	"errors"  	"fmt"  	"net"  	"net/url" @@ -237,8 +238,8 @@ type ClientConn interface {  	// UpdateState can be omitted.  	UpdateState(State) error  	// ReportError notifies the ClientConn that the Resolver encountered an -	// error.  The ClientConn will notify the load balancer and begin calling -	// ResolveNow on the Resolver with exponential backoff. +	// error. The ClientConn then forwards this error to the load balancing +	// policy.  	ReportError(error)  	// NewAddress is called by resolver to notify ClientConn a new list  	// of resolved addresses. @@ -330,3 +331,20 @@ type AuthorityOverrider interface {  	// typically in line, and must keep it unchanged.  	OverrideAuthority(Target) string  } + +// ValidateEndpoints validates endpoints from a petiole policy's perspective. +// Petiole policies should call this before calling into their children. See +// [gRPC A61](https://github.com/grpc/proposal/blob/master/A61-IPv4-IPv6-dualstack-backends.md) +// for details. 
+func ValidateEndpoints(endpoints []Endpoint) error { +	if len(endpoints) == 0 { +		return errors.New("endpoints list is empty") +	} + +	for _, endpoint := range endpoints { +		for range endpoint.Addresses { +			return nil +		} +	} +	return errors.New("endpoints list contains no addresses") +} diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index db8865ec3..9fac2b08b 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -220,8 +220,8 @@ type HeaderCallOption struct {  	HeaderAddr *metadata.MD  } -func (o HeaderCallOption) before(c *callInfo) error { return nil } -func (o HeaderCallOption) after(c *callInfo, attempt *csAttempt) { +func (o HeaderCallOption) before(*callInfo) error { return nil } +func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) {  	*o.HeaderAddr, _ = attempt.s.Header()  } @@ -242,8 +242,8 @@ type TrailerCallOption struct {  	TrailerAddr *metadata.MD  } -func (o TrailerCallOption) before(c *callInfo) error { return nil } -func (o TrailerCallOption) after(c *callInfo, attempt *csAttempt) { +func (o TrailerCallOption) before(*callInfo) error { return nil } +func (o TrailerCallOption) after(_ *callInfo, attempt *csAttempt) {  	*o.TrailerAddr = attempt.s.Trailer()  } @@ -264,8 +264,8 @@ type PeerCallOption struct {  	PeerAddr *peer.Peer  } -func (o PeerCallOption) before(c *callInfo) error { return nil } -func (o PeerCallOption) after(c *callInfo, attempt *csAttempt) { +func (o PeerCallOption) before(*callInfo) error { return nil } +func (o PeerCallOption) after(_ *callInfo, attempt *csAttempt) {  	if x, ok := peer.FromContext(attempt.s.Context()); ok {  		*o.PeerAddr = *x  	} @@ -304,7 +304,7 @@ func (o FailFastCallOption) before(c *callInfo) error {  	c.failFast = o.FailFast  	return nil  } -func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o FailFastCallOption) after(*callInfo, *csAttempt) {}  // OnFinish returns a CallOption that configures a callback to be called when  // the call completes. The error passed to the callback is the status of the @@ -339,7 +339,7 @@ func (o OnFinishCallOption) before(c *callInfo) error {  	return nil  } -func (o OnFinishCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o OnFinishCallOption) after(*callInfo, *csAttempt) {}  // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size  // in bytes the client can receive. If this is not set, gRPC uses the default @@ -363,7 +363,7 @@ func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {  	c.maxReceiveMessageSize = &o.MaxRecvMsgSize  	return nil  } -func (o MaxRecvMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o MaxRecvMsgSizeCallOption) after(*callInfo, *csAttempt) {}  // MaxCallSendMsgSize returns a CallOption which sets the maximum message size  // in bytes the client can send. If this is not set, gRPC uses the default @@ -387,7 +387,7 @@ func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {  	c.maxSendMessageSize = &o.MaxSendMsgSize  	return nil  } -func (o MaxSendMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o MaxSendMsgSizeCallOption) after(*callInfo, *csAttempt) {}  // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials  // for a call. 
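A sketch of how a petiole load-balancing policy might call the ValidateEndpoints helper added above before delegating a resolver update; the balancer type, its child field, and the wiring are hypothetical.

package lbexample

import (
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/resolver"
)

// petioleBalancer is a hypothetical wrapping policy that delegates to a
// child balancer after validating resolver updates.
type petioleBalancer struct {
	child balancer.Balancer
}

func (b *petioleBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	// Reject updates whose endpoint list is empty or carries no addresses
	// before handing them to the child policy.
	if err := resolver.ValidateEndpoints(s.ResolverState.Endpoints); err != nil {
		return balancer.ErrBadResolverState
	}
	return b.child.UpdateClientConnState(s)
}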
@@ -410,7 +410,7 @@ func (o PerRPCCredsCallOption) before(c *callInfo) error {  	c.creds = o.Creds  	return nil  } -func (o PerRPCCredsCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o PerRPCCredsCallOption) after(*callInfo, *csAttempt) {}  // UseCompressor returns a CallOption which sets the compressor used when  // sending the request.  If WithCompressor is also set, UseCompressor has @@ -438,7 +438,7 @@ func (o CompressorCallOption) before(c *callInfo) error {  	c.compressorType = o.CompressorType  	return nil  } -func (o CompressorCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o CompressorCallOption) after(*callInfo, *csAttempt) {}  // CallContentSubtype returns a CallOption that will set the content-subtype  // for a call. For example, if content-subtype is "json", the Content-Type over @@ -475,7 +475,7 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error {  	c.contentSubtype = o.ContentSubtype  	return nil  } -func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o ContentSubtypeCallOption) after(*callInfo, *csAttempt) {}  // ForceCodec returns a CallOption that will set codec to be used for all  // request and response messages for a call. The result of calling Name() will @@ -514,7 +514,7 @@ func (o ForceCodecCallOption) before(c *callInfo) error {  	c.codec = newCodecV1Bridge(o.Codec)  	return nil  } -func (o ForceCodecCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o ForceCodecCallOption) after(*callInfo, *csAttempt) {}  // ForceCodecV2 returns a CallOption that will set codec to be used for all  // request and response messages for a call. The result of calling Name() will @@ -554,7 +554,7 @@ func (o ForceCodecV2CallOption) before(c *callInfo) error {  	return nil  } -func (o ForceCodecV2CallOption) after(c *callInfo, attempt *csAttempt) {} +func (o ForceCodecV2CallOption) after(*callInfo, *csAttempt) {}  // CallCustomCodec behaves like ForceCodec, but accepts a grpc.Codec instead of  // an encoding.Codec. @@ -579,7 +579,7 @@ func (o CustomCodecCallOption) before(c *callInfo) error {  	c.codec = newCodecV0Bridge(o.Codec)  	return nil  } -func (o CustomCodecCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o CustomCodecCallOption) after(*callInfo, *csAttempt) {}  // MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory  // used for buffering this RPC's requests for retry purposes. @@ -607,7 +607,7 @@ func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {  	c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize  	return nil  } -func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo, attempt *csAttempt) {} +func (o MaxRetryRPCBufferSizeCallOption) after(*callInfo, *csAttempt) {}  // The format of the payload: compressed or not?  type payloadFormat uint8 @@ -622,7 +622,7 @@ func (pf payloadFormat) isCompressed() bool {  }  type streamReader interface { -	ReadHeader(header []byte) error +	ReadMessageHeader(header []byte) error  	Read(n int) (mem.BufferSlice, error)  } @@ -656,7 +656,7 @@ type parser struct {  // that the underlying streamReader must not return an incompatible  // error.  
func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSlice, error) { -	err := p.r.ReadHeader(p.header[:]) +	err := p.r.ReadMessageHeader(p.header[:])  	if err != nil {  		return 0, nil, err  	} @@ -664,9 +664,6 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSl  	pf := payloadFormat(p.header[0])  	length := binary.BigEndian.Uint32(p.header[1:]) -	if length == 0 { -		return pf, nil, nil -	}  	if int64(length) > int64(maxInt) {  		return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)  	} @@ -791,9 +788,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool  		if !haveCompressor {  			if isServer {  				return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress) -			} else { -				return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)  			} +			return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)  		}  	default:  		return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf) @@ -818,7 +814,7 @@ func (p *payloadInfo) free() {  // the buffer is no longer needed.  // TODO: Refactor this function to reduce the number of arguments.  // See: https://google.github.io/styleguide/go/best-practices.html#function-argument-lists -func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool, +func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool,  ) (out mem.BufferSlice, err error) {  	pf, compressed, err := p.recvMsg(maxReceiveMessageSize)  	if err != nil { @@ -842,7 +838,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei  			var uncompressedBuf []byte  			uncompressedBuf, err = dc.Do(compressed.Reader())  			if err == nil { -				out = mem.BufferSlice{mem.NewBuffer(&uncompressedBuf, nil)} +				out = mem.BufferSlice{mem.SliceBuffer(uncompressedBuf)}  			}  			size = len(uncompressedBuf)  		} else { @@ -878,30 +874,7 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMes  		return nil, 0, err  	} -	// TODO: Can/should this still be preserved with the new BufferSlice API? Are -	//  there any actual benefits to allocating a single large buffer instead of -	//  multiple smaller ones? -	//if sizer, ok := compressor.(interface { -	//	DecompressedSize(compressedBytes []byte) int -	//}); ok { -	//	if size := sizer.DecompressedSize(d); size >= 0 { -	//		if size > maxReceiveMessageSize { -	//			return nil, size, nil -	//		} -	//		// size is used as an estimate to size the buffer, but we -	//		// will read more data if available. -	//		// +MinRead so ReadFrom will not reallocate if size is correct. -	//		// -	//		// TODO: If we ensure that the buffer size is the same as the DecompressedSize, -	//		// we can also utilize the recv buffer pool here. 
-	//		buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead)) -	//		bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1)) -	//		return buf.Bytes(), int(bytesRead), err -	//	} -	//} - -	var out mem.BufferSlice -	_, err = io.Copy(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1)) +	out, err := mem.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1), pool)  	if err != nil {  		out.Free()  		return nil, 0, err @@ -909,10 +882,14 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMes  	return out, out.Len(), nil  } +type recvCompressor interface { +	RecvCompress() string +} +  // For the two compressor parameters, both should not be set, but if they are,  // dc takes precedence over compressor.  // TODO(dfawley): wrap the old compressor/decompressor using the new API? -func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error { +func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error {  	data, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, isServer)  	if err != nil {  		return err diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 457d27338..16065a027 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -87,12 +87,13 @@ func init() {  var statusOK = status.New(codes.OK, "")  var logger = grpclog.Component("core") -type methodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error) +// MethodHandler is a function type that processes a unary RPC method call. +type MethodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)  // MethodDesc represents an RPC service's method specification.  type MethodDesc struct {  	MethodName string -	Handler    methodHandler +	Handler    MethodHandler  }  // ServiceDesc represents an RPC service's specification. @@ -621,8 +622,8 @@ func bufferPool(bufferPool mem.BufferPool) ServerOption {  // workload (assuming a QPS of a few thousand requests/sec).  const serverWorkerResetThreshold = 1 << 16 -// serverWorker blocks on a *transport.Stream channel forever and waits for -// data to be fed by serveStreams. This allows multiple requests to be +// serverWorker blocks on a *transport.ServerStream channel forever and waits +// for data to be fed by serveStreams. This allows multiple requests to be  // processed by the same goroutine, removing the need for expensive stack  // re-allocations (see the runtime.morestack problem [1]).  
// @@ -1020,7 +1021,7 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,  	}()  	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) -	st.HandleStreams(ctx, func(stream *transport.Stream) { +	st.HandleStreams(ctx, func(stream *transport.ServerStream) {  		s.handlersWG.Add(1)  		streamQuota.acquire()  		f := func() { @@ -1136,7 +1137,7 @@ func (s *Server) incrCallsFailed() {  	s.channelz.ServerMetrics.CallsFailed.Add(1)  } -func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { +func (s *Server) sendResponse(ctx context.Context, stream *transport.ServerStream, msg any, cp Compressor, opts *transport.WriteOptions, comp encoding.Compressor) error {  	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)  	if err != nil {  		channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err) @@ -1165,7 +1166,7 @@ func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport,  	if payloadLen > s.opts.maxSendMessageSize {  		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", payloadLen, s.opts.maxSendMessageSize)  	} -	err = t.Write(stream, hdr, payload, opts) +	err = stream.Write(hdr, payload, opts)  	if err == nil {  		if len(s.opts.statsHandlers) != 0 {  			for _, sh := range s.opts.statsHandlers { @@ -1212,7 +1213,7 @@ func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info  	}  } -func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) { +func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {  	shs := s.opts.statsHandlers  	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {  		if channelz.IsOn() { @@ -1320,7 +1321,7 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  		decomp = encoding.GetCompressor(rc)  		if decomp == nil {  			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) -			t.WriteStatus(stream, st) +			stream.WriteStatus(st)  			return st.Err()  		}  	} @@ -1354,14 +1355,12 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  	d, err := recvAndDecompress(&parser{r: stream, bufferPool: s.opts.bufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp, true)  	if err != nil { -		if e := t.WriteStatus(stream, status.Convert(err)); e != nil { +		if e := stream.WriteStatus(status.Convert(err)); e != nil {  			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)  		}  		return err  	} -	if channelz.IsOn() { -		t.IncrMsgRecv() -	} +	defer d.Free()  	df := func(v any) error {  		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {  			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) @@ -1403,7 +1402,7 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)  			trInfo.tr.SetError()  		} -		if e := t.WriteStatus(stream, appStatus); e != nil { +		if e := stream.WriteStatus(appStatus); e != nil {  			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC 
failed to write status: %v", e)  		}  		if len(binlogs) != 0 { @@ -1430,20 +1429,20 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  	if trInfo != nil {  		trInfo.tr.LazyLog(stringer("OK"), false)  	} -	opts := &transport.Options{Last: true} +	opts := &transport.WriteOptions{Last: true}  	// Server handler could have set new compressor by calling SetSendCompressor.  	// In case it is set, we need to use it for compressing outbound message.  	if stream.SendCompress() != sendCompressorName {  		comp = encoding.GetCompressor(stream.SendCompress())  	} -	if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil { +	if err := s.sendResponse(ctx, stream, reply, cp, opts, comp); err != nil {  		if err == io.EOF {  			// The entire stream is done (for unary RPC only).  			return err  		}  		if sts, ok := status.FromError(err); ok { -			if e := t.WriteStatus(stream, sts); e != nil { +			if e := stream.WriteStatus(sts); e != nil {  				channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)  			}  		} else { @@ -1483,9 +1482,6 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  			binlog.Log(ctx, sm)  		}  	} -	if channelz.IsOn() { -		t.IncrMsgSent() -	}  	if trInfo != nil {  		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)  	} @@ -1501,7 +1497,7 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  			binlog.Log(ctx, st)  		}  	} -	return t.WriteStatus(stream, statusOK) +	return stream.WriteStatus(statusOK)  }  // chainStreamServerInterceptors chains all stream server interceptors into one. @@ -1540,7 +1536,7 @@ func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, inf  	}  } -func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) { +func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.ServerStream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {  	if channelz.IsOn() {  		s.incrCallsStarted()  	} @@ -1560,7 +1556,6 @@ func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTran  	ctx = NewContextWithServerTransportStream(ctx, stream)  	ss := &serverStream{  		ctx:                   ctx, -		t:                     t,  		s:                     stream,  		p:                     &parser{r: stream, bufferPool: s.opts.bufferPool},  		codec:                 s.getCodec(stream.ContentSubtype()), @@ -1647,7 +1642,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTran  		ss.decomp = encoding.GetCompressor(rc)  		if ss.decomp == nil {  			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) -			t.WriteStatus(ss.s, st) +			ss.s.WriteStatus(st)  			return st.Err()  		}  	} @@ -1716,7 +1711,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTran  				binlog.Log(ctx, st)  			}  		} -		t.WriteStatus(ss.s, appStatus) +		ss.s.WriteStatus(appStatus)  		// TODO: Should we log an error from WriteStatus here and below?  		
return appErr  	} @@ -1734,10 +1729,10 @@ func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTran  			binlog.Log(ctx, st)  		}  	} -	return t.WriteStatus(ss.s, statusOK) +	return ss.s.WriteStatus(statusOK)  } -func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { +func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {  	ctx := stream.Context()  	ctx = contextWithServer(ctx, s)  	var ti *traceInfo @@ -1767,7 +1762,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  			ti.tr.SetError()  		}  		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) -		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { +		if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {  			if ti != nil {  				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  				ti.tr.SetError() @@ -1782,17 +1777,20 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  	service := sm[:pos]  	method := sm[pos+1:] -	md, _ := metadata.FromIncomingContext(ctx) -	for _, sh := range s.opts.statsHandlers { -		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()}) -		sh.HandleRPC(ctx, &stats.InHeader{ -			FullMethod:  stream.Method(), -			RemoteAddr:  t.Peer().Addr, -			LocalAddr:   t.Peer().LocalAddr, -			Compression: stream.RecvCompress(), -			WireLength:  stream.HeaderWireLength(), -			Header:      md, -		}) +	// FromIncomingContext is expensive: skip if there are no statsHandlers +	if len(s.opts.statsHandlers) > 0 { +		md, _ := metadata.FromIncomingContext(ctx) +		for _, sh := range s.opts.statsHandlers { +			ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()}) +			sh.HandleRPC(ctx, &stats.InHeader{ +				FullMethod:  stream.Method(), +				RemoteAddr:  t.Peer().Addr, +				LocalAddr:   t.Peer().LocalAddr, +				Compression: stream.RecvCompress(), +				WireLength:  stream.HeaderWireLength(), +				Header:      md, +			}) +		}  	}  	// To have calls in stream callouts work. Will delete once all stats handler  	// calls come from the gRPC layer. @@ -1801,17 +1799,17 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  	srv, knownService := s.services[service]  	if knownService {  		if md, ok := srv.methods[method]; ok { -			s.processUnaryRPC(ctx, t, stream, srv, md, ti) +			s.processUnaryRPC(ctx, stream, srv, md, ti)  			return  		}  		if sd, ok := srv.streams[method]; ok { -			s.processStreamingRPC(ctx, t, stream, srv, sd, ti) +			s.processStreamingRPC(ctx, stream, srv, sd, ti)  			return  		}  	}  	// Unknown service, or known server unknown method.  	
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { -		s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti) +		s.processStreamingRPC(ctx, stream, nil, unknownDesc, ti)  		return  	}  	var errDesc string @@ -1824,7 +1822,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  		ti.tr.LazyPrintf("%s", errDesc)  		ti.tr.SetError()  	} -	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { +	if err := stream.WriteStatus(status.New(codes.Unimplemented, errDesc)); err != nil {  		if ti != nil {  			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  			ti.tr.SetError() @@ -2099,7 +2097,7 @@ func SendHeader(ctx context.Context, md metadata.MD) error {  // Notice: This function is EXPERIMENTAL and may be changed or removed in a  // later release.  func SetSendCompressor(ctx context.Context, name string) error { -	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream) +	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)  	if !ok || stream == nil {  		return fmt.Errorf("failed to fetch the stream from the given context")  	} @@ -2121,7 +2119,7 @@ func SetSendCompressor(ctx context.Context, name string) error {  // Notice: This function is EXPERIMENTAL and may be changed or removed in a  // later release.  func ClientSupportedCompressors(ctx context.Context) ([]string, error) { -	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream) +	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.ServerStream)  	if !ok || stream == nil {  		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)  	} diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go index 2671c5ef6..7e83027d1 100644 --- a/vendor/google.golang.org/grpc/service_config.go +++ b/vendor/google.golang.org/grpc/service_config.go @@ -168,6 +168,7 @@ func init() {  		return parseServiceConfig(js, defaultMaxCallAttempts)  	}  } +  func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult {  	if len(js) == 0 {  		return &serviceconfig.ParseResult{Err: fmt.Errorf("no JSON service config provided")} @@ -297,7 +298,7 @@ func convertRetryPolicy(jrp *jsonRetryPolicy, maxAttempts int) (p *internalservi  	return rp, nil  } -func min(a, b *int) *int { +func minPointers(a, b *int) *int {  	if *a < *b {  		return a  	} @@ -309,7 +310,7 @@ func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {  		return &defaultVal  	}  	if mcMax != nil && doptMax != nil { -		return min(mcMax, doptMax) +		return minPointers(mcMax, doptMax)  	}  	if mcMax != nil {  		return mcMax diff --git a/vendor/google.golang.org/grpc/stats/metrics.go b/vendor/google.golang.org/grpc/stats/metrics.go new file mode 100644 index 000000000..641c8e979 --- /dev/null +++ b/vendor/google.golang.org/grpc/stats/metrics.go @@ -0,0 +1,81 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package stats + +import "maps" + +// MetricSet is a set of metrics to record. Once created, MetricSet is immutable, +// however Add and Remove can make copies with specific metrics added or +// removed, respectively. +// +// Do not construct directly; use NewMetricSet instead. +type MetricSet struct { +	// metrics are the set of metrics to initialize. +	metrics map[string]bool +} + +// NewMetricSet returns a MetricSet containing metricNames. +func NewMetricSet(metricNames ...string) *MetricSet { +	newMetrics := make(map[string]bool) +	for _, metric := range metricNames { +		newMetrics[metric] = true +	} +	return &MetricSet{metrics: newMetrics} +} + +// Metrics returns the metrics set. The returned map is read-only and must not +// be modified. +func (m *MetricSet) Metrics() map[string]bool { +	return m.metrics +} + +// Add adds the metricNames to the metrics set and returns a new copy with the +// additional metrics. +func (m *MetricSet) Add(metricNames ...string) *MetricSet { +	newMetrics := make(map[string]bool) +	for metric := range m.metrics { +		newMetrics[metric] = true +	} + +	for _, metric := range metricNames { +		newMetrics[metric] = true +	} +	return &MetricSet{metrics: newMetrics} +} + +// Join joins the metrics passed in with the metrics set, and returns a new copy +// with the merged metrics. +func (m *MetricSet) Join(metrics *MetricSet) *MetricSet { +	newMetrics := make(map[string]bool) +	maps.Copy(newMetrics, m.metrics) +	maps.Copy(newMetrics, metrics.metrics) +	return &MetricSet{metrics: newMetrics} +} + +// Remove removes the metricNames from the metrics set and returns a new copy +// with the metrics removed. +func (m *MetricSet) Remove(metricNames ...string) *MetricSet { +	newMetrics := make(map[string]bool) +	for metric := range m.metrics { +		newMetrics[metric] = true +	} + +	for _, metric := range metricNames { +		delete(newMetrics, metric) +	} +	return &MetricSet{metrics: newMetrics} +} diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go index 71195c494..6f20d2d54 100644 --- a/vendor/google.golang.org/grpc/stats/stats.go +++ b/vendor/google.golang.org/grpc/stats/stats.go @@ -260,84 +260,42 @@ func (s *ConnEnd) IsClient() bool { return s.Client }  func (s *ConnEnd) isConnStats() {} -type incomingTagsKey struct{} -type outgoingTagsKey struct{} -  // SetTags attaches stats tagging data to the context, which will be sent in  // the outgoing RPC with the header grpc-tags-bin.  Subsequent calls to  // SetTags will overwrite the values from earlier calls.  // -// NOTE: this is provided only for backward compatibility with existing clients -// and will likely be removed in an upcoming release.  New uses should transmit -// this type of data using metadata with a different, non-reserved (i.e. does -// not begin with "grpc-") header name. +// Deprecated: set the `grpc-tags-bin` header in the metadata instead.  func SetTags(ctx context.Context, b []byte) context.Context { -	return context.WithValue(ctx, outgoingTagsKey{}, b) +	return metadata.AppendToOutgoingContext(ctx, "grpc-tags-bin", string(b))  }  // Tags returns the tags from the context for the inbound RPC.  // -// NOTE: this is provided only for backward compatibility with existing clients -// and will likely be removed in an upcoming release.  New uses should transmit -// this type of data using metadata with a different, non-reserved (i.e. does -// not begin with "grpc-") header name. +// Deprecated: obtain the `grpc-tags-bin` header from metadata instead.  
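The new stats/metrics.go above adds MetricSet, an immutable set whose Add, Remove, and Join methods return fresh copies, while the surrounding stats.go hunks rewrite the tag and trace helpers on top of plain metadata headers. A small usage sketch under those assumptions; the metric names and payload bytes are made up for illustration:

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/stats"
)

func main() {
	// MetricSet is immutable; Add, Remove, and Join all return new copies.
	base := stats.NewMetricSet("example.metric.a") // metric name is illustrative
	extended := base.Add("example.metric.b")
	fmt.Println(len(base.Metrics()), len(extended.Metrics())) // prints: 1 2

	// The deprecated stats.SetTags/Tags are now thin wrappers over the
	// grpc-tags-bin metadata header, so callers can use metadata directly.
	ctx := metadata.AppendToOutgoingContext(context.Background(), "grpc-tags-bin", string([]byte{0x01, 0x02}))
	_ = ctx
	// On the server side, the last value wins, matching the new Tags():
	//   vals := metadata.ValueFromIncomingContext(serverCtx, "grpc-tags-bin")
}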
func Tags(ctx context.Context) []byte { -	b, _ := ctx.Value(incomingTagsKey{}).([]byte) -	return b -} - -// SetIncomingTags attaches stats tagging data to the context, to be read by -// the application (not sent in outgoing RPCs). -// -// This is intended for gRPC-internal use ONLY. -func SetIncomingTags(ctx context.Context, b []byte) context.Context { -	return context.WithValue(ctx, incomingTagsKey{}, b) -} - -// OutgoingTags returns the tags from the context for the outbound RPC. -// -// This is intended for gRPC-internal use ONLY. -func OutgoingTags(ctx context.Context) []byte { -	b, _ := ctx.Value(outgoingTagsKey{}).([]byte) -	return b +	traceValues := metadata.ValueFromIncomingContext(ctx, "grpc-tags-bin") +	if len(traceValues) == 0 { +		return nil +	} +	return []byte(traceValues[len(traceValues)-1])  } -type incomingTraceKey struct{} -type outgoingTraceKey struct{} -  // SetTrace attaches stats tagging data to the context, which will be sent in  // the outgoing RPC with the header grpc-trace-bin.  Subsequent calls to  // SetTrace will overwrite the values from earlier calls.  // -// NOTE: this is provided only for backward compatibility with existing clients -// and will likely be removed in an upcoming release.  New uses should transmit -// this type of data using metadata with a different, non-reserved (i.e. does -// not begin with "grpc-") header name. +// Deprecated: set the `grpc-trace-bin` header in the metadata instead.  func SetTrace(ctx context.Context, b []byte) context.Context { -	return context.WithValue(ctx, outgoingTraceKey{}, b) +	return metadata.AppendToOutgoingContext(ctx, "grpc-trace-bin", string(b))  }  // Trace returns the trace from the context for the inbound RPC.  // -// NOTE: this is provided only for backward compatibility with existing clients -// and will likely be removed in an upcoming release.  New uses should transmit -// this type of data using metadata with a different, non-reserved (i.e. does -// not begin with "grpc-") header name. +// Deprecated: obtain the `grpc-trace-bin` header from metadata instead.  func Trace(ctx context.Context) []byte { -	b, _ := ctx.Value(incomingTraceKey{}).([]byte) -	return b -} - -// SetIncomingTrace attaches stats tagging data to the context, to be read by -// the application (not sent in outgoing RPCs).  It is intended for -// gRPC-internal use. -func SetIncomingTrace(ctx context.Context, b []byte) context.Context { -	return context.WithValue(ctx, incomingTraceKey{}, b) -} - -// OutgoingTrace returns the trace from the context for the outbound RPC.  It is -// intended for gRPC-internal use. -func OutgoingTrace(ctx context.Context) []byte { -	b, _ := ctx.Value(outgoingTraceKey{}).([]byte) -	return b +	traceValues := metadata.ValueFromIncomingContext(ctx, "grpc-trace-bin") +	if len(traceValues) == 0 { +		return nil +	} +	return []byte(traceValues[len(traceValues)-1])  } diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index bb2b2a216..17e2267b3 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -23,7 +23,7 @@ import (  	"errors"  	"io"  	"math" -	"math/rand" +	rand "math/rand/v2"  	"strconv"  	"sync"  	"time" @@ -113,7 +113,9 @@ type ClientStream interface {  	// SendMsg is generally called by generated code. On error, SendMsg aborts  	// the stream. If the error was generated by the client, the status is  	// returned directly; otherwise, io.EOF is returned and the status of -	// the stream may be discovered using RecvMsg. 
+	// the stream may be discovered using RecvMsg. For unary or server-streaming +	// RPCs (StreamDesc.ClientStreams is false), a nil error is returned +	// unconditionally.  	//  	// SendMsg blocks until:  	//   - There is sufficient flow control to schedule m with the transport, or @@ -216,7 +218,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth  	var mc serviceconfig.MethodConfig  	var onCommit func() -	var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) { +	newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {  		return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)  	} @@ -584,7 +586,7 @@ type csAttempt struct {  	ctx        context.Context  	cs         *clientStream  	t          transport.ClientTransport -	s          *transport.Stream +	s          *transport.ClientStream  	p          *parser  	pickResult balancer.PickResult @@ -706,11 +708,10 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {  		cs.numRetriesSincePushback = 0  	} else {  		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback)) -		cur := float64(rp.InitialBackoff) * fact -		if max := float64(rp.MaxBackoff); cur > max { -			cur = max -		} -		dur = time.Duration(rand.Int63n(int64(cur))) +		cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff)) +		// Apply jitter by multiplying with a random factor between 0.8 and 1.2 +		cur *= 0.8 + 0.4*rand.Float64() +		dur = time.Duration(int64(cur))  		cs.numRetriesSincePushback++  	} @@ -991,7 +992,7 @@ func (cs *clientStream) CloseSend() error {  	}  	cs.sentLast = true  	op := func(a *csAttempt) error { -		a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) +		a.s.Write(nil, nil, &transport.WriteOptions{Last: true})  		// Always return nil; io.EOF is the only error that might make sense  		// instead, but there is no need to signal the client to call RecvMsg  		// as the only use left for the stream after CloseSend is to call @@ -1083,7 +1084,7 @@ func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength  		}  		a.mu.Unlock()  	} -	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { +	if err := a.s.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {  		if !cs.desc.ClientStreams {  			// For non-client-streaming RPCs, we return nil instead of EOF on error  			// because the generated code requires it.  finish is not called; RecvMsg() @@ -1097,9 +1098,6 @@ func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength  			sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))  		}  	} -	if channelz.IsOn() { -		a.t.IncrMsgSent() -	}  	return nil  } @@ -1153,9 +1151,6 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {  			Length:           payInfo.uncompressedBytes.Len(),  		})  	} -	if channelz.IsOn() { -		a.t.IncrMsgRecv() -	}  	if cs.desc.ServerStreams {  		// Subsequent messages should be received by subsequent RecvMsg calls.  		
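The retry hunk above replaces the rand.Int63n-based backoff with a capped exponential value scaled by a jitter factor in the range 0.8 to 1.2, drawn from math/rand/v2. A standalone sketch of that computation, using illustrative retry-policy values:

package main

import (
	"fmt"
	"math"
	rand "math/rand/v2"
	"time"
)

// jitteredBackoff mirrors the shouldRetry logic above: exponential growth,
// capped at maxBackoff, then multiplied by a random factor in [0.8, 1.2).
func jitteredBackoff(initial, maxBackoff time.Duration, multiplier float64, retries int) time.Duration {
	cur := min(float64(initial)*math.Pow(multiplier, float64(retries)), float64(maxBackoff))
	cur *= 0.8 + 0.4*rand.Float64()
	return time.Duration(int64(cur))
}

func main() {
	// The policy values below are examples, not gRPC defaults.
	for retries := 0; retries < 4; retries++ {
		fmt.Println(jitteredBackoff(100*time.Millisecond, 2*time.Second, 1.6, retries))
	}
}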
return nil @@ -1183,7 +1178,7 @@ func (a *csAttempt) finish(err error) {  	}  	var tr metadata.MD  	if a.s != nil { -		a.t.CloseStream(a.s, err) +		a.s.Close(err)  		tr = a.s.Trailer()  	} @@ -1340,7 +1335,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin  }  type addrConnStream struct { -	s         *transport.Stream +	s         *transport.ClientStream  	ac        *addrConn  	callHdr   *transport.CallHdr  	cancel    context.CancelFunc @@ -1380,7 +1375,7 @@ func (as *addrConnStream) CloseSend() error {  	}  	as.sentLast = true -	as.t.Write(as.s, nil, nil, &transport.Options{Last: true}) +	as.s.Write(nil, nil, &transport.WriteOptions{Last: true})  	// Always return nil; io.EOF is the only error that might make sense  	// instead, but there is no need to signal the client to call RecvMsg  	// as the only use left for the stream after CloseSend is to call @@ -1430,7 +1425,7 @@ func (as *addrConnStream) SendMsg(m any) (err error) {  		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)  	} -	if err := as.t.Write(as.s, hdr, payload, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { +	if err := as.s.Write(hdr, payload, &transport.WriteOptions{Last: !as.desc.ClientStreams}); err != nil {  		if !as.desc.ClientStreams {  			// For non-client-streaming RPCs, we return nil instead of EOF on error  			// because the generated code requires it.  finish is not called; RecvMsg() @@ -1440,9 +1435,6 @@ func (as *addrConnStream) SendMsg(m any) (err error) {  		return io.EOF  	} -	if channelz.IsOn() { -		as.t.IncrMsgSent() -	}  	return nil  } @@ -1480,9 +1472,6 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {  		return toRPCErr(err)  	} -	if channelz.IsOn() { -		as.t.IncrMsgRecv() -	}  	if as.desc.ServerStreams {  		// Subsequent messages should be received by subsequent RecvMsg calls.  		return nil @@ -1510,7 +1499,7 @@ func (as *addrConnStream) finish(err error) {  		err = nil  	}  	if as.s != nil { -		as.t.CloseStream(as.s, err) +		as.s.Close(err)  	}  	if err != nil { @@ -1577,8 +1566,7 @@ type ServerStream interface {  // serverStream implements a server side Stream.  type serverStream struct {  	ctx   context.Context -	t     transport.ServerTransport -	s     *transport.Stream +	s     *transport.ServerStream  	p     *parser  	codec baseCodec @@ -1628,7 +1616,7 @@ func (ss *serverStream) SendHeader(md metadata.MD) error {  		return status.Error(codes.Internal, err.Error())  	} -	err = ss.t.WriteHeader(ss.s, md) +	err = ss.s.SendHeader(md)  	if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {  		h, _ := ss.s.Header()  		sh := &binarylog.ServerHeader{ @@ -1668,7 +1656,7 @@ func (ss *serverStream) SendMsg(m any) (err error) {  		}  		if err != nil && err != io.EOF {  			st, _ := status.FromError(toRPCErr(err)) -			ss.t.WriteStatus(ss.s, st) +			ss.s.WriteStatus(st)  			// Non-user specified status was sent out. This should be an error  			// case (as a server side Cancel maybe).  			// @@ -1676,9 +1664,6 @@ func (ss *serverStream) SendMsg(m any) (err error) {  			// status from the service handler, we will log that error instead.  			// This behavior is similar to an interceptor.  		} -		if channelz.IsOn() && err == nil { -			ss.t.IncrMsgSent() -		}  	}()  	// Server handler could have set new compressor by calling SetSendCompressor. 
@@ -1710,7 +1695,7 @@ func (ss *serverStream) SendMsg(m any) (err error) {  	if payloadLen > ss.maxSendMessageSize {  		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)  	} -	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { +	if err := ss.s.Write(hdr, payload, &transport.WriteOptions{Last: false}); err != nil {  		return toRPCErr(err)  	} @@ -1756,7 +1741,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) {  		}  		if err != nil && err != io.EOF {  			st, _ := status.FromError(toRPCErr(err)) -			ss.t.WriteStatus(ss.s, st) +			ss.s.WriteStatus(st)  			// Non-user specified status was sent out. This should be an error  			// case (as a server side Cancel maybe).  			// @@ -1764,9 +1749,6 @@ func (ss *serverStream) RecvMsg(m any) (err error) {  			// status from the service handler, we will log that error instead.  			// This behavior is similar to an interceptor.  		} -		if channelz.IsOn() && err == nil { -			ss.t.IncrMsgRecv() -		}  	}()  	var payInfo *payloadInfo  	if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 { diff --git a/vendor/google.golang.org/grpc/stream_interfaces.go b/vendor/google.golang.org/grpc/stream_interfaces.go index 8b813529c..0037fee0b 100644 --- a/vendor/google.golang.org/grpc/stream_interfaces.go +++ b/vendor/google.golang.org/grpc/stream_interfaces.go @@ -22,15 +22,35 @@ package grpc  // request, many responses) RPC. It is generic over the type of the response  // message. It is used in generated code.  type ServerStreamingClient[Res any] interface { +	// Recv receives the next response message from the server. The client may +	// repeatedly call Recv to read messages from the response stream.  If +	// io.EOF is returned, the stream has terminated with an OK status.  Any +	// other error is compatible with the status package and indicates the +	// RPC's status code and message.  	Recv() (*Res, error) + +	// ClientStream is embedded to provide Context, Header, and Trailer +	// functionality.  No other methods in the ClientStream should be called +	// directly.  	ClientStream  }  // ServerStreamingServer represents the server side of a server-streaming (one  // request, many responses) RPC. It is generic over the type of the response  // message. It is used in generated code. +// +// To terminate the response stream, return from the handler method and return +// an error from the status package, or use nil to indicate an OK status code.  type ServerStreamingServer[Res any] interface { +	// Send sends a response message to the client.  The server handler may +	// call Send multiple times to send multiple messages to the client.  An +	// error is returned if the stream was terminated unexpectedly, and the +	// handler method should return, as the stream is no longer usable.  	Send(*Res) error + +	// ServerStream is embedded to provide Context, SetHeader, SendHeader, and +	// SetTrailer functionality.  No other methods in the ServerStream should +	// be called directly.  	ServerStream  } @@ -39,8 +59,22 @@ type ServerStreamingServer[Res any] interface {  // message stream and the type of the unary response message. It is used in  // generated code.  type ClientStreamingClient[Req any, Res any] interface { +	// Send sends a request message to the server.  The client may call Send +	// multiple times to send multiple messages to the server.  On error, Send +	// aborts the stream.  
If the error was generated by the client, the status +	// is returned directly.  Otherwise, io.EOF is returned, and the status of +	// the stream may be discovered using CloseAndRecv().  	Send(*Req) error + +	// CloseAndRecv closes the request stream and waits for the server's +	// response.  This method must be called once and only once after sending +	// all request messages.  Any error returned is implemented by the status +	// package.  	CloseAndRecv() (*Res, error) + +	// ClientStream is embedded to provide Context, Header, and Trailer +	// functionality.  No other methods in the ClientStream should be called +	// directly.  	ClientStream  } @@ -48,9 +82,28 @@ type ClientStreamingClient[Req any, Res any] interface {  // requests, one response) RPC. It is generic over both the type of the request  // message stream and the type of the unary response message. It is used in  // generated code. +// +// To terminate the RPC, call SendAndClose and return nil from the method +// handler or do not call SendAndClose and return an error from the status +// package.  type ClientStreamingServer[Req any, Res any] interface { +	// Recv receives the next request message from the client.  The server may +	// repeatedly call Recv to read messages from the request stream.  If +	// io.EOF is returned, it indicates the client called CloseAndRecv on its +	// ClientStreamingClient.  Any other error indicates the stream was +	// terminated unexpectedly, and the handler method should return, as the +	// stream is no longer usable.  	Recv() (*Req, error) + +	// SendAndClose sends a single response message to the client and closes +	// the stream.  This method must be called once and only once after all +	// request messages have been processed.  Recv should not be called after +	// calling SendAndClose.  	SendAndClose(*Res) error + +	// ServerStream is embedded to provide Context, SetHeader, SendHeader, and +	// SetTrailer functionality.  No other methods in the ServerStream should +	// be called directly.  	ServerStream  } @@ -59,8 +112,23 @@ type ClientStreamingServer[Req any, Res any] interface {  // request message stream and the type of the response message stream. It is  // used in generated code.  type BidiStreamingClient[Req any, Res any] interface { +	// Send sends a request message to the server.  The client may call Send +	// multiple times to send multiple messages to the server.  On error, Send +	// aborts the stream.  If the error was generated by the client, the status +	// is returned directly.  Otherwise, io.EOF is returned, and the status of +	// the stream may be discovered using Recv().  	Send(*Req) error + +	// Recv receives the next response message from the server. The client may +	// repeatedly call Recv to read messages from the response stream.  If +	// io.EOF is returned, the stream has terminated with an OK status.  Any +	// other error is compatible with the status package and indicates the +	// RPC's status code and message.  	Recv() (*Res, error) + +	// ClientStream is embedded to provide Context, Header, Trailer, and +	// CloseSend functionality.  No other methods in the ClientStream should be +	// called directly.  	ClientStream  } @@ -68,9 +136,27 @@ type BidiStreamingClient[Req any, Res any] interface {  // (many requests, many responses) RPC. It is generic over both the type of the  // request message stream and the type of the response message stream. It is  // used in generated code. 
+// +// To terminate the stream, return from the handler method and return +// an error from the status package, or use nil to indicate an OK status code.  type BidiStreamingServer[Req any, Res any] interface { +	// Recv receives the next request message from the client.  The server may +	// repeatedly call Recv to read messages from the request stream.  If +	// io.EOF is returned, it indicates the client called CloseSend on its +	// BidiStreamingClient.  Any other error indicates the stream was +	// terminated unexpectedly, and the handler method should return, as the +	// stream is no longer usable.  	Recv() (*Req, error) + +	// Send sends a response message to the client.  The server handler may +	// call Send multiple times to send multiple messages to the client.  An +	// error is returned if the stream was terminated unexpectedly, and the +	// handler method should return, as the stream is no longer usable.  	Send(*Res) error + +	// ServerStream is embedded to provide Context, SetHeader, SendHeader, and +	// SetTrailer functionality.  No other methods in the ServerStream should +	// be called directly.  	ServerStream  } diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 03f931834..d2bba7f3d 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@  package grpc  // Version is the current grpc version. -const Version = "1.66.1" +const Version = "1.69.4"  | 
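The stream_interfaces.go hunks above only add documentation; the generic interfaces themselves are unchanged. As a reference for the documented contract, here is a hypothetical server-streaming handler written against grpc.ServerStreamingServer, with the message type and response count invented for illustration:

package main

import "google.golang.org/grpc"

// echoReply stands in for a generated protobuf response message.
type echoReply struct{ Msg string }

// listEchoes is a hypothetical handler following the documented contract:
// call Send once per response, then return nil for an OK status (or a
// status-package error to terminate the stream with that status).
func listEchoes(count int, stream grpc.ServerStreamingServer[echoReply]) error {
	for i := 0; i < count; i++ {
		if err := stream.Send(&echoReply{Msg: "hello"}); err != nil {
			// A Send error means the stream is no longer usable; return.
			return err
		}
	}
	return nil
}

func main() {} // in practice the handler is wired up by generated code

On the client side the mirrored contract applies: call Recv until it returns io.EOF, which indicates the stream ended with an OK status; any other error carries the RPC's status code and message.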
