summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go213
1 files changed, 136 insertions, 77 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 8051ef5b5..bb2b2a216 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/internal/serviceconfig"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
+ "google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@@ -359,7 +360,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
cs.attempt = a
return nil
}
- if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
+ if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
return nil, err
}
@@ -517,7 +518,7 @@ func (a *csAttempt) newStream() error {
}
a.s = s
a.ctx = s.Context()
- a.p = &parser{r: s, recvBufferPool: a.cs.cc.dopts.recvBufferPool}
+ a.p = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
return nil
}
@@ -566,10 +567,15 @@ type clientStream struct {
// place where we need to check if the attempt is nil.
attempt *csAttempt
// TODO(hedging): hedging will have multiple attempts simultaneously.
- committed bool // active attempt committed for retry?
- onCommit func()
- buffer []func(a *csAttempt) error // operations to replay on retry
- bufferSize int // current size of buffer
+ committed bool // active attempt committed for retry?
+ onCommit func()
+ replayBuffer []replayOp // operations to replay on retry
+ replayBufferSize int // current size of replayBuffer
+}
+
+type replayOp struct {
+ op func(a *csAttempt) error
+ cleanup func()
}
// csAttempt implements a single transport stream attempt within a
@@ -607,7 +613,12 @@ func (cs *clientStream) commitAttemptLocked() {
cs.onCommit()
}
cs.committed = true
- cs.buffer = nil
+ for _, op := range cs.replayBuffer {
+ if op.cleanup != nil {
+ op.cleanup()
+ }
+ }
+ cs.replayBuffer = nil
}
func (cs *clientStream) commitAttempt() {
@@ -732,7 +743,7 @@ func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
// the stream is canceled.
return err
}
- // Note that the first op in the replay buffer always sets cs.attempt
+ // Note that the first op in replayBuffer always sets cs.attempt
// if it is able to pick a transport and create a stream.
if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
return nil
@@ -761,7 +772,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
// already be status errors.
return toRPCErr(op(cs.attempt))
}
- if len(cs.buffer) == 0 {
+ if len(cs.replayBuffer) == 0 {
// For the first op, which controls creation of the stream and
// assigns cs.attempt, we need to create a new attempt inline
// before executing the first op. On subsequent ops, the attempt
@@ -851,25 +862,26 @@ func (cs *clientStream) Trailer() metadata.MD {
}
func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
- for _, f := range cs.buffer {
- if err := f(attempt); err != nil {
+ for _, f := range cs.replayBuffer {
+ if err := f.op(attempt); err != nil {
return err
}
}
return nil
}
-func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
+func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error, cleanup func()) {
// Note: we still will buffer if retry is disabled (for transparent retries).
if cs.committed {
return
}
- cs.bufferSize += sz
- if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
+ cs.replayBufferSize += sz
+ if cs.replayBufferSize > cs.callInfo.maxRetryRPCBufferSize {
cs.commitAttemptLocked()
+ cleanup()
return
}
- cs.buffer = append(cs.buffer, op)
+ cs.replayBuffer = append(cs.replayBuffer, replayOp{op: op, cleanup: cleanup})
}
func (cs *clientStream) SendMsg(m any) (err error) {
@@ -891,23 +903,50 @@ func (cs *clientStream) SendMsg(m any) (err error) {
}
// load hdr, payload, data
- hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
+ hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.cc.dopts.copts.BufferPool)
if err != nil {
return err
}
+ defer func() {
+ data.Free()
+ // only free payload if compression was made, and therefore it is a different set
+ // of buffers from data.
+ if pf.isCompressed() {
+ payload.Free()
+ }
+ }()
+
+ dataLen := data.Len()
+ payloadLen := payload.Len()
// TODO(dfawley): should we be checking len(data) instead?
- if len(payload) > *cs.callInfo.maxSendMessageSize {
- return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
+ if payloadLen > *cs.callInfo.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, *cs.callInfo.maxSendMessageSize)
}
+
+ // always take an extra ref in case data == payload (i.e. when the data isn't
+ // compressed). The original ref will always be freed by the deferred free above.
+ payload.Ref()
op := func(a *csAttempt) error {
- return a.sendMsg(m, hdr, payload, data)
+ return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
+ }
+
+ // onSuccess is invoked when the op is captured for a subsequent retry. If the
+ // stream was established by a previous message and therefore retries are
+ // disabled, onSuccess will not be invoked, and payloadRef can be freed
+ // immediately.
+ onSuccessCalled := false
+ err = cs.withRetry(op, func() {
+ cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
+ onSuccessCalled = true
+ })
+ if !onSuccessCalled {
+ payload.Free()
}
- err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if len(cs.binlogs) != 0 && err == nil {
cm := &binarylog.ClientMessage{
OnClientSide: true,
- Message: data,
+ Message: data.Materialize(),
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, cm)
@@ -924,6 +963,7 @@ func (cs *clientStream) RecvMsg(m any) error {
var recvInfo *payloadInfo
if len(cs.binlogs) != 0 {
recvInfo = &payloadInfo{}
+ defer recvInfo.free()
}
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
@@ -931,7 +971,7 @@ func (cs *clientStream) RecvMsg(m any) error {
if len(cs.binlogs) != 0 && err == nil {
sm := &binarylog.ServerMessage{
OnClientSide: true,
- Message: recvInfo.uncompressedBytes,
+ Message: recvInfo.uncompressedBytes.Materialize(),
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, sm)
@@ -958,7 +998,7 @@ func (cs *clientStream) CloseSend() error {
// RecvMsg. This also matches historical behavior.
return nil
}
- cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
+ cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) })
if len(cs.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{
OnClientSide: true,
@@ -1034,7 +1074,7 @@ func (cs *clientStream) finish(err error) {
cs.cancel()
}
-func (a *csAttempt) sendMsg(m any, hdr, payld, data []byte) error {
+func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
cs := a.cs
if a.trInfo != nil {
a.mu.Lock()
@@ -1052,8 +1092,10 @@ func (a *csAttempt) sendMsg(m any, hdr, payld, data []byte) error {
}
return io.EOF
}
- for _, sh := range a.statsHandlers {
- sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
+ if len(a.statsHandlers) != 0 {
+ for _, sh := range a.statsHandlers {
+ sh.HandleRPC(a.ctx, outPayload(true, m, dataLength, payloadLength, time.Now()))
+ }
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@@ -1065,6 +1107,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
cs := a.cs
if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{}
+ defer payInfo.free()
}
if !a.decompSet {
@@ -1083,8 +1126,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
// Only initialize this state once per stream.
a.decompSet = true
}
- err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
- if err != nil {
+ if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, false); err != nil {
if err == io.EOF {
if statusErr := a.s.Status().Err(); statusErr != nil {
return statusErr
@@ -1103,14 +1145,12 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
}
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.InPayload{
- Client: true,
- RecvTime: time.Now(),
- Payload: m,
- // TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
+ Client: true,
+ RecvTime: time.Now(),
+ Payload: m,
WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
- Length: len(payInfo.uncompressedBytes),
+ Length: payInfo.uncompressedBytes.Len(),
})
}
if channelz.IsOn() {
@@ -1122,14 +1162,12 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
}
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
- err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
- if err == nil {
- return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
- }
- if err == io.EOF {
+ if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp, false); err == io.EOF {
return a.s.Status().Err() // non-server streaming Recv returns nil on success
+ } else if err != nil {
+ return toRPCErr(err)
}
- return toRPCErr(err)
+ return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
func (a *csAttempt) finish(err error) {
@@ -1185,12 +1223,12 @@ func (a *csAttempt) finish(err error) {
a.mu.Unlock()
}
-// newClientStream creates a ClientStream with the specified transport, on the
+// newNonRetryClientStream creates a ClientStream with the specified transport, on the
// given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
-// of using ac.transpot.
+// of using ac.transport.
//
// Main difference between this and ClientConn.NewStream:
// - no retry
@@ -1276,7 +1314,7 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin
return nil, err
}
as.s = s
- as.p = &parser{r: s, recvBufferPool: ac.dopts.recvBufferPool}
+ as.p = &parser{r: s, bufferPool: ac.dopts.copts.BufferPool}
ac.incrCallsStarted()
if desc != unaryStreamDesc {
// Listen on stream context to cleanup when the stream context is
@@ -1373,17 +1411,26 @@ func (as *addrConnStream) SendMsg(m any) (err error) {
}
// load hdr, payload, data
- hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
+ hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.cp, as.comp, as.ac.dopts.copts.BufferPool)
if err != nil {
return err
}
+ defer func() {
+ data.Free()
+ // only free payload if compression was made, and therefore it is a different set
+ // of buffers from data.
+ if pf.isCompressed() {
+ payload.Free()
+ }
+ }()
+
// TODO(dfawley): should we be checking len(data) instead?
- if len(payld) > *as.callInfo.maxSendMessageSize {
- return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
+ if payload.Len() > *as.callInfo.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payload.Len(), *as.callInfo.maxSendMessageSize)
}
- if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
+ if err := as.t.Write(as.s, hdr, payload, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
if !as.desc.ClientStreams {
// For non-client-streaming RPCs, we return nil instead of EOF on error
// because the generated code requires it. finish is not called; RecvMsg()
@@ -1423,8 +1470,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
// Only initialize this state once per stream.
as.decompSet = true
}
- err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
- if err != nil {
+ if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err != nil {
if err == io.EOF {
if statusErr := as.s.Status().Err(); statusErr != nil {
return statusErr
@@ -1444,14 +1490,12 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
// Special handling for non-server-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
- err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
- if err == nil {
- return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
- }
- if err == io.EOF {
+ if err := recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp, false); err == io.EOF {
return as.s.Status().Err() // non-server streaming Recv returns nil on success
+ } else if err != nil {
+ return toRPCErr(err)
}
- return toRPCErr(err)
+ return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
func (as *addrConnStream) finish(err error) {
@@ -1645,18 +1689,31 @@ func (ss *serverStream) SendMsg(m any) (err error) {
}
// load hdr, payload, data
- hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
+ hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.cp, ss.comp, ss.p.bufferPool)
if err != nil {
return err
}
+ defer func() {
+ data.Free()
+ // only free payload if compression was made, and therefore it is a different set
+ // of buffers from data.
+ if pf.isCompressed() {
+ payload.Free()
+ }
+ }()
+
+ dataLen := data.Len()
+ payloadLen := payload.Len()
+
// TODO(dfawley): should we be checking len(data) instead?
- if len(payload) > ss.maxSendMessageSize {
- return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
+ if payloadLen > ss.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", payloadLen, ss.maxSendMessageSize)
}
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
+
if len(ss.binlogs) != 0 {
if !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
@@ -1669,7 +1726,7 @@ func (ss *serverStream) SendMsg(m any) (err error) {
}
}
sm := &binarylog.ServerMessage{
- Message: data,
+ Message: data.Materialize(),
}
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, sm)
@@ -1677,7 +1734,7 @@ func (ss *serverStream) SendMsg(m any) (err error) {
}
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
- sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
+ sh.HandleRPC(ss.s.Context(), outPayload(false, m, dataLen, payloadLen, time.Now()))
}
}
return nil
@@ -1714,8 +1771,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
var payInfo *payloadInfo
if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
payInfo = &payloadInfo{}
+ defer payInfo.free()
}
- if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
+ if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp, true); err != nil {
if err == io.EOF {
if len(ss.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{}
@@ -1733,11 +1791,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: m,
- // TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- Length: len(payInfo.uncompressedBytes),
+ RecvTime: time.Now(),
+ Payload: m,
+ Length: payInfo.uncompressedBytes.Len(),
WireLength: payInfo.compressedLength + headerLen,
CompressedLength: payInfo.compressedLength,
})
@@ -1745,7 +1801,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
}
if len(ss.binlogs) != 0 {
cm := &binarylog.ClientMessage{
- Message: payInfo.uncompressedBytes,
+ Message: payInfo.uncompressedBytes.Materialize(),
}
for _, binlog := range ss.binlogs {
binlog.Log(ss.ctx, cm)
@@ -1760,23 +1816,26 @@ func MethodFromServerStream(stream ServerStream) (string, bool) {
return Method(stream.Context())
}
-// prepareMsg returns the hdr, payload and data
-// using the compressors passed or using the
-// passed preparedmsg
-func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
+// prepareMsg returns the hdr, payload and data using the compressors passed or
+// using the passed preparedmsg. The returned boolean indicates whether
+// compression was made and therefore whether the payload needs to be freed in
+// addition to the returned data. Freeing the payload if the returned boolean is
+// false can lead to undefined behavior.
+func prepareMsg(m any, codec baseCodec, cp Compressor, comp encoding.Compressor, pool mem.BufferPool) (hdr []byte, data, payload mem.BufferSlice, pf payloadFormat, err error) {
if preparedMsg, ok := m.(*PreparedMsg); ok {
- return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
+ return preparedMsg.hdr, preparedMsg.encodedData, preparedMsg.payload, preparedMsg.pf, nil
}
// The input interface is not a prepared msg.
// Marshal and Compress the data at this point
data, err = encode(codec, m)
if err != nil {
- return nil, nil, nil, err
+ return nil, nil, nil, 0, err
}
- compData, err := compress(data, cp, comp)
+ compData, pf, err := compress(data, cp, comp, pool)
if err != nil {
- return nil, nil, nil, err
+ data.Free()
+ return nil, nil, nil, 0, err
}
- hdr, payload = msgHeader(data, compData)
- return hdr, payload, data, nil
+ hdr, payload = msgHeader(data, compData, pf)
+ return hdr, data, payload, pf, nil
}