summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/rpc_util.go
diff options
context:
space:
mode:
authorLibravatar Dominik Süß <dominik@suess.wtf>2025-02-06 12:14:37 +0100
committerLibravatar GitHub <noreply@github.com>2025-02-06 12:14:37 +0100
commitdd094e401282e135989f57c0ca3dee7dea3f5207 (patch)
tree74cb77830f621840273255a17565ced73b4fa997 /vendor/google.golang.org/grpc/rpc_util.go
parent[feature] Use `X-Robots-Tag` headers to instruct scrapers/crawlers (#3737) (diff)
downloadgotosocial-dd094e401282e135989f57c0ca3dee7dea3f5207.tar.xz
[chore] update otel libraries (#3740)
* chore: update otel dependencies * refactor: combine tracing & metrics in observability package * chore: update example tracing compose file
Diffstat (limited to 'vendor/google.golang.org/grpc/rpc_util.go')
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go79
1 files changed, 28 insertions, 51 deletions
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index db8865ec3..9fac2b08b 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -220,8 +220,8 @@ type HeaderCallOption struct {
HeaderAddr *metadata.MD
}
-func (o HeaderCallOption) before(c *callInfo) error { return nil }
-func (o HeaderCallOption) after(c *callInfo, attempt *csAttempt) {
+func (o HeaderCallOption) before(*callInfo) error { return nil }
+func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) {
*o.HeaderAddr, _ = attempt.s.Header()
}
@@ -242,8 +242,8 @@ type TrailerCallOption struct {
TrailerAddr *metadata.MD
}
-func (o TrailerCallOption) before(c *callInfo) error { return nil }
-func (o TrailerCallOption) after(c *callInfo, attempt *csAttempt) {
+func (o TrailerCallOption) before(*callInfo) error { return nil }
+func (o TrailerCallOption) after(_ *callInfo, attempt *csAttempt) {
*o.TrailerAddr = attempt.s.Trailer()
}
@@ -264,8 +264,8 @@ type PeerCallOption struct {
PeerAddr *peer.Peer
}
-func (o PeerCallOption) before(c *callInfo) error { return nil }
-func (o PeerCallOption) after(c *callInfo, attempt *csAttempt) {
+func (o PeerCallOption) before(*callInfo) error { return nil }
+func (o PeerCallOption) after(_ *callInfo, attempt *csAttempt) {
if x, ok := peer.FromContext(attempt.s.Context()); ok {
*o.PeerAddr = *x
}
@@ -304,7 +304,7 @@ func (o FailFastCallOption) before(c *callInfo) error {
c.failFast = o.FailFast
return nil
}
-func (o FailFastCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o FailFastCallOption) after(*callInfo, *csAttempt) {}
// OnFinish returns a CallOption that configures a callback to be called when
// the call completes. The error passed to the callback is the status of the
@@ -339,7 +339,7 @@ func (o OnFinishCallOption) before(c *callInfo) error {
return nil
}
-func (o OnFinishCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o OnFinishCallOption) after(*callInfo, *csAttempt) {}
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can receive. If this is not set, gRPC uses the default
@@ -363,7 +363,7 @@ func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
c.maxReceiveMessageSize = &o.MaxRecvMsgSize
return nil
}
-func (o MaxRecvMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o MaxRecvMsgSizeCallOption) after(*callInfo, *csAttempt) {}
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size
// in bytes the client can send. If this is not set, gRPC uses the default
@@ -387,7 +387,7 @@ func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
c.maxSendMessageSize = &o.MaxSendMsgSize
return nil
}
-func (o MaxSendMsgSizeCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o MaxSendMsgSizeCallOption) after(*callInfo, *csAttempt) {}
// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
// for a call.
@@ -410,7 +410,7 @@ func (o PerRPCCredsCallOption) before(c *callInfo) error {
c.creds = o.Creds
return nil
}
-func (o PerRPCCredsCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o PerRPCCredsCallOption) after(*callInfo, *csAttempt) {}
// UseCompressor returns a CallOption which sets the compressor used when
// sending the request. If WithCompressor is also set, UseCompressor has
@@ -438,7 +438,7 @@ func (o CompressorCallOption) before(c *callInfo) error {
c.compressorType = o.CompressorType
return nil
}
-func (o CompressorCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o CompressorCallOption) after(*callInfo, *csAttempt) {}
// CallContentSubtype returns a CallOption that will set the content-subtype
// for a call. For example, if content-subtype is "json", the Content-Type over
@@ -475,7 +475,7 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error {
c.contentSubtype = o.ContentSubtype
return nil
}
-func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o ContentSubtypeCallOption) after(*callInfo, *csAttempt) {}
// ForceCodec returns a CallOption that will set codec to be used for all
// request and response messages for a call. The result of calling Name() will
@@ -514,7 +514,7 @@ func (o ForceCodecCallOption) before(c *callInfo) error {
c.codec = newCodecV1Bridge(o.Codec)
return nil
}
-func (o ForceCodecCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o ForceCodecCallOption) after(*callInfo, *csAttempt) {}
// ForceCodecV2 returns a CallOption that will set codec to be used for all
// request and response messages for a call. The result of calling Name() will
@@ -554,7 +554,7 @@ func (o ForceCodecV2CallOption) before(c *callInfo) error {
return nil
}
-func (o ForceCodecV2CallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o ForceCodecV2CallOption) after(*callInfo, *csAttempt) {}
// CallCustomCodec behaves like ForceCodec, but accepts a grpc.Codec instead of
// an encoding.Codec.
@@ -579,7 +579,7 @@ func (o CustomCodecCallOption) before(c *callInfo) error {
c.codec = newCodecV0Bridge(o.Codec)
return nil
}
-func (o CustomCodecCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o CustomCodecCallOption) after(*callInfo, *csAttempt) {}
// MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
// used for buffering this RPC's requests for retry purposes.
@@ -607,7 +607,7 @@ func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
return nil
}
-func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo, attempt *csAttempt) {}
+func (o MaxRetryRPCBufferSizeCallOption) after(*callInfo, *csAttempt) {}
// The format of the payload: compressed or not?
type payloadFormat uint8
@@ -622,7 +622,7 @@ func (pf payloadFormat) isCompressed() bool {
}
type streamReader interface {
- ReadHeader(header []byte) error
+ ReadMessageHeader(header []byte) error
Read(n int) (mem.BufferSlice, error)
}
@@ -656,7 +656,7 @@ type parser struct {
// that the underlying streamReader must not return an incompatible
// error.
func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSlice, error) {
- err := p.r.ReadHeader(p.header[:])
+ err := p.r.ReadMessageHeader(p.header[:])
if err != nil {
return 0, nil, err
}
@@ -664,9 +664,6 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSl
pf := payloadFormat(p.header[0])
length := binary.BigEndian.Uint32(p.header[1:])
- if length == 0 {
- return pf, nil, nil
- }
if int64(length) > int64(maxInt) {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
@@ -791,9 +788,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool
if !haveCompressor {
if isServer {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
- } else {
- return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
+ return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
@@ -818,7 +814,7 @@ func (p *payloadInfo) free() {
// the buffer is no longer needed.
// TODO: Refactor this function to reduce the number of arguments.
// See: https://google.github.io/styleguide/go/best-practices.html#function-argument-lists
-func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool,
+func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool,
) (out mem.BufferSlice, err error) {
pf, compressed, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
@@ -842,7 +838,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
var uncompressedBuf []byte
uncompressedBuf, err = dc.Do(compressed.Reader())
if err == nil {
- out = mem.BufferSlice{mem.NewBuffer(&uncompressedBuf, nil)}
+ out = mem.BufferSlice{mem.SliceBuffer(uncompressedBuf)}
}
size = len(uncompressedBuf)
} else {
@@ -878,30 +874,7 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMes
return nil, 0, err
}
- // TODO: Can/should this still be preserved with the new BufferSlice API? Are
- // there any actual benefits to allocating a single large buffer instead of
- // multiple smaller ones?
- //if sizer, ok := compressor.(interface {
- // DecompressedSize(compressedBytes []byte) int
- //}); ok {
- // if size := sizer.DecompressedSize(d); size >= 0 {
- // if size > maxReceiveMessageSize {
- // return nil, size, nil
- // }
- // // size is used as an estimate to size the buffer, but we
- // // will read more data if available.
- // // +MinRead so ReadFrom will not reallocate if size is correct.
- // //
- // // TODO: If we ensure that the buffer size is the same as the DecompressedSize,
- // // we can also utilize the recv buffer pool here.
- // buf := bytes.NewBuffer(make([]byte, 0, size+bytes.MinRead))
- // bytesRead, err := buf.ReadFrom(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
- // return buf.Bytes(), int(bytesRead), err
- // }
- //}
-
- var out mem.BufferSlice
- _, err = io.Copy(mem.NewWriter(&out, pool), io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1))
+ out, err := mem.ReadAll(io.LimitReader(dcReader, int64(maxReceiveMessageSize)+1), pool)
if err != nil {
out.Free()
return nil, 0, err
@@ -909,10 +882,14 @@ func decompress(compressor encoding.Compressor, d mem.BufferSlice, maxReceiveMes
return out, out.Len(), nil
}
+type recvCompressor interface {
+ RecvCompress() string
+}
+
// For the two compressor parameters, both should not be set, but if they are,
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
-func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error {
+func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error {
data, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, isServer)
if err != nil {
return err