summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/transport/handler_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/handler_server.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/handler_server.go65
1 files changed, 35 insertions, 30 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
index 17f7a21b5..a9d70e2a1 100644
--- a/vendor/google.golang.org/grpc/internal/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go
@@ -75,11 +75,25 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
return nil, errors.New(msg)
}
+ var localAddr net.Addr
+ if la := r.Context().Value(http.LocalAddrContextKey); la != nil {
+ localAddr, _ = la.(net.Addr)
+ }
+ var authInfo credentials.AuthInfo
+ if r.TLS != nil {
+ authInfo = credentials.TLSInfo{State: *r.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
+ }
+ p := peer.Peer{
+ Addr: strAddr(r.RemoteAddr),
+ LocalAddr: localAddr,
+ AuthInfo: authInfo,
+ }
st := &serverHandlerTransport{
rw: w,
req: r,
closedCh: make(chan struct{}),
writes: make(chan func()),
+ peer: p,
contentType: contentType,
contentSubtype: contentSubtype,
stats: stats,
@@ -134,6 +148,8 @@ type serverHandlerTransport struct {
headerMD metadata.MD
+ peer peer.Peer
+
closeOnce sync.Once
closedCh chan struct{} // closed on Close
@@ -165,7 +181,13 @@ func (ht *serverHandlerTransport) Close(err error) {
})
}
-func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
+func (ht *serverHandlerTransport) Peer() *peer.Peer {
+ return &peer.Peer{
+ Addr: ht.peer.Addr,
+ LocalAddr: ht.peer.LocalAddr,
+ AuthInfo: ht.peer.AuthInfo,
+ }
+}
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
// the empty string if unknown.
@@ -347,10 +369,8 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
return err
}
-func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
+func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
// With this transport type there will be exactly 1 stream: this HTTP request.
-
- ctx := ht.req.Context()
var cancel context.CancelFunc
if ht.timeoutSet {
ctx, cancel = context.WithTimeout(ctx, ht.timeout)
@@ -370,34 +390,19 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
ht.Close(errors.New("request is done processing"))
}()
+ ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
req := ht.req
-
s := &Stream{
- id: 0, // irrelevant
- requestRead: func(int) {},
- cancel: cancel,
- buf: newRecvBuffer(),
- st: ht,
- method: req.URL.Path,
- recvCompress: req.Header.Get("grpc-encoding"),
- contentSubtype: ht.contentSubtype,
- }
- pr := &peer.Peer{
- Addr: ht.RemoteAddr(),
- }
- if req.TLS != nil {
- pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
- }
- ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
- s.ctx = peer.NewContext(ctx, pr)
- for _, sh := range ht.stats {
- s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
- inHeader := &stats.InHeader{
- FullMethod: s.method,
- RemoteAddr: ht.RemoteAddr(),
- Compression: s.recvCompress,
- }
- sh.HandleRPC(s.ctx, inHeader)
+ id: 0, // irrelevant
+ ctx: ctx,
+ requestRead: func(int) {},
+ cancel: cancel,
+ buf: newRecvBuffer(),
+ st: ht,
+ method: req.URL.Path,
+ recvCompress: req.Header.Get("grpc-encoding"),
+ contentSubtype: ht.contentSubtype,
+ headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},