diff options
Diffstat (limited to 'vendor')
70 files changed, 1010 insertions, 834 deletions
| diff --git a/vendor/go.opentelemetry.io/otel/.golangci.yml b/vendor/go.opentelemetry.io/otel/.golangci.yml index 61782fbf0..6e8eeec00 100644 --- a/vendor/go.opentelemetry.io/otel/.golangci.yml +++ b/vendor/go.opentelemetry.io/otel/.golangci.yml @@ -76,11 +76,6 @@ linters-settings:        otlp-internal:          files:            - "!**/exporters/otlp/internal/**/*.go" -          # TODO: remove the following when otlpmetric/internal is removed. -          - "!**/exporters/otlp/otlpmetric/internal/oconf/envconfig.go" -          - "!**/exporters/otlp/otlpmetric/internal/oconf/options.go" -          - "!**/exporters/otlp/otlpmetric/internal/oconf/options_test.go" -          - "!**/exporters/otlp/otlpmetric/internal/otest/client_test.go"          deny:            - pkg: "go.opentelemetry.io/otel/exporters/otlp/internal"              desc: Do not use cross-module internal packages. diff --git a/vendor/go.opentelemetry.io/otel/CHANGELOG.md b/vendor/go.opentelemetry.io/otel/CHANGELOG.md index 7aa5c8051..a57345210 100644 --- a/vendor/go.opentelemetry.io/otel/CHANGELOG.md +++ b/vendor/go.opentelemetry.io/otel/CHANGELOG.md @@ -8,6 +8,32 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm  ## [Unreleased] +## [1.18.0/0.41.0/0.0.6] 2023-09-12 + +This release drops the compatibility guarantee of [Go 1.19]. + +### Added + +- Add `WithProducer` option in `go.opentelemetry.op/otel/exporters/prometheus` to restore the ability to register producers on the prometheus exporter's manual reader. (#4473) +- Add `IgnoreValue` option in `go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest` to allow ignoring values when comparing metrics. (#4447) + +### Changed + +- Use a `TestingT` interface instead of `*testing.T` struct in `go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest`. (#4483) + +### Deprecated + +- The `NewMetricExporter` in `go.opentelemetry.io/otel/bridge/opencensus` was deprecated in `v0.35.0` (#3541). +  The deprecation notice format for the function has been corrected to trigger Go documentation and build tooling. (#4470) + +### Removed + +- Removed the deprecated `go.opentelemetry.io/otel/exporters/jaeger` package. (#4467) +- Removed the deprecated `go.opentelemetry.io/otel/example/jaeger` package. (#4467) +- Removed the deprecated `go.opentelemetry.io/otel/sdk/metric/aggregation` package. (#4468) +- Removed the deprecated internal packages in `go.opentelemetry.io/otel/exporters/otlp` and its sub-packages. (#4469) +- Dropped guaranteed support for versions of Go less than 1.20. (#4481) +  ## [1.17.0/0.40.0/0.0.5] 2023-08-28  ### Added @@ -2591,7 +2617,8 @@ It contains api and sdk for trace and meter.  - CircleCI build CI manifest files.  - CODEOWNERS file to track owners of this project. -[Unreleased]: https://github.com/open-telemetry/opentelemetry-go/compare/v1.17.0...HEAD +[Unreleased]: https://github.com/open-telemetry/opentelemetry-go/compare/v1.18.0...HEAD +[1.18.0/0.41.0/0.0.6]: https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.18.0  [1.17.0/0.40.0/0.0.5]: https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.17.0  [1.16.0/0.39.0]: https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.16.0  [1.16.0-rc.1/0.39.0-rc.1]: https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.16.0-rc.1 @@ -2663,5 +2690,6 @@ It contains api and sdk for trace and meter.  [Go 1.20]: https://go.dev/doc/go1.20  [Go 1.19]: https://go.dev/doc/go1.19  [Go 1.18]: https://go.dev/doc/go1.18 +[Go 1.19]: https://go.dev/doc/go1.19  [metric API]:https://pkg.go.dev/go.opentelemetry.io/otel/metric diff --git a/vendor/go.opentelemetry.io/otel/Makefile b/vendor/go.opentelemetry.io/otel/Makefile index c996d227b..5c311706b 100644 --- a/vendor/go.opentelemetry.io/otel/Makefile +++ b/vendor/go.opentelemetry.io/otel/Makefile @@ -210,7 +210,7 @@ go-mod-tidy/%: DIR=$*  go-mod-tidy/%: | crosslink  	@echo "$(GO) mod tidy in $(DIR)" \  		&& cd $(DIR) \ -		&& $(GO) mod tidy -compat=1.19 +		&& $(GO) mod tidy -compat=1.20  .PHONY: lint-modules  lint-modules: go-mod-tidy diff --git a/vendor/go.opentelemetry.io/otel/README.md b/vendor/go.opentelemetry.io/otel/README.md index 4e5531f30..634326ef8 100644 --- a/vendor/go.opentelemetry.io/otel/README.md +++ b/vendor/go.opentelemetry.io/otel/README.md @@ -55,19 +55,14 @@ Currently, this project supports the following environments.  |---------|------------|--------------|  | Ubuntu  | 1.21       | amd64        |  | Ubuntu  | 1.20       | amd64        | -| Ubuntu  | 1.19       | amd64        |  | Ubuntu  | 1.21       | 386          |  | Ubuntu  | 1.20       | 386          | -| Ubuntu  | 1.19       | 386          |  | MacOS   | 1.21       | amd64        |  | MacOS   | 1.20       | amd64        | -| MacOS   | 1.19       | amd64        |  | Windows | 1.21       | amd64        |  | Windows | 1.20       | amd64        | -| Windows | 1.19       | amd64        |  | Windows | 1.21       | 386          |  | Windows | 1.20       | 386          | -| Windows | 1.19       | 386          |  While this project should work for other systems, no compatibility guarantees  are made for those systems currently. diff --git a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/version.go b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/version.go index 1780b7165..661c146c2 100644 --- a/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/version.go +++ b/vendor/go.opentelemetry.io/otel/exporters/otlp/otlptrace/version.go @@ -16,5 +16,5 @@ package otlptrace // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace"  // Version is the current release version of the OpenTelemetry OTLP trace exporter in use.  func Version() string { -	return "1.17.0" +	return "1.18.0"  } diff --git a/vendor/go.opentelemetry.io/otel/requirements.txt b/vendor/go.opentelemetry.io/otel/requirements.txt index 407f17489..ddff45468 100644 --- a/vendor/go.opentelemetry.io/otel/requirements.txt +++ b/vendor/go.opentelemetry.io/otel/requirements.txt @@ -1 +1 @@ -codespell==2.2.4 +codespell==2.2.5 diff --git a/vendor/go.opentelemetry.io/otel/sdk/version.go b/vendor/go.opentelemetry.io/otel/sdk/version.go index a99bdd38e..756bfed75 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/version.go +++ b/vendor/go.opentelemetry.io/otel/sdk/version.go @@ -16,5 +16,5 @@ package sdk // import "go.opentelemetry.io/otel/sdk"  // Version is the current release version of the OpenTelemetry SDK in use.  func Version() string { -	return "1.17.0" +	return "1.18.0"  } diff --git a/vendor/go.opentelemetry.io/otel/version.go b/vendor/go.opentelemetry.io/otel/version.go index 3bce1b1e4..d5f274688 100644 --- a/vendor/go.opentelemetry.io/otel/version.go +++ b/vendor/go.opentelemetry.io/otel/version.go @@ -16,5 +16,5 @@ package otel // import "go.opentelemetry.io/otel"  // Version is the current release version of OpenTelemetry in use.  func Version() string { -	return "1.17.0" +	return "1.18.0"  } diff --git a/vendor/go.opentelemetry.io/otel/versions.yaml b/vendor/go.opentelemetry.io/otel/versions.yaml index 94f1c919e..a4952d493 100644 --- a/vendor/go.opentelemetry.io/otel/versions.yaml +++ b/vendor/go.opentelemetry.io/otel/versions.yaml @@ -14,19 +14,16 @@  module-sets:    stable-v1: -    version: v1.17.0 +    version: v1.18.0      modules:        - go.opentelemetry.io/otel        - go.opentelemetry.io/otel/bridge/opentracing        - go.opentelemetry.io/otel/bridge/opentracing/test        - go.opentelemetry.io/otel/example/fib -      - go.opentelemetry.io/otel/example/jaeger        - go.opentelemetry.io/otel/example/namedtracer        - go.opentelemetry.io/otel/example/otel-collector        - go.opentelemetry.io/otel/example/passthrough        - go.opentelemetry.io/otel/example/zipkin -      - go.opentelemetry.io/otel/exporters/jaeger -      - go.opentelemetry.io/otel/exporters/otlp/internal/retry        - go.opentelemetry.io/otel/exporters/otlp/otlptrace        - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc        - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp @@ -36,7 +33,7 @@ module-sets:        - go.opentelemetry.io/otel/sdk        - go.opentelemetry.io/otel/trace    experimental-metrics: -    version: v0.40.0 +    version: v0.41.0      modules:        - go.opentelemetry.io/otel/example/opencensus        - go.opentelemetry.io/otel/example/prometheus @@ -50,7 +47,7 @@ module-sets:        - go.opentelemetry.io/otel/bridge/opencensus/test        - go.opentelemetry.io/otel/example/view    experimental-schema: -    version: v0.0.5 +    version: v0.0.6      modules:        - go.opentelemetry.io/otel/schema  excluded-modules: diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md index 0e6ae69a5..1bc92248c 100644 --- a/vendor/google.golang.org/grpc/README.md +++ b/vendor/google.golang.org/grpc/README.md @@ -14,21 +14,14 @@ RPC framework that puts mobile and HTTP/2 first. For more information see the  ## Installation -With [Go module][] support (Go 1.11+), simply add the following import +Simply add the following import to your code, and then `go [build|run|test]` +will automatically fetch the necessary dependencies: +  ```go  import "google.golang.org/grpc"  ``` -to your code, and then `go [build|run|test]` will automatically fetch the -necessary dependencies. - -Otherwise, to install the `grpc-go` package, run the following command: - -```console -$ go get -u google.golang.org/grpc -``` -  > **Note:** If you are trying to access `grpc-go` from **China**, see the  > [FAQ](#FAQ) below. @@ -56,15 +49,6 @@ To build Go code, there are several options:  - Set up a VPN and access google.golang.org through that. -- Without Go module support: `git clone` the repo manually: - -  ```sh -  git clone https://github.com/grpc/grpc-go.git $GOPATH/src/google.golang.org/grpc -  ``` - -  You will need to do the same for all of grpc's dependencies in `golang.org`, -  e.g. `golang.org/x/net`. -  - With Go module support: it is possible to use the `replace` feature of `go    mod` to create aliases for golang.org packages.  In your project's directory: @@ -76,33 +60,13 @@ To build Go code, there are several options:    ```    Again, this will need to be done for all transitive dependencies hosted on -  golang.org as well. For details, refer to [golang/go issue #28652](https://github.com/golang/go/issues/28652). +  golang.org as well. For details, refer to [golang/go issue +  #28652](https://github.com/golang/go/issues/28652).  ### Compiling error, undefined: grpc.SupportPackageIsVersion -#### If you are using Go modules: - -Ensure your gRPC-Go version is `require`d at the appropriate version in -the same module containing the generated `.pb.go` files.  For example, -`SupportPackageIsVersion6` needs `v1.27.0`, so in your `go.mod` file: - -```go -module <your module name> - -require ( -    google.golang.org/grpc v1.27.0 -) -``` - -#### If you are *not* using Go modules: - -Update the `proto` package, gRPC package, and rebuild the `.proto` files: - -```sh -go get -u github.com/golang/protobuf/{proto,protoc-gen-go} -go get -u google.golang.org/grpc -protoc --go_out=plugins=grpc:. *.proto -``` +Please update to the latest version of gRPC-Go using +`go get google.golang.org/grpc`.  ### How to turn on logging @@ -121,9 +85,11 @@ possible reasons, including:   1. mis-configured transport credentials, connection failed on handshaking   1. bytes disrupted, possibly by a proxy in between   1. server shutdown - 1. Keepalive parameters caused connection shutdown, for example if you have configured -    your server to terminate connections regularly to [trigger DNS lookups](https://github.com/grpc/grpc-go/issues/3170#issuecomment-552517779). -    If this is the case, you may want to increase your [MaxConnectionAgeGrace](https://pkg.go.dev/google.golang.org/grpc/keepalive?tab=doc#ServerParameters), + 1. Keepalive parameters caused connection shutdown, for example if you have +    configured your server to terminate connections regularly to [trigger DNS +    lookups](https://github.com/grpc/grpc-go/issues/3170#issuecomment-552517779). +    If this is the case, you may want to increase your +    [MaxConnectionAgeGrace](https://pkg.go.dev/google.golang.org/grpc/keepalive?tab=doc#ServerParameters),      to allow longer RPC calls to finish.  It can be tricky to debug this because the error happens on the client side but diff --git a/vendor/google.golang.org/grpc/attributes/attributes.go b/vendor/google.golang.org/grpc/attributes/attributes.go index 49712aca3..712fef4d0 100644 --- a/vendor/google.golang.org/grpc/attributes/attributes.go +++ b/vendor/google.golang.org/grpc/attributes/attributes.go @@ -34,26 +34,26 @@ import (  // key/value pairs.  Keys must be hashable, and users should define their own  // types for keys.  Values should not be modified after they are added to an  // Attributes or if they were received from one.  If values implement 'Equal(o -// interface{}) bool', it will be called by (*Attributes).Equal to determine -// whether two values with the same key should be considered equal. +// any) bool', it will be called by (*Attributes).Equal to determine whether +// two values with the same key should be considered equal.  type Attributes struct { -	m map[interface{}]interface{} +	m map[any]any  }  // New returns a new Attributes containing the key/value pair. -func New(key, value interface{}) *Attributes { -	return &Attributes{m: map[interface{}]interface{}{key: value}} +func New(key, value any) *Attributes { +	return &Attributes{m: map[any]any{key: value}}  }  // WithValue returns a new Attributes containing the previous keys and values  // and the new key/value pair.  If the same key appears multiple times, the  // last value overwrites all previous values for that key.  To remove an  // existing key, use a nil value.  value should not be modified later. -func (a *Attributes) WithValue(key, value interface{}) *Attributes { +func (a *Attributes) WithValue(key, value any) *Attributes {  	if a == nil {  		return New(key, value)  	} -	n := &Attributes{m: make(map[interface{}]interface{}, len(a.m)+1)} +	n := &Attributes{m: make(map[any]any, len(a.m)+1)}  	for k, v := range a.m {  		n.m[k] = v  	} @@ -63,20 +63,19 @@ func (a *Attributes) WithValue(key, value interface{}) *Attributes {  // Value returns the value associated with these attributes for key, or nil if  // no value is associated with key.  The returned value should not be modified. -func (a *Attributes) Value(key interface{}) interface{} { +func (a *Attributes) Value(key any) any {  	if a == nil {  		return nil  	}  	return a.m[key]  } -// Equal returns whether a and o are equivalent.  If 'Equal(o interface{}) -// bool' is implemented for a value in the attributes, it is called to -// determine if the value matches the one stored in the other attributes.  If -// Equal is not implemented, standard equality is used to determine if the two -// values are equal. Note that some types (e.g. maps) aren't comparable by -// default, so they must be wrapped in a struct, or in an alias type, with Equal -// defined. +// Equal returns whether a and o are equivalent.  If 'Equal(o any) bool' is +// implemented for a value in the attributes, it is called to determine if the +// value matches the one stored in the other attributes.  If Equal is not +// implemented, standard equality is used to determine if the two values are +// equal. Note that some types (e.g. maps) aren't comparable by default, so +// they must be wrapped in a struct, or in an alias type, with Equal defined.  func (a *Attributes) Equal(o *Attributes) bool {  	if a == nil && o == nil {  		return true @@ -93,7 +92,7 @@ func (a *Attributes) Equal(o *Attributes) bool {  			// o missing element of a  			return false  		} -		if eq, ok := v.(interface{ Equal(o interface{}) bool }); ok { +		if eq, ok := v.(interface{ Equal(o any) bool }); ok {  			if !eq.Equal(ov) {  				return false  			} @@ -122,7 +121,7 @@ func (a *Attributes) String() string {  	return sb.String()  } -func str(x interface{}) string { +func str(x any) string {  	if v, ok := x.(fmt.Stringer); ok {  		return v.String()  	} else if v, ok := x.(string); ok { diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go index 8f00523c0..b6377f445 100644 --- a/vendor/google.golang.org/grpc/balancer/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/balancer.go @@ -105,8 +105,8 @@ type SubConn interface {  	//  	// This will trigger a state transition for the SubConn.  	// -	// Deprecated: This method is now part of the ClientConn interface and will -	// eventually be removed from here. +	// Deprecated: this method will be removed.  Create new SubConns for new +	// addresses instead.  	UpdateAddresses([]resolver.Address)  	// Connect starts the connecting for this SubConn.  	Connect() @@ -115,6 +115,13 @@ type SubConn interface {  	// creates a new one and returns it.  Returns a close function which must  	// be called when the Producer is no longer needed.  	GetOrBuildProducer(ProducerBuilder) (p Producer, close func()) +	// Shutdown shuts down the SubConn gracefully.  Any started RPCs will be +	// allowed to complete.  No future calls should be made on the SubConn. +	// One final state update will be delivered to the StateListener (or +	// UpdateSubConnState; deprecated) with ConnectivityState of Shutdown to +	// indicate the shutdown operation.  This may be delivered before +	// in-progress RPCs are complete and the actual connection is closed. +	Shutdown()  }  // NewSubConnOptions contains options to create new SubConn. @@ -129,6 +136,11 @@ type NewSubConnOptions struct {  	// HealthCheckEnabled indicates whether health check service should be  	// enabled on this SubConn  	HealthCheckEnabled bool +	// StateListener is called when the state of the subconn changes.  If nil, +	// Balancer.UpdateSubConnState will be called instead.  Will never be +	// invoked until after Connect() is called on the SubConn created with +	// these options. +	StateListener func(SubConnState)  }  // State contains the balancer's state relevant to the gRPC ClientConn. @@ -150,16 +162,24 @@ type ClientConn interface {  	// NewSubConn is called by balancer to create a new SubConn.  	// It doesn't block and wait for the connections to be established.  	// Behaviors of the SubConn can be controlled by options. +	// +	// Deprecated: please be aware that in a future version, SubConns will only +	// support one address per SubConn.  	NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)  	// RemoveSubConn removes the SubConn from ClientConn.  	// The SubConn will be shutdown. +	// +	// Deprecated: use SubConn.Shutdown instead.  	RemoveSubConn(SubConn)  	// UpdateAddresses updates the addresses used in the passed in SubConn.  	// gRPC checks if the currently connected address is still in the new list.  	// If so, the connection will be kept. Else, the connection will be  	// gracefully closed, and a new connection will be created.  	// -	// This will trigger a state transition for the SubConn. +	// This may trigger a state transition for the SubConn. +	// +	// Deprecated: this method will be removed.  Create new SubConns for new +	// addresses instead.  	UpdateAddresses(SubConn, []resolver.Address)  	// UpdateState notifies gRPC that the balancer's internal state has @@ -250,7 +270,7 @@ type DoneInfo struct {  	// trailing metadata.  	//  	// The only supported type now is *orca_v3.LoadReport. -	ServerLoad interface{} +	ServerLoad any  }  var ( @@ -343,9 +363,13 @@ type Balancer interface {  	ResolverError(error)  	// UpdateSubConnState is called by gRPC when the state of a SubConn  	// changes. +	// +	// Deprecated: Use NewSubConnOptions.StateListener when creating the +	// SubConn instead.  	UpdateSubConnState(SubConn, SubConnState) -	// Close closes the balancer. The balancer is not required to call -	// ClientConn.RemoveSubConn for its existing SubConns. +	// Close closes the balancer. The balancer is not currently required to +	// call SubConn.Shutdown for its existing SubConns; however, this will be +	// required in a future release, so it is recommended.  	Close()  } @@ -390,15 +414,14 @@ var ErrBadResolverState = errors.New("bad resolver state")  type ProducerBuilder interface {  	// Build creates a Producer.  The first parameter is always a  	// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the -	// associated SubConn), but is declared as interface{} to avoid a -	// dependency cycle.  Should also return a close function that will be -	// called when all references to the Producer have been given up. -	Build(grpcClientConnInterface interface{}) (p Producer, close func()) +	// associated SubConn), but is declared as `any` to avoid a dependency +	// cycle.  Should also return a close function that will be called when all +	// references to the Producer have been given up. +	Build(grpcClientConnInterface any) (p Producer, close func())  }  // A Producer is a type shared among potentially many consumers.  It is  // associated with a SubConn, and an implementation will typically contain  // other methods to provide additional functionality, e.g. configuration or  // subscription registration. -type Producer interface { -} +type Producer any diff --git a/vendor/google.golang.org/grpc/balancer/base/balancer.go b/vendor/google.golang.org/grpc/balancer/base/balancer.go index 3929c26d3..a7f1eeec8 100644 --- a/vendor/google.golang.org/grpc/balancer/base/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/base/balancer.go @@ -105,7 +105,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {  		addrsSet.Set(a, nil)  		if _, ok := b.subConns.Get(a); !ok {  			// a is a new address (not existing in b.subConns). -			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck}) +			var sc balancer.SubConn +			opts := balancer.NewSubConnOptions{ +				HealthCheckEnabled: b.config.HealthCheck, +				StateListener:      func(scs balancer.SubConnState) { b.updateSubConnState(sc, scs) }, +			} +			sc, err := b.cc.NewSubConn([]resolver.Address{a}, opts)  			if err != nil {  				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)  				continue @@ -121,10 +126,10 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {  		sc := sci.(balancer.SubConn)  		// a was removed by resolver.  		if _, ok := addrsSet.Get(a); !ok { -			b.cc.RemoveSubConn(sc) +			sc.Shutdown()  			b.subConns.Delete(a)  			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown. -			// The entry will be deleted in UpdateSubConnState. +			// The entry will be deleted in updateSubConnState.  		}  	}  	// If resolver state contains no addresses, return an error so ClientConn @@ -177,7 +182,12 @@ func (b *baseBalancer) regeneratePicker() {  	b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})  } +// UpdateSubConnState is a nop because a StateListener is always set in NewSubConn.  func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +	logger.Errorf("base.baseBalancer: UpdateSubConnState(%v, %+v) called unexpectedly", sc, state) +} + +func (b *baseBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {  	s := state.ConnectivityState  	if logger.V(2) {  		logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s) @@ -204,8 +214,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su  	case connectivity.Idle:  		sc.Connect()  	case connectivity.Shutdown: -		// When an address was removed by resolver, b called RemoveSubConn but -		// kept the sc's state in scStates. Remove state for this sc here. +		// When an address was removed by resolver, b called Shutdown but kept +		// the sc's state in scStates. Remove state for this sc here.  		delete(b.scStates, sc)  	case connectivity.TransientFailure:  		// Save error to be reported via picker. @@ -226,7 +236,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su  }  // Close is a nop because base balancer doesn't have internal state to clean up, -// and it doesn't need to call RemoveSubConn for the SubConns. +// and it doesn't need to call Shutdown for the SubConns.  func (b *baseBalancer) Close() {  } diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go index 04b9ad411..a4411c22b 100644 --- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go +++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go @@ -99,20 +99,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat  	// lock held. But the lock guards only the scheduling part. The actual  	// callback is called asynchronously without the lock being held.  	ok := ccb.serializer.Schedule(func(_ context.Context) { -		// If the addresses specified in the update contain addresses of type -		// "grpclb" and the selected LB policy is not "grpclb", these addresses -		// will be filtered out and ccs will be modified with the updated -		// address list. -		if ccb.curBalancerName != grpclbName { -			var addrs []resolver.Address -			for _, addr := range ccs.ResolverState.Addresses { -				if addr.Type == resolver.GRPCLB { -					continue -				} -				addrs = append(addrs, addr) -			} -			ccs.ResolverState.Addresses = addrs -		}  		errCh <- ccb.balancer.UpdateClientConnState(*ccs)  	})  	if !ok { @@ -139,7 +125,9 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat  func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {  	ccb.mu.Lock()  	ccb.serializer.Schedule(func(_ context.Context) { -		ccb.balancer.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s, ConnectionError: err}) +		// Even though it is optional for balancers, gracefulswitch ensures +		// opts.StateListener is set, so this cannot ever be nil. +		sc.(*acBalancerWrapper).stateListener(balancer.SubConnState{ConnectivityState: s, ConnectionError: err})  	})  	ccb.mu.Unlock()  } @@ -221,7 +209,7 @@ func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {  	}  	ccb.mode = m -	done := ccb.serializer.Done +	done := ccb.serializer.Done()  	b := ccb.balancer  	ok := ccb.serializer.Schedule(func(_ context.Context) {  		// Close the serializer to ensure that no more calls from gRPC are sent @@ -238,11 +226,9 @@ func (ccb *ccBalancerWrapper) closeBalancer(m ccbMode) {  	}  	ccb.mu.Unlock() -	// Give enqueued callbacks a chance to finish. +	// Give enqueued callbacks a chance to finish before closing the balancer.  	<-done -	// Spawn a goroutine to close the balancer (since it may block trying to -	// cleanup all allocated resources) and return early. -	go b.Close() +	b.Close()  }  // exitIdleMode is invoked by grpc when the channel exits idle mode either @@ -314,29 +300,19 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer  		channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)  		return nil, err  	} -	acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} +	acbw := &acBalancerWrapper{ +		ccb:           ccb, +		ac:            ac, +		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer), +		stateListener: opts.StateListener, +	}  	ac.acbw = acbw  	return acbw, nil  }  func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { -	if ccb.isIdleOrClosed() { -		// It it safe to ignore this call when the balancer is closed or in idle -		// because the ClientConn takes care of closing the connections. -		// -		// Not returning early from here when the balancer is closed or in idle -		// leads to a deadlock though, because of the following sequence of -		// calls when holding cc.mu: -		// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> -		// ccb.RemoveAddrConn --> cc.removeAddrConn -		return -	} - -	acbw, ok := sc.(*acBalancerWrapper) -	if !ok { -		return -	} -	ccb.cc.removeAddrConn(acbw.ac, errConnDrain) +	// The graceful switch balancer will never call this. +	logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")  }  func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { @@ -380,7 +356,9 @@ func (ccb *ccBalancerWrapper) Target() string {  // acBalancerWrapper is a wrapper on top of ac for balancers.  // It implements balancer.SubConn interface.  type acBalancerWrapper struct { -	ac *addrConn // read-only +	ac            *addrConn          // read-only +	ccb           *ccBalancerWrapper // read-only +	stateListener func(balancer.SubConnState)  	mu        sync.Mutex  	producers map[balancer.ProducerBuilder]*refCountedProducer @@ -398,6 +376,23 @@ func (acbw *acBalancerWrapper) Connect() {  	go acbw.ac.connect()  } +func (acbw *acBalancerWrapper) Shutdown() { +	ccb := acbw.ccb +	if ccb.isIdleOrClosed() { +		// It it safe to ignore this call when the balancer is closed or in idle +		// because the ClientConn takes care of closing the connections. +		// +		// Not returning early from here when the balancer is closed or in idle +		// leads to a deadlock though, because of the following sequence of +		// calls when holding cc.mu: +		// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close --> +		// ccb.RemoveAddrConn --> cc.removeAddrConn +		return +	} + +	ccb.cc.removeAddrConn(acbw.ac, errConnDrain) +} +  // NewStream begins a streaming RPC on the addrConn.  If the addrConn is not  // ready, blocks until it is or ctx expires.  Returns an error when the context  // expires or the addrConn is shut down. @@ -411,7 +406,7 @@ func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc,  // Invoke performs a unary RPC.  If the addrConn is not ready, returns  // errSubConnNotReady. -func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error { +func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error {  	cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)  	if err != nil {  		return err diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go index ec2c2fa14..595480112 100644 --- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go +++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go @@ -18,7 +18,7 @@  // Code generated by protoc-gen-go. DO NOT EDIT.  // versions: -// 	protoc-gen-go v1.30.0 +// 	protoc-gen-go v1.31.0  // 	protoc        v4.22.0  // source: grpc/binlog/v1/binarylog.proto diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index e6a1dc5d7..a67a3db02 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -26,11 +26,11 @@ import (  // received.  This is typically called by generated code.  //  // All errors returned by Invoke are compatible with the status package. -func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { -	if err := cc.idlenessMgr.onCallBegin(); err != nil { +func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error { +	if err := cc.idlenessMgr.OnCallBegin(); err != nil {  		return err  	} -	defer cc.idlenessMgr.onCallEnd() +	defer cc.idlenessMgr.OnCallEnd()  	// allow interceptor to see all applicable call options, which means those  	// configured as defaults from dial option as well as per-call options @@ -61,13 +61,13 @@ func combine(o1 []CallOption, o2 []CallOption) []CallOption {  // received.  This is typically called by generated code.  //  // DEPRECATED: Use ClientConn.Invoke instead. -func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error { +func Invoke(ctx context.Context, method string, args, reply any, cc *ClientConn, opts ...CallOption) error {  	return cc.Invoke(ctx, method, args, reply, opts...)  }  var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false} -func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { +func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {  	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)  	if err != nil {  		return err diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index bfd7555a8..d53d91d5d 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -34,9 +34,11 @@ import (  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/connectivity"  	"google.golang.org/grpc/credentials" +	"google.golang.org/grpc/internal"  	"google.golang.org/grpc/internal/backoff"  	"google.golang.org/grpc/internal/channelz"  	"google.golang.org/grpc/internal/grpcsync" +	"google.golang.org/grpc/internal/idle"  	"google.golang.org/grpc/internal/pretty"  	iresolver "google.golang.org/grpc/internal/resolver"  	"google.golang.org/grpc/internal/transport" @@ -54,8 +56,6 @@ import (  const (  	// minimum time to give a connection to complete  	minConnectTimeout = 20 * time.Second -	// must match grpclbName in grpclb/grpclb.go -	grpclbName = "grpclb"  )  var ( @@ -138,7 +138,6 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires  func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {  	cc := &ClientConn{  		target: target, -		csMgr:  &connectivityStateManager{},  		conns:  make(map[*addrConn]struct{}),  		dopts:  defaultDialOptions(),  		czData: new(channelzData), @@ -191,6 +190,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  	// Register ClientConn with channelz.  	cc.channelzRegistration(target) +	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) +  	if err := cc.validateTransportCredentials(); err != nil {  		return nil, err  	} @@ -266,7 +267,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  	// Configure idleness support with configured idle timeout or default idle  	// timeout duration. Idleness can be explicitly disabled by the user, by  	// setting the dial option to 0. -	cc.idlenessMgr = newIdlenessManager(cc, cc.dopts.idleTimeout) +	cc.idlenessMgr = idle.NewManager(idle.ManagerOptions{Enforcer: (*idler)(cc), Timeout: cc.dopts.idleTimeout, Logger: logger})  	// Return early for non-blocking dials.  	if !cc.dopts.block { @@ -317,6 +318,16 @@ func (cc *ClientConn) addTraceEvent(msg string) {  	channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)  } +type idler ClientConn + +func (i *idler) EnterIdleMode() error { +	return (*ClientConn)(i).enterIdleMode() +} + +func (i *idler) ExitIdleMode() error { +	return (*ClientConn)(i).exitIdleMode() +} +  // exitIdleMode moves the channel out of idle mode by recreating the name  // resolver and load balancer.  func (cc *ClientConn) exitIdleMode() error { @@ -327,7 +338,7 @@ func (cc *ClientConn) exitIdleMode() error {  	}  	if cc.idlenessState != ccIdlenessStateIdle {  		cc.mu.Unlock() -		logger.Info("ClientConn asked to exit idle mode when not in idle mode") +		channelz.Infof(logger, cc.channelzID, "ClientConn asked to exit idle mode, current mode is %v", cc.idlenessState)  		return nil  	} @@ -350,7 +361,7 @@ func (cc *ClientConn) exitIdleMode() error {  	cc.idlenessState = ccIdlenessStateExitingIdle  	exitedIdle := false  	if cc.blockingpicker == nil { -		cc.blockingpicker = newPickerWrapper() +		cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)  	} else {  		cc.blockingpicker.exitIdleMode()  		exitedIdle = true @@ -398,7 +409,8 @@ func (cc *ClientConn) enterIdleMode() error {  		return ErrClientConnClosing  	}  	if cc.idlenessState != ccIdlenessStateActive { -		logger.Error("ClientConn asked to enter idle mode when not active") +		channelz.Errorf(logger, cc.channelzID, "ClientConn asked to enter idle mode, current mode is %v", cc.idlenessState) +		cc.mu.Unlock()  		return nil  	} @@ -475,7 +487,6 @@ func (cc *ClientConn) validateTransportCredentials() error {  func (cc *ClientConn) channelzRegistration(target string) {  	cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)  	cc.addTraceEvent("created") -	cc.csMgr.channelzID = cc.channelzID  }  // chainUnaryClientInterceptors chains all unary client interceptors into one. @@ -492,7 +503,7 @@ func chainUnaryClientInterceptors(cc *ClientConn) {  	} else if len(interceptors) == 1 {  		chainedInt = interceptors[0]  	} else { -		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error { +		chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {  			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)  		}  	} @@ -504,7 +515,7 @@ func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, final  	if curr == len(interceptors)-1 {  		return finalInvoker  	} -	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { +	return func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {  		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)  	}  } @@ -540,13 +551,27 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr  	}  } +// newConnectivityStateManager creates an connectivityStateManager with +// the specified id. +func newConnectivityStateManager(ctx context.Context, id *channelz.Identifier) *connectivityStateManager { +	return &connectivityStateManager{ +		channelzID: id, +		pubSub:     grpcsync.NewPubSub(ctx), +	} +} +  // connectivityStateManager keeps the connectivity.State of ClientConn.  // This struct will eventually be exported so the balancers can access it. +// +// TODO: If possible, get rid of the `connectivityStateManager` type, and +// provide this functionality using the `PubSub`, to avoid keeping track of +// the connectivity state at two places.  type connectivityStateManager struct {  	mu         sync.Mutex  	state      connectivity.State  	notifyChan chan struct{}  	channelzID *channelz.Identifier +	pubSub     *grpcsync.PubSub  }  // updateState updates the connectivity.State of ClientConn. @@ -562,6 +587,8 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {  		return  	}  	csm.state = state +	csm.pubSub.Publish(state) +  	channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)  	if csm.notifyChan != nil {  		// There are other goroutines waiting on this channel. @@ -591,7 +618,7 @@ func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {  type ClientConnInterface interface {  	// Invoke performs a unary RPC and returns after the response is received  	// into reply. -	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error +	Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error  	// NewStream begins a streaming RPC.  	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)  } @@ -623,7 +650,7 @@ type ClientConn struct {  	channelzID      *channelz.Identifier // Channelz identifier for the channel.  	resolverBuilder resolver.Builder     // See parseTargetAndFindResolver().  	balancerWrapper *ccBalancerWrapper   // Uses gracefulswitch.balancer underneath. -	idlenessMgr     idlenessManager +	idlenessMgr     idle.Manager  	// The following provide their own synchronization, and therefore don't  	// require cc.mu to be held to access them. @@ -669,6 +696,19 @@ const (  	ccIdlenessStateExitingIdle  ) +func (s ccIdlenessState) String() string { +	switch s { +	case ccIdlenessStateActive: +		return "active" +	case ccIdlenessStateIdle: +		return "idle" +	case ccIdlenessStateExitingIdle: +		return "exitingIdle" +	default: +		return "unknown" +	} +} +  // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or  // ctx expires. A true value is returned in former case and false in latter.  // @@ -760,6 +800,10 @@ func init() {  		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))  	}  	emptyServiceConfig = cfg.Config.(*ServiceConfig) + +	internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() { +		return cc.csMgr.pubSub.Subscribe(s) +	}  }  func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { @@ -1153,23 +1197,13 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel  	}  	var newBalancerName string -	if cc.sc != nil && cc.sc.lbConfig != nil { +	if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) { +		// No service config or no LB policy specified in config. +		newBalancerName = PickFirstBalancerName +	} else if cc.sc.lbConfig != nil {  		newBalancerName = cc.sc.lbConfig.name -	} else { -		var isGRPCLB bool -		for _, a := range addrs { -			if a.Type == resolver.GRPCLB { -				isGRPCLB = true -				break -			} -		} -		if isGRPCLB { -			newBalancerName = grpclbName -		} else if cc.sc != nil && cc.sc.LB != nil { -			newBalancerName = *cc.sc.LB -		} else { -			newBalancerName = PickFirstBalancerName -		} +	} else { // cc.sc.LB != nil +		newBalancerName = *cc.sc.LB  	}  	cc.balancerWrapper.switchTo(newBalancerName)  } @@ -1208,7 +1242,10 @@ func (cc *ClientConn) ResetConnectBackoff() {  // Close tears down the ClientConn and all underlying connections.  func (cc *ClientConn) Close() error { -	defer cc.cancel() +	defer func() { +		cc.cancel() +		<-cc.csMgr.pubSub.Done() +	}()  	cc.mu.Lock()  	if cc.conns == nil { @@ -1242,7 +1279,7 @@ func (cc *ClientConn) Close() error {  		rWrapper.close()  	}  	if idlenessMgr != nil { -		idlenessMgr.close() +		idlenessMgr.Close()  	}  	for ac := range conns { @@ -1352,12 +1389,14 @@ func (ac *addrConn) resetTransport() {  	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {  		ac.cc.resolveNow(resolver.ResolveNowOptions{}) -		// After exhausting all addresses, the addrConn enters -		// TRANSIENT_FAILURE. +		ac.mu.Lock()  		if acCtx.Err() != nil { +			// addrConn was torn down. +			ac.mu.Unlock()  			return  		} -		ac.mu.Lock() +		// After exhausting all addresses, the addrConn enters +		// TRANSIENT_FAILURE.  		ac.updateConnectivityState(connectivity.TransientFailure, err)  		// Backoff. @@ -1553,7 +1592,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {  	// Set up the health check helper functions.  	currentTr := ac.transport -	newStream := func(method string) (interface{}, error) { +	newStream := func(method string) (any, error) {  		ac.mu.Lock()  		if ac.transport != currentTr {  			ac.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/codec.go b/vendor/google.golang.org/grpc/codec.go index 129776547..411e3dfd4 100644 --- a/vendor/google.golang.org/grpc/codec.go +++ b/vendor/google.golang.org/grpc/codec.go @@ -27,8 +27,8 @@ import (  // omits the name/string, which vary between the two and are not needed for  // anything besides the registry in the encoding package.  type baseCodec interface { -	Marshal(v interface{}) ([]byte, error) -	Unmarshal(data []byte, v interface{}) error +	Marshal(v any) ([]byte, error) +	Unmarshal(data []byte, v any) error  }  var _ baseCodec = Codec(nil) @@ -41,9 +41,9 @@ var _ baseCodec = encoding.Codec(nil)  // Deprecated: use encoding.Codec instead.  type Codec interface {  	// Marshal returns the wire format of v. -	Marshal(v interface{}) ([]byte, error) +	Marshal(v any) ([]byte, error)  	// Unmarshal parses the wire format into v. -	Unmarshal(data []byte, v interface{}) error +	Unmarshal(data []byte, v any) error  	// String returns the name of the Codec implementation.  This is unused by  	// gRPC.  	String() string diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index 23ea95237..1fd0d5c12 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -139,6 +139,20 @@ func newJoinDialOption(opts ...DialOption) DialOption {  	return &joinDialOption{opts: opts}  } +// WithSharedWriteBuffer allows reusing per-connection transport write buffer. +// If this option is set to true every connection will release the buffer after +// flushing the data on the wire. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func WithSharedWriteBuffer(val bool) DialOption { +	return newFuncDialOption(func(o *dialOptions) { +		o.copts.SharedWriteBuffer = val +	}) +} +  // WithWriteBufferSize determines how much data can be batched before doing a  // write on the wire. The corresponding memory allocation for this buffer will  // be twice the size to keep syscalls low. The default value for this buffer is diff --git a/vendor/google.golang.org/grpc/encoding/encoding.go b/vendor/google.golang.org/grpc/encoding/encoding.go index 07a586135..69d5580b6 100644 --- a/vendor/google.golang.org/grpc/encoding/encoding.go +++ b/vendor/google.golang.org/grpc/encoding/encoding.go @@ -90,9 +90,9 @@ func GetCompressor(name string) Compressor {  // methods can be called from concurrent goroutines.  type Codec interface {  	// Marshal returns the wire format of v. -	Marshal(v interface{}) ([]byte, error) +	Marshal(v any) ([]byte, error)  	// Unmarshal parses the wire format into v. -	Unmarshal(data []byte, v interface{}) error +	Unmarshal(data []byte, v any) error  	// Name returns the name of the Codec implementation. The returned string  	// will be used as part of content type in transmission.  The result must be  	// static; the result cannot change between calls. diff --git a/vendor/google.golang.org/grpc/encoding/gzip/gzip.go b/vendor/google.golang.org/grpc/encoding/gzip/gzip.go index a3bb173c2..6306e8bb0 100644 --- a/vendor/google.golang.org/grpc/encoding/gzip/gzip.go +++ b/vendor/google.golang.org/grpc/encoding/gzip/gzip.go @@ -40,7 +40,7 @@ const Name = "gzip"  func init() {  	c := &compressor{} -	c.poolCompressor.New = func() interface{} { +	c.poolCompressor.New = func() any {  		return &writer{Writer: gzip.NewWriter(io.Discard), pool: &c.poolCompressor}  	}  	encoding.RegisterCompressor(c) @@ -61,7 +61,7 @@ func SetLevel(level int) error {  		return fmt.Errorf("grpc: invalid gzip compression level: %d", level)  	}  	c := encoding.GetCompressor(Name).(*compressor) -	c.poolCompressor.New = func() interface{} { +	c.poolCompressor.New = func() any {  		w, err := gzip.NewWriterLevel(io.Discard, level)  		if err != nil {  			panic(err) diff --git a/vendor/google.golang.org/grpc/encoding/proto/proto.go b/vendor/google.golang.org/grpc/encoding/proto/proto.go index 3009b35af..0ee3d3bae 100644 --- a/vendor/google.golang.org/grpc/encoding/proto/proto.go +++ b/vendor/google.golang.org/grpc/encoding/proto/proto.go @@ -37,7 +37,7 @@ func init() {  // codec is a Codec implementation with protobuf. It is the default codec for gRPC.  type codec struct{} -func (codec) Marshal(v interface{}) ([]byte, error) { +func (codec) Marshal(v any) ([]byte, error) {  	vv, ok := v.(proto.Message)  	if !ok {  		return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v) @@ -45,7 +45,7 @@ func (codec) Marshal(v interface{}) ([]byte, error) {  	return proto.Marshal(vv)  } -func (codec) Unmarshal(data []byte, v interface{}) error { +func (codec) Unmarshal(data []byte, v any) error {  	vv, ok := v.(proto.Message)  	if !ok {  		return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v) diff --git a/vendor/google.golang.org/grpc/grpclog/component.go b/vendor/google.golang.org/grpc/grpclog/component.go index 8358dd6e2..ac73c9ced 100644 --- a/vendor/google.golang.org/grpc/grpclog/component.go +++ b/vendor/google.golang.org/grpc/grpclog/component.go @@ -31,71 +31,71 @@ type componentData struct {  var cache = map[string]*componentData{} -func (c *componentData) InfoDepth(depth int, args ...interface{}) { -	args = append([]interface{}{"[" + string(c.name) + "]"}, args...) +func (c *componentData) InfoDepth(depth int, args ...any) { +	args = append([]any{"[" + string(c.name) + "]"}, args...)  	grpclog.InfoDepth(depth+1, args...)  } -func (c *componentData) WarningDepth(depth int, args ...interface{}) { -	args = append([]interface{}{"[" + string(c.name) + "]"}, args...) +func (c *componentData) WarningDepth(depth int, args ...any) { +	args = append([]any{"[" + string(c.name) + "]"}, args...)  	grpclog.WarningDepth(depth+1, args...)  } -func (c *componentData) ErrorDepth(depth int, args ...interface{}) { -	args = append([]interface{}{"[" + string(c.name) + "]"}, args...) +func (c *componentData) ErrorDepth(depth int, args ...any) { +	args = append([]any{"[" + string(c.name) + "]"}, args...)  	grpclog.ErrorDepth(depth+1, args...)  } -func (c *componentData) FatalDepth(depth int, args ...interface{}) { -	args = append([]interface{}{"[" + string(c.name) + "]"}, args...) +func (c *componentData) FatalDepth(depth int, args ...any) { +	args = append([]any{"[" + string(c.name) + "]"}, args...)  	grpclog.FatalDepth(depth+1, args...)  } -func (c *componentData) Info(args ...interface{}) { +func (c *componentData) Info(args ...any) {  	c.InfoDepth(1, args...)  } -func (c *componentData) Warning(args ...interface{}) { +func (c *componentData) Warning(args ...any) {  	c.WarningDepth(1, args...)  } -func (c *componentData) Error(args ...interface{}) { +func (c *componentData) Error(args ...any) {  	c.ErrorDepth(1, args...)  } -func (c *componentData) Fatal(args ...interface{}) { +func (c *componentData) Fatal(args ...any) {  	c.FatalDepth(1, args...)  } -func (c *componentData) Infof(format string, args ...interface{}) { +func (c *componentData) Infof(format string, args ...any) {  	c.InfoDepth(1, fmt.Sprintf(format, args...))  } -func (c *componentData) Warningf(format string, args ...interface{}) { +func (c *componentData) Warningf(format string, args ...any) {  	c.WarningDepth(1, fmt.Sprintf(format, args...))  } -func (c *componentData) Errorf(format string, args ...interface{}) { +func (c *componentData) Errorf(format string, args ...any) {  	c.ErrorDepth(1, fmt.Sprintf(format, args...))  } -func (c *componentData) Fatalf(format string, args ...interface{}) { +func (c *componentData) Fatalf(format string, args ...any) {  	c.FatalDepth(1, fmt.Sprintf(format, args...))  } -func (c *componentData) Infoln(args ...interface{}) { +func (c *componentData) Infoln(args ...any) {  	c.InfoDepth(1, args...)  } -func (c *componentData) Warningln(args ...interface{}) { +func (c *componentData) Warningln(args ...any) {  	c.WarningDepth(1, args...)  } -func (c *componentData) Errorln(args ...interface{}) { +func (c *componentData) Errorln(args ...any) {  	c.ErrorDepth(1, args...)  } -func (c *componentData) Fatalln(args ...interface{}) { +func (c *componentData) Fatalln(args ...any) {  	c.FatalDepth(1, args...)  } diff --git a/vendor/google.golang.org/grpc/grpclog/grpclog.go b/vendor/google.golang.org/grpc/grpclog/grpclog.go index c8bb2be34..16928c9cb 100644 --- a/vendor/google.golang.org/grpc/grpclog/grpclog.go +++ b/vendor/google.golang.org/grpc/grpclog/grpclog.go @@ -42,53 +42,53 @@ func V(l int) bool {  }  // Info logs to the INFO log. -func Info(args ...interface{}) { +func Info(args ...any) {  	grpclog.Logger.Info(args...)  }  // Infof logs to the INFO log. Arguments are handled in the manner of fmt.Printf. -func Infof(format string, args ...interface{}) { +func Infof(format string, args ...any) {  	grpclog.Logger.Infof(format, args...)  }  // Infoln logs to the INFO log. Arguments are handled in the manner of fmt.Println. -func Infoln(args ...interface{}) { +func Infoln(args ...any) {  	grpclog.Logger.Infoln(args...)  }  // Warning logs to the WARNING log. -func Warning(args ...interface{}) { +func Warning(args ...any) {  	grpclog.Logger.Warning(args...)  }  // Warningf logs to the WARNING log. Arguments are handled in the manner of fmt.Printf. -func Warningf(format string, args ...interface{}) { +func Warningf(format string, args ...any) {  	grpclog.Logger.Warningf(format, args...)  }  // Warningln logs to the WARNING log. Arguments are handled in the manner of fmt.Println. -func Warningln(args ...interface{}) { +func Warningln(args ...any) {  	grpclog.Logger.Warningln(args...)  }  // Error logs to the ERROR log. -func Error(args ...interface{}) { +func Error(args ...any) {  	grpclog.Logger.Error(args...)  }  // Errorf logs to the ERROR log. Arguments are handled in the manner of fmt.Printf. -func Errorf(format string, args ...interface{}) { +func Errorf(format string, args ...any) {  	grpclog.Logger.Errorf(format, args...)  }  // Errorln logs to the ERROR log. Arguments are handled in the manner of fmt.Println. -func Errorln(args ...interface{}) { +func Errorln(args ...any) {  	grpclog.Logger.Errorln(args...)  }  // Fatal logs to the FATAL log. Arguments are handled in the manner of fmt.Print.  // It calls os.Exit() with exit code 1. -func Fatal(args ...interface{}) { +func Fatal(args ...any) {  	grpclog.Logger.Fatal(args...)  	// Make sure fatal logs will exit.  	os.Exit(1) @@ -96,7 +96,7 @@ func Fatal(args ...interface{}) {  // Fatalf logs to the FATAL log. Arguments are handled in the manner of fmt.Printf.  // It calls os.Exit() with exit code 1. -func Fatalf(format string, args ...interface{}) { +func Fatalf(format string, args ...any) {  	grpclog.Logger.Fatalf(format, args...)  	// Make sure fatal logs will exit.  	os.Exit(1) @@ -104,7 +104,7 @@ func Fatalf(format string, args ...interface{}) {  // Fatalln logs to the FATAL log. Arguments are handled in the manner of fmt.Println.  // It calle os.Exit()) with exit code 1. -func Fatalln(args ...interface{}) { +func Fatalln(args ...any) {  	grpclog.Logger.Fatalln(args...)  	// Make sure fatal logs will exit.  	os.Exit(1) @@ -113,20 +113,20 @@ func Fatalln(args ...interface{}) {  // Print prints to the logger. Arguments are handled in the manner of fmt.Print.  //  // Deprecated: use Info. -func Print(args ...interface{}) { +func Print(args ...any) {  	grpclog.Logger.Info(args...)  }  // Printf prints to the logger. Arguments are handled in the manner of fmt.Printf.  //  // Deprecated: use Infof. -func Printf(format string, args ...interface{}) { +func Printf(format string, args ...any) {  	grpclog.Logger.Infof(format, args...)  }  // Println prints to the logger. Arguments are handled in the manner of fmt.Println.  //  // Deprecated: use Infoln. -func Println(args ...interface{}) { +func Println(args ...any) {  	grpclog.Logger.Infoln(args...)  } diff --git a/vendor/google.golang.org/grpc/grpclog/logger.go b/vendor/google.golang.org/grpc/grpclog/logger.go index ef06a4822..b1674d826 100644 --- a/vendor/google.golang.org/grpc/grpclog/logger.go +++ b/vendor/google.golang.org/grpc/grpclog/logger.go @@ -24,12 +24,12 @@ import "google.golang.org/grpc/internal/grpclog"  //  // Deprecated: use LoggerV2.  type Logger interface { -	Fatal(args ...interface{}) -	Fatalf(format string, args ...interface{}) -	Fatalln(args ...interface{}) -	Print(args ...interface{}) -	Printf(format string, args ...interface{}) -	Println(args ...interface{}) +	Fatal(args ...any) +	Fatalf(format string, args ...any) +	Fatalln(args ...any) +	Print(args ...any) +	Printf(format string, args ...any) +	Println(args ...any)  }  // SetLogger sets the logger that is used in grpc. Call only from @@ -45,39 +45,39 @@ type loggerWrapper struct {  	Logger  } -func (g *loggerWrapper) Info(args ...interface{}) { +func (g *loggerWrapper) Info(args ...any) {  	g.Logger.Print(args...)  } -func (g *loggerWrapper) Infoln(args ...interface{}) { +func (g *loggerWrapper) Infoln(args ...any) {  	g.Logger.Println(args...)  } -func (g *loggerWrapper) Infof(format string, args ...interface{}) { +func (g *loggerWrapper) Infof(format string, args ...any) {  	g.Logger.Printf(format, args...)  } -func (g *loggerWrapper) Warning(args ...interface{}) { +func (g *loggerWrapper) Warning(args ...any) {  	g.Logger.Print(args...)  } -func (g *loggerWrapper) Warningln(args ...interface{}) { +func (g *loggerWrapper) Warningln(args ...any) {  	g.Logger.Println(args...)  } -func (g *loggerWrapper) Warningf(format string, args ...interface{}) { +func (g *loggerWrapper) Warningf(format string, args ...any) {  	g.Logger.Printf(format, args...)  } -func (g *loggerWrapper) Error(args ...interface{}) { +func (g *loggerWrapper) Error(args ...any) {  	g.Logger.Print(args...)  } -func (g *loggerWrapper) Errorln(args ...interface{}) { +func (g *loggerWrapper) Errorln(args ...any) {  	g.Logger.Println(args...)  } -func (g *loggerWrapper) Errorf(format string, args ...interface{}) { +func (g *loggerWrapper) Errorf(format string, args ...any) {  	g.Logger.Printf(format, args...)  } diff --git a/vendor/google.golang.org/grpc/grpclog/loggerv2.go b/vendor/google.golang.org/grpc/grpclog/loggerv2.go index 5de66e40d..ecfd36d71 100644 --- a/vendor/google.golang.org/grpc/grpclog/loggerv2.go +++ b/vendor/google.golang.org/grpc/grpclog/loggerv2.go @@ -33,35 +33,35 @@ import (  // LoggerV2 does underlying logging work for grpclog.  type LoggerV2 interface {  	// Info logs to INFO log. Arguments are handled in the manner of fmt.Print. -	Info(args ...interface{}) +	Info(args ...any)  	// Infoln logs to INFO log. Arguments are handled in the manner of fmt.Println. -	Infoln(args ...interface{}) +	Infoln(args ...any)  	// Infof logs to INFO log. Arguments are handled in the manner of fmt.Printf. -	Infof(format string, args ...interface{}) +	Infof(format string, args ...any)  	// Warning logs to WARNING log. Arguments are handled in the manner of fmt.Print. -	Warning(args ...interface{}) +	Warning(args ...any)  	// Warningln logs to WARNING log. Arguments are handled in the manner of fmt.Println. -	Warningln(args ...interface{}) +	Warningln(args ...any)  	// Warningf logs to WARNING log. Arguments are handled in the manner of fmt.Printf. -	Warningf(format string, args ...interface{}) +	Warningf(format string, args ...any)  	// Error logs to ERROR log. Arguments are handled in the manner of fmt.Print. -	Error(args ...interface{}) +	Error(args ...any)  	// Errorln logs to ERROR log. Arguments are handled in the manner of fmt.Println. -	Errorln(args ...interface{}) +	Errorln(args ...any)  	// Errorf logs to ERROR log. Arguments are handled in the manner of fmt.Printf. -	Errorf(format string, args ...interface{}) +	Errorf(format string, args ...any)  	// Fatal logs to ERROR log. Arguments are handled in the manner of fmt.Print.  	// gRPC ensures that all Fatal logs will exit with os.Exit(1).  	// Implementations may also call os.Exit() with a non-zero exit code. -	Fatal(args ...interface{}) +	Fatal(args ...any)  	// Fatalln logs to ERROR log. Arguments are handled in the manner of fmt.Println.  	// gRPC ensures that all Fatal logs will exit with os.Exit(1).  	// Implementations may also call os.Exit() with a non-zero exit code. -	Fatalln(args ...interface{}) +	Fatalln(args ...any)  	// Fatalf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.  	// gRPC ensures that all Fatal logs will exit with os.Exit(1).  	// Implementations may also call os.Exit() with a non-zero exit code. -	Fatalf(format string, args ...interface{}) +	Fatalf(format string, args ...any)  	// V reports whether verbosity level l is at least the requested verbose level.  	V(l int) bool  } @@ -182,53 +182,53 @@ func (g *loggerT) output(severity int, s string) {  	g.m[severity].Output(2, string(b))  } -func (g *loggerT) Info(args ...interface{}) { +func (g *loggerT) Info(args ...any) {  	g.output(infoLog, fmt.Sprint(args...))  } -func (g *loggerT) Infoln(args ...interface{}) { +func (g *loggerT) Infoln(args ...any) {  	g.output(infoLog, fmt.Sprintln(args...))  } -func (g *loggerT) Infof(format string, args ...interface{}) { +func (g *loggerT) Infof(format string, args ...any) {  	g.output(infoLog, fmt.Sprintf(format, args...))  } -func (g *loggerT) Warning(args ...interface{}) { +func (g *loggerT) Warning(args ...any) {  	g.output(warningLog, fmt.Sprint(args...))  } -func (g *loggerT) Warningln(args ...interface{}) { +func (g *loggerT) Warningln(args ...any) {  	g.output(warningLog, fmt.Sprintln(args...))  } -func (g *loggerT) Warningf(format string, args ...interface{}) { +func (g *loggerT) Warningf(format string, args ...any) {  	g.output(warningLog, fmt.Sprintf(format, args...))  } -func (g *loggerT) Error(args ...interface{}) { +func (g *loggerT) Error(args ...any) {  	g.output(errorLog, fmt.Sprint(args...))  } -func (g *loggerT) Errorln(args ...interface{}) { +func (g *loggerT) Errorln(args ...any) {  	g.output(errorLog, fmt.Sprintln(args...))  } -func (g *loggerT) Errorf(format string, args ...interface{}) { +func (g *loggerT) Errorf(format string, args ...any) {  	g.output(errorLog, fmt.Sprintf(format, args...))  } -func (g *loggerT) Fatal(args ...interface{}) { +func (g *loggerT) Fatal(args ...any) {  	g.output(fatalLog, fmt.Sprint(args...))  	os.Exit(1)  } -func (g *loggerT) Fatalln(args ...interface{}) { +func (g *loggerT) Fatalln(args ...any) {  	g.output(fatalLog, fmt.Sprintln(args...))  	os.Exit(1)  } -func (g *loggerT) Fatalf(format string, args ...interface{}) { +func (g *loggerT) Fatalf(format string, args ...any) {  	g.output(fatalLog, fmt.Sprintf(format, args...))  	os.Exit(1)  } @@ -248,11 +248,11 @@ func (g *loggerT) V(l int) bool {  type DepthLoggerV2 interface {  	LoggerV2  	// InfoDepth logs to INFO log at the specified depth. Arguments are handled in the manner of fmt.Println. -	InfoDepth(depth int, args ...interface{}) +	InfoDepth(depth int, args ...any)  	// WarningDepth logs to WARNING log at the specified depth. Arguments are handled in the manner of fmt.Println. -	WarningDepth(depth int, args ...interface{}) +	WarningDepth(depth int, args ...any)  	// ErrorDepth logs to ERROR log at the specified depth. Arguments are handled in the manner of fmt.Println. -	ErrorDepth(depth int, args ...interface{}) +	ErrorDepth(depth int, args ...any)  	// FatalDepth logs to FATAL log at the specified depth. Arguments are handled in the manner of fmt.Println. -	FatalDepth(depth int, args ...interface{}) +	FatalDepth(depth int, args ...any)  } diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go index 142d35f75..24299efd6 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -17,7 +17,7 @@  // Code generated by protoc-gen-go. DO NOT EDIT.  // versions: -// 	protoc-gen-go v1.30.0 +// 	protoc-gen-go v1.31.0  // 	protoc        v4.22.0  // source: grpc/health/v1/health.proto diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go index bb96ef57b..877d78fc3 100644 --- a/vendor/google.golang.org/grpc/interceptor.go +++ b/vendor/google.golang.org/grpc/interceptor.go @@ -23,7 +23,7 @@ import (  )  // UnaryInvoker is called by UnaryClientInterceptor to complete RPCs. -type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error +type UnaryInvoker func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error  // UnaryClientInterceptor intercepts the execution of a unary RPC on the client.  // Unary interceptors can be specified as a DialOption, using @@ -40,7 +40,7 @@ type UnaryInvoker func(ctx context.Context, method string, req, reply interface{  // defaults from the ClientConn as well as per-call options.  //  // The returned error must be compatible with the status package. -type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error +type UnaryClientInterceptor func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error  // Streamer is called by StreamClientInterceptor to create a ClientStream.  type Streamer func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) @@ -66,7 +66,7 @@ type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *Cli  // server side. All per-rpc information may be mutated by the interceptor.  type UnaryServerInfo struct {  	// Server is the service implementation the user provides. This is read-only. -	Server interface{} +	Server any  	// FullMethod is the full RPC method string, i.e., /package.service/method.  	FullMethod string  } @@ -78,13 +78,13 @@ type UnaryServerInfo struct {  // status package, or be one of the context errors. Otherwise, gRPC will use  // codes.Unknown as the status code and err.Error() as the status message of the  // RPC. -type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error) +type UnaryHandler func(ctx context.Context, req any) (any, error)  // UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info  // contains all the information of this RPC the interceptor can operate on. And handler is the wrapper  // of the service method implementation. It is the responsibility of the interceptor to invoke handler  // to complete the RPC. -type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error) +type UnaryServerInterceptor func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (resp any, err error)  // StreamServerInfo consists of various information about a streaming RPC on  // server side. All per-rpc information may be mutated by the interceptor. @@ -101,4 +101,4 @@ type StreamServerInfo struct {  // info contains all the information of this RPC the interceptor can operate on. And handler is the  // service method implementation. It is the responsibility of the interceptor to invoke handler to  // complete the RPC. -type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error +type StreamServerInterceptor func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go index 08666f62a..3c594e6e4 100644 --- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go @@ -200,8 +200,8 @@ func (gsb *Balancer) ExitIdle() {  	}  } -// UpdateSubConnState forwards the update to the appropriate child. -func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +// updateSubConnState forwards the update to the appropriate child. +func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {  	gsb.currentMu.Lock()  	defer gsb.currentMu.Unlock()  	gsb.mu.Lock() @@ -214,13 +214,26 @@ func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubC  	} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {  		balToUpdate = gsb.balancerPending  	} -	gsb.mu.Unlock()  	if balToUpdate == nil {  		// SubConn belonged to a stale lb policy that has not yet fully closed,  		// or the balancer was already closed. +		gsb.mu.Unlock()  		return  	} -	balToUpdate.UpdateSubConnState(sc, state) +	if state.ConnectivityState == connectivity.Shutdown { +		delete(balToUpdate.subconns, sc) +	} +	gsb.mu.Unlock() +	if cb != nil { +		cb(state) +	} else { +		balToUpdate.UpdateSubConnState(sc, state) +	} +} + +// UpdateSubConnState forwards the update to the appropriate child. +func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { +	gsb.updateSubConnState(sc, state, nil)  }  // Close closes any active child balancers. @@ -242,7 +255,7 @@ func (gsb *Balancer) Close() {  //  // It implements the balancer.ClientConn interface and is passed down in that  // capacity to the wrapped balancer. It maintains a set of subConns created by -// the wrapped balancer and calls from the latter to create/update/remove +// the wrapped balancer and calls from the latter to create/update/shutdown  // SubConns update this set before being forwarded to the parent ClientConn.  // State updates from the wrapped balancer can result in invocation of the  // graceful switch logic. @@ -254,21 +267,10 @@ type balancerWrapper struct {  	subconns  map[balancer.SubConn]bool // subconns created by this balancer  } -func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { -	if state.ConnectivityState == connectivity.Shutdown { -		bw.gsb.mu.Lock() -		delete(bw.subconns, sc) -		bw.gsb.mu.Unlock() -	} -	// There is no need to protect this read with a mutex, as the write to the -	// Balancer field happens in SwitchTo, which completes before this can be -	// called. -	bw.Balancer.UpdateSubConnState(sc, state) -} - -// Close closes the underlying LB policy and removes the subconns it created. bw -// must not be referenced via balancerCurrent or balancerPending in gsb when -// called. gsb.mu must not be held.  Does not panic with a nil receiver. +// Close closes the underlying LB policy and shuts down the subconns it +// created. bw must not be referenced via balancerCurrent or balancerPending in +// gsb when called. gsb.mu must not be held.  Does not panic with a nil +// receiver.  func (bw *balancerWrapper) Close() {  	// before Close is called.  	if bw == nil { @@ -281,7 +283,7 @@ func (bw *balancerWrapper) Close() {  	bw.Balancer.Close()  	bw.gsb.mu.Lock()  	for sc := range bw.subconns { -		bw.gsb.cc.RemoveSubConn(sc) +		sc.Shutdown()  	}  	bw.gsb.mu.Unlock()  } @@ -335,13 +337,16 @@ func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.Ne  	}  	bw.gsb.mu.Unlock() +	var sc balancer.SubConn +	oldListener := opts.StateListener +	opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }  	sc, err := bw.gsb.cc.NewSubConn(addrs, opts)  	if err != nil {  		return nil, err  	}  	bw.gsb.mu.Lock()  	if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call -		bw.gsb.cc.RemoveSubConn(sc) +		sc.Shutdown()  		bw.gsb.mu.Unlock()  		return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)  	} @@ -360,13 +365,9 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {  }  func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) { -	bw.gsb.mu.Lock() -	if !bw.gsb.balancerCurrentOrPending(bw) { -		bw.gsb.mu.Unlock() -		return -	} -	bw.gsb.mu.Unlock() -	bw.gsb.cc.RemoveSubConn(sc) +	// Note: existing third party balancers may call this, so it must remain +	// until RemoveSubConn is fully removed. +	sc.Shutdown()  }  func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { diff --git a/vendor/google.golang.org/grpc/internal/balancerload/load.go b/vendor/google.golang.org/grpc/internal/balancerload/load.go index 3a905d966..94a08d687 100644 --- a/vendor/google.golang.org/grpc/internal/balancerload/load.go +++ b/vendor/google.golang.org/grpc/internal/balancerload/load.go @@ -25,7 +25,7 @@ import (  // Parser converts loads from metadata into a concrete type.  type Parser interface {  	// Parse parses loads from metadata. -	Parse(md metadata.MD) interface{} +	Parse(md metadata.MD) any  }  var parser Parser @@ -38,7 +38,7 @@ func SetParser(lr Parser) {  }  // Parse calls parser.Read(). -func Parse(md metadata.MD) interface{} { +func Parse(md metadata.MD) any {  	if parser == nil {  		return nil  	} diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go index 6c3f63221..0f31274a3 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -230,7 +230,7 @@ type ClientMessage struct {  	OnClientSide bool  	// Message can be a proto.Message or []byte. Other messages formats are not  	// supported. -	Message interface{} +	Message any  }  func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry { @@ -270,7 +270,7 @@ type ServerMessage struct {  	OnClientSide bool  	// Message can be a proto.Message or []byte. Other messages formats are not  	// supported. -	Message interface{} +	Message any  }  func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry { diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 81c2f5fd7..4399c3df4 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -28,25 +28,25 @@ import "sync"  // the underlying mutex used for synchronization.  //  // Unbounded supports values of any type to be stored in it by using a channel -// of `interface{}`. This means that a call to Put() incurs an extra memory -// allocation, and also that users need a type assertion while reading. For -// performance critical code paths, using Unbounded is strongly discouraged and -// defining a new type specific implementation of this buffer is preferred. See +// of `any`. This means that a call to Put() incurs an extra memory allocation, +// and also that users need a type assertion while reading. For performance +// critical code paths, using Unbounded is strongly discouraged and defining a +// new type specific implementation of this buffer is preferred. See  // internal/transport/transport.go for an example of this.  type Unbounded struct { -	c       chan interface{} +	c       chan any  	closed  bool  	mu      sync.Mutex -	backlog []interface{} +	backlog []any  }  // NewUnbounded returns a new instance of Unbounded.  func NewUnbounded() *Unbounded { -	return &Unbounded{c: make(chan interface{}, 1)} +	return &Unbounded{c: make(chan any, 1)}  }  // Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t interface{}) { +func (b *Unbounded) Put(t any) {  	b.mu.Lock()  	defer b.mu.Unlock()  	if b.closed { @@ -89,7 +89,7 @@ func (b *Unbounded) Load() {  //  // If the unbounded buffer is closed, the read channel returned by this method  // is closed. -func (b *Unbounded) Get() <-chan interface{} { +func (b *Unbounded) Get() <-chan any {  	return b.c  } diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go index 777cbcd79..5395e7752 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go +++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go @@ -24,9 +24,7 @@  package channelz  import ( -	"context"  	"errors" -	"fmt"  	"sort"  	"sync"  	"sync/atomic" @@ -40,8 +38,11 @@ const (  )  var ( -	db    dbWrapper -	idGen idGenerator +	// IDGen is the global channelz entity ID generator.  It should not be used +	// outside this package except by tests. +	IDGen IDGenerator + +	db dbWrapper  	// EntryPerPage defines the number of channelz entries to be shown on a web page.  	EntryPerPage  = int64(50)  	curState      int32 @@ -52,14 +53,14 @@ var (  func TurnOn() {  	if !IsOn() {  		db.set(newChannelMap()) -		idGen.reset() +		IDGen.Reset()  		atomic.StoreInt32(&curState, 1)  	}  }  // IsOn returns whether channelz data collection is on.  func IsOn() bool { -	return atomic.CompareAndSwapInt32(&curState, 1, 1) +	return atomic.LoadInt32(&curState) == 1  }  // SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel). @@ -97,43 +98,6 @@ func (d *dbWrapper) get() *channelMap {  	return d.DB  } -// NewChannelzStorageForTesting initializes channelz data storage and id -// generator for testing purposes. -// -// Returns a cleanup function to be invoked by the test, which waits for up to -// 10s for all channelz state to be reset by the grpc goroutines when those -// entities get closed. This cleanup function helps with ensuring that tests -// don't mess up each other. -func NewChannelzStorageForTesting() (cleanup func() error) { -	db.set(newChannelMap()) -	idGen.reset() - -	return func() error { -		cm := db.get() -		if cm == nil { -			return nil -		} - -		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) -		defer cancel() -		ticker := time.NewTicker(10 * time.Millisecond) -		defer ticker.Stop() -		for { -			cm.mu.RLock() -			topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets) -			cm.mu.RUnlock() - -			if err := ctx.Err(); err != nil { -				return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets) -			} -			if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 { -				return nil -			} -			<-ticker.C -		} -	} -} -  // GetTopChannels returns a slice of top channel's ChannelMetric, along with a  // boolean indicating whether there's more top channels to be queried for.  // @@ -193,7 +157,7 @@ func GetServer(id int64) *ServerMetric {  //  // If channelz is not turned ON, the channelz database is not mutated.  func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier { -	id := idGen.genID() +	id := IDGen.genID()  	var parent int64  	isTopChannel := true  	if pid != nil { @@ -229,7 +193,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er  	if pid == nil {  		return nil, errors.New("a SubChannel's parent id cannot be nil")  	} -	id := idGen.genID() +	id := IDGen.genID()  	if !IsOn() {  		return newIdentifer(RefSubChannel, id, pid), nil  	} @@ -251,7 +215,7 @@ func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, er  //  // If channelz is not turned ON, the channelz database is not mutated.  func RegisterServer(s Server, ref string) *Identifier { -	id := idGen.genID() +	id := IDGen.genID()  	if !IsOn() {  		return newIdentifer(RefServer, id, nil)  	} @@ -277,7 +241,7 @@ func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, e  	if pid == nil {  		return nil, errors.New("a ListenSocket's parent id cannot be 0")  	} -	id := idGen.genID() +	id := IDGen.genID()  	if !IsOn() {  		return newIdentifer(RefListenSocket, id, pid), nil  	} @@ -297,7 +261,7 @@ func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, e  	if pid == nil {  		return nil, errors.New("a NormalSocket's parent id cannot be 0")  	} -	id := idGen.genID() +	id := IDGen.genID()  	if !IsOn() {  		return newIdentifer(RefNormalSocket, id, pid), nil  	} @@ -776,14 +740,17 @@ func (c *channelMap) GetServer(id int64) *ServerMetric {  	return sm  } -type idGenerator struct { +// IDGenerator is an incrementing atomic that tracks IDs for channelz entities. +type IDGenerator struct {  	id int64  } -func (i *idGenerator) reset() { +// Reset resets the generated ID back to zero.  Should only be used at +// initialization or by tests sensitive to the ID number. +func (i *IDGenerator) Reset() {  	atomic.StoreInt64(&i.id, 0)  } -func (i *idGenerator) genID() int64 { +func (i *IDGenerator) genID() int64 {  	return atomic.AddInt64(&i.id, 1)  } diff --git a/vendor/google.golang.org/grpc/internal/channelz/logging.go b/vendor/google.golang.org/grpc/internal/channelz/logging.go index 8e13a3d2c..f89e6f77b 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/logging.go +++ b/vendor/google.golang.org/grpc/internal/channelz/logging.go @@ -31,7 +31,7 @@ func withParens(id *Identifier) string {  }  // Info logs and adds a trace event if channelz is on. -func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { +func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {  	AddTraceEvent(l, id, 1, &TraceEventDesc{  		Desc:     fmt.Sprint(args...),  		Severity: CtInfo, @@ -39,7 +39,7 @@ func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {  }  // Infof logs and adds a trace event if channelz is on. -func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { +func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {  	AddTraceEvent(l, id, 1, &TraceEventDesc{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtInfo, @@ -47,7 +47,7 @@ func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...inter  }  // Warning logs and adds a trace event if channelz is on. -func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { +func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {  	AddTraceEvent(l, id, 1, &TraceEventDesc{  		Desc:     fmt.Sprint(args...),  		Severity: CtWarning, @@ -55,7 +55,7 @@ func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {  }  // Warningf logs and adds a trace event if channelz is on. -func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { +func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {  	AddTraceEvent(l, id, 1, &TraceEventDesc{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtWarning, @@ -63,7 +63,7 @@ func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...in  }  // Error logs and adds a trace event if channelz is on. -func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) { +func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {  	AddTraceEvent(l, id, 1, &TraceEventDesc{  		Desc:     fmt.Sprint(args...),  		Severity: CtError, @@ -71,7 +71,7 @@ func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {  }  // Errorf logs and adds a trace event if channelz is on. -func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) { +func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {  	AddTraceEvent(l, id, 1, &TraceEventDesc{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtError, diff --git a/vendor/google.golang.org/grpc/internal/channelz/types.go b/vendor/google.golang.org/grpc/internal/channelz/types.go index 7b2f350e2..1d4020f53 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/types.go +++ b/vendor/google.golang.org/grpc/internal/channelz/types.go @@ -628,6 +628,7 @@ type tracedChannel interface {  type channelTrace struct {  	cm          *channelMap +	clearCalled bool  	createdTime time.Time  	eventCount  int64  	mu          sync.Mutex @@ -656,6 +657,10 @@ func (c *channelTrace) append(e *TraceEvent) {  }  func (c *channelTrace) clear() { +	if c.clearCalled { +		return +	} +	c.clearCalled = true  	c.mu.Lock()  	for _, e := range c.events {  		if e.RefID != 0 { diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go b/vendor/google.golang.org/grpc/internal/channelz/util_linux.go index 8d194e44e..98288c3f8 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/util_linux.go @@ -23,7 +23,7 @@ import (  )  // GetSocketOption gets the socket option info of the conn. -func GetSocketOption(socket interface{}) *SocketOptionData { +func GetSocketOption(socket any) *SocketOptionData {  	c, ok := socket.(syscall.Conn)  	if !ok {  		return nil diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go index 837ddc402..b5568b22e 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go @@ -22,6 +22,6 @@  package channelz  // GetSocketOption gets the socket option info of the conn. -func GetSocketOption(c interface{}) *SocketOptionData { +func GetSocketOption(c any) *SocketOptionData {  	return nil  } diff --git a/vendor/google.golang.org/grpc/internal/credentials/credentials.go b/vendor/google.golang.org/grpc/internal/credentials/credentials.go index 32c9b5903..9deee7f65 100644 --- a/vendor/google.golang.org/grpc/internal/credentials/credentials.go +++ b/vendor/google.golang.org/grpc/internal/credentials/credentials.go @@ -25,12 +25,12 @@ import (  type requestInfoKey struct{}  // NewRequestInfoContext creates a context with ri. -func NewRequestInfoContext(ctx context.Context, ri interface{}) context.Context { +func NewRequestInfoContext(ctx context.Context, ri any) context.Context {  	return context.WithValue(ctx, requestInfoKey{}, ri)  }  // RequestInfoFromContext extracts the RequestInfo from ctx. -func RequestInfoFromContext(ctx context.Context) interface{} { +func RequestInfoFromContext(ctx context.Context) any {  	return ctx.Value(requestInfoKey{})  } @@ -39,11 +39,11 @@ func RequestInfoFromContext(ctx context.Context) interface{} {  type clientHandshakeInfoKey struct{}  // ClientHandshakeInfoFromContext extracts the ClientHandshakeInfo from ctx. -func ClientHandshakeInfoFromContext(ctx context.Context) interface{} { +func ClientHandshakeInfoFromContext(ctx context.Context) any {  	return ctx.Value(clientHandshakeInfoKey{})  }  // NewClientHandshakeInfoContext creates a context with chi. -func NewClientHandshakeInfoContext(ctx context.Context, chi interface{}) context.Context { +func NewClientHandshakeInfoContext(ctx context.Context, chi any) context.Context {  	return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)  } diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go index 77c2c0b89..3cf10ddfb 100644 --- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go +++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go @@ -37,9 +37,12 @@ var (  	// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).  	RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)  	// PickFirstLBConfig is set if we should support configuration of the -	// pick_first LB policy, which can be enabled by setting the environment -	// variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true". -	PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false) +	// pick_first LB policy. +	PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", true) +	// LeastRequestLB is set if we should support the least_request_experimental +	// LB policy, which can be enabled by setting the environment variable +	// "GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST" to "true". +	LeastRequestLB = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_LEAST_REQUEST", false)  	// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS  	// handshakes that can be performed.  	ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100) diff --git a/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go b/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go index b68e26a36..bfc45102a 100644 --- a/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go +++ b/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go @@ -30,7 +30,7 @@ var Logger LoggerV2  var DepthLogger DepthLoggerV2  // InfoDepth logs to the INFO log at the specified depth. -func InfoDepth(depth int, args ...interface{}) { +func InfoDepth(depth int, args ...any) {  	if DepthLogger != nil {  		DepthLogger.InfoDepth(depth, args...)  	} else { @@ -39,7 +39,7 @@ func InfoDepth(depth int, args ...interface{}) {  }  // WarningDepth logs to the WARNING log at the specified depth. -func WarningDepth(depth int, args ...interface{}) { +func WarningDepth(depth int, args ...any) {  	if DepthLogger != nil {  		DepthLogger.WarningDepth(depth, args...)  	} else { @@ -48,7 +48,7 @@ func WarningDepth(depth int, args ...interface{}) {  }  // ErrorDepth logs to the ERROR log at the specified depth. -func ErrorDepth(depth int, args ...interface{}) { +func ErrorDepth(depth int, args ...any) {  	if DepthLogger != nil {  		DepthLogger.ErrorDepth(depth, args...)  	} else { @@ -57,7 +57,7 @@ func ErrorDepth(depth int, args ...interface{}) {  }  // FatalDepth logs to the FATAL log at the specified depth. -func FatalDepth(depth int, args ...interface{}) { +func FatalDepth(depth int, args ...any) {  	if DepthLogger != nil {  		DepthLogger.FatalDepth(depth, args...)  	} else { @@ -71,35 +71,35 @@ func FatalDepth(depth int, args ...interface{}) {  // is defined here to avoid a circular dependency.  type LoggerV2 interface {  	// Info logs to INFO log. Arguments are handled in the manner of fmt.Print. -	Info(args ...interface{}) +	Info(args ...any)  	// Infoln logs to INFO log. Arguments are handled in the manner of fmt.Println. -	Infoln(args ...interface{}) +	Infoln(args ...any)  	// Infof logs to INFO log. Arguments are handled in the manner of fmt.Printf. -	Infof(format string, args ...interface{}) +	Infof(format string, args ...any)  	// Warning logs to WARNING log. Arguments are handled in the manner of fmt.Print. -	Warning(args ...interface{}) +	Warning(args ...any)  	// Warningln logs to WARNING log. Arguments are handled in the manner of fmt.Println. -	Warningln(args ...interface{}) +	Warningln(args ...any)  	// Warningf logs to WARNING log. Arguments are handled in the manner of fmt.Printf. -	Warningf(format string, args ...interface{}) +	Warningf(format string, args ...any)  	// Error logs to ERROR log. Arguments are handled in the manner of fmt.Print. -	Error(args ...interface{}) +	Error(args ...any)  	// Errorln logs to ERROR log. Arguments are handled in the manner of fmt.Println. -	Errorln(args ...interface{}) +	Errorln(args ...any)  	// Errorf logs to ERROR log. Arguments are handled in the manner of fmt.Printf. -	Errorf(format string, args ...interface{}) +	Errorf(format string, args ...any)  	// Fatal logs to ERROR log. Arguments are handled in the manner of fmt.Print.  	// gRPC ensures that all Fatal logs will exit with os.Exit(1).  	// Implementations may also call os.Exit() with a non-zero exit code. -	Fatal(args ...interface{}) +	Fatal(args ...any)  	// Fatalln logs to ERROR log. Arguments are handled in the manner of fmt.Println.  	// gRPC ensures that all Fatal logs will exit with os.Exit(1).  	// Implementations may also call os.Exit() with a non-zero exit code. -	Fatalln(args ...interface{}) +	Fatalln(args ...any)  	// Fatalf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.  	// gRPC ensures that all Fatal logs will exit with os.Exit(1).  	// Implementations may also call os.Exit() with a non-zero exit code. -	Fatalf(format string, args ...interface{}) +	Fatalf(format string, args ...any)  	// V reports whether verbosity level l is at least the requested verbose level.  	V(l int) bool  } @@ -116,11 +116,11 @@ type LoggerV2 interface {  // later release.  type DepthLoggerV2 interface {  	// InfoDepth logs to INFO log at the specified depth. Arguments are handled in the manner of fmt.Println. -	InfoDepth(depth int, args ...interface{}) +	InfoDepth(depth int, args ...any)  	// WarningDepth logs to WARNING log at the specified depth. Arguments are handled in the manner of fmt.Println. -	WarningDepth(depth int, args ...interface{}) +	WarningDepth(depth int, args ...any)  	// ErrorDepth logs to ERROR log at the specified depth. Arguments are handled in the manner of fmt.Println. -	ErrorDepth(depth int, args ...interface{}) +	ErrorDepth(depth int, args ...any)  	// FatalDepth logs to FATAL log at the specified depth. Arguments are handled in the manner of fmt.Println. -	FatalDepth(depth int, args ...interface{}) +	FatalDepth(depth int, args ...any)  } diff --git a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go b/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go index 02224b42c..faa998de7 100644 --- a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go +++ b/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go @@ -31,7 +31,7 @@ type PrefixLogger struct {  }  // Infof does info logging. -func (pl *PrefixLogger) Infof(format string, args ...interface{}) { +func (pl *PrefixLogger) Infof(format string, args ...any) {  	if pl != nil {  		// Handle nil, so the tests can pass in a nil logger.  		format = pl.prefix + format @@ -42,7 +42,7 @@ func (pl *PrefixLogger) Infof(format string, args ...interface{}) {  }  // Warningf does warning logging. -func (pl *PrefixLogger) Warningf(format string, args ...interface{}) { +func (pl *PrefixLogger) Warningf(format string, args ...any) {  	if pl != nil {  		format = pl.prefix + format  		pl.logger.WarningDepth(1, fmt.Sprintf(format, args...)) @@ -52,7 +52,7 @@ func (pl *PrefixLogger) Warningf(format string, args ...interface{}) {  }  // Errorf does error logging. -func (pl *PrefixLogger) Errorf(format string, args ...interface{}) { +func (pl *PrefixLogger) Errorf(format string, args ...any) {  	if pl != nil {  		format = pl.prefix + format  		pl.logger.ErrorDepth(1, fmt.Sprintf(format, args...)) @@ -62,7 +62,7 @@ func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {  }  // Debugf does info logging at verbose level 2. -func (pl *PrefixLogger) Debugf(format string, args ...interface{}) { +func (pl *PrefixLogger) Debugf(format string, args ...any) {  	// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe  	// rewrite PrefixLogger a little to ensure that we don't use the global  	// `Logger` here, and instead use the `logger` field. diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go index 37b8d4117..900917dbe 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go @@ -32,10 +32,10 @@ import (  //  // This type is safe for concurrent access.  type CallbackSerializer struct { -	// Done is closed once the serializer is shut down completely, i.e all +	// done is closed once the serializer is shut down completely, i.e all  	// scheduled callbacks are executed and the serializer has deallocated all  	// its resources. -	Done chan struct{} +	done chan struct{}  	callbacks *buffer.Unbounded  	closedMu  sync.Mutex @@ -48,12 +48,12 @@ type CallbackSerializer struct {  // callbacks will be added once this context is canceled, and any pending un-run  // callbacks will be executed before the serializer is shut down.  func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { -	t := &CallbackSerializer{ -		Done:      make(chan struct{}), +	cs := &CallbackSerializer{ +		done:      make(chan struct{}),  		callbacks: buffer.NewUnbounded(),  	} -	go t.run(ctx) -	return t +	go cs.run(ctx) +	return cs  }  // Schedule adds a callback to be scheduled after existing callbacks are run. @@ -64,56 +64,62 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {  // Return value indicates if the callback was successfully added to the list of  // callbacks to be executed by the serializer. It is not possible to add  // callbacks once the context passed to NewCallbackSerializer is cancelled. -func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { -	t.closedMu.Lock() -	defer t.closedMu.Unlock() +func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { +	cs.closedMu.Lock() +	defer cs.closedMu.Unlock() -	if t.closed { +	if cs.closed {  		return false  	} -	t.callbacks.Put(f) +	cs.callbacks.Put(f)  	return true  } -func (t *CallbackSerializer) run(ctx context.Context) { +func (cs *CallbackSerializer) run(ctx context.Context) {  	var backlog []func(context.Context) -	defer close(t.Done) +	defer close(cs.done)  	for ctx.Err() == nil {  		select {  		case <-ctx.Done():  			// Do nothing here. Next iteration of the for loop will not happen,  			// since ctx.Err() would be non-nil. -		case callback, ok := <-t.callbacks.Get(): +		case callback, ok := <-cs.callbacks.Get():  			if !ok {  				return  			} -			t.callbacks.Load() +			cs.callbacks.Load()  			callback.(func(ctx context.Context))(ctx)  		}  	}  	// Fetch pending callbacks if any, and execute them before returning from -	// this method and closing t.Done. -	t.closedMu.Lock() -	t.closed = true -	backlog = t.fetchPendingCallbacks() -	t.callbacks.Close() -	t.closedMu.Unlock() +	// this method and closing cs.done. +	cs.closedMu.Lock() +	cs.closed = true +	backlog = cs.fetchPendingCallbacks() +	cs.callbacks.Close() +	cs.closedMu.Unlock()  	for _, b := range backlog {  		b(ctx)  	}  } -func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { +func (cs *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {  	var backlog []func(context.Context)  	for {  		select { -		case b := <-t.callbacks.Get(): +		case b := <-cs.callbacks.Get():  			backlog = append(backlog, b.(func(context.Context))) -			t.callbacks.Load() +			cs.callbacks.Load()  		default:  			return backlog  		}  	}  } + +// Done returns a channel that is closed after the context passed to +// NewCallbackSerializer is canceled and all callbacks have been executed. +func (cs *CallbackSerializer) Done() <-chan struct{} { +	return cs.done +} diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go index f58b5ffa6..aef8cec1a 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go @@ -29,7 +29,7 @@ import (  type Subscriber interface {  	// OnMessage is invoked when a new message is published. Implementations  	// must not block in this method. -	OnMessage(msg interface{}) +	OnMessage(msg any)  }  // PubSub is a simple one-to-many publish-subscribe system that supports @@ -40,25 +40,23 @@ type Subscriber interface {  // subscribers interested in receiving these messages register a callback  // via the Subscribe() method.  // -// Once a PubSub is stopped, no more messages can be published, and -// it is guaranteed that no more subscriber callback will be invoked. +// Once a PubSub is stopped, no more messages can be published, but any pending +// published messages will be delivered to the subscribers.  Done may be used +// to determine when all published messages have been delivered.  type PubSub struct { -	cs     *CallbackSerializer -	cancel context.CancelFunc +	cs *CallbackSerializer  	// Access to the below fields are guarded by this mutex.  	mu          sync.Mutex -	msg         interface{} +	msg         any  	subscribers map[Subscriber]bool -	stopped     bool  } -// NewPubSub returns a new PubSub instance. -func NewPubSub() *PubSub { -	ctx, cancel := context.WithCancel(context.Background()) +// NewPubSub returns a new PubSub instance.  Users should cancel the +// provided context to shutdown the PubSub. +func NewPubSub(ctx context.Context) *PubSub {  	return &PubSub{  		cs:          NewCallbackSerializer(ctx), -		cancel:      cancel,  		subscribers: map[Subscriber]bool{},  	}  } @@ -75,10 +73,6 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {  	ps.mu.Lock()  	defer ps.mu.Unlock() -	if ps.stopped { -		return func() {} -	} -  	ps.subscribers[sub] = true  	if ps.msg != nil { @@ -102,14 +96,10 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {  // Publish publishes the provided message to the PubSub, and invokes  // callbacks registered by subscribers asynchronously. -func (ps *PubSub) Publish(msg interface{}) { +func (ps *PubSub) Publish(msg any) {  	ps.mu.Lock()  	defer ps.mu.Unlock() -	if ps.stopped { -		return -	} -  	ps.msg = msg  	for sub := range ps.subscribers {  		s := sub @@ -124,13 +114,8 @@ func (ps *PubSub) Publish(msg interface{}) {  	}  } -// Stop shuts down the PubSub and releases any resources allocated by it. -// It is guaranteed that no subscriber callbacks would be invoked once this -// method returns. -func (ps *PubSub) Stop() { -	ps.mu.Lock() -	defer ps.mu.Unlock() -	ps.stopped = true - -	ps.cancel() +// Done returns a channel that is closed after the context passed to NewPubSub +// is canceled and all updates have been sent to subscribers. +func (ps *PubSub) Done() <-chan struct{} { +	return ps.cs.Done()  } diff --git a/vendor/google.golang.org/grpc/idle.go b/vendor/google.golang.org/grpc/internal/idle/idle.go index dc3dc72f6..6c272476e 100644 --- a/vendor/google.golang.org/grpc/idle.go +++ b/vendor/google.golang.org/grpc/internal/idle/idle.go @@ -16,7 +16,9 @@   *   */ -package grpc +// Package idle contains a component for managing idleness (entering and exiting) +// based on RPC activity. +package idle  import (  	"fmt" @@ -24,6 +26,8 @@ import (  	"sync"  	"sync/atomic"  	"time" + +	"google.golang.org/grpc/grpclog"  )  // For overriding in unit tests. @@ -31,31 +35,31 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {  	return time.AfterFunc(d, f)  } -// idlenessEnforcer is the functionality provided by grpc.ClientConn to enter +// Enforcer is the functionality provided by grpc.ClientConn to enter  // and exit from idle mode. -type idlenessEnforcer interface { -	exitIdleMode() error -	enterIdleMode() error +type Enforcer interface { +	ExitIdleMode() error +	EnterIdleMode() error  } -// idlenessManager defines the functionality required to track RPC activity on a +// Manager defines the functionality required to track RPC activity on a  // channel. -type idlenessManager interface { -	onCallBegin() error -	onCallEnd() -	close() +type Manager interface { +	OnCallBegin() error +	OnCallEnd() +	Close()  } -type noopIdlenessManager struct{} +type noopManager struct{} -func (noopIdlenessManager) onCallBegin() error { return nil } -func (noopIdlenessManager) onCallEnd()         {} -func (noopIdlenessManager) close()             {} +func (noopManager) OnCallBegin() error { return nil } +func (noopManager) OnCallEnd()         {} +func (noopManager) Close()             {} -// idlenessManagerImpl implements the idlenessManager interface. It uses atomic -// operations to synchronize access to shared state and a mutex to guarantee -// mutual exclusion in a critical section. -type idlenessManagerImpl struct { +// manager implements the Manager interface. It uses atomic operations to +// synchronize access to shared state and a mutex to guarantee mutual exclusion +// in a critical section. +type manager struct {  	// State accessed atomically.  	lastCallEndTime           int64 // Unix timestamp in nanos; time when the most recent RPC completed.  	activeCallsCount          int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there. @@ -64,14 +68,15 @@ type idlenessManagerImpl struct {  	// Can be accessed without atomics or mutex since these are set at creation  	// time and read-only after that. -	enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn. -	timeout  int64            // Idle timeout duration nanos stored as an int64. +	enforcer Enforcer // Functionality provided by grpc.ClientConn. +	timeout  int64    // Idle timeout duration nanos stored as an int64. +	logger   grpclog.LoggerV2  	// idleMu is used to guarantee mutual exclusion in two scenarios:  	// - Opposing intentions:  	//   - a: Idle timeout has fired and handleIdleTimeout() is trying to put  	//     the channel in idle mode because the channel has been inactive. -	//   - b: At the same time an RPC is made on the channel, and onCallBegin() +	//   - b: At the same time an RPC is made on the channel, and OnCallBegin()  	//     is trying to prevent the channel from going idle.  	// - Competing intentions:  	//   - The channel is in idle mode and there are multiple RPCs starting at @@ -83,28 +88,37 @@ type idlenessManagerImpl struct {  	timer        *time.Timer  } -// newIdlenessManager creates a new idleness manager implementation for the +// ManagerOptions is a collection of options used by +// NewManager. +type ManagerOptions struct { +	Enforcer Enforcer +	Timeout  time.Duration +	Logger   grpclog.LoggerV2 +} + +// NewManager creates a new idleness manager implementation for the  // given idle timeout. -func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager { -	if idleTimeout == 0 { -		return noopIdlenessManager{} +func NewManager(opts ManagerOptions) Manager { +	if opts.Timeout == 0 { +		return noopManager{}  	} -	i := &idlenessManagerImpl{ -		enforcer: enforcer, -		timeout:  int64(idleTimeout), +	m := &manager{ +		enforcer: opts.Enforcer, +		timeout:  int64(opts.Timeout), +		logger:   opts.Logger,  	} -	i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout) -	return i +	m.timer = timeAfterFunc(opts.Timeout, m.handleIdleTimeout) +	return m  }  // resetIdleTimer resets the idle timer to the given duration. This method  // should only be called from the timer callback. -func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) { -	i.idleMu.Lock() -	defer i.idleMu.Unlock() +func (m *manager) resetIdleTimer(d time.Duration) { +	m.idleMu.Lock() +	defer m.idleMu.Unlock() -	if i.timer == nil { +	if m.timer == nil {  		// Only close sets timer to nil. We are done.  		return  	} @@ -112,47 +126,47 @@ func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) {  	// It is safe to ignore the return value from Reset() because this method is  	// only ever called from the timer callback, which means the timer has  	// already fired. -	i.timer.Reset(d) +	m.timer.Reset(d)  }  // handleIdleTimeout is the timer callback that is invoked upon expiry of the  // configured idle timeout. The channel is considered inactive if there are no  // ongoing calls and no RPC activity since the last time the timer fired. -func (i *idlenessManagerImpl) handleIdleTimeout() { -	if i.isClosed() { +func (m *manager) handleIdleTimeout() { +	if m.isClosed() {  		return  	} -	if atomic.LoadInt32(&i.activeCallsCount) > 0 { -		i.resetIdleTimer(time.Duration(i.timeout)) +	if atomic.LoadInt32(&m.activeCallsCount) > 0 { +		m.resetIdleTimer(time.Duration(m.timeout))  		return  	}  	// There has been activity on the channel since we last got here. Reset the  	// timer and return. -	if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 { +	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {  		// Set the timer to fire after a duration of idle timeout, calculated  		// from the time the most recent RPC completed. -		atomic.StoreInt32(&i.activeSinceLastTimerCheck, 0) -		i.resetIdleTimer(time.Duration(atomic.LoadInt64(&i.lastCallEndTime) + i.timeout - time.Now().UnixNano())) +		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 0) +		m.resetIdleTimer(time.Duration(atomic.LoadInt64(&m.lastCallEndTime) + m.timeout - time.Now().UnixNano()))  		return  	}  	// This CAS operation is extremely likely to succeed given that there has  	// been no activity since the last time we were here.  Setting the -	// activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the +	// activeCallsCount to -math.MaxInt32 indicates to OnCallBegin() that the  	// channel is either in idle mode or is trying to get there. -	if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) { +	if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {  		// This CAS operation can fail if an RPC started after we checked for  		// activity at the top of this method, or one was ongoing from before  		// the last time we were here. In both case, reset the timer and return. -		i.resetIdleTimer(time.Duration(i.timeout)) +		m.resetIdleTimer(time.Duration(m.timeout))  		return  	}  	// Now that we've set the active calls count to -math.MaxInt32, it's time to  	// actually move to idle mode. -	if i.tryEnterIdleMode() { +	if m.tryEnterIdleMode() {  		// Successfully entered idle mode. No timer needed until we exit idle.  		return  	} @@ -160,8 +174,8 @@ func (i *idlenessManagerImpl) handleIdleTimeout() {  	// Failed to enter idle mode due to a concurrent RPC that kept the channel  	// active, or because of an error from the channel. Undo the attempt to  	// enter idle, and reset the timer to try again later. -	atomic.AddInt32(&i.activeCallsCount, math.MaxInt32) -	i.resetIdleTimer(time.Duration(i.timeout)) +	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) +	m.resetIdleTimer(time.Duration(m.timeout))  }  // tryEnterIdleMode instructs the channel to enter idle mode. But before @@ -171,15 +185,15 @@ func (i *idlenessManagerImpl) handleIdleTimeout() {  // Return value indicates whether or not the channel moved to idle mode.  //  // Holds idleMu which ensures mutual exclusion with exitIdleMode. -func (i *idlenessManagerImpl) tryEnterIdleMode() bool { -	i.idleMu.Lock() -	defer i.idleMu.Unlock() +func (m *manager) tryEnterIdleMode() bool { +	m.idleMu.Lock() +	defer m.idleMu.Unlock() -	if atomic.LoadInt32(&i.activeCallsCount) != -math.MaxInt32 { +	if atomic.LoadInt32(&m.activeCallsCount) != -math.MaxInt32 {  		// We raced and lost to a new RPC. Very rare, but stop entering idle.  		return false  	} -	if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 { +	if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {  		// An very short RPC could have come in (and also finished) after we  		// checked for calls count and activity in handleIdleTimeout(), but  		// before the CAS operation. So, we need to check for activity again. @@ -189,99 +203,99 @@ func (i *idlenessManagerImpl) tryEnterIdleMode() bool {  	// No new RPCs have come in since we last set the active calls count value  	// -math.MaxInt32 in the timer callback. And since we have the lock, it is  	// safe to enter idle mode now. -	if err := i.enforcer.enterIdleMode(); err != nil { -		logger.Errorf("Failed to enter idle mode: %v", err) +	if err := m.enforcer.EnterIdleMode(); err != nil { +		m.logger.Errorf("Failed to enter idle mode: %v", err)  		return false  	}  	// Successfully entered idle mode. -	i.actuallyIdle = true +	m.actuallyIdle = true  	return true  } -// onCallBegin is invoked at the start of every RPC. -func (i *idlenessManagerImpl) onCallBegin() error { -	if i.isClosed() { +// OnCallBegin is invoked at the start of every RPC. +func (m *manager) OnCallBegin() error { +	if m.isClosed() {  		return nil  	} -	if atomic.AddInt32(&i.activeCallsCount, 1) > 0 { +	if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {  		// Channel is not idle now. Set the activity bit and allow the call. -		atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1) +		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)  		return nil  	}  	// Channel is either in idle mode or is in the process of moving to idle  	// mode. Attempt to exit idle mode to allow this RPC. -	if err := i.exitIdleMode(); err != nil { +	if err := m.exitIdleMode(); err != nil {  		// Undo the increment to calls count, and return an error causing the  		// RPC to fail. -		atomic.AddInt32(&i.activeCallsCount, -1) +		atomic.AddInt32(&m.activeCallsCount, -1)  		return err  	} -	atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1) +	atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)  	return nil  }  // exitIdleMode instructs the channel to exit idle mode.  //  // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode. -func (i *idlenessManagerImpl) exitIdleMode() error { -	i.idleMu.Lock() -	defer i.idleMu.Unlock() +func (m *manager) exitIdleMode() error { +	m.idleMu.Lock() +	defer m.idleMu.Unlock() -	if !i.actuallyIdle { +	if !m.actuallyIdle {  		// This can happen in two scenarios:  		// - handleIdleTimeout() set the calls count to -math.MaxInt32 and called  		//   tryEnterIdleMode(). But before the latter could grab the lock, an RPC -		//   came in and onCallBegin() noticed that the calls count is negative. +		//   came in and OnCallBegin() noticed that the calls count is negative.  		// - Channel is in idle mode, and multiple new RPCs come in at the same -		//   time, all of them notice a negative calls count in onCallBegin and get +		//   time, all of them notice a negative calls count in OnCallBegin and get  		//   here. The first one to get the lock would got the channel to exit idle.  		//  		// Either way, nothing to do here.  		return nil  	} -	if err := i.enforcer.exitIdleMode(); err != nil { +	if err := m.enforcer.ExitIdleMode(); err != nil {  		return fmt.Errorf("channel failed to exit idle mode: %v", err)  	}  	// Undo the idle entry process. This also respects any new RPC attempts. -	atomic.AddInt32(&i.activeCallsCount, math.MaxInt32) -	i.actuallyIdle = false +	atomic.AddInt32(&m.activeCallsCount, math.MaxInt32) +	m.actuallyIdle = false  	// Start a new timer to fire after the configured idle timeout. -	i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout) +	m.timer = timeAfterFunc(time.Duration(m.timeout), m.handleIdleTimeout)  	return nil  } -// onCallEnd is invoked at the end of every RPC. -func (i *idlenessManagerImpl) onCallEnd() { -	if i.isClosed() { +// OnCallEnd is invoked at the end of every RPC. +func (m *manager) OnCallEnd() { +	if m.isClosed() {  		return  	}  	// Record the time at which the most recent call finished. -	atomic.StoreInt64(&i.lastCallEndTime, time.Now().UnixNano()) +	atomic.StoreInt64(&m.lastCallEndTime, time.Now().UnixNano())  	// Decrement the active calls count. This count can temporarily go negative  	// when the timer callback is in the process of moving the channel to idle  	// mode, but one or more RPCs come in and complete before the timer callback  	// can get done with the process of moving to idle mode. -	atomic.AddInt32(&i.activeCallsCount, -1) +	atomic.AddInt32(&m.activeCallsCount, -1)  } -func (i *idlenessManagerImpl) isClosed() bool { -	return atomic.LoadInt32(&i.closed) == 1 +func (m *manager) isClosed() bool { +	return atomic.LoadInt32(&m.closed) == 1  } -func (i *idlenessManagerImpl) close() { -	atomic.StoreInt32(&i.closed, 1) +func (m *manager) Close() { +	atomic.StoreInt32(&m.closed, 1) -	i.idleMu.Lock() -	i.timer.Stop() -	i.timer = nil -	i.idleMu.Unlock() +	m.idleMu.Lock() +	m.timer.Stop() +	m.timer = nil +	m.idleMu.Unlock()  } diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 42ff39c84..c8a8c76d6 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -30,7 +30,7 @@ import (  var (  	// WithHealthCheckFunc is set by dialoptions.go -	WithHealthCheckFunc interface{} // func (HealthChecker) DialOption +	WithHealthCheckFunc any // func (HealthChecker) DialOption  	// HealthCheckFunc is used to provide client-side LB channel health checking  	HealthCheckFunc HealthChecker  	// BalancerUnregister is exported by package balancer to unregister a balancer. @@ -38,8 +38,12 @@ var (  	// KeepaliveMinPingTime is the minimum ping interval.  This must be 10s by  	// default, but tests may wish to set it lower for convenience.  	KeepaliveMinPingTime = 10 * time.Second +	// KeepaliveMinServerPingTime is the minimum ping interval for servers. +	// This must be 1s by default, but tests may wish to set it lower for +	// convenience. +	KeepaliveMinServerPingTime = time.Second  	// ParseServiceConfig parses a JSON representation of the service config. -	ParseServiceConfig interface{} // func(string) *serviceconfig.ParseResult +	ParseServiceConfig any // func(string) *serviceconfig.ParseResult  	// EqualServiceConfigForTesting is for testing service config generation and  	// parsing. Both a and b should be returned by ParseServiceConfig.  	// This function compares the config without rawJSON stripped, in case the @@ -49,33 +53,33 @@ var (  	// given name. This is set by package certprovider for use from xDS  	// bootstrap code while parsing certificate provider configs in the  	// bootstrap file. -	GetCertificateProviderBuilder interface{} // func(string) certprovider.Builder +	GetCertificateProviderBuilder any // func(string) certprovider.Builder  	// GetXDSHandshakeInfoForTesting returns a pointer to the xds.HandshakeInfo  	// stored in the passed in attributes. This is set by  	// credentials/xds/xds.go. -	GetXDSHandshakeInfoForTesting interface{} // func (*attributes.Attributes) *xds.HandshakeInfo +	GetXDSHandshakeInfoForTesting any // func (*attributes.Attributes) *xds.HandshakeInfo  	// GetServerCredentials returns the transport credentials configured on a  	// gRPC server. An xDS-enabled server needs to know what type of credentials  	// is configured on the underlying gRPC server. This is set by server.go. -	GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials +	GetServerCredentials any // func (*grpc.Server) credentials.TransportCredentials  	// CanonicalString returns the canonical string of the code defined here:  	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.  	//  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed. -	CanonicalString interface{} // func (codes.Code) string +	CanonicalString any // func (codes.Code) string  	// DrainServerTransports initiates a graceful close of existing connections  	// on a gRPC server accepted on the provided listener address. An  	// xDS-enabled server invokes this method on a grpc.Server when a particular  	// listener moves to "not-serving" mode. -	DrainServerTransports interface{} // func(*grpc.Server, string) +	DrainServerTransports any // func(*grpc.Server, string)  	// AddGlobalServerOptions adds an array of ServerOption that will be  	// effective globally for newly created servers. The priority will be: 1.  	// user-provided; 2. this method; 3. default values.  	//  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed. -	AddGlobalServerOptions interface{} // func(opt ...ServerOption) +	AddGlobalServerOptions any // func(opt ...ServerOption)  	// ClearGlobalServerOptions clears the array of extra ServerOption. This  	// method is useful in testing and benchmarking.  	// @@ -88,14 +92,14 @@ var (  	//  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed. -	AddGlobalDialOptions interface{} // func(opt ...DialOption) +	AddGlobalDialOptions any // func(opt ...DialOption)  	// DisableGlobalDialOptions returns a DialOption that prevents the  	// ClientConn from applying the global DialOptions (set via  	// AddGlobalDialOptions).  	//  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed. -	DisableGlobalDialOptions interface{} // func() grpc.DialOption +	DisableGlobalDialOptions any // func() grpc.DialOption  	// ClearGlobalDialOptions clears the array of extra DialOption. This  	// method is useful in testing and benchmarking.  	// @@ -104,23 +108,26 @@ var (  	ClearGlobalDialOptions func()  	// JoinDialOptions combines the dial options passed as arguments into a  	// single dial option. -	JoinDialOptions interface{} // func(...grpc.DialOption) grpc.DialOption +	JoinDialOptions any // func(...grpc.DialOption) grpc.DialOption  	// JoinServerOptions combines the server options passed as arguments into a  	// single server option. -	JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption +	JoinServerOptions any // func(...grpc.ServerOption) grpc.ServerOption  	// WithBinaryLogger returns a DialOption that specifies the binary logger  	// for a ClientConn.  	//  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed. -	WithBinaryLogger interface{} // func(binarylog.Logger) grpc.DialOption +	WithBinaryLogger any // func(binarylog.Logger) grpc.DialOption  	// BinaryLogger returns a ServerOption that can set the binary logger for a  	// server.  	//  	// This is used in the 1.0 release of gcp/observability, and thus must not be  	// deleted or changed. -	BinaryLogger interface{} // func(binarylog.Logger) grpc.ServerOption +	BinaryLogger any // func(binarylog.Logger) grpc.ServerOption + +	// SubscribeToConnectivityStateChanges adds a grpcsync.Subscriber to a provided grpc.ClientConn +	SubscribeToConnectivityStateChanges any // func(*grpc.ClientConn, grpcsync.Subscriber)  	// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using  	// the provided xds bootstrap config instead of the global configuration from @@ -131,7 +138,7 @@ var (  	//  	// This function should ONLY be used for testing and may not work with some  	// other features, including the CSDS service. -	NewXDSResolverWithConfigForTesting interface{} // func([]byte) (resolver.Builder, error) +	NewXDSResolverWithConfigForTesting any // func([]byte) (resolver.Builder, error)  	// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster  	// Specifier Plugin for testing purposes, regardless of the XDSRLS environment @@ -163,7 +170,11 @@ var (  	UnregisterRBACHTTPFilterForTesting func()  	// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY. -	ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions) +	ORCAAllowAnyMinReportingInterval any // func(so *orca.ServiceOptions) + +	// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra +	// metadata to RPCs. +	GRPCResolverSchemeExtraMetadata string = "xds"  )  // HealthChecker defines the signature of the client-side LB channel health checking function. @@ -174,7 +185,7 @@ var (  //  // The health checking protocol is defined at:  // https://github.com/grpc/grpc/blob/master/doc/health-checking.md -type HealthChecker func(ctx context.Context, newStream func(string) (interface{}, error), setConnectivityState func(connectivity.State, error), serviceName string) error +type HealthChecker func(ctx context.Context, newStream func(string) (any, error), setConnectivityState func(connectivity.State, error), serviceName string) error  const (  	// CredsBundleModeFallback switches GoogleDefaultCreds to fallback mode. diff --git a/vendor/google.golang.org/grpc/internal/metadata/metadata.go b/vendor/google.golang.org/grpc/internal/metadata/metadata.go index c82e608e0..900bfb716 100644 --- a/vendor/google.golang.org/grpc/internal/metadata/metadata.go +++ b/vendor/google.golang.org/grpc/internal/metadata/metadata.go @@ -35,7 +35,7 @@ const mdKey = mdKeyType("grpc.internal.address.metadata")  type mdValue metadata.MD -func (m mdValue) Equal(o interface{}) bool { +func (m mdValue) Equal(o any) bool {  	om, ok := o.(mdValue)  	if !ok {  		return false diff --git a/vendor/google.golang.org/grpc/internal/pretty/pretty.go b/vendor/google.golang.org/grpc/internal/pretty/pretty.go index 0177af4b5..703319137 100644 --- a/vendor/google.golang.org/grpc/internal/pretty/pretty.go +++ b/vendor/google.golang.org/grpc/internal/pretty/pretty.go @@ -35,7 +35,7 @@ const jsonIndent = "  "  // ToJSON marshals the input into a json string.  //  // If marshal fails, it falls back to fmt.Sprintf("%+v"). -func ToJSON(e interface{}) string { +func ToJSON(e any) string {  	switch ee := e.(type) {  	case protov1.Message:  		mm := jsonpb.Marshaler{Indent: jsonIndent} diff --git a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go index c7a18a948..f0603871c 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/config_selector.go +++ b/vendor/google.golang.org/grpc/internal/resolver/config_selector.go @@ -92,7 +92,7 @@ type ClientStream interface {  	// calling RecvMsg on the same stream at the same time, but it is not safe  	// to call SendMsg on the same stream in different goroutines. It is also  	// not safe to call CloseSend concurrently with SendMsg. -	SendMsg(m interface{}) error +	SendMsg(m any) error  	// RecvMsg blocks until it receives a message into m or the stream is  	// done. It returns io.EOF when the stream completes successfully. On  	// any other error, the stream is aborted and the error contains the RPC @@ -101,7 +101,7 @@ type ClientStream interface {  	// It is safe to have a goroutine calling SendMsg and another goroutine  	// calling RecvMsg on the same stream at the same time, but it is not  	// safe to call RecvMsg on the same stream in different goroutines. -	RecvMsg(m interface{}) error +	RecvMsg(m any) error  }  // ClientInterceptor is an interceptor for gRPC client streams. diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go index b0ead4f54..4cf85cad9 100644 --- a/vendor/google.golang.org/grpc/internal/status/status.go +++ b/vendor/google.golang.org/grpc/internal/status/status.go @@ -49,7 +49,7 @@ func New(c codes.Code, msg string) *Status {  }  // Newf returns New(c, fmt.Sprintf(format, a...)). -func Newf(c codes.Code, format string, a ...interface{}) *Status { +func Newf(c codes.Code, format string, a ...any) *Status {  	return New(c, fmt.Sprintf(format, a...))  } @@ -64,7 +64,7 @@ func Err(c codes.Code, msg string) error {  }  // Errorf returns Error(c, fmt.Sprintf(format, a...)). -func Errorf(c codes.Code, format string, a ...interface{}) error { +func Errorf(c codes.Code, format string, a ...any) error {  	return Err(c, fmt.Sprintf(format, a...))  } @@ -120,11 +120,11 @@ func (s *Status) WithDetails(details ...proto.Message) (*Status, error) {  // Details returns a slice of details messages attached to the status.  // If a detail cannot be decoded, the error is returned in place of the detail. -func (s *Status) Details() []interface{} { +func (s *Status) Details() []any {  	if s == nil || s.s == nil {  		return nil  	} -	details := make([]interface{}, 0, len(s.s.Details)) +	details := make([]any, 0, len(s.s.Details))  	for _, any := range s.s.Details {  		detail := &ptypes.DynamicAny{}  		if err := ptypes.UnmarshalAny(any, detail); err != nil { diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go index be5a9c81e..b330ccedc 100644 --- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go @@ -40,7 +40,7 @@ var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {  }  type itemNode struct { -	it   interface{} +	it   any  	next *itemNode  } @@ -49,7 +49,7 @@ type itemList struct {  	tail *itemNode  } -func (il *itemList) enqueue(i interface{}) { +func (il *itemList) enqueue(i any) {  	n := &itemNode{it: i}  	if il.tail == nil {  		il.head, il.tail = n, n @@ -61,11 +61,11 @@ func (il *itemList) enqueue(i interface{}) {  // peek returns the first item in the list without removing it from the  // list. -func (il *itemList) peek() interface{} { +func (il *itemList) peek() any {  	return il.head.it  } -func (il *itemList) dequeue() interface{} { +func (il *itemList) dequeue() any {  	if il.head == nil {  		return nil  	} @@ -336,7 +336,7 @@ func (c *controlBuffer) put(it cbItem) error {  	return err  } -func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) { +func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {  	var wakeUp bool  	c.mu.Lock()  	if c.err != nil { @@ -373,7 +373,7 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (b  }  // Note argument f should never be nil. -func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) { +func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {  	c.mu.Lock()  	if c.err != nil {  		c.mu.Unlock() @@ -387,7 +387,7 @@ func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bo  	return true, nil  } -func (c *controlBuffer) get(block bool) (interface{}, error) { +func (c *controlBuffer) get(block bool) (any, error) {  	for {  		c.mu.Lock()  		if c.err != nil { @@ -830,7 +830,7 @@ func (l *loopyWriter) goAwayHandler(g *goAway) error {  	return nil  } -func (l *loopyWriter) handle(i interface{}) error { +func (l *loopyWriter) handle(i any) error {  	switch i := i.(type) {  	case *incomingWindowUpdate:  		l.incomingWindowUpdateHandler(i) diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index 326bf0848..badab8acf 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -330,7 +330,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  		readerDone:            make(chan struct{}),  		writerDone:            make(chan struct{}),  		goAway:                make(chan struct{}), -		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize), +		framer:                newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),  		fc:                    &trInFlow{limit: uint32(icwz)},  		scheme:                scheme,  		activeStreams:         make(map[uint32]*Stream), @@ -762,7 +762,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  	firstTry := true  	var ch chan struct{}  	transportDrainRequired := false -	checkForStreamQuota := func(it interface{}) bool { +	checkForStreamQuota := func(it any) bool {  		if t.streamQuota <= 0 { // Can go negative if server decreases it.  			if firstTry {  				t.waitingStreams++ @@ -800,7 +800,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  		return true  	}  	var hdrListSizeErr error -	checkForHeaderListSize := func(it interface{}) bool { +	checkForHeaderListSize := func(it any) bool {  		if t.maxSendHeaderListSize == nil {  			return true  		} @@ -815,7 +815,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  		return true  	}  	for { -		success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { +		success, err := t.controlBuf.executeAndPut(func(it any) bool {  			return checkForHeaderListSize(it) && checkForStreamQuota(it)  		}, hdr)  		if err != nil { @@ -927,7 +927,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.  		rst:     rst,  		rstCode: rstCode,  	} -	addBackStreamQuota := func(interface{}) bool { +	addBackStreamQuota := func(any) bool {  		t.streamQuota++  		if t.streamQuota > 0 && t.waitingStreams > 0 {  			select { @@ -1080,7 +1080,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {  // for the transport and the stream based on the current bdp  // estimation.  func (t *http2Client) updateFlowControl(n uint32) { -	updateIWS := func(interface{}) bool { +	updateIWS := func(any) bool {  		t.initialWindowSize = int32(n)  		t.mu.Lock()  		for _, s := range t.activeStreams { @@ -1233,7 +1233,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {  		}  		updateFuncs = append(updateFuncs, updateStreamQuota)  	} -	t.controlBuf.executeAndPut(func(interface{}) bool { +	t.controlBuf.executeAndPut(func(any) bool {  		for _, f := range updateFuncs {  			f()  		} @@ -1505,14 +1505,15 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {  		return  	} -	isHeader := false - -	// If headerChan hasn't been closed yet -	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { -		s.headerValid = true -		if !endStream { -			// HEADERS frame block carries a Response-Headers. -			isHeader = true +	// For headers, set them in s.header and close headerChan.  For trailers or +	// trailers-only, closeStream will set the trailers and close headerChan as +	// needed. +	if !endStream { +		// If headerChan hasn't been closed yet (expected, given we checked it +		// above, but something else could have potentially closed the whole +		// stream). +		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { +			s.headerValid = true  			// These values can be set without any synchronization because  			// stream goroutine will read it only after seeing a closed  			// headerChan which we'll close after setting this. @@ -1520,15 +1521,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {  			if len(mdata) > 0 {  				s.header = mdata  			} -		} else { -			// HEADERS frame block carries a Trailers-Only. -			s.noHeaders = true +			close(s.headerChan)  		} -		close(s.headerChan)  	}  	for _, sh := range t.statsHandlers { -		if isHeader { +		if !endStream {  			inHeader := &stats.InHeader{  				Client:      true,  				WireLength:  int(frame.Header().Length), @@ -1554,9 +1552,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {  		statusGen = status.New(rawStatusCode, grpcMessage)  	} -	// if client received END_STREAM from server while stream was still active, send RST_STREAM -	rst := s.getState() == streamActive -	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true) +	// If client received END_STREAM from server while stream was still active, +	// send RST_STREAM. +	rstStream := s.getState() == streamActive +	t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)  }  // readServerPreface reads and handles the initial settings frame from the diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index f96064012..8d3a353c1 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -165,7 +165,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  	if config.MaxHeaderListSize != nil {  		maxHeaderListSize = *config.MaxHeaderListSize  	} -	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize) +	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)  	// Send initial settings as connection preface to client.  	isettings := []http2.Setting{{  		ID:  http2.SettingMaxFrameSize, @@ -855,7 +855,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {  		}  		return nil  	}) -	t.controlBuf.executeAndPut(func(interface{}) bool { +	t.controlBuf.executeAndPut(func(any) bool {  		for _, f := range updateFuncs {  			f()  		} @@ -939,7 +939,7 @@ func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD)  	return headerFields  } -func (t *http2Server) checkForHeaderListSize(it interface{}) bool { +func (t *http2Server) checkForHeaderListSize(it any) bool {  	if t.maxSendHeaderListSize == nil {  		return true  	} diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index 19cbb18f5..195814008 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -30,6 +30,7 @@ import (  	"net/url"  	"strconv"  	"strings" +	"sync"  	"time"  	"unicode/utf8" @@ -309,6 +310,7 @@ func decodeGrpcMessageUnchecked(msg string) string {  }  type bufWriter struct { +	pool      *sync.Pool  	buf       []byte  	offset    int  	batchSize int @@ -316,12 +318,17 @@ type bufWriter struct {  	err       error  } -func newBufWriter(conn net.Conn, batchSize int) *bufWriter { -	return &bufWriter{ -		buf:       make([]byte, batchSize*2), +func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter { +	w := &bufWriter{  		batchSize: batchSize,  		conn:      conn, +		pool:      pool,  	} +	// this indicates that we should use non shared buf +	if pool == nil { +		w.buf = make([]byte, batchSize) +	} +	return w  }  func (w *bufWriter) Write(b []byte) (n int, err error) { @@ -332,19 +339,34 @@ func (w *bufWriter) Write(b []byte) (n int, err error) {  		n, err = w.conn.Write(b)  		return n, toIOError(err)  	} +	if w.buf == nil { +		b := w.pool.Get().(*[]byte) +		w.buf = *b +	}  	for len(b) > 0 {  		nn := copy(w.buf[w.offset:], b)  		b = b[nn:]  		w.offset += nn  		n += nn  		if w.offset >= w.batchSize { -			err = w.Flush() +			err = w.flushKeepBuffer()  		}  	}  	return n, err  }  func (w *bufWriter) Flush() error { +	err := w.flushKeepBuffer() +	// Only release the buffer if we are in a "shared" mode +	if w.buf != nil && w.pool != nil { +		b := w.buf +		w.pool.Put(&b) +		w.buf = nil +	} +	return err +} + +func (w *bufWriter) flushKeepBuffer() error {  	if w.err != nil {  		return w.err  	} @@ -381,7 +403,10 @@ type framer struct {  	fr     *http2.Framer  } -func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderListSize uint32) *framer { +var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool) +var writeBufferMutex sync.Mutex + +func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {  	if writeBufferSize < 0 {  		writeBufferSize = 0  	} @@ -389,7 +414,11 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList  	if readBufferSize > 0 {  		r = bufio.NewReaderSize(r, readBufferSize)  	} -	w := newBufWriter(conn, writeBufferSize) +	var pool *sync.Pool +	if sharedWriteBuffer { +		pool = getWriteBufferPool(writeBufferSize) +	} +	w := newBufWriter(conn, writeBufferSize, pool)  	f := &framer{  		writer: w,  		fr:     http2.NewFramer(w, r), @@ -403,6 +432,24 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList  	return f  } +func getWriteBufferPool(writeBufferSize int) *sync.Pool { +	writeBufferMutex.Lock() +	defer writeBufferMutex.Unlock() +	size := writeBufferSize * 2 +	pool, ok := writeBufferPoolMap[size] +	if ok { +		return pool +	} +	pool = &sync.Pool{ +		New: func() any { +			b := make([]byte, size) +			return &b +		}, +	} +	writeBufferPoolMap[size] = pool +	return pool +} +  // parseDialTarget returns the network and address to pass to dialer.  func parseDialTarget(target string) (string, string) {  	net := "tcp" diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index aa1c89659..74a811fc0 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -43,10 +43,6 @@ import (  	"google.golang.org/grpc/tap"  ) -// ErrNoHeaders is used as a signal that a trailers only response was received, -// and is not a real error. -var ErrNoHeaders = errors.New("stream has no headers") -  const logLevel = 2  type bufferPool struct { @@ -56,7 +52,7 @@ type bufferPool struct {  func newBufferPool() *bufferPool {  	return &bufferPool{  		pool: sync.Pool{ -			New: func() interface{} { +			New: func() any {  				return new(bytes.Buffer)  			},  		}, @@ -390,14 +386,10 @@ func (s *Stream) Header() (metadata.MD, error) {  	}  	s.waitOnHeader() -	if !s.headerValid { +	if !s.headerValid || s.noHeaders {  		return nil, s.status.Err()  	} -	if s.noHeaders { -		return nil, ErrNoHeaders -	} -  	return s.header.Copy(), nil  } @@ -559,6 +551,7 @@ type ServerConfig struct {  	InitialConnWindowSize int32  	WriteBufferSize       int  	ReadBufferSize        int +	SharedWriteBuffer     bool  	ChannelzParentID      *channelz.Identifier  	MaxHeaderListSize     *uint32  	HeaderTableSize       *uint32 @@ -592,6 +585,8 @@ type ConnectOptions struct {  	WriteBufferSize int  	// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.  	ReadBufferSize int +	// SharedWriteBuffer indicates whether connections should reuse write buffer +	SharedWriteBuffer bool  	// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.  	ChannelzParentID *channelz.Identifier  	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. @@ -736,7 +731,7 @@ type ServerTransport interface {  }  // connectionErrorf creates an ConnectionError with the specified error description. -func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { +func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {  	return ConnectionError{  		Desc: fmt.Sprintf(format, a...),  		temp: temp, diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go index 02f975951..236837f41 100644 --- a/vendor/google.golang.org/grpc/picker_wrapper.go +++ b/vendor/google.golang.org/grpc/picker_wrapper.go @@ -28,21 +28,26 @@ import (  	"google.golang.org/grpc/internal/channelz"  	istatus "google.golang.org/grpc/internal/status"  	"google.golang.org/grpc/internal/transport" +	"google.golang.org/grpc/stats"  	"google.golang.org/grpc/status"  )  // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick  // actions and unblock when there's a picker update.  type pickerWrapper struct { -	mu         sync.Mutex -	done       bool -	idle       bool -	blockingCh chan struct{} -	picker     balancer.Picker +	mu            sync.Mutex +	done          bool +	idle          bool +	blockingCh    chan struct{} +	picker        balancer.Picker +	statsHandlers []stats.Handler // to record blocking picker calls  } -func newPickerWrapper() *pickerWrapper { -	return &pickerWrapper{blockingCh: make(chan struct{})} +func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper { +	return &pickerWrapper{ +		blockingCh:    make(chan struct{}), +		statsHandlers: statsHandlers, +	}  }  // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. @@ -95,6 +100,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.  	var ch chan struct{}  	var lastPickErr error +  	for {  		pw.mu.Lock()  		if pw.done { @@ -129,6 +135,20 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.  			continue  		} +		// If the channel is set, it means that the pick call had to wait for a +		// new picker at some point. Either it's the first iteration and this +		// function received the first picker, or a picker errored with +		// ErrNoSubConnAvailable or errored with failfast set to false, which +		// will trigger a continue to the next iteration. In the first case this +		// conditional will hit if this call had to block (the channel is set). +		// In the second case, the only way it will get to this conditional is +		// if there is a new picker. +		if ch != nil { +			for _, sh := range pw.statsHandlers { +				sh.HandleRPC(ctx, &stats.PickerUpdated{}) +			} +		} +  		ch = pw.blockingCh  		p := pw.picker  		pw.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go index abe266b02..2e9cf66b4 100644 --- a/vendor/google.golang.org/grpc/pickfirst.go +++ b/vendor/google.golang.org/grpc/pickfirst.go @@ -26,12 +26,18 @@ import (  	"google.golang.org/grpc/balancer"  	"google.golang.org/grpc/connectivity"  	"google.golang.org/grpc/internal/envconfig" +	internalgrpclog "google.golang.org/grpc/internal/grpclog"  	"google.golang.org/grpc/internal/grpcrand" +	"google.golang.org/grpc/internal/pretty" +	"google.golang.org/grpc/resolver"  	"google.golang.org/grpc/serviceconfig"  ) -// PickFirstBalancerName is the name of the pick_first balancer. -const PickFirstBalancerName = "pick_first" +const ( +	// PickFirstBalancerName is the name of the pick_first balancer. +	PickFirstBalancerName = "pick_first" +	logPrefix             = "[pick-first-lb %p] " +)  func newPickfirstBuilder() balancer.Builder {  	return &pickfirstBuilder{} @@ -40,7 +46,9 @@ func newPickfirstBuilder() balancer.Builder {  type pickfirstBuilder struct{}  func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { -	return &pickfirstBalancer{cc: cc} +	b := &pickfirstBalancer{cc: cc} +	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b)) +	return b  }  func (*pickfirstBuilder) Name() string { @@ -57,23 +65,36 @@ type pfConfig struct {  }  func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { -	cfg := &pfConfig{} -	if err := json.Unmarshal(js, cfg); err != nil { +	if !envconfig.PickFirstLBConfig { +		// Prior to supporting loadbalancing configuration, the pick_first LB +		// policy did not implement the balancer.ConfigParser interface. This +		// meant that if a non-empty configuration was passed to it, the service +		// config unmarshaling code would throw a warning log, but would +		// continue using the pick_first LB policy. The code below ensures the +		// same behavior is retained if the env var is not set. +		if string(js) != "{}" { +			logger.Warningf("Ignoring non-empty balancer configuration %q for the pick_first LB policy", string(js)) +		} +		return nil, nil +	} + +	var cfg pfConfig +	if err := json.Unmarshal(js, &cfg); err != nil {  		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)  	}  	return cfg, nil  }  type pickfirstBalancer struct { +	logger  *internalgrpclog.PrefixLogger  	state   connectivity.State  	cc      balancer.ClientConn  	subConn balancer.SubConn -	cfg     *pfConfig  }  func (b *pickfirstBalancer) ResolverError(err error) { -	if logger.V(2) { -		logger.Infof("pickfirstBalancer: ResolverError called with error: %v", err) +	if b.logger.V(2) { +		b.logger.Infof("Received error from the name resolver: %v", err)  	}  	if b.subConn == nil {  		b.state = connectivity.TransientFailure @@ -96,35 +117,44 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState  		// The resolver reported an empty address list. Treat it like an error by  		// calling b.ResolverError.  		if b.subConn != nil { -			// Remove the old subConn. All addresses were removed, so it is no longer -			// valid. -			b.cc.RemoveSubConn(b.subConn) +			// Shut down the old subConn. All addresses were removed, so it is +			// no longer valid. +			b.subConn.Shutdown()  			b.subConn = nil  		}  		b.ResolverError(errors.New("produced zero addresses"))  		return balancer.ErrBadResolverState  	} -	if state.BalancerConfig != nil { -		cfg, ok := state.BalancerConfig.(*pfConfig) -		if !ok { -			return fmt.Errorf("pickfirstBalancer: received nil or illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig) -		} -		b.cfg = cfg +	// We don't have to guard this block with the env var because ParseConfig +	// already does so. +	cfg, ok := state.BalancerConfig.(pfConfig) +	if state.BalancerConfig != nil && !ok { +		return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)  	} - -	if envconfig.PickFirstLBConfig && b.cfg != nil && b.cfg.ShuffleAddressList { +	if cfg.ShuffleAddressList { +		addrs = append([]resolver.Address{}, addrs...)  		grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })  	} + +	if b.logger.V(2) { +		b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState)) +	} +  	if b.subConn != nil {  		b.cc.UpdateAddresses(b.subConn, addrs)  		return nil  	} -	subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) +	var subConn balancer.SubConn +	subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{ +		StateListener: func(state balancer.SubConnState) { +			b.updateSubConnState(subConn, state) +		}, +	})  	if err != nil { -		if logger.V(2) { -			logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) +		if b.logger.V(2) { +			b.logger.Infof("Failed to create new SubConn: %v", err)  		}  		b.state = connectivity.TransientFailure  		b.cc.UpdateState(balancer.State{ @@ -143,13 +173,19 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState  	return nil  } +// UpdateSubConnState is unused as a StateListener is always registered when +// creating SubConns.  func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { -	if logger.V(2) { -		logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state) +	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state) +} + +func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { +	if b.logger.V(2) { +		b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)  	}  	if b.subConn != subConn { -		if logger.V(2) { -			logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized") +		if b.logger.V(2) { +			b.logger.Infof("Ignored state change because subConn is not recognized")  		}  		return  	} diff --git a/vendor/google.golang.org/grpc/preloader.go b/vendor/google.golang.org/grpc/preloader.go index cd4554785..73bd63364 100644 --- a/vendor/google.golang.org/grpc/preloader.go +++ b/vendor/google.golang.org/grpc/preloader.go @@ -37,7 +37,7 @@ type PreparedMsg struct {  }  // Encode marshalls and compresses the message using the codec and compressor for the stream. -func (p *PreparedMsg) Encode(s Stream, msg interface{}) error { +func (p *PreparedMsg) Encode(s Stream, msg any) error {  	ctx := s.Context()  	rpcInfo, ok := rpcInfoFromContext(ctx)  	if !ok { diff --git a/vendor/google.golang.org/grpc/resolver/map.go b/vendor/google.golang.org/grpc/resolver/map.go index efcb7f3ef..804be887d 100644 --- a/vendor/google.golang.org/grpc/resolver/map.go +++ b/vendor/google.golang.org/grpc/resolver/map.go @@ -20,7 +20,7 @@ package resolver  type addressMapEntry struct {  	addr  Address -	value interface{} +	value any  }  // AddressMap is a map of addresses to arbitrary values taking into account @@ -69,7 +69,7 @@ func (l addressMapEntryList) find(addr Address) int {  }  // Get returns the value for the address in the map, if present. -func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) { +func (a *AddressMap) Get(addr Address) (value any, ok bool) {  	addrKey := toMapKey(&addr)  	entryList := a.m[addrKey]  	if entry := entryList.find(addr); entry != -1 { @@ -79,7 +79,7 @@ func (a *AddressMap) Get(addr Address) (value interface{}, ok bool) {  }  // Set updates or adds the value to the address in the map. -func (a *AddressMap) Set(addr Address, value interface{}) { +func (a *AddressMap) Set(addr Address, value any) {  	addrKey := toMapKey(&addr)  	entryList := a.m[addrKey]  	if entry := entryList.find(addr); entry != -1 { @@ -127,8 +127,8 @@ func (a *AddressMap) Keys() []Address {  }  // Values returns a slice of all current map values. -func (a *AddressMap) Values() []interface{} { -	ret := make([]interface{}, 0, a.Len()) +func (a *AddressMap) Values() []any { +	ret := make([]any, 0, a.Len())  	for _, entryList := range a.m {  		for _, entry := range entryList {  			ret = append(ret, entry.value) diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go index d8db6f5d3..11384e228 100644 --- a/vendor/google.golang.org/grpc/resolver/resolver.go +++ b/vendor/google.golang.org/grpc/resolver/resolver.go @@ -77,25 +77,6 @@ func GetDefaultScheme() string {  	return defaultScheme  } -// AddressType indicates the address type returned by name resolution. -// -// Deprecated: use Attributes in Address instead. -type AddressType uint8 - -const ( -	// Backend indicates the address is for a backend server. -	// -	// Deprecated: use Attributes in Address instead. -	Backend AddressType = iota -	// GRPCLB indicates the address is for a grpclb load balancer. -	// -	// Deprecated: to select the GRPCLB load balancing policy, use a service -	// config with a corresponding loadBalancingConfig.  To supply balancer -	// addresses to the GRPCLB load balancing policy, set State.Attributes -	// using balancer/grpclb/state.Set. -	GRPCLB -) -  // Address represents a server the client connects to.  //  // # Experimental @@ -111,9 +92,6 @@ type Address struct {  	// the address, instead of the hostname from the Dial target string. In most cases,  	// this should not be set.  	// -	// If Type is GRPCLB, ServerName should be the name of the remote load -	// balancer, not the name of the backend. -	//  	// WARNING: ServerName must only be populated with trusted values. It  	// is insecure to populate it with data from untrusted inputs since untrusted  	// values could be used to bypass the authority checks performed by TLS. @@ -126,18 +104,16 @@ type Address struct {  	// BalancerAttributes contains arbitrary data about this address intended  	// for consumption by the LB policy.  These attributes do not affect SubConn  	// creation, connection establishment, handshaking, etc. -	BalancerAttributes *attributes.Attributes - -	// Type is the type of this address.  	// -	// Deprecated: use Attributes instead. -	Type AddressType +	// Deprecated: when an Address is inside an Endpoint, this field should not +	// be used, and it will eventually be removed entirely. +	BalancerAttributes *attributes.Attributes  	// Metadata is the information associated with Addr, which may be used  	// to make load balancing decision.  	//  	// Deprecated: use Attributes instead. -	Metadata interface{} +	Metadata any  }  // Equal returns whether a and o are identical.  Metadata is compared directly, @@ -150,7 +126,7 @@ func (a Address) Equal(o Address) bool {  	return a.Addr == o.Addr && a.ServerName == o.ServerName &&  		a.Attributes.Equal(o.Attributes) &&  		a.BalancerAttributes.Equal(o.BalancerAttributes) && -		a.Type == o.Type && a.Metadata == o.Metadata +		a.Metadata == o.Metadata  }  // String returns JSON formatted string representation of the address. @@ -194,11 +170,37 @@ type BuildOptions struct {  	Dialer func(context.Context, string) (net.Conn, error)  } +// An Endpoint is one network endpoint, or server, which may have multiple +// addresses with which it can be accessed. +type Endpoint struct { +	// Addresses contains a list of addresses used to access this endpoint. +	Addresses []Address + +	// Attributes contains arbitrary data about this endpoint intended for +	// consumption by the LB policy. +	Attributes *attributes.Attributes +} +  // State contains the current Resolver state relevant to the ClientConn.  type State struct {  	// Addresses is the latest set of resolved addresses for the target. +	// +	// If a resolver sets Addresses but does not set Endpoints, one Endpoint +	// will be created for each Address before the State is passed to the LB +	// policy.  The BalancerAttributes of each entry in Addresses will be set +	// in Endpoints.Attributes, and be cleared in the Endpoint's Address's +	// BalancerAttributes. +	// +	// Soon, Addresses will be deprecated and replaced fully by Endpoints.  	Addresses []Address +	// Endpoints is the latest set of resolved endpoints for the target. +	// +	// If a resolver produces a State containing Endpoints but not Addresses, +	// it must take care to ensure the LB policies it selects will support +	// Endpoints. +	Endpoints []Endpoint +  	// ServiceConfig contains the result from parsing the latest service  	// config.  If it is nil, it indicates no service config is present or the  	// resolver does not provide service configs. @@ -258,15 +260,6 @@ type ClientConn interface {  // target does not contain a scheme or if the parsed scheme is not registered  // (i.e. no corresponding resolver available to resolve the endpoint), we will  // apply the default scheme, and will attempt to reparse it. -// -// Examples: -// -//   - "dns://some_authority/foo.bar" -//     Target{Scheme: "dns", Authority: "some_authority", Endpoint: "foo.bar"} -//   - "foo.bar" -//     Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "foo.bar"} -//   - "unknown_scheme://authority/endpoint" -//     Target{Scheme: resolver.GetDefaultScheme(), Endpoint: "unknown_scheme://authority/endpoint"}  type Target struct {  	// URL contains the parsed dial target with an optional default scheme added  	// to it if the original dial target contained no scheme or contained an @@ -321,10 +314,3 @@ type Resolver interface {  	// Close closes the resolver.  	Close()  } - -// UnregisterForTesting removes the resolver builder with the given scheme from the -// resolver map. -// This function is for testing only. -func UnregisterForTesting(scheme string) { -	delete(m, scheme) -} diff --git a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go index b408b3688..d68330560 100644 --- a/vendor/google.golang.org/grpc/resolver_conn_wrapper.go +++ b/vendor/google.golang.org/grpc/resolver_conn_wrapper.go @@ -133,7 +133,7 @@ func (ccr *ccResolverWrapper) close() {  	ccr.mu.Unlock()  	// Give enqueued callbacks a chance to finish. -	<-ccr.serializer.Done +	<-ccr.serializer.Done()  	// Spawn a goroutine to close the resolver (since it may block trying to  	// cleanup all allocated resources) and return early. @@ -152,6 +152,14 @@ func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context))  // which includes addresses and service config.  func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {  	errCh := make(chan error, 1) +	if s.Endpoints == nil { +		s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses)) +		for _, a := range s.Addresses { +			ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes} +			ep.Addresses[0].BalancerAttributes = nil +			s.Endpoints = append(s.Endpoints, ep) +		} +	}  	ok := ccr.serializer.Schedule(func(context.Context) {  		ccr.addChannelzTraceEvent(s)  		ccr.curState = s diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index a844d28f4..b7723aa09 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -75,7 +75,7 @@ func NewGZIPCompressorWithLevel(level int) (Compressor, error) {  	}  	return &gzipCompressor{  		pool: sync.Pool{ -			New: func() interface{} { +			New: func() any {  				w, err := gzip.NewWriterLevel(io.Discard, level)  				if err != nil {  					panic(err) @@ -626,7 +626,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt  // encode serializes msg and returns a buffer containing the message, or an  // error if it is too large to be transmitted by grpc.  If msg is nil, it  // generates an empty message. -func encode(c baseCodec, msg interface{}) ([]byte, error) { +func encode(c baseCodec, msg any) ([]byte, error) {  	if msg == nil { // NOTE: typed nils will not be caught by this check  		return nil, nil  	} @@ -693,7 +693,7 @@ func msgHeader(data, compData []byte) (hdr []byte, payload []byte) {  	return hdr, data  } -func outPayload(client bool, msg interface{}, data, payload []byte, t time.Time) *stats.OutPayload { +func outPayload(client bool, msg any, data, payload []byte, t time.Time) *stats.OutPayload {  	return &stats.OutPayload{  		Client:           client,  		Payload:          msg, @@ -792,7 +792,7 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize  // For the two compressor parameters, both should not be set, but if they are,  // dc takes precedence over compressor.  // TODO(dfawley): wrap the old compressor/decompressor using the new API? -func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { +func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {  	buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)  	if err != nil {  		return err @@ -863,19 +863,22 @@ func ErrorDesc(err error) string {  // Errorf returns nil if c is OK.  //  // Deprecated: use status.Errorf instead. -func Errorf(c codes.Code, format string, a ...interface{}) error { +func Errorf(c codes.Code, format string, a ...any) error {  	return status.Errorf(c, format, a...)  } +var errContextCanceled = status.Error(codes.Canceled, context.Canceled.Error()) +var errContextDeadline = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error()) +  // toRPCErr converts an error into an error from the status package.  func toRPCErr(err error) error {  	switch err {  	case nil, io.EOF:  		return err  	case context.DeadlineExceeded: -		return status.Error(codes.DeadlineExceeded, err.Error()) +		return errContextDeadline  	case context.Canceled: -		return status.Error(codes.Canceled, err.Error()) +		return errContextCanceled  	case io.ErrUnexpectedEOF:  		return status.Error(codes.Internal, err.Error())  	} diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index e076ec714..244123c6c 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -86,7 +86,7 @@ func init() {  var statusOK = status.New(codes.OK, "")  var logger = grpclog.Component("core") -type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) +type methodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)  // MethodDesc represents an RPC service's method specification.  type MethodDesc struct { @@ -99,20 +99,20 @@ type ServiceDesc struct {  	ServiceName string  	// The pointer to the service interface. Used to check whether the user  	// provided implementation satisfies the interface requirements. -	HandlerType interface{} +	HandlerType any  	Methods     []MethodDesc  	Streams     []StreamDesc -	Metadata    interface{} +	Metadata    any  }  // serviceInfo wraps information about a service. It is very similar to  // ServiceDesc and is constructed from it for internal purposes.  type serviceInfo struct {  	// Contains the implementation for the methods in this service. -	serviceImpl interface{} +	serviceImpl any  	methods     map[string]*MethodDesc  	streams     map[string]*StreamDesc -	mdata       interface{} +	mdata       any  }  type serverWorkerData struct { @@ -170,6 +170,7 @@ type serverOptions struct {  	initialConnWindowSize int32  	writeBufferSize       int  	readBufferSize        int +	sharedWriteBuffer     bool  	connectionTimeout     time.Duration  	maxHeaderListSize     *uint32  	headerTableSize       *uint32 @@ -235,6 +236,20 @@ func newJoinServerOption(opts ...ServerOption) ServerOption {  	return &joinServerOption{opts: opts}  } +// SharedWriteBuffer allows reusing per-connection transport write buffer. +// If this option is set to true every connection will release the buffer after +// flushing the data on the wire. +// +// # Experimental +// +// Notice: This API is EXPERIMENTAL and may be changed or removed in a +// later release. +func SharedWriteBuffer(val bool) ServerOption { +	return newFuncServerOption(func(o *serverOptions) { +		o.sharedWriteBuffer = val +	}) +} +  // WriteBufferSize determines how much data can be batched before doing a write  // on the wire. The corresponding memory allocation for this buffer will be  // twice the size to keep syscalls low. The default value for this buffer is @@ -275,9 +290,9 @@ func InitialConnWindowSize(s int32) ServerOption {  // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.  func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { -	if kp.Time > 0 && kp.Time < time.Second { +	if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {  		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s") -		kp.Time = time.Second +		kp.Time = internal.KeepaliveMinServerPingTime  	}  	return newFuncServerOption(func(o *serverOptions) { @@ -655,7 +670,7 @@ func NewServer(opt ...ServerOption) *Server {  // printf records an event in s's event log, unless s has been stopped.  // REQUIRES s.mu is held. -func (s *Server) printf(format string, a ...interface{}) { +func (s *Server) printf(format string, a ...any) {  	if s.events != nil {  		s.events.Printf(format, a...)  	} @@ -663,7 +678,7 @@ func (s *Server) printf(format string, a ...interface{}) {  // errorf records an error in s's event log, unless s has been stopped.  // REQUIRES s.mu is held. -func (s *Server) errorf(format string, a ...interface{}) { +func (s *Server) errorf(format string, a ...any) {  	if s.events != nil {  		s.events.Errorf(format, a...)  	} @@ -678,14 +693,14 @@ type ServiceRegistrar interface {  	// once the server has started serving.  	// desc describes the service and its methods and handlers. impl is the  	// service implementation which is passed to the method handlers. -	RegisterService(desc *ServiceDesc, impl interface{}) +	RegisterService(desc *ServiceDesc, impl any)  }  // RegisterService registers a service and its implementation to the gRPC  // server. It is called from the IDL generated code. This must be called before  // invoking Serve. If ss is non-nil (for legacy code), its type is checked to  // ensure it implements sd.HandlerType. -func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { +func (s *Server) RegisterService(sd *ServiceDesc, ss any) {  	if ss != nil {  		ht := reflect.TypeOf(sd.HandlerType).Elem()  		st := reflect.TypeOf(ss) @@ -696,7 +711,7 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {  	s.register(sd, ss)  } -func (s *Server) register(sd *ServiceDesc, ss interface{}) { +func (s *Server) register(sd *ServiceDesc, ss any) {  	s.mu.Lock()  	defer s.mu.Unlock()  	s.printf("RegisterService(%q)", sd.ServiceName) @@ -737,7 +752,7 @@ type MethodInfo struct {  type ServiceInfo struct {  	Methods []MethodInfo  	// Metadata is the metadata specified in ServiceDesc when registering service. -	Metadata interface{} +	Metadata any  }  // GetServiceInfo returns a map from service names to ServiceInfo. @@ -938,6 +953,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {  		InitialConnWindowSize: s.opts.initialConnWindowSize,  		WriteBufferSize:       s.opts.writeBufferSize,  		ReadBufferSize:        s.opts.readBufferSize, +		SharedWriteBuffer:     s.opts.sharedWriteBuffer,  		ChannelzParentID:      s.channelzID,  		MaxHeaderListSize:     s.opts.maxHeaderListSize,  		HeaderTableSize:       s.opts.headerTableSize, @@ -1119,7 +1135,7 @@ func (s *Server) incrCallsFailed() {  	atomic.AddInt64(&s.czData.callsFailed, 1)  } -func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { +func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {  	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)  	if err != nil {  		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err) @@ -1166,7 +1182,7 @@ func chainUnaryServerInterceptors(s *Server) {  }  func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor { -	return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) { +	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {  		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))  	}  } @@ -1175,7 +1191,7 @@ func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info  	if curr == len(interceptors)-1 {  		return finalHandler  	} -	return func(ctx context.Context, req interface{}) (interface{}, error) { +	return func(ctx context.Context, req any) (any, error) {  		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))  	}  } @@ -1212,7 +1228,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.  		defer func() {  			if trInfo != nil {  				if err != nil && err != io.EOF { -					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) +					trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  					trInfo.tr.SetError()  				}  				trInfo.tr.Finish() @@ -1329,7 +1345,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.  	if channelz.IsOn() {  		t.IncrMsgRecv()  	} -	df := func(v interface{}) error { +	df := func(v any) error {  		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {  			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)  		} @@ -1493,7 +1509,7 @@ func chainStreamServerInterceptors(s *Server) {  }  func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor { -	return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error { +	return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {  		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))  	}  } @@ -1502,7 +1518,7 @@ func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, inf  	if curr == len(interceptors)-1 {  		return finalHandler  	} -	return func(srv interface{}, stream ServerStream) error { +	return func(srv any, stream ServerStream) error {  		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))  	}  } @@ -1543,7 +1559,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp  			if trInfo != nil {  				ss.mu.Lock()  				if err != nil && err != io.EOF { -					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) +					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  					ss.trInfo.tr.SetError()  				}  				ss.trInfo.tr.Finish() @@ -1646,7 +1662,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp  		trInfo.tr.LazyLog(&trInfo.firstLine, false)  	}  	var appErr error -	var server interface{} +	var server any  	if info != nil {  		server = info.serviceImpl  	} @@ -1712,13 +1728,13 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  	pos := strings.LastIndex(sm, "/")  	if pos == -1 {  		if trInfo != nil { -			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) +			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)  			trInfo.tr.SetError()  		}  		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())  		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {  			if trInfo != nil { -				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) +				trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  				trInfo.tr.SetError()  			}  			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) @@ -1759,7 +1775,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  	}  	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {  		if trInfo != nil { -			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) +			trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  			trInfo.tr.SetError()  		}  		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) diff --git a/vendor/google.golang.org/grpc/shared_buffer_pool.go b/vendor/google.golang.org/grpc/shared_buffer_pool.go index c3a5a9ac1..48a64cfe8 100644 --- a/vendor/google.golang.org/grpc/shared_buffer_pool.go +++ b/vendor/google.golang.org/grpc/shared_buffer_pool.go @@ -109,7 +109,7 @@ const (  type simpleSharedBufferChildPool interface {  	Get(size int) []byte -	Put(interface{}) +	Put(any)  }  type bufferPool struct { @@ -133,7 +133,7 @@ func (p *bufferPool) Get(size int) []byte {  func newBytesPool(size int) simpleSharedBufferChildPool {  	return &bufferPool{  		Pool: sync.Pool{ -			New: func() interface{} { +			New: func() any {  				bs := make([]byte, size)  				return &bs  			}, diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go index 7a552a9b7..4ab70e2d4 100644 --- a/vendor/google.golang.org/grpc/stats/stats.go +++ b/vendor/google.golang.org/grpc/stats/stats.go @@ -59,12 +59,22 @@ func (s *Begin) IsClient() bool { return s.Client }  func (s *Begin) isRPCStats() {} +// PickerUpdated indicates that the LB policy provided a new picker while the +// RPC was waiting for one. +type PickerUpdated struct{} + +// IsClient indicates if the stats information is from client side. Only Client +// Side interfaces with a Picker, thus always returns true. +func (*PickerUpdated) IsClient() bool { return true } + +func (*PickerUpdated) isRPCStats() {} +  // InPayload contains the information for an incoming payload.  type InPayload struct {  	// Client is true if this InPayload is from client side.  	Client bool  	// Payload is the payload with original type. -	Payload interface{} +	Payload any  	// Data is the serialized message payload.  	Data []byte @@ -134,7 +144,7 @@ type OutPayload struct {  	// Client is true if this OutPayload is from client side.  	Client bool  	// Payload is the payload with original type. -	Payload interface{} +	Payload any  	// Data is the serialized message payload.  	Data []byte  	// Length is the size of the uncompressed payload data. Does not include any diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go index bcf2e4d81..a93360efb 100644 --- a/vendor/google.golang.org/grpc/status/status.go +++ b/vendor/google.golang.org/grpc/status/status.go @@ -50,7 +50,7 @@ func New(c codes.Code, msg string) *Status {  }  // Newf returns New(c, fmt.Sprintf(format, a...)). -func Newf(c codes.Code, format string, a ...interface{}) *Status { +func Newf(c codes.Code, format string, a ...any) *Status {  	return New(c, fmt.Sprintf(format, a...))  } @@ -60,7 +60,7 @@ func Error(c codes.Code, msg string) error {  }  // Errorf returns Error(c, fmt.Sprintf(format, a...)). -func Errorf(c codes.Code, format string, a ...interface{}) error { +func Errorf(c codes.Code, format string, a ...any) error {  	return Error(c, fmt.Sprintf(format, a...))  } @@ -99,25 +99,27 @@ func FromError(err error) (s *Status, ok bool) {  	}  	type grpcstatus interface{ GRPCStatus() *Status }  	if gs, ok := err.(grpcstatus); ok { -		if gs.GRPCStatus() == nil { +		grpcStatus := gs.GRPCStatus() +		if grpcStatus == nil {  			// Error has status nil, which maps to codes.OK. There  			// is no sensible behavior for this, so we turn it into  			// an error with codes.Unknown and discard the existing  			// status.  			return New(codes.Unknown, err.Error()), false  		} -		return gs.GRPCStatus(), true +		return grpcStatus, true  	}  	var gs grpcstatus  	if errors.As(err, &gs) { -		if gs.GRPCStatus() == nil { +		grpcStatus := gs.GRPCStatus() +		if grpcStatus == nil {  			// Error wraps an error that has status nil, which maps  			// to codes.OK.  There is no sensible behavior for this,  			// so we turn it into an error with codes.Unknown and  			// discard the existing status.  			return New(codes.Unknown, err.Error()), false  		} -		p := gs.GRPCStatus().Proto() +		p := grpcStatus.Proto()  		p.Message = err.Error()  		return status.FromProto(p), true  	} diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index de32a7597..421a41f88 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -31,6 +31,7 @@ import (  	"google.golang.org/grpc/balancer"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/encoding" +	"google.golang.org/grpc/internal"  	"google.golang.org/grpc/internal/balancerload"  	"google.golang.org/grpc/internal/binarylog"  	"google.golang.org/grpc/internal/channelz" @@ -54,7 +55,7 @@ import (  // status package, or be one of the context errors. Otherwise, gRPC will use  // codes.Unknown as the status code and err.Error() as the status message of the  // RPC. -type StreamHandler func(srv interface{}, stream ServerStream) error +type StreamHandler func(srv any, stream ServerStream) error  // StreamDesc represents a streaming RPC service's method specification.  Used  // on the server when registering services and on the client when initiating @@ -79,9 +80,9 @@ type Stream interface {  	// Deprecated: See ClientStream and ServerStream documentation instead.  	Context() context.Context  	// Deprecated: See ClientStream and ServerStream documentation instead. -	SendMsg(m interface{}) error +	SendMsg(m any) error  	// Deprecated: See ClientStream and ServerStream documentation instead. -	RecvMsg(m interface{}) error +	RecvMsg(m any) error  }  // ClientStream defines the client-side behavior of a streaming RPC. @@ -90,7 +91,9 @@ type Stream interface {  // status package.  type ClientStream interface {  	// Header returns the header metadata received from the server if there -	// is any. It blocks if the metadata is not ready to read. +	// is any. It blocks if the metadata is not ready to read.  If the metadata +	// is nil and the error is also nil, then the stream was terminated without +	// headers, and the status can be discovered by calling RecvMsg.  	Header() (metadata.MD, error)  	// Trailer returns the trailer metadata from the server, if there is any.  	// It must only be called after stream.CloseAndRecv has returned, or @@ -126,7 +129,7 @@ type ClientStream interface {  	//  	// It is not safe to modify the message after calling SendMsg. Tracing  	// libraries and stats handlers may use the message lazily. -	SendMsg(m interface{}) error +	SendMsg(m any) error  	// RecvMsg blocks until it receives a message into m or the stream is  	// done. It returns io.EOF when the stream completes successfully. On  	// any other error, the stream is aborted and the error contains the RPC @@ -135,7 +138,7 @@ type ClientStream interface {  	// It is safe to have a goroutine calling SendMsg and another goroutine  	// calling RecvMsg on the same stream at the same time, but it is not  	// safe to call RecvMsg on the same stream in different goroutines. -	RecvMsg(m interface{}) error +	RecvMsg(m any) error  }  // NewStream creates a new Stream for the client side. This is typically @@ -155,10 +158,10 @@ type ClientStream interface {  // If none of the above happen, a goroutine and a context will be leaked, and grpc  // will not call the optionally-configured stats handler with a stats.End message.  func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { -	if err := cc.idlenessMgr.onCallBegin(); err != nil { +	if err := cc.idlenessMgr.OnCallBegin(); err != nil {  		return nil, err  	} -	defer cc.idlenessMgr.onCallEnd() +	defer cc.idlenessMgr.OnCallEnd()  	// allow interceptor to see all applicable call options, which means those  	// configured as defaults from dial option as well as per-call options @@ -433,7 +436,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)  		ctx = trace.NewContext(ctx, trInfo.tr)  	} -	if cs.cc.parsedTarget.URL.Scheme == "xds" { +	if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {  		// Add extra metadata (metadata that will be added by transport) to context  		// so the balancer can see them.  		ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs( @@ -788,23 +791,24 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())  func (cs *clientStream) Header() (metadata.MD, error) {  	var m metadata.MD -	noHeader := false  	err := cs.withRetry(func(a *csAttempt) error {  		var err error  		m, err = a.s.Header() -		if err == transport.ErrNoHeaders { -			noHeader = true -			return nil -		}  		return toRPCErr(err)  	}, cs.commitAttemptLocked) +	if m == nil && err == nil { +		// The stream ended with success.  Finish the clientStream. +		err = io.EOF +	} +  	if err != nil {  		cs.finish(err) -		return nil, err +		// Do not return the error.  The user should get it by calling Recv(). +		return nil, nil  	} -	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader { +	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {  		// Only log if binary log is on and header has not been logged, and  		// there is actually headers to log.  		logEntry := &binarylog.ServerHeader{ @@ -820,6 +824,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {  			binlog.Log(cs.ctx, logEntry)  		}  	} +  	return m, nil  } @@ -860,7 +865,7 @@ func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error  	cs.buffer = append(cs.buffer, op)  } -func (cs *clientStream) SendMsg(m interface{}) (err error) { +func (cs *clientStream) SendMsg(m any) (err error) {  	defer func() {  		if err != nil && err != io.EOF {  			// Call finish on the client stream for errors generated by this SendMsg @@ -904,7 +909,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {  	return err  } -func (cs *clientStream) RecvMsg(m interface{}) error { +func (cs *clientStream) RecvMsg(m any) error {  	if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {  		// Call Header() to binary log header if it's not already logged.  		cs.Header() @@ -928,24 +933,6 @@ func (cs *clientStream) RecvMsg(m interface{}) error {  	if err != nil || !cs.desc.ServerStreams {  		// err != nil or non-server-streaming indicates end of stream.  		cs.finish(err) - -		if len(cs.binlogs) != 0 { -			// finish will not log Trailer. Log Trailer here. -			logEntry := &binarylog.ServerTrailer{ -				OnClientSide: true, -				Trailer:      cs.Trailer(), -				Err:          err, -			} -			if logEntry.Err == io.EOF { -				logEntry.Err = nil -			} -			if peer, ok := peer.FromContext(cs.Context()); ok { -				logEntry.PeerAddr = peer.Addr -			} -			for _, binlog := range cs.binlogs { -				binlog.Log(cs.ctx, logEntry) -			} -		}  	}  	return err  } @@ -1001,18 +988,30 @@ func (cs *clientStream) finish(err error) {  			}  		}  	} +  	cs.mu.Unlock() -	// For binary logging. only log cancel in finish (could be caused by RPC ctx -	// canceled or ClientConn closed). Trailer will be logged in RecvMsg. -	// -	// Only one of cancel or trailer needs to be logged. In the cases where -	// users don't call RecvMsg, users must have already canceled the RPC. -	if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled { -		c := &binarylog.Cancel{ -			OnClientSide: true, -		} -		for _, binlog := range cs.binlogs { -			binlog.Log(cs.ctx, c) +	// Only one of cancel or trailer needs to be logged. +	if len(cs.binlogs) != 0 { +		switch err { +		case errContextCanceled, errContextDeadline, ErrClientConnClosing: +			c := &binarylog.Cancel{ +				OnClientSide: true, +			} +			for _, binlog := range cs.binlogs { +				binlog.Log(cs.ctx, c) +			} +		default: +			logEntry := &binarylog.ServerTrailer{ +				OnClientSide: true, +				Trailer:      cs.Trailer(), +				Err:          err, +			} +			if peer, ok := peer.FromContext(cs.Context()); ok { +				logEntry.PeerAddr = peer.Addr +			} +			for _, binlog := range cs.binlogs { +				binlog.Log(cs.ctx, logEntry) +			}  		}  	}  	if err == nil { @@ -1028,7 +1027,7 @@ func (cs *clientStream) finish(err error) {  	cs.cancel()  } -func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { +func (a *csAttempt) sendMsg(m any, hdr, payld, data []byte) error {  	cs := a.cs  	if a.trInfo != nil {  		a.mu.Lock() @@ -1055,7 +1054,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {  	return nil  } -func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { +func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {  	cs := a.cs  	if len(a.statsHandlers) != 0 && payInfo == nil {  		payInfo = &payloadInfo{} @@ -1348,7 +1347,7 @@ func (as *addrConnStream) Context() context.Context {  	return as.s.Context()  } -func (as *addrConnStream) SendMsg(m interface{}) (err error) { +func (as *addrConnStream) SendMsg(m any) (err error) {  	defer func() {  		if err != nil && err != io.EOF {  			// Call finish on the client stream for errors generated by this SendMsg @@ -1393,7 +1392,7 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) {  	return nil  } -func (as *addrConnStream) RecvMsg(m interface{}) (err error) { +func (as *addrConnStream) RecvMsg(m any) (err error) {  	defer func() {  		if err != nil || !as.desc.ServerStreams {  			// err != nil or non-server-streaming indicates end of stream. @@ -1512,7 +1511,7 @@ type ServerStream interface {  	//  	// It is not safe to modify the message after calling SendMsg. Tracing  	// libraries and stats handlers may use the message lazily. -	SendMsg(m interface{}) error +	SendMsg(m any) error  	// RecvMsg blocks until it receives a message into m or the stream is  	// done. It returns io.EOF when the client has performed a CloseSend. On  	// any non-EOF error, the stream is aborted and the error contains the @@ -1521,7 +1520,7 @@ type ServerStream interface {  	// It is safe to have a goroutine calling SendMsg and another goroutine  	// calling RecvMsg on the same stream at the same time, but it is not  	// safe to call RecvMsg on the same stream in different goroutines. -	RecvMsg(m interface{}) error +	RecvMsg(m any) error  }  // serverStream implements a server side Stream. @@ -1602,7 +1601,7 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {  	ss.s.SetTrailer(md)  } -func (ss *serverStream) SendMsg(m interface{}) (err error) { +func (ss *serverStream) SendMsg(m any) (err error) {  	defer func() {  		if ss.trInfo != nil {  			ss.mu.Lock() @@ -1610,7 +1609,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {  				if err == nil {  					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)  				} else { -					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) +					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  					ss.trInfo.tr.SetError()  				}  			} @@ -1677,7 +1676,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {  	return nil  } -func (ss *serverStream) RecvMsg(m interface{}) (err error) { +func (ss *serverStream) RecvMsg(m any) (err error) {  	defer func() {  		if ss.trInfo != nil {  			ss.mu.Lock() @@ -1685,7 +1684,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {  				if err == nil {  					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)  				} else if err != io.EOF { -					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) +					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  					ss.trInfo.tr.SetError()  				}  			} @@ -1757,7 +1756,7 @@ func MethodFromServerStream(stream ServerStream) (string, bool) {  // prepareMsg returns the hdr, payload and data  // using the compressors passed or using the  // passed preparedmsg -func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { +func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {  	if preparedMsg, ok := m.(*PreparedMsg); ok {  		return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil  	} diff --git a/vendor/google.golang.org/grpc/trace.go b/vendor/google.golang.org/grpc/trace.go index 07a2d26b3..9ded79321 100644 --- a/vendor/google.golang.org/grpc/trace.go +++ b/vendor/google.golang.org/grpc/trace.go @@ -97,8 +97,8 @@ func truncate(x string, l int) string {  // payload represents an RPC request or response payload.  type payload struct { -	sent bool        // whether this is an outgoing payload -	msg  interface{} // e.g. a proto.Message +	sent bool // whether this is an outgoing payload +	msg  any  // e.g. a proto.Message  	// TODO(dsymonds): add stringifying info to codec, and limit how much we hold here?  } @@ -111,7 +111,7 @@ func (p payload) String() string {  type fmtStringer struct {  	format string -	a      []interface{} +	a      []any  }  func (f *fmtStringer) String() string { diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index 353cfd528..914ce665f 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@  package grpc  // Version is the current grpc version. -const Version = "1.57.0" +const Version = "1.58.0" diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh index a8e4732b3..bbc9e2e3c 100644 --- a/vendor/google.golang.org/grpc/vet.sh +++ b/vendor/google.golang.org/grpc/vet.sh @@ -84,6 +84,9 @@ not git grep -l 'x/net/context' -- "*.go"  #   thread safety.  git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^stress\|grpcrand\|^benchmark\|wrr_test' +# - Do not use "interface{}"; use "any" instead. +git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc' +  # - Do not call grpclog directly. Use grpclog.Component instead.  git grep -l -e 'grpclog.I' --or -e 'grpclog.W' --or -e 'grpclog.E' --or -e 'grpclog.F' --or -e 'grpclog.V' -- "*.go" | not grep -v '^grpclog/component.go\|^internal/grpctest/tlogger_test.go' @@ -106,7 +109,7 @@ for MOD_FILE in $(find . -name 'go.mod'); do    goimports -l . 2>&1 | not grep -vE "\.pb\.go"    golint ./... 2>&1 | not grep -vE "/grpc_testing_not_regenerate/.*\.pb\.go:" -  go mod tidy -compat=1.17 +  go mod tidy -compat=1.19    git status --porcelain 2>&1 | fail_on_output || \      (git status; git --no-pager diff; exit 1)    popd @@ -168,8 +171,6 @@ proto.RegisteredExtension is deprecated  proto.RegisteredExtensions is deprecated  proto.RegisterMapType is deprecated  proto.Unmarshaler is deprecated -resolver.Backend -resolver.GRPCLB  Target is deprecated: Use the Target field in the BuildOptions instead.  xxx_messageInfo_  ' "${SC_OUT}" diff --git a/vendor/modules.txt b/vendor/modules.txt index 9b236c249..98cc60a73 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -749,8 +749,8 @@ github.com/yuin/goldmark/renderer  github.com/yuin/goldmark/renderer/html  github.com/yuin/goldmark/text  github.com/yuin/goldmark/util -# go.opentelemetry.io/otel v1.17.0 -## explicit; go 1.19 +# go.opentelemetry.io/otel v1.18.0 +## explicit; go 1.20  go.opentelemetry.io/otel  go.opentelemetry.io/otel/attribute  go.opentelemetry.io/otel/baggage @@ -768,8 +768,8 @@ go.opentelemetry.io/otel/semconv/v1.17.0  go.opentelemetry.io/otel/semconv/v1.17.0/httpconv  go.opentelemetry.io/otel/semconv/v1.20.0  go.opentelemetry.io/otel/semconv/v1.21.0 -# go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0 -## explicit; go 1.19 +# go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 +## explicit; go 1.20  go.opentelemetry.io/otel/exporters/otlp/otlptrace  go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform  # go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.17.0 @@ -779,27 +779,27 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/envconfig  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry -# go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.17.0 -## explicit; go 1.19 +# go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.18.0 +## explicit; go 1.20  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/envconfig  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig  go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/retry -# go.opentelemetry.io/otel/metric v1.17.0 -## explicit; go 1.19 +# go.opentelemetry.io/otel/metric v1.18.0 +## explicit; go 1.20  go.opentelemetry.io/otel/metric  go.opentelemetry.io/otel/metric/embedded -# go.opentelemetry.io/otel/sdk v1.17.0 -## explicit; go 1.19 +# go.opentelemetry.io/otel/sdk v1.18.0 +## explicit; go 1.20  go.opentelemetry.io/otel/sdk  go.opentelemetry.io/otel/sdk/instrumentation  go.opentelemetry.io/otel/sdk/internal  go.opentelemetry.io/otel/sdk/internal/env  go.opentelemetry.io/otel/sdk/resource  go.opentelemetry.io/otel/sdk/trace -# go.opentelemetry.io/otel/trace v1.17.0 -## explicit; go 1.19 +# go.opentelemetry.io/otel/trace v1.18.0 +## explicit; go 1.20  go.opentelemetry.io/otel/trace  # go.opentelemetry.io/proto/otlp v1.0.0  ## explicit; go 1.17 @@ -924,15 +924,15 @@ google.golang.org/appengine/internal/log  google.golang.org/appengine/internal/remote_api  google.golang.org/appengine/internal/urlfetch  google.golang.org/appengine/urlfetch -# google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc +# google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98  ## explicit; go 1.19  google.golang.org/genproto/googleapis/api/httpbody -# google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc +# google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98  ## explicit; go 1.19  google.golang.org/genproto/googleapis/rpc/errdetails  google.golang.org/genproto/googleapis/rpc/status -# google.golang.org/grpc v1.57.0 -## explicit; go 1.17 +# google.golang.org/grpc v1.58.0 +## explicit; go 1.19  google.golang.org/grpc  google.golang.org/grpc/attributes  google.golang.org/grpc/backoff @@ -964,6 +964,7 @@ google.golang.org/grpc/internal/grpclog  google.golang.org/grpc/internal/grpcrand  google.golang.org/grpc/internal/grpcsync  google.golang.org/grpc/internal/grpcutil +google.golang.org/grpc/internal/idle  google.golang.org/grpc/internal/metadata  google.golang.org/grpc/internal/pretty  google.golang.org/grpc/internal/resolver | 
