author     tobi <31960611+tsmethurst@users.noreply.github.com>	2024-09-16 11:06:00 +0200
committer  GitHub <noreply@github.com>	2024-09-16 09:06:00 +0000
commit     b2572b9e074ebbce8bcf1b9979d4d8ea066650d6 (patch)
tree       0c2a08ed63b582676ce7661252a6917db751c62a /vendor/google.golang.org/grpc/internal
parent     [chore]: Bump golang.org/x/net from 0.28.0 to 0.29.0 (#3303) (diff)
download   gotosocial-b2572b9e074ebbce8bcf1b9979d4d8ea066650d6.tar.xz

[chore] Bump otel deps -> v1.30.0/v0.52.0 (#3307)
Diffstat (limited to 'vendor/google.golang.org/grpc/internal')
-rw-r--r--	vendor/google.golang.org/grpc/internal/channelz/channelmap.go	2
-rw-r--r--	vendor/google.golang.org/grpc/internal/envconfig/envconfig.go	4
-rw-r--r--	vendor/google.golang.org/grpc/internal/experimental.go	8
-rw-r--r--	vendor/google.golang.org/grpc/internal/grpclog/grpclog.go	126
-rw-r--r--	vendor/google.golang.org/grpc/internal/grpclog/prefix_logger.go (renamed from vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go)	40
-rw-r--r--	vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go	24
-rw-r--r--	vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go	4
-rw-r--r--	vendor/google.golang.org/grpc/internal/internal.go	21
-rw-r--r--	vendor/google.golang.org/grpc/internal/stats/labels.go	42
-rw-r--r--	vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go	95
-rw-r--r--	vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go	2
-rw-r--r--	vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go	2
-rw-r--r--	vendor/google.golang.org/grpc/internal/transport/controlbuf.go	249
-rw-r--r--	vendor/google.golang.org/grpc/internal/transport/handler_server.go	45
-rw-r--r--	vendor/google.golang.org/grpc/internal/transport/http2_client.go	61
-rw-r--r--	vendor/google.golang.org/grpc/internal/transport/http2_server.go	47
-rw-r--r--	vendor/google.golang.org/grpc/internal/transport/http_util.go	22
-rw-r--r--	vendor/google.golang.org/grpc/internal/transport/proxy.go	10
-rw-r--r--	vendor/google.golang.org/grpc/internal/transport/transport.go	242
19 files changed, 640 insertions, 406 deletions
diff --git a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go
index dfe18b089..bb531225d 100644
--- a/vendor/google.golang.org/grpc/internal/channelz/channelmap.go
+++ b/vendor/google.golang.org/grpc/internal/channelz/channelmap.go
@@ -46,7 +46,7 @@ type entry interface {
// channelMap is the storage data structure for channelz.
//
-// Methods of channelMap can be divided in two two categories with respect to
+// Methods of channelMap can be divided into two categories with respect to
// locking.
//
// 1. Methods acquire the global lock.
diff --git a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
index d90648713..00abc7c2b 100644
--- a/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
+++ b/vendor/google.golang.org/grpc/internal/envconfig/envconfig.go
@@ -46,6 +46,10 @@ var (
// by setting the environment variable "GRPC_ENFORCE_ALPN_ENABLED" to "true"
// or "false".
EnforceALPNEnabled = boolFromEnv("GRPC_ENFORCE_ALPN_ENABLED", false)
+ // XDSFallbackSupport is the env variable that controls whether support for
+ // xDS fallback is turned on. If this is unset or is false, only the first
+ // xDS server in the list of server configs will be used.
+ XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", false)
)
func boolFromEnv(envVar string, def bool) bool {
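The new GRPC_EXPERIMENTAL_XDS_FALLBACK flag above follows the same boolFromEnv pattern as the existing variables. The short standalone sketch below shows how such a boolean env-var gate typically works; the helper here is a simplified stand-in, not the vendored boolFromEnv.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// boolFromEnv-style helper: returns def unless the variable is set to an
// explicit boolean value. This mirrors the envconfig pattern, but is a
// standalone sketch, not the vendored code.
func boolFromEnv(envVar string, def bool) bool {
	v, ok := os.LookupEnv(envVar)
	if !ok {
		return def
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return def
	}
	return b
}

func main() {
	// With GRPC_EXPERIMENTAL_XDS_FALLBACK unset, fallback support stays off.
	fallback := boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", false)
	fmt.Println("xDS fallback enabled:", fallback)
}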
diff --git a/vendor/google.golang.org/grpc/internal/experimental.go b/vendor/google.golang.org/grpc/internal/experimental.go
index 7f7044e17..7617be215 100644
--- a/vendor/google.golang.org/grpc/internal/experimental.go
+++ b/vendor/google.golang.org/grpc/internal/experimental.go
@@ -18,11 +18,11 @@
package internal
var (
- // WithRecvBufferPool is implemented by the grpc package and returns a dial
+ // WithBufferPool is implemented by the grpc package and returns a dial
// option to configure a shared buffer pool for a grpc.ClientConn.
- WithRecvBufferPool any // func (grpc.SharedBufferPool) grpc.DialOption
+ WithBufferPool any // func (grpc.SharedBufferPool) grpc.DialOption
- // RecvBufferPool is implemented by the grpc package and returns a server
+ // BufferPool is implemented by the grpc package and returns a server
// option to configure a shared buffer pool for a grpc.Server.
- RecvBufferPool any // func (grpc.SharedBufferPool) grpc.ServerOption
+ BufferPool any // func (grpc.SharedBufferPool) grpc.ServerOption
)
diff --git a/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go b/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go
deleted file mode 100644
index bfc45102a..000000000
--- a/vendor/google.golang.org/grpc/internal/grpclog/grpclog.go
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Package grpclog (internal) defines depth logging for grpc.
-package grpclog
-
-import (
- "os"
-)
-
-// Logger is the logger used for the non-depth log functions.
-var Logger LoggerV2
-
-// DepthLogger is the logger used for the depth log functions.
-var DepthLogger DepthLoggerV2
-
-// InfoDepth logs to the INFO log at the specified depth.
-func InfoDepth(depth int, args ...any) {
- if DepthLogger != nil {
- DepthLogger.InfoDepth(depth, args...)
- } else {
- Logger.Infoln(args...)
- }
-}
-
-// WarningDepth logs to the WARNING log at the specified depth.
-func WarningDepth(depth int, args ...any) {
- if DepthLogger != nil {
- DepthLogger.WarningDepth(depth, args...)
- } else {
- Logger.Warningln(args...)
- }
-}
-
-// ErrorDepth logs to the ERROR log at the specified depth.
-func ErrorDepth(depth int, args ...any) {
- if DepthLogger != nil {
- DepthLogger.ErrorDepth(depth, args...)
- } else {
- Logger.Errorln(args...)
- }
-}
-
-// FatalDepth logs to the FATAL log at the specified depth.
-func FatalDepth(depth int, args ...any) {
- if DepthLogger != nil {
- DepthLogger.FatalDepth(depth, args...)
- } else {
- Logger.Fatalln(args...)
- }
- os.Exit(1)
-}
-
-// LoggerV2 does underlying logging work for grpclog.
-// This is a copy of the LoggerV2 defined in the external grpclog package. It
-// is defined here to avoid a circular dependency.
-type LoggerV2 interface {
- // Info logs to INFO log. Arguments are handled in the manner of fmt.Print.
- Info(args ...any)
- // Infoln logs to INFO log. Arguments are handled in the manner of fmt.Println.
- Infoln(args ...any)
- // Infof logs to INFO log. Arguments are handled in the manner of fmt.Printf.
- Infof(format string, args ...any)
- // Warning logs to WARNING log. Arguments are handled in the manner of fmt.Print.
- Warning(args ...any)
- // Warningln logs to WARNING log. Arguments are handled in the manner of fmt.Println.
- Warningln(args ...any)
- // Warningf logs to WARNING log. Arguments are handled in the manner of fmt.Printf.
- Warningf(format string, args ...any)
- // Error logs to ERROR log. Arguments are handled in the manner of fmt.Print.
- Error(args ...any)
- // Errorln logs to ERROR log. Arguments are handled in the manner of fmt.Println.
- Errorln(args ...any)
- // Errorf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
- Errorf(format string, args ...any)
- // Fatal logs to ERROR log. Arguments are handled in the manner of fmt.Print.
- // gRPC ensures that all Fatal logs will exit with os.Exit(1).
- // Implementations may also call os.Exit() with a non-zero exit code.
- Fatal(args ...any)
- // Fatalln logs to ERROR log. Arguments are handled in the manner of fmt.Println.
- // gRPC ensures that all Fatal logs will exit with os.Exit(1).
- // Implementations may also call os.Exit() with a non-zero exit code.
- Fatalln(args ...any)
- // Fatalf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
- // gRPC ensures that all Fatal logs will exit with os.Exit(1).
- // Implementations may also call os.Exit() with a non-zero exit code.
- Fatalf(format string, args ...any)
- // V reports whether verbosity level l is at least the requested verbose level.
- V(l int) bool
-}
-
-// DepthLoggerV2 logs at a specified call frame. If a LoggerV2 also implements
-// DepthLoggerV2, the below functions will be called with the appropriate stack
-// depth set for trivial functions the logger may ignore.
-// This is a copy of the DepthLoggerV2 defined in the external grpclog package.
-// It is defined here to avoid a circular dependency.
-//
-// # Experimental
-//
-// Notice: This type is EXPERIMENTAL and may be changed or removed in a
-// later release.
-type DepthLoggerV2 interface {
- // InfoDepth logs to INFO log at the specified depth. Arguments are handled in the manner of fmt.Println.
- InfoDepth(depth int, args ...any)
- // WarningDepth logs to WARNING log at the specified depth. Arguments are handled in the manner of fmt.Println.
- WarningDepth(depth int, args ...any)
- // ErrorDepth logs to ERROR log at the specified depth. Arguments are handled in the manner of fmt.Println.
- ErrorDepth(depth int, args ...any)
- // FatalDepth logs to FATAL log at the specified depth. Arguments are handled in the manner of fmt.Println.
- FatalDepth(depth int, args ...any)
-}
diff --git a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go b/vendor/google.golang.org/grpc/internal/grpclog/prefix_logger.go
index faa998de7..092ad187a 100644
--- a/vendor/google.golang.org/grpc/internal/grpclog/prefixLogger.go
+++ b/vendor/google.golang.org/grpc/internal/grpclog/prefix_logger.go
@@ -16,17 +16,21 @@
*
*/
+// Package grpclog provides logging functionality for internal gRPC packages,
+// outside of the functionality provided by the external `grpclog` package.
package grpclog
import (
"fmt"
+
+ "google.golang.org/grpc/grpclog"
)
// PrefixLogger does logging with a prefix.
//
// Logging method on a nil logs without any prefix.
type PrefixLogger struct {
- logger DepthLoggerV2
+ logger grpclog.DepthLoggerV2
prefix string
}
@@ -38,7 +42,7 @@ func (pl *PrefixLogger) Infof(format string, args ...any) {
pl.logger.InfoDepth(1, fmt.Sprintf(format, args...))
return
}
- InfoDepth(1, fmt.Sprintf(format, args...))
+ grpclog.InfoDepth(1, fmt.Sprintf(format, args...))
}
// Warningf does warning logging.
@@ -48,7 +52,7 @@ func (pl *PrefixLogger) Warningf(format string, args ...any) {
pl.logger.WarningDepth(1, fmt.Sprintf(format, args...))
return
}
- WarningDepth(1, fmt.Sprintf(format, args...))
+ grpclog.WarningDepth(1, fmt.Sprintf(format, args...))
}
// Errorf does error logging.
@@ -58,36 +62,18 @@ func (pl *PrefixLogger) Errorf(format string, args ...any) {
pl.logger.ErrorDepth(1, fmt.Sprintf(format, args...))
return
}
- ErrorDepth(1, fmt.Sprintf(format, args...))
-}
-
-// Debugf does info logging at verbose level 2.
-func (pl *PrefixLogger) Debugf(format string, args ...any) {
- // TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
- // rewrite PrefixLogger a little to ensure that we don't use the global
- // `Logger` here, and instead use the `logger` field.
- if !Logger.V(2) {
- return
- }
- if pl != nil {
- // Handle nil, so the tests can pass in a nil logger.
- format = pl.prefix + format
- pl.logger.InfoDepth(1, fmt.Sprintf(format, args...))
- return
- }
- InfoDepth(1, fmt.Sprintf(format, args...))
-
+ grpclog.ErrorDepth(1, fmt.Sprintf(format, args...))
}
// V reports whether verbosity level l is at least the requested verbose level.
func (pl *PrefixLogger) V(l int) bool {
- // TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
- // rewrite PrefixLogger a little to ensure that we don't use the global
- // `Logger` here, and instead use the `logger` field.
- return Logger.V(l)
+ if pl != nil {
+ return pl.logger.V(l)
+ }
+ return true
}
// NewPrefixLogger creates a prefix logger with the given prefix.
-func NewPrefixLogger(logger DepthLoggerV2, prefix string) *PrefixLogger {
+func NewPrefixLogger(logger grpclog.DepthLoggerV2, prefix string) *PrefixLogger {
return &PrefixLogger{logger: logger, prefix: prefix}
}
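With the internal logger types gone, PrefixLogger now builds on the public grpclog.DepthLoggerV2. The standalone sketch below illustrates the prefix-wrapping idea with stand-in types; it is not the vendored implementation.

package main

import (
	"fmt"
	"log"
)

// depthLogger is a stand-in for grpclog.DepthLoggerV2; only the method used
// below is modelled.
type depthLogger interface {
	InfoDepth(depth int, args ...any)
}

type stdDepthLogger struct{}

func (stdDepthLogger) InfoDepth(depth int, args ...any) {
	// depth is ignored in this sketch; a real implementation would adjust
	// the reported call site.
	log.Println(args...)
}

// prefixLogger prepends a fixed prefix to every message, mirroring the shape
// of the internal PrefixLogger.
type prefixLogger struct {
	logger depthLogger
	prefix string
}

func (pl *prefixLogger) Infof(format string, args ...any) {
	pl.logger.InfoDepth(1, fmt.Sprintf(pl.prefix+format, args...))
}

func main() {
	pl := &prefixLogger{logger: stdDepthLogger{}, prefix: "[xds] "}
	pl.Infof("resolver built for target %q", "example.com:443")
}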
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
index f7f40a16a..19b9d6392 100644
--- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
@@ -53,16 +53,28 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
return cs
}
-// Schedule adds a callback to be scheduled after existing callbacks are run.
+// TrySchedule tries to schedule the provided callback function f to be
+// executed in the order it was added. This is a best-effort operation. If the
+// context passed to NewCallbackSerializer was canceled before this method is
+// called, the callback will not be scheduled.
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
+func (cs *CallbackSerializer) TrySchedule(f func(ctx context.Context)) {
+ cs.callbacks.Put(f)
+}
+
+// ScheduleOr schedules the provided callback function f to be executed in the
+// order it was added. If the context passed to NewCallbackSerializer has been
+// canceled before this method is called, the onFailure callback will be
+// executed inline instead.
//
-// Return value indicates if the callback was successfully added to the list of
-// callbacks to be executed by the serializer. It is not possible to add
-// callbacks once the context passed to NewCallbackSerializer is cancelled.
-func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
- return cs.callbacks.Put(f) == nil
+// Callbacks are expected to honor the context when performing any blocking
+// operations, and should return early when the context is canceled.
+func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure func()) {
+ if cs.callbacks.Put(f) != nil {
+ onFailure()
+ }
}
func (cs *CallbackSerializer) run(ctx context.Context) {
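Schedule has been split into TrySchedule, which silently drops work once the serializer's context is canceled, and ScheduleOr, which runs an onFailure callback inline in that case. The simplified sketch below models those semantics with a small bounded queue; the vendored CallbackSerializer uses an unbounded buffer and is not reproduced here.

package main

import (
	"context"
	"fmt"
)

// serializer runs callbacks one at a time, in the order they were queued,
// until its context is canceled. This is a simplified model of the semantics
// above, not the vendored CallbackSerializer.
type serializer struct {
	ctx   context.Context
	queue chan func(context.Context)
}

func newSerializer(ctx context.Context) *serializer {
	s := &serializer{ctx: ctx, queue: make(chan func(context.Context), 16)}
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case cb := <-s.queue:
				cb(ctx)
			}
		}
	}()
	return s
}

// trySchedule is best effort: once the context is done, callbacks are dropped.
func (s *serializer) trySchedule(f func(context.Context)) {
	if s.ctx.Err() != nil {
		return
	}
	s.queue <- f
}

// scheduleOr runs onFailure inline when the callback can no longer be queued.
func (s *serializer) scheduleOr(f func(context.Context), onFailure func()) {
	if s.ctx.Err() != nil {
		onFailure()
		return
	}
	s.queue <- f
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := newSerializer(ctx)

	done := make(chan struct{})
	s.trySchedule(func(context.Context) { fmt.Println("ran in order"); close(done) })
	<-done

	cancel()
	s.scheduleOr(func(context.Context) { fmt.Println("never runs") }, func() {
		fmt.Println("onFailure: serializer is closed")
	})
}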
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
index aef8cec1a..6d8c2f518 100644
--- a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
@@ -77,7 +77,7 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
if ps.msg != nil {
msg := ps.msg
- ps.cs.Schedule(func(context.Context) {
+ ps.cs.TrySchedule(func(context.Context) {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.subscribers[sub] {
@@ -103,7 +103,7 @@ func (ps *PubSub) Publish(msg any) {
ps.msg = msg
for sub := range ps.subscribers {
s := sub
- ps.cs.Schedule(func(context.Context) {
+ ps.cs.TrySchedule(func(context.Context) {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.subscribers[s] {
diff --git a/vendor/google.golang.org/grpc/internal/internal.go b/vendor/google.golang.org/grpc/internal/internal.go
index 5d6653986..65f936a62 100644
--- a/vendor/google.golang.org/grpc/internal/internal.go
+++ b/vendor/google.golang.org/grpc/internal/internal.go
@@ -208,6 +208,27 @@ var (
// ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n
// is the number of elements. swap swaps the elements with indexes i and j.
ShuffleAddressListForTesting any // func(n int, swap func(i, j int))
+
+ // ConnectedAddress returns the connected address for a SubConnState. The
+ // address is only valid if the state is READY.
+ ConnectedAddress any // func (scs SubConnState) resolver.Address
+
+ // SetConnectedAddress sets the connected address for a SubConnState.
+ SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address)
+
+ // SnapshotMetricRegistryForTesting snapshots the global data of the metric
+ // registry. Registers a cleanup function on the provided testing.T that
+ // sets the metric registry to its original state. Only called in testing
+ // functions.
+ SnapshotMetricRegistryForTesting any // func(t *testing.T)
+
+ // SetDefaultBufferPoolForTesting updates the default buffer pool, for
+ // testing purposes.
+ SetDefaultBufferPoolForTesting any // func(mem.BufferPool)
+
+ // SetBufferPoolingThresholdForTesting updates the buffer pooling threshold, for
+ // testing purposes.
+ SetBufferPoolingThresholdForTesting any // func(int)
)
// HealthChecker defines the signature of the client-side LB channel health
diff --git a/vendor/google.golang.org/grpc/internal/stats/labels.go b/vendor/google.golang.org/grpc/internal/stats/labels.go
new file mode 100644
index 000000000..fd33af51a
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/stats/labels.go
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package stats provides internal stats related functionality.
+package stats
+
+import "context"
+
+// Labels are the labels for metrics.
+type Labels struct {
+ // TelemetryLabels are the telemetry labels to record.
+ TelemetryLabels map[string]string
+}
+
+type labelsKey struct{}
+
+// GetLabels returns the Labels stored in the context, or nil if there is none.
+func GetLabels(ctx context.Context) *Labels {
+ labels, _ := ctx.Value(labelsKey{}).(*Labels)
+ return labels
+}
+
+// SetLabels sets the Labels in the context.
+func SetLabels(ctx context.Context, labels *Labels) context.Context {
+ // could also append
+ return context.WithValue(ctx, labelsKey{}, labels)
+}
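The new labels.go is a plain context-value carrier. Because the package is internal to gRPC, the sketch below re-implements the same pattern standalone to show how SetLabels and GetLabels are meant to be used; the label key in main is illustrative.

package main

import (
	"context"
	"fmt"
)

// Labels mirrors the shape of the new internal/stats.Labels type.
type Labels struct {
	TelemetryLabels map[string]string
}

type labelsKey struct{}

// SetLabels stores labels in the context, as the vendored SetLabels does.
func SetLabels(ctx context.Context, labels *Labels) context.Context {
	return context.WithValue(ctx, labelsKey{}, labels)
}

// GetLabels returns the stored labels, or nil if none were set.
func GetLabels(ctx context.Context) *Labels {
	labels, _ := ctx.Value(labelsKey{}).(*Labels)
	return labels
}

func main() {
	// The label name below is illustrative, not taken from the diff.
	ctx := SetLabels(context.Background(), &Labels{
		TelemetryLabels: map[string]string{"locality": "us-east1-b"},
	})
	if l := GetLabels(ctx); l != nil {
		fmt.Println(l.TelemetryLabels["locality"])
	}
}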
diff --git a/vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go b/vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go
new file mode 100644
index 000000000..be110d41f
--- /dev/null
+++ b/vendor/google.golang.org/grpc/internal/stats/metrics_recorder_list.go
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package stats
+
+import (
+ "fmt"
+
+ estats "google.golang.org/grpc/experimental/stats"
+ "google.golang.org/grpc/stats"
+)
+
+// MetricsRecorderList forwards Record calls to all of its metricsRecorders.
+//
+// It eats any record calls where the label values provided do not match the
+// number of label keys.
+type MetricsRecorderList struct {
+ // metricsRecorders are the metrics recorders this list will forward to.
+ metricsRecorders []estats.MetricsRecorder
+}
+
+// NewMetricsRecorderList creates a new metric recorder list with all the stats
+// handlers provided which implement the MetricsRecorder interface.
+// If no stats handlers provided implement the MetricsRecorder interface,
+// the MetricsRecorder list returned is a no-op.
+func NewMetricsRecorderList(shs []stats.Handler) *MetricsRecorderList {
+ var mrs []estats.MetricsRecorder
+ for _, sh := range shs {
+ if mr, ok := sh.(estats.MetricsRecorder); ok {
+ mrs = append(mrs, mr)
+ }
+ }
+ return &MetricsRecorderList{
+ metricsRecorders: mrs,
+ }
+}
+
+func verifyLabels(desc *estats.MetricDescriptor, labelsRecv ...string) {
+ if got, want := len(labelsRecv), len(desc.Labels)+len(desc.OptionalLabels); got != want {
+ panic(fmt.Sprintf("Received %d labels in call to record metric %q, but expected %d.", got, desc.Name, want))
+ }
+}
+
+func (l *MetricsRecorderList) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) {
+ verifyLabels(handle.Descriptor(), labels...)
+
+ for _, metricRecorder := range l.metricsRecorders {
+ metricRecorder.RecordInt64Count(handle, incr, labels...)
+ }
+}
+
+func (l *MetricsRecorderList) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {
+ verifyLabels(handle.Descriptor(), labels...)
+
+ for _, metricRecorder := range l.metricsRecorders {
+ metricRecorder.RecordFloat64Count(handle, incr, labels...)
+ }
+}
+
+func (l *MetricsRecorderList) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) {
+ verifyLabels(handle.Descriptor(), labels...)
+
+ for _, metricRecorder := range l.metricsRecorders {
+ metricRecorder.RecordInt64Histo(handle, incr, labels...)
+ }
+}
+
+func (l *MetricsRecorderList) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) {
+ verifyLabels(handle.Descriptor(), labels...)
+
+ for _, metricRecorder := range l.metricsRecorders {
+ metricRecorder.RecordFloat64Histo(handle, incr, labels...)
+ }
+}
+
+func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) {
+ verifyLabels(handle.Descriptor(), labels...)
+
+ for _, metricRecorder := range l.metricsRecorders {
+ metricRecorder.RecordInt64Gauge(handle, incr, labels...)
+ }
+}
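NewMetricsRecorderList filters the configured stats handlers down to those that also implement the metrics-recorder interface and fans every record call out to all of them. The sketch below shows that filter-and-fan-out pattern with stand-in types; it does not use the real experimental/stats API.

package main

import "fmt"

// recorder is a stand-in for estats.MetricsRecorder.
type recorder interface {
	RecordInt64Count(name string, incr int64)
}

// handler is a stand-in for stats.Handler; some handlers also implement recorder.
type handler interface{ Name() string }

type printRecorder struct{ name string }

func (p *printRecorder) Name() string { return p.name }
func (p *printRecorder) RecordInt64Count(name string, incr int64) {
	fmt.Printf("%s: %s += %d\n", p.name, name, incr)
}

type plainHandler struct{}

func (plainHandler) Name() string { return "plain" }

// recorderList keeps only the handlers that implement recorder, mirroring the
// filtering done by NewMetricsRecorderList.
type recorderList struct{ recorders []recorder }

func newRecorderList(hs []handler) *recorderList {
	var rs []recorder
	for _, h := range hs {
		if r, ok := h.(recorder); ok {
			rs = append(rs, r)
		}
	}
	return &recorderList{recorders: rs}
}

// RecordInt64Count forwards the call to every recorder in the list.
func (l *recorderList) RecordInt64Count(name string, incr int64) {
	for _, r := range l.recorders {
		r.RecordInt64Count(name, incr)
	}
}

func main() {
	l := newRecorderList([]handler{&printRecorder{name: "otel"}, plainHandler{}})
	l.RecordInt64Count("grpc.client.attempt.started", 1)
}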
diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go
index 078137b7f..7e7aaa546 100644
--- a/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go
+++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_unix.go
@@ -44,7 +44,7 @@ func NetDialerWithTCPKeepalive() *net.Dialer {
// combination of unconditionally enabling TCP keepalives here, and
// disabling the overriding of TCP keepalive parameters by setting the
// KeepAlive field to a negative value above, results in OS defaults for
- // the TCP keealive interval and time parameters.
+ // the TCP keepalive interval and time parameters.
Control: func(_, _ string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_KEEPALIVE, 1)
diff --git a/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go
index fd7d43a89..d5c1085ee 100644
--- a/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go
+++ b/vendor/google.golang.org/grpc/internal/tcp_keepalive_windows.go
@@ -44,7 +44,7 @@ func NetDialerWithTCPKeepalive() *net.Dialer {
// combination of unconditionally enabling TCP keepalives here, and
// disabling the overriding of TCP keepalive parameters by setting the
// KeepAlive field to a negative value above, results in OS defaults for
- // the TCP keealive interval and time parameters.
+ // the TCP keepalive interval and time parameters.
Control: func(_, _ string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
windows.SetsockoptInt(windows.Handle(fd), windows.SOL_SOCKET, windows.SO_KEEPALIVE, 1)
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
index 3deadfb4a..ea0633bbd 100644
--- a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go
@@ -32,6 +32,7 @@ import (
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
+ "google.golang.org/grpc/mem"
"google.golang.org/grpc/status"
)
@@ -148,9 +149,9 @@ type dataFrame struct {
streamID uint32
endStream bool
h []byte
- d []byte
+ reader mem.Reader
// onEachWrite is called every time
- // a part of d is written out.
+ // a part of data is written out.
onEachWrite func()
}
@@ -289,18 +290,22 @@ func (l *outStreamList) dequeue() *outStream {
}
// controlBuffer is a way to pass information to loopy.
-// Information is passed as specific struct types called control frames.
-// A control frame not only represents data, messages or headers to be sent out
-// but can also be used to instruct loopy to update its internal state.
-// It shouldn't be confused with an HTTP2 frame, although some of the control frames
-// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
+//
+// Information is passed as specific struct types called control frames. A
+// control frame not only represents data, messages or headers to be sent out
+// but can also be used to instruct loopy to update its internal state. It
+// shouldn't be confused with an HTTP2 frame, although some of the control
+// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
- ch chan struct{}
- done <-chan struct{}
+ wakeupCh chan struct{} // Unblocks readers waiting for something to read.
+ done <-chan struct{} // Closed when the transport is done.
+
+ // Mutex guards all the fields below, except trfChan which can be read
+ // atomically without holding mu.
mu sync.Mutex
- consumerWaiting bool
- list *itemList
- err error
+ consumerWaiting bool // True when readers are blocked waiting for new data.
+ closed bool // True when the controlbuf is finished.
+ list *itemList // List of queued control frames.
// transportResponseFrames counts the number of queued items that represent
// the response of an action initiated by the peer. trfChan is created
@@ -308,47 +313,59 @@ type controlBuffer struct {
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
- trfChan atomic.Value // chan struct{}
+ trfChan atomic.Pointer[chan struct{}]
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
return &controlBuffer{
- ch: make(chan struct{}, 1),
- list: &itemList{},
- done: done,
+ wakeupCh: make(chan struct{}, 1),
+ list: &itemList{},
+ done: done,
}
}
-// throttle blocks if there are too many incomingSettings/cleanupStreams in the
-// controlbuf.
+// throttle blocks if there are too many frames in the control buf that
+// represent the response of an action initiated by the peer, like
+// incomingSettings cleanupStreams etc.
func (c *controlBuffer) throttle() {
- ch, _ := c.trfChan.Load().(chan struct{})
- if ch != nil {
+ if ch := c.trfChan.Load(); ch != nil {
select {
- case <-ch:
+ case <-(*ch):
case <-c.done:
}
}
}
+// put adds an item to the controlbuf.
func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
+// executeAndPut runs f, and if the return value is true, adds the given item to
+// the controlbuf. The item could be nil, in which case, this method simply
+// executes f and does not add the item to the controlbuf.
+//
+// The first return value indicates whether the item was successfully added to
+// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
+// if the control buffer is already closed.
func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
- var wakeUp bool
c.mu.Lock()
- if c.err != nil {
- c.mu.Unlock()
- return false, c.err
+ defer c.mu.Unlock()
+
+ if c.closed {
+ return false, ErrConnClosing
}
if f != nil {
if !f() { // f wasn't successful
- c.mu.Unlock()
return false, nil
}
}
+ if it == nil {
+ return true, nil
+ }
+
+ var wakeUp bool
if c.consumerWaiting {
wakeUp = true
c.consumerWaiting = false
@@ -359,98 +376,102 @@ func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
- c.trfChan.Store(make(chan struct{}))
+ ch := make(chan struct{})
+ c.trfChan.Store(&ch)
}
}
- c.mu.Unlock()
if wakeUp {
select {
- case c.ch <- struct{}{}:
+ case c.wakeupCh <- struct{}{}:
default:
}
}
return true, nil
}
-// Note argument f should never be nil.
-func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
- c.mu.Lock()
- if c.err != nil {
- c.mu.Unlock()
- return false, c.err
- }
- if !f(it) { // f wasn't successful
- c.mu.Unlock()
- return false, nil
- }
- c.mu.Unlock()
- return true, nil
-}
-
+// get returns the next control frame from the control buffer. If block is true
+// **and** there are no control frames in the control buffer, the call blocks
+// until one of the conditions is met: there is a frame to return or the
+// transport is closed.
func (c *controlBuffer) get(block bool) (any, error) {
for {
c.mu.Lock()
- if c.err != nil {
+ frame, err := c.getOnceLocked()
+ if frame != nil || err != nil || !block {
+ // If we read a frame or an error, we can return to the caller. The
+ // call to getOnceLocked() returns a nil frame and a nil error if
+ // there is nothing to read, and in that case, if the caller asked
+ // us not to block, we can return now as well.
c.mu.Unlock()
- return nil, c.err
- }
- if !c.list.isEmpty() {
- h := c.list.dequeue().(cbItem)
- if h.isTransportResponseFrame() {
- if c.transportResponseFrames == maxQueuedTransportResponseFrames {
- // We are removing the frame that put us over the
- // threshold; close and clear the throttling channel.
- ch := c.trfChan.Load().(chan struct{})
- close(ch)
- c.trfChan.Store((chan struct{})(nil))
- }
- c.transportResponseFrames--
- }
- c.mu.Unlock()
- return h, nil
- }
- if !block {
- c.mu.Unlock()
- return nil, nil
+ return frame, err
}
c.consumerWaiting = true
c.mu.Unlock()
+
+ // Release the lock above and wait to be woken up.
select {
- case <-c.ch:
+ case <-c.wakeupCh:
case <-c.done:
return nil, errors.New("transport closed by client")
}
}
}
+// Callers must not use this method, but should instead use get().
+//
+// Caller must hold c.mu.
+func (c *controlBuffer) getOnceLocked() (any, error) {
+ if c.closed {
+ return false, ErrConnClosing
+ }
+ if c.list.isEmpty() {
+ return nil, nil
+ }
+ h := c.list.dequeue().(cbItem)
+ if h.isTransportResponseFrame() {
+ if c.transportResponseFrames == maxQueuedTransportResponseFrames {
+ // We are removing the frame that put us over the
+ // threshold; close and clear the throttling channel.
+ ch := c.trfChan.Swap(nil)
+ close(*ch)
+ }
+ c.transportResponseFrames--
+ }
+ return h, nil
+}
+
+// finish closes the control buffer, cleaning up any streams that have queued
+// header frames. Once this method returns, no more frames can be added to the
+// control buffer, and attempts to do so will return ErrConnClosing.
func (c *controlBuffer) finish() {
c.mu.Lock()
- if c.err != nil {
- c.mu.Unlock()
+ defer c.mu.Unlock()
+
+ if c.closed {
return
}
- c.err = ErrConnClosing
+ c.closed = true
// There may be headers for streams in the control buffer.
// These streams need to be cleaned out since the transport
// is still not aware of these yet.
for head := c.list.dequeueAll(); head != nil; head = head.next {
- hdr, ok := head.it.(*headerFrame)
- if !ok {
- continue
- }
- if hdr.onOrphaned != nil { // It will be nil on the server-side.
- hdr.onOrphaned(ErrConnClosing)
+ switch v := head.it.(type) {
+ case *headerFrame:
+ if v.onOrphaned != nil { // It will be nil on the server-side.
+ v.onOrphaned(ErrConnClosing)
+ }
+ case *dataFrame:
+ _ = v.reader.Close()
}
}
+
// In case throttle() is currently in flight, it needs to be unblocked.
// Otherwise, the transport may not close, since the transport is closed by
// the reader encountering the connection error.
- ch, _ := c.trfChan.Load().(chan struct{})
+ ch := c.trfChan.Swap(nil)
if ch != nil {
- close(ch)
+ close(*ch)
}
- c.trfChan.Store((chan struct{})(nil))
- c.mu.Unlock()
}
type side int
@@ -466,7 +487,7 @@ const (
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
-// thereby closely resemebling to a round-robin scheduling over all streams. While
+// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
@@ -490,12 +511,13 @@ type loopyWriter struct {
draining bool
conn net.Conn
logger *grpclog.PrefixLogger
+ bufferPool mem.BufferPool
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}
-func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
+func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
@@ -511,6 +533,7 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
conn: conn,
logger: logger,
ssGoAwayHandler: goAwayHandler,
+ bufferPool: bufferPool,
}
return l
}
@@ -768,6 +791,11 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
// not be established yet.
delete(l.estdStreams, c.streamID)
str.deleteSelf()
+ for head := str.itl.dequeueAll(); head != nil; head = head.next {
+ if df, ok := head.it.(*dataFrame); ok {
+ _ = df.reader.Close()
+ }
+ }
}
if c.rst { // If RST_STREAM needs to be sent.
if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
@@ -903,16 +931,18 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
- // Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
- // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
- // maximum possible HTTP2 frame size.
+ // Every dataFrame has two buffers; h that keeps grpc-message header and data
+ // that is the actual message. As an optimization to keep wire traffic low, data
+ // from data is copied to h to make as big as the maximum possible HTTP2 frame
+ // size.
- if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
+ if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
+ _ = dataItem.reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -927,9 +957,7 @@ func (l *loopyWriter) processData() (bool, error) {
}
return false, nil
}
- var (
- buf []byte
- )
+
// Figure out the maximum size we can send
maxSize := http2MaxFrameLen
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
@@ -943,43 +971,50 @@ func (l *loopyWriter) processData() (bool, error) {
}
// Compute how much of the header and data we can send within quota and max frame length
hSize := min(maxSize, len(dataItem.h))
- dSize := min(maxSize-hSize, len(dataItem.d))
- if hSize != 0 {
- if dSize == 0 {
- buf = dataItem.h
- } else {
- // We can add some data to grpc message header to distribute bytes more equally across frames.
- // Copy on the stack to avoid generating garbage
- var localBuf [http2MaxFrameLen]byte
- copy(localBuf[:hSize], dataItem.h)
- copy(localBuf[hSize:], dataItem.d[:dSize])
- buf = localBuf[:hSize+dSize]
- }
+ dSize := min(maxSize-hSize, dataItem.reader.Remaining())
+ remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
+ size := hSize + dSize
+
+ var buf *[]byte
+
+ if hSize != 0 && dSize == 0 {
+ buf = &dataItem.h
} else {
- buf = dataItem.d
- }
+ // Note: this is only necessary because the http2.Framer does not support
+ // partially writing a frame, so the sequence must be materialized into a buffer.
+ // TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
+ pool := l.bufferPool
+ if pool == nil {
+ // Note that this is only supposed to be nil in tests. Otherwise, stream is
+ // always initialized with a BufferPool.
+ pool = mem.DefaultBufferPool()
+ }
+ buf = pool.Get(size)
+ defer pool.Put(buf)
- size := hSize + dSize
+ copy((*buf)[:hSize], dataItem.h)
+ _, _ = dataItem.reader.Read((*buf)[hSize:])
+ }
// Now that outgoing flow controls are checked we can replenish str's write quota
str.wq.replenish(size)
var endStream bool
// If this is the last data message on this stream and all of it can be written in this iteration.
- if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
+ if dataItem.endStream && remainingBytes == 0 {
endStream = true
}
if dataItem.onEachWrite != nil {
dataItem.onEachWrite()
}
- if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
+ if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
return false, err
}
str.bytesOutStanding += size
l.sendQuota -= uint32(size)
dataItem.h = dataItem.h[hSize:]
- dataItem.d = dataItem.d[dSize:]
- if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
+ if remainingBytes == 0 { // All the data from that message was written out.
+ _ = dataItem.reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {
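The throttling channel moves from atomic.Value to atomic.Pointer[chan struct{}], so finish and get can swap it out and close it without type assertions. The standalone sketch below shows the create-on-threshold, close-on-drain pattern in isolation; the done-channel handling of the real throttle is omitted.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const maxQueued = 2 // stand-in for maxQueuedTransportResponseFrames

// throttler blocks producers once too many response frames are queued,
// mirroring the trfChan handling in controlBuffer.
type throttler struct {
	mu      sync.Mutex
	queued  int
	trfChan atomic.Pointer[chan struct{}]
}

// add records a queued response frame and creates the throttling channel when
// the threshold is crossed.
func (t *throttler) add() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.queued++
	if t.queued == maxQueued {
		ch := make(chan struct{})
		t.trfChan.Store(&ch)
	}
}

// remove drops a queued frame and, when the count falls back below the
// threshold, closes and clears the channel, releasing blocked producers.
func (t *throttler) remove() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.queued == maxQueued {
		ch := t.trfChan.Swap(nil)
		close(*ch)
	}
	t.queued--
}

// throttle blocks while the channel is set.
func (t *throttler) throttle() {
	if ch := t.trfChan.Load(); ch != nil {
		<-(*ch)
	}
}

func main() {
	t := &throttler{}
	t.add()
	t.add() // threshold reached; throttle() would now block
	go t.remove()
	t.throttle()
	fmt.Println("unblocked after a queued frame was consumed")
}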
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 4a3ddce29..e1cd86b2f 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -24,7 +24,6 @@
package transport
import (
- "bytes"
"context"
"errors"
"fmt"
@@ -40,6 +39,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
+ "google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@@ -50,7 +50,7 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
-func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
+func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
@@ -98,6 +98,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
contentType: contentType,
contentSubtype: contentSubtype,
stats: stats,
+ bufferPool: bufferPool,
}
st.logger = prefixLoggerForServerHandlerTransport(st)
@@ -171,6 +172,8 @@ type serverHandlerTransport struct {
stats []stats.Handler
logger *grpclog.PrefixLogger
+
+ bufferPool mem.BufferPool
}
func (ht *serverHandlerTransport) Close(err error) {
@@ -244,6 +247,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
s.hdrMu.Lock()
+ defer s.hdrMu.Unlock()
if p := st.Proto(); p != nil && len(p.Details) > 0 {
delete(s.trailer, grpcStatusDetailsBinHeader)
stBytes, err := proto.Marshal(p)
@@ -268,7 +272,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
}
}
- s.hdrMu.Unlock()
})
if err == nil { // transport has not been closed
@@ -330,16 +333,28 @@ func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
s.hdrMu.Unlock()
}
-func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
+func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
+ // Always take a reference because otherwise there is no guarantee the data will
+ // be available after this function returns. This is what callers to Write
+ // expect.
+ data.Ref()
headersWritten := s.updateHeaderSent()
- return ht.do(func() {
+ err := ht.do(func() {
+ defer data.Free()
if !headersWritten {
ht.writePendingHeaders(s)
}
ht.rw.Write(hdr)
- ht.rw.Write(data)
+ for _, b := range data {
+ _, _ = ht.rw.Write(b.ReadOnlyData())
+ }
ht.rw.(http.Flusher).Flush()
})
+ if err != nil {
+ data.Free()
+ return err
+ }
+ return nil
}
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
@@ -406,7 +421,7 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
s.trReader = &transportReader{
- reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
+ reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
windowHandler: func(int) {},
}
@@ -415,21 +430,19 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
go func() {
defer close(readerDone)
- // TODO: minimize garbage, optimize recvBuffer code/ownership
- const readSize = 8196
- for buf := make([]byte, readSize); ; {
- n, err := req.Body.Read(buf)
+ for {
+ buf := ht.bufferPool.Get(http2MaxFrameLen)
+ n, err := req.Body.Read(*buf)
if n > 0 {
- s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
- buf = buf[n:]
+ *buf = (*buf)[:n]
+ s.buf.put(recvMsg{buffer: mem.NewBuffer(buf, ht.bufferPool)})
+ } else {
+ ht.bufferPool.Put(buf)
}
if err != nil {
s.buf.put(recvMsg{err: mapRecvMsgError(err)})
return
}
- if len(buf) == 0 {
- buf = make([]byte, readSize)
- }
}
}()
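The handler-server read loop now takes a pooled buffer per iteration and hands ownership of filled buffers to the receive queue, returning untouched ones to the pool. The sketch below reproduces that loop shape with a plain sync.Pool instead of the mem package.

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

const readSize = 16 * 1024 // stand-in for http2MaxFrameLen

var pool = sync.Pool{New: func() any {
	b := make([]byte, readSize)
	return &b
}}

// readLoop reads the body into pooled buffers and passes ownership of each
// filled buffer to deliver; unused buffers go straight back to the pool.
func readLoop(body io.Reader, deliver func(*[]byte)) error {
	for {
		buf := pool.Get().(*[]byte)
		*buf = (*buf)[:readSize]
		n, err := body.Read(*buf)
		if n > 0 {
			*buf = (*buf)[:n]
			deliver(buf) // the receiver is responsible for returning buf to the pool
		} else {
			pool.Put(buf)
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	err := readLoop(strings.NewReader("hello, handler transport"), func(buf *[]byte) {
		fmt.Printf("received %d bytes: %q\n", len(*buf), string(*buf))
		pool.Put(buf)
	})
	fmt.Println("loop ended with:", err)
}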
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
index 3c63c7069..f46194fdc 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go
@@ -47,6 +47,7 @@ import (
isyscall "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
@@ -59,6 +60,8 @@ import (
// atomically.
var clientConnectionCounter uint64
+var goAwayLoopyWriterTimeout = 5 * time.Second
+
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
// http2Client implements the ClientTransport interface with HTTP2.
@@ -144,7 +147,7 @@ type http2Client struct {
onClose func(GoAwayReason)
- bufferPool *bufferPool
+ bufferPool mem.BufferPool
connectionID uint64
logger *grpclog.PrefixLogger
@@ -229,7 +232,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}(conn)
- // The following defer and goroutine monitor the connectCtx for cancelation
+ // The following defer and goroutine monitor the connectCtx for cancellation
// and deadline. On context expiration, the connection is hard closed and
// this function will naturally fail as a result. Otherwise, the defer
// waits for the goroutine to exit to prevent the context from being
@@ -346,7 +349,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
keepaliveEnabled: keepaliveEnabled,
- bufferPool: newBufferPool(),
+ bufferPool: opts.BufferPool,
onClose: onClose,
}
var czSecurity credentials.ChannelzSecurityValue
@@ -463,7 +466,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return nil, err
}
go func() {
- t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
+ t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
@@ -504,7 +507,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
closeStream: func(err error) {
t.CloseStream(s, err)
},
- freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -983,6 +985,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// only once on a transport. Once it is called, the transport should not be
// accessed anymore.
func (t *http2Client) Close(err error) {
+ t.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
t.mu.Lock()
// Make sure we only close once.
if t.state == closing {
@@ -1006,10 +1009,20 @@ func (t *http2Client) Close(err error) {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
+
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
- // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY.
+ // connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It
+ // also waits for loopyWriter to be closed with a timer to avoid the
+ // long blocking in case the connection is blackholed, i.e. TCP is
+ // just stuck.
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
- <-t.writerDone
+ timer := time.NewTimer(goAwayLoopyWriterTimeout)
+ defer timer.Stop()
+ select {
+ case <-t.writerDone: // success
+ case <-timer.C:
+ t.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout)
+ }
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelz.ID)
@@ -1065,27 +1078,36 @@ func (t *http2Client) GracefulClose() {
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
-func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
+func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
+ reader := data.Reader()
+
if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
+ _ = reader.Close()
return errStreamDone
}
} else if s.getState() != streamActive {
+ _ = reader.Close()
return errStreamDone
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
h: hdr,
- d: data,
+ reader: reader,
}
- if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
- if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
+ if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
+ if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
+ _ = reader.Close()
return err
}
}
- return t.controlBuf.put(df)
+ if err := t.controlBuf.put(df); err != nil {
+ _ = reader.Close()
+ return err
+ }
+ return nil
}
func (t *http2Client) getStream(f http2.Frame) *Stream {
@@ -1190,10 +1212,13 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
- buffer := t.bufferPool.get()
- buffer.Reset()
- buffer.Write(f.Data())
- s.write(recvMsg{buffer: buffer})
+ pool := t.bufferPool
+ if pool == nil {
+ // Note that this is only supposed to be nil in tests. Otherwise, stream is
+ // always initialized with a BufferPool.
+ pool = mem.DefaultBufferPool()
+ }
+ s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
}
}
// The server has closed the stream without sending trailers. Record that
@@ -1222,7 +1247,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
if statusCode == codes.Canceled {
if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
// Our deadline was already exceeded, and that was likely the cause
- // of this cancelation. Alter the status code accordingly.
+ // of this cancellation. Alter the status code accordingly.
statusCode = codes.DeadlineExceeded
}
}
@@ -1307,7 +1332,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 == 0 {
t.mu.Unlock()
- t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
+ t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
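Close now bounds how long it waits for the loopy writer to flush the GOAWAY frame, so a blackholed TCP connection cannot stall shutdown forever. The sketch below isolates that bounded wait on a done channel; the timeout value mirrors goAwayLoopyWriterTimeout, but everything else is a stand-in.

package main

import (
	"fmt"
	"time"
)

// waitWithTimeout mirrors the shutdown wait added to http2Client.Close: block
// until done closes, but give up after the supplied timeout.
func waitWithTimeout(done <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	writerDone := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // pretend the GOAWAY was written
		close(writerDone)
	}()
	if waitWithTimeout(writerDone, 5*time.Second) {
		fmt.Println("GOAWAY flushed; closing connection")
	} else {
		fmt.Println("timed out waiting for loopy writer; closing anyway")
	}
}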
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index b7091165b..f5163f770 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -39,6 +39,7 @@ import (
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/syscall"
+ "google.golang.org/grpc/mem"
"google.golang.org/protobuf/proto"
"google.golang.org/grpc/codes"
@@ -119,7 +120,7 @@ type http2Server struct {
// Fields below are for channelz metric collection.
channelz *channelz.Socket
- bufferPool *bufferPool
+ bufferPool mem.BufferPool
connectionID uint64
@@ -261,7 +262,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
- bufferPool: newBufferPool(),
+ bufferPool: config.BufferPool,
}
var czSecurity credentials.ChannelzSecurityValue
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
@@ -330,7 +331,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
- t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
+ t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
err := t.loopy.run()
close(t.loopyWriterDone)
if !isIOError(err) {
@@ -613,10 +614,9 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
- ctx: s.ctx,
- ctxDone: s.ctxDone,
- recv: s.buf,
- freeBuffer: t.bufferPool.put,
+ ctx: s.ctx,
+ ctxDone: s.ctxDone,
+ recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@@ -813,10 +813,13 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
- buffer := t.bufferPool.get()
- buffer.Reset()
- buffer.Write(f.Data())
- s.write(recvMsg{buffer: buffer})
+ pool := t.bufferPool
+ if pool == nil {
+ // Note that this is only supposed to be nil in tests. Otherwise, stream is
+ // always initialized with a BufferPool.
+ pool = mem.DefaultBufferPool()
+ }
+ s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
}
}
if f.StreamEnded() {
@@ -1089,7 +1092,9 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
onWrite: t.setResetPingStrikes,
}
- success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
+ success, err := t.controlBuf.executeAndPut(func() bool {
+ return t.checkForHeaderListSize(trailingHeader)
+ }, nil)
if !success {
if err != nil {
return err
@@ -1112,27 +1117,37 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
-func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
+func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
+ reader := data.Reader()
+
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
+ _ = reader.Close()
return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
+ _ = reader.Close()
return t.streamContextErr(s)
}
}
+
df := &dataFrame{
streamID: s.id,
h: hdr,
- d: data,
+ reader: reader,
onEachWrite: t.setResetPingStrikes,
}
- if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
+ if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
+ _ = reader.Close()
return t.streamContextErr(s)
}
- return t.controlBuf.put(df)
+ if err := t.controlBuf.put(df); err != nil {
+ _ = reader.Close()
+ return err
+ }
+ return nil
}
// keepalive running in a separate goroutine does the following:
diff --git a/vendor/google.golang.org/grpc/internal/transport/http_util.go b/vendor/google.golang.org/grpc/internal/transport/http_util.go
index 39cef3bd4..f609c6c66 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http_util.go
@@ -317,28 +317,32 @@ func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
return w
}
-func (w *bufWriter) Write(b []byte) (n int, err error) {
+func (w *bufWriter) Write(b []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
- n, err = w.conn.Write(b)
+ n, err := w.conn.Write(b)
return n, toIOError(err)
}
if w.buf == nil {
b := w.pool.Get().(*[]byte)
w.buf = *b
}
+ written := 0
for len(b) > 0 {
- nn := copy(w.buf[w.offset:], b)
- b = b[nn:]
- w.offset += nn
- n += nn
- if w.offset >= w.batchSize {
- err = w.flushKeepBuffer()
+ copied := copy(w.buf[w.offset:], b)
+ b = b[copied:]
+ written += copied
+ w.offset += copied
+ if w.offset < w.batchSize {
+ continue
+ }
+ if err := w.flushKeepBuffer(); err != nil {
+ return written, err
}
}
- return n, err
+ return written, nil
}
func (w *bufWriter) Flush() error {
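The rewritten bufWriter.Write copies into a fixed batch buffer, flushes whenever the buffer fills, and reports how many bytes were accepted before a failed flush. The sketch below shows the same batching loop against a generic io.Writer; the conversion of errors to transport errors is left out.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// batchWriter buffers writes and flushes whenever batchSize bytes accumulate,
// mirroring the reworked bufWriter.Write loop.
type batchWriter struct {
	w         io.Writer
	buf       []byte
	offset    int
	batchSize int
}

func (w *batchWriter) Write(b []byte) (int, error) {
	written := 0
	for len(b) > 0 {
		copied := copy(w.buf[w.offset:], b)
		b = b[copied:]
		written += copied
		w.offset += copied
		if w.offset < w.batchSize {
			continue
		}
		if err := w.flush(); err != nil {
			return written, err
		}
	}
	return written, nil
}

func (w *batchWriter) flush() error {
	_, err := w.w.Write(w.buf[:w.offset])
	w.offset = 0
	return err
}

func main() {
	var out bytes.Buffer
	bw := &batchWriter{w: &out, buf: make([]byte, 8), batchSize: 8}
	n, err := bw.Write([]byte("hello, http2 framer!"))
	_ = bw.flush() // flush the tail, as bufWriter.Flush would
	fmt.Println(n, err, out.String())
}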
diff --git a/vendor/google.golang.org/grpc/internal/transport/proxy.go b/vendor/google.golang.org/grpc/internal/transport/proxy.go
index 24fa10325..54b224436 100644
--- a/vendor/google.golang.org/grpc/internal/transport/proxy.go
+++ b/vendor/google.golang.org/grpc/internal/transport/proxy.go
@@ -107,8 +107,14 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
-
- return &bufConn{Conn: conn, r: r}, nil
+ // The buffer could contain extra bytes from the target server, so we can't
+ // discard it. However, in many cases where the server waits for the client
+ // to send the first message (e.g. when TLS is being used), the buffer will
+ // be empty, so we can avoid the overhead of reading through this buffer.
+ if r.Buffered() != 0 {
+ return &bufConn{Conn: conn, r: r}, nil
+ }
+ return conn, nil
}
// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
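After the CONNECT handshake, the bufio.Reader may or may not hold bytes the target server already sent; the change keeps the bufConn wrapper only when it does. The sketch below shows that decision with a trivial stand-in for bufConn, since the real type is internal.

package main

import (
	"bufio"
	"fmt"
	"net"
)

// bufConn is a minimal stand-in for the transport's bufConn: a net.Conn whose
// reads drain a bufio.Reader first.
type bufConn struct {
	net.Conn
	r *bufio.Reader
}

func (c *bufConn) Read(b []byte) (int, error) { return c.r.Read(b) }

// maybeWrap returns the raw conn when the reader holds no buffered bytes,
// matching the proxy handshake change.
func maybeWrap(conn net.Conn, r *bufio.Reader) net.Conn {
	if r.Buffered() != 0 {
		return &bufConn{Conn: conn, r: r}
	}
	return conn
}

func main() {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	r := bufio.NewReader(client)
	wrapped := maybeWrap(client, r)
	fmt.Println("kept raw conn:", wrapped == client)
}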
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go
index 4b39c0ade..fdd6fa86c 100644
--- a/vendor/google.golang.org/grpc/internal/transport/transport.go
+++ b/vendor/google.golang.org/grpc/internal/transport/transport.go
@@ -22,7 +22,6 @@
package transport
import (
- "bytes"
"context"
"errors"
"fmt"
@@ -37,6 +36,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
@@ -47,32 +47,10 @@ import (
const logLevel = 2
-type bufferPool struct {
- pool sync.Pool
-}
-
-func newBufferPool() *bufferPool {
- return &bufferPool{
- pool: sync.Pool{
- New: func() any {
- return new(bytes.Buffer)
- },
- },
- }
-}
-
-func (p *bufferPool) get() *bytes.Buffer {
- return p.pool.Get().(*bytes.Buffer)
-}
-
-func (p *bufferPool) put(b *bytes.Buffer) {
- p.pool.Put(b)
-}
-
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
- buffer *bytes.Buffer
+ buffer mem.Buffer
// nil: received some data
// io.EOF: stream is completed. data is nil.
// other non-nil error: transport failure. data is nil.
@@ -102,6 +80,9 @@ func newRecvBuffer() *recvBuffer {
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
if b.err != nil {
+ // drop the buffer on the floor. Since b.err is not nil, any subsequent reads
+ // will always return an error, making this buffer inaccessible.
+ r.buffer.Free()
b.mu.Unlock()
// An error had occurred earlier, don't accept more
// data or errors.
@@ -148,45 +129,97 @@ type recvBufferReader struct {
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
- last *bytes.Buffer // Stores the remaining data in the previous calls.
+ last mem.Buffer // Stores the remaining data in the previous calls.
err error
- freeBuffer func(*bytes.Buffer)
}
-// Read reads the next len(p) bytes from last. If last is drained, it tries to
-// read additional data from recv. It blocks if there no additional data available
-// in recv. If Read returns any non-nil error, it will continue to return that error.
-func (r *recvBufferReader) Read(p []byte) (n int, err error) {
+func (r *recvBufferReader) ReadHeader(header []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
if r.last != nil {
- // Read remaining data left in last call.
- copied, _ := r.last.Read(p)
- if r.last.Len() == 0 {
- r.freeBuffer(r.last)
+ n, r.last = mem.ReadUnsafe(header, r.last)
+ return n, nil
+ }
+ if r.closeStream != nil {
+ n, r.err = r.readHeaderClient(header)
+ } else {
+ n, r.err = r.readHeader(header)
+ }
+ return n, r.err
+}
+
+// Read reads the next n bytes from last. If last is drained, it tries to read
+// additional data from recv. It blocks if there is no additional data
+// available in recv. If Read returns any non-nil error, it will continue to
+// return that error.
+func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
+ if r.err != nil {
+ return nil, r.err
+ }
+ if r.last != nil {
+ buf = r.last
+ if r.last.Len() > n {
+ buf, r.last = mem.SplitUnsafe(buf, n)
+ } else {
r.last = nil
}
- return copied, nil
+ return buf, nil
}
if r.closeStream != nil {
- n, r.err = r.readClient(p)
+ buf, r.err = r.readClient(n)
} else {
- n, r.err = r.read(p)
+ buf, r.err = r.read(n)
}
- return n, r.err
+ return buf, r.err
}
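// recvBufferReader.Read above hands out at most n bytes at a time from the
// buffer saved in r.last, splitting it and keeping the remainder for the next
// call. A small sketch of that take-or-split idea on plain byte slices follows;
// in the real code, mem.SplitUnsafe performs the split on reference-counted
// mem.Buffer values.
package main

import "fmt"

// takeUpTo returns at most n bytes from buf plus the remainder to keep for the
// next call; no bytes are copied, the slice is only re-sliced.
func takeUpTo(buf []byte, n int) (out, rest []byte) {
	if len(buf) > n {
		return buf[:n], buf[n:]
	}
	return buf, nil
}

func main() {
	last := []byte("abcdefgh")
	out, last := takeUpTo(last, 3)
	fmt.Printf("%s %s\n", out, last) // abc defgh
	out, last = takeUpTo(last, 10)
	fmt.Printf("%s %q\n", out, last) // defgh ""
}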
-func (r *recvBufferReader) read(p []byte) (n int, err error) {
+func (r *recvBufferReader) readHeader(header []byte) (n int, err error) {
select {
case <-r.ctxDone:
return 0, ContextErr(r.ctx.Err())
case m := <-r.recv.get():
- return r.readAdditional(m, p)
+ return r.readHeaderAdditional(m, header)
+ }
+}
+
+func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
+ select {
+ case <-r.ctxDone:
+ return nil, ContextErr(r.ctx.Err())
+ case m := <-r.recv.get():
+ return r.readAdditional(m, n)
+ }
+}
+
+func (r *recvBufferReader) readHeaderClient(header []byte) (n int, err error) {
+ // If the context is canceled, then closes the stream with nil metadata.
+ // closeStream writes its error parameter to r.recv as a recvMsg.
+ // r.readHeaderAdditional acts on that message and returns the necessary error.
+ select {
+ case <-r.ctxDone:
+ // Note that this adds the ctx error to the end of recv buffer, and
+ // reads from the head. This will delay the error until recv buffer is
+ // empty, thus will delay ctx cancellation in Recv().
+ //
+ // It's done this way to fix a race between ctx cancel and trailer. The
+ // race was, stream.Recv() may return ctx error if ctxDone wins the
+ // race, but stream.Trailer() may return a non-nil md because the stream
+ // was not marked as done when trailer is received. This closeStream
+ // call will mark stream as done, thus fix the race.
+ //
+ // TODO: delaying ctx error seems like an unnecessary side effect. What
+ // we really want is to mark the stream as done, and return ctx error
+ // faster.
+ r.closeStream(ContextErr(r.ctx.Err()))
+ m := <-r.recv.get()
+ return r.readHeaderAdditional(m, header)
+ case m := <-r.recv.get():
+ return r.readHeaderAdditional(m, header)
}
}
-func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
+func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
// If the context is canceled, then closes the stream with nil metadata.
// closeStream writes its error parameter to r.recv as a recvMsg.
// r.readAdditional acts on that message and returns the necessary error.
@@ -207,25 +240,40 @@ func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
// faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
- return r.readAdditional(m, p)
+ return r.readAdditional(m, n)
case m := <-r.recv.get():
- return r.readAdditional(m, p)
+ return r.readAdditional(m, n)
}
}
-func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
+func (r *recvBufferReader) readHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
r.recv.load()
if m.err != nil {
+ if m.buffer != nil {
+ m.buffer.Free()
+ }
return 0, m.err
}
- copied, _ := m.buffer.Read(p)
- if m.buffer.Len() == 0 {
- r.freeBuffer(m.buffer)
- r.last = nil
- } else {
- r.last = m.buffer
+
+ n, r.last = mem.ReadUnsafe(header, m.buffer)
+
+ return n, nil
+}
+
+func (r *recvBufferReader) readAdditional(m recvMsg, n int) (b mem.Buffer, err error) {
+ r.recv.load()
+ if m.err != nil {
+ if m.buffer != nil {
+ m.buffer.Free()
+ }
+ return nil, m.err
+ }
+
+ if m.buffer.Len() > n {
+ m.buffer, r.last = mem.SplitUnsafe(m.buffer, n)
}
- return copied, nil
+
+ return m.buffer, nil
}
type streamState uint32
@@ -241,7 +289,7 @@ const (
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
- ct *http2Client // nil for server side Stream
+ ct ClientTransport // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
@@ -251,7 +299,7 @@ type Stream struct {
recvCompress string
sendCompress string
buf *recvBuffer
- trReader io.Reader
+ trReader *transportReader
fc *inFlow
wq *writeQuota
@@ -408,7 +456,7 @@ func (s *Stream) TrailersOnly() bool {
return s.noHeaders
}
-// Trailer returns the cached trailer metedata. Note that if it is not called
+// Trailer returns the cached trailer metadata. Note that if it is not called
// after the entire stream is done, it could return an empty MD. Client
// side only.
// It can be safely read only after stream has ended that is either read
@@ -499,36 +547,87 @@ func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}
-// Read reads all p bytes from the wire for this stream.
-func (s *Stream) Read(p []byte) (n int, err error) {
+func (s *Stream) ReadHeader(header []byte) (err error) {
+ // Don't request a read if there was an error earlier
+ if er := s.trReader.er; er != nil {
+ return er
+ }
+ s.requestRead(len(header))
+ for len(header) != 0 {
+ n, err := s.trReader.ReadHeader(header)
+ header = header[n:]
+ if len(header) == 0 {
+ err = nil
+ }
+ if err != nil {
+ if n > 0 && err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
+ return err
+ }
+ }
+ return nil
+}
+
+// Read reads n bytes from the wire for this stream.
+func (s *Stream) Read(n int) (data mem.BufferSlice, err error) {
// Don't request a read if there was an error earlier
- if er := s.trReader.(*transportReader).er; er != nil {
- return 0, er
+ if er := s.trReader.er; er != nil {
+ return nil, er
}
- s.requestRead(len(p))
- return io.ReadFull(s.trReader, p)
+ s.requestRead(n)
+ for n != 0 {
+ buf, err := s.trReader.Read(n)
+ var bufLen int
+ if buf != nil {
+ bufLen = buf.Len()
+ }
+ n -= bufLen
+ if n == 0 {
+ err = nil
+ }
+ if err != nil {
+ if bufLen > 0 && err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
+ data.Free()
+ return nil, err
+ }
+ data = append(data, buf)
+ }
+ return data, nil
}
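// Stream.Read above loops until n bytes have been collected, appending each
// chunk to a mem.BufferSlice, clearing the error once the requested length is
// reached, and turning a short read that ends in io.EOF into
// io.ErrUnexpectedEOF. A self-contained sketch of that loop over plain byte
// slices follows; chunkReader and sliceReader are illustrative types only.
package main

import (
	"errors"
	"fmt"
	"io"
)

// chunkReader returns up to n bytes per call, like transportReader.Read.
type chunkReader interface {
	Read(n int) ([]byte, error)
}

// readFull keeps requesting chunks until n bytes are collected. A partial read
// followed by io.EOF is reported as io.ErrUnexpectedEOF; an error returned
// together with the final chunk is dropped, as in Stream.Read.
func readFull(r chunkReader, n int) ([][]byte, error) {
	var data [][]byte
	for n != 0 {
		buf, err := r.Read(n)
		n -= len(buf)
		if n == 0 {
			err = nil
		}
		if err != nil {
			if len(buf) > 0 && errors.Is(err, io.EOF) {
				err = io.ErrUnexpectedEOF
			}
			return nil, err
		}
		data = append(data, buf)
	}
	return data, nil
}

// sliceReader serves at most 4 bytes per call from a fixed backing slice.
type sliceReader struct{ b []byte }

func (s *sliceReader) Read(n int) ([]byte, error) {
	if len(s.b) == 0 {
		return nil, io.EOF
	}
	if n > 4 {
		n = 4
	}
	if n > len(s.b) {
		n = len(s.b)
	}
	out := s.b[:n]
	s.b = s.b[n:]
	return out, nil
}

func main() {
	r := &sliceReader{b: []byte("hello world")}
	chunks, err := readFull(r, 11)
	fmt.Println(len(chunks), err) // 3 <nil>
}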
-// tranportReader reads all the data available for this Stream from the transport and
+// transportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
- reader io.Reader
+ reader *recvBufferReader
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
windowHandler func(int)
er error
}
-func (t *transportReader) Read(p []byte) (n int, err error) {
- n, err = t.reader.Read(p)
+func (t *transportReader) ReadHeader(header []byte) (int, error) {
+ n, err := t.reader.ReadHeader(header)
if err != nil {
t.er = err
- return
+ return 0, err
+ }
+ t.windowHandler(len(header))
+ return n, nil
+}
+
+func (t *transportReader) Read(n int) (mem.Buffer, error) {
+ buf, err := t.reader.Read(n)
+ if err != nil {
+ t.er = err
+ return buf, err
}
- t.windowHandler(n)
- return
+ t.windowHandler(buf.Len())
+ return buf, nil
}
// BytesReceived indicates whether any bytes have been received on this stream.
@@ -574,6 +673,7 @@ type ServerConfig struct {
ChannelzParent *channelz.Server
MaxHeaderListSize *uint32
HeaderTableSize *uint32
+ BufferPool mem.BufferPool
}
// ConnectOptions covers all relevant options for communicating with the server.
@@ -612,6 +712,8 @@ type ConnectOptions struct {
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
UseProxy bool
+ // The mem.BufferPool to use when reading/writing to the wire.
+ BufferPool mem.BufferPool
}
// NewClientTransport establishes the transport with the required ConnectOptions
@@ -673,7 +775,7 @@ type ClientTransport interface {
// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.
- Write(s *Stream, hdr []byte, data []byte, opts *Options) error
+ Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
// NewStream creates a Stream for an RPC.
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
@@ -725,7 +827,7 @@ type ServerTransport interface {
// Write sends the data for the given stream.
// Write may not be called on all streams.
- Write(s *Stream, hdr []byte, data []byte, opts *Options) error
+ Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
// WriteStatus sends the status of a stream to the client. WriteStatus is
// the final call made on a stream and always occurs.
@@ -798,7 +900,7 @@ var (
// connection is draining. This could be caused by goaway or balancer
// removing the address.
errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
- // errStreamDone is returned from write at the client side to indiacte application
+ // errStreamDone is returned from write at the client side to indicate application
// layer of an error.
errStreamDone = errors.New("the stream is done")
// StatusGoAway indicates that the server sent a GOAWAY that included this