diff options
| author | 2024-04-11 11:46:18 +0200 | |
|---|---|---|
| committer | 2024-04-11 11:46:18 +0200 | |
| commit | c097745c38537b86df282ba2d2a2f2b791dd477b (patch) | |
| tree | 6ba202ab6a7358d7a76ce9ae5b408af78383874e /vendor/google.golang.org/grpc | |
| parent | [chore] update go-structr => v0.6.2 (fixes nested field ptr following) (#2822) (diff) | |
| download | gotosocial-c097745c38537b86df282ba2d2a2f2b791dd477b.tar.xz | |
[chore]: Bump go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc (#2818)
Bumps [go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc](https://github.com/open-telemetry/opentelemetry-go) from 1.24.0 to 1.25.0.
- [Release notes](https://github.com/open-telemetry/opentelemetry-go/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/open-telemetry/opentelemetry-go/compare/v1.24.0...v1.25.0)
---
updated-dependencies:
- dependency-name: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/google.golang.org/grpc')
51 files changed, 2167 insertions, 2093 deletions
| diff --git a/vendor/google.golang.org/grpc/balancer/balancer.go b/vendor/google.golang.org/grpc/balancer/balancer.go index d79560a2e..f391744f7 100644 --- a/vendor/google.golang.org/grpc/balancer/balancer.go +++ b/vendor/google.golang.org/grpc/balancer/balancer.go @@ -54,13 +54,14 @@ var (  // an init() function), and is not thread-safe. If multiple Balancers are  // registered with the same name, the one registered last will take effect.  func Register(b Builder) { -	if strings.ToLower(b.Name()) != b.Name() { +	name := strings.ToLower(b.Name()) +	if name != b.Name() {  		// TODO: Skip the use of strings.ToLower() to index the map after v1.59  		// is released to switch to case sensitive balancer registry. Also,  		// remove this warning and update the docstrings for Register and Get.  		logger.Warningf("Balancer registered with name %q. grpc-go will be switching to case sensitive balancer registries soon", b.Name())  	} -	m[strings.ToLower(b.Name())] = b +	m[name] = b  }  // unregisterForTesting deletes the balancer with the given name from the @@ -232,8 +233,8 @@ type BuildOptions struct {  	// implementations which do not communicate with a remote load balancer  	// server can ignore this field.  	Authority string -	// ChannelzParentID is the parent ClientConn's channelz ID. -	ChannelzParentID *channelz.Identifier +	// ChannelzParent is the parent ClientConn's channelz channel. +	ChannelzParent channelz.Identifier  	// CustomUserAgent is the custom user agent set on the parent ClientConn.  	// The balancer should set the same custom user agent if it creates a  	// ClientConn. diff --git a/vendor/google.golang.org/grpc/balancer_wrapper.go b/vendor/google.golang.org/grpc/balancer_wrapper.go index b5e30cff0..af39b8a4c 100644 --- a/vendor/google.golang.org/grpc/balancer_wrapper.go +++ b/vendor/google.golang.org/grpc/balancer_wrapper.go @@ -21,7 +21,6 @@ package grpc  import (  	"context"  	"fmt" -	"strings"  	"sync"  	"google.golang.org/grpc/balancer" @@ -66,19 +65,20 @@ type ccBalancerWrapper struct {  }  // newCCBalancerWrapper creates a new balancer wrapper in idle state. The -// underlying balancer is not created until the switchTo() method is invoked. +// underlying balancer is not created until the updateClientConnState() method +// is invoked.  func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {  	ctx, cancel := context.WithCancel(cc.ctx)  	ccb := &ccBalancerWrapper{  		cc: cc,  		opts: balancer.BuildOptions{ -			DialCreds:        cc.dopts.copts.TransportCredentials, -			CredsBundle:      cc.dopts.copts.CredsBundle, -			Dialer:           cc.dopts.copts.Dialer, -			Authority:        cc.authority, -			CustomUserAgent:  cc.dopts.copts.UserAgent, -			ChannelzParentID: cc.channelzID, -			Target:           cc.parsedTarget, +			DialCreds:       cc.dopts.copts.TransportCredentials, +			CredsBundle:     cc.dopts.copts.CredsBundle, +			Dialer:          cc.dopts.copts.Dialer, +			Authority:       cc.authority, +			CustomUserAgent: cc.dopts.copts.UserAgent, +			ChannelzParent:  cc.channelz, +			Target:          cc.parsedTarget,  		},  		serializer:       grpcsync.NewCallbackSerializer(ctx),  		serializerCancel: cancel, @@ -97,6 +97,11 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat  		if ctx.Err() != nil || ccb.balancer == nil {  			return  		} +		name := gracefulswitch.ChildName(ccs.BalancerConfig) +		if ccb.curBalancerName != name { +			ccb.curBalancerName = name +			channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name) +		}  		err := ccb.balancer.UpdateClientConnState(*ccs)  		if logger.V(2) && err != nil {  			logger.Infof("error from balancer.UpdateClientConnState: %v", err) @@ -120,54 +125,6 @@ func (ccb *ccBalancerWrapper) resolverError(err error) {  	})  } -// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the -// LB policy identified by name. -// -// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the -// first good update from the name resolver, it determines the LB policy to use -// and invokes the switchTo() method. Upon receipt of every subsequent update -// from the name resolver, it invokes this method. -// -// the ccBalancerWrapper keeps track of the current LB policy name, and skips -// the graceful balancer switching process if the name does not change. -func (ccb *ccBalancerWrapper) switchTo(name string) { -	ccb.serializer.Schedule(func(ctx context.Context) { -		if ctx.Err() != nil || ccb.balancer == nil { -			return -		} -		// TODO: Other languages use case-sensitive balancer registries. We should -		// switch as well. See: https://github.com/grpc/grpc-go/issues/5288. -		if strings.EqualFold(ccb.curBalancerName, name) { -			return -		} -		ccb.buildLoadBalancingPolicy(name) -	}) -} - -// buildLoadBalancingPolicy performs the following: -//   - retrieve a balancer builder for the given name. Use the default LB -//     policy, pick_first, if no LB policy with name is found in the registry. -//   - instruct the gracefulswitch balancer to switch to the above builder. This -//     will actually build the new balancer. -//   - update the `curBalancerName` field -// -// Must be called from a serializer callback. -func (ccb *ccBalancerWrapper) buildLoadBalancingPolicy(name string) { -	builder := balancer.Get(name) -	if builder == nil { -		channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) -		builder = newPickfirstBuilder() -	} else { -		channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) -	} - -	if err := ccb.balancer.SwitchTo(builder); err != nil { -		channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) -		return -	} -	ccb.curBalancerName = builder.Name() -} -  // close initiates async shutdown of the wrapper.  cc.mu must be held when  // calling this function.  To determine the wrapper has finished shutting down,  // the channel should block on ccb.serializer.Done() without cc.mu held. @@ -175,7 +132,7 @@ func (ccb *ccBalancerWrapper) close() {  	ccb.mu.Lock()  	ccb.closed = true  	ccb.mu.Unlock() -	channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: closing") +	channelz.Info(logger, ccb.cc.channelz, "ccBalancerWrapper: closing")  	ccb.serializer.Schedule(func(context.Context) {  		if ccb.balancer == nil {  			return @@ -212,7 +169,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer  	}  	ac, err := ccb.cc.newAddrConnLocked(addrs, opts)  	if err != nil { -		channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) +		channelz.Warningf(logger, ccb.cc.channelz, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)  		return nil, err  	}  	acbw := &acBalancerWrapper{ @@ -304,7 +261,7 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, err error) {  }  func (acbw *acBalancerWrapper) String() string { -	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelzID.Int()) +	return fmt.Sprintf("SubConn(id:%d)", acbw.ac.channelz.ID)  }  func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { diff --git a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go index e9e97d451..856c75dd4 100644 --- a/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go +++ b/vendor/google.golang.org/grpc/binarylog/grpc_binarylog_v1/binarylog.pb.go @@ -18,8 +18,8 @@  // Code generated by protoc-gen-go. DO NOT EDIT.  // versions: -// 	protoc-gen-go v1.31.0 -// 	protoc        v4.22.0 +// 	protoc-gen-go v1.32.0 +// 	protoc        v4.25.2  // source: grpc/binlog/v1/binarylog.proto  package grpc_binarylog_v1 diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index f6e815e6b..e3eb44d58 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -67,7 +67,7 @@ var (  	errConnDrain = errors.New("grpc: the connection is drained")  	// errConnClosing indicates that the connection is closing.  	errConnClosing = errors.New("grpc: the connection is closing") -	// errConnIdling indicates the the connection is being closed as the channel +	// errConnIdling indicates the connection is being closed as the channel  	// is moving to an idle mode due to inactivity.  	errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")  	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default @@ -101,11 +101,6 @@ const (  	defaultReadBufSize  = 32 * 1024  ) -// Dial creates a client connection to the given target. -func Dial(target string, opts ...DialOption) (*ClientConn, error) { -	return DialContext(context.Background(), target, opts...) -} -  type defaultConfigSelector struct {  	sc *ServiceConfig  } @@ -117,13 +112,22 @@ func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*ires  	}, nil  } -// newClient returns a new client in idle mode. -func newClient(target string, opts ...DialOption) (conn *ClientConn, err error) { +// NewClient creates a new gRPC "channel" for the target URI provided.  No I/O +// is performed.  Use of the ClientConn for RPCs will automatically cause it to +// connect.  Connect may be used to manually create a connection, but for most +// users this is unnecessary. +// +// The target name syntax is defined in +// https://github.com/grpc/grpc/blob/master/doc/naming.md.  e.g. to use dns +// resolver, a "dns:///" prefix should be applied to the target. +// +// The DialOptions returned by WithBlock, WithTimeout, and +// WithReturnConnectionError are ignored by this function. +func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {  	cc := &ClientConn{  		target: target,  		conns:  make(map[*addrConn]struct{}),  		dopts:  defaultDialOptions(), -		czData: new(channelzData),  	}  	cc.retryThrottler.Store((*retryThrottler)(nil)) @@ -175,15 +179,15 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error)  	// Determine the resolver to use.  	if err := cc.parseTargetAndFindResolver(); err != nil { -		channelz.RemoveEntry(cc.channelzID) +		channelz.RemoveEntry(cc.channelz.ID)  		return nil, err  	}  	if err = cc.determineAuthority(); err != nil { -		channelz.RemoveEntry(cc.channelzID) +		channelz.RemoveEntry(cc.channelz.ID)  		return nil, err  	} -	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) +	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)  	cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)  	cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc. @@ -191,39 +195,36 @@ func newClient(target string, opts ...DialOption) (conn *ClientConn, err error)  	return cc, nil  } -// DialContext creates a client connection to the given target. By default, it's -// a non-blocking dial (the function won't wait for connections to be -// established, and connecting happens in the background). To make it a blocking -// dial, use WithBlock() dial option. +// Dial calls DialContext(context.Background(), target, opts...).  // -// In the non-blocking case, the ctx does not act against the connection. It -// only controls the setup steps. +// Deprecated: use NewClient instead.  Will be supported throughout 1.x. +func Dial(target string, opts ...DialOption) (*ClientConn, error) { +	return DialContext(context.Background(), target, opts...) +} + +// DialContext calls NewClient and then exits idle mode.  If WithBlock(true) is +// used, it calls Connect and WaitForStateChange until either the context +// expires or the state of the ClientConn is Ready.  // -// In the blocking case, ctx can be used to cancel or expire the pending -// connection. Once this function returns, the cancellation and expiration of -// ctx will be noop. Users should call ClientConn.Close to terminate all the -// pending operations after this function returns. +// One subtle difference between NewClient and Dial and DialContext is that the +// former uses "dns" as the default name resolver, while the latter use +// "passthrough" for backward compatibility.  This distinction should not matter +// to most users, but could matter to legacy users that specify a custom dialer +// and expect it to receive the target string directly.  // -// The target name syntax is defined in -// https://github.com/grpc/grpc/blob/master/doc/naming.md. -// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. +// Deprecated: use NewClient instead.  Will be supported throughout 1.x.  func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { -	cc, err := newClient(target, opts...) +	// At the end of this method, we kick the channel out of idle, rather than +	// waiting for the first rpc. +	opts = append([]DialOption{withDefaultScheme("passthrough")}, opts...) +	cc, err := NewClient(target, opts...)  	if err != nil {  		return nil, err  	}  	// We start the channel off in idle mode, but kick it out of idle now, -	// instead of waiting for the first RPC. Other gRPC implementations do wait -	// for the first RPC to kick the channel out of idle. But doing so would be -	// a major behavior change for our users who are used to seeing the channel -	// active after Dial. -	// -	// Taking this approach of kicking it out of idle at the end of this method -	// allows us to share the code between channel creation and exiting idle -	// mode. This will also make it easy for us to switch to starting the -	// channel off in idle, i.e. by making newClient exported. - +	// instead of waiting for the first RPC.  This is the legacy behavior of +	// Dial.  	defer func() {  		if err != nil {  			cc.Close() @@ -291,17 +292,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *  // addTraceEvent is a helper method to add a trace event on the channel. If the  // channel is a nested one, the same event is also added on the parent channel.  func (cc *ClientConn) addTraceEvent(msg string) { -	ted := &channelz.TraceEventDesc{ +	ted := &channelz.TraceEvent{  		Desc:     fmt.Sprintf("Channel %s", msg),  		Severity: channelz.CtInfo,  	} -	if cc.dopts.channelzParentID != nil { -		ted.Parent = &channelz.TraceEventDesc{ -			Desc:     fmt.Sprintf("Nested channel(id:%d) %s", cc.channelzID.Int(), msg), +	if cc.dopts.channelzParent != nil { +		ted.Parent = &channelz.TraceEvent{ +			Desc:     fmt.Sprintf("Nested channel(id:%d) %s", cc.channelz.ID, msg),  			Severity: channelz.CtInfo,  		}  	} -	channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) +	channelz.AddTraceEvent(logger, cc.channelz, 0, ted)  }  type idler ClientConn @@ -418,14 +419,15 @@ func (cc *ClientConn) validateTransportCredentials() error {  }  // channelzRegistration registers the newly created ClientConn with channelz and -// stores the returned identifier in `cc.channelzID` and `cc.csMgr.channelzID`. -// A channelz trace event is emitted for ClientConn creation. If the newly -// created ClientConn is a nested one, i.e a valid parent ClientConn ID is -// specified via a dial option, the trace event is also added to the parent. +// stores the returned identifier in `cc.channelz`.  A channelz trace event is +// emitted for ClientConn creation. If the newly created ClientConn is a nested +// one, i.e a valid parent ClientConn ID is specified via a dial option, the +// trace event is also added to the parent.  //  // Doesn't grab cc.mu as this method is expected to be called only at Dial time.  func (cc *ClientConn) channelzRegistration(target string) { -	cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) +	parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel) +	cc.channelz = channelz.RegisterChannel(parentChannel, target)  	cc.addTraceEvent("created")  } @@ -492,11 +494,11 @@ func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStr  }  // newConnectivityStateManager creates an connectivityStateManager with -// the specified id. -func newConnectivityStateManager(ctx context.Context, id *channelz.Identifier) *connectivityStateManager { +// the specified channel. +func newConnectivityStateManager(ctx context.Context, channel *channelz.Channel) *connectivityStateManager {  	return &connectivityStateManager{ -		channelzID: id, -		pubSub:     grpcsync.NewPubSub(ctx), +		channelz: channel, +		pubSub:   grpcsync.NewPubSub(ctx),  	}  } @@ -510,7 +512,7 @@ type connectivityStateManager struct {  	mu         sync.Mutex  	state      connectivity.State  	notifyChan chan struct{} -	channelzID *channelz.Identifier +	channelz   *channelz.Channel  	pubSub     *grpcsync.PubSub  } @@ -527,9 +529,10 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {  		return  	}  	csm.state = state +	csm.channelz.ChannelMetrics.State.Store(&state)  	csm.pubSub.Publish(state) -	channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state) +	channelz.Infof(logger, csm.channelz, "Channel Connectivity change to %v", state)  	if csm.notifyChan != nil {  		// There are other goroutines waiting on this channel.  		close(csm.notifyChan) @@ -583,12 +586,12 @@ type ClientConn struct {  	cancel context.CancelFunc // Cancelled on close.  	// The following are initialized at dial time, and are read-only after that. -	target          string               // User's dial target. -	parsedTarget    resolver.Target      // See parseTargetAndFindResolver(). -	authority       string               // See determineAuthority(). -	dopts           dialOptions          // Default and user specified dial options. -	channelzID      *channelz.Identifier // Channelz identifier for the channel. -	resolverBuilder resolver.Builder     // See parseTargetAndFindResolver(). +	target          string            // User's dial target. +	parsedTarget    resolver.Target   // See parseTargetAndFindResolver(). +	authority       string            // See determineAuthority(). +	dopts           dialOptions       // Default and user specified dial options. +	channelz        *channelz.Channel // Channelz object. +	resolverBuilder resolver.Builder  // See parseTargetAndFindResolver().  	idlenessMgr     *idle.Manager  	// The following provide their own synchronization, and therefore don't @@ -596,7 +599,6 @@ type ClientConn struct {  	csMgr              *connectivityStateManager  	pickerWrapper      *pickerWrapper  	safeConfigSelector iresolver.SafeConfigSelector -	czData             *channelzData  	retryThrottler     atomic.Value // Updated from service config.  	// mu protects the following fields. @@ -690,6 +692,7 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {  var emptyServiceConfig *ServiceConfig  func init() { +	balancer.Register(pickfirstBuilder{})  	cfg := parseServiceConfig("{}")  	if cfg.Err != nil {  		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) @@ -707,15 +710,15 @@ func init() {  	}  } -func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { +func (cc *ClientConn) maybeApplyDefaultServiceConfig() {  	if cc.sc != nil { -		cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs) +		cc.applyServiceConfigAndBalancer(cc.sc, nil)  		return  	}  	if cc.dopts.defaultServiceConfig != nil { -		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs) +		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})  	} else { -		cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs) +		cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})  	}  } @@ -733,7 +736,7 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)  		// May need to apply the initial service config in case the resolver  		// doesn't support service configs, or doesn't provide a service config  		// with the new addresses. -		cc.maybeApplyDefaultServiceConfig(nil) +		cc.maybeApplyDefaultServiceConfig()  		cc.balancerWrapper.resolverError(err) @@ -744,10 +747,10 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)  	var ret error  	if cc.dopts.disableServiceConfig { -		channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig) -		cc.maybeApplyDefaultServiceConfig(s.Addresses) +		channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig) +		cc.maybeApplyDefaultServiceConfig()  	} else if s.ServiceConfig == nil { -		cc.maybeApplyDefaultServiceConfig(s.Addresses) +		cc.maybeApplyDefaultServiceConfig()  		// TODO: do we need to apply a failing LB policy if there is no  		// default, per the error handling design?  	} else { @@ -755,12 +758,12 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)  			configSelector := iresolver.GetConfigSelector(s)  			if configSelector != nil {  				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 { -					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector") +					channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")  				}  			} else {  				configSelector = &defaultConfigSelector{sc}  			} -			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses) +			cc.applyServiceConfigAndBalancer(sc, configSelector)  		} else {  			ret = balancer.ErrBadResolverState  			if cc.sc == nil { @@ -775,7 +778,7 @@ func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error)  	var balCfg serviceconfig.LoadBalancingConfig  	if cc.sc != nil && cc.sc.lbConfig != nil { -		balCfg = cc.sc.lbConfig.cfg +		balCfg = cc.sc.lbConfig  	}  	bw := cc.balancerWrapper  	cc.mu.Unlock() @@ -834,22 +837,17 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.  		addrs:        copyAddressesWithoutBalancerAttributes(addrs),  		scopts:       opts,  		dopts:        cc.dopts, -		czData:       new(channelzData), +		channelz:     channelz.RegisterSubChannel(cc.channelz.ID, ""),  		resetBackoff: make(chan struct{}),  		stateChan:    make(chan struct{}),  	}  	ac.ctx, ac.cancel = context.WithCancel(cc.ctx) -	var err error -	ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "") -	if err != nil { -		return nil, err -	} -	channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ +	channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{  		Desc:     "Subchannel created",  		Severity: channelz.CtInfo, -		Parent: &channelz.TraceEventDesc{ -			Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID.Int()), +		Parent: &channelz.TraceEvent{ +			Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelz.ID),  			Severity: channelz.CtInfo,  		},  	}) @@ -872,38 +870,27 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {  	ac.tearDown(err)  } -func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { -	return &channelz.ChannelInternalMetric{ -		State:                    cc.GetState(), -		Target:                   cc.target, -		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted), -		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded), -		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed), -		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), -	} -} -  // Target returns the target string of the ClientConn. -// -// # Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release.  func (cc *ClientConn) Target() string {  	return cc.target  } +// CanonicalTarget returns the canonical target string of the ClientConn. +func (cc *ClientConn) CanonicalTarget() string { +	return cc.parsedTarget.String() +} +  func (cc *ClientConn) incrCallsStarted() { -	atomic.AddInt64(&cc.czData.callsStarted, 1) -	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) +	cc.channelz.ChannelMetrics.CallsStarted.Add(1) +	cc.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())  }  func (cc *ClientConn) incrCallsSucceeded() { -	atomic.AddInt64(&cc.czData.callsSucceeded, 1) +	cc.channelz.ChannelMetrics.CallsSucceeded.Add(1)  }  func (cc *ClientConn) incrCallsFailed() { -	atomic.AddInt64(&cc.czData.callsFailed, 1) +	cc.channelz.ChannelMetrics.CallsFailed.Add(1)  }  // connect starts creating a transport. @@ -947,7 +934,7 @@ func equalAddresses(a, b []resolver.Address) bool {  // connections or connection attempts.  func (ac *addrConn) updateAddrs(addrs []resolver.Address) {  	ac.mu.Lock() -	channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs)) +	channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))  	addrs = copyAddressesWithoutBalancerAttributes(addrs)  	if equalAddresses(ac.addrs, addrs) { @@ -1067,7 +1054,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st  	})  } -func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { +func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {  	if sc == nil {  		// should never reach here.  		return @@ -1088,17 +1075,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel  	} else {  		cc.retryThrottler.Store((*retryThrottler)(nil))  	} - -	var newBalancerName string -	if cc.sc == nil || (cc.sc.lbConfig == nil && cc.sc.LB == nil) { -		// No service config or no LB policy specified in config. -		newBalancerName = PickFirstBalancerName -	} else if cc.sc.lbConfig != nil { -		newBalancerName = cc.sc.lbConfig.name -	} else { // cc.sc.LB != nil -		newBalancerName = *cc.sc.LB -	} -	cc.balancerWrapper.switchTo(newBalancerName)  }  func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { @@ -1174,7 +1150,7 @@ func (cc *ClientConn) Close() error {  	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add  	// trace reference to the entity being deleted, and thus prevent it from being  	// deleted right away. -	channelz.RemoveEntry(cc.channelzID) +	channelz.RemoveEntry(cc.channelz.ID)  	return nil  } @@ -1206,8 +1182,7 @@ type addrConn struct {  	backoffIdx   int // Needs to be stateful for resetConnectBackoff.  	resetBackoff chan struct{} -	channelzID *channelz.Identifier -	czData     *channelzData +	channelz *channelz.SubChannel  }  // Note: this requires a lock on ac.mu. @@ -1219,10 +1194,11 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)  	close(ac.stateChan)  	ac.stateChan = make(chan struct{})  	ac.state = s +	ac.channelz.ChannelMetrics.State.Store(&s)  	if lastErr == nil { -		channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s) +		channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)  	} else { -		channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v, last error: %s", s, lastErr) +		channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)  	}  	ac.acbw.updateState(s, lastErr)  } @@ -1335,7 +1311,7 @@ func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, c  		}  		ac.mu.Unlock() -		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr) +		channelz.Infof(logger, ac.channelz, "Subchannel picks a new address %q to connect", addr.Addr)  		err := ac.createTransport(ctx, addr, copts, connectDeadline)  		if err == nil { @@ -1388,7 +1364,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,  	connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)  	defer cancel() -	copts.ChannelzParentID = ac.channelzID +	copts.ChannelzParent = ac.channelz  	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)  	if err != nil { @@ -1397,7 +1373,7 @@ func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address,  		}  		// newTr is either nil, or closed.  		hcancel() -		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err) +		channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)  		return err  	} @@ -1469,7 +1445,7 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {  		// The health package is not imported to set health check function.  		//  		// TODO: add a link to the health check doc in the error message. -		channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.") +		channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")  		return  	} @@ -1499,9 +1475,9 @@ func (ac *addrConn) startHealthCheck(ctx context.Context) {  		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)  		if err != nil {  			if status.Code(err) == codes.Unimplemented { -				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled") +				channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")  			} else { -				channelz.Errorf(logger, ac.channelzID, "Health checking failed: %v", err) +				channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err)  			}  		}  	}() @@ -1566,18 +1542,18 @@ func (ac *addrConn) tearDown(err error) {  	ac.cancel()  	ac.curAddr = resolver.Address{} -	channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ +	channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{  		Desc:     "Subchannel deleted",  		Severity: channelz.CtInfo, -		Parent: &channelz.TraceEventDesc{ -			Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID.Int()), +		Parent: &channelz.TraceEvent{ +			Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelz.ID),  			Severity: channelz.CtInfo,  		},  	})  	// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add  	// trace reference to the entity being deleted, and thus prevent it from  	// being deleted right away. -	channelz.RemoveEntry(ac.channelzID) +	channelz.RemoveEntry(ac.channelz.ID)  	ac.mu.Unlock()  	// We have to release the lock before the call to GracefulClose/Close here @@ -1604,39 +1580,6 @@ func (ac *addrConn) tearDown(err error) {  	}  } -func (ac *addrConn) getState() connectivity.State { -	ac.mu.Lock() -	defer ac.mu.Unlock() -	return ac.state -} - -func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { -	ac.mu.Lock() -	addr := ac.curAddr.Addr -	ac.mu.Unlock() -	return &channelz.ChannelInternalMetric{ -		State:                    ac.getState(), -		Target:                   addr, -		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted), -		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded), -		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed), -		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), -	} -} - -func (ac *addrConn) incrCallsStarted() { -	atomic.AddInt64(&ac.czData.callsStarted, 1) -	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) -} - -func (ac *addrConn) incrCallsSucceeded() { -	atomic.AddInt64(&ac.czData.callsSucceeded, 1) -} - -func (ac *addrConn) incrCallsFailed() { -	atomic.AddInt64(&ac.czData.callsFailed, 1) -} -  type retryThrottler struct {  	max    float64  	thresh float64 @@ -1674,12 +1617,17 @@ func (rt *retryThrottler) successfulRPC() {  	}  } -type channelzChannel struct { -	cc *ClientConn +func (ac *addrConn) incrCallsStarted() { +	ac.channelz.ChannelMetrics.CallsStarted.Add(1) +	ac.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())  } -func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { -	return c.cc.channelzMetric() +func (ac *addrConn) incrCallsSucceeded() { +	ac.channelz.ChannelMetrics.CallsSucceeded.Add(1) +} + +func (ac *addrConn) incrCallsFailed() { +	ac.channelz.ChannelMetrics.CallsFailed.Add(1)  }  // ErrClientConnTimeout indicates that the ClientConn cannot establish the @@ -1721,14 +1669,14 @@ func (cc *ClientConn) connectionError() error {  //  // Doesn't grab cc.mu as this method is expected to be called only at Dial time.  func (cc *ClientConn) parseTargetAndFindResolver() error { -	channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target) +	channelz.Infof(logger, cc.channelz, "original dial target is: %q", cc.target)  	var rb resolver.Builder  	parsedTarget, err := parseTarget(cc.target)  	if err != nil { -		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err) +		channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", cc.target, err)  	} else { -		channelz.Infof(logger, cc.channelzID, "parsed dial target is: %#v", parsedTarget) +		channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", parsedTarget)  		rb = cc.getResolver(parsedTarget.URL.Scheme)  		if rb != nil {  			cc.parsedTarget = parsedTarget @@ -1740,17 +1688,22 @@ func (cc *ClientConn) parseTargetAndFindResolver() error {  	// We are here because the user's dial target did not contain a scheme or  	// specified an unregistered scheme. We should fallback to the default  	// scheme, except when a custom dialer is specified in which case, we should -	// always use passthrough scheme. -	defScheme := resolver.GetDefaultScheme() -	channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme) +	// always use passthrough scheme. For either case, we need to respect any overridden +	// global defaults set by the user. +	defScheme := cc.dopts.defaultScheme +	if internal.UserSetDefaultScheme { +		defScheme = resolver.GetDefaultScheme() +	} + +	channelz.Infof(logger, cc.channelz, "fallback to scheme %q", defScheme)  	canonicalTarget := defScheme + ":///" + cc.target  	parsedTarget, err = parseTarget(canonicalTarget)  	if err != nil { -		channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err) +		channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", canonicalTarget, err)  		return err  	} -	channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) +	channelz.Infof(logger, cc.channelz, "parsed dial target is: %+v", parsedTarget)  	rb = cc.getResolver(parsedTarget.URL.Scheme)  	if rb == nil {  		return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme) @@ -1772,6 +1725,8 @@ func parseTarget(target string) (resolver.Target, error) {  	return resolver.Target{URL: *u}, nil  } +// encodeAuthority escapes the authority string based on valid chars defined in +// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.  func encodeAuthority(authority string) string {  	const upperhex = "0123456789ABCDEF" @@ -1871,6 +1826,6 @@ func (cc *ClientConn) determineAuthority() error {  	} else {  		cc.authority = encodeAuthority(endpoint)  	} -	channelz.Infof(logger, cc.channelzID, "Channel authority set to %q", cc.authority) +	channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)  	return nil  } diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go index 5feac3aa0..f6b55c68b 100644 --- a/vendor/google.golang.org/grpc/credentials/credentials.go +++ b/vendor/google.golang.org/grpc/credentials/credentials.go @@ -28,9 +28,9 @@ import (  	"fmt"  	"net" -	"github.com/golang/protobuf/proto"  	"google.golang.org/grpc/attributes"  	icredentials "google.golang.org/grpc/internal/credentials" +	"google.golang.org/protobuf/protoadapt"  )  // PerRPCCredentials defines the common interface for the credentials which need to @@ -287,5 +287,5 @@ type ChannelzSecurityValue interface {  type OtherChannelzSecurityValue struct {  	ChannelzSecurityValue  	Name  string -	Value proto.Message +	Value protoadapt.MessageV1  } diff --git a/vendor/google.golang.org/grpc/dialoptions.go b/vendor/google.golang.org/grpc/dialoptions.go index ba2426180..402493224 100644 --- a/vendor/google.golang.org/grpc/dialoptions.go +++ b/vendor/google.golang.org/grpc/dialoptions.go @@ -68,7 +68,7 @@ type dialOptions struct {  	binaryLogger                binarylog.Logger  	copts                       transport.ConnectOptions  	callOptions                 []CallOption -	channelzParentID            *channelz.Identifier +	channelzParent              channelz.Identifier  	disableServiceConfig        bool  	disableRetry                bool  	disableHealthCheck          bool @@ -79,6 +79,7 @@ type dialOptions struct {  	resolvers                   []resolver.Builder  	idleTimeout                 time.Duration  	recvBufferPool              SharedBufferPool +	defaultScheme               string  }  // DialOption configures how we set up the connection. @@ -154,9 +155,7 @@ func WithSharedWriteBuffer(val bool) DialOption {  }  // WithWriteBufferSize determines how much data can be batched before doing a -// write on the wire. The corresponding memory allocation for this buffer will -// be twice the size to keep syscalls low. The default value for this buffer is -// 32KB. +// write on the wire. The default value for this buffer is 32KB.  //  // Zero or negative values will disable the write buffer such that each write  // will be on underlying connection. Note: A Send call may not directly @@ -555,9 +554,9 @@ func WithAuthority(a string) DialOption {  //  // Notice: This API is EXPERIMENTAL and may be changed or removed in a  // later release. -func WithChannelzParentID(id *channelz.Identifier) DialOption { +func WithChannelzParentID(c channelz.Identifier) DialOption {  	return newFuncDialOption(func(o *dialOptions) { -		o.channelzParentID = id +		o.channelzParent = c  	})  } @@ -645,6 +644,7 @@ func defaultDialOptions() dialOptions {  		healthCheckFunc: internal.HealthCheckFunc,  		idleTimeout:     30 * time.Minute,  		recvBufferPool:  nopBufferPool{}, +		defaultScheme:   "dns",  	}  } @@ -659,6 +659,14 @@ func withMinConnectDeadline(f func() time.Duration) DialOption {  	})  } +// withDefaultScheme is used to allow Dial to use "passthrough" as the default +// name resolver, while NewClient uses "dns" otherwise. +func withDefaultScheme(s string) DialOption { +	return newFuncDialOption(func(o *dialOptions) { +		o.defaultScheme = s +	}) +} +  // WithResolvers allows a list of resolver implementations to be registered  // locally with the ClientConn without needing to be globally registered via  // resolver.Register.  They will be matched against the scheme used for the diff --git a/vendor/google.golang.org/grpc/encoding/proto/proto.go b/vendor/google.golang.org/grpc/encoding/proto/proto.go index 0ee3d3bae..66d5cdf03 100644 --- a/vendor/google.golang.org/grpc/encoding/proto/proto.go +++ b/vendor/google.golang.org/grpc/encoding/proto/proto.go @@ -23,8 +23,9 @@ package proto  import (  	"fmt" -	"github.com/golang/protobuf/proto"  	"google.golang.org/grpc/encoding" +	"google.golang.org/protobuf/proto" +	"google.golang.org/protobuf/protoadapt"  )  // Name is the name registered for the proto compressor. @@ -38,21 +39,34 @@ func init() {  type codec struct{}  func (codec) Marshal(v any) ([]byte, error) { -	vv, ok := v.(proto.Message) -	if !ok { +	vv := messageV2Of(v) +	if vv == nil {  		return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)  	} +  	return proto.Marshal(vv)  }  func (codec) Unmarshal(data []byte, v any) error { -	vv, ok := v.(proto.Message) -	if !ok { +	vv := messageV2Of(v) +	if vv == nil {  		return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)  	} +  	return proto.Unmarshal(data, vv)  } +func messageV2Of(v any) proto.Message { +	switch v := v.(type) { +	case protoadapt.MessageV1: +		return protoadapt.MessageV2Of(v) +	case protoadapt.MessageV2: +		return v +	} + +	return nil +} +  func (codec) Name() string {  	return Name  } diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go index 24299efd6..5bf880d41 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go @@ -17,8 +17,8 @@  // Code generated by protoc-gen-go. DO NOT EDIT.  // versions: -// 	protoc-gen-go v1.31.0 -// 	protoc        v4.22.0 +// 	protoc-gen-go v1.32.0 +// 	protoc        v4.25.2  // source: grpc/health/v1/health.proto  package grpc_health_v1 diff --git a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go index 4439cda0f..4c46c098d 100644 --- a/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go +++ b/vendor/google.golang.org/grpc/health/grpc_health_v1/health_grpc.pb.go @@ -18,7 +18,7 @@  // Code generated by protoc-gen-go-grpc. DO NOT EDIT.  // versions:  // - protoc-gen-go-grpc v1.3.0 -// - protoc             v4.22.0 +// - protoc             v4.25.2  // source: grpc/health/v1/health.proto  package grpc_health_v1 diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go new file mode 100644 index 000000000..6bf7f8739 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go @@ -0,0 +1,83 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package gracefulswitch + +import ( +	"encoding/json" +	"fmt" + +	"google.golang.org/grpc/balancer" +	"google.golang.org/grpc/serviceconfig" +) + +type lbConfig struct { +	serviceconfig.LoadBalancingConfig + +	childBuilder balancer.Builder +	childConfig  serviceconfig.LoadBalancingConfig +} + +func ChildName(l serviceconfig.LoadBalancingConfig) string { +	return l.(*lbConfig).childBuilder.Name() +} + +// ParseConfig parses a child config list and returns a LB config for the +// gracefulswitch Balancer. +// +// cfg is expected to be a json.RawMessage containing a JSON array of LB policy +// names + configs as the format of the "loadBalancingConfig" field in +// ServiceConfig.  It returns a type that should be passed to +// UpdateClientConnState in the BalancerConfig field. +func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { +	var lbCfg []map[string]json.RawMessage +	if err := json.Unmarshal(cfg, &lbCfg); err != nil { +		return nil, err +	} +	for i, e := range lbCfg { +		if len(e) != 1 { +			return nil, fmt.Errorf("expected a JSON struct with one entry; received entry %v at index %d", e, i) +		} + +		var name string +		var jsonCfg json.RawMessage +		for name, jsonCfg = range e { +		} + +		builder := balancer.Get(name) +		if builder == nil { +			// Skip unregistered balancer names. +			continue +		} + +		parser, ok := builder.(balancer.ConfigParser) +		if !ok { +			// This is a valid child with no config. +			return &lbConfig{childBuilder: builder}, nil +		} + +		cfg, err := parser.ParseConfig(jsonCfg) +		if err != nil { +			return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err) +		} + +		return &lbConfig{childBuilder: builder, childConfig: cfg}, nil +	} + +	return nil, fmt.Errorf("no supported policies found in config: %v", string(cfg)) +} diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go index 3c594e6e4..45d5e50ea 100644 --- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go @@ -94,14 +94,23 @@ func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {  // process is not complete when this method returns. This method must be called  // synchronously alongside the rest of the balancer.Balancer methods this  // Graceful Switch Balancer implements. +// +// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState +// to cause the Balancer to automatically change to the new child when necessary.  func (gsb *Balancer) SwitchTo(builder balancer.Builder) error { +	_, err := gsb.switchTo(builder) +	return err +} + +func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {  	gsb.mu.Lock()  	if gsb.closed {  		gsb.mu.Unlock() -		return errBalancerClosed +		return nil, errBalancerClosed  	}  	bw := &balancerWrapper{ -		gsb: gsb, +		builder: builder, +		gsb:     gsb,  		lastState: balancer.State{  			ConnectivityState: connectivity.Connecting,  			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable), @@ -129,7 +138,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {  			gsb.balancerCurrent = nil  		}  		gsb.mu.Unlock() -		return balancer.ErrBadResolverState +		return nil, balancer.ErrBadResolverState  	}  	// This write doesn't need to take gsb.mu because this field never gets read @@ -138,7 +147,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {  	// bw.Balancer field will never be forwarded to until this SwitchTo()  	// function returns.  	bw.Balancer = newBalancer -	return nil +	return bw, nil  }  // Returns nil if the graceful switch balancer is closed. @@ -152,12 +161,33 @@ func (gsb *Balancer) latestBalancer() *balancerWrapper {  }  // UpdateClientConnState forwards the update to the latest balancer created. +// +// If the state's BalancerConfig is the config returned by a call to +// gracefulswitch.ParseConfig, then this function will automatically SwitchTo +// the balancer indicated by the config before forwarding its config to it, if +// necessary.  func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {  	// The resolver data is only relevant to the most recent LB Policy.  	balToUpdate := gsb.latestBalancer() + +	gsbCfg, ok := state.BalancerConfig.(*lbConfig) +	if ok { +		// Switch to the child in the config unless it is already active. +		if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() { +			var err error +			balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder) +			if err != nil { +				return fmt.Errorf("could not switch to new child balancer: %w", err) +			} +		} +		// Unwrap the child balancer's config. +		state.BalancerConfig = gsbCfg.childConfig +	} +  	if balToUpdate == nil {  		return errBalancerClosed  	} +  	// Perform this call without gsb.mu to prevent deadlocks if the child calls  	// back into the channel. The latest balancer can never be closed during a  	// call from the channel, even without gsb.mu held. @@ -169,6 +199,10 @@ func (gsb *Balancer) ResolverError(err error) {  	// The resolver data is only relevant to the most recent LB Policy.  	balToUpdate := gsb.latestBalancer()  	if balToUpdate == nil { +		gsb.cc.UpdateState(balancer.State{ +			ConnectivityState: connectivity.TransientFailure, +			Picker:            base.NewErrPicker(err), +		})  		return  	}  	// Perform this call without gsb.mu to prevent deadlocks if the child calls @@ -261,7 +295,8 @@ func (gsb *Balancer) Close() {  // graceful switch logic.  type balancerWrapper struct {  	balancer.Balancer -	gsb *Balancer +	gsb     *Balancer +	builder balancer.Builder  	lastState balancer.State  	subconns  map[balancer.SubConn]bool // subconns created by this balancer diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go index 0f31274a3..e8456a77c 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -25,11 +25,12 @@ import (  	"sync/atomic"  	"time" -	"github.com/golang/protobuf/proto" -	"github.com/golang/protobuf/ptypes"  	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"  	"google.golang.org/grpc/metadata"  	"google.golang.org/grpc/status" +	"google.golang.org/protobuf/proto" +	"google.golang.org/protobuf/types/known/durationpb" +	"google.golang.org/protobuf/types/known/timestamppb"  )  type callIDGenerator struct { @@ -88,7 +89,7 @@ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {  // in TruncatingMethodLogger as possible.  func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {  	m := c.toProto() -	timestamp, _ := ptypes.TimestampProto(time.Now()) +	timestamp := timestamppb.Now()  	m.Timestamp = timestamp  	m.CallId = ml.callID  	m.SequenceIdWithinCall = ml.idWithinCallGen.next() @@ -178,7 +179,7 @@ func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {  		Authority:  c.Authority,  	}  	if c.Timeout > 0 { -		clientHeader.Timeout = ptypes.DurationProto(c.Timeout) +		clientHeader.Timeout = durationpb.New(c.Timeout)  	}  	ret := &binlogpb.GrpcLogEntry{  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go index 264de387c..9ea598b14 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go @@ -25,8 +25,8 @@ import (  	"sync"  	"time" -	"github.com/golang/protobuf/proto"  	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" +	"google.golang.org/protobuf/proto"  )  var ( diff --git a/vendor/google.golang.org/grpc/internal/channelz/channel.go b/vendor/google.golang.org/grpc/internal/channelz/channel.go new file mode 100644 index 000000000..d7e9e1d54 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/channel.go @@ -0,0 +1,255 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync/atomic" + +	"google.golang.org/grpc/connectivity" +) + +// Channel represents a channel within channelz, which includes metrics and +// internal channelz data, such as channelz id, child list, etc. +type Channel struct { +	Entity +	// ID is the channelz id of this channel. +	ID int64 +	// RefName is the human readable reference string of this channel. +	RefName string + +	closeCalled bool +	nestedChans map[int64]string +	subChans    map[int64]string +	Parent      *Channel +	trace       *ChannelTrace +	// traceRefCount is the number of trace events that reference this channel. +	// Non-zero traceRefCount means the trace of this channel cannot be deleted. +	traceRefCount int32 + +	ChannelMetrics ChannelMetrics +} + +// Implemented to make Channel implement the Identifier interface used for +// nesting. +func (c *Channel) channelzIdentifier() {} + +func (c *Channel) String() string { +	if c.Parent == nil { +		return fmt.Sprintf("Channel #%d", c.ID) +	} +	return fmt.Sprintf("%s Channel #%d", c.Parent, c.ID) +} + +func (c *Channel) id() int64 { +	return c.ID +} + +func (c *Channel) SubChans() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(c.subChans) +} + +func (c *Channel) NestedChans() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(c.nestedChans) +} + +func (c *Channel) Trace() *ChannelTrace { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return c.trace.copy() +} + +type ChannelMetrics struct { +	// The current connectivity state of the channel. +	State atomic.Pointer[connectivity.State] +	// The target this channel originally tried to connect to.  May be absent +	Target atomic.Pointer[string] +	// The number of calls started on the channel. +	CallsStarted atomic.Int64 +	// The number of calls that have completed with an OK status. +	CallsSucceeded atomic.Int64 +	// The number of calls that have a completed with a non-OK status. +	CallsFailed atomic.Int64 +	// The last time a call was started on the channel. +	LastCallStartedTimestamp atomic.Int64 +} + +// CopyFrom copies the metrics in o to c.  For testing only. +func (c *ChannelMetrics) CopyFrom(o *ChannelMetrics) { +	c.State.Store(o.State.Load()) +	c.Target.Store(o.Target.Load()) +	c.CallsStarted.Store(o.CallsStarted.Load()) +	c.CallsSucceeded.Store(o.CallsSucceeded.Load()) +	c.CallsFailed.Store(o.CallsFailed.Load()) +	c.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load()) +} + +// Equal returns true iff the metrics of c are the same as the metrics of o. +// For testing only. +func (c *ChannelMetrics) Equal(o any) bool { +	oc, ok := o.(*ChannelMetrics) +	if !ok { +		return false +	} +	if (c.State.Load() == nil) != (oc.State.Load() == nil) { +		return false +	} +	if c.State.Load() != nil && *c.State.Load() != *oc.State.Load() { +		return false +	} +	if (c.Target.Load() == nil) != (oc.Target.Load() == nil) { +		return false +	} +	if c.Target.Load() != nil && *c.Target.Load() != *oc.Target.Load() { +		return false +	} +	return c.CallsStarted.Load() == oc.CallsStarted.Load() && +		c.CallsFailed.Load() == oc.CallsFailed.Load() && +		c.CallsSucceeded.Load() == oc.CallsSucceeded.Load() && +		c.LastCallStartedTimestamp.Load() == oc.LastCallStartedTimestamp.Load() +} + +func strFromPointer(s *string) string { +	if s == nil { +		return "" +	} +	return *s +} + +func (c *ChannelMetrics) String() string { +	return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v", +		c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(), +	) +} + +func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics { +	c := &ChannelMetrics{} +	c.State.Store(&state) +	c.Target.Store(&target) +	c.CallsStarted.Store(started) +	c.CallsSucceeded.Store(succeeded) +	c.CallsFailed.Store(failed) +	c.LastCallStartedTimestamp.Store(timestamp) +	return c +} + +func (c *Channel) addChild(id int64, e entry) { +	switch v := e.(type) { +	case *SubChannel: +		c.subChans[id] = v.RefName +	case *Channel: +		c.nestedChans[id] = v.RefName +	default: +		logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e) +	} +} + +func (c *Channel) deleteChild(id int64) { +	delete(c.subChans, id) +	delete(c.nestedChans, id) +	c.deleteSelfIfReady() +} + +func (c *Channel) triggerDelete() { +	c.closeCalled = true +	c.deleteSelfIfReady() +} + +func (c *Channel) getParentID() int64 { +	if c.Parent == nil { +		return -1 +	} +	return c.Parent.ID +} + +// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means +// deleting the channel reference from its parent's child list. +// +// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the +// corresponding grpc object has been invoked, and the channel does not have any children left. +// +// The returned boolean value indicates whether the channel has been successfully deleted from tree. +func (c *Channel) deleteSelfFromTree() (deleted bool) { +	if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 { +		return false +	} +	// not top channel +	if c.Parent != nil { +		c.Parent.deleteChild(c.ID) +	} +	return true +} + +// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means +// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the +// channel, and its memory will be garbage collected. +// +// The trace reference count of the channel must be 0 in order to be deleted from the map. This is +// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, +// the trace of the referenced entity must not be deleted. In order to release the resource allocated +// by grpc, the reference to the grpc object is reset to a dummy object. +// +// deleteSelfFromMap must be called after deleteSelfFromTree returns true. +// +// It returns a bool to indicate whether the channel can be safely deleted from map. +func (c *Channel) deleteSelfFromMap() (delete bool) { +	return c.getTraceRefCount() == 0 +} + +// deleteSelfIfReady tries to delete the channel itself from the channelz database. +// The delete process includes two steps: +//  1. delete the channel from the entry relation tree, i.e. delete the channel reference from its +//     parent's child list. +//  2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id +//     will return entry not found error. +func (c *Channel) deleteSelfIfReady() { +	if !c.deleteSelfFromTree() { +		return +	} +	if !c.deleteSelfFromMap() { +		return +	} +	db.deleteEntry(c.ID) +	c.trace.clear() +} + +func (c *Channel) getChannelTrace() *ChannelTrace { +	return c.trace +} + +func (c *Channel) incrTraceRefCount() { +	atomic.AddInt32(&c.traceRefCount, 1) +} + +func (c *Channel) decrTraceRefCount() { +	atomic.AddInt32(&c.traceRefCount, -1) +} + +func (c *Channel) getTraceRefCount() int { +	i := atomic.LoadInt32(&c.traceRefCount) +	return int(i) +} + +func (c *Channel) getRefName() string { +	return c.RefName +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go new file mode 100644 index 000000000..dfe18b089 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go @@ -0,0 +1,402 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sort" +	"sync" +	"time" +) + +// entry represents a node in the channelz database. +type entry interface { +	// addChild adds a child e, whose channelz id is id to child list +	addChild(id int64, e entry) +	// deleteChild deletes a child with channelz id to be id from child list +	deleteChild(id int64) +	// triggerDelete tries to delete self from channelz database. However, if +	// child list is not empty, then deletion from the database is on hold until +	// the last child is deleted from database. +	triggerDelete() +	// deleteSelfIfReady check whether triggerDelete() has been called before, +	// and whether child list is now empty. If both conditions are met, then +	// delete self from database. +	deleteSelfIfReady() +	// getParentID returns parent ID of the entry. 0 value parent ID means no parent. +	getParentID() int64 +	Entity +} + +// channelMap is the storage data structure for channelz. +// +// Methods of channelMap can be divided in two two categories with respect to +// locking. +// +// 1. Methods acquire the global lock. +// 2. Methods that can only be called when global lock is held. +// +// A second type of method need always to be called inside a first type of method. +type channelMap struct { +	mu               sync.RWMutex +	topLevelChannels map[int64]struct{} +	channels         map[int64]*Channel +	subChannels      map[int64]*SubChannel +	sockets          map[int64]*Socket +	servers          map[int64]*Server +} + +func newChannelMap() *channelMap { +	return &channelMap{ +		topLevelChannels: make(map[int64]struct{}), +		channels:         make(map[int64]*Channel), +		subChannels:      make(map[int64]*SubChannel), +		sockets:          make(map[int64]*Socket), +		servers:          make(map[int64]*Server), +	} +} + +func (c *channelMap) addServer(id int64, s *Server) { +	c.mu.Lock() +	defer c.mu.Unlock() +	s.cm = c +	c.servers[id] = s +} + +func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) { +	c.mu.Lock() +	defer c.mu.Unlock() +	cn.trace.cm = c +	c.channels[id] = cn +	if isTopChannel { +		c.topLevelChannels[id] = struct{}{} +	} else if p := c.channels[pid]; p != nil { +		p.addChild(id, cn) +	} else { +		logger.Infof("channel %d references invalid parent ID %d", id, pid) +	} +} + +func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) { +	c.mu.Lock() +	defer c.mu.Unlock() +	sc.trace.cm = c +	c.subChannels[id] = sc +	if p := c.channels[pid]; p != nil { +		p.addChild(id, sc) +	} else { +		logger.Infof("subchannel %d references invalid parent ID %d", id, pid) +	} +} + +func (c *channelMap) addSocket(s *Socket) { +	c.mu.Lock() +	defer c.mu.Unlock() +	s.cm = c +	c.sockets[s.ID] = s +	if s.Parent == nil { +		logger.Infof("normal socket %d has no parent", s.ID) +	} +	s.Parent.(entry).addChild(s.ID, s) +} + +// removeEntry triggers the removal of an entry, which may not indeed delete the +// entry, if it has to wait on the deletion of its children and until no other +// entity's channel trace references it.  It may lead to a chain of entry +// deletion. For example, deleting the last socket of a gracefully shutting down +// server will lead to the server being also deleted. +func (c *channelMap) removeEntry(id int64) { +	c.mu.Lock() +	defer c.mu.Unlock() +	c.findEntry(id).triggerDelete() +} + +// tracedChannel represents tracing operations which are present on both +// channels and subChannels. +type tracedChannel interface { +	getChannelTrace() *ChannelTrace +	incrTraceRefCount() +	decrTraceRefCount() +	getRefName() string +} + +// c.mu must be held by the caller +func (c *channelMap) decrTraceRefCount(id int64) { +	e := c.findEntry(id) +	if v, ok := e.(tracedChannel); ok { +		v.decrTraceRefCount() +		e.deleteSelfIfReady() +	} +} + +// c.mu must be held by the caller. +func (c *channelMap) findEntry(id int64) entry { +	if v, ok := c.channels[id]; ok { +		return v +	} +	if v, ok := c.subChannels[id]; ok { +		return v +	} +	if v, ok := c.servers[id]; ok { +		return v +	} +	if v, ok := c.sockets[id]; ok { +		return v +	} +	return &dummyEntry{idNotFound: id} +} + +// c.mu must be held by the caller +// +// deleteEntry deletes an entry from the channelMap. Before calling this method, +// caller must check this entry is ready to be deleted, i.e removeEntry() has +// been called on it, and no children still exist. +func (c *channelMap) deleteEntry(id int64) entry { +	if v, ok := c.sockets[id]; ok { +		delete(c.sockets, id) +		return v +	} +	if v, ok := c.subChannels[id]; ok { +		delete(c.subChannels, id) +		return v +	} +	if v, ok := c.channels[id]; ok { +		delete(c.channels, id) +		delete(c.topLevelChannels, id) +		return v +	} +	if v, ok := c.servers[id]; ok { +		delete(c.servers, id) +		return v +	} +	return &dummyEntry{idNotFound: id} +} + +func (c *channelMap) traceEvent(id int64, desc *TraceEvent) { +	c.mu.Lock() +	defer c.mu.Unlock() +	child := c.findEntry(id) +	childTC, ok := child.(tracedChannel) +	if !ok { +		return +	} +	childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()}) +	if desc.Parent != nil { +		parent := c.findEntry(child.getParentID()) +		var chanType RefChannelType +		switch child.(type) { +		case *Channel: +			chanType = RefChannel +		case *SubChannel: +			chanType = RefSubChannel +		} +		if parentTC, ok := parent.(tracedChannel); ok { +			parentTC.getChannelTrace().append(&traceEvent{ +				Desc:      desc.Parent.Desc, +				Severity:  desc.Parent.Severity, +				Timestamp: time.Now(), +				RefID:     id, +				RefName:   childTC.getRefName(), +				RefType:   chanType, +			}) +			childTC.incrTraceRefCount() +		} +	} +} + +type int64Slice []int64 + +func (s int64Slice) Len() int           { return len(s) } +func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] } +func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } + +func copyMap(m map[int64]string) map[int64]string { +	n := make(map[int64]string) +	for k, v := range m { +		n[k] = v +	} +	return n +} + +func min(a, b int) int { +	if a < b { +		return a +	} +	return b +} + +func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) { +	if maxResults <= 0 { +		maxResults = EntriesPerPage +	} +	c.mu.RLock() +	defer c.mu.RUnlock() +	l := int64(len(c.topLevelChannels)) +	ids := make([]int64, 0, l) + +	for k := range c.topLevelChannels { +		ids = append(ids, k) +	} +	sort.Sort(int64Slice(ids)) +	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) +	end := true +	var t []*Channel +	for _, v := range ids[idx:] { +		if len(t) == maxResults { +			end = false +			break +		} +		if cn, ok := c.channels[v]; ok { +			t = append(t, cn) +		} +	} +	return t, end +} + +func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) { +	if maxResults <= 0 { +		maxResults = EntriesPerPage +	} +	c.mu.RLock() +	defer c.mu.RUnlock() +	ids := make([]int64, 0, len(c.servers)) +	for k := range c.servers { +		ids = append(ids, k) +	} +	sort.Sort(int64Slice(ids)) +	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) +	end := true +	var s []*Server +	for _, v := range ids[idx:] { +		if len(s) == maxResults { +			end = false +			break +		} +		if svr, ok := c.servers[v]; ok { +			s = append(s, svr) +		} +	} +	return s, end +} + +func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) { +	if maxResults <= 0 { +		maxResults = EntriesPerPage +	} +	c.mu.RLock() +	defer c.mu.RUnlock() +	svr, ok := c.servers[id] +	if !ok { +		// server with id doesn't exist. +		return nil, true +	} +	svrskts := svr.sockets +	ids := make([]int64, 0, len(svrskts)) +	sks := make([]*Socket, 0, min(len(svrskts), maxResults)) +	for k := range svrskts { +		ids = append(ids, k) +	} +	sort.Sort(int64Slice(ids)) +	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID }) +	end := true +	for _, v := range ids[idx:] { +		if len(sks) == maxResults { +			end = false +			break +		} +		if ns, ok := c.sockets[v]; ok { +			sks = append(sks, ns) +		} +	} +	return sks, end +} + +func (c *channelMap) getChannel(id int64) *Channel { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.channels[id] +} + +func (c *channelMap) getSubChannel(id int64) *SubChannel { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.subChannels[id] +} + +func (c *channelMap) getSocket(id int64) *Socket { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.sockets[id] +} + +func (c *channelMap) getServer(id int64) *Server { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.servers[id] +} + +type dummyEntry struct { +	// dummyEntry is a fake entry to handle entry not found case. +	idNotFound int64 +	Entity +} + +func (d *dummyEntry) String() string { +	return fmt.Sprintf("non-existent entity #%d", d.idNotFound) +} + +func (d *dummyEntry) ID() int64 { return d.idNotFound } + +func (d *dummyEntry) addChild(id int64, e entry) { +	// Note: It is possible for a normal program to reach here under race +	// condition.  For example, there could be a race between ClientConn.Close() +	// info being propagated to addrConn and http2Client. ClientConn.Close() +	// cancel the context and result in http2Client to error. The error info is +	// then caught by transport monitor and before addrConn.tearDown() is called +	// in side ClientConn.Close(). Therefore, the addrConn will create a new +	// transport. And when registering the new transport in channelz, its parent +	// addrConn could have already been torn down and deleted from channelz +	// tracking, and thus reach the code here. +	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound) +} + +func (d *dummyEntry) deleteChild(id int64) { +	// It is possible for a normal program to reach here under race condition. +	// Refer to the example described in addChild(). +	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound) +} + +func (d *dummyEntry) triggerDelete() { +	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound) +} + +func (*dummyEntry) deleteSelfIfReady() { +	// code should not reach here. deleteSelfIfReady is always called on an existing entry. +} + +func (*dummyEntry) getParentID() int64 { +	return 0 +} + +// Entity is implemented by all channelz types. +type Entity interface { +	isEntity() +	fmt.Stringer +	id() int64 +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go index fc094f344..f461e9bc3 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go +++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go @@ -16,47 +16,32 @@   *   */ -// Package channelz defines APIs for enabling channelz service, entry +// Package channelz defines internal APIs for enabling channelz service, entry  // registration/deletion, and accessing channelz data. It also defines channelz  // metric struct formats. -// -// All APIs in this package are experimental.  package channelz  import ( -	"errors" -	"sort" -	"sync"  	"sync/atomic"  	"time" -	"google.golang.org/grpc/grpclog"  	"google.golang.org/grpc/internal"  ) -const ( -	defaultMaxTraceEntry int32 = 30 -) -  var (  	// IDGen is the global channelz entity ID generator.  It should not be used  	// outside this package except by tests.  	IDGen IDGenerator -	db dbWrapper -	// EntryPerPage defines the number of channelz entries to be shown on a web page. -	EntryPerPage  = int64(50) -	curState      int32 -	maxTraceEntry = defaultMaxTraceEntry +	db *channelMap = newChannelMap() +	// EntriesPerPage defines the number of channelz entries to be shown on a web page. +	EntriesPerPage = 50 +	curState       int32  )  // TurnOn turns on channelz data collection.  func TurnOn() { -	if !IsOn() { -		db.set(newChannelMap()) -		IDGen.Reset() -		atomic.StoreInt32(&curState, 1) -	} +	atomic.StoreInt32(&curState, 1)  }  func init() { @@ -70,49 +55,15 @@ func IsOn() bool {  	return atomic.LoadInt32(&curState) == 1  } -// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel). -// Setting it to 0 will disable channel tracing. -func SetMaxTraceEntry(i int32) { -	atomic.StoreInt32(&maxTraceEntry, i) -} - -// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default. -func ResetMaxTraceEntryToDefault() { -	atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry) -} - -func getMaxTraceEntry() int { -	i := atomic.LoadInt32(&maxTraceEntry) -	return int(i) -} - -// dbWarpper wraps around a reference to internal channelz data storage, and -// provide synchronized functionality to set and get the reference. -type dbWrapper struct { -	mu sync.RWMutex -	DB *channelMap -} - -func (d *dbWrapper) set(db *channelMap) { -	d.mu.Lock() -	d.DB = db -	d.mu.Unlock() -} - -func (d *dbWrapper) get() *channelMap { -	d.mu.RLock() -	defer d.mu.RUnlock() -	return d.DB -} -  // GetTopChannels returns a slice of top channel's ChannelMetric, along with a  // boolean indicating whether there's more top channels to be queried for.  // -// The arg id specifies that only top channel with id at or above it will be included -// in the result. The returned slice is up to a length of the arg maxResults or -// EntryPerPage if maxResults is zero, and is sorted in ascending id order. -func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) { -	return db.get().GetTopChannels(id, maxResults) +// The arg id specifies that only top channel with id at or above it will be +// included in the result. The returned slice is up to a length of the arg +// maxResults or EntriesPerPage if maxResults is zero, and is sorted in ascending +// id order. +func GetTopChannels(id int64, maxResults int) ([]*Channel, bool) { +	return db.getTopChannels(id, maxResults)  }  // GetServers returns a slice of server's ServerMetric, along with a @@ -120,73 +71,69 @@ func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {  //  // The arg id specifies that only server with id at or above it will be included  // in the result. The returned slice is up to a length of the arg maxResults or -// EntryPerPage if maxResults is zero, and is sorted in ascending id order. -func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) { -	return db.get().GetServers(id, maxResults) +// EntriesPerPage if maxResults is zero, and is sorted in ascending id order. +func GetServers(id int64, maxResults int) ([]*Server, bool) { +	return db.getServers(id, maxResults)  }  // GetServerSockets returns a slice of server's (identified by id) normal socket's -// SocketMetric, along with a boolean indicating whether there's more sockets to +// SocketMetrics, along with a boolean indicating whether there's more sockets to  // be queried for.  //  // The arg startID specifies that only sockets with id at or above it will be  // included in the result. The returned slice is up to a length of the arg maxResults -// or EntryPerPage if maxResults is zero, and is sorted in ascending id order. -func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) { -	return db.get().GetServerSockets(id, startID, maxResults) +// or EntriesPerPage if maxResults is zero, and is sorted in ascending id order. +func GetServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) { +	return db.getServerSockets(id, startID, maxResults)  } -// GetChannel returns the ChannelMetric for the channel (identified by id). -func GetChannel(id int64) *ChannelMetric { -	return db.get().GetChannel(id) +// GetChannel returns the Channel for the channel (identified by id). +func GetChannel(id int64) *Channel { +	return db.getChannel(id)  } -// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id). -func GetSubChannel(id int64) *SubChannelMetric { -	return db.get().GetSubChannel(id) +// GetSubChannel returns the SubChannel for the subchannel (identified by id). +func GetSubChannel(id int64) *SubChannel { +	return db.getSubChannel(id)  } -// GetSocket returns the SocketInternalMetric for the socket (identified by id). -func GetSocket(id int64) *SocketMetric { -	return db.get().GetSocket(id) +// GetSocket returns the Socket for the socket (identified by id). +func GetSocket(id int64) *Socket { +	return db.getSocket(id)  }  // GetServer returns the ServerMetric for the server (identified by id). -func GetServer(id int64) *ServerMetric { -	return db.get().GetServer(id) +func GetServer(id int64) *Server { +	return db.getServer(id)  }  // RegisterChannel registers the given channel c in the channelz database with -// ref as its reference name, and adds it to the child list of its parent -// (identified by pid). pid == nil means no parent. +// target as its target and reference name, and adds it to the child list of its +// parent.  parent == nil means no parent.  //  // Returns a unique channelz identifier assigned to this channel.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier { +func RegisterChannel(parent *Channel, target string) *Channel {  	id := IDGen.genID() -	var parent int64 -	isTopChannel := true -	if pid != nil { -		isTopChannel = false -		parent = pid.Int() -	}  	if !IsOn() { -		return newIdentifer(RefChannel, id, pid) +		return &Channel{ID: id}  	} -	cn := &channel{ -		refName:     ref, -		c:           c, -		subChans:    make(map[int64]string), +	isTopChannel := parent == nil + +	cn := &Channel{ +		ID:          id, +		RefName:     target,  		nestedChans: make(map[int64]string), -		id:          id, -		pid:         parent, -		trace:       &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, +		subChans:    make(map[int64]string), +		Parent:      parent, +		trace:       &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},  	} -	db.get().addChannel(id, cn, isTopChannel, parent) -	return newIdentifer(RefChannel, id, pid) +	cn.ChannelMetrics.Target.Store(&target) +	db.addChannel(id, cn, isTopChannel, cn.getParentID()) +	return cn  }  // RegisterSubChannel registers the given subChannel c in the channelz database @@ -196,555 +143,66 @@ func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {  // Returns a unique channelz identifier assigned to this subChannel.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) { -	if pid == nil { -		return nil, errors.New("a SubChannel's parent id cannot be nil") -	} +func RegisterSubChannel(pid int64, ref string) *SubChannel {  	id := IDGen.genID()  	if !IsOn() { -		return newIdentifer(RefSubChannel, id, pid), nil +		return &SubChannel{ID: id}  	} -	sc := &subChannel{ -		refName: ref, -		c:       c, +	sc := &SubChannel{ +		RefName: ref, +		ID:      id,  		sockets: make(map[int64]string), -		id:      id, -		pid:     pid.Int(), -		trace:   &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, +		parent:  db.getChannel(pid), +		trace:   &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},  	} -	db.get().addSubChannel(id, sc, pid.Int()) -	return newIdentifer(RefSubChannel, id, pid), nil +	db.addSubChannel(id, sc, pid) +	return sc  }  // RegisterServer registers the given server s in channelz database. It returns  // the unique channelz tracking id assigned to this server.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterServer(s Server, ref string) *Identifier { +func RegisterServer(ref string) *Server {  	id := IDGen.genID()  	if !IsOn() { -		return newIdentifer(RefServer, id, nil) +		return &Server{ID: id}  	} -	svr := &server{ -		refName:       ref, -		s:             s, +	svr := &Server{ +		RefName:       ref,  		sockets:       make(map[int64]string),  		listenSockets: make(map[int64]string), -		id:            id, -	} -	db.get().addServer(id, svr) -	return newIdentifer(RefServer, id, nil) -} - -// RegisterListenSocket registers the given listen socket s in channelz database -// with ref as its reference name, and add it to the child list of its parent -// (identified by pid). It returns the unique channelz tracking id assigned to -// this listen socket. -// -// If channelz is not turned ON, the channelz database is not mutated. -func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { -	if pid == nil { -		return nil, errors.New("a ListenSocket's parent id cannot be 0") +		ID:            id,  	} -	id := IDGen.genID() -	if !IsOn() { -		return newIdentifer(RefListenSocket, id, pid), nil -	} - -	ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()} -	db.get().addListenSocket(id, ls, pid.Int()) -	return newIdentifer(RefListenSocket, id, pid), nil +	db.addServer(id, svr) +	return svr  } -// RegisterNormalSocket registers the given normal socket s in channelz database +// RegisterSocket registers the given normal socket s in channelz database  // with ref as its reference name, and adds it to the child list of its parent -// (identified by pid). It returns the unique channelz tracking id assigned to -// this normal socket. +// (identified by skt.Parent, which must be set). It returns the unique channelz +// tracking id assigned to this normal socket.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { -	if pid == nil { -		return nil, errors.New("a NormalSocket's parent id cannot be 0") -	} -	id := IDGen.genID() -	if !IsOn() { -		return newIdentifer(RefNormalSocket, id, pid), nil +func RegisterSocket(skt *Socket) *Socket { +	skt.ID = IDGen.genID() +	if IsOn() { +		db.addSocket(skt)  	} - -	ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()} -	db.get().addNormalSocket(id, ns, pid.Int()) -	return newIdentifer(RefNormalSocket, id, pid), nil +	return skt  }  // RemoveEntry removes an entry with unique channelz tracking id to be id from  // channelz database.  //  // If channelz is not turned ON, this function is a no-op. -func RemoveEntry(id *Identifier) { +func RemoveEntry(id int64) {  	if !IsOn() {  		return  	} -	db.get().removeEntry(id.Int()) -} - -// TraceEventDesc is what the caller of AddTraceEvent should provide to describe -// the event to be added to the channel trace. -// -// The Parent field is optional. It is used for an event that will be recorded -// in the entity's parent trace. -type TraceEventDesc struct { -	Desc     string -	Severity Severity -	Parent   *TraceEventDesc -} - -// AddTraceEvent adds trace related to the entity with specified id, using the -// provided TraceEventDesc. -// -// If channelz is not turned ON, this will simply log the event descriptions. -func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) { -	// Log only the trace description associated with the bottom most entity. -	switch desc.Severity { -	case CtUnknown, CtInfo: -		l.InfoDepth(depth+1, withParens(id)+desc.Desc) -	case CtWarning: -		l.WarningDepth(depth+1, withParens(id)+desc.Desc) -	case CtError: -		l.ErrorDepth(depth+1, withParens(id)+desc.Desc) -	} - -	if getMaxTraceEntry() == 0 { -		return -	} -	if IsOn() { -		db.get().traceEvent(id.Int(), desc) -	} -} - -// channelMap is the storage data structure for channelz. -// Methods of channelMap can be divided in two two categories with respect to locking. -// 1. Methods acquire the global lock. -// 2. Methods that can only be called when global lock is held. -// A second type of method need always to be called inside a first type of method. -type channelMap struct { -	mu               sync.RWMutex -	topLevelChannels map[int64]struct{} -	servers          map[int64]*server -	channels         map[int64]*channel -	subChannels      map[int64]*subChannel -	listenSockets    map[int64]*listenSocket -	normalSockets    map[int64]*normalSocket -} - -func newChannelMap() *channelMap { -	return &channelMap{ -		topLevelChannels: make(map[int64]struct{}), -		channels:         make(map[int64]*channel), -		listenSockets:    make(map[int64]*listenSocket), -		normalSockets:    make(map[int64]*normalSocket), -		servers:          make(map[int64]*server), -		subChannels:      make(map[int64]*subChannel), -	} -} - -func (c *channelMap) addServer(id int64, s *server) { -	c.mu.Lock() -	s.cm = c -	c.servers[id] = s -	c.mu.Unlock() -} - -func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) { -	c.mu.Lock() -	cn.cm = c -	cn.trace.cm = c -	c.channels[id] = cn -	if isTopChannel { -		c.topLevelChannels[id] = struct{}{} -	} else { -		c.findEntry(pid).addChild(id, cn) -	} -	c.mu.Unlock() -} - -func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) { -	c.mu.Lock() -	sc.cm = c -	sc.trace.cm = c -	c.subChannels[id] = sc -	c.findEntry(pid).addChild(id, sc) -	c.mu.Unlock() -} - -func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) { -	c.mu.Lock() -	ls.cm = c -	c.listenSockets[id] = ls -	c.findEntry(pid).addChild(id, ls) -	c.mu.Unlock() -} - -func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) { -	c.mu.Lock() -	ns.cm = c -	c.normalSockets[id] = ns -	c.findEntry(pid).addChild(id, ns) -	c.mu.Unlock() -} - -// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to -// wait on the deletion of its children and until no other entity's channel trace references it. -// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully -// shutting down server will lead to the server being also deleted. -func (c *channelMap) removeEntry(id int64) { -	c.mu.Lock() -	c.findEntry(id).triggerDelete() -	c.mu.Unlock() -} - -// c.mu must be held by the caller -func (c *channelMap) decrTraceRefCount(id int64) { -	e := c.findEntry(id) -	if v, ok := e.(tracedChannel); ok { -		v.decrTraceRefCount() -		e.deleteSelfIfReady() -	} -} - -// c.mu must be held by the caller. -func (c *channelMap) findEntry(id int64) entry { -	var v entry -	var ok bool -	if v, ok = c.channels[id]; ok { -		return v -	} -	if v, ok = c.subChannels[id]; ok { -		return v -	} -	if v, ok = c.servers[id]; ok { -		return v -	} -	if v, ok = c.listenSockets[id]; ok { -		return v -	} -	if v, ok = c.normalSockets[id]; ok { -		return v -	} -	return &dummyEntry{idNotFound: id} -} - -// c.mu must be held by the caller -// deleteEntry simply deletes an entry from the channelMap. Before calling this -// method, caller must check this entry is ready to be deleted, i.e removeEntry() -// has been called on it, and no children still exist. -// Conditionals are ordered by the expected frequency of deletion of each entity -// type, in order to optimize performance. -func (c *channelMap) deleteEntry(id int64) { -	var ok bool -	if _, ok = c.normalSockets[id]; ok { -		delete(c.normalSockets, id) -		return -	} -	if _, ok = c.subChannels[id]; ok { -		delete(c.subChannels, id) -		return -	} -	if _, ok = c.channels[id]; ok { -		delete(c.channels, id) -		delete(c.topLevelChannels, id) -		return -	} -	if _, ok = c.listenSockets[id]; ok { -		delete(c.listenSockets, id) -		return -	} -	if _, ok = c.servers[id]; ok { -		delete(c.servers, id) -		return -	} -} - -func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) { -	c.mu.Lock() -	child := c.findEntry(id) -	childTC, ok := child.(tracedChannel) -	if !ok { -		c.mu.Unlock() -		return -	} -	childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()}) -	if desc.Parent != nil { -		parent := c.findEntry(child.getParentID()) -		var chanType RefChannelType -		switch child.(type) { -		case *channel: -			chanType = RefChannel -		case *subChannel: -			chanType = RefSubChannel -		} -		if parentTC, ok := parent.(tracedChannel); ok { -			parentTC.getChannelTrace().append(&TraceEvent{ -				Desc:      desc.Parent.Desc, -				Severity:  desc.Parent.Severity, -				Timestamp: time.Now(), -				RefID:     id, -				RefName:   childTC.getRefName(), -				RefType:   chanType, -			}) -			childTC.incrTraceRefCount() -		} -	} -	c.mu.Unlock() -} - -type int64Slice []int64 - -func (s int64Slice) Len() int           { return len(s) } -func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] } -func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } - -func copyMap(m map[int64]string) map[int64]string { -	n := make(map[int64]string) -	for k, v := range m { -		n[k] = v -	} -	return n -} - -func min(a, b int64) int64 { -	if a < b { -		return a -	} -	return b -} - -func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) { -	if maxResults <= 0 { -		maxResults = EntryPerPage -	} -	c.mu.RLock() -	l := int64(len(c.topLevelChannels)) -	ids := make([]int64, 0, l) -	cns := make([]*channel, 0, min(l, maxResults)) - -	for k := range c.topLevelChannels { -		ids = append(ids, k) -	} -	sort.Sort(int64Slice(ids)) -	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) -	count := int64(0) -	var end bool -	var t []*ChannelMetric -	for i, v := range ids[idx:] { -		if count == maxResults { -			break -		} -		if cn, ok := c.channels[v]; ok { -			cns = append(cns, cn) -			t = append(t, &ChannelMetric{ -				NestedChans: copyMap(cn.nestedChans), -				SubChans:    copyMap(cn.subChans), -			}) -			count++ -		} -		if i == len(ids[idx:])-1 { -			end = true -			break -		} -	} -	c.mu.RUnlock() -	if count == 0 { -		end = true -	} - -	for i, cn := range cns { -		t[i].ChannelData = cn.c.ChannelzMetric() -		t[i].ID = cn.id -		t[i].RefName = cn.refName -		t[i].Trace = cn.trace.dumpData() -	} -	return t, end -} - -func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) { -	if maxResults <= 0 { -		maxResults = EntryPerPage -	} -	c.mu.RLock() -	l := int64(len(c.servers)) -	ids := make([]int64, 0, l) -	ss := make([]*server, 0, min(l, maxResults)) -	for k := range c.servers { -		ids = append(ids, k) -	} -	sort.Sort(int64Slice(ids)) -	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) -	count := int64(0) -	var end bool -	var s []*ServerMetric -	for i, v := range ids[idx:] { -		if count == maxResults { -			break -		} -		if svr, ok := c.servers[v]; ok { -			ss = append(ss, svr) -			s = append(s, &ServerMetric{ -				ListenSockets: copyMap(svr.listenSockets), -			}) -			count++ -		} -		if i == len(ids[idx:])-1 { -			end = true -			break -		} -	} -	c.mu.RUnlock() -	if count == 0 { -		end = true -	} - -	for i, svr := range ss { -		s[i].ServerData = svr.s.ChannelzMetric() -		s[i].ID = svr.id -		s[i].RefName = svr.refName -	} -	return s, end -} - -func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) { -	if maxResults <= 0 { -		maxResults = EntryPerPage -	} -	var svr *server -	var ok bool -	c.mu.RLock() -	if svr, ok = c.servers[id]; !ok { -		// server with id doesn't exist. -		c.mu.RUnlock() -		return nil, true -	} -	svrskts := svr.sockets -	l := int64(len(svrskts)) -	ids := make([]int64, 0, l) -	sks := make([]*normalSocket, 0, min(l, maxResults)) -	for k := range svrskts { -		ids = append(ids, k) -	} -	sort.Sort(int64Slice(ids)) -	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID }) -	count := int64(0) -	var end bool -	for i, v := range ids[idx:] { -		if count == maxResults { -			break -		} -		if ns, ok := c.normalSockets[v]; ok { -			sks = append(sks, ns) -			count++ -		} -		if i == len(ids[idx:])-1 { -			end = true -			break -		} -	} -	c.mu.RUnlock() -	if count == 0 { -		end = true -	} -	s := make([]*SocketMetric, 0, len(sks)) -	for _, ns := range sks { -		sm := &SocketMetric{} -		sm.SocketData = ns.s.ChannelzMetric() -		sm.ID = ns.id -		sm.RefName = ns.refName -		s = append(s, sm) -	} -	return s, end -} - -func (c *channelMap) GetChannel(id int64) *ChannelMetric { -	cm := &ChannelMetric{} -	var cn *channel -	var ok bool -	c.mu.RLock() -	if cn, ok = c.channels[id]; !ok { -		// channel with id doesn't exist. -		c.mu.RUnlock() -		return nil -	} -	cm.NestedChans = copyMap(cn.nestedChans) -	cm.SubChans = copyMap(cn.subChans) -	// cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when -	// holding the lock to prevent potential data race. -	chanCopy := cn.c -	c.mu.RUnlock() -	cm.ChannelData = chanCopy.ChannelzMetric() -	cm.ID = cn.id -	cm.RefName = cn.refName -	cm.Trace = cn.trace.dumpData() -	return cm -} - -func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric { -	cm := &SubChannelMetric{} -	var sc *subChannel -	var ok bool -	c.mu.RLock() -	if sc, ok = c.subChannels[id]; !ok { -		// subchannel with id doesn't exist. -		c.mu.RUnlock() -		return nil -	} -	cm.Sockets = copyMap(sc.sockets) -	// sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when -	// holding the lock to prevent potential data race. -	chanCopy := sc.c -	c.mu.RUnlock() -	cm.ChannelData = chanCopy.ChannelzMetric() -	cm.ID = sc.id -	cm.RefName = sc.refName -	cm.Trace = sc.trace.dumpData() -	return cm -} - -func (c *channelMap) GetSocket(id int64) *SocketMetric { -	sm := &SocketMetric{} -	c.mu.RLock() -	if ls, ok := c.listenSockets[id]; ok { -		c.mu.RUnlock() -		sm.SocketData = ls.s.ChannelzMetric() -		sm.ID = ls.id -		sm.RefName = ls.refName -		return sm -	} -	if ns, ok := c.normalSockets[id]; ok { -		c.mu.RUnlock() -		sm.SocketData = ns.s.ChannelzMetric() -		sm.ID = ns.id -		sm.RefName = ns.refName -		return sm -	} -	c.mu.RUnlock() -	return nil -} - -func (c *channelMap) GetServer(id int64) *ServerMetric { -	sm := &ServerMetric{} -	var svr *server -	var ok bool -	c.mu.RLock() -	if svr, ok = c.servers[id]; !ok { -		c.mu.RUnlock() -		return nil -	} -	sm.ListenSockets = copyMap(svr.listenSockets) -	c.mu.RUnlock() -	sm.ID = svr.id -	sm.RefName = svr.refName -	sm.ServerData = svr.s.ChannelzMetric() -	return sm +	db.removeEntry(id)  }  // IDGenerator is an incrementing atomic that tracks IDs for channelz entities. @@ -761,3 +219,11 @@ func (i *IDGenerator) Reset() {  func (i *IDGenerator) genID() int64 {  	return atomic.AddInt64(&i.id, 1)  } + +// Identifier is an opaque channelz identifier used to expose channelz symbols +// outside of grpc.  Currently only implemented by Channel since no other +// types require exposure outside grpc. +type Identifier interface { +	Entity +	channelzIdentifier() +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/id.go b/vendor/google.golang.org/grpc/internal/channelz/id.go deleted file mode 100644 index c9a27acd3..000000000 --- a/vendor/google.golang.org/grpc/internal/channelz/id.go +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package channelz - -import "fmt" - -// Identifier is an opaque identifier which uniquely identifies an entity in the -// channelz database. -type Identifier struct { -	typ RefChannelType -	id  int64 -	str string -	pid *Identifier -} - -// Type returns the entity type corresponding to id. -func (id *Identifier) Type() RefChannelType { -	return id.typ -} - -// Int returns the integer identifier corresponding to id. -func (id *Identifier) Int() int64 { -	return id.id -} - -// String returns a string representation of the entity corresponding to id. -// -// This includes some information about the parent as well. Examples: -// Top-level channel: [Channel #channel-number] -// Nested channel:    [Channel #parent-channel-number Channel #channel-number] -// Sub channel:       [Channel #parent-channel SubChannel #subchannel-number] -func (id *Identifier) String() string { -	return id.str -} - -// Equal returns true if other is the same as id. -func (id *Identifier) Equal(other *Identifier) bool { -	if (id != nil) != (other != nil) { -		return false -	} -	if id == nil && other == nil { -		return true -	} -	return id.typ == other.typ && id.id == other.id && id.pid == other.pid -} - -// NewIdentifierForTesting returns a new opaque identifier to be used only for -// testing purposes. -func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier { -	return newIdentifer(typ, id, pid) -} - -func newIdentifer(typ RefChannelType, id int64, pid *Identifier) *Identifier { -	str := fmt.Sprintf("%s #%d", typ, id) -	if pid != nil { -		str = fmt.Sprintf("%s %s", pid, str) -	} -	return &Identifier{typ: typ, id: id, str: str, pid: pid} -} diff --git a/vendor/google.golang.org/grpc/internal/channelz/logging.go b/vendor/google.golang.org/grpc/internal/channelz/logging.go index f89e6f77b..ee4d72125 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/logging.go +++ b/vendor/google.golang.org/grpc/internal/channelz/logging.go @@ -26,53 +26,49 @@ import (  var logger = grpclog.Component("channelz") -func withParens(id *Identifier) string { -	return "[" + id.String() + "] " -} -  // Info logs and adds a trace event if channelz is on. -func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Info(l grpclog.DepthLoggerV2, e Entity, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprint(args...),  		Severity: CtInfo,  	})  }  // Infof logs and adds a trace event if channelz is on. -func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Infof(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtInfo,  	})  }  // Warning logs and adds a trace event if channelz is on. -func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Warning(l grpclog.DepthLoggerV2, e Entity, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprint(args...),  		Severity: CtWarning,  	})  }  // Warningf logs and adds a trace event if channelz is on. -func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Warningf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtWarning,  	})  }  // Error logs and adds a trace event if channelz is on. -func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Error(l grpclog.DepthLoggerV2, e Entity, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprint(args...),  		Severity: CtError,  	})  }  // Errorf logs and adds a trace event if channelz is on. -func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Errorf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtError,  	}) diff --git a/vendor/google.golang.org/grpc/internal/channelz/server.go b/vendor/google.golang.org/grpc/internal/channelz/server.go new file mode 100644 index 000000000..cdfc49d6e --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/server.go @@ -0,0 +1,119 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync/atomic" +) + +// Server is the channelz representation of a server. +type Server struct { +	Entity +	ID      int64 +	RefName string + +	ServerMetrics ServerMetrics + +	closeCalled   bool +	sockets       map[int64]string +	listenSockets map[int64]string +	cm            *channelMap +} + +// ServerMetrics defines a struct containing metrics for servers. +type ServerMetrics struct { +	// The number of incoming calls started on the server. +	CallsStarted atomic.Int64 +	// The number of incoming calls that have completed with an OK status. +	CallsSucceeded atomic.Int64 +	// The number of incoming calls that have a completed with a non-OK status. +	CallsFailed atomic.Int64 +	// The last time a call was started on the server. +	LastCallStartedTimestamp atomic.Int64 +} + +// NewServerMetricsForTesting returns an initialized ServerMetrics. +func NewServerMetricsForTesting(started, succeeded, failed, timestamp int64) *ServerMetrics { +	sm := &ServerMetrics{} +	sm.CallsStarted.Store(started) +	sm.CallsSucceeded.Store(succeeded) +	sm.CallsFailed.Store(failed) +	sm.LastCallStartedTimestamp.Store(timestamp) +	return sm +} + +func (sm *ServerMetrics) CopyFrom(o *ServerMetrics) { +	sm.CallsStarted.Store(o.CallsStarted.Load()) +	sm.CallsSucceeded.Store(o.CallsSucceeded.Load()) +	sm.CallsFailed.Store(o.CallsFailed.Load()) +	sm.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load()) +} + +// ListenSockets returns the listening sockets for s. +func (s *Server) ListenSockets() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(s.listenSockets) +} + +// String returns a printable description of s. +func (s *Server) String() string { +	return fmt.Sprintf("Server #%d", s.ID) +} + +func (s *Server) id() int64 { +	return s.ID +} + +func (s *Server) addChild(id int64, e entry) { +	switch v := e.(type) { +	case *Socket: +		switch v.SocketType { +		case SocketTypeNormal: +			s.sockets[id] = v.RefName +		case SocketTypeListen: +			s.listenSockets[id] = v.RefName +		} +	default: +		logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e) +	} +} + +func (s *Server) deleteChild(id int64) { +	delete(s.sockets, id) +	delete(s.listenSockets, id) +	s.deleteSelfIfReady() +} + +func (s *Server) triggerDelete() { +	s.closeCalled = true +	s.deleteSelfIfReady() +} + +func (s *Server) deleteSelfIfReady() { +	if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 { +		return +	} +	s.cm.deleteEntry(s.ID) +} + +func (s *Server) getParentID() int64 { +	return 0 +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/socket.go b/vendor/google.golang.org/grpc/internal/channelz/socket.go new file mode 100644 index 000000000..fa64834b2 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/socket.go @@ -0,0 +1,130 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"net" +	"sync/atomic" + +	"google.golang.org/grpc/credentials" +) + +// SocketMetrics defines the struct that the implementor of Socket interface +// should return from ChannelzMetric(). +type SocketMetrics struct { +	// The number of streams that have been started. +	StreamsStarted atomic.Int64 +	// The number of streams that have ended successfully: +	// On client side, receiving frame with eos bit set. +	// On server side, sending frame with eos bit set. +	StreamsSucceeded atomic.Int64 +	// The number of streams that have ended unsuccessfully: +	// On client side, termination without receiving frame with eos bit set. +	// On server side, termination without sending frame with eos bit set. +	StreamsFailed atomic.Int64 +	// The number of messages successfully sent on this socket. +	MessagesSent     atomic.Int64 +	MessagesReceived atomic.Int64 +	// The number of keep alives sent.  This is typically implemented with HTTP/2 +	// ping messages. +	KeepAlivesSent atomic.Int64 +	// The last time a stream was created by this endpoint.  Usually unset for +	// servers. +	LastLocalStreamCreatedTimestamp atomic.Int64 +	// The last time a stream was created by the remote endpoint.  Usually unset +	// for clients. +	LastRemoteStreamCreatedTimestamp atomic.Int64 +	// The last time a message was sent by this endpoint. +	LastMessageSentTimestamp atomic.Int64 +	// The last time a message was received by this endpoint. +	LastMessageReceivedTimestamp atomic.Int64 +} + +// EphemeralSocketMetrics are metrics that change rapidly and are tracked +// outside of channelz. +type EphemeralSocketMetrics struct { +	// The amount of window, granted to the local endpoint by the remote endpoint. +	// This may be slightly out of date due to network latency.  This does NOT +	// include stream level or TCP level flow control info. +	LocalFlowControlWindow int64 +	// The amount of window, granted to the remote endpoint by the local endpoint. +	// This may be slightly out of date due to network latency.  This does NOT +	// include stream level or TCP level flow control info. +	RemoteFlowControlWindow int64 +} + +type SocketType string + +const ( +	SocketTypeNormal = "NormalSocket" +	SocketTypeListen = "ListenSocket" +) + +type Socket struct { +	Entity +	SocketType       SocketType +	ID               int64 +	Parent           Entity +	cm               *channelMap +	SocketMetrics    SocketMetrics +	EphemeralMetrics func() *EphemeralSocketMetrics + +	RefName string +	// The locally bound address.  Immutable. +	LocalAddr net.Addr +	// The remote bound address.  May be absent.  Immutable. +	RemoteAddr net.Addr +	// Optional, represents the name of the remote endpoint, if different than +	// the original target name.  Immutable. +	RemoteName string +	// Immutable. +	SocketOptions *SocketOptionData +	// Immutable. +	Security credentials.ChannelzSecurityValue +} + +func (ls *Socket) String() string { +	return fmt.Sprintf("%s %s #%d", ls.Parent, ls.SocketType, ls.ID) +} + +func (ls *Socket) id() int64 { +	return ls.ID +} + +func (ls *Socket) addChild(id int64, e entry) { +	logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e) +} + +func (ls *Socket) deleteChild(id int64) { +	logger.Errorf("cannot delete a child (id = %d) from a listen socket", id) +} + +func (ls *Socket) triggerDelete() { +	ls.cm.deleteEntry(ls.ID) +	ls.Parent.(entry).deleteChild(ls.ID) +} + +func (ls *Socket) deleteSelfIfReady() { +	logger.Errorf("cannot call deleteSelfIfReady on a listen socket") +} + +func (ls *Socket) getParentID() int64 { +	return ls.Parent.id() +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/subchannel.go b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go new file mode 100644 index 000000000..3b88e4cba --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go @@ -0,0 +1,151 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync/atomic" +) + +// SubChannel is the channelz representation of a subchannel. +type SubChannel struct { +	Entity +	// ID is the channelz id of this subchannel. +	ID int64 +	// RefName is the human readable reference string of this subchannel. +	RefName       string +	closeCalled   bool +	sockets       map[int64]string +	parent        *Channel +	trace         *ChannelTrace +	traceRefCount int32 + +	ChannelMetrics ChannelMetrics +} + +func (sc *SubChannel) String() string { +	return fmt.Sprintf("%s SubChannel #%d", sc.parent, sc.ID) +} + +func (sc *SubChannel) id() int64 { +	return sc.ID +} + +func (sc *SubChannel) Sockets() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(sc.sockets) +} + +func (sc *SubChannel) Trace() *ChannelTrace { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return sc.trace.copy() +} + +func (sc *SubChannel) addChild(id int64, e entry) { +	if v, ok := e.(*Socket); ok && v.SocketType == SocketTypeNormal { +		sc.sockets[id] = v.RefName +	} else { +		logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e) +	} +} + +func (sc *SubChannel) deleteChild(id int64) { +	delete(sc.sockets, id) +	sc.deleteSelfIfReady() +} + +func (sc *SubChannel) triggerDelete() { +	sc.closeCalled = true +	sc.deleteSelfIfReady() +} + +func (sc *SubChannel) getParentID() int64 { +	return sc.parent.ID +} + +// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which +// means deleting the subchannel reference from its parent's child list. +// +// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of +// the corresponding grpc object has been invoked, and the subchannel does not have any children left. +// +// The returned boolean value indicates whether the channel has been successfully deleted from tree. +func (sc *SubChannel) deleteSelfFromTree() (deleted bool) { +	if !sc.closeCalled || len(sc.sockets) != 0 { +		return false +	} +	sc.parent.deleteChild(sc.ID) +	return true +} + +// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means +// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query +// the subchannel, and its memory will be garbage collected. +// +// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is +// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, +// the trace of the referenced entity must not be deleted. In order to release the resource allocated +// by grpc, the reference to the grpc object is reset to a dummy object. +// +// deleteSelfFromMap must be called after deleteSelfFromTree returns true. +// +// It returns a bool to indicate whether the channel can be safely deleted from map. +func (sc *SubChannel) deleteSelfFromMap() (delete bool) { +	return sc.getTraceRefCount() == 0 +} + +// deleteSelfIfReady tries to delete the subchannel itself from the channelz database. +// The delete process includes two steps: +//  1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from +//     its parent's child list. +//  2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup +//     by id will return entry not found error. +func (sc *SubChannel) deleteSelfIfReady() { +	if !sc.deleteSelfFromTree() { +		return +	} +	if !sc.deleteSelfFromMap() { +		return +	} +	db.deleteEntry(sc.ID) +	sc.trace.clear() +} + +func (sc *SubChannel) getChannelTrace() *ChannelTrace { +	return sc.trace +} + +func (sc *SubChannel) incrTraceRefCount() { +	atomic.AddInt32(&sc.traceRefCount, 1) +} + +func (sc *SubChannel) decrTraceRefCount() { +	atomic.AddInt32(&sc.traceRefCount, -1) +} + +func (sc *SubChannel) getTraceRefCount() int { +	i := atomic.LoadInt32(&sc.traceRefCount) +	return int(i) +} + +func (sc *SubChannel) getRefName() string { +	return sc.RefName +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_linux.go index 1b1c4cce3..5ac73ff83 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_linux.go @@ -49,3 +49,17 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {  		s.TCPInfo = v  	}  } + +// GetSocketOption gets the socket option info of the conn. +func GetSocketOption(socket any) *SocketOptionData { +	c, ok := socket.(syscall.Conn) +	if !ok { +		return nil +	} +	data := &SocketOptionData{} +	if rawConn, err := c.SyscallConn(); err == nil { +		rawConn.Control(data.Getsockopt) +		return data +	} +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go index 8b06eed1a..d1ed8df6a 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go @@ -1,5 +1,4 @@  //go:build !linux -// +build !linux  /*   * @@ -41,3 +40,8 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {  		logger.Warning("Channelz: socket options are not supported on non-linux environments")  	})  } + +// GetSocketOption gets the socket option info of the conn. +func GetSocketOption(c any) *SocketOptionData { +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/trace.go b/vendor/google.golang.org/grpc/internal/channelz/trace.go new file mode 100644 index 000000000..36b867403 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/trace.go @@ -0,0 +1,204 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync" +	"sync/atomic" +	"time" + +	"google.golang.org/grpc/grpclog" +) + +const ( +	defaultMaxTraceEntry int32 = 30 +) + +var maxTraceEntry = defaultMaxTraceEntry + +// SetMaxTraceEntry sets maximum number of trace entries per entity (i.e. +// channel/subchannel).  Setting it to 0 will disable channel tracing. +func SetMaxTraceEntry(i int32) { +	atomic.StoreInt32(&maxTraceEntry, i) +} + +// ResetMaxTraceEntryToDefault resets the maximum number of trace entries per +// entity to default. +func ResetMaxTraceEntryToDefault() { +	atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry) +} + +func getMaxTraceEntry() int { +	i := atomic.LoadInt32(&maxTraceEntry) +	return int(i) +} + +// traceEvent is an internal representation of a single trace event +type traceEvent struct { +	// Desc is a simple description of the trace event. +	Desc string +	// Severity states the severity of this trace event. +	Severity Severity +	// Timestamp is the event time. +	Timestamp time.Time +	// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is +	// involved in this event. +	// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside []) +	RefID int64 +	// RefName is the reference name for the entity that gets referenced in the event. +	RefName string +	// RefType indicates the referenced entity type, i.e Channel or SubChannel. +	RefType RefChannelType +} + +// TraceEvent is what the caller of AddTraceEvent should provide to describe the +// event to be added to the channel trace. +// +// The Parent field is optional. It is used for an event that will be recorded +// in the entity's parent trace. +type TraceEvent struct { +	Desc     string +	Severity Severity +	Parent   *TraceEvent +} + +type ChannelTrace struct { +	cm           *channelMap +	clearCalled  bool +	CreationTime time.Time +	EventNum     int64 +	mu           sync.Mutex +	Events       []*traceEvent +} + +func (c *ChannelTrace) copy() *ChannelTrace { +	return &ChannelTrace{ +		CreationTime: c.CreationTime, +		EventNum:     c.EventNum, +		Events:       append(([]*traceEvent)(nil), c.Events...), +	} +} + +func (c *ChannelTrace) append(e *traceEvent) { +	c.mu.Lock() +	if len(c.Events) == getMaxTraceEntry() { +		del := c.Events[0] +		c.Events = c.Events[1:] +		if del.RefID != 0 { +			// start recursive cleanup in a goroutine to not block the call originated from grpc. +			go func() { +				// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func. +				c.cm.mu.Lock() +				c.cm.decrTraceRefCount(del.RefID) +				c.cm.mu.Unlock() +			}() +		} +	} +	e.Timestamp = time.Now() +	c.Events = append(c.Events, e) +	c.EventNum++ +	c.mu.Unlock() +} + +func (c *ChannelTrace) clear() { +	if c.clearCalled { +		return +	} +	c.clearCalled = true +	c.mu.Lock() +	for _, e := range c.Events { +		if e.RefID != 0 { +			// caller should have already held the c.cm.mu lock. +			c.cm.decrTraceRefCount(e.RefID) +		} +	} +	c.mu.Unlock() +} + +// Severity is the severity level of a trace event. +// The canonical enumeration of all valid values is here: +// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126. +type Severity int + +const ( +	// CtUnknown indicates unknown severity of a trace event. +	CtUnknown Severity = iota +	// CtInfo indicates info level severity of a trace event. +	CtInfo +	// CtWarning indicates warning level severity of a trace event. +	CtWarning +	// CtError indicates error level severity of a trace event. +	CtError +) + +// RefChannelType is the type of the entity being referenced in a trace event. +type RefChannelType int + +const ( +	// RefUnknown indicates an unknown entity type, the zero value for this type. +	RefUnknown RefChannelType = iota +	// RefChannel indicates the referenced entity is a Channel. +	RefChannel +	// RefSubChannel indicates the referenced entity is a SubChannel. +	RefSubChannel +	// RefServer indicates the referenced entity is a Server. +	RefServer +	// RefListenSocket indicates the referenced entity is a ListenSocket. +	RefListenSocket +	// RefNormalSocket indicates the referenced entity is a NormalSocket. +	RefNormalSocket +) + +var refChannelTypeToString = map[RefChannelType]string{ +	RefUnknown:      "Unknown", +	RefChannel:      "Channel", +	RefSubChannel:   "SubChannel", +	RefServer:       "Server", +	RefListenSocket: "ListenSocket", +	RefNormalSocket: "NormalSocket", +} + +func (r RefChannelType) String() string { +	return refChannelTypeToString[r] +} + +// AddTraceEvent adds trace related to the entity with specified id, using the +// provided TraceEventDesc. +// +// If channelz is not turned ON, this will simply log the event descriptions. +func AddTraceEvent(l grpclog.DepthLoggerV2, e Entity, depth int, desc *TraceEvent) { +	// Log only the trace description associated with the bottom most entity. +	d := fmt.Sprintf("[%s]%s", e, desc.Desc) +	switch desc.Severity { +	case CtUnknown, CtInfo: +		l.InfoDepth(depth+1, d) +	case CtWarning: +		l.WarningDepth(depth+1, d) +	case CtError: +		l.ErrorDepth(depth+1, d) +	} + +	if getMaxTraceEntry() == 0 { +		return +	} +	if IsOn() { +		db.traceEvent(e.id(), desc) +	} +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/types.go b/vendor/google.golang.org/grpc/internal/channelz/types.go deleted file mode 100644 index 1d4020f53..000000000 --- a/vendor/google.golang.org/grpc/internal/channelz/types.go +++ /dev/null @@ -1,727 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package channelz - -import ( -	"net" -	"sync" -	"sync/atomic" -	"time" - -	"google.golang.org/grpc/connectivity" -	"google.golang.org/grpc/credentials" -) - -// entry represents a node in the channelz database. -type entry interface { -	// addChild adds a child e, whose channelz id is id to child list -	addChild(id int64, e entry) -	// deleteChild deletes a child with channelz id to be id from child list -	deleteChild(id int64) -	// triggerDelete tries to delete self from channelz database. However, if child -	// list is not empty, then deletion from the database is on hold until the last -	// child is deleted from database. -	triggerDelete() -	// deleteSelfIfReady check whether triggerDelete() has been called before, and whether child -	// list is now empty. If both conditions are met, then delete self from database. -	deleteSelfIfReady() -	// getParentID returns parent ID of the entry. 0 value parent ID means no parent. -	getParentID() int64 -} - -// dummyEntry is a fake entry to handle entry not found case. -type dummyEntry struct { -	idNotFound int64 -} - -func (d *dummyEntry) addChild(id int64, e entry) { -	// Note: It is possible for a normal program to reach here under race condition. -	// For example, there could be a race between ClientConn.Close() info being propagated -	// to addrConn and http2Client. ClientConn.Close() cancel the context and result -	// in http2Client to error. The error info is then caught by transport monitor -	// and before addrConn.tearDown() is called in side ClientConn.Close(). Therefore, -	// the addrConn will create a new transport. And when registering the new transport in -	// channelz, its parent addrConn could have already been torn down and deleted -	// from channelz tracking, and thus reach the code here. -	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound) -} - -func (d *dummyEntry) deleteChild(id int64) { -	// It is possible for a normal program to reach here under race condition. -	// Refer to the example described in addChild(). -	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound) -} - -func (d *dummyEntry) triggerDelete() { -	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound) -} - -func (*dummyEntry) deleteSelfIfReady() { -	// code should not reach here. deleteSelfIfReady is always called on an existing entry. -} - -func (*dummyEntry) getParentID() int64 { -	return 0 -} - -// ChannelMetric defines the info channelz provides for a specific Channel, which -// includes ChannelInternalMetric and channelz-specific data, such as channelz id, -// child list, etc. -type ChannelMetric struct { -	// ID is the channelz id of this channel. -	ID int64 -	// RefName is the human readable reference string of this channel. -	RefName string -	// ChannelData contains channel internal metric reported by the channel through -	// ChannelzMetric(). -	ChannelData *ChannelInternalMetric -	// NestedChans tracks the nested channel type children of this channel in the format of -	// a map from nested channel channelz id to corresponding reference string. -	NestedChans map[int64]string -	// SubChans tracks the subchannel type children of this channel in the format of a -	// map from subchannel channelz id to corresponding reference string. -	SubChans map[int64]string -	// Sockets tracks the socket type children of this channel in the format of a map -	// from socket channelz id to corresponding reference string. -	// Note current grpc implementation doesn't allow channel having sockets directly, -	// therefore, this is field is unused. -	Sockets map[int64]string -	// Trace contains the most recent traced events. -	Trace *ChannelTrace -} - -// SubChannelMetric defines the info channelz provides for a specific SubChannel, -// which includes ChannelInternalMetric and channelz-specific data, such as -// channelz id, child list, etc. -type SubChannelMetric struct { -	// ID is the channelz id of this subchannel. -	ID int64 -	// RefName is the human readable reference string of this subchannel. -	RefName string -	// ChannelData contains subchannel internal metric reported by the subchannel -	// through ChannelzMetric(). -	ChannelData *ChannelInternalMetric -	// NestedChans tracks the nested channel type children of this subchannel in the format of -	// a map from nested channel channelz id to corresponding reference string. -	// Note current grpc implementation doesn't allow subchannel to have nested channels -	// as children, therefore, this field is unused. -	NestedChans map[int64]string -	// SubChans tracks the subchannel type children of this subchannel in the format of a -	// map from subchannel channelz id to corresponding reference string. -	// Note current grpc implementation doesn't allow subchannel to have subchannels -	// as children, therefore, this field is unused. -	SubChans map[int64]string -	// Sockets tracks the socket type children of this subchannel in the format of a map -	// from socket channelz id to corresponding reference string. -	Sockets map[int64]string -	// Trace contains the most recent traced events. -	Trace *ChannelTrace -} - -// ChannelInternalMetric defines the struct that the implementor of Channel interface -// should return from ChannelzMetric(). -type ChannelInternalMetric struct { -	// current connectivity state of the channel. -	State connectivity.State -	// The target this channel originally tried to connect to.  May be absent -	Target string -	// The number of calls started on the channel. -	CallsStarted int64 -	// The number of calls that have completed with an OK status. -	CallsSucceeded int64 -	// The number of calls that have a completed with a non-OK status. -	CallsFailed int64 -	// The last time a call was started on the channel. -	LastCallStartedTimestamp time.Time -} - -// ChannelTrace stores traced events on a channel/subchannel and related info. -type ChannelTrace struct { -	// EventNum is the number of events that ever got traced (i.e. including those that have been deleted) -	EventNum int64 -	// CreationTime is the creation time of the trace. -	CreationTime time.Time -	// Events stores the most recent trace events (up to $maxTraceEntry, newer event will overwrite the -	// oldest one) -	Events []*TraceEvent -} - -// TraceEvent represent a single trace event -type TraceEvent struct { -	// Desc is a simple description of the trace event. -	Desc string -	// Severity states the severity of this trace event. -	Severity Severity -	// Timestamp is the event time. -	Timestamp time.Time -	// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is -	// involved in this event. -	// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside []) -	RefID int64 -	// RefName is the reference name for the entity that gets referenced in the event. -	RefName string -	// RefType indicates the referenced entity type, i.e Channel or SubChannel. -	RefType RefChannelType -} - -// Channel is the interface that should be satisfied in order to be tracked by -// channelz as Channel or SubChannel. -type Channel interface { -	ChannelzMetric() *ChannelInternalMetric -} - -type dummyChannel struct{} - -func (d *dummyChannel) ChannelzMetric() *ChannelInternalMetric { -	return &ChannelInternalMetric{} -} - -type channel struct { -	refName     string -	c           Channel -	closeCalled bool -	nestedChans map[int64]string -	subChans    map[int64]string -	id          int64 -	pid         int64 -	cm          *channelMap -	trace       *channelTrace -	// traceRefCount is the number of trace events that reference this channel. -	// Non-zero traceRefCount means the trace of this channel cannot be deleted. -	traceRefCount int32 -} - -func (c *channel) addChild(id int64, e entry) { -	switch v := e.(type) { -	case *subChannel: -		c.subChans[id] = v.refName -	case *channel: -		c.nestedChans[id] = v.refName -	default: -		logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e) -	} -} - -func (c *channel) deleteChild(id int64) { -	delete(c.subChans, id) -	delete(c.nestedChans, id) -	c.deleteSelfIfReady() -} - -func (c *channel) triggerDelete() { -	c.closeCalled = true -	c.deleteSelfIfReady() -} - -func (c *channel) getParentID() int64 { -	return c.pid -} - -// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means -// deleting the channel reference from its parent's child list. -// -// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the -// corresponding grpc object has been invoked, and the channel does not have any children left. -// -// The returned boolean value indicates whether the channel has been successfully deleted from tree. -func (c *channel) deleteSelfFromTree() (deleted bool) { -	if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 { -		return false -	} -	// not top channel -	if c.pid != 0 { -		c.cm.findEntry(c.pid).deleteChild(c.id) -	} -	return true -} - -// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means -// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the -// channel, and its memory will be garbage collected. -// -// The trace reference count of the channel must be 0 in order to be deleted from the map. This is -// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, -// the trace of the referenced entity must not be deleted. In order to release the resource allocated -// by grpc, the reference to the grpc object is reset to a dummy object. -// -// deleteSelfFromMap must be called after deleteSelfFromTree returns true. -// -// It returns a bool to indicate whether the channel can be safely deleted from map. -func (c *channel) deleteSelfFromMap() (delete bool) { -	if c.getTraceRefCount() != 0 { -		c.c = &dummyChannel{} -		return false -	} -	return true -} - -// deleteSelfIfReady tries to delete the channel itself from the channelz database. -// The delete process includes two steps: -//  1. delete the channel from the entry relation tree, i.e. delete the channel reference from its -//     parent's child list. -//  2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id -//     will return entry not found error. -func (c *channel) deleteSelfIfReady() { -	if !c.deleteSelfFromTree() { -		return -	} -	if !c.deleteSelfFromMap() { -		return -	} -	c.cm.deleteEntry(c.id) -	c.trace.clear() -} - -func (c *channel) getChannelTrace() *channelTrace { -	return c.trace -} - -func (c *channel) incrTraceRefCount() { -	atomic.AddInt32(&c.traceRefCount, 1) -} - -func (c *channel) decrTraceRefCount() { -	atomic.AddInt32(&c.traceRefCount, -1) -} - -func (c *channel) getTraceRefCount() int { -	i := atomic.LoadInt32(&c.traceRefCount) -	return int(i) -} - -func (c *channel) getRefName() string { -	return c.refName -} - -type subChannel struct { -	refName       string -	c             Channel -	closeCalled   bool -	sockets       map[int64]string -	id            int64 -	pid           int64 -	cm            *channelMap -	trace         *channelTrace -	traceRefCount int32 -} - -func (sc *subChannel) addChild(id int64, e entry) { -	if v, ok := e.(*normalSocket); ok { -		sc.sockets[id] = v.refName -	} else { -		logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e) -	} -} - -func (sc *subChannel) deleteChild(id int64) { -	delete(sc.sockets, id) -	sc.deleteSelfIfReady() -} - -func (sc *subChannel) triggerDelete() { -	sc.closeCalled = true -	sc.deleteSelfIfReady() -} - -func (sc *subChannel) getParentID() int64 { -	return sc.pid -} - -// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which -// means deleting the subchannel reference from its parent's child list. -// -// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of -// the corresponding grpc object has been invoked, and the subchannel does not have any children left. -// -// The returned boolean value indicates whether the channel has been successfully deleted from tree. -func (sc *subChannel) deleteSelfFromTree() (deleted bool) { -	if !sc.closeCalled || len(sc.sockets) != 0 { -		return false -	} -	sc.cm.findEntry(sc.pid).deleteChild(sc.id) -	return true -} - -// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means -// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query -// the subchannel, and its memory will be garbage collected. -// -// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is -// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, -// the trace of the referenced entity must not be deleted. In order to release the resource allocated -// by grpc, the reference to the grpc object is reset to a dummy object. -// -// deleteSelfFromMap must be called after deleteSelfFromTree returns true. -// -// It returns a bool to indicate whether the channel can be safely deleted from map. -func (sc *subChannel) deleteSelfFromMap() (delete bool) { -	if sc.getTraceRefCount() != 0 { -		// free the grpc struct (i.e. addrConn) -		sc.c = &dummyChannel{} -		return false -	} -	return true -} - -// deleteSelfIfReady tries to delete the subchannel itself from the channelz database. -// The delete process includes two steps: -//  1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from -//     its parent's child list. -//  2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup -//     by id will return entry not found error. -func (sc *subChannel) deleteSelfIfReady() { -	if !sc.deleteSelfFromTree() { -		return -	} -	if !sc.deleteSelfFromMap() { -		return -	} -	sc.cm.deleteEntry(sc.id) -	sc.trace.clear() -} - -func (sc *subChannel) getChannelTrace() *channelTrace { -	return sc.trace -} - -func (sc *subChannel) incrTraceRefCount() { -	atomic.AddInt32(&sc.traceRefCount, 1) -} - -func (sc *subChannel) decrTraceRefCount() { -	atomic.AddInt32(&sc.traceRefCount, -1) -} - -func (sc *subChannel) getTraceRefCount() int { -	i := atomic.LoadInt32(&sc.traceRefCount) -	return int(i) -} - -func (sc *subChannel) getRefName() string { -	return sc.refName -} - -// SocketMetric defines the info channelz provides for a specific Socket, which -// includes SocketInternalMetric and channelz-specific data, such as channelz id, etc. -type SocketMetric struct { -	// ID is the channelz id of this socket. -	ID int64 -	// RefName is the human readable reference string of this socket. -	RefName string -	// SocketData contains socket internal metric reported by the socket through -	// ChannelzMetric(). -	SocketData *SocketInternalMetric -} - -// SocketInternalMetric defines the struct that the implementor of Socket interface -// should return from ChannelzMetric(). -type SocketInternalMetric struct { -	// The number of streams that have been started. -	StreamsStarted int64 -	// The number of streams that have ended successfully: -	// On client side, receiving frame with eos bit set. -	// On server side, sending frame with eos bit set. -	StreamsSucceeded int64 -	// The number of streams that have ended unsuccessfully: -	// On client side, termination without receiving frame with eos bit set. -	// On server side, termination without sending frame with eos bit set. -	StreamsFailed int64 -	// The number of messages successfully sent on this socket. -	MessagesSent     int64 -	MessagesReceived int64 -	// The number of keep alives sent.  This is typically implemented with HTTP/2 -	// ping messages. -	KeepAlivesSent int64 -	// The last time a stream was created by this endpoint.  Usually unset for -	// servers. -	LastLocalStreamCreatedTimestamp time.Time -	// The last time a stream was created by the remote endpoint.  Usually unset -	// for clients. -	LastRemoteStreamCreatedTimestamp time.Time -	// The last time a message was sent by this endpoint. -	LastMessageSentTimestamp time.Time -	// The last time a message was received by this endpoint. -	LastMessageReceivedTimestamp time.Time -	// The amount of window, granted to the local endpoint by the remote endpoint. -	// This may be slightly out of date due to network latency.  This does NOT -	// include stream level or TCP level flow control info. -	LocalFlowControlWindow int64 -	// The amount of window, granted to the remote endpoint by the local endpoint. -	// This may be slightly out of date due to network latency.  This does NOT -	// include stream level or TCP level flow control info. -	RemoteFlowControlWindow int64 -	// The locally bound address. -	LocalAddr net.Addr -	// The remote bound address.  May be absent. -	RemoteAddr net.Addr -	// Optional, represents the name of the remote endpoint, if different than -	// the original target name. -	RemoteName    string -	SocketOptions *SocketOptionData -	Security      credentials.ChannelzSecurityValue -} - -// Socket is the interface that should be satisfied in order to be tracked by -// channelz as Socket. -type Socket interface { -	ChannelzMetric() *SocketInternalMetric -} - -type listenSocket struct { -	refName string -	s       Socket -	id      int64 -	pid     int64 -	cm      *channelMap -} - -func (ls *listenSocket) addChild(id int64, e entry) { -	logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e) -} - -func (ls *listenSocket) deleteChild(id int64) { -	logger.Errorf("cannot delete a child (id = %d) from a listen socket", id) -} - -func (ls *listenSocket) triggerDelete() { -	ls.cm.deleteEntry(ls.id) -	ls.cm.findEntry(ls.pid).deleteChild(ls.id) -} - -func (ls *listenSocket) deleteSelfIfReady() { -	logger.Errorf("cannot call deleteSelfIfReady on a listen socket") -} - -func (ls *listenSocket) getParentID() int64 { -	return ls.pid -} - -type normalSocket struct { -	refName string -	s       Socket -	id      int64 -	pid     int64 -	cm      *channelMap -} - -func (ns *normalSocket) addChild(id int64, e entry) { -	logger.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e) -} - -func (ns *normalSocket) deleteChild(id int64) { -	logger.Errorf("cannot delete a child (id = %d) from a normal socket", id) -} - -func (ns *normalSocket) triggerDelete() { -	ns.cm.deleteEntry(ns.id) -	ns.cm.findEntry(ns.pid).deleteChild(ns.id) -} - -func (ns *normalSocket) deleteSelfIfReady() { -	logger.Errorf("cannot call deleteSelfIfReady on a normal socket") -} - -func (ns *normalSocket) getParentID() int64 { -	return ns.pid -} - -// ServerMetric defines the info channelz provides for a specific Server, which -// includes ServerInternalMetric and channelz-specific data, such as channelz id, -// child list, etc. -type ServerMetric struct { -	// ID is the channelz id of this server. -	ID int64 -	// RefName is the human readable reference string of this server. -	RefName string -	// ServerData contains server internal metric reported by the server through -	// ChannelzMetric(). -	ServerData *ServerInternalMetric -	// ListenSockets tracks the listener socket type children of this server in the -	// format of a map from socket channelz id to corresponding reference string. -	ListenSockets map[int64]string -} - -// ServerInternalMetric defines the struct that the implementor of Server interface -// should return from ChannelzMetric(). -type ServerInternalMetric struct { -	// The number of incoming calls started on the server. -	CallsStarted int64 -	// The number of incoming calls that have completed with an OK status. -	CallsSucceeded int64 -	// The number of incoming calls that have a completed with a non-OK status. -	CallsFailed int64 -	// The last time a call was started on the server. -	LastCallStartedTimestamp time.Time -} - -// Server is the interface to be satisfied in order to be tracked by channelz as -// Server. -type Server interface { -	ChannelzMetric() *ServerInternalMetric -} - -type server struct { -	refName       string -	s             Server -	closeCalled   bool -	sockets       map[int64]string -	listenSockets map[int64]string -	id            int64 -	cm            *channelMap -} - -func (s *server) addChild(id int64, e entry) { -	switch v := e.(type) { -	case *normalSocket: -		s.sockets[id] = v.refName -	case *listenSocket: -		s.listenSockets[id] = v.refName -	default: -		logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e) -	} -} - -func (s *server) deleteChild(id int64) { -	delete(s.sockets, id) -	delete(s.listenSockets, id) -	s.deleteSelfIfReady() -} - -func (s *server) triggerDelete() { -	s.closeCalled = true -	s.deleteSelfIfReady() -} - -func (s *server) deleteSelfIfReady() { -	if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 { -		return -	} -	s.cm.deleteEntry(s.id) -} - -func (s *server) getParentID() int64 { -	return 0 -} - -type tracedChannel interface { -	getChannelTrace() *channelTrace -	incrTraceRefCount() -	decrTraceRefCount() -	getRefName() string -} - -type channelTrace struct { -	cm          *channelMap -	clearCalled bool -	createdTime time.Time -	eventCount  int64 -	mu          sync.Mutex -	events      []*TraceEvent -} - -func (c *channelTrace) append(e *TraceEvent) { -	c.mu.Lock() -	if len(c.events) == getMaxTraceEntry() { -		del := c.events[0] -		c.events = c.events[1:] -		if del.RefID != 0 { -			// start recursive cleanup in a goroutine to not block the call originated from grpc. -			go func() { -				// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func. -				c.cm.mu.Lock() -				c.cm.decrTraceRefCount(del.RefID) -				c.cm.mu.Unlock() -			}() -		} -	} -	e.Timestamp = time.Now() -	c.events = append(c.events, e) -	c.eventCount++ -	c.mu.Unlock() -} - -func (c *channelTrace) clear() { -	if c.clearCalled { -		return -	} -	c.clearCalled = true -	c.mu.Lock() -	for _, e := range c.events { -		if e.RefID != 0 { -			// caller should have already held the c.cm.mu lock. -			c.cm.decrTraceRefCount(e.RefID) -		} -	} -	c.mu.Unlock() -} - -// Severity is the severity level of a trace event. -// The canonical enumeration of all valid values is here: -// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126. -type Severity int - -const ( -	// CtUnknown indicates unknown severity of a trace event. -	CtUnknown Severity = iota -	// CtInfo indicates info level severity of a trace event. -	CtInfo -	// CtWarning indicates warning level severity of a trace event. -	CtWarning -	// CtError indicates error level severity of a trace event. -	CtError -) - -// RefChannelType is the type of the entity being referenced in a trace event. -type RefChannelType int - -const ( -	// RefUnknown indicates an unknown entity type, the zero value for this type. -	RefUnknown RefChannelType = iota -	// RefChannel indicates the referenced entity is a Channel. -	RefChannel -	// RefSubChannel indicates the referenced entity is a SubChannel. -	RefSubChannel -	// RefServer indicates the referenced entity is a Server. -	RefServer -	// RefListenSocket indicates the referenced entity is a ListenSocket. -	RefListenSocket -	// RefNormalSocket indicates the referenced entity is a NormalSocket. -	RefNormalSocket -) - -var refChannelTypeToString = map[RefChannelType]string{ -	RefUnknown:      "Unknown", -	RefChannel:      "Channel", -	RefSubChannel:   "SubChannel", -	RefServer:       "Server", -	RefListenSocket: "ListenSocket", -	RefNormalSocket: "NormalSocket", -} - -func (r RefChannelType) String() string { -	return refChannelTypeToString[r] -} - -func (c *channelTrace) dumpData() *ChannelTrace { -	c.mu.Lock() -	ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime} -	ct.Events = c.events[:len(c.events)] -	c.mu.Unlock() -	return ct -} diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go deleted file mode 100644 index b5568b22e..000000000 --- a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go +++ /dev/null @@ -1,27 +0,0 @@ -//go:build !linux -// +build !linux - -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package channelz - -// GetSocketOption gets the socket option info of the conn. -func GetSocketOption(c any) *SocketOptionData { -	return nil -} diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go index aa97273e7..0126d6b51 100644 --- a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go +++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go @@ -1,3 +1,8 @@ +//go:build !go1.21 + +// TODO: when this file is deleted (after Go 1.20 support is dropped), delete +// all of grpcrand and call the rand package directly. +  /*   *   * Copyright 2018 gRPC authors. diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go new file mode 100644 index 000000000..c37299af1 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go @@ -0,0 +1,73 @@ +//go:build go1.21 + +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package grpcrand implements math/rand functions in a concurrent-safe way +// with a global random source, independent of math/rand's global source. +package grpcrand + +import "math/rand" + +// This implementation will be used for Go version 1.21 or newer. +// For older versions, the original implementation with mutex will be used. + +// Int implements rand.Int on the grpcrand global source. +func Int() int { +	return rand.Int() +} + +// Int63n implements rand.Int63n on the grpcrand global source. +func Int63n(n int64) int64 { +	return rand.Int63n(n) +} + +// Intn implements rand.Intn on the grpcrand global source. +func Intn(n int) int { +	return rand.Intn(n) +} + +// Int31n implements rand.Int31n on the grpcrand global source. +func Int31n(n int32) int32 { +	return rand.Int31n(n) +} + +// Float64 implements rand.Float64 on the grpcrand global source. +func Float64() float64 { +	return rand.Float64() +} + +// Uint64 implements rand.Uint64 on the grpcrand global source. +func Uint64() uint64 { +	return rand.Uint64() +} + +// Uint32 implements rand.Uint32 on the grpcrand global source. +func Uint32() uint32 { +	return rand.Uint32() +} + +// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source. +func ExpFloat64() float64 { +	return rand.ExpFloat64() +} + +// Shuffle implements rand.Shuffle on the grpcrand global source. +var Shuffle = func(n int, f func(int, int)) { +	rand.Shuffle(n, f) +} diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 6c7ea6a53..48d24bdb4 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -190,12 +190,16 @@ var (  	// function makes events more predictable than relying on timer events.  	TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error -	// TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton -	// to invoke resource not found for a resource type name and resource name. +	// TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client +	// singleton to invoke resource not found for a resource type name and +	// resource name.  	TriggerXDSResourceNameNotFoundClient any // func(string, string) error  	// FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD.  	FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool) + +	// UserSetDefaultScheme is set to true if the user has overridden the default resolver scheme. +	UserSetDefaultScheme bool = false  )  // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/vendor/google.golang.org/grpc/internal/pretty/pretty.go b/vendor/google.golang.org/grpc/internal/pretty/pretty.go index 703319137..dbee7a60d 100644 --- a/vendor/google.golang.org/grpc/internal/pretty/pretty.go +++ b/vendor/google.golang.org/grpc/internal/pretty/pretty.go @@ -24,10 +24,8 @@ import (  	"encoding/json"  	"fmt" -	"github.com/golang/protobuf/jsonpb" -	protov1 "github.com/golang/protobuf/proto"  	"google.golang.org/protobuf/encoding/protojson" -	protov2 "google.golang.org/protobuf/proto" +	"google.golang.org/protobuf/protoadapt"  )  const jsonIndent = "  " @@ -36,21 +34,14 @@ const jsonIndent = "  "  //  // If marshal fails, it falls back to fmt.Sprintf("%+v").  func ToJSON(e any) string { -	switch ee := e.(type) { -	case protov1.Message: -		mm := jsonpb.Marshaler{Indent: jsonIndent} -		ret, err := mm.MarshalToString(ee) -		if err != nil { -			// This may fail for proto.Anys, e.g. for xDS v2, LDS, the v2 -			// messages are not imported, and this will fail because the message -			// is not found. -			return fmt.Sprintf("%+v", ee) -		} -		return ret -	case protov2.Message: +	if ee, ok := e.(protoadapt.MessageV1); ok { +		e = protoadapt.MessageV2Of(ee) +	} + +	if ee, ok := e.(protoadapt.MessageV2); ok {  		mm := protojson.MarshalOptions{ -			Multiline: true,  			Indent:    jsonIndent, +			Multiline: true,  		}  		ret, err := mm.Marshal(ee)  		if err != nil { @@ -60,13 +51,13 @@ func ToJSON(e any) string {  			return fmt.Sprintf("%+v", ee)  		}  		return string(ret) -	default: -		ret, err := json.MarshalIndent(ee, "", jsonIndent) -		if err != nil { -			return fmt.Sprintf("%+v", ee) -		} -		return string(ret)  	} + +	ret, err := json.MarshalIndent(e, "", jsonIndent) +	if err != nil { +		return fmt.Sprintf("%+v", e) +	} +	return string(ret)  }  // FormatJSON formats the input json bytes with indentation. diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go index b66dcb213..abab35e25 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go @@ -45,6 +45,13 @@ import (  // addresses from SRV records.  Must not be changed after init time.  var EnableSRVLookups = false +// ResolvingTimeout specifies the maximum duration for a DNS resolution request. +// If the timeout expires before a response is received, the request will be canceled. +// +// It is recommended to set this value at application startup. Avoid modifying this variable +// after initialization as it's not thread-safe for concurrent modification. +var ResolvingTimeout = 30 * time.Second +  var logger = grpclog.Component("dns")  func init() { @@ -221,18 +228,18 @@ func (d *dnsResolver) watcher() {  	}  } -func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) { +func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error) {  	if !EnableSRVLookups {  		return nil, nil  	}  	var newAddrs []resolver.Address -	_, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host) +	_, srvs, err := d.resolver.LookupSRV(ctx, "grpclb", "tcp", d.host)  	if err != nil {  		err = handleDNSError(err, "SRV") // may become nil  		return nil, err  	}  	for _, s := range srvs { -		lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target) +		lbAddrs, err := d.resolver.LookupHost(ctx, s.Target)  		if err != nil {  			err = handleDNSError(err, "A") // may become nil  			if err == nil { @@ -269,8 +276,8 @@ func handleDNSError(err error, lookupType string) error {  	return err  } -func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult { -	ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host) +func (d *dnsResolver) lookupTXT(ctx context.Context) *serviceconfig.ParseResult { +	ss, err := d.resolver.LookupTXT(ctx, txtPrefix+d.host)  	if err != nil {  		if envconfig.TXTErrIgnore {  			return nil @@ -297,8 +304,8 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {  	return d.cc.ParseServiceConfig(sc)  } -func (d *dnsResolver) lookupHost() ([]resolver.Address, error) { -	addrs, err := d.resolver.LookupHost(d.ctx, d.host) +func (d *dnsResolver) lookupHost(ctx context.Context) ([]resolver.Address, error) { +	addrs, err := d.resolver.LookupHost(ctx, d.host)  	if err != nil {  		err = handleDNSError(err, "A")  		return nil, err @@ -316,8 +323,10 @@ func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {  }  func (d *dnsResolver) lookup() (*resolver.State, error) { -	srv, srvErr := d.lookupSRV() -	addrs, hostErr := d.lookupHost() +	ctx, cancel := context.WithTimeout(d.ctx, ResolvingTimeout) +	defer cancel() +	srv, srvErr := d.lookupSRV(ctx) +	addrs, hostErr := d.lookupHost(ctx)  	if hostErr != nil && (srvErr != nil || len(srv) == 0) {  		return nil, hostErr  	} @@ -327,7 +336,7 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {  		state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv})  	}  	if !d.disableServiceConfig { -		state.ServiceConfig = d.lookupTXT() +		state.ServiceConfig = d.lookupTXT(ctx)  	}  	return &state, nil  } diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go index 03ef2fedd..c7dbc8205 100644 --- a/vendor/google.golang.org/grpc/internal/status/status.go +++ b/vendor/google.golang.org/grpc/internal/status/status.go @@ -31,10 +31,11 @@ import (  	"errors"  	"fmt" -	"github.com/golang/protobuf/proto" -	"github.com/golang/protobuf/ptypes"  	spb "google.golang.org/genproto/googleapis/rpc/status"  	"google.golang.org/grpc/codes" +	"google.golang.org/protobuf/proto" +	"google.golang.org/protobuf/protoadapt" +	"google.golang.org/protobuf/types/known/anypb"  )  // Status represents an RPC status code, message, and details.  It is immutable @@ -130,14 +131,14 @@ func (s *Status) Err() error {  // WithDetails returns a new status with the provided details messages appended to the status.  // If any errors are encountered, it returns nil and the first error encountered. -func (s *Status) WithDetails(details ...proto.Message) (*Status, error) { +func (s *Status) WithDetails(details ...protoadapt.MessageV1) (*Status, error) {  	if s.Code() == codes.OK {  		return nil, errors.New("no error details for status with code OK")  	}  	// s.Code() != OK implies that s.Proto() != nil.  	p := s.Proto()  	for _, detail := range details { -		any, err := ptypes.MarshalAny(detail) +		any, err := anypb.New(protoadapt.MessageV2Of(detail))  		if err != nil {  			return nil, err  		} @@ -154,12 +155,12 @@ func (s *Status) Details() []any {  	}  	details := make([]any, 0, len(s.s.Details))  	for _, any := range s.s.Details { -		detail := &ptypes.DynamicAny{} -		if err := ptypes.UnmarshalAny(any, detail); err != nil { +		detail, err := any.UnmarshalNew() +		if err != nil {  			details = append(details, err)  			continue  		} -		details = append(details, detail.Message) +		details = append(details, detail)  	}  	return details  } diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index a9d70e2a1..4a3ddce29 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -35,7 +35,6 @@ import (  	"sync"  	"time" -	"github.com/golang/protobuf/proto"  	"golang.org/x/net/http2"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/credentials" @@ -45,20 +44,17 @@ import (  	"google.golang.org/grpc/peer"  	"google.golang.org/grpc/stats"  	"google.golang.org/grpc/status" +	"google.golang.org/protobuf/proto"  )  // NewServerHandlerTransport returns a ServerTransport handling gRPC from  // inside an http.Handler, or writes an HTTP error to w and returns an error.  // It requires that the http Server supports HTTP/2.  func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) { -	if r.ProtoMajor != 2 { -		msg := "gRPC requires HTTP/2" -		http.Error(w, msg, http.StatusBadRequest) -		return nil, errors.New(msg) -	} -	if r.Method != "POST" { +	if r.Method != http.MethodPost { +		w.Header().Set("Allow", http.MethodPost)  		msg := fmt.Sprintf("invalid gRPC request method %q", r.Method) -		http.Error(w, msg, http.StatusBadRequest) +		http.Error(w, msg, http.StatusMethodNotAllowed)  		return nil, errors.New(msg)  	}  	contentType := r.Header.Get("Content-Type") @@ -69,6 +65,11 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s  		http.Error(w, msg, http.StatusUnsupportedMediaType)  		return nil, errors.New(msg)  	} +	if r.ProtoMajor != 2 { +		msg := "gRPC requires HTTP/2" +		http.Error(w, msg, http.StatusHTTPVersionNotSupported) +		return nil, errors.New(msg) +	}  	if _, ok := w.(http.Flusher); !ok {  		msg := "gRPC requires a ResponseWriter supporting http.Flusher"  		http.Error(w, msg, http.StatusInternalServerError) diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index eff879964..deba0c4d9 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -140,9 +140,7 @@ type http2Client struct {  	// variable.  	kpDormant bool -	// Fields below are for channelz metric collection. -	channelzID *channelz.Identifier -	czData     *channelzData +	channelz *channelz.Socket  	onClose func(GoAwayReason) @@ -319,6 +317,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  	if opts.MaxHeaderListSize != nil {  		maxHeaderListSize = *opts.MaxHeaderListSize  	} +  	t := &http2Client{  		ctx:                   ctx,  		ctxDone:               ctx.Done(), // Cache Done chan. @@ -346,11 +345,25 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  		maxConcurrentStreams:  defaultMaxStreamsClient,  		streamQuota:           defaultMaxStreamsClient,  		streamsQuotaAvailable: make(chan struct{}, 1), -		czData:                new(channelzData),  		keepaliveEnabled:      keepaliveEnabled,  		bufferPool:            newBufferPool(),  		onClose:               onClose,  	} +	var czSecurity credentials.ChannelzSecurityValue +	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok { +		czSecurity = au.GetSecurityValue() +	} +	t.channelz = channelz.RegisterSocket( +		&channelz.Socket{ +			SocketType:       channelz.SocketTypeNormal, +			Parent:           opts.ChannelzParent, +			SocketMetrics:    channelz.SocketMetrics{}, +			EphemeralMetrics: t.socketMetrics, +			LocalAddr:        t.localAddr, +			RemoteAddr:       t.remoteAddr, +			SocketOptions:    channelz.GetSocketOption(t.conn), +			Security:         czSecurity, +		})  	t.logger = prefixLoggerForClientTransport(t)  	// Add peer information to the http2client context.  	t.ctx = peer.NewContext(t.ctx, t.getPeer()) @@ -381,10 +394,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  		}  		sh.HandleConn(t.ctx, connBegin)  	} -	t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) -	if err != nil { -		return nil, err -	}  	if t.keepaliveEnabled {  		t.kpDormancyCond = sync.NewCond(&t.mu)  		go t.keepalive() @@ -756,8 +765,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  				return ErrConnClosing  			}  			if channelz.IsOn() { -				atomic.AddInt64(&t.czData.streamsStarted, 1) -				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) +				t.channelz.SocketMetrics.StreamsStarted.Add(1) +				t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())  			}  			// If the keepalive goroutine has gone dormant, wake it up.  			if t.kpDormant { @@ -928,9 +937,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.  			t.mu.Unlock()  			if channelz.IsOn() {  				if eosReceived { -					atomic.AddInt64(&t.czData.streamsSucceeded, 1) +					t.channelz.SocketMetrics.StreamsSucceeded.Add(1)  				} else { -					atomic.AddInt64(&t.czData.streamsFailed, 1) +					t.channelz.SocketMetrics.StreamsFailed.Add(1)  				}  			}  		}, @@ -985,7 +994,7 @@ func (t *http2Client) Close(err error) {  	t.controlBuf.finish()  	t.cancel()  	t.conn.Close() -	channelz.RemoveEntry(t.channelzID) +	channelz.RemoveEntry(t.channelz.ID)  	// Append info about previous goaways if there were any, since this may be important  	// for understanding the root cause for this connection to be closed.  	_, goAwayDebugMessage := t.GetGoAwayReason() @@ -1708,7 +1717,7 @@ func (t *http2Client) keepalive() {  			// keepalive timer expired. In both cases, we need to send a ping.  			if !outstandingPing {  				if channelz.IsOn() { -					atomic.AddInt64(&t.czData.kpCount, 1) +					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)  				}  				t.controlBuf.put(p)  				timeoutLeft = t.kp.Timeout @@ -1738,40 +1747,23 @@ func (t *http2Client) GoAway() <-chan struct{} {  	return t.goAway  } -func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric { -	s := channelz.SocketInternalMetric{ -		StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted), -		StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded), -		StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed), -		MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent), -		MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv), -		KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount), -		LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), -		LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), -		LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), -		LocalFlowControlWindow:          int64(t.fc.getSize()), -		SocketOptions:                   channelz.GetSocketOption(t.conn), -		LocalAddr:                       t.localAddr, -		RemoteAddr:                      t.remoteAddr, -		// RemoteName : -	} -	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { -		s.Security = au.GetSecurityValue() -	} -	s.RemoteFlowControlWindow = t.getOutFlowWindow() -	return &s +func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics { +	return &channelz.EphemeralSocketMetrics{ +		LocalFlowControlWindow:  int64(t.fc.getSize()), +		RemoteFlowControlWindow: t.getOutFlowWindow(), +	}  }  func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }  func (t *http2Client) IncrMsgSent() { -	atomic.AddInt64(&t.czData.msgSent, 1) -	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesSent.Add(1) +	t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())  }  func (t *http2Client) IncrMsgRecv() { -	atomic.AddInt64(&t.czData.msgRecv, 1) -	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesReceived.Add(1) +	t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())  }  func (t *http2Client) getOutFlowWindow() int64 { diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index a206e2eef..d582e0471 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -32,13 +32,13 @@ import (  	"sync/atomic"  	"time" -	"github.com/golang/protobuf/proto"  	"golang.org/x/net/http2"  	"golang.org/x/net/http2/hpack"  	"google.golang.org/grpc/internal/grpclog"  	"google.golang.org/grpc/internal/grpcutil"  	"google.golang.org/grpc/internal/pretty"  	"google.golang.org/grpc/internal/syscall" +	"google.golang.org/protobuf/proto"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/credentials" @@ -118,8 +118,7 @@ type http2Server struct {  	idle time.Time  	// Fields below are for channelz metric collection. -	channelzID *channelz.Identifier -	czData     *channelzData +	channelz   *channelz.Socket  	bufferPool *bufferPool  	connectionID uint64 @@ -262,9 +261,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  		idle:              time.Now(),  		kep:               kep,  		initialWindowSize: iwz, -		czData:            new(channelzData),  		bufferPool:        newBufferPool(),  	} +	var czSecurity credentials.ChannelzSecurityValue +	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok { +		czSecurity = au.GetSecurityValue() +	} +	t.channelz = channelz.RegisterSocket( +		&channelz.Socket{ +			SocketType:       channelz.SocketTypeNormal, +			Parent:           config.ChannelzParent, +			SocketMetrics:    channelz.SocketMetrics{}, +			EphemeralMetrics: t.socketMetrics, +			LocalAddr:        t.peer.LocalAddr, +			RemoteAddr:       t.peer.Addr, +			SocketOptions:    channelz.GetSocketOption(t.conn), +			Security:         czSecurity, +		}, +	)  	t.logger = prefixLoggerForServerTransport(t)  	t.controlBuf = newControlBuffer(t.done) @@ -274,10 +288,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  			updateFlowControl: t.updateFlowControl,  		}  	} -	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr)) -	if err != nil { -		return nil, err -	}  	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)  	t.framer.writer.Flush() @@ -334,9 +344,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  			// closed, would lead to a TCP RST instead of FIN, and the client  			// encountering errors.  For more info:  			// https://github.com/grpc/grpc-go/issues/5358 +			timer := time.NewTimer(time.Second) +			defer timer.Stop()  			select {  			case <-t.readerDone: -			case <-time.After(time.Second): +			case <-timer.C:  			}  			t.conn.Close()  		} @@ -592,8 +604,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade  	}  	t.mu.Unlock()  	if channelz.IsOn() { -		atomic.AddInt64(&t.czData.streamsStarted, 1) -		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) +		t.channelz.SocketMetrics.StreamsStarted.Add(1) +		t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())  	}  	s.requestRead = func(n int) {  		t.adjustWindow(s, uint32(n)) @@ -652,18 +664,20 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {  				}  				continue  			} -			if err == io.EOF || err == io.ErrUnexpectedEOF { -				t.Close(err) -				return -			}  			t.Close(err)  			return  		}  		switch frame := frame.(type) {  		case *http2.MetaHeadersFrame:  			if err := t.operateHeaders(ctx, frame, handle); err != nil { -				t.Close(err) -				break +				// Any error processing client headers, e.g. invalid stream ID, +				// is considered a protocol violation. +				t.controlBuf.put(&goAway{ +					code:      http2.ErrCodeProtocol, +					debugData: []byte(err.Error()), +					closeConn: err, +				}) +				continue  			}  		case *http2.DataFrame:  			t.handleData(frame) @@ -1199,7 +1213,7 @@ func (t *http2Server) keepalive() {  			}  			if !outstandingPing {  				if channelz.IsOn() { -					atomic.AddInt64(&t.czData.kpCount, 1) +					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)  				}  				t.controlBuf.put(p)  				kpTimeoutLeft = t.kp.Timeout @@ -1239,7 +1253,7 @@ func (t *http2Server) Close(err error) {  	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {  		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)  	} -	channelz.RemoveEntry(t.channelzID) +	channelz.RemoveEntry(t.channelz.ID)  	// Cancel all active streams.  	for _, s := range streams {  		s.cancel() @@ -1260,9 +1274,9 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {  	if channelz.IsOn() {  		if eosReceived { -			atomic.AddInt64(&t.czData.streamsSucceeded, 1) +			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)  		} else { -			atomic.AddInt64(&t.czData.streamsFailed, 1) +			t.channelz.SocketMetrics.StreamsFailed.Add(1)  		}  	}  } @@ -1379,38 +1393,21 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  	return false, nil  } -func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { -	s := channelz.SocketInternalMetric{ -		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted), -		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded), -		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed), -		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent), -		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv), -		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount), -		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), -		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), -		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), -		LocalFlowControlWindow:           int64(t.fc.getSize()), -		SocketOptions:                    channelz.GetSocketOption(t.conn), -		LocalAddr:                        t.peer.LocalAddr, -		RemoteAddr:                       t.peer.Addr, -		// RemoteName : -	} -	if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok { -		s.Security = au.GetSecurityValue() -	} -	s.RemoteFlowControlWindow = t.getOutFlowWindow() -	return &s +func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics { +	return &channelz.EphemeralSocketMetrics{ +		LocalFlowControlWindow:  int64(t.fc.getSize()), +		RemoteFlowControlWindow: t.getOutFlowWindow(), +	}  }  func (t *http2Server) IncrMsgSent() { -	atomic.AddInt64(&t.czData.msgSent, 1) -	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesSent.Add(1) +	t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)  }  func (t *http2Server) IncrMsgRecv() { -	atomic.AddInt64(&t.czData.msgRecv, 1) -	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesReceived.Add(1) +	t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)  }  func (t *http2Server) getOutFlowWindow() int64 { diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index dc29d590e..39cef3bd4 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -418,10 +418,9 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu  	return f  } -func getWriteBufferPool(writeBufferSize int) *sync.Pool { +func getWriteBufferPool(size int) *sync.Pool {  	writeBufferMutex.Lock()  	defer writeBufferMutex.Unlock() -	size := writeBufferSize * 2  	pool, ok := writeBufferPoolMap[size]  	if ok {  		return pool diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index b7b8fec18..0d2a6e47f 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -28,6 +28,7 @@ import (  	"fmt"  	"io"  	"net" +	"strings"  	"sync"  	"sync/atomic"  	"time" @@ -362,8 +363,12 @@ func (s *Stream) SendCompress() string {  // ClientAdvertisedCompressors returns the compressor names advertised by the  // client via grpc-accept-encoding header. -func (s *Stream) ClientAdvertisedCompressors() string { -	return s.clientAdvertisedCompressors +func (s *Stream) ClientAdvertisedCompressors() []string { +	values := strings.Split(s.clientAdvertisedCompressors, ",") +	for i, v := range values { +		values[i] = strings.TrimSpace(v) +	} +	return values  }  // Done returns a channel which is closed when it receives the final status @@ -566,7 +571,7 @@ type ServerConfig struct {  	WriteBufferSize       int  	ReadBufferSize        int  	SharedWriteBuffer     bool -	ChannelzParentID      *channelz.Identifier +	ChannelzParent        *channelz.Server  	MaxHeaderListSize     *uint32  	HeaderTableSize       *uint32  } @@ -601,8 +606,8 @@ type ConnectOptions struct {  	ReadBufferSize int  	// SharedWriteBuffer indicates whether connections should reuse write buffer  	SharedWriteBuffer bool -	// ChannelzParentID sets the addrConn id which initiate the creation of this client transport. -	ChannelzParentID *channelz.Identifier +	// ChannelzParent sets the addrConn id which initiated the creation of this client transport. +	ChannelzParent *channelz.SubChannel  	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.  	MaxHeaderListSize *uint32  	// UseProxy specifies if a proxy should be used. @@ -815,30 +820,6 @@ const (  	GoAwayTooManyPings GoAwayReason = 2  ) -// channelzData is used to store channelz related data for http2Client and http2Server. -// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic -// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment. -// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment. -type channelzData struct { -	kpCount int64 -	// The number of streams that have started, including already finished ones. -	streamsStarted int64 -	// Client side: The number of streams that have ended successfully by receiving -	// EoS bit set frame from server. -	// Server side: The number of streams that have ended successfully by sending -	// frame with EoS bit set. -	streamsSucceeded int64 -	streamsFailed    int64 -	// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type -	// instead of time.Time since it's more costly to atomically update time.Time variable than int64 -	// variable. The same goes for lastMsgSentTime and lastMsgRecvTime. -	lastStreamCreatedTime int64 -	msgSent               int64 -	msgRecv               int64 -	lastMsgSentTime       int64 -	lastMsgRecvTime       int64 -} -  // ContextErr converts the error from context package into a status error.  func ContextErr(err error) error {  	switch err { diff --git a/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go b/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go deleted file mode 100644 index e8b492774..000000000 --- a/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package internal - -import ( -	"google.golang.org/grpc/attributes" -	"google.golang.org/grpc/resolver" -) - -// handshakeClusterNameKey is the type used as the key to store cluster name in -// the Attributes field of resolver.Address. -type handshakeClusterNameKey struct{} - -// SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field -// is updated with the cluster name. -func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address { -	addr.Attributes = addr.Attributes.WithValue(handshakeClusterNameKey{}, clusterName) -	return addr -} - -// GetXDSHandshakeClusterName returns cluster name stored in attr. -func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) { -	v := attr.Value(handshakeClusterNameKey{}) -	name, ok := v.(string) -	return name, ok -} diff --git a/vendor/google.golang.org/grpc/pickfirst.go b/vendor/google.golang.org/grpc/pickfirst.go index 5128f9364..e3ea42ba9 100644 --- a/vendor/google.golang.org/grpc/pickfirst.go +++ b/vendor/google.golang.org/grpc/pickfirst.go @@ -38,19 +38,15 @@ const (  	logPrefix             = "[pick-first-lb %p] "  ) -func newPickfirstBuilder() balancer.Builder { -	return &pickfirstBuilder{} -} -  type pickfirstBuilder struct{} -func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { +func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {  	b := &pickfirstBalancer{cc: cc}  	b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))  	return b  } -func (*pickfirstBuilder) Name() string { +func (pickfirstBuilder) Name() string {  	return PickFirstBalancerName  } @@ -63,7 +59,7 @@ type pfConfig struct {  	ShuffleAddressList bool `json:"shuffleAddressList"`  } -func (*pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { +func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {  	var cfg pfConfig  	if err := json.Unmarshal(js, &cfg); err != nil {  		return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err) @@ -243,7 +239,3 @@ func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {  	i.subConn.Connect()  	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable  } - -func init() { -	balancer.Register(newPickfirstBuilder()) -} diff --git a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go index 14aa6f20a..b54a3a322 100644 --- a/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/resolver/dns/dns_resolver.go @@ -24,10 +24,28 @@  package dns  import ( +	"time" +  	"google.golang.org/grpc/internal/resolver/dns"  	"google.golang.org/grpc/resolver"  ) +// SetResolvingTimeout sets the maximum duration for DNS resolution requests. +// +// This function affects the global timeout used by all channels using the DNS +// name resolver scheme. +// +// It must be called only at application startup, before any gRPC calls are +// made. Modifying this value after initialization is not thread-safe. +// +// The default value is 30 seconds. Setting the timeout too low may result in +// premature timeouts during resolution, while setting it too high may lead to +// unnecessary delays in service discovery. Choose a value appropriate for your +// specific needs and network environment. +func SetResolvingTimeout(timeout time.Duration) { +	dns.ResolvingTimeout = timeout +} +  // NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.  //  // Deprecated: import grpc and use resolver.Get("dns") instead. diff --git a/vendor/google.golang.org/grpc/resolver/resolver.go b/vendor/google.golang.org/grpc/resolver/resolver.go index adf89dd9c..202854511 100644 --- a/vendor/google.golang.org/grpc/resolver/resolver.go +++ b/vendor/google.golang.org/grpc/resolver/resolver.go @@ -29,6 +29,7 @@ import (  	"google.golang.org/grpc/attributes"  	"google.golang.org/grpc/credentials" +	"google.golang.org/grpc/internal"  	"google.golang.org/grpc/serviceconfig"  ) @@ -63,16 +64,18 @@ func Get(scheme string) Builder {  }  // SetDefaultScheme sets the default scheme that will be used. The default -// default scheme is "passthrough". +// scheme is initially set to "passthrough".  //  // NOTE: this function must only be called during initialization time (i.e. in  // an init() function), and is not thread-safe. The scheme set last overrides  // previously set values.  func SetDefaultScheme(scheme string) {  	defaultScheme = scheme +	internal.UserSetDefaultScheme = true  } -// GetDefaultScheme gets the default scheme that will be used. +// GetDefaultScheme gets the default scheme that will be used by grpc.Dial.  If +// SetDefaultScheme is never called, the default scheme used by grpc.NewClient is "dns" instead.  func GetDefaultScheme() string {  	return defaultScheme  } @@ -168,6 +171,9 @@ type BuildOptions struct {  	// field. In most cases though, it is not appropriate, and this field may  	// be ignored.  	Dialer func(context.Context, string) (net.Conn, error) +	// Authority is the effective authority of the clientconn for which the +	// resolver is built. +	Authority string  }  // An Endpoint is one network endpoint, or server, which may have multiple @@ -281,9 +287,9 @@ func (t Target) Endpoint() string {  	return strings.TrimPrefix(endpoint, "/")  } -// String returns a string representation of Target. +// String returns the canonical string representation of Target.  func (t Target) String() string { -	return t.URL.String() +	return t.URL.Scheme + "://" + t.URL.Host + "/" + t.Endpoint()  }  // Builder creates a resolver that will be used to watch name resolution updates. diff --git a/vendor/google.golang.org/grpc/resolver_wrapper.go b/vendor/google.golang.org/grpc/resolver_wrapper.go index c79bab121..9dcc9780f 100644 --- a/vendor/google.golang.org/grpc/resolver_wrapper.go +++ b/vendor/google.golang.org/grpc/resolver_wrapper.go @@ -75,6 +75,7 @@ func (ccr *ccResolverWrapper) start() error {  			DialCreds:            ccr.cc.dopts.copts.TransportCredentials,  			CredsBundle:          ccr.cc.dopts.copts.CredsBundle,  			Dialer:               ccr.cc.dopts.copts.Dialer, +			Authority:            ccr.cc.authority,  		}  		var err error  		ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts) @@ -96,7 +97,7 @@ func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {  // finished shutting down, the channel should block on ccr.serializer.Done()  // without cc.mu held.  func (ccr *ccResolverWrapper) close() { -	channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver") +	channelz.Info(logger, ccr.cc.channelz, "Closing the name resolver")  	ccr.mu.Lock()  	ccr.closed = true  	ccr.mu.Unlock() @@ -146,7 +147,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) {  		return  	}  	ccr.mu.Unlock() -	channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) +	channelz.Warningf(logger, ccr.cc.channelz, "ccResolverWrapper: reporting error to cc: %v", err)  	ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err)  } @@ -193,5 +194,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {  	} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {  		updates = append(updates, "resolver returned new addresses")  	} -	channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; ")) +	channelz.Infof(logger, ccr.cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))  } diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index a4b6bc687..998e251dd 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -189,6 +189,20 @@ type EmptyCallOption struct{}  func (EmptyCallOption) before(*callInfo) error      { return nil }  func (EmptyCallOption) after(*callInfo, *csAttempt) {} +// StaticMethod returns a CallOption which specifies that a call is being made +// to a method that is static, which means the method is known at compile time +// and doesn't change at runtime. This can be used as a signal to stats plugins +// that this method is safe to include as a key to a measurement. +func StaticMethod() CallOption { +	return StaticMethodCallOption{} +} + +// StaticMethodCallOption is a CallOption that specifies that a call comes +// from a static method. +type StaticMethodCallOption struct { +	EmptyCallOption +} +  // Header returns a CallOptions that retrieves the header metadata  // for a unary RPC.  func Header(md *metadata.MD) CallOption { @@ -730,17 +744,19 @@ type payloadInfo struct {  	uncompressedBytes []byte  } -func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) { -	pf, buf, err := p.recvMsg(maxReceiveMessageSize) +// recvAndDecompress reads a message from the stream, decompressing it if necessary. +// +// Cancelling the returned cancel function releases the buffer back to the pool. So the caller should cancel as soon as +// the buffer is no longer needed. +func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, +) (uncompressedBuf []byte, cancel func(), err error) { +	pf, compressedBuf, err := p.recvMsg(maxReceiveMessageSize)  	if err != nil { -		return nil, err -	} -	if payInfo != nil { -		payInfo.compressedLength = len(buf) +		return nil, nil, err  	}  	if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil { -		return nil, st.Err() +		return nil, nil, st.Err()  	}  	var size int @@ -748,21 +764,35 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei  		// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,  		// use this decompressor as the default.  		if dc != nil { -			buf, err = dc.Do(bytes.NewReader(buf)) -			size = len(buf) +			uncompressedBuf, err = dc.Do(bytes.NewReader(compressedBuf)) +			size = len(uncompressedBuf)  		} else { -			buf, size, err = decompress(compressor, buf, maxReceiveMessageSize) +			uncompressedBuf, size, err = decompress(compressor, compressedBuf, maxReceiveMessageSize)  		}  		if err != nil { -			return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err) +			return nil, nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)  		}  		if size > maxReceiveMessageSize {  			// TODO: Revisit the error code. Currently keep it consistent with java  			// implementation. -			return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize) +			return nil, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)  		} +	} else { +		uncompressedBuf = compressedBuf  	} -	return buf, nil + +	if payInfo != nil { +		payInfo.compressedLength = len(compressedBuf) +		payInfo.uncompressedBytes = uncompressedBuf + +		cancel = func() {} +	} else { +		cancel = func() { +			p.recvBufferPool.Put(&compressedBuf) +		} +	} + +	return uncompressedBuf, cancel, nil  }  // Using compressor, decompress d, returning data and size. @@ -782,6 +812,9 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize  			// size is used as an estimate to size the buffer, but we  			// will read more data if available.  			// +MinRead so ReadFrom will not reallocate if size is correct. +			// +			// TODO: If we ensure that the buffer size is the same as the DecompressedSize, +			// we can also utilize the recv buffer pool here.  			buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))  			bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))  			return buf.Bytes(), int(bytesRead), err @@ -797,18 +830,15 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize  // dc takes precedence over compressor.  // TODO(dfawley): wrap the old compressor/decompressor using the new API?  func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { -	buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) +	buf, cancel, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)  	if err != nil {  		return err  	} +	defer cancel() +  	if err := c.Unmarshal(buf, m); err != nil {  		return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)  	} -	if payInfo != nil { -		payInfo.uncompressedBytes = buf -	} else { -		p.recvBufferPool.Put(&buf) -	}  	return nil  } @@ -932,19 +962,6 @@ func setCallInfoCodec(c *callInfo) error {  	return nil  } -// channelzData is used to store channelz related data for ClientConn, addrConn and Server. -// These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic -// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment. -// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment. -type channelzData struct { -	callsStarted   int64 -	callsFailed    int64 -	callsSucceeded int64 -	// lastCallStartedTime stores the timestamp that last call starts. It is of int64 type instead of -	// time.Time since it's more costly to atomically update time.Time variable than int64 variable. -	lastCallStartedTime int64 -} -  // The SupportPackageIsVersion variables are referenced from generated protocol  // buffer files to ensure compatibility with the gRPC version used.  The latest  // support package version is 7. @@ -958,6 +975,7 @@ const (  	SupportPackageIsVersion5 = true  	SupportPackageIsVersion6 = true  	SupportPackageIsVersion7 = true +	SupportPackageIsVersion8 = true  )  const grpcUA = "grpc-go/" + Version diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index e89c5ac61..fd4558daa 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -33,8 +33,6 @@ import (  	"sync/atomic"  	"time" -	"golang.org/x/net/trace" -  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/credentials"  	"google.golang.org/grpc/encoding" @@ -131,7 +129,7 @@ type Server struct {  	drain    bool  	cv       *sync.Cond              // signaled when connections close for GracefulStop  	services map[string]*serviceInfo // service name -> service info -	events   trace.EventLog +	events   traceEventLog  	quit               *grpcsync.Event  	done               *grpcsync.Event @@ -139,8 +137,7 @@ type Server struct {  	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop  	handlersWG         sync.WaitGroup // counts active method handler goroutines -	channelzID *channelz.Identifier -	czData     *channelzData +	channelz *channelz.Server  	serverWorkerChannel      chan func()  	serverWorkerChannelClose func() @@ -251,11 +248,9 @@ func SharedWriteBuffer(val bool) ServerOption {  }  // WriteBufferSize determines how much data can be batched before doing a write -// on the wire. The corresponding memory allocation for this buffer will be -// twice the size to keep syscalls low. The default value for this buffer is -// 32KB. Zero or negative values will disable the write buffer such that each -// write will be on underlying connection. -// Note: A Send call may not directly translate to a write. +// on the wire. The default value for this buffer is 32KB. Zero or negative +// values will disable the write buffer such that each write will be on underlying +// connection. Note: A Send call may not directly translate to a write.  func WriteBufferSize(s int) ServerOption {  	return newFuncServerOption(func(o *serverOptions) {  		o.writeBufferSize = s @@ -663,22 +658,21 @@ func NewServer(opt ...ServerOption) *Server {  		services: make(map[string]*serviceInfo),  		quit:     grpcsync.NewEvent(),  		done:     grpcsync.NewEvent(), -		czData:   new(channelzData), +		channelz: channelz.RegisterServer(""),  	}  	chainUnaryServerInterceptors(s)  	chainStreamServerInterceptors(s)  	s.cv = sync.NewCond(&s.mu)  	if EnableTracing {  		_, file, line, _ := runtime.Caller(1) -		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) +		s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))  	}  	if s.opts.numServerWorkers > 0 {  		s.initServerWorkers()  	} -	s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") -	channelz.Info(logger, s.channelzID, "Server created") +	channelz.Info(logger, s.channelz, "Server created")  	return s  } @@ -804,20 +798,13 @@ var ErrServerStopped = errors.New("grpc: the server has been stopped")  type listenSocket struct {  	net.Listener -	channelzID *channelz.Identifier -} - -func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { -	return &channelz.SocketInternalMetric{ -		SocketOptions: channelz.GetSocketOption(l.Listener), -		LocalAddr:     l.Listener.Addr(), -	} +	channelz *channelz.Socket  }  func (l *listenSocket) Close() error {  	err := l.Listener.Close() -	channelz.RemoveEntry(l.channelzID) -	channelz.Info(logger, l.channelzID, "ListenSocket deleted") +	channelz.RemoveEntry(l.channelz.ID) +	channelz.Info(logger, l.channelz, "ListenSocket deleted")  	return err  } @@ -859,7 +846,16 @@ func (s *Server) Serve(lis net.Listener) error {  		}  	}() -	ls := &listenSocket{Listener: lis} +	ls := &listenSocket{ +		Listener: lis, +		channelz: channelz.RegisterSocket(&channelz.Socket{ +			SocketType:    channelz.SocketTypeListen, +			Parent:        s.channelz, +			RefName:       lis.Addr().String(), +			LocalAddr:     lis.Addr(), +			SocketOptions: channelz.GetSocketOption(lis)}, +		), +	}  	s.lis[ls] = true  	defer func() { @@ -871,14 +867,8 @@ func (s *Server) Serve(lis net.Listener) error {  		s.mu.Unlock()  	}() -	var err error -	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) -	if err != nil { -		s.mu.Unlock() -		return err -	}  	s.mu.Unlock() -	channelz.Info(logger, ls.channelzID, "ListenSocket created") +	channelz.Info(logger, ls.channelz, "ListenSocket created")  	var tempDelay time.Duration // how long to sleep on accept failure  	for { @@ -977,7 +967,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {  		WriteBufferSize:       s.opts.writeBufferSize,  		ReadBufferSize:        s.opts.readBufferSize,  		SharedWriteBuffer:     s.opts.sharedWriteBuffer, -		ChannelzParentID:      s.channelzID, +		ChannelzParent:        s.channelz,  		MaxHeaderListSize:     s.opts.maxHeaderListSize,  		HeaderTableSize:       s.opts.headerTableSize,  	} @@ -991,7 +981,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {  		if err != credentials.ErrConnDispatched {  			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.  			if err != io.EOF { -				channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) +				channelz.Info(logger, s.channelz, "grpc: Server.Serve failed to create ServerTransport: ", err)  			}  			c.Close()  		} @@ -1123,37 +1113,28 @@ func (s *Server) removeConn(addr string, st transport.ServerTransport) {  	}  } -func (s *Server) channelzMetric() *channelz.ServerInternalMetric { -	return &channelz.ServerInternalMetric{ -		CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted), -		CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded), -		CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed), -		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), -	} -} -  func (s *Server) incrCallsStarted() { -	atomic.AddInt64(&s.czData.callsStarted, 1) -	atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) +	s.channelz.ServerMetrics.CallsStarted.Add(1) +	s.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())  }  func (s *Server) incrCallsSucceeded() { -	atomic.AddInt64(&s.czData.callsSucceeded, 1) +	s.channelz.ServerMetrics.CallsSucceeded.Add(1)  }  func (s *Server) incrCallsFailed() { -	atomic.AddInt64(&s.czData.callsFailed, 1) +	s.channelz.ServerMetrics.CallsFailed.Add(1)  }  func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {  	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)  	if err != nil { -		channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err) +		channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err)  		return err  	}  	compData, err := compress(data, cp, comp)  	if err != nil { -		channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err) +		channelz.Error(logger, s.channelz, "grpc: server failed to compress response: ", err)  		return err  	}  	hdr, payload := msgHeader(data, compData) @@ -1344,10 +1325,11 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  	if len(shs) != 0 || len(binlogs) != 0 {  		payInfo = &payloadInfo{}  	} -	d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) + +	d, cancel, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)  	if err != nil {  		if e := t.WriteStatus(stream, status.Convert(err)); e != nil { -			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) +			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)  		}  		return err  	} @@ -1355,6 +1337,8 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  		t.IncrMsgRecv()  	}  	df := func(v any) error { +		defer cancel() +  		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {  			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)  		} @@ -1396,7 +1380,7 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  			trInfo.tr.SetError()  		}  		if e := t.WriteStatus(stream, appStatus); e != nil { -			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) +			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)  		}  		if len(binlogs) != 0 {  			if h, _ := stream.Header(); h.Len() > 0 { @@ -1436,7 +1420,7 @@ func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTranspor  		}  		if sts, ok := status.FromError(err); ok {  			if e := t.WriteStatus(stream, sts); e != nil { -				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) +				channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)  			}  		} else {  			switch st := err.(type) { @@ -1734,8 +1718,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  	ctx = contextWithServer(ctx, s)  	var ti *traceInfo  	if EnableTracing { -		tr := trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) -		ctx = trace.NewContext(ctx, tr) +		tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) +		ctx = newTraceContext(ctx, tr)  		ti = &traceInfo{  			tr: tr,  			firstLine: firstLine{ @@ -1764,7 +1748,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  				ti.tr.SetError()  			} -			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) +			channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)  		}  		if ti != nil {  			ti.tr.Finish() @@ -1821,7 +1805,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str  			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)  			ti.tr.SetError()  		} -		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err) +		channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)  	}  	if ti != nil {  		ti.tr.Finish() @@ -1893,8 +1877,7 @@ func (s *Server) stop(graceful bool) {  	s.quit.Fire()  	defer s.done.Fire() -	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) }) - +	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })  	s.mu.Lock()  	s.closeListenersLocked()  	// Wait for serving threads to be ready to exit.  Only then can we be sure no @@ -2119,7 +2102,7 @@ func ClientSupportedCompressors(ctx context.Context) ([]string, error) {  		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)  	} -	return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil +	return stream.ClientAdvertisedCompressors(), nil  }  // SetTrailer sets the trailer metadata that will be sent when an RPC returns. @@ -2149,17 +2132,9 @@ func Method(ctx context.Context) (string, bool) {  	return s.Method(), true  } -type channelzServer struct { -	s *Server -} - -func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { -	return c.s.channelzMetric() -} -  // validateSendCompressor returns an error when given compressor name cannot be  // handled by the server or the client based on the advertised compressors. -func validateSendCompressor(name, clientCompressors string) error { +func validateSendCompressor(name string, clientCompressors []string) error {  	if name == encoding.Identity {  		return nil  	} @@ -2168,7 +2143,7 @@ func validateSendCompressor(name, clientCompressors string) error {  		return fmt.Errorf("compressor not registered %q", name)  	} -	for _, c := range strings.Split(clientCompressors, ",") { +	for _, c := range clientCompressors {  		if c == name {  			return nil // found match  		} diff --git a/vendor/google.golang.org/grpc/service_config.go b/vendor/google.golang.org/grpc/service_config.go index 0df11fc09..2b35c5d21 100644 --- a/vendor/google.golang.org/grpc/service_config.go +++ b/vendor/google.golang.org/grpc/service_config.go @@ -25,8 +25,10 @@ import (  	"reflect"  	"time" +	"google.golang.org/grpc/balancer"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/internal" +	"google.golang.org/grpc/internal/balancer/gracefulswitch"  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"  	"google.golang.org/grpc/serviceconfig"  ) @@ -41,11 +43,6 @@ const maxInt = int(^uint(0) >> 1)  // https://github.com/grpc/grpc/blob/master/doc/service_config.md  type MethodConfig = internalserviceconfig.MethodConfig -type lbConfig struct { -	name string -	cfg  serviceconfig.LoadBalancingConfig -} -  // ServiceConfig is provided by the service provider and contains parameters for how  // clients that connect to the service should behave.  // @@ -55,14 +52,9 @@ type lbConfig struct {  type ServiceConfig struct {  	serviceconfig.Config -	// LB is the load balancer the service providers recommends.  This is -	// deprecated; lbConfigs is preferred.  If lbConfig and LB are both present, -	// lbConfig will be used. -	LB *string -  	// lbConfig is the service config's load balancing configuration.  If  	// lbConfig and LB are both present, lbConfig will be used. -	lbConfig *lbConfig +	lbConfig serviceconfig.LoadBalancingConfig  	// Methods contains a map for the methods in this service.  If there is an  	// exact match for a method (i.e. /service/method) in the map, use the @@ -164,7 +156,7 @@ type jsonMC struct {  // TODO(lyuxuan): delete this struct after cleaning up old service config implementation.  type jsonSC struct {  	LoadBalancingPolicy *string -	LoadBalancingConfig *internalserviceconfig.BalancerConfig +	LoadBalancingConfig *json.RawMessage  	MethodConfig        *[]jsonMC  	RetryThrottling     *retryThrottlingPolicy  	HealthCheckConfig   *healthCheckConfig @@ -184,18 +176,33 @@ func parseServiceConfig(js string) *serviceconfig.ParseResult {  		return &serviceconfig.ParseResult{Err: err}  	}  	sc := ServiceConfig{ -		LB:                rsc.LoadBalancingPolicy,  		Methods:           make(map[string]MethodConfig),  		retryThrottling:   rsc.RetryThrottling,  		healthCheckConfig: rsc.HealthCheckConfig,  		rawJSONString:     js,  	} -	if c := rsc.LoadBalancingConfig; c != nil { -		sc.lbConfig = &lbConfig{ -			name: c.Name, -			cfg:  c.Config, +	c := rsc.LoadBalancingConfig +	if c == nil { +		name := PickFirstBalancerName +		if rsc.LoadBalancingPolicy != nil { +			name = *rsc.LoadBalancingPolicy +		} +		if balancer.Get(name) == nil { +			name = PickFirstBalancerName  		} +		cfg := []map[string]any{{name: struct{}{}}} +		strCfg, err := json.Marshal(cfg) +		if err != nil { +			return &serviceconfig.ParseResult{Err: fmt.Errorf("unexpected error marshaling simple LB config: %w", err)} +		} +		r := json.RawMessage(strCfg) +		c = &r +	} +	cfg, err := gracefulswitch.ParseConfig(*c) +	if err != nil { +		return &serviceconfig.ParseResult{Err: err}  	} +	sc.lbConfig = cfg  	if rsc.MethodConfig == nil {  		return &serviceconfig.ParseResult{Config: &sc} diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index d621f52b1..d939ffc63 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -27,7 +27,6 @@ import (  	"sync"  	"time" -	"golang.org/x/net/trace"  	"google.golang.org/grpc/balancer"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/encoding" @@ -431,7 +430,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)  	var trInfo *traceInfo  	if EnableTracing {  		trInfo = &traceInfo{ -			tr: trace.New("grpc.Sent."+methodFamily(method), method), +			tr: newTrace("grpc.Sent."+methodFamily(method), method),  			firstLine: firstLine{  				client: true,  			}, @@ -440,7 +439,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)  			trInfo.firstLine.deadline = time.Until(deadline)  		}  		trInfo.tr.LazyLog(&trInfo.firstLine, false) -		ctx = trace.NewContext(ctx, trInfo.tr) +		ctx = newTraceContext(ctx, trInfo.tr)  	}  	if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata { @@ -656,13 +655,13 @@ func (a *csAttempt) shouldRetry(err error) (bool, error) {  		if len(sps) == 1 {  			var e error  			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { -				channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0]) +				channelz.Infof(logger, cs.cc.channelz, "Server retry pushback specified to abort (%q).", sps[0])  				cs.retryThrottler.throttle() // This counts as a failure for throttling.  				return false, err  			}  			hasPushback = true  		} else if len(sps) > 1 { -			channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps) +			channelz.Warningf(logger, cs.cc.channelz, "Server retry pushback specified multiple values (%q); not retrying.", sps)  			cs.retryThrottler.throttle() // This counts as a failure for throttling.  			return false, err  		} diff --git a/vendor/google.golang.org/grpc/trace.go b/vendor/google.golang.org/grpc/trace.go index 9ded79321..10f4f798f 100644 --- a/vendor/google.golang.org/grpc/trace.go +++ b/vendor/google.golang.org/grpc/trace.go @@ -26,8 +26,6 @@ import (  	"strings"  	"sync"  	"time" - -	"golang.org/x/net/trace"  )  // EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package. @@ -44,9 +42,31 @@ func methodFamily(m string) string {  	return m  } +// traceEventLog mirrors golang.org/x/net/trace.EventLog. +// +// It exists in order to avoid importing x/net/trace on grpcnotrace builds. +type traceEventLog interface { +	Printf(format string, a ...any) +	Errorf(format string, a ...any) +	Finish() +} + +// traceLog mirrors golang.org/x/net/trace.Trace. +// +// It exists in order to avoid importing x/net/trace on grpcnotrace builds. +type traceLog interface { +	LazyLog(x fmt.Stringer, sensitive bool) +	LazyPrintf(format string, a ...any) +	SetError() +	SetRecycler(f func(any)) +	SetTraceInfo(traceID, spanID uint64) +	SetMaxEvents(m int) +	Finish() +} +  // traceInfo contains tracing information for an RPC.  type traceInfo struct { -	tr        trace.Trace +	tr        traceLog  	firstLine firstLine  } diff --git a/vendor/google.golang.org/grpc/trace_notrace.go b/vendor/google.golang.org/grpc/trace_notrace.go new file mode 100644 index 000000000..1da3a2308 --- /dev/null +++ b/vendor/google.golang.org/grpc/trace_notrace.go @@ -0,0 +1,52 @@ +//go:build grpcnotrace + +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +// grpcnotrace can be used to avoid importing golang.org/x/net/trace, which in +// turn enables binaries using gRPC-Go for dead code elimination, which can +// yield 10-15% improvements in binary size when tracing is not needed. + +import ( +	"context" +	"fmt" +) + +type notrace struct{} + +func (notrace) LazyLog(x fmt.Stringer, sensitive bool) {} +func (notrace) LazyPrintf(format string, a ...any)     {} +func (notrace) SetError()                              {} +func (notrace) SetRecycler(f func(any))                {} +func (notrace) SetTraceInfo(traceID, spanID uint64)    {} +func (notrace) SetMaxEvents(m int)                     {} +func (notrace) Finish()                                {} + +func newTrace(family, title string) traceLog { +	return notrace{} +} + +func newTraceContext(ctx context.Context, tr traceLog) context.Context { +	return ctx +} + +func newTraceEventLog(family, title string) traceEventLog { +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go b/vendor/google.golang.org/grpc/trace_withtrace.go index 98288c3f8..88d6e8571 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go +++ b/vendor/google.golang.org/grpc/trace_withtrace.go @@ -1,6 +1,8 @@ +//go:build !grpcnotrace +  /*   * - * Copyright 2018 gRPC authors. + * Copyright 2024 gRPC authors.   *   * Licensed under the Apache License, Version 2.0 (the "License");   * you may not use this file except in compliance with the License. @@ -16,22 +18,22 @@   *   */ -package channelz +package grpc  import ( -	"syscall" +	"context" + +	t "golang.org/x/net/trace"  ) -// GetSocketOption gets the socket option info of the conn. -func GetSocketOption(socket any) *SocketOptionData { -	c, ok := socket.(syscall.Conn) -	if !ok { -		return nil -	} -	data := &SocketOptionData{} -	if rawConn, err := c.SyscallConn(); err == nil { -		rawConn.Control(data.Getsockopt) -		return data -	} -	return nil +func newTrace(family, title string) traceLog { +	return t.New(family, title) +} + +func newTraceContext(ctx context.Context, tr traceLog) context.Context { +	return t.NewContext(ctx, tr) +} + +func newTraceEventLog(family, title string) traceEventLog { +	return t.NewEventLog(family, title)  } diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go index f1aec4c0a..eaf5dbcea 100644 --- a/vendor/google.golang.org/grpc/version.go +++ b/vendor/google.golang.org/grpc/version.go @@ -19,4 +19,4 @@  package grpc  // Version is the current grpc version. -const Version = "1.61.1" +const Version = "1.63.0" diff --git a/vendor/google.golang.org/grpc/vet.sh b/vendor/google.golang.org/grpc/vet.sh index 5da38a409..7e6b92e49 100644 --- a/vendor/google.golang.org/grpc/vet.sh +++ b/vendor/google.golang.org/grpc/vet.sh @@ -41,7 +41,7 @@ if [[ "$1" = "-install" ]]; then    popd    if [[ -z "${VET_SKIP_PROTO}" ]]; then      if [[ "${GITHUB_ACTIONS}" = "true" ]]; then -      PROTOBUF_VERSION=22.0 # a.k.a v4.22.0 in pb.go files. +      PROTOBUF_VERSION=25.2 # a.k.a. v4.22.0 in pb.go files.        PROTOC_FILENAME=protoc-${PROTOBUF_VERSION}-linux-x86_64.zip        pushd /home/runner/go        wget https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/${PROTOC_FILENAME} @@ -83,6 +83,10 @@ git grep 'func [A-Z]' -- "*_test.go" | not grep -v 'func Test\|Benchmark\|Exampl  # - Do not import x/net/context.  not git grep -l 'x/net/context' -- "*.go" +# - Do not use time.After except in tests.  It has the potential to leak the +#   timer since there is no way to stop it early. +git grep -l 'time.After(' -- "*.go" | not grep -v '_test.go\|test_utils\|testutils' +  # - Do not import math/rand for real library code.  Use internal/grpcrand for  #   thread safety.  git grep -l '"math/rand"' -- "*.go" 2>&1 | not grep -v '^examples\|^interop/stress\|grpcrand\|^benchmark\|wrr_test' @@ -172,6 +176,7 @@ UpdateAddresses is deprecated:  UpdateSubConnState is deprecated:  balancer.ErrTransientFailure is deprecated:  grpc/reflection/v1alpha/reflection.proto +SwitchTo is deprecated:  XXXXX xDS deprecated fields we support  .ExactMatch  .PrefixMatch | 
