summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2023-05-12 14:33:40 +0200
committerLibravatar GitHub <noreply@github.com>2023-05-12 14:33:40 +0200
commitec325fee141c1e9757144a0a4094061b56839b78 (patch)
tree2948ab4ef5702cc8478ab2be841340b946bdb867 /vendor/google.golang.org/grpc/server.go
parent[frogend/bugfix] fix dynamicSpoiler elements (#1771) (diff)
downloadgotosocial-ec325fee141c1e9757144a0a4094061b56839b78.tar.xz
[chore] Update a bunch of database dependencies (#1772)
* [chore] Update a bunch of database dependencies * fix lil thing
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go151
1 files changed, 121 insertions, 30 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index d5a6e78be..087b9ad7c 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -45,6 +45,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
+ "google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@@ -74,10 +75,10 @@ func init() {
srv.drainServerTransports(addr)
}
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
- extraServerOptions = append(extraServerOptions, opt...)
+ globalServerOptions = append(globalServerOptions, opt...)
}
internal.ClearGlobalServerOptions = func() {
- extraServerOptions = nil
+ globalServerOptions = nil
}
internal.BinaryLogger = binaryLogger
internal.JoinServerOptions = newJoinServerOption
@@ -183,7 +184,7 @@ var defaultServerOptions = serverOptions{
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
}
-var extraServerOptions []ServerOption
+var globalServerOptions []ServerOption
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption interface {
@@ -600,7 +601,7 @@ func (s *Server) stopServerWorkers() {
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
- for _, o := range extraServerOptions {
+ for _, o := range globalServerOptions {
o.apply(&opts)
}
for _, o := range opt {
@@ -1252,7 +1253,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range binlogs {
- binlog.Log(logEntry)
+ binlog.Log(ctx, logEntry)
}
}
@@ -1263,6 +1264,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
var comp, decomp encoding.Compressor
var cp Compressor
var dc Decompressor
+ var sendCompressorName string
// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
@@ -1283,12 +1285,18 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
cp = s.opts.cp
- stream.SetSendCompress(cp.Type())
+ sendCompressorName = cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
comp = encoding.GetCompressor(rc)
if comp != nil {
- stream.SetSendCompress(rc)
+ sendCompressorName = comp.Name()
+ }
+ }
+
+ if sendCompressorName != "" {
+ if err := stream.SetSendCompress(sendCompressorName); err != nil {
+ return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
}
}
@@ -1312,11 +1320,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
for _, sh := range shs {
sh.HandleRPC(stream.Context(), &stats.InPayload{
- RecvTime: time.Now(),
- Payload: v,
- WireLength: payInfo.wireLength + headerLen,
- Data: d,
- Length: len(d),
+ RecvTime: time.Now(),
+ Payload: v,
+ Length: len(d),
+ WireLength: payInfo.compressedLength + headerLen,
+ CompressedLength: payInfo.compressedLength,
+ Data: d,
})
}
if len(binlogs) != 0 {
@@ -1324,7 +1333,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: d,
}
for _, binlog := range binlogs {
- binlog.Log(cm)
+ binlog.Log(stream.Context(), cm)
}
}
if trInfo != nil {
@@ -1357,7 +1366,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Header: h,
}
for _, binlog := range binlogs {
- binlog.Log(sh)
+ binlog.Log(stream.Context(), sh)
}
}
st := &binarylog.ServerTrailer{
@@ -1365,7 +1374,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
- binlog.Log(st)
+ binlog.Log(stream.Context(), st)
}
}
return appErr
@@ -1375,6 +1384,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
opts := &transport.Options{Last: true}
+ // Server handler could have set new compressor by calling SetSendCompressor.
+ // In case it is set, we need to use it for compressing outbound message.
+ if stream.SendCompress() != sendCompressorName {
+ comp = encoding.GetCompressor(stream.SendCompress())
+ }
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
@@ -1402,8 +1416,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
- binlog.Log(sh)
- binlog.Log(st)
+ binlog.Log(stream.Context(), sh)
+ binlog.Log(stream.Context(), st)
}
}
return err
@@ -1417,8 +1431,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: reply,
}
for _, binlog := range binlogs {
- binlog.Log(sh)
- binlog.Log(sm)
+ binlog.Log(stream.Context(), sh)
+ binlog.Log(stream.Context(), sm)
}
}
if channelz.IsOn() {
@@ -1430,17 +1444,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
// TODO: Should we be logging if writing status failed here, like above?
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
- err = t.WriteStatus(stream, statusOK)
if len(binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
}
for _, binlog := range binlogs {
- binlog.Log(st)
+ binlog.Log(stream.Context(), st)
}
}
- return err
+ return t.WriteStatus(stream, statusOK)
}
// chainStreamServerInterceptors chains all stream server interceptors into one.
@@ -1574,7 +1587,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range ss.binlogs {
- binlog.Log(logEntry)
+ binlog.Log(stream.Context(), logEntry)
}
}
@@ -1597,12 +1610,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
ss.cp = s.opts.cp
- stream.SetSendCompress(s.opts.cp.Type())
+ ss.sendCompressorName = s.opts.cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
ss.comp = encoding.GetCompressor(rc)
if ss.comp != nil {
- stream.SetSendCompress(rc)
+ ss.sendCompressorName = rc
+ }
+ }
+
+ if ss.sendCompressorName != "" {
+ if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
+ return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
}
}
@@ -1640,16 +1659,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.trInfo.tr.SetError()
ss.mu.Unlock()
}
- t.WriteStatus(ss.s, appStatus)
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
}
for _, binlog := range ss.binlogs {
- binlog.Log(st)
+ binlog.Log(stream.Context(), st)
}
}
+ t.WriteStatus(ss.s, appStatus)
// TODO: Should we log an error from WriteStatus here and below?
return appErr
}
@@ -1658,17 +1677,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
- err = t.WriteStatus(ss.s, statusOK)
if len(ss.binlogs) != 0 {
st := &binarylog.ServerTrailer{
Trailer: ss.s.Trailer(),
Err: appErr,
}
for _, binlog := range ss.binlogs {
- binlog.Log(st)
+ binlog.Log(stream.Context(), st)
}
}
- return err
+ return t.WriteStatus(ss.s, statusOK)
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
@@ -1935,6 +1953,60 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
return nil
}
+// SetSendCompressor sets a compressor for outbound messages from the server.
+// It must not be called after any event that causes headers to be sent
+// (see ServerStream.SetHeader for the complete list). Provided compressor is
+// used when below conditions are met:
+//
+// - compressor is registered via encoding.RegisterCompressor
+// - compressor name must exist in the client advertised compressor names
+// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
+// get client supported compressor names.
+//
+// The context provided must be the context passed to the server's handler.
+// It must be noted that compressor name encoding.Identity disables the
+// outbound compression.
+// By default, server messages will be sent using the same compressor with
+// which request messages were sent.
+//
+// It is not safe to call SetSendCompressor concurrently with SendHeader and
+// SendMsg.
+//
+// # Experimental
+//
+// Notice: This function is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func SetSendCompressor(ctx context.Context, name string) error {
+ stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
+ if !ok || stream == nil {
+ return fmt.Errorf("failed to fetch the stream from the given context")
+ }
+
+ if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
+ return fmt.Errorf("unable to set send compressor: %w", err)
+ }
+
+ return stream.SetSendCompress(name)
+}
+
+// ClientSupportedCompressors returns compressor names advertised by the client
+// via grpc-accept-encoding header.
+//
+// The context provided must be the context passed to the server's handler.
+//
+// # Experimental
+//
+// Notice: This function is EXPERIMENTAL and may be changed or removed in a
+// later release.
+func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
+ stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
+ if !ok || stream == nil {
+ return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
+ }
+
+ return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
+}
+
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
//
@@ -1969,3 +2041,22 @@ type channelzServer struct {
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
return c.s.channelzMetric()
}
+
+// validateSendCompressor returns an error when given compressor name cannot be
+// handled by the server or the client based on the advertised compressors.
+func validateSendCompressor(name, clientCompressors string) error {
+ if name == encoding.Identity {
+ return nil
+ }
+
+ if !grpcutil.IsCompressorNameRegistered(name) {
+ return fmt.Errorf("compressor not registered %q", name)
+ }
+
+ for _, c := range strings.Split(clientCompressors, ",") {
+ if c == name {
+ return nil // found match
+ }
+ }
+ return fmt.Errorf("client does not support compressor %q", name)
+}