diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal')
30 files changed, 1720 insertions, 1713 deletions
diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go new file mode 100644 index 000000000..6bf7f8739 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go @@ -0,0 +1,83 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package gracefulswitch + +import ( +	"encoding/json" +	"fmt" + +	"google.golang.org/grpc/balancer" +	"google.golang.org/grpc/serviceconfig" +) + +type lbConfig struct { +	serviceconfig.LoadBalancingConfig + +	childBuilder balancer.Builder +	childConfig  serviceconfig.LoadBalancingConfig +} + +func ChildName(l serviceconfig.LoadBalancingConfig) string { +	return l.(*lbConfig).childBuilder.Name() +} + +// ParseConfig parses a child config list and returns a LB config for the +// gracefulswitch Balancer. +// +// cfg is expected to be a json.RawMessage containing a JSON array of LB policy +// names + configs as the format of the "loadBalancingConfig" field in +// ServiceConfig.  It returns a type that should be passed to +// UpdateClientConnState in the BalancerConfig field. +func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { +	var lbCfg []map[string]json.RawMessage +	if err := json.Unmarshal(cfg, &lbCfg); err != nil { +		return nil, err +	} +	for i, e := range lbCfg { +		if len(e) != 1 { +			return nil, fmt.Errorf("expected a JSON struct with one entry; received entry %v at index %d", e, i) +		} + +		var name string +		var jsonCfg json.RawMessage +		for name, jsonCfg = range e { +		} + +		builder := balancer.Get(name) +		if builder == nil { +			// Skip unregistered balancer names. +			continue +		} + +		parser, ok := builder.(balancer.ConfigParser) +		if !ok { +			// This is a valid child with no config. +			return &lbConfig{childBuilder: builder}, nil +		} + +		cfg, err := parser.ParseConfig(jsonCfg) +		if err != nil { +			return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err) +		} + +		return &lbConfig{childBuilder: builder, childConfig: cfg}, nil +	} + +	return nil, fmt.Errorf("no supported policies found in config: %v", string(cfg)) +} diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go index 3c594e6e4..45d5e50ea 100644 --- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go @@ -94,14 +94,23 @@ func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {  // process is not complete when this method returns. This method must be called  // synchronously alongside the rest of the balancer.Balancer methods this  // Graceful Switch Balancer implements. +// +// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState +// to cause the Balancer to automatically change to the new child when necessary.  func (gsb *Balancer) SwitchTo(builder balancer.Builder) error { +	_, err := gsb.switchTo(builder) +	return err +} + +func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {  	gsb.mu.Lock()  	if gsb.closed {  		gsb.mu.Unlock() -		return errBalancerClosed +		return nil, errBalancerClosed  	}  	bw := &balancerWrapper{ -		gsb: gsb, +		builder: builder, +		gsb:     gsb,  		lastState: balancer.State{  			ConnectivityState: connectivity.Connecting,  			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable), @@ -129,7 +138,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {  			gsb.balancerCurrent = nil  		}  		gsb.mu.Unlock() -		return balancer.ErrBadResolverState +		return nil, balancer.ErrBadResolverState  	}  	// This write doesn't need to take gsb.mu because this field never gets read @@ -138,7 +147,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {  	// bw.Balancer field will never be forwarded to until this SwitchTo()  	// function returns.  	bw.Balancer = newBalancer -	return nil +	return bw, nil  }  // Returns nil if the graceful switch balancer is closed. @@ -152,12 +161,33 @@ func (gsb *Balancer) latestBalancer() *balancerWrapper {  }  // UpdateClientConnState forwards the update to the latest balancer created. +// +// If the state's BalancerConfig is the config returned by a call to +// gracefulswitch.ParseConfig, then this function will automatically SwitchTo +// the balancer indicated by the config before forwarding its config to it, if +// necessary.  func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {  	// The resolver data is only relevant to the most recent LB Policy.  	balToUpdate := gsb.latestBalancer() + +	gsbCfg, ok := state.BalancerConfig.(*lbConfig) +	if ok { +		// Switch to the child in the config unless it is already active. +		if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() { +			var err error +			balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder) +			if err != nil { +				return fmt.Errorf("could not switch to new child balancer: %w", err) +			} +		} +		// Unwrap the child balancer's config. +		state.BalancerConfig = gsbCfg.childConfig +	} +  	if balToUpdate == nil {  		return errBalancerClosed  	} +  	// Perform this call without gsb.mu to prevent deadlocks if the child calls  	// back into the channel. The latest balancer can never be closed during a  	// call from the channel, even without gsb.mu held. @@ -169,6 +199,10 @@ func (gsb *Balancer) ResolverError(err error) {  	// The resolver data is only relevant to the most recent LB Policy.  	balToUpdate := gsb.latestBalancer()  	if balToUpdate == nil { +		gsb.cc.UpdateState(balancer.State{ +			ConnectivityState: connectivity.TransientFailure, +			Picker:            base.NewErrPicker(err), +		})  		return  	}  	// Perform this call without gsb.mu to prevent deadlocks if the child calls @@ -261,7 +295,8 @@ func (gsb *Balancer) Close() {  // graceful switch logic.  type balancerWrapper struct {  	balancer.Balancer -	gsb *Balancer +	gsb     *Balancer +	builder balancer.Builder  	lastState balancer.State  	subconns  map[balancer.SubConn]bool // subconns created by this balancer diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go index 0f31274a3..e8456a77c 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go @@ -25,11 +25,12 @@ import (  	"sync/atomic"  	"time" -	"github.com/golang/protobuf/proto" -	"github.com/golang/protobuf/ptypes"  	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"  	"google.golang.org/grpc/metadata"  	"google.golang.org/grpc/status" +	"google.golang.org/protobuf/proto" +	"google.golang.org/protobuf/types/known/durationpb" +	"google.golang.org/protobuf/types/known/timestamppb"  )  type callIDGenerator struct { @@ -88,7 +89,7 @@ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {  // in TruncatingMethodLogger as possible.  func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {  	m := c.toProto() -	timestamp, _ := ptypes.TimestampProto(time.Now()) +	timestamp := timestamppb.Now()  	m.Timestamp = timestamp  	m.CallId = ml.callID  	m.SequenceIdWithinCall = ml.idWithinCallGen.next() @@ -178,7 +179,7 @@ func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {  		Authority:  c.Authority,  	}  	if c.Timeout > 0 { -		clientHeader.Timeout = ptypes.DurationProto(c.Timeout) +		clientHeader.Timeout = durationpb.New(c.Timeout)  	}  	ret := &binlogpb.GrpcLogEntry{  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go index 264de387c..9ea598b14 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go @@ -25,8 +25,8 @@ import (  	"sync"  	"time" -	"github.com/golang/protobuf/proto"  	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" +	"google.golang.org/protobuf/proto"  )  var ( diff --git a/vendor/google.golang.org/grpc/internal/channelz/channel.go b/vendor/google.golang.org/grpc/internal/channelz/channel.go new file mode 100644 index 000000000..d7e9e1d54 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/channel.go @@ -0,0 +1,255 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync/atomic" + +	"google.golang.org/grpc/connectivity" +) + +// Channel represents a channel within channelz, which includes metrics and +// internal channelz data, such as channelz id, child list, etc. +type Channel struct { +	Entity +	// ID is the channelz id of this channel. +	ID int64 +	// RefName is the human readable reference string of this channel. +	RefName string + +	closeCalled bool +	nestedChans map[int64]string +	subChans    map[int64]string +	Parent      *Channel +	trace       *ChannelTrace +	// traceRefCount is the number of trace events that reference this channel. +	// Non-zero traceRefCount means the trace of this channel cannot be deleted. +	traceRefCount int32 + +	ChannelMetrics ChannelMetrics +} + +// Implemented to make Channel implement the Identifier interface used for +// nesting. +func (c *Channel) channelzIdentifier() {} + +func (c *Channel) String() string { +	if c.Parent == nil { +		return fmt.Sprintf("Channel #%d", c.ID) +	} +	return fmt.Sprintf("%s Channel #%d", c.Parent, c.ID) +} + +func (c *Channel) id() int64 { +	return c.ID +} + +func (c *Channel) SubChans() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(c.subChans) +} + +func (c *Channel) NestedChans() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(c.nestedChans) +} + +func (c *Channel) Trace() *ChannelTrace { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return c.trace.copy() +} + +type ChannelMetrics struct { +	// The current connectivity state of the channel. +	State atomic.Pointer[connectivity.State] +	// The target this channel originally tried to connect to.  May be absent +	Target atomic.Pointer[string] +	// The number of calls started on the channel. +	CallsStarted atomic.Int64 +	// The number of calls that have completed with an OK status. +	CallsSucceeded atomic.Int64 +	// The number of calls that have a completed with a non-OK status. +	CallsFailed atomic.Int64 +	// The last time a call was started on the channel. +	LastCallStartedTimestamp atomic.Int64 +} + +// CopyFrom copies the metrics in o to c.  For testing only. +func (c *ChannelMetrics) CopyFrom(o *ChannelMetrics) { +	c.State.Store(o.State.Load()) +	c.Target.Store(o.Target.Load()) +	c.CallsStarted.Store(o.CallsStarted.Load()) +	c.CallsSucceeded.Store(o.CallsSucceeded.Load()) +	c.CallsFailed.Store(o.CallsFailed.Load()) +	c.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load()) +} + +// Equal returns true iff the metrics of c are the same as the metrics of o. +// For testing only. +func (c *ChannelMetrics) Equal(o any) bool { +	oc, ok := o.(*ChannelMetrics) +	if !ok { +		return false +	} +	if (c.State.Load() == nil) != (oc.State.Load() == nil) { +		return false +	} +	if c.State.Load() != nil && *c.State.Load() != *oc.State.Load() { +		return false +	} +	if (c.Target.Load() == nil) != (oc.Target.Load() == nil) { +		return false +	} +	if c.Target.Load() != nil && *c.Target.Load() != *oc.Target.Load() { +		return false +	} +	return c.CallsStarted.Load() == oc.CallsStarted.Load() && +		c.CallsFailed.Load() == oc.CallsFailed.Load() && +		c.CallsSucceeded.Load() == oc.CallsSucceeded.Load() && +		c.LastCallStartedTimestamp.Load() == oc.LastCallStartedTimestamp.Load() +} + +func strFromPointer(s *string) string { +	if s == nil { +		return "" +	} +	return *s +} + +func (c *ChannelMetrics) String() string { +	return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v", +		c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(), +	) +} + +func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics { +	c := &ChannelMetrics{} +	c.State.Store(&state) +	c.Target.Store(&target) +	c.CallsStarted.Store(started) +	c.CallsSucceeded.Store(succeeded) +	c.CallsFailed.Store(failed) +	c.LastCallStartedTimestamp.Store(timestamp) +	return c +} + +func (c *Channel) addChild(id int64, e entry) { +	switch v := e.(type) { +	case *SubChannel: +		c.subChans[id] = v.RefName +	case *Channel: +		c.nestedChans[id] = v.RefName +	default: +		logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e) +	} +} + +func (c *Channel) deleteChild(id int64) { +	delete(c.subChans, id) +	delete(c.nestedChans, id) +	c.deleteSelfIfReady() +} + +func (c *Channel) triggerDelete() { +	c.closeCalled = true +	c.deleteSelfIfReady() +} + +func (c *Channel) getParentID() int64 { +	if c.Parent == nil { +		return -1 +	} +	return c.Parent.ID +} + +// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means +// deleting the channel reference from its parent's child list. +// +// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the +// corresponding grpc object has been invoked, and the channel does not have any children left. +// +// The returned boolean value indicates whether the channel has been successfully deleted from tree. +func (c *Channel) deleteSelfFromTree() (deleted bool) { +	if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 { +		return false +	} +	// not top channel +	if c.Parent != nil { +		c.Parent.deleteChild(c.ID) +	} +	return true +} + +// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means +// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the +// channel, and its memory will be garbage collected. +// +// The trace reference count of the channel must be 0 in order to be deleted from the map. This is +// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, +// the trace of the referenced entity must not be deleted. In order to release the resource allocated +// by grpc, the reference to the grpc object is reset to a dummy object. +// +// deleteSelfFromMap must be called after deleteSelfFromTree returns true. +// +// It returns a bool to indicate whether the channel can be safely deleted from map. +func (c *Channel) deleteSelfFromMap() (delete bool) { +	return c.getTraceRefCount() == 0 +} + +// deleteSelfIfReady tries to delete the channel itself from the channelz database. +// The delete process includes two steps: +//  1. delete the channel from the entry relation tree, i.e. delete the channel reference from its +//     parent's child list. +//  2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id +//     will return entry not found error. +func (c *Channel) deleteSelfIfReady() { +	if !c.deleteSelfFromTree() { +		return +	} +	if !c.deleteSelfFromMap() { +		return +	} +	db.deleteEntry(c.ID) +	c.trace.clear() +} + +func (c *Channel) getChannelTrace() *ChannelTrace { +	return c.trace +} + +func (c *Channel) incrTraceRefCount() { +	atomic.AddInt32(&c.traceRefCount, 1) +} + +func (c *Channel) decrTraceRefCount() { +	atomic.AddInt32(&c.traceRefCount, -1) +} + +func (c *Channel) getTraceRefCount() int { +	i := atomic.LoadInt32(&c.traceRefCount) +	return int(i) +} + +func (c *Channel) getRefName() string { +	return c.RefName +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go new file mode 100644 index 000000000..dfe18b089 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go @@ -0,0 +1,402 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sort" +	"sync" +	"time" +) + +// entry represents a node in the channelz database. +type entry interface { +	// addChild adds a child e, whose channelz id is id to child list +	addChild(id int64, e entry) +	// deleteChild deletes a child with channelz id to be id from child list +	deleteChild(id int64) +	// triggerDelete tries to delete self from channelz database. However, if +	// child list is not empty, then deletion from the database is on hold until +	// the last child is deleted from database. +	triggerDelete() +	// deleteSelfIfReady check whether triggerDelete() has been called before, +	// and whether child list is now empty. If both conditions are met, then +	// delete self from database. +	deleteSelfIfReady() +	// getParentID returns parent ID of the entry. 0 value parent ID means no parent. +	getParentID() int64 +	Entity +} + +// channelMap is the storage data structure for channelz. +// +// Methods of channelMap can be divided in two two categories with respect to +// locking. +// +// 1. Methods acquire the global lock. +// 2. Methods that can only be called when global lock is held. +// +// A second type of method need always to be called inside a first type of method. +type channelMap struct { +	mu               sync.RWMutex +	topLevelChannels map[int64]struct{} +	channels         map[int64]*Channel +	subChannels      map[int64]*SubChannel +	sockets          map[int64]*Socket +	servers          map[int64]*Server +} + +func newChannelMap() *channelMap { +	return &channelMap{ +		topLevelChannels: make(map[int64]struct{}), +		channels:         make(map[int64]*Channel), +		subChannels:      make(map[int64]*SubChannel), +		sockets:          make(map[int64]*Socket), +		servers:          make(map[int64]*Server), +	} +} + +func (c *channelMap) addServer(id int64, s *Server) { +	c.mu.Lock() +	defer c.mu.Unlock() +	s.cm = c +	c.servers[id] = s +} + +func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) { +	c.mu.Lock() +	defer c.mu.Unlock() +	cn.trace.cm = c +	c.channels[id] = cn +	if isTopChannel { +		c.topLevelChannels[id] = struct{}{} +	} else if p := c.channels[pid]; p != nil { +		p.addChild(id, cn) +	} else { +		logger.Infof("channel %d references invalid parent ID %d", id, pid) +	} +} + +func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) { +	c.mu.Lock() +	defer c.mu.Unlock() +	sc.trace.cm = c +	c.subChannels[id] = sc +	if p := c.channels[pid]; p != nil { +		p.addChild(id, sc) +	} else { +		logger.Infof("subchannel %d references invalid parent ID %d", id, pid) +	} +} + +func (c *channelMap) addSocket(s *Socket) { +	c.mu.Lock() +	defer c.mu.Unlock() +	s.cm = c +	c.sockets[s.ID] = s +	if s.Parent == nil { +		logger.Infof("normal socket %d has no parent", s.ID) +	} +	s.Parent.(entry).addChild(s.ID, s) +} + +// removeEntry triggers the removal of an entry, which may not indeed delete the +// entry, if it has to wait on the deletion of its children and until no other +// entity's channel trace references it.  It may lead to a chain of entry +// deletion. For example, deleting the last socket of a gracefully shutting down +// server will lead to the server being also deleted. +func (c *channelMap) removeEntry(id int64) { +	c.mu.Lock() +	defer c.mu.Unlock() +	c.findEntry(id).triggerDelete() +} + +// tracedChannel represents tracing operations which are present on both +// channels and subChannels. +type tracedChannel interface { +	getChannelTrace() *ChannelTrace +	incrTraceRefCount() +	decrTraceRefCount() +	getRefName() string +} + +// c.mu must be held by the caller +func (c *channelMap) decrTraceRefCount(id int64) { +	e := c.findEntry(id) +	if v, ok := e.(tracedChannel); ok { +		v.decrTraceRefCount() +		e.deleteSelfIfReady() +	} +} + +// c.mu must be held by the caller. +func (c *channelMap) findEntry(id int64) entry { +	if v, ok := c.channels[id]; ok { +		return v +	} +	if v, ok := c.subChannels[id]; ok { +		return v +	} +	if v, ok := c.servers[id]; ok { +		return v +	} +	if v, ok := c.sockets[id]; ok { +		return v +	} +	return &dummyEntry{idNotFound: id} +} + +// c.mu must be held by the caller +// +// deleteEntry deletes an entry from the channelMap. Before calling this method, +// caller must check this entry is ready to be deleted, i.e removeEntry() has +// been called on it, and no children still exist. +func (c *channelMap) deleteEntry(id int64) entry { +	if v, ok := c.sockets[id]; ok { +		delete(c.sockets, id) +		return v +	} +	if v, ok := c.subChannels[id]; ok { +		delete(c.subChannels, id) +		return v +	} +	if v, ok := c.channels[id]; ok { +		delete(c.channels, id) +		delete(c.topLevelChannels, id) +		return v +	} +	if v, ok := c.servers[id]; ok { +		delete(c.servers, id) +		return v +	} +	return &dummyEntry{idNotFound: id} +} + +func (c *channelMap) traceEvent(id int64, desc *TraceEvent) { +	c.mu.Lock() +	defer c.mu.Unlock() +	child := c.findEntry(id) +	childTC, ok := child.(tracedChannel) +	if !ok { +		return +	} +	childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()}) +	if desc.Parent != nil { +		parent := c.findEntry(child.getParentID()) +		var chanType RefChannelType +		switch child.(type) { +		case *Channel: +			chanType = RefChannel +		case *SubChannel: +			chanType = RefSubChannel +		} +		if parentTC, ok := parent.(tracedChannel); ok { +			parentTC.getChannelTrace().append(&traceEvent{ +				Desc:      desc.Parent.Desc, +				Severity:  desc.Parent.Severity, +				Timestamp: time.Now(), +				RefID:     id, +				RefName:   childTC.getRefName(), +				RefType:   chanType, +			}) +			childTC.incrTraceRefCount() +		} +	} +} + +type int64Slice []int64 + +func (s int64Slice) Len() int           { return len(s) } +func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] } +func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } + +func copyMap(m map[int64]string) map[int64]string { +	n := make(map[int64]string) +	for k, v := range m { +		n[k] = v +	} +	return n +} + +func min(a, b int) int { +	if a < b { +		return a +	} +	return b +} + +func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) { +	if maxResults <= 0 { +		maxResults = EntriesPerPage +	} +	c.mu.RLock() +	defer c.mu.RUnlock() +	l := int64(len(c.topLevelChannels)) +	ids := make([]int64, 0, l) + +	for k := range c.topLevelChannels { +		ids = append(ids, k) +	} +	sort.Sort(int64Slice(ids)) +	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) +	end := true +	var t []*Channel +	for _, v := range ids[idx:] { +		if len(t) == maxResults { +			end = false +			break +		} +		if cn, ok := c.channels[v]; ok { +			t = append(t, cn) +		} +	} +	return t, end +} + +func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) { +	if maxResults <= 0 { +		maxResults = EntriesPerPage +	} +	c.mu.RLock() +	defer c.mu.RUnlock() +	ids := make([]int64, 0, len(c.servers)) +	for k := range c.servers { +		ids = append(ids, k) +	} +	sort.Sort(int64Slice(ids)) +	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) +	end := true +	var s []*Server +	for _, v := range ids[idx:] { +		if len(s) == maxResults { +			end = false +			break +		} +		if svr, ok := c.servers[v]; ok { +			s = append(s, svr) +		} +	} +	return s, end +} + +func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) { +	if maxResults <= 0 { +		maxResults = EntriesPerPage +	} +	c.mu.RLock() +	defer c.mu.RUnlock() +	svr, ok := c.servers[id] +	if !ok { +		// server with id doesn't exist. +		return nil, true +	} +	svrskts := svr.sockets +	ids := make([]int64, 0, len(svrskts)) +	sks := make([]*Socket, 0, min(len(svrskts), maxResults)) +	for k := range svrskts { +		ids = append(ids, k) +	} +	sort.Sort(int64Slice(ids)) +	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID }) +	end := true +	for _, v := range ids[idx:] { +		if len(sks) == maxResults { +			end = false +			break +		} +		if ns, ok := c.sockets[v]; ok { +			sks = append(sks, ns) +		} +	} +	return sks, end +} + +func (c *channelMap) getChannel(id int64) *Channel { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.channels[id] +} + +func (c *channelMap) getSubChannel(id int64) *SubChannel { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.subChannels[id] +} + +func (c *channelMap) getSocket(id int64) *Socket { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.sockets[id] +} + +func (c *channelMap) getServer(id int64) *Server { +	c.mu.RLock() +	defer c.mu.RUnlock() +	return c.servers[id] +} + +type dummyEntry struct { +	// dummyEntry is a fake entry to handle entry not found case. +	idNotFound int64 +	Entity +} + +func (d *dummyEntry) String() string { +	return fmt.Sprintf("non-existent entity #%d", d.idNotFound) +} + +func (d *dummyEntry) ID() int64 { return d.idNotFound } + +func (d *dummyEntry) addChild(id int64, e entry) { +	// Note: It is possible for a normal program to reach here under race +	// condition.  For example, there could be a race between ClientConn.Close() +	// info being propagated to addrConn and http2Client. ClientConn.Close() +	// cancel the context and result in http2Client to error. The error info is +	// then caught by transport monitor and before addrConn.tearDown() is called +	// in side ClientConn.Close(). Therefore, the addrConn will create a new +	// transport. And when registering the new transport in channelz, its parent +	// addrConn could have already been torn down and deleted from channelz +	// tracking, and thus reach the code here. +	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound) +} + +func (d *dummyEntry) deleteChild(id int64) { +	// It is possible for a normal program to reach here under race condition. +	// Refer to the example described in addChild(). +	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound) +} + +func (d *dummyEntry) triggerDelete() { +	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound) +} + +func (*dummyEntry) deleteSelfIfReady() { +	// code should not reach here. deleteSelfIfReady is always called on an existing entry. +} + +func (*dummyEntry) getParentID() int64 { +	return 0 +} + +// Entity is implemented by all channelz types. +type Entity interface { +	isEntity() +	fmt.Stringer +	id() int64 +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go index fc094f344..f461e9bc3 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go +++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go @@ -16,47 +16,32 @@   *   */ -// Package channelz defines APIs for enabling channelz service, entry +// Package channelz defines internal APIs for enabling channelz service, entry  // registration/deletion, and accessing channelz data. It also defines channelz  // metric struct formats. -// -// All APIs in this package are experimental.  package channelz  import ( -	"errors" -	"sort" -	"sync"  	"sync/atomic"  	"time" -	"google.golang.org/grpc/grpclog"  	"google.golang.org/grpc/internal"  ) -const ( -	defaultMaxTraceEntry int32 = 30 -) -  var (  	// IDGen is the global channelz entity ID generator.  It should not be used  	// outside this package except by tests.  	IDGen IDGenerator -	db dbWrapper -	// EntryPerPage defines the number of channelz entries to be shown on a web page. -	EntryPerPage  = int64(50) -	curState      int32 -	maxTraceEntry = defaultMaxTraceEntry +	db *channelMap = newChannelMap() +	// EntriesPerPage defines the number of channelz entries to be shown on a web page. +	EntriesPerPage = 50 +	curState       int32  )  // TurnOn turns on channelz data collection.  func TurnOn() { -	if !IsOn() { -		db.set(newChannelMap()) -		IDGen.Reset() -		atomic.StoreInt32(&curState, 1) -	} +	atomic.StoreInt32(&curState, 1)  }  func init() { @@ -70,49 +55,15 @@ func IsOn() bool {  	return atomic.LoadInt32(&curState) == 1  } -// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel). -// Setting it to 0 will disable channel tracing. -func SetMaxTraceEntry(i int32) { -	atomic.StoreInt32(&maxTraceEntry, i) -} - -// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default. -func ResetMaxTraceEntryToDefault() { -	atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry) -} - -func getMaxTraceEntry() int { -	i := atomic.LoadInt32(&maxTraceEntry) -	return int(i) -} - -// dbWarpper wraps around a reference to internal channelz data storage, and -// provide synchronized functionality to set and get the reference. -type dbWrapper struct { -	mu sync.RWMutex -	DB *channelMap -} - -func (d *dbWrapper) set(db *channelMap) { -	d.mu.Lock() -	d.DB = db -	d.mu.Unlock() -} - -func (d *dbWrapper) get() *channelMap { -	d.mu.RLock() -	defer d.mu.RUnlock() -	return d.DB -} -  // GetTopChannels returns a slice of top channel's ChannelMetric, along with a  // boolean indicating whether there's more top channels to be queried for.  // -// The arg id specifies that only top channel with id at or above it will be included -// in the result. The returned slice is up to a length of the arg maxResults or -// EntryPerPage if maxResults is zero, and is sorted in ascending id order. -func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) { -	return db.get().GetTopChannels(id, maxResults) +// The arg id specifies that only top channel with id at or above it will be +// included in the result. The returned slice is up to a length of the arg +// maxResults or EntriesPerPage if maxResults is zero, and is sorted in ascending +// id order. +func GetTopChannels(id int64, maxResults int) ([]*Channel, bool) { +	return db.getTopChannels(id, maxResults)  }  // GetServers returns a slice of server's ServerMetric, along with a @@ -120,73 +71,69 @@ func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {  //  // The arg id specifies that only server with id at or above it will be included  // in the result. The returned slice is up to a length of the arg maxResults or -// EntryPerPage if maxResults is zero, and is sorted in ascending id order. -func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) { -	return db.get().GetServers(id, maxResults) +// EntriesPerPage if maxResults is zero, and is sorted in ascending id order. +func GetServers(id int64, maxResults int) ([]*Server, bool) { +	return db.getServers(id, maxResults)  }  // GetServerSockets returns a slice of server's (identified by id) normal socket's -// SocketMetric, along with a boolean indicating whether there's more sockets to +// SocketMetrics, along with a boolean indicating whether there's more sockets to  // be queried for.  //  // The arg startID specifies that only sockets with id at or above it will be  // included in the result. The returned slice is up to a length of the arg maxResults -// or EntryPerPage if maxResults is zero, and is sorted in ascending id order. -func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) { -	return db.get().GetServerSockets(id, startID, maxResults) +// or EntriesPerPage if maxResults is zero, and is sorted in ascending id order. +func GetServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) { +	return db.getServerSockets(id, startID, maxResults)  } -// GetChannel returns the ChannelMetric for the channel (identified by id). -func GetChannel(id int64) *ChannelMetric { -	return db.get().GetChannel(id) +// GetChannel returns the Channel for the channel (identified by id). +func GetChannel(id int64) *Channel { +	return db.getChannel(id)  } -// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id). -func GetSubChannel(id int64) *SubChannelMetric { -	return db.get().GetSubChannel(id) +// GetSubChannel returns the SubChannel for the subchannel (identified by id). +func GetSubChannel(id int64) *SubChannel { +	return db.getSubChannel(id)  } -// GetSocket returns the SocketInternalMetric for the socket (identified by id). -func GetSocket(id int64) *SocketMetric { -	return db.get().GetSocket(id) +// GetSocket returns the Socket for the socket (identified by id). +func GetSocket(id int64) *Socket { +	return db.getSocket(id)  }  // GetServer returns the ServerMetric for the server (identified by id). -func GetServer(id int64) *ServerMetric { -	return db.get().GetServer(id) +func GetServer(id int64) *Server { +	return db.getServer(id)  }  // RegisterChannel registers the given channel c in the channelz database with -// ref as its reference name, and adds it to the child list of its parent -// (identified by pid). pid == nil means no parent. +// target as its target and reference name, and adds it to the child list of its +// parent.  parent == nil means no parent.  //  // Returns a unique channelz identifier assigned to this channel.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier { +func RegisterChannel(parent *Channel, target string) *Channel {  	id := IDGen.genID() -	var parent int64 -	isTopChannel := true -	if pid != nil { -		isTopChannel = false -		parent = pid.Int() -	}  	if !IsOn() { -		return newIdentifer(RefChannel, id, pid) +		return &Channel{ID: id}  	} -	cn := &channel{ -		refName:     ref, -		c:           c, -		subChans:    make(map[int64]string), +	isTopChannel := parent == nil + +	cn := &Channel{ +		ID:          id, +		RefName:     target,  		nestedChans: make(map[int64]string), -		id:          id, -		pid:         parent, -		trace:       &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, +		subChans:    make(map[int64]string), +		Parent:      parent, +		trace:       &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},  	} -	db.get().addChannel(id, cn, isTopChannel, parent) -	return newIdentifer(RefChannel, id, pid) +	cn.ChannelMetrics.Target.Store(&target) +	db.addChannel(id, cn, isTopChannel, cn.getParentID()) +	return cn  }  // RegisterSubChannel registers the given subChannel c in the channelz database @@ -196,555 +143,66 @@ func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {  // Returns a unique channelz identifier assigned to this subChannel.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) { -	if pid == nil { -		return nil, errors.New("a SubChannel's parent id cannot be nil") -	} +func RegisterSubChannel(pid int64, ref string) *SubChannel {  	id := IDGen.genID()  	if !IsOn() { -		return newIdentifer(RefSubChannel, id, pid), nil +		return &SubChannel{ID: id}  	} -	sc := &subChannel{ -		refName: ref, -		c:       c, +	sc := &SubChannel{ +		RefName: ref, +		ID:      id,  		sockets: make(map[int64]string), -		id:      id, -		pid:     pid.Int(), -		trace:   &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, +		parent:  db.getChannel(pid), +		trace:   &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},  	} -	db.get().addSubChannel(id, sc, pid.Int()) -	return newIdentifer(RefSubChannel, id, pid), nil +	db.addSubChannel(id, sc, pid) +	return sc  }  // RegisterServer registers the given server s in channelz database. It returns  // the unique channelz tracking id assigned to this server.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterServer(s Server, ref string) *Identifier { +func RegisterServer(ref string) *Server {  	id := IDGen.genID()  	if !IsOn() { -		return newIdentifer(RefServer, id, nil) +		return &Server{ID: id}  	} -	svr := &server{ -		refName:       ref, -		s:             s, +	svr := &Server{ +		RefName:       ref,  		sockets:       make(map[int64]string),  		listenSockets: make(map[int64]string), -		id:            id, -	} -	db.get().addServer(id, svr) -	return newIdentifer(RefServer, id, nil) -} - -// RegisterListenSocket registers the given listen socket s in channelz database -// with ref as its reference name, and add it to the child list of its parent -// (identified by pid). It returns the unique channelz tracking id assigned to -// this listen socket. -// -// If channelz is not turned ON, the channelz database is not mutated. -func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { -	if pid == nil { -		return nil, errors.New("a ListenSocket's parent id cannot be 0") +		ID:            id,  	} -	id := IDGen.genID() -	if !IsOn() { -		return newIdentifer(RefListenSocket, id, pid), nil -	} - -	ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()} -	db.get().addListenSocket(id, ls, pid.Int()) -	return newIdentifer(RefListenSocket, id, pid), nil +	db.addServer(id, svr) +	return svr  } -// RegisterNormalSocket registers the given normal socket s in channelz database +// RegisterSocket registers the given normal socket s in channelz database  // with ref as its reference name, and adds it to the child list of its parent -// (identified by pid). It returns the unique channelz tracking id assigned to -// this normal socket. +// (identified by skt.Parent, which must be set). It returns the unique channelz +// tracking id assigned to this normal socket.  //  // If channelz is not turned ON, the channelz database is not mutated. -func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) { -	if pid == nil { -		return nil, errors.New("a NormalSocket's parent id cannot be 0") -	} -	id := IDGen.genID() -	if !IsOn() { -		return newIdentifer(RefNormalSocket, id, pid), nil +func RegisterSocket(skt *Socket) *Socket { +	skt.ID = IDGen.genID() +	if IsOn() { +		db.addSocket(skt)  	} - -	ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()} -	db.get().addNormalSocket(id, ns, pid.Int()) -	return newIdentifer(RefNormalSocket, id, pid), nil +	return skt  }  // RemoveEntry removes an entry with unique channelz tracking id to be id from  // channelz database.  //  // If channelz is not turned ON, this function is a no-op. -func RemoveEntry(id *Identifier) { +func RemoveEntry(id int64) {  	if !IsOn() {  		return  	} -	db.get().removeEntry(id.Int()) -} - -// TraceEventDesc is what the caller of AddTraceEvent should provide to describe -// the event to be added to the channel trace. -// -// The Parent field is optional. It is used for an event that will be recorded -// in the entity's parent trace. -type TraceEventDesc struct { -	Desc     string -	Severity Severity -	Parent   *TraceEventDesc -} - -// AddTraceEvent adds trace related to the entity with specified id, using the -// provided TraceEventDesc. -// -// If channelz is not turned ON, this will simply log the event descriptions. -func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) { -	// Log only the trace description associated with the bottom most entity. -	switch desc.Severity { -	case CtUnknown, CtInfo: -		l.InfoDepth(depth+1, withParens(id)+desc.Desc) -	case CtWarning: -		l.WarningDepth(depth+1, withParens(id)+desc.Desc) -	case CtError: -		l.ErrorDepth(depth+1, withParens(id)+desc.Desc) -	} - -	if getMaxTraceEntry() == 0 { -		return -	} -	if IsOn() { -		db.get().traceEvent(id.Int(), desc) -	} -} - -// channelMap is the storage data structure for channelz. -// Methods of channelMap can be divided in two two categories with respect to locking. -// 1. Methods acquire the global lock. -// 2. Methods that can only be called when global lock is held. -// A second type of method need always to be called inside a first type of method. -type channelMap struct { -	mu               sync.RWMutex -	topLevelChannels map[int64]struct{} -	servers          map[int64]*server -	channels         map[int64]*channel -	subChannels      map[int64]*subChannel -	listenSockets    map[int64]*listenSocket -	normalSockets    map[int64]*normalSocket -} - -func newChannelMap() *channelMap { -	return &channelMap{ -		topLevelChannels: make(map[int64]struct{}), -		channels:         make(map[int64]*channel), -		listenSockets:    make(map[int64]*listenSocket), -		normalSockets:    make(map[int64]*normalSocket), -		servers:          make(map[int64]*server), -		subChannels:      make(map[int64]*subChannel), -	} -} - -func (c *channelMap) addServer(id int64, s *server) { -	c.mu.Lock() -	s.cm = c -	c.servers[id] = s -	c.mu.Unlock() -} - -func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) { -	c.mu.Lock() -	cn.cm = c -	cn.trace.cm = c -	c.channels[id] = cn -	if isTopChannel { -		c.topLevelChannels[id] = struct{}{} -	} else { -		c.findEntry(pid).addChild(id, cn) -	} -	c.mu.Unlock() -} - -func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) { -	c.mu.Lock() -	sc.cm = c -	sc.trace.cm = c -	c.subChannels[id] = sc -	c.findEntry(pid).addChild(id, sc) -	c.mu.Unlock() -} - -func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) { -	c.mu.Lock() -	ls.cm = c -	c.listenSockets[id] = ls -	c.findEntry(pid).addChild(id, ls) -	c.mu.Unlock() -} - -func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) { -	c.mu.Lock() -	ns.cm = c -	c.normalSockets[id] = ns -	c.findEntry(pid).addChild(id, ns) -	c.mu.Unlock() -} - -// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to -// wait on the deletion of its children and until no other entity's channel trace references it. -// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully -// shutting down server will lead to the server being also deleted. -func (c *channelMap) removeEntry(id int64) { -	c.mu.Lock() -	c.findEntry(id).triggerDelete() -	c.mu.Unlock() -} - -// c.mu must be held by the caller -func (c *channelMap) decrTraceRefCount(id int64) { -	e := c.findEntry(id) -	if v, ok := e.(tracedChannel); ok { -		v.decrTraceRefCount() -		e.deleteSelfIfReady() -	} -} - -// c.mu must be held by the caller. -func (c *channelMap) findEntry(id int64) entry { -	var v entry -	var ok bool -	if v, ok = c.channels[id]; ok { -		return v -	} -	if v, ok = c.subChannels[id]; ok { -		return v -	} -	if v, ok = c.servers[id]; ok { -		return v -	} -	if v, ok = c.listenSockets[id]; ok { -		return v -	} -	if v, ok = c.normalSockets[id]; ok { -		return v -	} -	return &dummyEntry{idNotFound: id} -} - -// c.mu must be held by the caller -// deleteEntry simply deletes an entry from the channelMap. Before calling this -// method, caller must check this entry is ready to be deleted, i.e removeEntry() -// has been called on it, and no children still exist. -// Conditionals are ordered by the expected frequency of deletion of each entity -// type, in order to optimize performance. -func (c *channelMap) deleteEntry(id int64) { -	var ok bool -	if _, ok = c.normalSockets[id]; ok { -		delete(c.normalSockets, id) -		return -	} -	if _, ok = c.subChannels[id]; ok { -		delete(c.subChannels, id) -		return -	} -	if _, ok = c.channels[id]; ok { -		delete(c.channels, id) -		delete(c.topLevelChannels, id) -		return -	} -	if _, ok = c.listenSockets[id]; ok { -		delete(c.listenSockets, id) -		return -	} -	if _, ok = c.servers[id]; ok { -		delete(c.servers, id) -		return -	} -} - -func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) { -	c.mu.Lock() -	child := c.findEntry(id) -	childTC, ok := child.(tracedChannel) -	if !ok { -		c.mu.Unlock() -		return -	} -	childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()}) -	if desc.Parent != nil { -		parent := c.findEntry(child.getParentID()) -		var chanType RefChannelType -		switch child.(type) { -		case *channel: -			chanType = RefChannel -		case *subChannel: -			chanType = RefSubChannel -		} -		if parentTC, ok := parent.(tracedChannel); ok { -			parentTC.getChannelTrace().append(&TraceEvent{ -				Desc:      desc.Parent.Desc, -				Severity:  desc.Parent.Severity, -				Timestamp: time.Now(), -				RefID:     id, -				RefName:   childTC.getRefName(), -				RefType:   chanType, -			}) -			childTC.incrTraceRefCount() -		} -	} -	c.mu.Unlock() -} - -type int64Slice []int64 - -func (s int64Slice) Len() int           { return len(s) } -func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] } -func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } - -func copyMap(m map[int64]string) map[int64]string { -	n := make(map[int64]string) -	for k, v := range m { -		n[k] = v -	} -	return n -} - -func min(a, b int64) int64 { -	if a < b { -		return a -	} -	return b -} - -func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) { -	if maxResults <= 0 { -		maxResults = EntryPerPage -	} -	c.mu.RLock() -	l := int64(len(c.topLevelChannels)) -	ids := make([]int64, 0, l) -	cns := make([]*channel, 0, min(l, maxResults)) - -	for k := range c.topLevelChannels { -		ids = append(ids, k) -	} -	sort.Sort(int64Slice(ids)) -	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) -	count := int64(0) -	var end bool -	var t []*ChannelMetric -	for i, v := range ids[idx:] { -		if count == maxResults { -			break -		} -		if cn, ok := c.channels[v]; ok { -			cns = append(cns, cn) -			t = append(t, &ChannelMetric{ -				NestedChans: copyMap(cn.nestedChans), -				SubChans:    copyMap(cn.subChans), -			}) -			count++ -		} -		if i == len(ids[idx:])-1 { -			end = true -			break -		} -	} -	c.mu.RUnlock() -	if count == 0 { -		end = true -	} - -	for i, cn := range cns { -		t[i].ChannelData = cn.c.ChannelzMetric() -		t[i].ID = cn.id -		t[i].RefName = cn.refName -		t[i].Trace = cn.trace.dumpData() -	} -	return t, end -} - -func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) { -	if maxResults <= 0 { -		maxResults = EntryPerPage -	} -	c.mu.RLock() -	l := int64(len(c.servers)) -	ids := make([]int64, 0, l) -	ss := make([]*server, 0, min(l, maxResults)) -	for k := range c.servers { -		ids = append(ids, k) -	} -	sort.Sort(int64Slice(ids)) -	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) -	count := int64(0) -	var end bool -	var s []*ServerMetric -	for i, v := range ids[idx:] { -		if count == maxResults { -			break -		} -		if svr, ok := c.servers[v]; ok { -			ss = append(ss, svr) -			s = append(s, &ServerMetric{ -				ListenSockets: copyMap(svr.listenSockets), -			}) -			count++ -		} -		if i == len(ids[idx:])-1 { -			end = true -			break -		} -	} -	c.mu.RUnlock() -	if count == 0 { -		end = true -	} - -	for i, svr := range ss { -		s[i].ServerData = svr.s.ChannelzMetric() -		s[i].ID = svr.id -		s[i].RefName = svr.refName -	} -	return s, end -} - -func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) { -	if maxResults <= 0 { -		maxResults = EntryPerPage -	} -	var svr *server -	var ok bool -	c.mu.RLock() -	if svr, ok = c.servers[id]; !ok { -		// server with id doesn't exist. -		c.mu.RUnlock() -		return nil, true -	} -	svrskts := svr.sockets -	l := int64(len(svrskts)) -	ids := make([]int64, 0, l) -	sks := make([]*normalSocket, 0, min(l, maxResults)) -	for k := range svrskts { -		ids = append(ids, k) -	} -	sort.Sort(int64Slice(ids)) -	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID }) -	count := int64(0) -	var end bool -	for i, v := range ids[idx:] { -		if count == maxResults { -			break -		} -		if ns, ok := c.normalSockets[v]; ok { -			sks = append(sks, ns) -			count++ -		} -		if i == len(ids[idx:])-1 { -			end = true -			break -		} -	} -	c.mu.RUnlock() -	if count == 0 { -		end = true -	} -	s := make([]*SocketMetric, 0, len(sks)) -	for _, ns := range sks { -		sm := &SocketMetric{} -		sm.SocketData = ns.s.ChannelzMetric() -		sm.ID = ns.id -		sm.RefName = ns.refName -		s = append(s, sm) -	} -	return s, end -} - -func (c *channelMap) GetChannel(id int64) *ChannelMetric { -	cm := &ChannelMetric{} -	var cn *channel -	var ok bool -	c.mu.RLock() -	if cn, ok = c.channels[id]; !ok { -		// channel with id doesn't exist. -		c.mu.RUnlock() -		return nil -	} -	cm.NestedChans = copyMap(cn.nestedChans) -	cm.SubChans = copyMap(cn.subChans) -	// cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when -	// holding the lock to prevent potential data race. -	chanCopy := cn.c -	c.mu.RUnlock() -	cm.ChannelData = chanCopy.ChannelzMetric() -	cm.ID = cn.id -	cm.RefName = cn.refName -	cm.Trace = cn.trace.dumpData() -	return cm -} - -func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric { -	cm := &SubChannelMetric{} -	var sc *subChannel -	var ok bool -	c.mu.RLock() -	if sc, ok = c.subChannels[id]; !ok { -		// subchannel with id doesn't exist. -		c.mu.RUnlock() -		return nil -	} -	cm.Sockets = copyMap(sc.sockets) -	// sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when -	// holding the lock to prevent potential data race. -	chanCopy := sc.c -	c.mu.RUnlock() -	cm.ChannelData = chanCopy.ChannelzMetric() -	cm.ID = sc.id -	cm.RefName = sc.refName -	cm.Trace = sc.trace.dumpData() -	return cm -} - -func (c *channelMap) GetSocket(id int64) *SocketMetric { -	sm := &SocketMetric{} -	c.mu.RLock() -	if ls, ok := c.listenSockets[id]; ok { -		c.mu.RUnlock() -		sm.SocketData = ls.s.ChannelzMetric() -		sm.ID = ls.id -		sm.RefName = ls.refName -		return sm -	} -	if ns, ok := c.normalSockets[id]; ok { -		c.mu.RUnlock() -		sm.SocketData = ns.s.ChannelzMetric() -		sm.ID = ns.id -		sm.RefName = ns.refName -		return sm -	} -	c.mu.RUnlock() -	return nil -} - -func (c *channelMap) GetServer(id int64) *ServerMetric { -	sm := &ServerMetric{} -	var svr *server -	var ok bool -	c.mu.RLock() -	if svr, ok = c.servers[id]; !ok { -		c.mu.RUnlock() -		return nil -	} -	sm.ListenSockets = copyMap(svr.listenSockets) -	c.mu.RUnlock() -	sm.ID = svr.id -	sm.RefName = svr.refName -	sm.ServerData = svr.s.ChannelzMetric() -	return sm +	db.removeEntry(id)  }  // IDGenerator is an incrementing atomic that tracks IDs for channelz entities. @@ -761,3 +219,11 @@ func (i *IDGenerator) Reset() {  func (i *IDGenerator) genID() int64 {  	return atomic.AddInt64(&i.id, 1)  } + +// Identifier is an opaque channelz identifier used to expose channelz symbols +// outside of grpc.  Currently only implemented by Channel since no other +// types require exposure outside grpc. +type Identifier interface { +	Entity +	channelzIdentifier() +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/id.go b/vendor/google.golang.org/grpc/internal/channelz/id.go deleted file mode 100644 index c9a27acd3..000000000 --- a/vendor/google.golang.org/grpc/internal/channelz/id.go +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Copyright 2022 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package channelz - -import "fmt" - -// Identifier is an opaque identifier which uniquely identifies an entity in the -// channelz database. -type Identifier struct { -	typ RefChannelType -	id  int64 -	str string -	pid *Identifier -} - -// Type returns the entity type corresponding to id. -func (id *Identifier) Type() RefChannelType { -	return id.typ -} - -// Int returns the integer identifier corresponding to id. -func (id *Identifier) Int() int64 { -	return id.id -} - -// String returns a string representation of the entity corresponding to id. -// -// This includes some information about the parent as well. Examples: -// Top-level channel: [Channel #channel-number] -// Nested channel:    [Channel #parent-channel-number Channel #channel-number] -// Sub channel:       [Channel #parent-channel SubChannel #subchannel-number] -func (id *Identifier) String() string { -	return id.str -} - -// Equal returns true if other is the same as id. -func (id *Identifier) Equal(other *Identifier) bool { -	if (id != nil) != (other != nil) { -		return false -	} -	if id == nil && other == nil { -		return true -	} -	return id.typ == other.typ && id.id == other.id && id.pid == other.pid -} - -// NewIdentifierForTesting returns a new opaque identifier to be used only for -// testing purposes. -func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier { -	return newIdentifer(typ, id, pid) -} - -func newIdentifer(typ RefChannelType, id int64, pid *Identifier) *Identifier { -	str := fmt.Sprintf("%s #%d", typ, id) -	if pid != nil { -		str = fmt.Sprintf("%s %s", pid, str) -	} -	return &Identifier{typ: typ, id: id, str: str, pid: pid} -} diff --git a/vendor/google.golang.org/grpc/internal/channelz/logging.go b/vendor/google.golang.org/grpc/internal/channelz/logging.go index f89e6f77b..ee4d72125 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/logging.go +++ b/vendor/google.golang.org/grpc/internal/channelz/logging.go @@ -26,53 +26,49 @@ import (  var logger = grpclog.Component("channelz") -func withParens(id *Identifier) string { -	return "[" + id.String() + "] " -} -  // Info logs and adds a trace event if channelz is on. -func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Info(l grpclog.DepthLoggerV2, e Entity, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprint(args...),  		Severity: CtInfo,  	})  }  // Infof logs and adds a trace event if channelz is on. -func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Infof(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtInfo,  	})  }  // Warning logs and adds a trace event if channelz is on. -func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Warning(l grpclog.DepthLoggerV2, e Entity, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprint(args...),  		Severity: CtWarning,  	})  }  // Warningf logs and adds a trace event if channelz is on. -func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Warningf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtWarning,  	})  }  // Error logs and adds a trace event if channelz is on. -func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Error(l grpclog.DepthLoggerV2, e Entity, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprint(args...),  		Severity: CtError,  	})  }  // Errorf logs and adds a trace event if channelz is on. -func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) { -	AddTraceEvent(l, id, 1, &TraceEventDesc{ +func Errorf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) { +	AddTraceEvent(l, e, 1, &TraceEvent{  		Desc:     fmt.Sprintf(format, args...),  		Severity: CtError,  	}) diff --git a/vendor/google.golang.org/grpc/internal/channelz/server.go b/vendor/google.golang.org/grpc/internal/channelz/server.go new file mode 100644 index 000000000..cdfc49d6e --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/server.go @@ -0,0 +1,119 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync/atomic" +) + +// Server is the channelz representation of a server. +type Server struct { +	Entity +	ID      int64 +	RefName string + +	ServerMetrics ServerMetrics + +	closeCalled   bool +	sockets       map[int64]string +	listenSockets map[int64]string +	cm            *channelMap +} + +// ServerMetrics defines a struct containing metrics for servers. +type ServerMetrics struct { +	// The number of incoming calls started on the server. +	CallsStarted atomic.Int64 +	// The number of incoming calls that have completed with an OK status. +	CallsSucceeded atomic.Int64 +	// The number of incoming calls that have a completed with a non-OK status. +	CallsFailed atomic.Int64 +	// The last time a call was started on the server. +	LastCallStartedTimestamp atomic.Int64 +} + +// NewServerMetricsForTesting returns an initialized ServerMetrics. +func NewServerMetricsForTesting(started, succeeded, failed, timestamp int64) *ServerMetrics { +	sm := &ServerMetrics{} +	sm.CallsStarted.Store(started) +	sm.CallsSucceeded.Store(succeeded) +	sm.CallsFailed.Store(failed) +	sm.LastCallStartedTimestamp.Store(timestamp) +	return sm +} + +func (sm *ServerMetrics) CopyFrom(o *ServerMetrics) { +	sm.CallsStarted.Store(o.CallsStarted.Load()) +	sm.CallsSucceeded.Store(o.CallsSucceeded.Load()) +	sm.CallsFailed.Store(o.CallsFailed.Load()) +	sm.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load()) +} + +// ListenSockets returns the listening sockets for s. +func (s *Server) ListenSockets() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(s.listenSockets) +} + +// String returns a printable description of s. +func (s *Server) String() string { +	return fmt.Sprintf("Server #%d", s.ID) +} + +func (s *Server) id() int64 { +	return s.ID +} + +func (s *Server) addChild(id int64, e entry) { +	switch v := e.(type) { +	case *Socket: +		switch v.SocketType { +		case SocketTypeNormal: +			s.sockets[id] = v.RefName +		case SocketTypeListen: +			s.listenSockets[id] = v.RefName +		} +	default: +		logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e) +	} +} + +func (s *Server) deleteChild(id int64) { +	delete(s.sockets, id) +	delete(s.listenSockets, id) +	s.deleteSelfIfReady() +} + +func (s *Server) triggerDelete() { +	s.closeCalled = true +	s.deleteSelfIfReady() +} + +func (s *Server) deleteSelfIfReady() { +	if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 { +		return +	} +	s.cm.deleteEntry(s.ID) +} + +func (s *Server) getParentID() int64 { +	return 0 +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/socket.go b/vendor/google.golang.org/grpc/internal/channelz/socket.go new file mode 100644 index 000000000..fa64834b2 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/socket.go @@ -0,0 +1,130 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"net" +	"sync/atomic" + +	"google.golang.org/grpc/credentials" +) + +// SocketMetrics defines the struct that the implementor of Socket interface +// should return from ChannelzMetric(). +type SocketMetrics struct { +	// The number of streams that have been started. +	StreamsStarted atomic.Int64 +	// The number of streams that have ended successfully: +	// On client side, receiving frame with eos bit set. +	// On server side, sending frame with eos bit set. +	StreamsSucceeded atomic.Int64 +	// The number of streams that have ended unsuccessfully: +	// On client side, termination without receiving frame with eos bit set. +	// On server side, termination without sending frame with eos bit set. +	StreamsFailed atomic.Int64 +	// The number of messages successfully sent on this socket. +	MessagesSent     atomic.Int64 +	MessagesReceived atomic.Int64 +	// The number of keep alives sent.  This is typically implemented with HTTP/2 +	// ping messages. +	KeepAlivesSent atomic.Int64 +	// The last time a stream was created by this endpoint.  Usually unset for +	// servers. +	LastLocalStreamCreatedTimestamp atomic.Int64 +	// The last time a stream was created by the remote endpoint.  Usually unset +	// for clients. +	LastRemoteStreamCreatedTimestamp atomic.Int64 +	// The last time a message was sent by this endpoint. +	LastMessageSentTimestamp atomic.Int64 +	// The last time a message was received by this endpoint. +	LastMessageReceivedTimestamp atomic.Int64 +} + +// EphemeralSocketMetrics are metrics that change rapidly and are tracked +// outside of channelz. +type EphemeralSocketMetrics struct { +	// The amount of window, granted to the local endpoint by the remote endpoint. +	// This may be slightly out of date due to network latency.  This does NOT +	// include stream level or TCP level flow control info. +	LocalFlowControlWindow int64 +	// The amount of window, granted to the remote endpoint by the local endpoint. +	// This may be slightly out of date due to network latency.  This does NOT +	// include stream level or TCP level flow control info. +	RemoteFlowControlWindow int64 +} + +type SocketType string + +const ( +	SocketTypeNormal = "NormalSocket" +	SocketTypeListen = "ListenSocket" +) + +type Socket struct { +	Entity +	SocketType       SocketType +	ID               int64 +	Parent           Entity +	cm               *channelMap +	SocketMetrics    SocketMetrics +	EphemeralMetrics func() *EphemeralSocketMetrics + +	RefName string +	// The locally bound address.  Immutable. +	LocalAddr net.Addr +	// The remote bound address.  May be absent.  Immutable. +	RemoteAddr net.Addr +	// Optional, represents the name of the remote endpoint, if different than +	// the original target name.  Immutable. +	RemoteName string +	// Immutable. +	SocketOptions *SocketOptionData +	// Immutable. +	Security credentials.ChannelzSecurityValue +} + +func (ls *Socket) String() string { +	return fmt.Sprintf("%s %s #%d", ls.Parent, ls.SocketType, ls.ID) +} + +func (ls *Socket) id() int64 { +	return ls.ID +} + +func (ls *Socket) addChild(id int64, e entry) { +	logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e) +} + +func (ls *Socket) deleteChild(id int64) { +	logger.Errorf("cannot delete a child (id = %d) from a listen socket", id) +} + +func (ls *Socket) triggerDelete() { +	ls.cm.deleteEntry(ls.ID) +	ls.Parent.(entry).deleteChild(ls.ID) +} + +func (ls *Socket) deleteSelfIfReady() { +	logger.Errorf("cannot call deleteSelfIfReady on a listen socket") +} + +func (ls *Socket) getParentID() int64 { +	return ls.Parent.id() +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/subchannel.go b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go new file mode 100644 index 000000000..3b88e4cba --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go @@ -0,0 +1,151 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync/atomic" +) + +// SubChannel is the channelz representation of a subchannel. +type SubChannel struct { +	Entity +	// ID is the channelz id of this subchannel. +	ID int64 +	// RefName is the human readable reference string of this subchannel. +	RefName       string +	closeCalled   bool +	sockets       map[int64]string +	parent        *Channel +	trace         *ChannelTrace +	traceRefCount int32 + +	ChannelMetrics ChannelMetrics +} + +func (sc *SubChannel) String() string { +	return fmt.Sprintf("%s SubChannel #%d", sc.parent, sc.ID) +} + +func (sc *SubChannel) id() int64 { +	return sc.ID +} + +func (sc *SubChannel) Sockets() map[int64]string { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return copyMap(sc.sockets) +} + +func (sc *SubChannel) Trace() *ChannelTrace { +	db.mu.RLock() +	defer db.mu.RUnlock() +	return sc.trace.copy() +} + +func (sc *SubChannel) addChild(id int64, e entry) { +	if v, ok := e.(*Socket); ok && v.SocketType == SocketTypeNormal { +		sc.sockets[id] = v.RefName +	} else { +		logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e) +	} +} + +func (sc *SubChannel) deleteChild(id int64) { +	delete(sc.sockets, id) +	sc.deleteSelfIfReady() +} + +func (sc *SubChannel) triggerDelete() { +	sc.closeCalled = true +	sc.deleteSelfIfReady() +} + +func (sc *SubChannel) getParentID() int64 { +	return sc.parent.ID +} + +// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which +// means deleting the subchannel reference from its parent's child list. +// +// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of +// the corresponding grpc object has been invoked, and the subchannel does not have any children left. +// +// The returned boolean value indicates whether the channel has been successfully deleted from tree. +func (sc *SubChannel) deleteSelfFromTree() (deleted bool) { +	if !sc.closeCalled || len(sc.sockets) != 0 { +		return false +	} +	sc.parent.deleteChild(sc.ID) +	return true +} + +// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means +// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query +// the subchannel, and its memory will be garbage collected. +// +// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is +// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, +// the trace of the referenced entity must not be deleted. In order to release the resource allocated +// by grpc, the reference to the grpc object is reset to a dummy object. +// +// deleteSelfFromMap must be called after deleteSelfFromTree returns true. +// +// It returns a bool to indicate whether the channel can be safely deleted from map. +func (sc *SubChannel) deleteSelfFromMap() (delete bool) { +	return sc.getTraceRefCount() == 0 +} + +// deleteSelfIfReady tries to delete the subchannel itself from the channelz database. +// The delete process includes two steps: +//  1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from +//     its parent's child list. +//  2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup +//     by id will return entry not found error. +func (sc *SubChannel) deleteSelfIfReady() { +	if !sc.deleteSelfFromTree() { +		return +	} +	if !sc.deleteSelfFromMap() { +		return +	} +	db.deleteEntry(sc.ID) +	sc.trace.clear() +} + +func (sc *SubChannel) getChannelTrace() *ChannelTrace { +	return sc.trace +} + +func (sc *SubChannel) incrTraceRefCount() { +	atomic.AddInt32(&sc.traceRefCount, 1) +} + +func (sc *SubChannel) decrTraceRefCount() { +	atomic.AddInt32(&sc.traceRefCount, -1) +} + +func (sc *SubChannel) getTraceRefCount() int { +	i := atomic.LoadInt32(&sc.traceRefCount) +	return int(i) +} + +func (sc *SubChannel) getRefName() string { +	return sc.RefName +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_linux.go index 1b1c4cce3..5ac73ff83 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_linux.go @@ -49,3 +49,17 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {  		s.TCPInfo = v  	}  } + +// GetSocketOption gets the socket option info of the conn. +func GetSocketOption(socket any) *SocketOptionData { +	c, ok := socket.(syscall.Conn) +	if !ok { +		return nil +	} +	data := &SocketOptionData{} +	if rawConn, err := c.SyscallConn(); err == nil { +		rawConn.Control(data.Getsockopt) +		return data +	} +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go index 8b06eed1a..d1ed8df6a 100644 --- a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go +++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go @@ -1,5 +1,4 @@  //go:build !linux -// +build !linux  /*   * @@ -41,3 +40,8 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {  		logger.Warning("Channelz: socket options are not supported on non-linux environments")  	})  } + +// GetSocketOption gets the socket option info of the conn. +func GetSocketOption(c any) *SocketOptionData { +	return nil +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/trace.go b/vendor/google.golang.org/grpc/internal/channelz/trace.go new file mode 100644 index 000000000..36b867403 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/channelz/trace.go @@ -0,0 +1,204 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( +	"fmt" +	"sync" +	"sync/atomic" +	"time" + +	"google.golang.org/grpc/grpclog" +) + +const ( +	defaultMaxTraceEntry int32 = 30 +) + +var maxTraceEntry = defaultMaxTraceEntry + +// SetMaxTraceEntry sets maximum number of trace entries per entity (i.e. +// channel/subchannel).  Setting it to 0 will disable channel tracing. +func SetMaxTraceEntry(i int32) { +	atomic.StoreInt32(&maxTraceEntry, i) +} + +// ResetMaxTraceEntryToDefault resets the maximum number of trace entries per +// entity to default. +func ResetMaxTraceEntryToDefault() { +	atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry) +} + +func getMaxTraceEntry() int { +	i := atomic.LoadInt32(&maxTraceEntry) +	return int(i) +} + +// traceEvent is an internal representation of a single trace event +type traceEvent struct { +	// Desc is a simple description of the trace event. +	Desc string +	// Severity states the severity of this trace event. +	Severity Severity +	// Timestamp is the event time. +	Timestamp time.Time +	// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is +	// involved in this event. +	// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside []) +	RefID int64 +	// RefName is the reference name for the entity that gets referenced in the event. +	RefName string +	// RefType indicates the referenced entity type, i.e Channel or SubChannel. +	RefType RefChannelType +} + +// TraceEvent is what the caller of AddTraceEvent should provide to describe the +// event to be added to the channel trace. +// +// The Parent field is optional. It is used for an event that will be recorded +// in the entity's parent trace. +type TraceEvent struct { +	Desc     string +	Severity Severity +	Parent   *TraceEvent +} + +type ChannelTrace struct { +	cm           *channelMap +	clearCalled  bool +	CreationTime time.Time +	EventNum     int64 +	mu           sync.Mutex +	Events       []*traceEvent +} + +func (c *ChannelTrace) copy() *ChannelTrace { +	return &ChannelTrace{ +		CreationTime: c.CreationTime, +		EventNum:     c.EventNum, +		Events:       append(([]*traceEvent)(nil), c.Events...), +	} +} + +func (c *ChannelTrace) append(e *traceEvent) { +	c.mu.Lock() +	if len(c.Events) == getMaxTraceEntry() { +		del := c.Events[0] +		c.Events = c.Events[1:] +		if del.RefID != 0 { +			// start recursive cleanup in a goroutine to not block the call originated from grpc. +			go func() { +				// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func. +				c.cm.mu.Lock() +				c.cm.decrTraceRefCount(del.RefID) +				c.cm.mu.Unlock() +			}() +		} +	} +	e.Timestamp = time.Now() +	c.Events = append(c.Events, e) +	c.EventNum++ +	c.mu.Unlock() +} + +func (c *ChannelTrace) clear() { +	if c.clearCalled { +		return +	} +	c.clearCalled = true +	c.mu.Lock() +	for _, e := range c.Events { +		if e.RefID != 0 { +			// caller should have already held the c.cm.mu lock. +			c.cm.decrTraceRefCount(e.RefID) +		} +	} +	c.mu.Unlock() +} + +// Severity is the severity level of a trace event. +// The canonical enumeration of all valid values is here: +// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126. +type Severity int + +const ( +	// CtUnknown indicates unknown severity of a trace event. +	CtUnknown Severity = iota +	// CtInfo indicates info level severity of a trace event. +	CtInfo +	// CtWarning indicates warning level severity of a trace event. +	CtWarning +	// CtError indicates error level severity of a trace event. +	CtError +) + +// RefChannelType is the type of the entity being referenced in a trace event. +type RefChannelType int + +const ( +	// RefUnknown indicates an unknown entity type, the zero value for this type. +	RefUnknown RefChannelType = iota +	// RefChannel indicates the referenced entity is a Channel. +	RefChannel +	// RefSubChannel indicates the referenced entity is a SubChannel. +	RefSubChannel +	// RefServer indicates the referenced entity is a Server. +	RefServer +	// RefListenSocket indicates the referenced entity is a ListenSocket. +	RefListenSocket +	// RefNormalSocket indicates the referenced entity is a NormalSocket. +	RefNormalSocket +) + +var refChannelTypeToString = map[RefChannelType]string{ +	RefUnknown:      "Unknown", +	RefChannel:      "Channel", +	RefSubChannel:   "SubChannel", +	RefServer:       "Server", +	RefListenSocket: "ListenSocket", +	RefNormalSocket: "NormalSocket", +} + +func (r RefChannelType) String() string { +	return refChannelTypeToString[r] +} + +// AddTraceEvent adds trace related to the entity with specified id, using the +// provided TraceEventDesc. +// +// If channelz is not turned ON, this will simply log the event descriptions. +func AddTraceEvent(l grpclog.DepthLoggerV2, e Entity, depth int, desc *TraceEvent) { +	// Log only the trace description associated with the bottom most entity. +	d := fmt.Sprintf("[%s]%s", e, desc.Desc) +	switch desc.Severity { +	case CtUnknown, CtInfo: +		l.InfoDepth(depth+1, d) +	case CtWarning: +		l.WarningDepth(depth+1, d) +	case CtError: +		l.ErrorDepth(depth+1, d) +	} + +	if getMaxTraceEntry() == 0 { +		return +	} +	if IsOn() { +		db.traceEvent(e.id(), desc) +	} +} diff --git a/vendor/google.golang.org/grpc/internal/channelz/types.go b/vendor/google.golang.org/grpc/internal/channelz/types.go deleted file mode 100644 index 1d4020f53..000000000 --- a/vendor/google.golang.org/grpc/internal/channelz/types.go +++ /dev/null @@ -1,727 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package channelz - -import ( -	"net" -	"sync" -	"sync/atomic" -	"time" - -	"google.golang.org/grpc/connectivity" -	"google.golang.org/grpc/credentials" -) - -// entry represents a node in the channelz database. -type entry interface { -	// addChild adds a child e, whose channelz id is id to child list -	addChild(id int64, e entry) -	// deleteChild deletes a child with channelz id to be id from child list -	deleteChild(id int64) -	// triggerDelete tries to delete self from channelz database. However, if child -	// list is not empty, then deletion from the database is on hold until the last -	// child is deleted from database. -	triggerDelete() -	// deleteSelfIfReady check whether triggerDelete() has been called before, and whether child -	// list is now empty. If both conditions are met, then delete self from database. -	deleteSelfIfReady() -	// getParentID returns parent ID of the entry. 0 value parent ID means no parent. -	getParentID() int64 -} - -// dummyEntry is a fake entry to handle entry not found case. -type dummyEntry struct { -	idNotFound int64 -} - -func (d *dummyEntry) addChild(id int64, e entry) { -	// Note: It is possible for a normal program to reach here under race condition. -	// For example, there could be a race between ClientConn.Close() info being propagated -	// to addrConn and http2Client. ClientConn.Close() cancel the context and result -	// in http2Client to error. The error info is then caught by transport monitor -	// and before addrConn.tearDown() is called in side ClientConn.Close(). Therefore, -	// the addrConn will create a new transport. And when registering the new transport in -	// channelz, its parent addrConn could have already been torn down and deleted -	// from channelz tracking, and thus reach the code here. -	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound) -} - -func (d *dummyEntry) deleteChild(id int64) { -	// It is possible for a normal program to reach here under race condition. -	// Refer to the example described in addChild(). -	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound) -} - -func (d *dummyEntry) triggerDelete() { -	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound) -} - -func (*dummyEntry) deleteSelfIfReady() { -	// code should not reach here. deleteSelfIfReady is always called on an existing entry. -} - -func (*dummyEntry) getParentID() int64 { -	return 0 -} - -// ChannelMetric defines the info channelz provides for a specific Channel, which -// includes ChannelInternalMetric and channelz-specific data, such as channelz id, -// child list, etc. -type ChannelMetric struct { -	// ID is the channelz id of this channel. -	ID int64 -	// RefName is the human readable reference string of this channel. -	RefName string -	// ChannelData contains channel internal metric reported by the channel through -	// ChannelzMetric(). -	ChannelData *ChannelInternalMetric -	// NestedChans tracks the nested channel type children of this channel in the format of -	// a map from nested channel channelz id to corresponding reference string. -	NestedChans map[int64]string -	// SubChans tracks the subchannel type children of this channel in the format of a -	// map from subchannel channelz id to corresponding reference string. -	SubChans map[int64]string -	// Sockets tracks the socket type children of this channel in the format of a map -	// from socket channelz id to corresponding reference string. -	// Note current grpc implementation doesn't allow channel having sockets directly, -	// therefore, this is field is unused. -	Sockets map[int64]string -	// Trace contains the most recent traced events. -	Trace *ChannelTrace -} - -// SubChannelMetric defines the info channelz provides for a specific SubChannel, -// which includes ChannelInternalMetric and channelz-specific data, such as -// channelz id, child list, etc. -type SubChannelMetric struct { -	// ID is the channelz id of this subchannel. -	ID int64 -	// RefName is the human readable reference string of this subchannel. -	RefName string -	// ChannelData contains subchannel internal metric reported by the subchannel -	// through ChannelzMetric(). -	ChannelData *ChannelInternalMetric -	// NestedChans tracks the nested channel type children of this subchannel in the format of -	// a map from nested channel channelz id to corresponding reference string. -	// Note current grpc implementation doesn't allow subchannel to have nested channels -	// as children, therefore, this field is unused. -	NestedChans map[int64]string -	// SubChans tracks the subchannel type children of this subchannel in the format of a -	// map from subchannel channelz id to corresponding reference string. -	// Note current grpc implementation doesn't allow subchannel to have subchannels -	// as children, therefore, this field is unused. -	SubChans map[int64]string -	// Sockets tracks the socket type children of this subchannel in the format of a map -	// from socket channelz id to corresponding reference string. -	Sockets map[int64]string -	// Trace contains the most recent traced events. -	Trace *ChannelTrace -} - -// ChannelInternalMetric defines the struct that the implementor of Channel interface -// should return from ChannelzMetric(). -type ChannelInternalMetric struct { -	// current connectivity state of the channel. -	State connectivity.State -	// The target this channel originally tried to connect to.  May be absent -	Target string -	// The number of calls started on the channel. -	CallsStarted int64 -	// The number of calls that have completed with an OK status. -	CallsSucceeded int64 -	// The number of calls that have a completed with a non-OK status. -	CallsFailed int64 -	// The last time a call was started on the channel. -	LastCallStartedTimestamp time.Time -} - -// ChannelTrace stores traced events on a channel/subchannel and related info. -type ChannelTrace struct { -	// EventNum is the number of events that ever got traced (i.e. including those that have been deleted) -	EventNum int64 -	// CreationTime is the creation time of the trace. -	CreationTime time.Time -	// Events stores the most recent trace events (up to $maxTraceEntry, newer event will overwrite the -	// oldest one) -	Events []*TraceEvent -} - -// TraceEvent represent a single trace event -type TraceEvent struct { -	// Desc is a simple description of the trace event. -	Desc string -	// Severity states the severity of this trace event. -	Severity Severity -	// Timestamp is the event time. -	Timestamp time.Time -	// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is -	// involved in this event. -	// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside []) -	RefID int64 -	// RefName is the reference name for the entity that gets referenced in the event. -	RefName string -	// RefType indicates the referenced entity type, i.e Channel or SubChannel. -	RefType RefChannelType -} - -// Channel is the interface that should be satisfied in order to be tracked by -// channelz as Channel or SubChannel. -type Channel interface { -	ChannelzMetric() *ChannelInternalMetric -} - -type dummyChannel struct{} - -func (d *dummyChannel) ChannelzMetric() *ChannelInternalMetric { -	return &ChannelInternalMetric{} -} - -type channel struct { -	refName     string -	c           Channel -	closeCalled bool -	nestedChans map[int64]string -	subChans    map[int64]string -	id          int64 -	pid         int64 -	cm          *channelMap -	trace       *channelTrace -	// traceRefCount is the number of trace events that reference this channel. -	// Non-zero traceRefCount means the trace of this channel cannot be deleted. -	traceRefCount int32 -} - -func (c *channel) addChild(id int64, e entry) { -	switch v := e.(type) { -	case *subChannel: -		c.subChans[id] = v.refName -	case *channel: -		c.nestedChans[id] = v.refName -	default: -		logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e) -	} -} - -func (c *channel) deleteChild(id int64) { -	delete(c.subChans, id) -	delete(c.nestedChans, id) -	c.deleteSelfIfReady() -} - -func (c *channel) triggerDelete() { -	c.closeCalled = true -	c.deleteSelfIfReady() -} - -func (c *channel) getParentID() int64 { -	return c.pid -} - -// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means -// deleting the channel reference from its parent's child list. -// -// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the -// corresponding grpc object has been invoked, and the channel does not have any children left. -// -// The returned boolean value indicates whether the channel has been successfully deleted from tree. -func (c *channel) deleteSelfFromTree() (deleted bool) { -	if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 { -		return false -	} -	// not top channel -	if c.pid != 0 { -		c.cm.findEntry(c.pid).deleteChild(c.id) -	} -	return true -} - -// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means -// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the -// channel, and its memory will be garbage collected. -// -// The trace reference count of the channel must be 0 in order to be deleted from the map. This is -// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, -// the trace of the referenced entity must not be deleted. In order to release the resource allocated -// by grpc, the reference to the grpc object is reset to a dummy object. -// -// deleteSelfFromMap must be called after deleteSelfFromTree returns true. -// -// It returns a bool to indicate whether the channel can be safely deleted from map. -func (c *channel) deleteSelfFromMap() (delete bool) { -	if c.getTraceRefCount() != 0 { -		c.c = &dummyChannel{} -		return false -	} -	return true -} - -// deleteSelfIfReady tries to delete the channel itself from the channelz database. -// The delete process includes two steps: -//  1. delete the channel from the entry relation tree, i.e. delete the channel reference from its -//     parent's child list. -//  2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id -//     will return entry not found error. -func (c *channel) deleteSelfIfReady() { -	if !c.deleteSelfFromTree() { -		return -	} -	if !c.deleteSelfFromMap() { -		return -	} -	c.cm.deleteEntry(c.id) -	c.trace.clear() -} - -func (c *channel) getChannelTrace() *channelTrace { -	return c.trace -} - -func (c *channel) incrTraceRefCount() { -	atomic.AddInt32(&c.traceRefCount, 1) -} - -func (c *channel) decrTraceRefCount() { -	atomic.AddInt32(&c.traceRefCount, -1) -} - -func (c *channel) getTraceRefCount() int { -	i := atomic.LoadInt32(&c.traceRefCount) -	return int(i) -} - -func (c *channel) getRefName() string { -	return c.refName -} - -type subChannel struct { -	refName       string -	c             Channel -	closeCalled   bool -	sockets       map[int64]string -	id            int64 -	pid           int64 -	cm            *channelMap -	trace         *channelTrace -	traceRefCount int32 -} - -func (sc *subChannel) addChild(id int64, e entry) { -	if v, ok := e.(*normalSocket); ok { -		sc.sockets[id] = v.refName -	} else { -		logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e) -	} -} - -func (sc *subChannel) deleteChild(id int64) { -	delete(sc.sockets, id) -	sc.deleteSelfIfReady() -} - -func (sc *subChannel) triggerDelete() { -	sc.closeCalled = true -	sc.deleteSelfIfReady() -} - -func (sc *subChannel) getParentID() int64 { -	return sc.pid -} - -// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which -// means deleting the subchannel reference from its parent's child list. -// -// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of -// the corresponding grpc object has been invoked, and the subchannel does not have any children left. -// -// The returned boolean value indicates whether the channel has been successfully deleted from tree. -func (sc *subChannel) deleteSelfFromTree() (deleted bool) { -	if !sc.closeCalled || len(sc.sockets) != 0 { -		return false -	} -	sc.cm.findEntry(sc.pid).deleteChild(sc.id) -	return true -} - -// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means -// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query -// the subchannel, and its memory will be garbage collected. -// -// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is -// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, -// the trace of the referenced entity must not be deleted. In order to release the resource allocated -// by grpc, the reference to the grpc object is reset to a dummy object. -// -// deleteSelfFromMap must be called after deleteSelfFromTree returns true. -// -// It returns a bool to indicate whether the channel can be safely deleted from map. -func (sc *subChannel) deleteSelfFromMap() (delete bool) { -	if sc.getTraceRefCount() != 0 { -		// free the grpc struct (i.e. addrConn) -		sc.c = &dummyChannel{} -		return false -	} -	return true -} - -// deleteSelfIfReady tries to delete the subchannel itself from the channelz database. -// The delete process includes two steps: -//  1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from -//     its parent's child list. -//  2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup -//     by id will return entry not found error. -func (sc *subChannel) deleteSelfIfReady() { -	if !sc.deleteSelfFromTree() { -		return -	} -	if !sc.deleteSelfFromMap() { -		return -	} -	sc.cm.deleteEntry(sc.id) -	sc.trace.clear() -} - -func (sc *subChannel) getChannelTrace() *channelTrace { -	return sc.trace -} - -func (sc *subChannel) incrTraceRefCount() { -	atomic.AddInt32(&sc.traceRefCount, 1) -} - -func (sc *subChannel) decrTraceRefCount() { -	atomic.AddInt32(&sc.traceRefCount, -1) -} - -func (sc *subChannel) getTraceRefCount() int { -	i := atomic.LoadInt32(&sc.traceRefCount) -	return int(i) -} - -func (sc *subChannel) getRefName() string { -	return sc.refName -} - -// SocketMetric defines the info channelz provides for a specific Socket, which -// includes SocketInternalMetric and channelz-specific data, such as channelz id, etc. -type SocketMetric struct { -	// ID is the channelz id of this socket. -	ID int64 -	// RefName is the human readable reference string of this socket. -	RefName string -	// SocketData contains socket internal metric reported by the socket through -	// ChannelzMetric(). -	SocketData *SocketInternalMetric -} - -// SocketInternalMetric defines the struct that the implementor of Socket interface -// should return from ChannelzMetric(). -type SocketInternalMetric struct { -	// The number of streams that have been started. -	StreamsStarted int64 -	// The number of streams that have ended successfully: -	// On client side, receiving frame with eos bit set. -	// On server side, sending frame with eos bit set. -	StreamsSucceeded int64 -	// The number of streams that have ended unsuccessfully: -	// On client side, termination without receiving frame with eos bit set. -	// On server side, termination without sending frame with eos bit set. -	StreamsFailed int64 -	// The number of messages successfully sent on this socket. -	MessagesSent     int64 -	MessagesReceived int64 -	// The number of keep alives sent.  This is typically implemented with HTTP/2 -	// ping messages. -	KeepAlivesSent int64 -	// The last time a stream was created by this endpoint.  Usually unset for -	// servers. -	LastLocalStreamCreatedTimestamp time.Time -	// The last time a stream was created by the remote endpoint.  Usually unset -	// for clients. -	LastRemoteStreamCreatedTimestamp time.Time -	// The last time a message was sent by this endpoint. -	LastMessageSentTimestamp time.Time -	// The last time a message was received by this endpoint. -	LastMessageReceivedTimestamp time.Time -	// The amount of window, granted to the local endpoint by the remote endpoint. -	// This may be slightly out of date due to network latency.  This does NOT -	// include stream level or TCP level flow control info. -	LocalFlowControlWindow int64 -	// The amount of window, granted to the remote endpoint by the local endpoint. -	// This may be slightly out of date due to network latency.  This does NOT -	// include stream level or TCP level flow control info. -	RemoteFlowControlWindow int64 -	// The locally bound address. -	LocalAddr net.Addr -	// The remote bound address.  May be absent. -	RemoteAddr net.Addr -	// Optional, represents the name of the remote endpoint, if different than -	// the original target name. -	RemoteName    string -	SocketOptions *SocketOptionData -	Security      credentials.ChannelzSecurityValue -} - -// Socket is the interface that should be satisfied in order to be tracked by -// channelz as Socket. -type Socket interface { -	ChannelzMetric() *SocketInternalMetric -} - -type listenSocket struct { -	refName string -	s       Socket -	id      int64 -	pid     int64 -	cm      *channelMap -} - -func (ls *listenSocket) addChild(id int64, e entry) { -	logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e) -} - -func (ls *listenSocket) deleteChild(id int64) { -	logger.Errorf("cannot delete a child (id = %d) from a listen socket", id) -} - -func (ls *listenSocket) triggerDelete() { -	ls.cm.deleteEntry(ls.id) -	ls.cm.findEntry(ls.pid).deleteChild(ls.id) -} - -func (ls *listenSocket) deleteSelfIfReady() { -	logger.Errorf("cannot call deleteSelfIfReady on a listen socket") -} - -func (ls *listenSocket) getParentID() int64 { -	return ls.pid -} - -type normalSocket struct { -	refName string -	s       Socket -	id      int64 -	pid     int64 -	cm      *channelMap -} - -func (ns *normalSocket) addChild(id int64, e entry) { -	logger.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e) -} - -func (ns *normalSocket) deleteChild(id int64) { -	logger.Errorf("cannot delete a child (id = %d) from a normal socket", id) -} - -func (ns *normalSocket) triggerDelete() { -	ns.cm.deleteEntry(ns.id) -	ns.cm.findEntry(ns.pid).deleteChild(ns.id) -} - -func (ns *normalSocket) deleteSelfIfReady() { -	logger.Errorf("cannot call deleteSelfIfReady on a normal socket") -} - -func (ns *normalSocket) getParentID() int64 { -	return ns.pid -} - -// ServerMetric defines the info channelz provides for a specific Server, which -// includes ServerInternalMetric and channelz-specific data, such as channelz id, -// child list, etc. -type ServerMetric struct { -	// ID is the channelz id of this server. -	ID int64 -	// RefName is the human readable reference string of this server. -	RefName string -	// ServerData contains server internal metric reported by the server through -	// ChannelzMetric(). -	ServerData *ServerInternalMetric -	// ListenSockets tracks the listener socket type children of this server in the -	// format of a map from socket channelz id to corresponding reference string. -	ListenSockets map[int64]string -} - -// ServerInternalMetric defines the struct that the implementor of Server interface -// should return from ChannelzMetric(). -type ServerInternalMetric struct { -	// The number of incoming calls started on the server. -	CallsStarted int64 -	// The number of incoming calls that have completed with an OK status. -	CallsSucceeded int64 -	// The number of incoming calls that have a completed with a non-OK status. -	CallsFailed int64 -	// The last time a call was started on the server. -	LastCallStartedTimestamp time.Time -} - -// Server is the interface to be satisfied in order to be tracked by channelz as -// Server. -type Server interface { -	ChannelzMetric() *ServerInternalMetric -} - -type server struct { -	refName       string -	s             Server -	closeCalled   bool -	sockets       map[int64]string -	listenSockets map[int64]string -	id            int64 -	cm            *channelMap -} - -func (s *server) addChild(id int64, e entry) { -	switch v := e.(type) { -	case *normalSocket: -		s.sockets[id] = v.refName -	case *listenSocket: -		s.listenSockets[id] = v.refName -	default: -		logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e) -	} -} - -func (s *server) deleteChild(id int64) { -	delete(s.sockets, id) -	delete(s.listenSockets, id) -	s.deleteSelfIfReady() -} - -func (s *server) triggerDelete() { -	s.closeCalled = true -	s.deleteSelfIfReady() -} - -func (s *server) deleteSelfIfReady() { -	if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 { -		return -	} -	s.cm.deleteEntry(s.id) -} - -func (s *server) getParentID() int64 { -	return 0 -} - -type tracedChannel interface { -	getChannelTrace() *channelTrace -	incrTraceRefCount() -	decrTraceRefCount() -	getRefName() string -} - -type channelTrace struct { -	cm          *channelMap -	clearCalled bool -	createdTime time.Time -	eventCount  int64 -	mu          sync.Mutex -	events      []*TraceEvent -} - -func (c *channelTrace) append(e *TraceEvent) { -	c.mu.Lock() -	if len(c.events) == getMaxTraceEntry() { -		del := c.events[0] -		c.events = c.events[1:] -		if del.RefID != 0 { -			// start recursive cleanup in a goroutine to not block the call originated from grpc. -			go func() { -				// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func. -				c.cm.mu.Lock() -				c.cm.decrTraceRefCount(del.RefID) -				c.cm.mu.Unlock() -			}() -		} -	} -	e.Timestamp = time.Now() -	c.events = append(c.events, e) -	c.eventCount++ -	c.mu.Unlock() -} - -func (c *channelTrace) clear() { -	if c.clearCalled { -		return -	} -	c.clearCalled = true -	c.mu.Lock() -	for _, e := range c.events { -		if e.RefID != 0 { -			// caller should have already held the c.cm.mu lock. -			c.cm.decrTraceRefCount(e.RefID) -		} -	} -	c.mu.Unlock() -} - -// Severity is the severity level of a trace event. -// The canonical enumeration of all valid values is here: -// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126. -type Severity int - -const ( -	// CtUnknown indicates unknown severity of a trace event. -	CtUnknown Severity = iota -	// CtInfo indicates info level severity of a trace event. -	CtInfo -	// CtWarning indicates warning level severity of a trace event. -	CtWarning -	// CtError indicates error level severity of a trace event. -	CtError -) - -// RefChannelType is the type of the entity being referenced in a trace event. -type RefChannelType int - -const ( -	// RefUnknown indicates an unknown entity type, the zero value for this type. -	RefUnknown RefChannelType = iota -	// RefChannel indicates the referenced entity is a Channel. -	RefChannel -	// RefSubChannel indicates the referenced entity is a SubChannel. -	RefSubChannel -	// RefServer indicates the referenced entity is a Server. -	RefServer -	// RefListenSocket indicates the referenced entity is a ListenSocket. -	RefListenSocket -	// RefNormalSocket indicates the referenced entity is a NormalSocket. -	RefNormalSocket -) - -var refChannelTypeToString = map[RefChannelType]string{ -	RefUnknown:      "Unknown", -	RefChannel:      "Channel", -	RefSubChannel:   "SubChannel", -	RefServer:       "Server", -	RefListenSocket: "ListenSocket", -	RefNormalSocket: "NormalSocket", -} - -func (r RefChannelType) String() string { -	return refChannelTypeToString[r] -} - -func (c *channelTrace) dumpData() *ChannelTrace { -	c.mu.Lock() -	ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime} -	ct.Events = c.events[:len(c.events)] -	c.mu.Unlock() -	return ct -} diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go b/vendor/google.golang.org/grpc/internal/channelz/util_linux.go deleted file mode 100644 index 98288c3f8..000000000 --- a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package channelz - -import ( -	"syscall" -) - -// GetSocketOption gets the socket option info of the conn. -func GetSocketOption(socket any) *SocketOptionData { -	c, ok := socket.(syscall.Conn) -	if !ok { -		return nil -	} -	data := &SocketOptionData{} -	if rawConn, err := c.SyscallConn(); err == nil { -		rawConn.Control(data.Getsockopt) -		return data -	} -	return nil -} diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go deleted file mode 100644 index b5568b22e..000000000 --- a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go +++ /dev/null @@ -1,27 +0,0 @@ -//go:build !linux -// +build !linux - -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package channelz - -// GetSocketOption gets the socket option info of the conn. -func GetSocketOption(c any) *SocketOptionData { -	return nil -} diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go index aa97273e7..0126d6b51 100644 --- a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go +++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go @@ -1,3 +1,8 @@ +//go:build !go1.21 + +// TODO: when this file is deleted (after Go 1.20 support is dropped), delete +// all of grpcrand and call the rand package directly. +  /*   *   * Copyright 2018 gRPC authors. diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go new file mode 100644 index 000000000..c37299af1 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go @@ -0,0 +1,73 @@ +//go:build go1.21 + +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package grpcrand implements math/rand functions in a concurrent-safe way +// with a global random source, independent of math/rand's global source. +package grpcrand + +import "math/rand" + +// This implementation will be used for Go version 1.21 or newer. +// For older versions, the original implementation with mutex will be used. + +// Int implements rand.Int on the grpcrand global source. +func Int() int { +	return rand.Int() +} + +// Int63n implements rand.Int63n on the grpcrand global source. +func Int63n(n int64) int64 { +	return rand.Int63n(n) +} + +// Intn implements rand.Intn on the grpcrand global source. +func Intn(n int) int { +	return rand.Intn(n) +} + +// Int31n implements rand.Int31n on the grpcrand global source. +func Int31n(n int32) int32 { +	return rand.Int31n(n) +} + +// Float64 implements rand.Float64 on the grpcrand global source. +func Float64() float64 { +	return rand.Float64() +} + +// Uint64 implements rand.Uint64 on the grpcrand global source. +func Uint64() uint64 { +	return rand.Uint64() +} + +// Uint32 implements rand.Uint32 on the grpcrand global source. +func Uint32() uint32 { +	return rand.Uint32() +} + +// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source. +func ExpFloat64() float64 { +	return rand.ExpFloat64() +} + +// Shuffle implements rand.Shuffle on the grpcrand global source. +var Shuffle = func(n int, f func(int, int)) { +	rand.Shuffle(n, f) +} diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go index 6c7ea6a53..48d24bdb4 100644 --- a/vendor/google.golang.org/grpc/internal/internal.go +++ b/vendor/google.golang.org/grpc/internal/internal.go @@ -190,12 +190,16 @@ var (  	// function makes events more predictable than relying on timer events.  	TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error -	// TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton -	// to invoke resource not found for a resource type name and resource name. +	// TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client +	// singleton to invoke resource not found for a resource type name and +	// resource name.  	TriggerXDSResourceNameNotFoundClient any // func(string, string) error  	// FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD.  	FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool) + +	// UserSetDefaultScheme is set to true if the user has overridden the default resolver scheme. +	UserSetDefaultScheme bool = false  )  // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/vendor/google.golang.org/grpc/internal/pretty/pretty.go b/vendor/google.golang.org/grpc/internal/pretty/pretty.go index 703319137..dbee7a60d 100644 --- a/vendor/google.golang.org/grpc/internal/pretty/pretty.go +++ b/vendor/google.golang.org/grpc/internal/pretty/pretty.go @@ -24,10 +24,8 @@ import (  	"encoding/json"  	"fmt" -	"github.com/golang/protobuf/jsonpb" -	protov1 "github.com/golang/protobuf/proto"  	"google.golang.org/protobuf/encoding/protojson" -	protov2 "google.golang.org/protobuf/proto" +	"google.golang.org/protobuf/protoadapt"  )  const jsonIndent = "  " @@ -36,21 +34,14 @@ const jsonIndent = "  "  //  // If marshal fails, it falls back to fmt.Sprintf("%+v").  func ToJSON(e any) string { -	switch ee := e.(type) { -	case protov1.Message: -		mm := jsonpb.Marshaler{Indent: jsonIndent} -		ret, err := mm.MarshalToString(ee) -		if err != nil { -			// This may fail for proto.Anys, e.g. for xDS v2, LDS, the v2 -			// messages are not imported, and this will fail because the message -			// is not found. -			return fmt.Sprintf("%+v", ee) -		} -		return ret -	case protov2.Message: +	if ee, ok := e.(protoadapt.MessageV1); ok { +		e = protoadapt.MessageV2Of(ee) +	} + +	if ee, ok := e.(protoadapt.MessageV2); ok {  		mm := protojson.MarshalOptions{ -			Multiline: true,  			Indent:    jsonIndent, +			Multiline: true,  		}  		ret, err := mm.Marshal(ee)  		if err != nil { @@ -60,13 +51,13 @@ func ToJSON(e any) string {  			return fmt.Sprintf("%+v", ee)  		}  		return string(ret) -	default: -		ret, err := json.MarshalIndent(ee, "", jsonIndent) -		if err != nil { -			return fmt.Sprintf("%+v", ee) -		} -		return string(ret)  	} + +	ret, err := json.MarshalIndent(e, "", jsonIndent) +	if err != nil { +		return fmt.Sprintf("%+v", e) +	} +	return string(ret)  }  // FormatJSON formats the input json bytes with indentation. diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go index b66dcb213..abab35e25 100644 --- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go +++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go @@ -45,6 +45,13 @@ import (  // addresses from SRV records.  Must not be changed after init time.  var EnableSRVLookups = false +// ResolvingTimeout specifies the maximum duration for a DNS resolution request. +// If the timeout expires before a response is received, the request will be canceled. +// +// It is recommended to set this value at application startup. Avoid modifying this variable +// after initialization as it's not thread-safe for concurrent modification. +var ResolvingTimeout = 30 * time.Second +  var logger = grpclog.Component("dns")  func init() { @@ -221,18 +228,18 @@ func (d *dnsResolver) watcher() {  	}  } -func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) { +func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error) {  	if !EnableSRVLookups {  		return nil, nil  	}  	var newAddrs []resolver.Address -	_, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host) +	_, srvs, err := d.resolver.LookupSRV(ctx, "grpclb", "tcp", d.host)  	if err != nil {  		err = handleDNSError(err, "SRV") // may become nil  		return nil, err  	}  	for _, s := range srvs { -		lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target) +		lbAddrs, err := d.resolver.LookupHost(ctx, s.Target)  		if err != nil {  			err = handleDNSError(err, "A") // may become nil  			if err == nil { @@ -269,8 +276,8 @@ func handleDNSError(err error, lookupType string) error {  	return err  } -func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult { -	ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host) +func (d *dnsResolver) lookupTXT(ctx context.Context) *serviceconfig.ParseResult { +	ss, err := d.resolver.LookupTXT(ctx, txtPrefix+d.host)  	if err != nil {  		if envconfig.TXTErrIgnore {  			return nil @@ -297,8 +304,8 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {  	return d.cc.ParseServiceConfig(sc)  } -func (d *dnsResolver) lookupHost() ([]resolver.Address, error) { -	addrs, err := d.resolver.LookupHost(d.ctx, d.host) +func (d *dnsResolver) lookupHost(ctx context.Context) ([]resolver.Address, error) { +	addrs, err := d.resolver.LookupHost(ctx, d.host)  	if err != nil {  		err = handleDNSError(err, "A")  		return nil, err @@ -316,8 +323,10 @@ func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {  }  func (d *dnsResolver) lookup() (*resolver.State, error) { -	srv, srvErr := d.lookupSRV() -	addrs, hostErr := d.lookupHost() +	ctx, cancel := context.WithTimeout(d.ctx, ResolvingTimeout) +	defer cancel() +	srv, srvErr := d.lookupSRV(ctx) +	addrs, hostErr := d.lookupHost(ctx)  	if hostErr != nil && (srvErr != nil || len(srv) == 0) {  		return nil, hostErr  	} @@ -327,7 +336,7 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {  		state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv})  	}  	if !d.disableServiceConfig { -		state.ServiceConfig = d.lookupTXT() +		state.ServiceConfig = d.lookupTXT(ctx)  	}  	return &state, nil  } diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go index 03ef2fedd..c7dbc8205 100644 --- a/vendor/google.golang.org/grpc/internal/status/status.go +++ b/vendor/google.golang.org/grpc/internal/status/status.go @@ -31,10 +31,11 @@ import (  	"errors"  	"fmt" -	"github.com/golang/protobuf/proto" -	"github.com/golang/protobuf/ptypes"  	spb "google.golang.org/genproto/googleapis/rpc/status"  	"google.golang.org/grpc/codes" +	"google.golang.org/protobuf/proto" +	"google.golang.org/protobuf/protoadapt" +	"google.golang.org/protobuf/types/known/anypb"  )  // Status represents an RPC status code, message, and details.  It is immutable @@ -130,14 +131,14 @@ func (s *Status) Err() error {  // WithDetails returns a new status with the provided details messages appended to the status.  // If any errors are encountered, it returns nil and the first error encountered. -func (s *Status) WithDetails(details ...proto.Message) (*Status, error) { +func (s *Status) WithDetails(details ...protoadapt.MessageV1) (*Status, error) {  	if s.Code() == codes.OK {  		return nil, errors.New("no error details for status with code OK")  	}  	// s.Code() != OK implies that s.Proto() != nil.  	p := s.Proto()  	for _, detail := range details { -		any, err := ptypes.MarshalAny(detail) +		any, err := anypb.New(protoadapt.MessageV2Of(detail))  		if err != nil {  			return nil, err  		} @@ -154,12 +155,12 @@ func (s *Status) Details() []any {  	}  	details := make([]any, 0, len(s.s.Details))  	for _, any := range s.s.Details { -		detail := &ptypes.DynamicAny{} -		if err := ptypes.UnmarshalAny(any, detail); err != nil { +		detail, err := any.UnmarshalNew() +		if err != nil {  			details = append(details, err)  			continue  		} -		details = append(details, detail.Message) +		details = append(details, detail)  	}  	return details  } diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go index a9d70e2a1..4a3ddce29 100644 --- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go @@ -35,7 +35,6 @@ import (  	"sync"  	"time" -	"github.com/golang/protobuf/proto"  	"golang.org/x/net/http2"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/credentials" @@ -45,20 +44,17 @@ import (  	"google.golang.org/grpc/peer"  	"google.golang.org/grpc/stats"  	"google.golang.org/grpc/status" +	"google.golang.org/protobuf/proto"  )  // NewServerHandlerTransport returns a ServerTransport handling gRPC from  // inside an http.Handler, or writes an HTTP error to w and returns an error.  // It requires that the http Server supports HTTP/2.  func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) { -	if r.ProtoMajor != 2 { -		msg := "gRPC requires HTTP/2" -		http.Error(w, msg, http.StatusBadRequest) -		return nil, errors.New(msg) -	} -	if r.Method != "POST" { +	if r.Method != http.MethodPost { +		w.Header().Set("Allow", http.MethodPost)  		msg := fmt.Sprintf("invalid gRPC request method %q", r.Method) -		http.Error(w, msg, http.StatusBadRequest) +		http.Error(w, msg, http.StatusMethodNotAllowed)  		return nil, errors.New(msg)  	}  	contentType := r.Header.Get("Content-Type") @@ -69,6 +65,11 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s  		http.Error(w, msg, http.StatusUnsupportedMediaType)  		return nil, errors.New(msg)  	} +	if r.ProtoMajor != 2 { +		msg := "gRPC requires HTTP/2" +		http.Error(w, msg, http.StatusHTTPVersionNotSupported) +		return nil, errors.New(msg) +	}  	if _, ok := w.(http.Flusher); !ok {  		msg := "gRPC requires a ResponseWriter supporting http.Flusher"  		http.Error(w, msg, http.StatusInternalServerError) diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go index eff879964..deba0c4d9 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go @@ -140,9 +140,7 @@ type http2Client struct {  	// variable.  	kpDormant bool -	// Fields below are for channelz metric collection. -	channelzID *channelz.Identifier -	czData     *channelzData +	channelz *channelz.Socket  	onClose func(GoAwayReason) @@ -319,6 +317,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  	if opts.MaxHeaderListSize != nil {  		maxHeaderListSize = *opts.MaxHeaderListSize  	} +  	t := &http2Client{  		ctx:                   ctx,  		ctxDone:               ctx.Done(), // Cache Done chan. @@ -346,11 +345,25 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  		maxConcurrentStreams:  defaultMaxStreamsClient,  		streamQuota:           defaultMaxStreamsClient,  		streamsQuotaAvailable: make(chan struct{}, 1), -		czData:                new(channelzData),  		keepaliveEnabled:      keepaliveEnabled,  		bufferPool:            newBufferPool(),  		onClose:               onClose,  	} +	var czSecurity credentials.ChannelzSecurityValue +	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok { +		czSecurity = au.GetSecurityValue() +	} +	t.channelz = channelz.RegisterSocket( +		&channelz.Socket{ +			SocketType:       channelz.SocketTypeNormal, +			Parent:           opts.ChannelzParent, +			SocketMetrics:    channelz.SocketMetrics{}, +			EphemeralMetrics: t.socketMetrics, +			LocalAddr:        t.localAddr, +			RemoteAddr:       t.remoteAddr, +			SocketOptions:    channelz.GetSocketOption(t.conn), +			Security:         czSecurity, +		})  	t.logger = prefixLoggerForClientTransport(t)  	// Add peer information to the http2client context.  	t.ctx = peer.NewContext(t.ctx, t.getPeer()) @@ -381,10 +394,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts  		}  		sh.HandleConn(t.ctx, connBegin)  	} -	t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) -	if err != nil { -		return nil, err -	}  	if t.keepaliveEnabled {  		t.kpDormancyCond = sync.NewCond(&t.mu)  		go t.keepalive() @@ -756,8 +765,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,  				return ErrConnClosing  			}  			if channelz.IsOn() { -				atomic.AddInt64(&t.czData.streamsStarted, 1) -				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) +				t.channelz.SocketMetrics.StreamsStarted.Add(1) +				t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())  			}  			// If the keepalive goroutine has gone dormant, wake it up.  			if t.kpDormant { @@ -928,9 +937,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.  			t.mu.Unlock()  			if channelz.IsOn() {  				if eosReceived { -					atomic.AddInt64(&t.czData.streamsSucceeded, 1) +					t.channelz.SocketMetrics.StreamsSucceeded.Add(1)  				} else { -					atomic.AddInt64(&t.czData.streamsFailed, 1) +					t.channelz.SocketMetrics.StreamsFailed.Add(1)  				}  			}  		}, @@ -985,7 +994,7 @@ func (t *http2Client) Close(err error) {  	t.controlBuf.finish()  	t.cancel()  	t.conn.Close() -	channelz.RemoveEntry(t.channelzID) +	channelz.RemoveEntry(t.channelz.ID)  	// Append info about previous goaways if there were any, since this may be important  	// for understanding the root cause for this connection to be closed.  	_, goAwayDebugMessage := t.GetGoAwayReason() @@ -1708,7 +1717,7 @@ func (t *http2Client) keepalive() {  			// keepalive timer expired. In both cases, we need to send a ping.  			if !outstandingPing {  				if channelz.IsOn() { -					atomic.AddInt64(&t.czData.kpCount, 1) +					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)  				}  				t.controlBuf.put(p)  				timeoutLeft = t.kp.Timeout @@ -1738,40 +1747,23 @@ func (t *http2Client) GoAway() <-chan struct{} {  	return t.goAway  } -func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric { -	s := channelz.SocketInternalMetric{ -		StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted), -		StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded), -		StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed), -		MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent), -		MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv), -		KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount), -		LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), -		LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), -		LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), -		LocalFlowControlWindow:          int64(t.fc.getSize()), -		SocketOptions:                   channelz.GetSocketOption(t.conn), -		LocalAddr:                       t.localAddr, -		RemoteAddr:                      t.remoteAddr, -		// RemoteName : -	} -	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { -		s.Security = au.GetSecurityValue() -	} -	s.RemoteFlowControlWindow = t.getOutFlowWindow() -	return &s +func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics { +	return &channelz.EphemeralSocketMetrics{ +		LocalFlowControlWindow:  int64(t.fc.getSize()), +		RemoteFlowControlWindow: t.getOutFlowWindow(), +	}  }  func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }  func (t *http2Client) IncrMsgSent() { -	atomic.AddInt64(&t.czData.msgSent, 1) -	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesSent.Add(1) +	t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())  }  func (t *http2Client) IncrMsgRecv() { -	atomic.AddInt64(&t.czData.msgRecv, 1) -	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesReceived.Add(1) +	t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())  }  func (t *http2Client) getOutFlowWindow() int64 { diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go index a206e2eef..d582e0471 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go @@ -32,13 +32,13 @@ import (  	"sync/atomic"  	"time" -	"github.com/golang/protobuf/proto"  	"golang.org/x/net/http2"  	"golang.org/x/net/http2/hpack"  	"google.golang.org/grpc/internal/grpclog"  	"google.golang.org/grpc/internal/grpcutil"  	"google.golang.org/grpc/internal/pretty"  	"google.golang.org/grpc/internal/syscall" +	"google.golang.org/protobuf/proto"  	"google.golang.org/grpc/codes"  	"google.golang.org/grpc/credentials" @@ -118,8 +118,7 @@ type http2Server struct {  	idle time.Time  	// Fields below are for channelz metric collection. -	channelzID *channelz.Identifier -	czData     *channelzData +	channelz   *channelz.Socket  	bufferPool *bufferPool  	connectionID uint64 @@ -262,9 +261,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  		idle:              time.Now(),  		kep:               kep,  		initialWindowSize: iwz, -		czData:            new(channelzData),  		bufferPool:        newBufferPool(),  	} +	var czSecurity credentials.ChannelzSecurityValue +	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok { +		czSecurity = au.GetSecurityValue() +	} +	t.channelz = channelz.RegisterSocket( +		&channelz.Socket{ +			SocketType:       channelz.SocketTypeNormal, +			Parent:           config.ChannelzParent, +			SocketMetrics:    channelz.SocketMetrics{}, +			EphemeralMetrics: t.socketMetrics, +			LocalAddr:        t.peer.LocalAddr, +			RemoteAddr:       t.peer.Addr, +			SocketOptions:    channelz.GetSocketOption(t.conn), +			Security:         czSecurity, +		}, +	)  	t.logger = prefixLoggerForServerTransport(t)  	t.controlBuf = newControlBuffer(t.done) @@ -274,10 +288,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  			updateFlowControl: t.updateFlowControl,  		}  	} -	t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr)) -	if err != nil { -		return nil, err -	}  	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)  	t.framer.writer.Flush() @@ -334,9 +344,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,  			// closed, would lead to a TCP RST instead of FIN, and the client  			// encountering errors.  For more info:  			// https://github.com/grpc/grpc-go/issues/5358 +			timer := time.NewTimer(time.Second) +			defer timer.Stop()  			select {  			case <-t.readerDone: -			case <-time.After(time.Second): +			case <-timer.C:  			}  			t.conn.Close()  		} @@ -592,8 +604,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade  	}  	t.mu.Unlock()  	if channelz.IsOn() { -		atomic.AddInt64(&t.czData.streamsStarted, 1) -		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) +		t.channelz.SocketMetrics.StreamsStarted.Add(1) +		t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())  	}  	s.requestRead = func(n int) {  		t.adjustWindow(s, uint32(n)) @@ -652,18 +664,20 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {  				}  				continue  			} -			if err == io.EOF || err == io.ErrUnexpectedEOF { -				t.Close(err) -				return -			}  			t.Close(err)  			return  		}  		switch frame := frame.(type) {  		case *http2.MetaHeadersFrame:  			if err := t.operateHeaders(ctx, frame, handle); err != nil { -				t.Close(err) -				break +				// Any error processing client headers, e.g. invalid stream ID, +				// is considered a protocol violation. +				t.controlBuf.put(&goAway{ +					code:      http2.ErrCodeProtocol, +					debugData: []byte(err.Error()), +					closeConn: err, +				}) +				continue  			}  		case *http2.DataFrame:  			t.handleData(frame) @@ -1199,7 +1213,7 @@ func (t *http2Server) keepalive() {  			}  			if !outstandingPing {  				if channelz.IsOn() { -					atomic.AddInt64(&t.czData.kpCount, 1) +					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)  				}  				t.controlBuf.put(p)  				kpTimeoutLeft = t.kp.Timeout @@ -1239,7 +1253,7 @@ func (t *http2Server) Close(err error) {  	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {  		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)  	} -	channelz.RemoveEntry(t.channelzID) +	channelz.RemoveEntry(t.channelz.ID)  	// Cancel all active streams.  	for _, s := range streams {  		s.cancel() @@ -1260,9 +1274,9 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {  	if channelz.IsOn() {  		if eosReceived { -			atomic.AddInt64(&t.czData.streamsSucceeded, 1) +			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)  		} else { -			atomic.AddInt64(&t.czData.streamsFailed, 1) +			t.channelz.SocketMetrics.StreamsFailed.Add(1)  		}  	}  } @@ -1379,38 +1393,21 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {  	return false, nil  } -func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { -	s := channelz.SocketInternalMetric{ -		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted), -		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded), -		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed), -		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent), -		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv), -		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount), -		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), -		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), -		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), -		LocalFlowControlWindow:           int64(t.fc.getSize()), -		SocketOptions:                    channelz.GetSocketOption(t.conn), -		LocalAddr:                        t.peer.LocalAddr, -		RemoteAddr:                       t.peer.Addr, -		// RemoteName : -	} -	if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok { -		s.Security = au.GetSecurityValue() -	} -	s.RemoteFlowControlWindow = t.getOutFlowWindow() -	return &s +func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics { +	return &channelz.EphemeralSocketMetrics{ +		LocalFlowControlWindow:  int64(t.fc.getSize()), +		RemoteFlowControlWindow: t.getOutFlowWindow(), +	}  }  func (t *http2Server) IncrMsgSent() { -	atomic.AddInt64(&t.czData.msgSent, 1) -	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesSent.Add(1) +	t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)  }  func (t *http2Server) IncrMsgRecv() { -	atomic.AddInt64(&t.czData.msgRecv, 1) -	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) +	t.channelz.SocketMetrics.MessagesReceived.Add(1) +	t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)  }  func (t *http2Server) getOutFlowWindow() int64 { diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go index dc29d590e..39cef3bd4 100644 --- a/vendor/google.golang.org/grpc/internal/transport/http_util.go +++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go @@ -418,10 +418,9 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu  	return f  } -func getWriteBufferPool(writeBufferSize int) *sync.Pool { +func getWriteBufferPool(size int) *sync.Pool {  	writeBufferMutex.Lock()  	defer writeBufferMutex.Unlock() -	size := writeBufferSize * 2  	pool, ok := writeBufferPoolMap[size]  	if ok {  		return pool diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go index b7b8fec18..0d2a6e47f 100644 --- a/vendor/google.golang.org/grpc/internal/transport/transport.go +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go @@ -28,6 +28,7 @@ import (  	"fmt"  	"io"  	"net" +	"strings"  	"sync"  	"sync/atomic"  	"time" @@ -362,8 +363,12 @@ func (s *Stream) SendCompress() string {  // ClientAdvertisedCompressors returns the compressor names advertised by the  // client via grpc-accept-encoding header. -func (s *Stream) ClientAdvertisedCompressors() string { -	return s.clientAdvertisedCompressors +func (s *Stream) ClientAdvertisedCompressors() []string { +	values := strings.Split(s.clientAdvertisedCompressors, ",") +	for i, v := range values { +		values[i] = strings.TrimSpace(v) +	} +	return values  }  // Done returns a channel which is closed when it receives the final status @@ -566,7 +571,7 @@ type ServerConfig struct {  	WriteBufferSize       int  	ReadBufferSize        int  	SharedWriteBuffer     bool -	ChannelzParentID      *channelz.Identifier +	ChannelzParent        *channelz.Server  	MaxHeaderListSize     *uint32  	HeaderTableSize       *uint32  } @@ -601,8 +606,8 @@ type ConnectOptions struct {  	ReadBufferSize int  	// SharedWriteBuffer indicates whether connections should reuse write buffer  	SharedWriteBuffer bool -	// ChannelzParentID sets the addrConn id which initiate the creation of this client transport. -	ChannelzParentID *channelz.Identifier +	// ChannelzParent sets the addrConn id which initiated the creation of this client transport. +	ChannelzParent *channelz.SubChannel  	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.  	MaxHeaderListSize *uint32  	// UseProxy specifies if a proxy should be used. @@ -815,30 +820,6 @@ const (  	GoAwayTooManyPings GoAwayReason = 2  ) -// channelzData is used to store channelz related data for http2Client and http2Server. -// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic -// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment. -// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment. -type channelzData struct { -	kpCount int64 -	// The number of streams that have started, including already finished ones. -	streamsStarted int64 -	// Client side: The number of streams that have ended successfully by receiving -	// EoS bit set frame from server. -	// Server side: The number of streams that have ended successfully by sending -	// frame with EoS bit set. -	streamsSucceeded int64 -	streamsFailed    int64 -	// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type -	// instead of time.Time since it's more costly to atomically update time.Time variable than int64 -	// variable. The same goes for lastMsgSentTime and lastMsgRecvTime. -	lastStreamCreatedTime int64 -	msgSent               int64 -	msgRecv               int64 -	lastMsgSentTime       int64 -	lastMsgRecvTime       int64 -} -  // ContextErr converts the error from context package into a status error.  func ContextErr(err error) error {  	switch err { diff --git a/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go b/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go deleted file mode 100644 index e8b492774..000000000 --- a/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package internal - -import ( -	"google.golang.org/grpc/attributes" -	"google.golang.org/grpc/resolver" -) - -// handshakeClusterNameKey is the type used as the key to store cluster name in -// the Attributes field of resolver.Address. -type handshakeClusterNameKey struct{} - -// SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field -// is updated with the cluster name. -func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address { -	addr.Attributes = addr.Attributes.WithValue(handshakeClusterNameKey{}, clusterName) -	return addr -} - -// GetXDSHandshakeClusterName returns cluster name stored in attr. -func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) { -	v := attr.Value(handshakeClusterNameKey{}) -	name, ok := v.(string) -	return name, ok -}  | 
