summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2023-05-12 14:33:40 +0200
committerLibravatar GitHub <noreply@github.com>2023-05-12 14:33:40 +0200
commitec325fee141c1e9757144a0a4094061b56839b78 (patch)
tree2948ab4ef5702cc8478ab2be841340b946bdb867 /vendor/google.golang.org/grpc/stream.go
parent[frogend/bugfix] fix dynamicSpoiler elements (#1771) (diff)
downloadgotosocial-ec325fee141c1e9757144a0a4094061b56839b78.tar.xz
[chore] Update a bunch of database dependencies (#1772)
* [chore] Update a bunch of database dependencies * fix lil thing
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go61
1 files changed, 42 insertions, 19 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 93231af2a..d1226a412 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -168,10 +168,19 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
- if md, _, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ // validate md
if err := imetadata.Validate(md); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
+ // validate added
+ for _, kvs := range added {
+ for i := 0; i < len(kvs); i += 2 {
+ if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
+ return nil, status.Error(codes.Internal, err.Error())
+ }
+ }
+ }
}
if channelz.IsOn() {
cc.incrCallsStarted()
@@ -352,7 +361,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
}
}
for _, binlog := range cs.binlogs {
- binlog.Log(logEntry)
+ binlog.Log(cs.ctx, logEntry)
}
}
@@ -800,7 +809,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
}
cs.serverHeaderBinlogged = true
for _, binlog := range cs.binlogs {
- binlog.Log(logEntry)
+ binlog.Log(cs.ctx, logEntry)
}
}
return m, nil
@@ -881,7 +890,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
Message: data,
}
for _, binlog := range cs.binlogs {
- binlog.Log(cm)
+ binlog.Log(cs.ctx, cm)
}
}
return err
@@ -905,7 +914,7 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
Message: recvInfo.uncompressedBytes,
}
for _, binlog := range cs.binlogs {
- binlog.Log(sm)
+ binlog.Log(cs.ctx, sm)
}
}
if err != nil || !cs.desc.ServerStreams {
@@ -926,7 +935,7 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range cs.binlogs {
- binlog.Log(logEntry)
+ binlog.Log(cs.ctx, logEntry)
}
}
}
@@ -953,7 +962,7 @@ func (cs *clientStream) CloseSend() error {
OnClientSide: true,
}
for _, binlog := range cs.binlogs {
- binlog.Log(chc)
+ binlog.Log(cs.ctx, chc)
}
}
// We never returned an error here for reasons.
@@ -971,6 +980,9 @@ func (cs *clientStream) finish(err error) {
return
}
cs.finished = true
+ for _, onFinish := range cs.callInfo.onFinish {
+ onFinish(err)
+ }
cs.commitAttemptLocked()
if cs.attempt != nil {
cs.attempt.finish(err)
@@ -992,7 +1004,7 @@ func (cs *clientStream) finish(err error) {
OnClientSide: true,
}
for _, binlog := range cs.binlogs {
- binlog.Log(c)
+ binlog.Log(cs.ctx, c)
}
}
if err == nil {
@@ -1081,9 +1093,10 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength + headerLen,
- Length: len(payInfo.uncompressedBytes),
+ Data: payInfo.uncompressedBytes,
+ WireLength: payInfo.compressedLength + headerLen,
+ CompressedLength: payInfo.compressedLength,
+ Length: len(payInfo.uncompressedBytes),
})
}
if channelz.IsOn() {
@@ -1511,6 +1524,8 @@ type serverStream struct {
comp encoding.Compressor
decomp encoding.Compressor
+ sendCompressorName string
+
maxReceiveMessageSize int
maxSendMessageSize int
trInfo *traceInfo
@@ -1558,7 +1573,7 @@ func (ss *serverStream) SendHeader(md metadata.MD) error {
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
- binlog.Log(sh)
+ binlog.Log(ss.ctx, sh)
}
}
return err
@@ -1603,6 +1618,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()
+ // Server handler could have set new compressor by calling SetSendCompressor.
+ // In case it is set, we need to use it for compressing outbound message.
+ if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
+ ss.comp = encoding.GetCompressor(sendCompressorsName)
+ ss.sendCompressorName = sendCompressorsName
+ }
+
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
if err != nil {
@@ -1624,14 +1646,14 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
- binlog.Log(sh)
+ binlog.Log(ss.ctx, sh)
}
}
sm := &binarylog.ServerMessage{
Message: data,
}
for _, binlog := range ss.binlogs {
- binlog.Log(sm)
+ binlog.Log(ss.ctx, sm)
}
}
if len(ss.statsHandler) != 0 {
@@ -1679,7 +1701,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
if len(ss.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{}
for _, binlog := range ss.binlogs {
- binlog.Log(chc)
+ binlog.Log(ss.ctx, chc)
}
}
return err
@@ -1695,9 +1717,10 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
- Data: payInfo.uncompressedBytes,
- WireLength: payInfo.wireLength + headerLen,
- Length: len(payInfo.uncompressedBytes),
+ Data: payInfo.uncompressedBytes,
+ Length: len(payInfo.uncompressedBytes),
+ WireLength: payInfo.compressedLength + headerLen,
+ CompressedLength: payInfo.compressedLength,
})
}
}
@@ -1706,7 +1729,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
Message: payInfo.uncompressedBytes,
}
for _, binlog := range ss.binlogs {
- binlog.Log(cm)
+ binlog.Log(ss.ctx, cm)
}
}
return nil