summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
authorLibravatar tobi <31960611+tsmethurst@users.noreply.github.com>2023-11-13 11:08:02 +0100
committerLibravatar GitHub <noreply@github.com>2023-11-13 11:08:02 +0100
commit7753f421323ad4b77c4f5c59dc000f8a6b1aa491 (patch)
treeb4850aac8ed6185c3af249f1c119699775303c9b /vendor/google.golang.org/grpc/server.go
parent[bugfix] support endless polls, and misskey's' method of inferring expiry in ... (diff)
downloadgotosocial-7753f421323ad4b77c4f5c59dc000f8a6b1aa491.tar.xz
[chore] update otel -> v1.20.0 (#2358)
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go136
1 files changed, 61 insertions, 75 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index eeae92fbe..8f60d4214 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -983,7 +983,7 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
f := func() {
defer streamQuota.release()
defer wg.Done()
- s.handleStream(st, stream, s.traceInfo(st, stream))
+ s.handleStream(st, stream)
}
if s.opts.numServerWorkers > 0 {
@@ -995,12 +995,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
}
}
go f()
- }, func(ctx context.Context, method string) context.Context {
- if !EnableTracing {
- return ctx
- }
- tr := trace.New("grpc.Recv."+methodFamily(method), method)
- return trace.NewContext(ctx, tr)
})
wg.Wait()
}
@@ -1049,30 +1043,6 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.serveStreams(st)
}
-// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
-// If tracing is not enabled, it returns nil.
-func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
- if !EnableTracing {
- return nil
- }
- tr, ok := trace.FromContext(stream.Context())
- if !ok {
- return nil
- }
-
- trInfo = &traceInfo{
- tr: tr,
- firstLine: firstLine{
- client: false,
- remoteAddr: st.RemoteAddr(),
- },
- }
- if dl, ok := stream.Context().Deadline(); ok {
- trInfo.firstLine.deadline = time.Until(dl)
- }
- return trInfo
-}
-
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
@@ -1133,7 +1103,7 @@ func (s *Server) incrCallsFailed() {
atomic.AddInt64(&s.czData.callsFailed, 1)
}
-func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
+func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
if err != nil {
channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
@@ -1152,7 +1122,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
err = t.Write(stream, hdr, payload, opts)
if err == nil {
for _, sh := range s.opts.statsHandlers {
- sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
+ sh.HandleRPC(ctx, outPayload(false, msg, data, payload, time.Now()))
}
}
return err
@@ -1194,7 +1164,7 @@ func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info
}
}
-func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
+func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
shs := s.opts.statsHandlers
if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
if channelz.IsOn() {
@@ -1208,7 +1178,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
IsClientStream: false,
IsServerStream: false,
}
- sh.HandleRPC(stream.Context(), statsBegin)
+ sh.HandleRPC(ctx, statsBegin)
}
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
@@ -1240,7 +1210,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
- sh.HandleRPC(stream.Context(), end)
+ sh.HandleRPC(ctx, end)
}
if channelz.IsOn() {
@@ -1262,7 +1232,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}
if len(binlogs) != 0 {
- ctx := stream.Context()
md, _ := metadata.FromIncomingContext(ctx)
logEntry := &binarylog.ClientHeader{
Header: md,
@@ -1348,7 +1317,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
for _, sh := range shs {
- sh.HandleRPC(stream.Context(), &stats.InPayload{
+ sh.HandleRPC(ctx, &stats.InPayload{
RecvTime: time.Now(),
Payload: v,
Length: len(d),
@@ -1362,7 +1331,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: d,
}
for _, binlog := range binlogs {
- binlog.Log(stream.Context(), cm)
+ binlog.Log(ctx, cm)
}
}
if trInfo != nil {
@@ -1370,7 +1339,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return nil
}
- ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+ ctx = NewContextWithServerTransportStream(ctx, stream)
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
if appErr != nil {
appStatus, ok := status.FromError(appErr)
@@ -1395,7 +1364,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Header: h,
}
for _, binlog := range binlogs {
- binlog.Log(stream.Context(), sh)
+ binlog.Log(ctx, sh)
}
}
st := &binarylog.ServerTrailer{
@@ -1403,7 +1372,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
- binlog.Log(stream.Context(), st)
+ binlog.Log(ctx, st)
}
}
return appErr
@@ -1418,7 +1387,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if stream.SendCompress() != sendCompressorName {
comp = encoding.GetCompressor(stream.SendCompress())
}
- if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
+ if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
@@ -1445,8 +1414,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
- binlog.Log(stream.Context(), sh)
- binlog.Log(stream.Context(), st)
+ binlog.Log(ctx, sh)
+ binlog.Log(ctx, st)
}
}
return err
@@ -1460,8 +1429,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Message: reply,
}
for _, binlog := range binlogs {
- binlog.Log(stream.Context(), sh)
- binlog.Log(stream.Context(), sm)
+ binlog.Log(ctx, sh)
+ binlog.Log(ctx, sm)
}
}
if channelz.IsOn() {
@@ -1479,7 +1448,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Err: appErr,
}
for _, binlog := range binlogs {
- binlog.Log(stream.Context(), st)
+ binlog.Log(ctx, st)
}
}
return t.WriteStatus(stream, statusOK)
@@ -1521,7 +1490,7 @@ func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, inf
}
}
-func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
+func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
if channelz.IsOn() {
s.incrCallsStarted()
}
@@ -1535,10 +1504,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
IsServerStream: sd.ServerStreams,
}
for _, sh := range shs {
- sh.HandleRPC(stream.Context(), statsBegin)
+ sh.HandleRPC(ctx, statsBegin)
}
}
- ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+ ctx = NewContextWithServerTransportStream(ctx, stream)
ss := &serverStream{
ctx: ctx,
t: t,
@@ -1574,7 +1543,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
end.Error = toRPCErr(err)
}
for _, sh := range shs {
- sh.HandleRPC(stream.Context(), end)
+ sh.HandleRPC(ctx, end)
}
}
@@ -1616,7 +1585,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range ss.binlogs {
- binlog.Log(stream.Context(), logEntry)
+ binlog.Log(ctx, logEntry)
}
}
@@ -1694,7 +1663,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
Err: appErr,
}
for _, binlog := range ss.binlogs {
- binlog.Log(stream.Context(), st)
+ binlog.Log(ctx, st)
}
}
t.WriteStatus(ss.s, appStatus)
@@ -1712,33 +1681,50 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
Err: appErr,
}
for _, binlog := range ss.binlogs {
- binlog.Log(stream.Context(), st)
+ binlog.Log(ctx, st)
}
}
return t.WriteStatus(ss.s, statusOK)
}
-func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
+func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
+ ctx := stream.Context()
+ var ti *traceInfo
+ if EnableTracing {
+ tr := trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
+ ctx = trace.NewContext(ctx, tr)
+ ti = &traceInfo{
+ tr: tr,
+ firstLine: firstLine{
+ client: false,
+ remoteAddr: t.RemoteAddr(),
+ },
+ }
+ if dl, ok := ctx.Deadline(); ok {
+ ti.firstLine.deadline = time.Until(dl)
+ }
+ }
+
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
if pos == -1 {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
- trInfo.tr.SetError()
+ if ti != nil {
+ ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
+ ti.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
- trInfo.tr.SetError()
+ if ti != nil {
+ ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
+ ti.tr.SetError()
}
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
- if trInfo != nil {
- trInfo.tr.Finish()
+ if ti != nil {
+ ti.tr.Finish()
}
return
}
@@ -1748,17 +1734,17 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
- s.processUnaryRPC(t, stream, srv, md, trInfo)
+ s.processUnaryRPC(ctx, t, stream, srv, md, ti)
return
}
if sd, ok := srv.streams[method]; ok {
- s.processStreamingRPC(t, stream, srv, sd, trInfo)
+ s.processStreamingRPC(ctx, t, stream, srv, sd, ti)
return
}
}
// Unknown service, or known server unknown method.
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
- s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti)
return
}
var errDesc string
@@ -1767,19 +1753,19 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
} else {
errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
}
- if trInfo != nil {
- trInfo.tr.LazyPrintf("%s", errDesc)
- trInfo.tr.SetError()
+ if ti != nil {
+ ti.tr.LazyPrintf("%s", errDesc)
+ ti.tr.SetError()
}
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
- trInfo.tr.SetError()
+ if ti != nil {
+ ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
+ ti.tr.SetError()
}
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
- if trInfo != nil {
- trInfo.tr.Finish()
+ if ti != nil {
+ ti.tr.Finish()
}
}