summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal')
-rw-r--r--vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go83
-rw-r--r--vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go45
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/method_logger.go9
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/sink.go2
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/channel.go255
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/channelmap.go402
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/funcs.go692
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/id.go75
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/logging.go28
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/server.go119
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/socket.go130
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/subchannel.go151
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/syscall_linux.go (renamed from vendor/google.golang.org/grpc/internal/channelz/types_linux.go)14
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go (renamed from vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go)6
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/trace.go204
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/types.go727
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/util_linux.go37
-rw-r--r--vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go27
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go5
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go73
-rw-r--r--vendor/google.golang.org/grpc/internal/internal.go8
-rw-r--r--vendor/google.golang.org/grpc/internal/pretty/pretty.go35
-rw-r--r--vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go29
-rw-r--r--vendor/google.golang.org/grpc/internal/status/status.go15
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go17
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_client.go72
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go91
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http_util.go3
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/transport.go39
-rw-r--r--vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go40
30 files changed, 1720 insertions, 1713 deletions
diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go
new file mode 100644
index 000000000..6bf7f8739
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/config.go
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package gracefulswitch
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/serviceconfig"
+)
+
+type lbConfig struct {
+ serviceconfig.LoadBalancingConfig
+
+ childBuilder balancer.Builder
+ childConfig serviceconfig.LoadBalancingConfig
+}
+
+func ChildName(l serviceconfig.LoadBalancingConfig) string {
+ return l.(*lbConfig).childBuilder.Name()
+}
+
+// ParseConfig parses a child config list and returns a LB config for the
+// gracefulswitch Balancer.
+//
+// cfg is expected to be a json.RawMessage containing a JSON array of LB policy
+// names + configs as the format of the "loadBalancingConfig" field in
+// ServiceConfig. It returns a type that should be passed to
+// UpdateClientConnState in the BalancerConfig field.
+func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+ var lbCfg []map[string]json.RawMessage
+ if err := json.Unmarshal(cfg, &lbCfg); err != nil {
+ return nil, err
+ }
+ for i, e := range lbCfg {
+ if len(e) != 1 {
+ return nil, fmt.Errorf("expected a JSON struct with one entry; received entry %v at index %d", e, i)
+ }
+
+ var name string
+ var jsonCfg json.RawMessage
+ for name, jsonCfg = range e {
+ }
+
+ builder := balancer.Get(name)
+ if builder == nil {
+ // Skip unregistered balancer names.
+ continue
+ }
+
+ parser, ok := builder.(balancer.ConfigParser)
+ if !ok {
+ // This is a valid child with no config.
+ return &lbConfig{childBuilder: builder}, nil
+ }
+
+ cfg, err := parser.ParseConfig(jsonCfg)
+ if err != nil {
+ return nil, fmt.Errorf("error parsing config for policy %q: %v", name, err)
+ }
+
+ return &lbConfig{childBuilder: builder, childConfig: cfg}, nil
+ }
+
+ return nil, fmt.Errorf("no supported policies found in config: %v", string(cfg))
+}
diff --git a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
index 3c594e6e4..45d5e50ea 100644
--- a/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
+++ b/vendor/google.golang.org/grpc/internal/balancer/gracefulswitch/gracefulswitch.go
@@ -94,14 +94,23 @@ func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
// process is not complete when this method returns. This method must be called
// synchronously alongside the rest of the balancer.Balancer methods this
// Graceful Switch Balancer implements.
+//
+// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
+// to cause the Balancer to automatically change to the new child when necessary.
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
+ _, err := gsb.switchTo(builder)
+ return err
+}
+
+func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
gsb.mu.Lock()
if gsb.closed {
gsb.mu.Unlock()
- return errBalancerClosed
+ return nil, errBalancerClosed
}
bw := &balancerWrapper{
- gsb: gsb,
+ builder: builder,
+ gsb: gsb,
lastState: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
@@ -129,7 +138,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
gsb.balancerCurrent = nil
}
gsb.mu.Unlock()
- return balancer.ErrBadResolverState
+ return nil, balancer.ErrBadResolverState
}
// This write doesn't need to take gsb.mu because this field never gets read
@@ -138,7 +147,7 @@ func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
// bw.Balancer field will never be forwarded to until this SwitchTo()
// function returns.
bw.Balancer = newBalancer
- return nil
+ return bw, nil
}
// Returns nil if the graceful switch balancer is closed.
@@ -152,12 +161,33 @@ func (gsb *Balancer) latestBalancer() *balancerWrapper {
}
// UpdateClientConnState forwards the update to the latest balancer created.
+//
+// If the state's BalancerConfig is the config returned by a call to
+// gracefulswitch.ParseConfig, then this function will automatically SwitchTo
+// the balancer indicated by the config before forwarding its config to it, if
+// necessary.
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()
+
+ gsbCfg, ok := state.BalancerConfig.(*lbConfig)
+ if ok {
+ // Switch to the child in the config unless it is already active.
+ if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
+ var err error
+ balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
+ if err != nil {
+ return fmt.Errorf("could not switch to new child balancer: %w", err)
+ }
+ }
+ // Unwrap the child balancer's config.
+ state.BalancerConfig = gsbCfg.childConfig
+ }
+
if balToUpdate == nil {
return errBalancerClosed
}
+
// Perform this call without gsb.mu to prevent deadlocks if the child calls
// back into the channel. The latest balancer can never be closed during a
// call from the channel, even without gsb.mu held.
@@ -169,6 +199,10 @@ func (gsb *Balancer) ResolverError(err error) {
// The resolver data is only relevant to the most recent LB Policy.
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
+ gsb.cc.UpdateState(balancer.State{
+ ConnectivityState: connectivity.TransientFailure,
+ Picker: base.NewErrPicker(err),
+ })
return
}
// Perform this call without gsb.mu to prevent deadlocks if the child calls
@@ -261,7 +295,8 @@ func (gsb *Balancer) Close() {
// graceful switch logic.
type balancerWrapper struct {
balancer.Balancer
- gsb *Balancer
+ gsb *Balancer
+ builder balancer.Builder
lastState balancer.State
subconns map[balancer.SubConn]bool // subconns created by this balancer
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
index 0f31274a3..e8456a77c 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/method_logger.go
@@ -25,11 +25,12 @@ import (
"sync/atomic"
"time"
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/durationpb"
+ "google.golang.org/protobuf/types/known/timestamppb"
)
type callIDGenerator struct {
@@ -88,7 +89,7 @@ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
// in TruncatingMethodLogger as possible.
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
m := c.toProto()
- timestamp, _ := ptypes.TimestampProto(time.Now())
+ timestamp := timestamppb.Now()
m.Timestamp = timestamp
m.CallId = ml.callID
m.SequenceIdWithinCall = ml.idWithinCallGen.next()
@@ -178,7 +179,7 @@ func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
Authority: c.Authority,
}
if c.Timeout > 0 {
- clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
+ clientHeader.Timeout = durationpb.New(c.Timeout)
}
ret := &binlogpb.GrpcLogEntry{
Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
index 264de387c..9ea598b14 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
@@ -25,8 +25,8 @@ import (
"sync"
"time"
- "github.com/golang/protobuf/proto"
binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
+ "google.golang.org/protobuf/proto"
)
var (
diff --git a/vendor/google.golang.org/grpc/internal/channelz/channel.go b/vendor/google.golang.org/grpc/internal/channelz/channel.go
new file mode 100644
index 000000000..d7e9e1d54
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/channel.go
@@ -0,0 +1,255 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "fmt"
+ "sync/atomic"
+
+ "google.golang.org/grpc/connectivity"
+)
+
+// Channel represents a channel within channelz, which includes metrics and
+// internal channelz data, such as channelz id, child list, etc.
+type Channel struct {
+ Entity
+ // ID is the channelz id of this channel.
+ ID int64
+ // RefName is the human readable reference string of this channel.
+ RefName string
+
+ closeCalled bool
+ nestedChans map[int64]string
+ subChans map[int64]string
+ Parent *Channel
+ trace *ChannelTrace
+ // traceRefCount is the number of trace events that reference this channel.
+ // Non-zero traceRefCount means the trace of this channel cannot be deleted.
+ traceRefCount int32
+
+ ChannelMetrics ChannelMetrics
+}
+
+// Implemented to make Channel implement the Identifier interface used for
+// nesting.
+func (c *Channel) channelzIdentifier() {}
+
+func (c *Channel) String() string {
+ if c.Parent == nil {
+ return fmt.Sprintf("Channel #%d", c.ID)
+ }
+ return fmt.Sprintf("%s Channel #%d", c.Parent, c.ID)
+}
+
+func (c *Channel) id() int64 {
+ return c.ID
+}
+
+func (c *Channel) SubChans() map[int64]string {
+ db.mu.RLock()
+ defer db.mu.RUnlock()
+ return copyMap(c.subChans)
+}
+
+func (c *Channel) NestedChans() map[int64]string {
+ db.mu.RLock()
+ defer db.mu.RUnlock()
+ return copyMap(c.nestedChans)
+}
+
+func (c *Channel) Trace() *ChannelTrace {
+ db.mu.RLock()
+ defer db.mu.RUnlock()
+ return c.trace.copy()
+}
+
+type ChannelMetrics struct {
+ // The current connectivity state of the channel.
+ State atomic.Pointer[connectivity.State]
+ // The target this channel originally tried to connect to. May be absent
+ Target atomic.Pointer[string]
+ // The number of calls started on the channel.
+ CallsStarted atomic.Int64
+ // The number of calls that have completed with an OK status.
+ CallsSucceeded atomic.Int64
+ // The number of calls that have a completed with a non-OK status.
+ CallsFailed atomic.Int64
+ // The last time a call was started on the channel.
+ LastCallStartedTimestamp atomic.Int64
+}
+
+// CopyFrom copies the metrics in o to c. For testing only.
+func (c *ChannelMetrics) CopyFrom(o *ChannelMetrics) {
+ c.State.Store(o.State.Load())
+ c.Target.Store(o.Target.Load())
+ c.CallsStarted.Store(o.CallsStarted.Load())
+ c.CallsSucceeded.Store(o.CallsSucceeded.Load())
+ c.CallsFailed.Store(o.CallsFailed.Load())
+ c.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load())
+}
+
+// Equal returns true iff the metrics of c are the same as the metrics of o.
+// For testing only.
+func (c *ChannelMetrics) Equal(o any) bool {
+ oc, ok := o.(*ChannelMetrics)
+ if !ok {
+ return false
+ }
+ if (c.State.Load() == nil) != (oc.State.Load() == nil) {
+ return false
+ }
+ if c.State.Load() != nil && *c.State.Load() != *oc.State.Load() {
+ return false
+ }
+ if (c.Target.Load() == nil) != (oc.Target.Load() == nil) {
+ return false
+ }
+ if c.Target.Load() != nil && *c.Target.Load() != *oc.Target.Load() {
+ return false
+ }
+ return c.CallsStarted.Load() == oc.CallsStarted.Load() &&
+ c.CallsFailed.Load() == oc.CallsFailed.Load() &&
+ c.CallsSucceeded.Load() == oc.CallsSucceeded.Load() &&
+ c.LastCallStartedTimestamp.Load() == oc.LastCallStartedTimestamp.Load()
+}
+
+func strFromPointer(s *string) string {
+ if s == nil {
+ return ""
+ }
+ return *s
+}
+
+func (c *ChannelMetrics) String() string {
+ return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v",
+ c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(),
+ )
+}
+
+func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics {
+ c := &ChannelMetrics{}
+ c.State.Store(&state)
+ c.Target.Store(&target)
+ c.CallsStarted.Store(started)
+ c.CallsSucceeded.Store(succeeded)
+ c.CallsFailed.Store(failed)
+ c.LastCallStartedTimestamp.Store(timestamp)
+ return c
+}
+
+func (c *Channel) addChild(id int64, e entry) {
+ switch v := e.(type) {
+ case *SubChannel:
+ c.subChans[id] = v.RefName
+ case *Channel:
+ c.nestedChans[id] = v.RefName
+ default:
+ logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
+ }
+}
+
+func (c *Channel) deleteChild(id int64) {
+ delete(c.subChans, id)
+ delete(c.nestedChans, id)
+ c.deleteSelfIfReady()
+}
+
+func (c *Channel) triggerDelete() {
+ c.closeCalled = true
+ c.deleteSelfIfReady()
+}
+
+func (c *Channel) getParentID() int64 {
+ if c.Parent == nil {
+ return -1
+ }
+ return c.Parent.ID
+}
+
+// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means
+// deleting the channel reference from its parent's child list.
+//
+// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the
+// corresponding grpc object has been invoked, and the channel does not have any children left.
+//
+// The returned boolean value indicates whether the channel has been successfully deleted from tree.
+func (c *Channel) deleteSelfFromTree() (deleted bool) {
+ if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
+ return false
+ }
+ // not top channel
+ if c.Parent != nil {
+ c.Parent.deleteChild(c.ID)
+ }
+ return true
+}
+
+// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means
+// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the
+// channel, and its memory will be garbage collected.
+//
+// The trace reference count of the channel must be 0 in order to be deleted from the map. This is
+// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
+// the trace of the referenced entity must not be deleted. In order to release the resource allocated
+// by grpc, the reference to the grpc object is reset to a dummy object.
+//
+// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
+//
+// It returns a bool to indicate whether the channel can be safely deleted from map.
+func (c *Channel) deleteSelfFromMap() (delete bool) {
+ return c.getTraceRefCount() == 0
+}
+
+// deleteSelfIfReady tries to delete the channel itself from the channelz database.
+// The delete process includes two steps:
+// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
+// parent's child list.
+// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
+// will return entry not found error.
+func (c *Channel) deleteSelfIfReady() {
+ if !c.deleteSelfFromTree() {
+ return
+ }
+ if !c.deleteSelfFromMap() {
+ return
+ }
+ db.deleteEntry(c.ID)
+ c.trace.clear()
+}
+
+func (c *Channel) getChannelTrace() *ChannelTrace {
+ return c.trace
+}
+
+func (c *Channel) incrTraceRefCount() {
+ atomic.AddInt32(&c.traceRefCount, 1)
+}
+
+func (c *Channel) decrTraceRefCount() {
+ atomic.AddInt32(&c.traceRefCount, -1)
+}
+
+func (c *Channel) getTraceRefCount() int {
+ i := atomic.LoadInt32(&c.traceRefCount)
+ return int(i)
+}
+
+func (c *Channel) getRefName() string {
+ return c.RefName
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go
new file mode 100644
index 000000000..dfe18b089
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go
@@ -0,0 +1,402 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+)
+
+// entry represents a node in the channelz database.
+type entry interface {
+ // addChild adds a child e, whose channelz id is id to child list
+ addChild(id int64, e entry)
+ // deleteChild deletes a child with channelz id to be id from child list
+ deleteChild(id int64)
+ // triggerDelete tries to delete self from channelz database. However, if
+ // child list is not empty, then deletion from the database is on hold until
+ // the last child is deleted from database.
+ triggerDelete()
+ // deleteSelfIfReady check whether triggerDelete() has been called before,
+ // and whether child list is now empty. If both conditions are met, then
+ // delete self from database.
+ deleteSelfIfReady()
+ // getParentID returns parent ID of the entry. 0 value parent ID means no parent.
+ getParentID() int64
+ Entity
+}
+
+// channelMap is the storage data structure for channelz.
+//
+// Methods of channelMap can be divided in two two categories with respect to
+// locking.
+//
+// 1. Methods acquire the global lock.
+// 2. Methods that can only be called when global lock is held.
+//
+// A second type of method need always to be called inside a first type of method.
+type channelMap struct {
+ mu sync.RWMutex
+ topLevelChannels map[int64]struct{}
+ channels map[int64]*Channel
+ subChannels map[int64]*SubChannel
+ sockets map[int64]*Socket
+ servers map[int64]*Server
+}
+
+func newChannelMap() *channelMap {
+ return &channelMap{
+ topLevelChannels: make(map[int64]struct{}),
+ channels: make(map[int64]*Channel),
+ subChannels: make(map[int64]*SubChannel),
+ sockets: make(map[int64]*Socket),
+ servers: make(map[int64]*Server),
+ }
+}
+
+func (c *channelMap) addServer(id int64, s *Server) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ s.cm = c
+ c.servers[id] = s
+}
+
+func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ cn.trace.cm = c
+ c.channels[id] = cn
+ if isTopChannel {
+ c.topLevelChannels[id] = struct{}{}
+ } else if p := c.channels[pid]; p != nil {
+ p.addChild(id, cn)
+ } else {
+ logger.Infof("channel %d references invalid parent ID %d", id, pid)
+ }
+}
+
+func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ sc.trace.cm = c
+ c.subChannels[id] = sc
+ if p := c.channels[pid]; p != nil {
+ p.addChild(id, sc)
+ } else {
+ logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
+ }
+}
+
+func (c *channelMap) addSocket(s *Socket) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ s.cm = c
+ c.sockets[s.ID] = s
+ if s.Parent == nil {
+ logger.Infof("normal socket %d has no parent", s.ID)
+ }
+ s.Parent.(entry).addChild(s.ID, s)
+}
+
+// removeEntry triggers the removal of an entry, which may not indeed delete the
+// entry, if it has to wait on the deletion of its children and until no other
+// entity's channel trace references it. It may lead to a chain of entry
+// deletion. For example, deleting the last socket of a gracefully shutting down
+// server will lead to the server being also deleted.
+func (c *channelMap) removeEntry(id int64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.findEntry(id).triggerDelete()
+}
+
+// tracedChannel represents tracing operations which are present on both
+// channels and subChannels.
+type tracedChannel interface {
+ getChannelTrace() *ChannelTrace
+ incrTraceRefCount()
+ decrTraceRefCount()
+ getRefName() string
+}
+
+// c.mu must be held by the caller
+func (c *channelMap) decrTraceRefCount(id int64) {
+ e := c.findEntry(id)
+ if v, ok := e.(tracedChannel); ok {
+ v.decrTraceRefCount()
+ e.deleteSelfIfReady()
+ }
+}
+
+// c.mu must be held by the caller.
+func (c *channelMap) findEntry(id int64) entry {
+ if v, ok := c.channels[id]; ok {
+ return v
+ }
+ if v, ok := c.subChannels[id]; ok {
+ return v
+ }
+ if v, ok := c.servers[id]; ok {
+ return v
+ }
+ if v, ok := c.sockets[id]; ok {
+ return v
+ }
+ return &dummyEntry{idNotFound: id}
+}
+
+// c.mu must be held by the caller
+//
+// deleteEntry deletes an entry from the channelMap. Before calling this method,
+// caller must check this entry is ready to be deleted, i.e removeEntry() has
+// been called on it, and no children still exist.
+func (c *channelMap) deleteEntry(id int64) entry {
+ if v, ok := c.sockets[id]; ok {
+ delete(c.sockets, id)
+ return v
+ }
+ if v, ok := c.subChannels[id]; ok {
+ delete(c.subChannels, id)
+ return v
+ }
+ if v, ok := c.channels[id]; ok {
+ delete(c.channels, id)
+ delete(c.topLevelChannels, id)
+ return v
+ }
+ if v, ok := c.servers[id]; ok {
+ delete(c.servers, id)
+ return v
+ }
+ return &dummyEntry{idNotFound: id}
+}
+
+func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ child := c.findEntry(id)
+ childTC, ok := child.(tracedChannel)
+ if !ok {
+ return
+ }
+ childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
+ if desc.Parent != nil {
+ parent := c.findEntry(child.getParentID())
+ var chanType RefChannelType
+ switch child.(type) {
+ case *Channel:
+ chanType = RefChannel
+ case *SubChannel:
+ chanType = RefSubChannel
+ }
+ if parentTC, ok := parent.(tracedChannel); ok {
+ parentTC.getChannelTrace().append(&traceEvent{
+ Desc: desc.Parent.Desc,
+ Severity: desc.Parent.Severity,
+ Timestamp: time.Now(),
+ RefID: id,
+ RefName: childTC.getRefName(),
+ RefType: chanType,
+ })
+ childTC.incrTraceRefCount()
+ }
+ }
+}
+
+type int64Slice []int64
+
+func (s int64Slice) Len() int { return len(s) }
+func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
+
+func copyMap(m map[int64]string) map[int64]string {
+ n := make(map[int64]string)
+ for k, v := range m {
+ n[k] = v
+ }
+ return n
+}
+
+func min(a, b int) int {
+ if a < b {
+ return a
+ }
+ return b
+}
+
+func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
+ if maxResults <= 0 {
+ maxResults = EntriesPerPage
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ l := int64(len(c.topLevelChannels))
+ ids := make([]int64, 0, l)
+
+ for k := range c.topLevelChannels {
+ ids = append(ids, k)
+ }
+ sort.Sort(int64Slice(ids))
+ idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
+ end := true
+ var t []*Channel
+ for _, v := range ids[idx:] {
+ if len(t) == maxResults {
+ end = false
+ break
+ }
+ if cn, ok := c.channels[v]; ok {
+ t = append(t, cn)
+ }
+ }
+ return t, end
+}
+
+func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
+ if maxResults <= 0 {
+ maxResults = EntriesPerPage
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ ids := make([]int64, 0, len(c.servers))
+ for k := range c.servers {
+ ids = append(ids, k)
+ }
+ sort.Sort(int64Slice(ids))
+ idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
+ end := true
+ var s []*Server
+ for _, v := range ids[idx:] {
+ if len(s) == maxResults {
+ end = false
+ break
+ }
+ if svr, ok := c.servers[v]; ok {
+ s = append(s, svr)
+ }
+ }
+ return s, end
+}
+
+func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
+ if maxResults <= 0 {
+ maxResults = EntriesPerPage
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ svr, ok := c.servers[id]
+ if !ok {
+ // server with id doesn't exist.
+ return nil, true
+ }
+ svrskts := svr.sockets
+ ids := make([]int64, 0, len(svrskts))
+ sks := make([]*Socket, 0, min(len(svrskts), maxResults))
+ for k := range svrskts {
+ ids = append(ids, k)
+ }
+ sort.Sort(int64Slice(ids))
+ idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
+ end := true
+ for _, v := range ids[idx:] {
+ if len(sks) == maxResults {
+ end = false
+ break
+ }
+ if ns, ok := c.sockets[v]; ok {
+ sks = append(sks, ns)
+ }
+ }
+ return sks, end
+}
+
+func (c *channelMap) getChannel(id int64) *Channel {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.channels[id]
+}
+
+func (c *channelMap) getSubChannel(id int64) *SubChannel {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.subChannels[id]
+}
+
+func (c *channelMap) getSocket(id int64) *Socket {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.sockets[id]
+}
+
+func (c *channelMap) getServer(id int64) *Server {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.servers[id]
+}
+
+type dummyEntry struct {
+ // dummyEntry is a fake entry to handle entry not found case.
+ idNotFound int64
+ Entity
+}
+
+func (d *dummyEntry) String() string {
+ return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
+}
+
+func (d *dummyEntry) ID() int64 { return d.idNotFound }
+
+func (d *dummyEntry) addChild(id int64, e entry) {
+ // Note: It is possible for a normal program to reach here under race
+ // condition. For example, there could be a race between ClientConn.Close()
+ // info being propagated to addrConn and http2Client. ClientConn.Close()
+ // cancel the context and result in http2Client to error. The error info is
+ // then caught by transport monitor and before addrConn.tearDown() is called
+ // in side ClientConn.Close(). Therefore, the addrConn will create a new
+ // transport. And when registering the new transport in channelz, its parent
+ // addrConn could have already been torn down and deleted from channelz
+ // tracking, and thus reach the code here.
+ logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
+}
+
+func (d *dummyEntry) deleteChild(id int64) {
+ // It is possible for a normal program to reach here under race condition.
+ // Refer to the example described in addChild().
+ logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
+}
+
+func (d *dummyEntry) triggerDelete() {
+ logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
+}
+
+func (*dummyEntry) deleteSelfIfReady() {
+ // code should not reach here. deleteSelfIfReady is always called on an existing entry.
+}
+
+func (*dummyEntry) getParentID() int64 {
+ return 0
+}
+
+// Entity is implemented by all channelz types.
+type Entity interface {
+ isEntity()
+ fmt.Stringer
+ id() int64
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/funcs.go b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
index fc094f344..f461e9bc3 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/funcs.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/funcs.go
@@ -16,47 +16,32 @@
*
*/
-// Package channelz defines APIs for enabling channelz service, entry
+// Package channelz defines internal APIs for enabling channelz service, entry
// registration/deletion, and accessing channelz data. It also defines channelz
// metric struct formats.
-//
-// All APIs in this package are experimental.
package channelz
import (
- "errors"
- "sort"
- "sync"
"sync/atomic"
"time"
- "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
)
-const (
- defaultMaxTraceEntry int32 = 30
-)
-
var (
// IDGen is the global channelz entity ID generator. It should not be used
// outside this package except by tests.
IDGen IDGenerator
- db dbWrapper
- // EntryPerPage defines the number of channelz entries to be shown on a web page.
- EntryPerPage = int64(50)
- curState int32
- maxTraceEntry = defaultMaxTraceEntry
+ db *channelMap = newChannelMap()
+ // EntriesPerPage defines the number of channelz entries to be shown on a web page.
+ EntriesPerPage = 50
+ curState int32
)
// TurnOn turns on channelz data collection.
func TurnOn() {
- if !IsOn() {
- db.set(newChannelMap())
- IDGen.Reset()
- atomic.StoreInt32(&curState, 1)
- }
+ atomic.StoreInt32(&curState, 1)
}
func init() {
@@ -70,49 +55,15 @@ func IsOn() bool {
return atomic.LoadInt32(&curState) == 1
}
-// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
-// Setting it to 0 will disable channel tracing.
-func SetMaxTraceEntry(i int32) {
- atomic.StoreInt32(&maxTraceEntry, i)
-}
-
-// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
-func ResetMaxTraceEntryToDefault() {
- atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
-}
-
-func getMaxTraceEntry() int {
- i := atomic.LoadInt32(&maxTraceEntry)
- return int(i)
-}
-
-// dbWarpper wraps around a reference to internal channelz data storage, and
-// provide synchronized functionality to set and get the reference.
-type dbWrapper struct {
- mu sync.RWMutex
- DB *channelMap
-}
-
-func (d *dbWrapper) set(db *channelMap) {
- d.mu.Lock()
- d.DB = db
- d.mu.Unlock()
-}
-
-func (d *dbWrapper) get() *channelMap {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.DB
-}
-
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
// boolean indicating whether there's more top channels to be queried for.
//
-// The arg id specifies that only top channel with id at or above it will be included
-// in the result. The returned slice is up to a length of the arg maxResults or
-// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
-func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
- return db.get().GetTopChannels(id, maxResults)
+// The arg id specifies that only top channel with id at or above it will be
+// included in the result. The returned slice is up to a length of the arg
+// maxResults or EntriesPerPage if maxResults is zero, and is sorted in ascending
+// id order.
+func GetTopChannels(id int64, maxResults int) ([]*Channel, bool) {
+ return db.getTopChannels(id, maxResults)
}
// GetServers returns a slice of server's ServerMetric, along with a
@@ -120,73 +71,69 @@ func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
//
// The arg id specifies that only server with id at or above it will be included
// in the result. The returned slice is up to a length of the arg maxResults or
-// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
-func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
- return db.get().GetServers(id, maxResults)
+// EntriesPerPage if maxResults is zero, and is sorted in ascending id order.
+func GetServers(id int64, maxResults int) ([]*Server, bool) {
+ return db.getServers(id, maxResults)
}
// GetServerSockets returns a slice of server's (identified by id) normal socket's
-// SocketMetric, along with a boolean indicating whether there's more sockets to
+// SocketMetrics, along with a boolean indicating whether there's more sockets to
// be queried for.
//
// The arg startID specifies that only sockets with id at or above it will be
// included in the result. The returned slice is up to a length of the arg maxResults
-// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
-func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
- return db.get().GetServerSockets(id, startID, maxResults)
+// or EntriesPerPage if maxResults is zero, and is sorted in ascending id order.
+func GetServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
+ return db.getServerSockets(id, startID, maxResults)
}
-// GetChannel returns the ChannelMetric for the channel (identified by id).
-func GetChannel(id int64) *ChannelMetric {
- return db.get().GetChannel(id)
+// GetChannel returns the Channel for the channel (identified by id).
+func GetChannel(id int64) *Channel {
+ return db.getChannel(id)
}
-// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
-func GetSubChannel(id int64) *SubChannelMetric {
- return db.get().GetSubChannel(id)
+// GetSubChannel returns the SubChannel for the subchannel (identified by id).
+func GetSubChannel(id int64) *SubChannel {
+ return db.getSubChannel(id)
}
-// GetSocket returns the SocketInternalMetric for the socket (identified by id).
-func GetSocket(id int64) *SocketMetric {
- return db.get().GetSocket(id)
+// GetSocket returns the Socket for the socket (identified by id).
+func GetSocket(id int64) *Socket {
+ return db.getSocket(id)
}
// GetServer returns the ServerMetric for the server (identified by id).
-func GetServer(id int64) *ServerMetric {
- return db.get().GetServer(id)
+func GetServer(id int64) *Server {
+ return db.getServer(id)
}
// RegisterChannel registers the given channel c in the channelz database with
-// ref as its reference name, and adds it to the child list of its parent
-// (identified by pid). pid == nil means no parent.
+// target as its target and reference name, and adds it to the child list of its
+// parent. parent == nil means no parent.
//
// Returns a unique channelz identifier assigned to this channel.
//
// If channelz is not turned ON, the channelz database is not mutated.
-func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
+func RegisterChannel(parent *Channel, target string) *Channel {
id := IDGen.genID()
- var parent int64
- isTopChannel := true
- if pid != nil {
- isTopChannel = false
- parent = pid.Int()
- }
if !IsOn() {
- return newIdentifer(RefChannel, id, pid)
+ return &Channel{ID: id}
}
- cn := &channel{
- refName: ref,
- c: c,
- subChans: make(map[int64]string),
+ isTopChannel := parent == nil
+
+ cn := &Channel{
+ ID: id,
+ RefName: target,
nestedChans: make(map[int64]string),
- id: id,
- pid: parent,
- trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
+ subChans: make(map[int64]string),
+ Parent: parent,
+ trace: &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},
}
- db.get().addChannel(id, cn, isTopChannel, parent)
- return newIdentifer(RefChannel, id, pid)
+ cn.ChannelMetrics.Target.Store(&target)
+ db.addChannel(id, cn, isTopChannel, cn.getParentID())
+ return cn
}
// RegisterSubChannel registers the given subChannel c in the channelz database
@@ -196,555 +143,66 @@ func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
// Returns a unique channelz identifier assigned to this subChannel.
//
// If channelz is not turned ON, the channelz database is not mutated.
-func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
- if pid == nil {
- return nil, errors.New("a SubChannel's parent id cannot be nil")
- }
+func RegisterSubChannel(pid int64, ref string) *SubChannel {
id := IDGen.genID()
if !IsOn() {
- return newIdentifer(RefSubChannel, id, pid), nil
+ return &SubChannel{ID: id}
}
- sc := &subChannel{
- refName: ref,
- c: c,
+ sc := &SubChannel{
+ RefName: ref,
+ ID: id,
sockets: make(map[int64]string),
- id: id,
- pid: pid.Int(),
- trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
+ parent: db.getChannel(pid),
+ trace: &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},
}
- db.get().addSubChannel(id, sc, pid.Int())
- return newIdentifer(RefSubChannel, id, pid), nil
+ db.addSubChannel(id, sc, pid)
+ return sc
}
// RegisterServer registers the given server s in channelz database. It returns
// the unique channelz tracking id assigned to this server.
//
// If channelz is not turned ON, the channelz database is not mutated.
-func RegisterServer(s Server, ref string) *Identifier {
+func RegisterServer(ref string) *Server {
id := IDGen.genID()
if !IsOn() {
- return newIdentifer(RefServer, id, nil)
+ return &Server{ID: id}
}
- svr := &server{
- refName: ref,
- s: s,
+ svr := &Server{
+ RefName: ref,
sockets: make(map[int64]string),
listenSockets: make(map[int64]string),
- id: id,
- }
- db.get().addServer(id, svr)
- return newIdentifer(RefServer, id, nil)
-}
-
-// RegisterListenSocket registers the given listen socket s in channelz database
-// with ref as its reference name, and add it to the child list of its parent
-// (identified by pid). It returns the unique channelz tracking id assigned to
-// this listen socket.
-//
-// If channelz is not turned ON, the channelz database is not mutated.
-func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
- if pid == nil {
- return nil, errors.New("a ListenSocket's parent id cannot be 0")
+ ID: id,
}
- id := IDGen.genID()
- if !IsOn() {
- return newIdentifer(RefListenSocket, id, pid), nil
- }
-
- ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
- db.get().addListenSocket(id, ls, pid.Int())
- return newIdentifer(RefListenSocket, id, pid), nil
+ db.addServer(id, svr)
+ return svr
}
-// RegisterNormalSocket registers the given normal socket s in channelz database
+// RegisterSocket registers the given normal socket s in channelz database
// with ref as its reference name, and adds it to the child list of its parent
-// (identified by pid). It returns the unique channelz tracking id assigned to
-// this normal socket.
+// (identified by skt.Parent, which must be set). It returns the unique channelz
+// tracking id assigned to this normal socket.
//
// If channelz is not turned ON, the channelz database is not mutated.
-func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
- if pid == nil {
- return nil, errors.New("a NormalSocket's parent id cannot be 0")
- }
- id := IDGen.genID()
- if !IsOn() {
- return newIdentifer(RefNormalSocket, id, pid), nil
+func RegisterSocket(skt *Socket) *Socket {
+ skt.ID = IDGen.genID()
+ if IsOn() {
+ db.addSocket(skt)
}
-
- ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
- db.get().addNormalSocket(id, ns, pid.Int())
- return newIdentifer(RefNormalSocket, id, pid), nil
+ return skt
}
// RemoveEntry removes an entry with unique channelz tracking id to be id from
// channelz database.
//
// If channelz is not turned ON, this function is a no-op.
-func RemoveEntry(id *Identifier) {
+func RemoveEntry(id int64) {
if !IsOn() {
return
}
- db.get().removeEntry(id.Int())
-}
-
-// TraceEventDesc is what the caller of AddTraceEvent should provide to describe
-// the event to be added to the channel trace.
-//
-// The Parent field is optional. It is used for an event that will be recorded
-// in the entity's parent trace.
-type TraceEventDesc struct {
- Desc string
- Severity Severity
- Parent *TraceEventDesc
-}
-
-// AddTraceEvent adds trace related to the entity with specified id, using the
-// provided TraceEventDesc.
-//
-// If channelz is not turned ON, this will simply log the event descriptions.
-func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) {
- // Log only the trace description associated with the bottom most entity.
- switch desc.Severity {
- case CtUnknown, CtInfo:
- l.InfoDepth(depth+1, withParens(id)+desc.Desc)
- case CtWarning:
- l.WarningDepth(depth+1, withParens(id)+desc.Desc)
- case CtError:
- l.ErrorDepth(depth+1, withParens(id)+desc.Desc)
- }
-
- if getMaxTraceEntry() == 0 {
- return
- }
- if IsOn() {
- db.get().traceEvent(id.Int(), desc)
- }
-}
-
-// channelMap is the storage data structure for channelz.
-// Methods of channelMap can be divided in two two categories with respect to locking.
-// 1. Methods acquire the global lock.
-// 2. Methods that can only be called when global lock is held.
-// A second type of method need always to be called inside a first type of method.
-type channelMap struct {
- mu sync.RWMutex
- topLevelChannels map[int64]struct{}
- servers map[int64]*server
- channels map[int64]*channel
- subChannels map[int64]*subChannel
- listenSockets map[int64]*listenSocket
- normalSockets map[int64]*normalSocket
-}
-
-func newChannelMap() *channelMap {
- return &channelMap{
- topLevelChannels: make(map[int64]struct{}),
- channels: make(map[int64]*channel),
- listenSockets: make(map[int64]*listenSocket),
- normalSockets: make(map[int64]*normalSocket),
- servers: make(map[int64]*server),
- subChannels: make(map[int64]*subChannel),
- }
-}
-
-func (c *channelMap) addServer(id int64, s *server) {
- c.mu.Lock()
- s.cm = c
- c.servers[id] = s
- c.mu.Unlock()
-}
-
-func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) {
- c.mu.Lock()
- cn.cm = c
- cn.trace.cm = c
- c.channels[id] = cn
- if isTopChannel {
- c.topLevelChannels[id] = struct{}{}
- } else {
- c.findEntry(pid).addChild(id, cn)
- }
- c.mu.Unlock()
-}
-
-func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) {
- c.mu.Lock()
- sc.cm = c
- sc.trace.cm = c
- c.subChannels[id] = sc
- c.findEntry(pid).addChild(id, sc)
- c.mu.Unlock()
-}
-
-func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) {
- c.mu.Lock()
- ls.cm = c
- c.listenSockets[id] = ls
- c.findEntry(pid).addChild(id, ls)
- c.mu.Unlock()
-}
-
-func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) {
- c.mu.Lock()
- ns.cm = c
- c.normalSockets[id] = ns
- c.findEntry(pid).addChild(id, ns)
- c.mu.Unlock()
-}
-
-// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
-// wait on the deletion of its children and until no other entity's channel trace references it.
-// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
-// shutting down server will lead to the server being also deleted.
-func (c *channelMap) removeEntry(id int64) {
- c.mu.Lock()
- c.findEntry(id).triggerDelete()
- c.mu.Unlock()
-}
-
-// c.mu must be held by the caller
-func (c *channelMap) decrTraceRefCount(id int64) {
- e := c.findEntry(id)
- if v, ok := e.(tracedChannel); ok {
- v.decrTraceRefCount()
- e.deleteSelfIfReady()
- }
-}
-
-// c.mu must be held by the caller.
-func (c *channelMap) findEntry(id int64) entry {
- var v entry
- var ok bool
- if v, ok = c.channels[id]; ok {
- return v
- }
- if v, ok = c.subChannels[id]; ok {
- return v
- }
- if v, ok = c.servers[id]; ok {
- return v
- }
- if v, ok = c.listenSockets[id]; ok {
- return v
- }
- if v, ok = c.normalSockets[id]; ok {
- return v
- }
- return &dummyEntry{idNotFound: id}
-}
-
-// c.mu must be held by the caller
-// deleteEntry simply deletes an entry from the channelMap. Before calling this
-// method, caller must check this entry is ready to be deleted, i.e removeEntry()
-// has been called on it, and no children still exist.
-// Conditionals are ordered by the expected frequency of deletion of each entity
-// type, in order to optimize performance.
-func (c *channelMap) deleteEntry(id int64) {
- var ok bool
- if _, ok = c.normalSockets[id]; ok {
- delete(c.normalSockets, id)
- return
- }
- if _, ok = c.subChannels[id]; ok {
- delete(c.subChannels, id)
- return
- }
- if _, ok = c.channels[id]; ok {
- delete(c.channels, id)
- delete(c.topLevelChannels, id)
- return
- }
- if _, ok = c.listenSockets[id]; ok {
- delete(c.listenSockets, id)
- return
- }
- if _, ok = c.servers[id]; ok {
- delete(c.servers, id)
- return
- }
-}
-
-func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
- c.mu.Lock()
- child := c.findEntry(id)
- childTC, ok := child.(tracedChannel)
- if !ok {
- c.mu.Unlock()
- return
- }
- childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
- if desc.Parent != nil {
- parent := c.findEntry(child.getParentID())
- var chanType RefChannelType
- switch child.(type) {
- case *channel:
- chanType = RefChannel
- case *subChannel:
- chanType = RefSubChannel
- }
- if parentTC, ok := parent.(tracedChannel); ok {
- parentTC.getChannelTrace().append(&TraceEvent{
- Desc: desc.Parent.Desc,
- Severity: desc.Parent.Severity,
- Timestamp: time.Now(),
- RefID: id,
- RefName: childTC.getRefName(),
- RefType: chanType,
- })
- childTC.incrTraceRefCount()
- }
- }
- c.mu.Unlock()
-}
-
-type int64Slice []int64
-
-func (s int64Slice) Len() int { return len(s) }
-func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
-
-func copyMap(m map[int64]string) map[int64]string {
- n := make(map[int64]string)
- for k, v := range m {
- n[k] = v
- }
- return n
-}
-
-func min(a, b int64) int64 {
- if a < b {
- return a
- }
- return b
-}
-
-func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
- if maxResults <= 0 {
- maxResults = EntryPerPage
- }
- c.mu.RLock()
- l := int64(len(c.topLevelChannels))
- ids := make([]int64, 0, l)
- cns := make([]*channel, 0, min(l, maxResults))
-
- for k := range c.topLevelChannels {
- ids = append(ids, k)
- }
- sort.Sort(int64Slice(ids))
- idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := int64(0)
- var end bool
- var t []*ChannelMetric
- for i, v := range ids[idx:] {
- if count == maxResults {
- break
- }
- if cn, ok := c.channels[v]; ok {
- cns = append(cns, cn)
- t = append(t, &ChannelMetric{
- NestedChans: copyMap(cn.nestedChans),
- SubChans: copyMap(cn.subChans),
- })
- count++
- }
- if i == len(ids[idx:])-1 {
- end = true
- break
- }
- }
- c.mu.RUnlock()
- if count == 0 {
- end = true
- }
-
- for i, cn := range cns {
- t[i].ChannelData = cn.c.ChannelzMetric()
- t[i].ID = cn.id
- t[i].RefName = cn.refName
- t[i].Trace = cn.trace.dumpData()
- }
- return t, end
-}
-
-func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
- if maxResults <= 0 {
- maxResults = EntryPerPage
- }
- c.mu.RLock()
- l := int64(len(c.servers))
- ids := make([]int64, 0, l)
- ss := make([]*server, 0, min(l, maxResults))
- for k := range c.servers {
- ids = append(ids, k)
- }
- sort.Sort(int64Slice(ids))
- idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := int64(0)
- var end bool
- var s []*ServerMetric
- for i, v := range ids[idx:] {
- if count == maxResults {
- break
- }
- if svr, ok := c.servers[v]; ok {
- ss = append(ss, svr)
- s = append(s, &ServerMetric{
- ListenSockets: copyMap(svr.listenSockets),
- })
- count++
- }
- if i == len(ids[idx:])-1 {
- end = true
- break
- }
- }
- c.mu.RUnlock()
- if count == 0 {
- end = true
- }
-
- for i, svr := range ss {
- s[i].ServerData = svr.s.ChannelzMetric()
- s[i].ID = svr.id
- s[i].RefName = svr.refName
- }
- return s, end
-}
-
-func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
- if maxResults <= 0 {
- maxResults = EntryPerPage
- }
- var svr *server
- var ok bool
- c.mu.RLock()
- if svr, ok = c.servers[id]; !ok {
- // server with id doesn't exist.
- c.mu.RUnlock()
- return nil, true
- }
- svrskts := svr.sockets
- l := int64(len(svrskts))
- ids := make([]int64, 0, l)
- sks := make([]*normalSocket, 0, min(l, maxResults))
- for k := range svrskts {
- ids = append(ids, k)
- }
- sort.Sort(int64Slice(ids))
- idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
- count := int64(0)
- var end bool
- for i, v := range ids[idx:] {
- if count == maxResults {
- break
- }
- if ns, ok := c.normalSockets[v]; ok {
- sks = append(sks, ns)
- count++
- }
- if i == len(ids[idx:])-1 {
- end = true
- break
- }
- }
- c.mu.RUnlock()
- if count == 0 {
- end = true
- }
- s := make([]*SocketMetric, 0, len(sks))
- for _, ns := range sks {
- sm := &SocketMetric{}
- sm.SocketData = ns.s.ChannelzMetric()
- sm.ID = ns.id
- sm.RefName = ns.refName
- s = append(s, sm)
- }
- return s, end
-}
-
-func (c *channelMap) GetChannel(id int64) *ChannelMetric {
- cm := &ChannelMetric{}
- var cn *channel
- var ok bool
- c.mu.RLock()
- if cn, ok = c.channels[id]; !ok {
- // channel with id doesn't exist.
- c.mu.RUnlock()
- return nil
- }
- cm.NestedChans = copyMap(cn.nestedChans)
- cm.SubChans = copyMap(cn.subChans)
- // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
- // holding the lock to prevent potential data race.
- chanCopy := cn.c
- c.mu.RUnlock()
- cm.ChannelData = chanCopy.ChannelzMetric()
- cm.ID = cn.id
- cm.RefName = cn.refName
- cm.Trace = cn.trace.dumpData()
- return cm
-}
-
-func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
- cm := &SubChannelMetric{}
- var sc *subChannel
- var ok bool
- c.mu.RLock()
- if sc, ok = c.subChannels[id]; !ok {
- // subchannel with id doesn't exist.
- c.mu.RUnlock()
- return nil
- }
- cm.Sockets = copyMap(sc.sockets)
- // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
- // holding the lock to prevent potential data race.
- chanCopy := sc.c
- c.mu.RUnlock()
- cm.ChannelData = chanCopy.ChannelzMetric()
- cm.ID = sc.id
- cm.RefName = sc.refName
- cm.Trace = sc.trace.dumpData()
- return cm
-}
-
-func (c *channelMap) GetSocket(id int64) *SocketMetric {
- sm := &SocketMetric{}
- c.mu.RLock()
- if ls, ok := c.listenSockets[id]; ok {
- c.mu.RUnlock()
- sm.SocketData = ls.s.ChannelzMetric()
- sm.ID = ls.id
- sm.RefName = ls.refName
- return sm
- }
- if ns, ok := c.normalSockets[id]; ok {
- c.mu.RUnlock()
- sm.SocketData = ns.s.ChannelzMetric()
- sm.ID = ns.id
- sm.RefName = ns.refName
- return sm
- }
- c.mu.RUnlock()
- return nil
-}
-
-func (c *channelMap) GetServer(id int64) *ServerMetric {
- sm := &ServerMetric{}
- var svr *server
- var ok bool
- c.mu.RLock()
- if svr, ok = c.servers[id]; !ok {
- c.mu.RUnlock()
- return nil
- }
- sm.ListenSockets = copyMap(svr.listenSockets)
- c.mu.RUnlock()
- sm.ID = svr.id
- sm.RefName = svr.refName
- sm.ServerData = svr.s.ChannelzMetric()
- return sm
+ db.removeEntry(id)
}
// IDGenerator is an incrementing atomic that tracks IDs for channelz entities.
@@ -761,3 +219,11 @@ func (i *IDGenerator) Reset() {
func (i *IDGenerator) genID() int64 {
return atomic.AddInt64(&i.id, 1)
}
+
+// Identifier is an opaque channelz identifier used to expose channelz symbols
+// outside of grpc. Currently only implemented by Channel since no other
+// types require exposure outside grpc.
+type Identifier interface {
+ Entity
+ channelzIdentifier()
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/id.go b/vendor/google.golang.org/grpc/internal/channelz/id.go
deleted file mode 100644
index c9a27acd3..000000000
--- a/vendor/google.golang.org/grpc/internal/channelz/id.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Copyright 2022 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package channelz
-
-import "fmt"
-
-// Identifier is an opaque identifier which uniquely identifies an entity in the
-// channelz database.
-type Identifier struct {
- typ RefChannelType
- id int64
- str string
- pid *Identifier
-}
-
-// Type returns the entity type corresponding to id.
-func (id *Identifier) Type() RefChannelType {
- return id.typ
-}
-
-// Int returns the integer identifier corresponding to id.
-func (id *Identifier) Int() int64 {
- return id.id
-}
-
-// String returns a string representation of the entity corresponding to id.
-//
-// This includes some information about the parent as well. Examples:
-// Top-level channel: [Channel #channel-number]
-// Nested channel: [Channel #parent-channel-number Channel #channel-number]
-// Sub channel: [Channel #parent-channel SubChannel #subchannel-number]
-func (id *Identifier) String() string {
- return id.str
-}
-
-// Equal returns true if other is the same as id.
-func (id *Identifier) Equal(other *Identifier) bool {
- if (id != nil) != (other != nil) {
- return false
- }
- if id == nil && other == nil {
- return true
- }
- return id.typ == other.typ && id.id == other.id && id.pid == other.pid
-}
-
-// NewIdentifierForTesting returns a new opaque identifier to be used only for
-// testing purposes.
-func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier {
- return newIdentifer(typ, id, pid)
-}
-
-func newIdentifer(typ RefChannelType, id int64, pid *Identifier) *Identifier {
- str := fmt.Sprintf("%s #%d", typ, id)
- if pid != nil {
- str = fmt.Sprintf("%s %s", pid, str)
- }
- return &Identifier{typ: typ, id: id, str: str, pid: pid}
-}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/logging.go b/vendor/google.golang.org/grpc/internal/channelz/logging.go
index f89e6f77b..ee4d72125 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/logging.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/logging.go
@@ -26,53 +26,49 @@ import (
var logger = grpclog.Component("channelz")
-func withParens(id *Identifier) string {
- return "[" + id.String() + "] "
-}
-
// Info logs and adds a trace event if channelz is on.
-func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
+func Info(l grpclog.DepthLoggerV2, e Entity, args ...any) {
+ AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprint(args...),
Severity: CtInfo,
})
}
// Infof logs and adds a trace event if channelz is on.
-func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
+func Infof(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) {
+ AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprintf(format, args...),
Severity: CtInfo,
})
}
// Warning logs and adds a trace event if channelz is on.
-func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
+func Warning(l grpclog.DepthLoggerV2, e Entity, args ...any) {
+ AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprint(args...),
Severity: CtWarning,
})
}
// Warningf logs and adds a trace event if channelz is on.
-func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
+func Warningf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) {
+ AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprintf(format, args...),
Severity: CtWarning,
})
}
// Error logs and adds a trace event if channelz is on.
-func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
+func Error(l grpclog.DepthLoggerV2, e Entity, args ...any) {
+ AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprint(args...),
Severity: CtError,
})
}
// Errorf logs and adds a trace event if channelz is on.
-func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
- AddTraceEvent(l, id, 1, &TraceEventDesc{
+func Errorf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) {
+ AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprintf(format, args...),
Severity: CtError,
})
diff --git a/vendor/google.golang.org/grpc/internal/channelz/server.go b/vendor/google.golang.org/grpc/internal/channelz/server.go
new file mode 100644
index 000000000..cdfc49d6e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/server.go
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "fmt"
+ "sync/atomic"
+)
+
+// Server is the channelz representation of a server.
+type Server struct {
+ Entity
+ ID int64
+ RefName string
+
+ ServerMetrics ServerMetrics
+
+ closeCalled bool
+ sockets map[int64]string
+ listenSockets map[int64]string
+ cm *channelMap
+}
+
+// ServerMetrics defines a struct containing metrics for servers.
+type ServerMetrics struct {
+ // The number of incoming calls started on the server.
+ CallsStarted atomic.Int64
+ // The number of incoming calls that have completed with an OK status.
+ CallsSucceeded atomic.Int64
+ // The number of incoming calls that have a completed with a non-OK status.
+ CallsFailed atomic.Int64
+ // The last time a call was started on the server.
+ LastCallStartedTimestamp atomic.Int64
+}
+
+// NewServerMetricsForTesting returns an initialized ServerMetrics.
+func NewServerMetricsForTesting(started, succeeded, failed, timestamp int64) *ServerMetrics {
+ sm := &ServerMetrics{}
+ sm.CallsStarted.Store(started)
+ sm.CallsSucceeded.Store(succeeded)
+ sm.CallsFailed.Store(failed)
+ sm.LastCallStartedTimestamp.Store(timestamp)
+ return sm
+}
+
+func (sm *ServerMetrics) CopyFrom(o *ServerMetrics) {
+ sm.CallsStarted.Store(o.CallsStarted.Load())
+ sm.CallsSucceeded.Store(o.CallsSucceeded.Load())
+ sm.CallsFailed.Store(o.CallsFailed.Load())
+ sm.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load())
+}
+
+// ListenSockets returns the listening sockets for s.
+func (s *Server) ListenSockets() map[int64]string {
+ db.mu.RLock()
+ defer db.mu.RUnlock()
+ return copyMap(s.listenSockets)
+}
+
+// String returns a printable description of s.
+func (s *Server) String() string {
+ return fmt.Sprintf("Server #%d", s.ID)
+}
+
+func (s *Server) id() int64 {
+ return s.ID
+}
+
+func (s *Server) addChild(id int64, e entry) {
+ switch v := e.(type) {
+ case *Socket:
+ switch v.SocketType {
+ case SocketTypeNormal:
+ s.sockets[id] = v.RefName
+ case SocketTypeListen:
+ s.listenSockets[id] = v.RefName
+ }
+ default:
+ logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e)
+ }
+}
+
+func (s *Server) deleteChild(id int64) {
+ delete(s.sockets, id)
+ delete(s.listenSockets, id)
+ s.deleteSelfIfReady()
+}
+
+func (s *Server) triggerDelete() {
+ s.closeCalled = true
+ s.deleteSelfIfReady()
+}
+
+func (s *Server) deleteSelfIfReady() {
+ if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 {
+ return
+ }
+ s.cm.deleteEntry(s.ID)
+}
+
+func (s *Server) getParentID() int64 {
+ return 0
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/socket.go b/vendor/google.golang.org/grpc/internal/channelz/socket.go
new file mode 100644
index 000000000..fa64834b2
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/socket.go
@@ -0,0 +1,130 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "fmt"
+ "net"
+ "sync/atomic"
+
+ "google.golang.org/grpc/credentials"
+)
+
+// SocketMetrics defines the struct that the implementor of Socket interface
+// should return from ChannelzMetric().
+type SocketMetrics struct {
+ // The number of streams that have been started.
+ StreamsStarted atomic.Int64
+ // The number of streams that have ended successfully:
+ // On client side, receiving frame with eos bit set.
+ // On server side, sending frame with eos bit set.
+ StreamsSucceeded atomic.Int64
+ // The number of streams that have ended unsuccessfully:
+ // On client side, termination without receiving frame with eos bit set.
+ // On server side, termination without sending frame with eos bit set.
+ StreamsFailed atomic.Int64
+ // The number of messages successfully sent on this socket.
+ MessagesSent atomic.Int64
+ MessagesReceived atomic.Int64
+ // The number of keep alives sent. This is typically implemented with HTTP/2
+ // ping messages.
+ KeepAlivesSent atomic.Int64
+ // The last time a stream was created by this endpoint. Usually unset for
+ // servers.
+ LastLocalStreamCreatedTimestamp atomic.Int64
+ // The last time a stream was created by the remote endpoint. Usually unset
+ // for clients.
+ LastRemoteStreamCreatedTimestamp atomic.Int64
+ // The last time a message was sent by this endpoint.
+ LastMessageSentTimestamp atomic.Int64
+ // The last time a message was received by this endpoint.
+ LastMessageReceivedTimestamp atomic.Int64
+}
+
+// EphemeralSocketMetrics are metrics that change rapidly and are tracked
+// outside of channelz.
+type EphemeralSocketMetrics struct {
+ // The amount of window, granted to the local endpoint by the remote endpoint.
+ // This may be slightly out of date due to network latency. This does NOT
+ // include stream level or TCP level flow control info.
+ LocalFlowControlWindow int64
+ // The amount of window, granted to the remote endpoint by the local endpoint.
+ // This may be slightly out of date due to network latency. This does NOT
+ // include stream level or TCP level flow control info.
+ RemoteFlowControlWindow int64
+}
+
+type SocketType string
+
+const (
+ SocketTypeNormal = "NormalSocket"
+ SocketTypeListen = "ListenSocket"
+)
+
+type Socket struct {
+ Entity
+ SocketType SocketType
+ ID int64
+ Parent Entity
+ cm *channelMap
+ SocketMetrics SocketMetrics
+ EphemeralMetrics func() *EphemeralSocketMetrics
+
+ RefName string
+ // The locally bound address. Immutable.
+ LocalAddr net.Addr
+ // The remote bound address. May be absent. Immutable.
+ RemoteAddr net.Addr
+ // Optional, represents the name of the remote endpoint, if different than
+ // the original target name. Immutable.
+ RemoteName string
+ // Immutable.
+ SocketOptions *SocketOptionData
+ // Immutable.
+ Security credentials.ChannelzSecurityValue
+}
+
+func (ls *Socket) String() string {
+ return fmt.Sprintf("%s %s #%d", ls.Parent, ls.SocketType, ls.ID)
+}
+
+func (ls *Socket) id() int64 {
+ return ls.ID
+}
+
+func (ls *Socket) addChild(id int64, e entry) {
+ logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e)
+}
+
+func (ls *Socket) deleteChild(id int64) {
+ logger.Errorf("cannot delete a child (id = %d) from a listen socket", id)
+}
+
+func (ls *Socket) triggerDelete() {
+ ls.cm.deleteEntry(ls.ID)
+ ls.Parent.(entry).deleteChild(ls.ID)
+}
+
+func (ls *Socket) deleteSelfIfReady() {
+ logger.Errorf("cannot call deleteSelfIfReady on a listen socket")
+}
+
+func (ls *Socket) getParentID() int64 {
+ return ls.Parent.id()
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/subchannel.go b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go
new file mode 100644
index 000000000..3b88e4cba
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/subchannel.go
@@ -0,0 +1,151 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "fmt"
+ "sync/atomic"
+)
+
+// SubChannel is the channelz representation of a subchannel.
+type SubChannel struct {
+ Entity
+ // ID is the channelz id of this subchannel.
+ ID int64
+ // RefName is the human readable reference string of this subchannel.
+ RefName string
+ closeCalled bool
+ sockets map[int64]string
+ parent *Channel
+ trace *ChannelTrace
+ traceRefCount int32
+
+ ChannelMetrics ChannelMetrics
+}
+
+func (sc *SubChannel) String() string {
+ return fmt.Sprintf("%s SubChannel #%d", sc.parent, sc.ID)
+}
+
+func (sc *SubChannel) id() int64 {
+ return sc.ID
+}
+
+func (sc *SubChannel) Sockets() map[int64]string {
+ db.mu.RLock()
+ defer db.mu.RUnlock()
+ return copyMap(sc.sockets)
+}
+
+func (sc *SubChannel) Trace() *ChannelTrace {
+ db.mu.RLock()
+ defer db.mu.RUnlock()
+ return sc.trace.copy()
+}
+
+func (sc *SubChannel) addChild(id int64, e entry) {
+ if v, ok := e.(*Socket); ok && v.SocketType == SocketTypeNormal {
+ sc.sockets[id] = v.RefName
+ } else {
+ logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e)
+ }
+}
+
+func (sc *SubChannel) deleteChild(id int64) {
+ delete(sc.sockets, id)
+ sc.deleteSelfIfReady()
+}
+
+func (sc *SubChannel) triggerDelete() {
+ sc.closeCalled = true
+ sc.deleteSelfIfReady()
+}
+
+func (sc *SubChannel) getParentID() int64 {
+ return sc.parent.ID
+}
+
+// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which
+// means deleting the subchannel reference from its parent's child list.
+//
+// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of
+// the corresponding grpc object has been invoked, and the subchannel does not have any children left.
+//
+// The returned boolean value indicates whether the channel has been successfully deleted from tree.
+func (sc *SubChannel) deleteSelfFromTree() (deleted bool) {
+ if !sc.closeCalled || len(sc.sockets) != 0 {
+ return false
+ }
+ sc.parent.deleteChild(sc.ID)
+ return true
+}
+
+// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means
+// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query
+// the subchannel, and its memory will be garbage collected.
+//
+// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is
+// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
+// the trace of the referenced entity must not be deleted. In order to release the resource allocated
+// by grpc, the reference to the grpc object is reset to a dummy object.
+//
+// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
+//
+// It returns a bool to indicate whether the channel can be safely deleted from map.
+func (sc *SubChannel) deleteSelfFromMap() (delete bool) {
+ return sc.getTraceRefCount() == 0
+}
+
+// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
+// The delete process includes two steps:
+// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
+// its parent's child list.
+// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
+// by id will return entry not found error.
+func (sc *SubChannel) deleteSelfIfReady() {
+ if !sc.deleteSelfFromTree() {
+ return
+ }
+ if !sc.deleteSelfFromMap() {
+ return
+ }
+ db.deleteEntry(sc.ID)
+ sc.trace.clear()
+}
+
+func (sc *SubChannel) getChannelTrace() *ChannelTrace {
+ return sc.trace
+}
+
+func (sc *SubChannel) incrTraceRefCount() {
+ atomic.AddInt32(&sc.traceRefCount, 1)
+}
+
+func (sc *SubChannel) decrTraceRefCount() {
+ atomic.AddInt32(&sc.traceRefCount, -1)
+}
+
+func (sc *SubChannel) getTraceRefCount() int {
+ i := atomic.LoadInt32(&sc.traceRefCount)
+ return int(i)
+}
+
+func (sc *SubChannel) getRefName() string {
+ return sc.RefName
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_linux.go
index 1b1c4cce3..5ac73ff83 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/types_linux.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_linux.go
@@ -49,3 +49,17 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {
s.TCPInfo = v
}
}
+
+// GetSocketOption gets the socket option info of the conn.
+func GetSocketOption(socket any) *SocketOptionData {
+ c, ok := socket.(syscall.Conn)
+ if !ok {
+ return nil
+ }
+ data := &SocketOptionData{}
+ if rawConn, err := c.SyscallConn(); err == nil {
+ rawConn.Control(data.Getsockopt)
+ return data
+ }
+ return nil
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go
index 8b06eed1a..d1ed8df6a 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/types_nonlinux.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/syscall_nonlinux.go
@@ -1,5 +1,4 @@
//go:build !linux
-// +build !linux
/*
*
@@ -41,3 +40,8 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {
logger.Warning("Channelz: socket options are not supported on non-linux environments")
})
}
+
+// GetSocketOption gets the socket option info of the conn.
+func GetSocketOption(c any) *SocketOptionData {
+ return nil
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/trace.go b/vendor/google.golang.org/grpc/internal/channelz/trace.go
new file mode 100644
index 000000000..36b867403
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/channelz/trace.go
@@ -0,0 +1,204 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package channelz
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc/grpclog"
+)
+
+const (
+ defaultMaxTraceEntry int32 = 30
+)
+
+var maxTraceEntry = defaultMaxTraceEntry
+
+// SetMaxTraceEntry sets maximum number of trace entries per entity (i.e.
+// channel/subchannel). Setting it to 0 will disable channel tracing.
+func SetMaxTraceEntry(i int32) {
+ atomic.StoreInt32(&maxTraceEntry, i)
+}
+
+// ResetMaxTraceEntryToDefault resets the maximum number of trace entries per
+// entity to default.
+func ResetMaxTraceEntryToDefault() {
+ atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
+}
+
+func getMaxTraceEntry() int {
+ i := atomic.LoadInt32(&maxTraceEntry)
+ return int(i)
+}
+
+// traceEvent is an internal representation of a single trace event
+type traceEvent struct {
+ // Desc is a simple description of the trace event.
+ Desc string
+ // Severity states the severity of this trace event.
+ Severity Severity
+ // Timestamp is the event time.
+ Timestamp time.Time
+ // RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is
+ // involved in this event.
+ // e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside [])
+ RefID int64
+ // RefName is the reference name for the entity that gets referenced in the event.
+ RefName string
+ // RefType indicates the referenced entity type, i.e Channel or SubChannel.
+ RefType RefChannelType
+}
+
+// TraceEvent is what the caller of AddTraceEvent should provide to describe the
+// event to be added to the channel trace.
+//
+// The Parent field is optional. It is used for an event that will be recorded
+// in the entity's parent trace.
+type TraceEvent struct {
+ Desc string
+ Severity Severity
+ Parent *TraceEvent
+}
+
+type ChannelTrace struct {
+ cm *channelMap
+ clearCalled bool
+ CreationTime time.Time
+ EventNum int64
+ mu sync.Mutex
+ Events []*traceEvent
+}
+
+func (c *ChannelTrace) copy() *ChannelTrace {
+ return &ChannelTrace{
+ CreationTime: c.CreationTime,
+ EventNum: c.EventNum,
+ Events: append(([]*traceEvent)(nil), c.Events...),
+ }
+}
+
+func (c *ChannelTrace) append(e *traceEvent) {
+ c.mu.Lock()
+ if len(c.Events) == getMaxTraceEntry() {
+ del := c.Events[0]
+ c.Events = c.Events[1:]
+ if del.RefID != 0 {
+ // start recursive cleanup in a goroutine to not block the call originated from grpc.
+ go func() {
+ // need to acquire c.cm.mu lock to call the unlocked attemptCleanup func.
+ c.cm.mu.Lock()
+ c.cm.decrTraceRefCount(del.RefID)
+ c.cm.mu.Unlock()
+ }()
+ }
+ }
+ e.Timestamp = time.Now()
+ c.Events = append(c.Events, e)
+ c.EventNum++
+ c.mu.Unlock()
+}
+
+func (c *ChannelTrace) clear() {
+ if c.clearCalled {
+ return
+ }
+ c.clearCalled = true
+ c.mu.Lock()
+ for _, e := range c.Events {
+ if e.RefID != 0 {
+ // caller should have already held the c.cm.mu lock.
+ c.cm.decrTraceRefCount(e.RefID)
+ }
+ }
+ c.mu.Unlock()
+}
+
+// Severity is the severity level of a trace event.
+// The canonical enumeration of all valid values is here:
+// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126.
+type Severity int
+
+const (
+ // CtUnknown indicates unknown severity of a trace event.
+ CtUnknown Severity = iota
+ // CtInfo indicates info level severity of a trace event.
+ CtInfo
+ // CtWarning indicates warning level severity of a trace event.
+ CtWarning
+ // CtError indicates error level severity of a trace event.
+ CtError
+)
+
+// RefChannelType is the type of the entity being referenced in a trace event.
+type RefChannelType int
+
+const (
+ // RefUnknown indicates an unknown entity type, the zero value for this type.
+ RefUnknown RefChannelType = iota
+ // RefChannel indicates the referenced entity is a Channel.
+ RefChannel
+ // RefSubChannel indicates the referenced entity is a SubChannel.
+ RefSubChannel
+ // RefServer indicates the referenced entity is a Server.
+ RefServer
+ // RefListenSocket indicates the referenced entity is a ListenSocket.
+ RefListenSocket
+ // RefNormalSocket indicates the referenced entity is a NormalSocket.
+ RefNormalSocket
+)
+
+var refChannelTypeToString = map[RefChannelType]string{
+ RefUnknown: "Unknown",
+ RefChannel: "Channel",
+ RefSubChannel: "SubChannel",
+ RefServer: "Server",
+ RefListenSocket: "ListenSocket",
+ RefNormalSocket: "NormalSocket",
+}
+
+func (r RefChannelType) String() string {
+ return refChannelTypeToString[r]
+}
+
+// AddTraceEvent adds trace related to the entity with specified id, using the
+// provided TraceEventDesc.
+//
+// If channelz is not turned ON, this will simply log the event descriptions.
+func AddTraceEvent(l grpclog.DepthLoggerV2, e Entity, depth int, desc *TraceEvent) {
+ // Log only the trace description associated with the bottom most entity.
+ d := fmt.Sprintf("[%s]%s", e, desc.Desc)
+ switch desc.Severity {
+ case CtUnknown, CtInfo:
+ l.InfoDepth(depth+1, d)
+ case CtWarning:
+ l.WarningDepth(depth+1, d)
+ case CtError:
+ l.ErrorDepth(depth+1, d)
+ }
+
+ if getMaxTraceEntry() == 0 {
+ return
+ }
+ if IsOn() {
+ db.traceEvent(e.id(), desc)
+ }
+}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/types.go b/vendor/google.golang.org/grpc/internal/channelz/types.go
deleted file mode 100644
index 1d4020f53..000000000
--- a/vendor/google.golang.org/grpc/internal/channelz/types.go
+++ /dev/null
@@ -1,727 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package channelz
-
-import (
- "net"
- "sync"
- "sync/atomic"
- "time"
-
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials"
-)
-
-// entry represents a node in the channelz database.
-type entry interface {
- // addChild adds a child e, whose channelz id is id to child list
- addChild(id int64, e entry)
- // deleteChild deletes a child with channelz id to be id from child list
- deleteChild(id int64)
- // triggerDelete tries to delete self from channelz database. However, if child
- // list is not empty, then deletion from the database is on hold until the last
- // child is deleted from database.
- triggerDelete()
- // deleteSelfIfReady check whether triggerDelete() has been called before, and whether child
- // list is now empty. If both conditions are met, then delete self from database.
- deleteSelfIfReady()
- // getParentID returns parent ID of the entry. 0 value parent ID means no parent.
- getParentID() int64
-}
-
-// dummyEntry is a fake entry to handle entry not found case.
-type dummyEntry struct {
- idNotFound int64
-}
-
-func (d *dummyEntry) addChild(id int64, e entry) {
- // Note: It is possible for a normal program to reach here under race condition.
- // For example, there could be a race between ClientConn.Close() info being propagated
- // to addrConn and http2Client. ClientConn.Close() cancel the context and result
- // in http2Client to error. The error info is then caught by transport monitor
- // and before addrConn.tearDown() is called in side ClientConn.Close(). Therefore,
- // the addrConn will create a new transport. And when registering the new transport in
- // channelz, its parent addrConn could have already been torn down and deleted
- // from channelz tracking, and thus reach the code here.
- logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
-}
-
-func (d *dummyEntry) deleteChild(id int64) {
- // It is possible for a normal program to reach here under race condition.
- // Refer to the example described in addChild().
- logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
-}
-
-func (d *dummyEntry) triggerDelete() {
- logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
-}
-
-func (*dummyEntry) deleteSelfIfReady() {
- // code should not reach here. deleteSelfIfReady is always called on an existing entry.
-}
-
-func (*dummyEntry) getParentID() int64 {
- return 0
-}
-
-// ChannelMetric defines the info channelz provides for a specific Channel, which
-// includes ChannelInternalMetric and channelz-specific data, such as channelz id,
-// child list, etc.
-type ChannelMetric struct {
- // ID is the channelz id of this channel.
- ID int64
- // RefName is the human readable reference string of this channel.
- RefName string
- // ChannelData contains channel internal metric reported by the channel through
- // ChannelzMetric().
- ChannelData *ChannelInternalMetric
- // NestedChans tracks the nested channel type children of this channel in the format of
- // a map from nested channel channelz id to corresponding reference string.
- NestedChans map[int64]string
- // SubChans tracks the subchannel type children of this channel in the format of a
- // map from subchannel channelz id to corresponding reference string.
- SubChans map[int64]string
- // Sockets tracks the socket type children of this channel in the format of a map
- // from socket channelz id to corresponding reference string.
- // Note current grpc implementation doesn't allow channel having sockets directly,
- // therefore, this is field is unused.
- Sockets map[int64]string
- // Trace contains the most recent traced events.
- Trace *ChannelTrace
-}
-
-// SubChannelMetric defines the info channelz provides for a specific SubChannel,
-// which includes ChannelInternalMetric and channelz-specific data, such as
-// channelz id, child list, etc.
-type SubChannelMetric struct {
- // ID is the channelz id of this subchannel.
- ID int64
- // RefName is the human readable reference string of this subchannel.
- RefName string
- // ChannelData contains subchannel internal metric reported by the subchannel
- // through ChannelzMetric().
- ChannelData *ChannelInternalMetric
- // NestedChans tracks the nested channel type children of this subchannel in the format of
- // a map from nested channel channelz id to corresponding reference string.
- // Note current grpc implementation doesn't allow subchannel to have nested channels
- // as children, therefore, this field is unused.
- NestedChans map[int64]string
- // SubChans tracks the subchannel type children of this subchannel in the format of a
- // map from subchannel channelz id to corresponding reference string.
- // Note current grpc implementation doesn't allow subchannel to have subchannels
- // as children, therefore, this field is unused.
- SubChans map[int64]string
- // Sockets tracks the socket type children of this subchannel in the format of a map
- // from socket channelz id to corresponding reference string.
- Sockets map[int64]string
- // Trace contains the most recent traced events.
- Trace *ChannelTrace
-}
-
-// ChannelInternalMetric defines the struct that the implementor of Channel interface
-// should return from ChannelzMetric().
-type ChannelInternalMetric struct {
- // current connectivity state of the channel.
- State connectivity.State
- // The target this channel originally tried to connect to. May be absent
- Target string
- // The number of calls started on the channel.
- CallsStarted int64
- // The number of calls that have completed with an OK status.
- CallsSucceeded int64
- // The number of calls that have a completed with a non-OK status.
- CallsFailed int64
- // The last time a call was started on the channel.
- LastCallStartedTimestamp time.Time
-}
-
-// ChannelTrace stores traced events on a channel/subchannel and related info.
-type ChannelTrace struct {
- // EventNum is the number of events that ever got traced (i.e. including those that have been deleted)
- EventNum int64
- // CreationTime is the creation time of the trace.
- CreationTime time.Time
- // Events stores the most recent trace events (up to $maxTraceEntry, newer event will overwrite the
- // oldest one)
- Events []*TraceEvent
-}
-
-// TraceEvent represent a single trace event
-type TraceEvent struct {
- // Desc is a simple description of the trace event.
- Desc string
- // Severity states the severity of this trace event.
- Severity Severity
- // Timestamp is the event time.
- Timestamp time.Time
- // RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is
- // involved in this event.
- // e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside [])
- RefID int64
- // RefName is the reference name for the entity that gets referenced in the event.
- RefName string
- // RefType indicates the referenced entity type, i.e Channel or SubChannel.
- RefType RefChannelType
-}
-
-// Channel is the interface that should be satisfied in order to be tracked by
-// channelz as Channel or SubChannel.
-type Channel interface {
- ChannelzMetric() *ChannelInternalMetric
-}
-
-type dummyChannel struct{}
-
-func (d *dummyChannel) ChannelzMetric() *ChannelInternalMetric {
- return &ChannelInternalMetric{}
-}
-
-type channel struct {
- refName string
- c Channel
- closeCalled bool
- nestedChans map[int64]string
- subChans map[int64]string
- id int64
- pid int64
- cm *channelMap
- trace *channelTrace
- // traceRefCount is the number of trace events that reference this channel.
- // Non-zero traceRefCount means the trace of this channel cannot be deleted.
- traceRefCount int32
-}
-
-func (c *channel) addChild(id int64, e entry) {
- switch v := e.(type) {
- case *subChannel:
- c.subChans[id] = v.refName
- case *channel:
- c.nestedChans[id] = v.refName
- default:
- logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
- }
-}
-
-func (c *channel) deleteChild(id int64) {
- delete(c.subChans, id)
- delete(c.nestedChans, id)
- c.deleteSelfIfReady()
-}
-
-func (c *channel) triggerDelete() {
- c.closeCalled = true
- c.deleteSelfIfReady()
-}
-
-func (c *channel) getParentID() int64 {
- return c.pid
-}
-
-// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means
-// deleting the channel reference from its parent's child list.
-//
-// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the
-// corresponding grpc object has been invoked, and the channel does not have any children left.
-//
-// The returned boolean value indicates whether the channel has been successfully deleted from tree.
-func (c *channel) deleteSelfFromTree() (deleted bool) {
- if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
- return false
- }
- // not top channel
- if c.pid != 0 {
- c.cm.findEntry(c.pid).deleteChild(c.id)
- }
- return true
-}
-
-// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means
-// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the
-// channel, and its memory will be garbage collected.
-//
-// The trace reference count of the channel must be 0 in order to be deleted from the map. This is
-// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
-// the trace of the referenced entity must not be deleted. In order to release the resource allocated
-// by grpc, the reference to the grpc object is reset to a dummy object.
-//
-// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
-//
-// It returns a bool to indicate whether the channel can be safely deleted from map.
-func (c *channel) deleteSelfFromMap() (delete bool) {
- if c.getTraceRefCount() != 0 {
- c.c = &dummyChannel{}
- return false
- }
- return true
-}
-
-// deleteSelfIfReady tries to delete the channel itself from the channelz database.
-// The delete process includes two steps:
-// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
-// parent's child list.
-// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
-// will return entry not found error.
-func (c *channel) deleteSelfIfReady() {
- if !c.deleteSelfFromTree() {
- return
- }
- if !c.deleteSelfFromMap() {
- return
- }
- c.cm.deleteEntry(c.id)
- c.trace.clear()
-}
-
-func (c *channel) getChannelTrace() *channelTrace {
- return c.trace
-}
-
-func (c *channel) incrTraceRefCount() {
- atomic.AddInt32(&c.traceRefCount, 1)
-}
-
-func (c *channel) decrTraceRefCount() {
- atomic.AddInt32(&c.traceRefCount, -1)
-}
-
-func (c *channel) getTraceRefCount() int {
- i := atomic.LoadInt32(&c.traceRefCount)
- return int(i)
-}
-
-func (c *channel) getRefName() string {
- return c.refName
-}
-
-type subChannel struct {
- refName string
- c Channel
- closeCalled bool
- sockets map[int64]string
- id int64
- pid int64
- cm *channelMap
- trace *channelTrace
- traceRefCount int32
-}
-
-func (sc *subChannel) addChild(id int64, e entry) {
- if v, ok := e.(*normalSocket); ok {
- sc.sockets[id] = v.refName
- } else {
- logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e)
- }
-}
-
-func (sc *subChannel) deleteChild(id int64) {
- delete(sc.sockets, id)
- sc.deleteSelfIfReady()
-}
-
-func (sc *subChannel) triggerDelete() {
- sc.closeCalled = true
- sc.deleteSelfIfReady()
-}
-
-func (sc *subChannel) getParentID() int64 {
- return sc.pid
-}
-
-// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which
-// means deleting the subchannel reference from its parent's child list.
-//
-// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of
-// the corresponding grpc object has been invoked, and the subchannel does not have any children left.
-//
-// The returned boolean value indicates whether the channel has been successfully deleted from tree.
-func (sc *subChannel) deleteSelfFromTree() (deleted bool) {
- if !sc.closeCalled || len(sc.sockets) != 0 {
- return false
- }
- sc.cm.findEntry(sc.pid).deleteChild(sc.id)
- return true
-}
-
-// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means
-// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query
-// the subchannel, and its memory will be garbage collected.
-//
-// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is
-// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
-// the trace of the referenced entity must not be deleted. In order to release the resource allocated
-// by grpc, the reference to the grpc object is reset to a dummy object.
-//
-// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
-//
-// It returns a bool to indicate whether the channel can be safely deleted from map.
-func (sc *subChannel) deleteSelfFromMap() (delete bool) {
- if sc.getTraceRefCount() != 0 {
- // free the grpc struct (i.e. addrConn)
- sc.c = &dummyChannel{}
- return false
- }
- return true
-}
-
-// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
-// The delete process includes two steps:
-// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
-// its parent's child list.
-// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
-// by id will return entry not found error.
-func (sc *subChannel) deleteSelfIfReady() {
- if !sc.deleteSelfFromTree() {
- return
- }
- if !sc.deleteSelfFromMap() {
- return
- }
- sc.cm.deleteEntry(sc.id)
- sc.trace.clear()
-}
-
-func (sc *subChannel) getChannelTrace() *channelTrace {
- return sc.trace
-}
-
-func (sc *subChannel) incrTraceRefCount() {
- atomic.AddInt32(&sc.traceRefCount, 1)
-}
-
-func (sc *subChannel) decrTraceRefCount() {
- atomic.AddInt32(&sc.traceRefCount, -1)
-}
-
-func (sc *subChannel) getTraceRefCount() int {
- i := atomic.LoadInt32(&sc.traceRefCount)
- return int(i)
-}
-
-func (sc *subChannel) getRefName() string {
- return sc.refName
-}
-
-// SocketMetric defines the info channelz provides for a specific Socket, which
-// includes SocketInternalMetric and channelz-specific data, such as channelz id, etc.
-type SocketMetric struct {
- // ID is the channelz id of this socket.
- ID int64
- // RefName is the human readable reference string of this socket.
- RefName string
- // SocketData contains socket internal metric reported by the socket through
- // ChannelzMetric().
- SocketData *SocketInternalMetric
-}
-
-// SocketInternalMetric defines the struct that the implementor of Socket interface
-// should return from ChannelzMetric().
-type SocketInternalMetric struct {
- // The number of streams that have been started.
- StreamsStarted int64
- // The number of streams that have ended successfully:
- // On client side, receiving frame with eos bit set.
- // On server side, sending frame with eos bit set.
- StreamsSucceeded int64
- // The number of streams that have ended unsuccessfully:
- // On client side, termination without receiving frame with eos bit set.
- // On server side, termination without sending frame with eos bit set.
- StreamsFailed int64
- // The number of messages successfully sent on this socket.
- MessagesSent int64
- MessagesReceived int64
- // The number of keep alives sent. This is typically implemented with HTTP/2
- // ping messages.
- KeepAlivesSent int64
- // The last time a stream was created by this endpoint. Usually unset for
- // servers.
- LastLocalStreamCreatedTimestamp time.Time
- // The last time a stream was created by the remote endpoint. Usually unset
- // for clients.
- LastRemoteStreamCreatedTimestamp time.Time
- // The last time a message was sent by this endpoint.
- LastMessageSentTimestamp time.Time
- // The last time a message was received by this endpoint.
- LastMessageReceivedTimestamp time.Time
- // The amount of window, granted to the local endpoint by the remote endpoint.
- // This may be slightly out of date due to network latency. This does NOT
- // include stream level or TCP level flow control info.
- LocalFlowControlWindow int64
- // The amount of window, granted to the remote endpoint by the local endpoint.
- // This may be slightly out of date due to network latency. This does NOT
- // include stream level or TCP level flow control info.
- RemoteFlowControlWindow int64
- // The locally bound address.
- LocalAddr net.Addr
- // The remote bound address. May be absent.
- RemoteAddr net.Addr
- // Optional, represents the name of the remote endpoint, if different than
- // the original target name.
- RemoteName string
- SocketOptions *SocketOptionData
- Security credentials.ChannelzSecurityValue
-}
-
-// Socket is the interface that should be satisfied in order to be tracked by
-// channelz as Socket.
-type Socket interface {
- ChannelzMetric() *SocketInternalMetric
-}
-
-type listenSocket struct {
- refName string
- s Socket
- id int64
- pid int64
- cm *channelMap
-}
-
-func (ls *listenSocket) addChild(id int64, e entry) {
- logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e)
-}
-
-func (ls *listenSocket) deleteChild(id int64) {
- logger.Errorf("cannot delete a child (id = %d) from a listen socket", id)
-}
-
-func (ls *listenSocket) triggerDelete() {
- ls.cm.deleteEntry(ls.id)
- ls.cm.findEntry(ls.pid).deleteChild(ls.id)
-}
-
-func (ls *listenSocket) deleteSelfIfReady() {
- logger.Errorf("cannot call deleteSelfIfReady on a listen socket")
-}
-
-func (ls *listenSocket) getParentID() int64 {
- return ls.pid
-}
-
-type normalSocket struct {
- refName string
- s Socket
- id int64
- pid int64
- cm *channelMap
-}
-
-func (ns *normalSocket) addChild(id int64, e entry) {
- logger.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e)
-}
-
-func (ns *normalSocket) deleteChild(id int64) {
- logger.Errorf("cannot delete a child (id = %d) from a normal socket", id)
-}
-
-func (ns *normalSocket) triggerDelete() {
- ns.cm.deleteEntry(ns.id)
- ns.cm.findEntry(ns.pid).deleteChild(ns.id)
-}
-
-func (ns *normalSocket) deleteSelfIfReady() {
- logger.Errorf("cannot call deleteSelfIfReady on a normal socket")
-}
-
-func (ns *normalSocket) getParentID() int64 {
- return ns.pid
-}
-
-// ServerMetric defines the info channelz provides for a specific Server, which
-// includes ServerInternalMetric and channelz-specific data, such as channelz id,
-// child list, etc.
-type ServerMetric struct {
- // ID is the channelz id of this server.
- ID int64
- // RefName is the human readable reference string of this server.
- RefName string
- // ServerData contains server internal metric reported by the server through
- // ChannelzMetric().
- ServerData *ServerInternalMetric
- // ListenSockets tracks the listener socket type children of this server in the
- // format of a map from socket channelz id to corresponding reference string.
- ListenSockets map[int64]string
-}
-
-// ServerInternalMetric defines the struct that the implementor of Server interface
-// should return from ChannelzMetric().
-type ServerInternalMetric struct {
- // The number of incoming calls started on the server.
- CallsStarted int64
- // The number of incoming calls that have completed with an OK status.
- CallsSucceeded int64
- // The number of incoming calls that have a completed with a non-OK status.
- CallsFailed int64
- // The last time a call was started on the server.
- LastCallStartedTimestamp time.Time
-}
-
-// Server is the interface to be satisfied in order to be tracked by channelz as
-// Server.
-type Server interface {
- ChannelzMetric() *ServerInternalMetric
-}
-
-type server struct {
- refName string
- s Server
- closeCalled bool
- sockets map[int64]string
- listenSockets map[int64]string
- id int64
- cm *channelMap
-}
-
-func (s *server) addChild(id int64, e entry) {
- switch v := e.(type) {
- case *normalSocket:
- s.sockets[id] = v.refName
- case *listenSocket:
- s.listenSockets[id] = v.refName
- default:
- logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e)
- }
-}
-
-func (s *server) deleteChild(id int64) {
- delete(s.sockets, id)
- delete(s.listenSockets, id)
- s.deleteSelfIfReady()
-}
-
-func (s *server) triggerDelete() {
- s.closeCalled = true
- s.deleteSelfIfReady()
-}
-
-func (s *server) deleteSelfIfReady() {
- if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 {
- return
- }
- s.cm.deleteEntry(s.id)
-}
-
-func (s *server) getParentID() int64 {
- return 0
-}
-
-type tracedChannel interface {
- getChannelTrace() *channelTrace
- incrTraceRefCount()
- decrTraceRefCount()
- getRefName() string
-}
-
-type channelTrace struct {
- cm *channelMap
- clearCalled bool
- createdTime time.Time
- eventCount int64
- mu sync.Mutex
- events []*TraceEvent
-}
-
-func (c *channelTrace) append(e *TraceEvent) {
- c.mu.Lock()
- if len(c.events) == getMaxTraceEntry() {
- del := c.events[0]
- c.events = c.events[1:]
- if del.RefID != 0 {
- // start recursive cleanup in a goroutine to not block the call originated from grpc.
- go func() {
- // need to acquire c.cm.mu lock to call the unlocked attemptCleanup func.
- c.cm.mu.Lock()
- c.cm.decrTraceRefCount(del.RefID)
- c.cm.mu.Unlock()
- }()
- }
- }
- e.Timestamp = time.Now()
- c.events = append(c.events, e)
- c.eventCount++
- c.mu.Unlock()
-}
-
-func (c *channelTrace) clear() {
- if c.clearCalled {
- return
- }
- c.clearCalled = true
- c.mu.Lock()
- for _, e := range c.events {
- if e.RefID != 0 {
- // caller should have already held the c.cm.mu lock.
- c.cm.decrTraceRefCount(e.RefID)
- }
- }
- c.mu.Unlock()
-}
-
-// Severity is the severity level of a trace event.
-// The canonical enumeration of all valid values is here:
-// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126.
-type Severity int
-
-const (
- // CtUnknown indicates unknown severity of a trace event.
- CtUnknown Severity = iota
- // CtInfo indicates info level severity of a trace event.
- CtInfo
- // CtWarning indicates warning level severity of a trace event.
- CtWarning
- // CtError indicates error level severity of a trace event.
- CtError
-)
-
-// RefChannelType is the type of the entity being referenced in a trace event.
-type RefChannelType int
-
-const (
- // RefUnknown indicates an unknown entity type, the zero value for this type.
- RefUnknown RefChannelType = iota
- // RefChannel indicates the referenced entity is a Channel.
- RefChannel
- // RefSubChannel indicates the referenced entity is a SubChannel.
- RefSubChannel
- // RefServer indicates the referenced entity is a Server.
- RefServer
- // RefListenSocket indicates the referenced entity is a ListenSocket.
- RefListenSocket
- // RefNormalSocket indicates the referenced entity is a NormalSocket.
- RefNormalSocket
-)
-
-var refChannelTypeToString = map[RefChannelType]string{
- RefUnknown: "Unknown",
- RefChannel: "Channel",
- RefSubChannel: "SubChannel",
- RefServer: "Server",
- RefListenSocket: "ListenSocket",
- RefNormalSocket: "NormalSocket",
-}
-
-func (r RefChannelType) String() string {
- return refChannelTypeToString[r]
-}
-
-func (c *channelTrace) dumpData() *ChannelTrace {
- c.mu.Lock()
- ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime}
- ct.Events = c.events[:len(c.events)]
- c.mu.Unlock()
- return ct
-}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go b/vendor/google.golang.org/grpc/internal/channelz/util_linux.go
deleted file mode 100644
index 98288c3f8..000000000
--- a/vendor/google.golang.org/grpc/internal/channelz/util_linux.go
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package channelz
-
-import (
- "syscall"
-)
-
-// GetSocketOption gets the socket option info of the conn.
-func GetSocketOption(socket any) *SocketOptionData {
- c, ok := socket.(syscall.Conn)
- if !ok {
- return nil
- }
- data := &SocketOptionData{}
- if rawConn, err := c.SyscallConn(); err == nil {
- rawConn.Control(data.Getsockopt)
- return data
- }
- return nil
-}
diff --git a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go b/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go
deleted file mode 100644
index b5568b22e..000000000
--- a/vendor/google.golang.org/grpc/internal/channelz/util_nonlinux.go
+++ /dev/null
@@ -1,27 +0,0 @@
-//go:build !linux
-// +build !linux
-
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package channelz
-
-// GetSocketOption gets the socket option info of the conn.
-func GetSocketOption(c any) *SocketOptionData {
- return nil
-}
diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
index aa97273e7..0126d6b51 100644
--- a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
+++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand.go
@@ -1,3 +1,8 @@
+//go:build !go1.21
+
+// TODO: when this file is deleted (after Go 1.20 support is dropped), delete
+// all of grpcrand and call the rand package directly.
+
/*
*
* Copyright 2018 gRPC authors.
diff --git a/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go
new file mode 100644
index 000000000..c37299af1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/grpcrand/grpcrand_go1.21.go
@@ -0,0 +1,73 @@
+//go:build go1.21
+
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package grpcrand implements math/rand functions in a concurrent-safe way
+// with a global random source, independent of math/rand's global source.
+package grpcrand
+
+import "math/rand"
+
+// This implementation will be used for Go version 1.21 or newer.
+// For older versions, the original implementation with mutex will be used.
+
+// Int implements rand.Int on the grpcrand global source.
+func Int() int {
+ return rand.Int()
+}
+
+// Int63n implements rand.Int63n on the grpcrand global source.
+func Int63n(n int64) int64 {
+ return rand.Int63n(n)
+}
+
+// Intn implements rand.Intn on the grpcrand global source.
+func Intn(n int) int {
+ return rand.Intn(n)
+}
+
+// Int31n implements rand.Int31n on the grpcrand global source.
+func Int31n(n int32) int32 {
+ return rand.Int31n(n)
+}
+
+// Float64 implements rand.Float64 on the grpcrand global source.
+func Float64() float64 {
+ return rand.Float64()
+}
+
+// Uint64 implements rand.Uint64 on the grpcrand global source.
+func Uint64() uint64 {
+ return rand.Uint64()
+}
+
+// Uint32 implements rand.Uint32 on the grpcrand global source.
+func Uint32() uint32 {
+ return rand.Uint32()
+}
+
+// ExpFloat64 implements rand.ExpFloat64 on the grpcrand global source.
+func ExpFloat64() float64 {
+ return rand.ExpFloat64()
+}
+
+// Shuffle implements rand.Shuffle on the grpcrand global source.
+var Shuffle = func(n int, f func(int, int)) {
+ rand.Shuffle(n, f)
+}
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 6c7ea6a53..48d24bdb4 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -190,12 +190,16 @@ var (
// function makes events more predictable than relying on timer events.
TriggerXDSResourceNameNotFoundForTesting any // func(func(xdsresource.Type, string), string, string) error
- // TriggerXDSResourceNotFoundClient invokes the testing xDS Client singleton
- // to invoke resource not found for a resource type name and resource name.
+ // TriggerXDSResourceNameNotFoundClient invokes the testing xDS Client
+ // singleton to invoke resource not found for a resource type name and
+ // resource name.
TriggerXDSResourceNameNotFoundClient any // func(string, string) error
// FromOutgoingContextRaw returns the un-merged, intermediary contents of metadata.rawMD.
FromOutgoingContextRaw any // func(context.Context) (metadata.MD, [][]string, bool)
+
+ // UserSetDefaultScheme is set to true if the user has overridden the default resolver scheme.
+ UserSetDefaultScheme bool = false
)
// HealthChecker defines the signature of the client-side LB channel health checking function.
diff --git a/vendor/google.golang.org/grpc/internal/pretty/pretty.go b/vendor/google.golang.org/grpc/internal/pretty/pretty.go
index 703319137..dbee7a60d 100644
--- a/vendor/google.golang.org/grpc/internal/pretty/pretty.go
+++ b/vendor/google.golang.org/grpc/internal/pretty/pretty.go
@@ -24,10 +24,8 @@ import (
"encoding/json"
"fmt"
- "github.com/golang/protobuf/jsonpb"
- protov1 "github.com/golang/protobuf/proto"
"google.golang.org/protobuf/encoding/protojson"
- protov2 "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/protoadapt"
)
const jsonIndent = " "
@@ -36,21 +34,14 @@ const jsonIndent = " "
//
// If marshal fails, it falls back to fmt.Sprintf("%+v").
func ToJSON(e any) string {
- switch ee := e.(type) {
- case protov1.Message:
- mm := jsonpb.Marshaler{Indent: jsonIndent}
- ret, err := mm.MarshalToString(ee)
- if err != nil {
- // This may fail for proto.Anys, e.g. for xDS v2, LDS, the v2
- // messages are not imported, and this will fail because the message
- // is not found.
- return fmt.Sprintf("%+v", ee)
- }
- return ret
- case protov2.Message:
+ if ee, ok := e.(protoadapt.MessageV1); ok {
+ e = protoadapt.MessageV2Of(ee)
+ }
+
+ if ee, ok := e.(protoadapt.MessageV2); ok {
mm := protojson.MarshalOptions{
- Multiline: true,
Indent: jsonIndent,
+ Multiline: true,
}
ret, err := mm.Marshal(ee)
if err != nil {
@@ -60,13 +51,13 @@ func ToJSON(e any) string {
return fmt.Sprintf("%+v", ee)
}
return string(ret)
- default:
- ret, err := json.MarshalIndent(ee, "", jsonIndent)
- if err != nil {
- return fmt.Sprintf("%+v", ee)
- }
- return string(ret)
}
+
+ ret, err := json.MarshalIndent(e, "", jsonIndent)
+ if err != nil {
+ return fmt.Sprintf("%+v", e)
+ }
+ return string(ret)
}
// FormatJSON formats the input json bytes with indentation.
diff --git a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
index b66dcb213..abab35e25 100644
--- a/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
+++ b/vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
@@ -45,6 +45,13 @@ import (
// addresses from SRV records. Must not be changed after init time.
var EnableSRVLookups = false
+// ResolvingTimeout specifies the maximum duration for a DNS resolution request.
+// If the timeout expires before a response is received, the request will be canceled.
+//
+// It is recommended to set this value at application startup. Avoid modifying this variable
+// after initialization as it's not thread-safe for concurrent modification.
+var ResolvingTimeout = 30 * time.Second
+
var logger = grpclog.Component("dns")
func init() {
@@ -221,18 +228,18 @@ func (d *dnsResolver) watcher() {
}
}
-func (d *dnsResolver) lookupSRV() ([]resolver.Address, error) {
+func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error) {
if !EnableSRVLookups {
return nil, nil
}
var newAddrs []resolver.Address
- _, srvs, err := d.resolver.LookupSRV(d.ctx, "grpclb", "tcp", d.host)
+ _, srvs, err := d.resolver.LookupSRV(ctx, "grpclb", "tcp", d.host)
if err != nil {
err = handleDNSError(err, "SRV") // may become nil
return nil, err
}
for _, s := range srvs {
- lbAddrs, err := d.resolver.LookupHost(d.ctx, s.Target)
+ lbAddrs, err := d.resolver.LookupHost(ctx, s.Target)
if err != nil {
err = handleDNSError(err, "A") // may become nil
if err == nil {
@@ -269,8 +276,8 @@ func handleDNSError(err error, lookupType string) error {
return err
}
-func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
- ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
+func (d *dnsResolver) lookupTXT(ctx context.Context) *serviceconfig.ParseResult {
+ ss, err := d.resolver.LookupTXT(ctx, txtPrefix+d.host)
if err != nil {
if envconfig.TXTErrIgnore {
return nil
@@ -297,8 +304,8 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
return d.cc.ParseServiceConfig(sc)
}
-func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {
- addrs, err := d.resolver.LookupHost(d.ctx, d.host)
+func (d *dnsResolver) lookupHost(ctx context.Context) ([]resolver.Address, error) {
+ addrs, err := d.resolver.LookupHost(ctx, d.host)
if err != nil {
err = handleDNSError(err, "A")
return nil, err
@@ -316,8 +323,10 @@ func (d *dnsResolver) lookupHost() ([]resolver.Address, error) {
}
func (d *dnsResolver) lookup() (*resolver.State, error) {
- srv, srvErr := d.lookupSRV()
- addrs, hostErr := d.lookupHost()
+ ctx, cancel := context.WithTimeout(d.ctx, ResolvingTimeout)
+ defer cancel()
+ srv, srvErr := d.lookupSRV(ctx)
+ addrs, hostErr := d.lookupHost(ctx)
if hostErr != nil && (srvErr != nil || len(srv) == 0) {
return nil, hostErr
}
@@ -327,7 +336,7 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {
state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv})
}
if !d.disableServiceConfig {
- state.ServiceConfig = d.lookupTXT()
+ state.ServiceConfig = d.lookupTXT(ctx)
}
return &state, nil
}
diff --git a/vendor/google.golang.org/grpc/internal/status/status.go b/vendor/google.golang.org/grpc/internal/status/status.go
index 03ef2fedd..c7dbc8205 100644
--- a/vendor/google.golang.org/grpc/internal/status/status.go
+++ b/vendor/google.golang.org/grpc/internal/status/status.go
@@ -31,10 +31,11 @@ import (
"errors"
"fmt"
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/protoadapt"
+ "google.golang.org/protobuf/types/known/anypb"
)
// Status represents an RPC status code, message, and details. It is immutable
@@ -130,14 +131,14 @@ func (s *Status) Err() error {
// WithDetails returns a new status with the provided details messages appended to the status.
// If any errors are encountered, it returns nil and the first error encountered.
-func (s *Status) WithDetails(details ...proto.Message) (*Status, error) {
+func (s *Status) WithDetails(details ...protoadapt.MessageV1) (*Status, error) {
if s.Code() == codes.OK {
return nil, errors.New("no error details for status with code OK")
}
// s.Code() != OK implies that s.Proto() != nil.
p := s.Proto()
for _, detail := range details {
- any, err := ptypes.MarshalAny(detail)
+ any, err := anypb.New(protoadapt.MessageV2Of(detail))
if err != nil {
return nil, err
}
@@ -154,12 +155,12 @@ func (s *Status) Details() []any {
}
details := make([]any, 0, len(s.s.Details))
for _, any := range s.s.Details {
- detail := &ptypes.DynamicAny{}
- if err := ptypes.UnmarshalAny(any, detail); err != nil {
+ detail, err := any.UnmarshalNew()
+ if err != nil {
details = append(details, err)
continue
}
- details = append(details, detail.Message)
+ details = append(details, detail)
}
return details
}
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index a9d70e2a1..4a3ddce29 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -35,7 +35,6 @@ import (
"sync"
"time"
- "github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -45,20 +44,17 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
- if r.ProtoMajor != 2 {
- msg := "gRPC requires HTTP/2"
- http.Error(w, msg, http.StatusBadRequest)
- return nil, errors.New(msg)
- }
- if r.Method != "POST" {
+ if r.Method != http.MethodPost {
+ w.Header().Set("Allow", http.MethodPost)
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
- http.Error(w, msg, http.StatusBadRequest)
+ http.Error(w, msg, http.StatusMethodNotAllowed)
return nil, errors.New(msg)
}
contentType := r.Header.Get("Content-Type")
@@ -69,6 +65,11 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
http.Error(w, msg, http.StatusUnsupportedMediaType)
return nil, errors.New(msg)
}
+ if r.ProtoMajor != 2 {
+ msg := "gRPC requires HTTP/2"
+ http.Error(w, msg, http.StatusHTTPVersionNotSupported)
+ return nil, errors.New(msg)
+ }
if _, ok := w.(http.Flusher); !ok {
msg := "gRPC requires a ResponseWriter supporting http.Flusher"
http.Error(w, msg, http.StatusInternalServerError)
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index eff879964..deba0c4d9 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -140,9 +140,7 @@ type http2Client struct {
// variable.
kpDormant bool
- // Fields below are for channelz metric collection.
- channelzID *channelz.Identifier
- czData *channelzData
+ channelz *channelz.Socket
onClose func(GoAwayReason)
@@ -319,6 +317,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if opts.MaxHeaderListSize != nil {
maxHeaderListSize = *opts.MaxHeaderListSize
}
+
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
@@ -346,11 +345,25 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
- czData: new(channelzData),
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
onClose: onClose,
}
+ var czSecurity credentials.ChannelzSecurityValue
+ if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
+ czSecurity = au.GetSecurityValue()
+ }
+ t.channelz = channelz.RegisterSocket(
+ &channelz.Socket{
+ SocketType: channelz.SocketTypeNormal,
+ Parent: opts.ChannelzParent,
+ SocketMetrics: channelz.SocketMetrics{},
+ EphemeralMetrics: t.socketMetrics,
+ LocalAddr: t.localAddr,
+ RemoteAddr: t.remoteAddr,
+ SocketOptions: channelz.GetSocketOption(t.conn),
+ Security: czSecurity,
+ })
t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@@ -381,10 +394,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
sh.HandleConn(t.ctx, connBegin)
}
- t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
- if err != nil {
- return nil, err
- }
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
@@ -756,8 +765,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return ErrConnClosing
}
if channelz.IsOn() {
- atomic.AddInt64(&t.czData.streamsStarted, 1)
- atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.StreamsStarted.Add(1)
+ t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
// If the keepalive goroutine has gone dormant, wake it up.
if t.kpDormant {
@@ -928,9 +937,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.mu.Unlock()
if channelz.IsOn() {
if eosReceived {
- atomic.AddInt64(&t.czData.streamsSucceeded, 1)
+ t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
- atomic.AddInt64(&t.czData.streamsFailed, 1)
+ t.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
},
@@ -985,7 +994,7 @@ func (t *http2Client) Close(err error) {
t.controlBuf.finish()
t.cancel()
t.conn.Close()
- channelz.RemoveEntry(t.channelzID)
+ channelz.RemoveEntry(t.channelz.ID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
@@ -1708,7 +1717,7 @@ func (t *http2Client) keepalive() {
// keepalive timer expired. In both cases, we need to send a ping.
if !outstandingPing {
if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
+ t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
@@ -1738,40 +1747,23 @@ func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
-func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
- s := channelz.SocketInternalMetric{
- StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
- StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
- StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
- MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
- MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
- KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
- LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
- LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
- LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
- LocalFlowControlWindow: int64(t.fc.getSize()),
- SocketOptions: channelz.GetSocketOption(t.conn),
- LocalAddr: t.localAddr,
- RemoteAddr: t.remoteAddr,
- // RemoteName :
- }
- if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
- s.Security = au.GetSecurityValue()
- }
- s.RemoteFlowControlWindow = t.getOutFlowWindow()
- return &s
+func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
+ return &channelz.EphemeralSocketMetrics{
+ LocalFlowControlWindow: int64(t.fc.getSize()),
+ RemoteFlowControlWindow: t.getOutFlowWindow(),
+ }
}
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
func (t *http2Client) IncrMsgSent() {
- atomic.AddInt64(&t.czData.msgSent, 1)
- atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.MessagesSent.Add(1)
+ t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
- atomic.AddInt64(&t.czData.msgRecv, 1)
- atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.MessagesReceived.Add(1)
+ t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index a206e2eef..d582e0471 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -32,13 +32,13 @@ import (
"sync/atomic"
"time"
- "github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/syscall"
+ "google.golang.org/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -118,8 +118,7 @@ type http2Server struct {
idle time.Time
// Fields below are for channelz metric collection.
- channelzID *channelz.Identifier
- czData *channelzData
+ channelz *channelz.Socket
bufferPool *bufferPool
connectionID uint64
@@ -262,9 +261,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
- czData: new(channelzData),
bufferPool: newBufferPool(),
}
+ var czSecurity credentials.ChannelzSecurityValue
+ if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
+ czSecurity = au.GetSecurityValue()
+ }
+ t.channelz = channelz.RegisterSocket(
+ &channelz.Socket{
+ SocketType: channelz.SocketTypeNormal,
+ Parent: config.ChannelzParent,
+ SocketMetrics: channelz.SocketMetrics{},
+ EphemeralMetrics: t.socketMetrics,
+ LocalAddr: t.peer.LocalAddr,
+ RemoteAddr: t.peer.Addr,
+ SocketOptions: channelz.GetSocketOption(t.conn),
+ Security: czSecurity,
+ },
+ )
t.logger = prefixLoggerForServerTransport(t)
t.controlBuf = newControlBuffer(t.done)
@@ -274,10 +288,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
- t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))
- if err != nil {
- return nil, err
- }
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
t.framer.writer.Flush()
@@ -334,9 +344,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
// closed, would lead to a TCP RST instead of FIN, and the client
// encountering errors. For more info:
// https://github.com/grpc/grpc-go/issues/5358
+ timer := time.NewTimer(time.Second)
+ defer timer.Stop()
select {
case <-t.readerDone:
- case <-time.After(time.Second):
+ case <-timer.C:
}
t.conn.Close()
}
@@ -592,8 +604,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
}
t.mu.Unlock()
if channelz.IsOn() {
- atomic.AddInt64(&t.czData.streamsStarted, 1)
- atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.StreamsStarted.Add(1)
+ t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
@@ -652,18 +664,20 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
}
continue
}
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- t.Close(err)
- return
- }
t.Close(err)
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if err := t.operateHeaders(ctx, frame, handle); err != nil {
- t.Close(err)
- break
+ // Any error processing client headers, e.g. invalid stream ID,
+ // is considered a protocol violation.
+ t.controlBuf.put(&goAway{
+ code: http2.ErrCodeProtocol,
+ debugData: []byte(err.Error()),
+ closeConn: err,
+ })
+ continue
}
case *http2.DataFrame:
t.handleData(frame)
@@ -1199,7 +1213,7 @@ func (t *http2Server) keepalive() {
}
if !outstandingPing {
if channelz.IsOn() {
- atomic.AddInt64(&t.czData.kpCount, 1)
+ t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
kpTimeoutLeft = t.kp.Timeout
@@ -1239,7 +1253,7 @@ func (t *http2Server) Close(err error) {
if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
}
- channelz.RemoveEntry(t.channelzID)
+ channelz.RemoveEntry(t.channelz.ID)
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@@ -1260,9 +1274,9 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
if channelz.IsOn() {
if eosReceived {
- atomic.AddInt64(&t.czData.streamsSucceeded, 1)
+ t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
- atomic.AddInt64(&t.czData.streamsFailed, 1)
+ t.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
}
@@ -1379,38 +1393,21 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, nil
}
-func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
- s := channelz.SocketInternalMetric{
- StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
- StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
- StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
- MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
- MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
- KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
- LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
- LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
- LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
- LocalFlowControlWindow: int64(t.fc.getSize()),
- SocketOptions: channelz.GetSocketOption(t.conn),
- LocalAddr: t.peer.LocalAddr,
- RemoteAddr: t.peer.Addr,
- // RemoteName :
- }
- if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {
- s.Security = au.GetSecurityValue()
- }
- s.RemoteFlowControlWindow = t.getOutFlowWindow()
- return &s
+func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
+ return &channelz.EphemeralSocketMetrics{
+ LocalFlowControlWindow: int64(t.fc.getSize()),
+ RemoteFlowControlWindow: t.getOutFlowWindow(),
+ }
}
func (t *http2Server) IncrMsgSent() {
- atomic.AddInt64(&t.czData.msgSent, 1)
- atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.MessagesSent.Add(1)
+ t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
}
func (t *http2Server) IncrMsgRecv() {
- atomic.AddInt64(&t.czData.msgRecv, 1)
- atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
+ t.channelz.SocketMetrics.MessagesReceived.Add(1)
+ t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
}
func (t *http2Server) getOutFlowWindow() int64 {
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index dc29d590e..39cef3bd4 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -418,10 +418,9 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu
return f
}
-func getWriteBufferPool(writeBufferSize int) *sync.Pool {
+func getWriteBufferPool(size int) *sync.Pool {
writeBufferMutex.Lock()
defer writeBufferMutex.Unlock()
- size := writeBufferSize * 2
pool, ok := writeBufferPoolMap[size]
if ok {
return pool
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index b7b8fec18..0d2a6e47f 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -28,6 +28,7 @@ import (
"fmt"
"io"
"net"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -362,8 +363,12 @@ func (s *Stream) SendCompress() string {
// ClientAdvertisedCompressors returns the compressor names advertised by the
// client via grpc-accept-encoding header.
-func (s *Stream) ClientAdvertisedCompressors() string {
- return s.clientAdvertisedCompressors
+func (s *Stream) ClientAdvertisedCompressors() []string {
+ values := strings.Split(s.clientAdvertisedCompressors, ",")
+ for i, v := range values {
+ values[i] = strings.TrimSpace(v)
+ }
+ return values
}
// Done returns a channel which is closed when it receives the final status
@@ -566,7 +571,7 @@ type ServerConfig struct {
WriteBufferSize int
ReadBufferSize int
SharedWriteBuffer bool
- ChannelzParentID *channelz.Identifier
+ ChannelzParent *channelz.Server
MaxHeaderListSize *uint32
HeaderTableSize *uint32
}
@@ -601,8 +606,8 @@ type ConnectOptions struct {
ReadBufferSize int
// SharedWriteBuffer indicates whether connections should reuse write buffer
SharedWriteBuffer bool
- // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
- ChannelzParentID *channelz.Identifier
+ // ChannelzParent sets the addrConn id which initiated the creation of this client transport.
+ ChannelzParent *channelz.SubChannel
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
@@ -815,30 +820,6 @@ const (
GoAwayTooManyPings GoAwayReason = 2
)
-// channelzData is used to store channelz related data for http2Client and http2Server.
-// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
-// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
-// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
-type channelzData struct {
- kpCount int64
- // The number of streams that have started, including already finished ones.
- streamsStarted int64
- // Client side: The number of streams that have ended successfully by receiving
- // EoS bit set frame from server.
- // Server side: The number of streams that have ended successfully by sending
- // frame with EoS bit set.
- streamsSucceeded int64
- streamsFailed int64
- // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
- // instead of time.Time since it's more costly to atomically update time.Time variable than int64
- // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
- lastStreamCreatedTime int64
- msgSent int64
- msgRecv int64
- lastMsgSentTime int64
- lastMsgRecvTime int64
-}
-
// ContextErr converts the error from context package into a status error.
func ContextErr(err error) error {
switch err {
diff --git a/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go b/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go
deleted file mode 100644
index e8b492774..000000000
--- a/vendor/google.golang.org/grpc/internal/xds_handshake_cluster.go
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2021 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package internal
-
-import (
- "google.golang.org/grpc/attributes"
- "google.golang.org/grpc/resolver"
-)
-
-// handshakeClusterNameKey is the type used as the key to store cluster name in
-// the Attributes field of resolver.Address.
-type handshakeClusterNameKey struct{}
-
-// SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field
-// is updated with the cluster name.
-func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address {
- addr.Attributes = addr.Attributes.WithValue(handshakeClusterNameKey{}, clusterName)
- return addr
-}
-
-// GetXDSHandshakeClusterName returns cluster name stored in attr.
-func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) {
- v := attr.Value(handshakeClusterNameKey{})
- name, ok := v.(string)
- return name, ok
-}