diff options
Diffstat (limited to 'vendor/golang.org/x/net/http2/transport.go')
-rw-r--r-- | vendor/golang.org/x/net/http2/transport.go | 310 |
1 files changed, 107 insertions, 203 deletions
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go index 2fa49490c..98a49c6b6 100644 --- a/vendor/golang.org/x/net/http2/transport.go +++ b/vendor/golang.org/x/net/http2/transport.go @@ -185,7 +185,45 @@ type Transport struct { connPoolOnce sync.Once connPoolOrDef ClientConnPool // non-nil version of ConnPool - syncHooks *testSyncHooks + *transportTestHooks +} + +// Hook points used for testing. +// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations. +// Inside tests, see the testSyncHooks function docs. + +type transportTestHooks struct { + newclientconn func(*ClientConn) + group synctestGroupInterface +} + +func (t *Transport) markNewGoroutine() { + if t != nil && t.transportTestHooks != nil { + t.transportTestHooks.group.Join() + } +} + +// newTimer creates a new time.Timer, or a synthetic timer in tests. +func (t *Transport) newTimer(d time.Duration) timer { + if t.transportTestHooks != nil { + return t.transportTestHooks.group.NewTimer(d) + } + return timeTimer{time.NewTimer(d)} +} + +// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests. +func (t *Transport) afterFunc(d time.Duration, f func()) timer { + if t.transportTestHooks != nil { + return t.transportTestHooks.group.AfterFunc(d, f) + } + return timeTimer{time.AfterFunc(d, f)} +} + +func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) { + if t.transportTestHooks != nil { + return t.transportTestHooks.group.ContextWithTimeout(ctx, d) + } + return context.WithTimeout(ctx, d) } func (t *Transport) maxHeaderListSize() uint32 { @@ -352,60 +390,6 @@ type ClientConn struct { werr error // first write error that has occurred hbuf bytes.Buffer // HPACK encoder writes into this henc *hpack.Encoder - - syncHooks *testSyncHooks // can be nil -} - -// Hook points used for testing. -// Outside of tests, cc.syncHooks is nil and these all have minimal implementations. -// Inside tests, see the testSyncHooks function docs. - -// goRun starts a new goroutine. -func (cc *ClientConn) goRun(f func()) { - if cc.syncHooks != nil { - cc.syncHooks.goRun(f) - return - } - go f() -} - -// condBroadcast is cc.cond.Broadcast. -func (cc *ClientConn) condBroadcast() { - if cc.syncHooks != nil { - cc.syncHooks.condBroadcast(cc.cond) - } - cc.cond.Broadcast() -} - -// condWait is cc.cond.Wait. -func (cc *ClientConn) condWait() { - if cc.syncHooks != nil { - cc.syncHooks.condWait(cc.cond) - } - cc.cond.Wait() -} - -// newTimer creates a new time.Timer, or a synthetic timer in tests. -func (cc *ClientConn) newTimer(d time.Duration) timer { - if cc.syncHooks != nil { - return cc.syncHooks.newTimer(d) - } - return newTimeTimer(d) -} - -// afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests. -func (cc *ClientConn) afterFunc(d time.Duration, f func()) timer { - if cc.syncHooks != nil { - return cc.syncHooks.afterFunc(d, f) - } - return newTimeAfterFunc(d, f) -} - -func (cc *ClientConn) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) { - if cc.syncHooks != nil { - return cc.syncHooks.contextWithTimeout(ctx, d) - } - return context.WithTimeout(ctx, d) } // clientStream is the state for a single HTTP/2 stream. One of these @@ -487,7 +471,7 @@ func (cs *clientStream) abortStreamLocked(err error) { // TODO(dneil): Clean up tests where cs.cc.cond is nil. if cs.cc.cond != nil { // Wake up writeRequestBody if it is waiting on flow control. - cs.cc.condBroadcast() + cs.cc.cond.Broadcast() } } @@ -497,7 +481,7 @@ func (cs *clientStream) abortRequestBodyWrite() { defer cc.mu.Unlock() if cs.reqBody != nil && cs.reqBodyClosed == nil { cs.closeReqBodyLocked() - cc.condBroadcast() + cc.cond.Broadcast() } } @@ -507,10 +491,11 @@ func (cs *clientStream) closeReqBodyLocked() { } cs.reqBodyClosed = make(chan struct{}) reqBodyClosed := cs.reqBodyClosed - cs.cc.goRun(func() { + go func() { + cs.cc.t.markNewGoroutine() cs.reqBody.Close() close(reqBodyClosed) - }) + }() } type stickyErrWriter struct { @@ -626,21 +611,7 @@ func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Res backoff := float64(uint(1) << (uint(retry) - 1)) backoff += backoff * (0.1 * mathrand.Float64()) d := time.Second * time.Duration(backoff) - var tm timer - if t.syncHooks != nil { - tm = t.syncHooks.newTimer(d) - t.syncHooks.blockUntil(func() bool { - select { - case <-tm.C(): - case <-req.Context().Done(): - default: - return false - } - return true - }) - } else { - tm = newTimeTimer(d) - } + tm := t.newTimer(d) select { case <-tm.C(): t.vlogf("RoundTrip retrying after failure: %v", roundTripErr) @@ -725,8 +696,8 @@ func canRetryError(err error) bool { } func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) { - if t.syncHooks != nil { - return t.newClientConn(nil, singleUse, t.syncHooks) + if t.transportTestHooks != nil { + return t.newClientConn(nil, singleUse) } host, _, err := net.SplitHostPort(addr) if err != nil { @@ -736,7 +707,7 @@ func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse b if err != nil { return nil, err } - return t.newClientConn(tconn, singleUse, nil) + return t.newClientConn(tconn, singleUse) } func (t *Transport) newTLSConfig(host string) *tls.Config { @@ -802,10 +773,10 @@ func (t *Transport) maxEncoderHeaderTableSize() uint32 { } func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) { - return t.newClientConn(c, t.disableKeepAlives(), nil) + return t.newClientConn(c, t.disableKeepAlives()) } -func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHooks) (*ClientConn, error) { +func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) { cc := &ClientConn{ t: t, tconn: c, @@ -820,16 +791,12 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHoo wantSettingsAck: true, pings: make(map[[8]byte]chan struct{}), reqHeaderMu: make(chan struct{}, 1), - syncHooks: hooks, } - if hooks != nil { - hooks.newclientconn(cc) + if t.transportTestHooks != nil { + t.markNewGoroutine() + t.transportTestHooks.newclientconn(cc) c = cc.tconn } - if d := t.idleConnTimeout(); d != 0 { - cc.idleTimeout = d - cc.idleTimer = cc.afterFunc(d, cc.onIdleTimeout) - } if VerboseLogs { t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) } @@ -893,7 +860,13 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool, hooks *testSyncHoo return nil, cc.werr } - cc.goRun(cc.readLoop) + // Start the idle timer after the connection is fully initialized. + if d := t.idleConnTimeout(); d != 0 { + cc.idleTimeout = d + cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout) + } + + go cc.readLoop() return cc, nil } @@ -901,7 +874,7 @@ func (cc *ClientConn) healthCheck() { pingTimeout := cc.t.pingTimeout() // We don't need to periodically ping in the health check, because the readLoop of ClientConn will // trigger the healthCheck again if there is no frame received. - ctx, cancel := cc.contextWithTimeout(context.Background(), pingTimeout) + ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout) defer cancel() cc.vlogf("http2: Transport sending health check") err := cc.Ping(ctx) @@ -1144,7 +1117,8 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error { // Wait for all in-flight streams to complete or connection to close done := make(chan struct{}) cancelled := false // guarded by cc.mu - cc.goRun(func() { + go func() { + cc.t.markNewGoroutine() cc.mu.Lock() defer cc.mu.Unlock() for { @@ -1156,9 +1130,9 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error { if cancelled { break } - cc.condWait() + cc.cond.Wait() } - }) + }() shutdownEnterWaitStateHook() select { case <-done: @@ -1168,7 +1142,7 @@ func (cc *ClientConn) Shutdown(ctx context.Context) error { cc.mu.Lock() // Free the goroutine above cancelled = true - cc.condBroadcast() + cc.cond.Broadcast() cc.mu.Unlock() return ctx.Err() } @@ -1206,7 +1180,7 @@ func (cc *ClientConn) closeForError(err error) { for _, cs := range cc.streams { cs.abortStreamLocked(err) } - cc.condBroadcast() + cc.cond.Broadcast() cc.mu.Unlock() cc.closeConn() } @@ -1321,23 +1295,30 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) respHeaderRecv: make(chan struct{}), donec: make(chan struct{}), } - cc.goRun(func() { - cs.doRequest(req) - }) + + // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? + if !cc.t.disableCompression() && + req.Header.Get("Accept-Encoding") == "" && + req.Header.Get("Range") == "" && + !cs.isHead { + // Request gzip only, not deflate. Deflate is ambiguous and + // not as universally supported anyway. + // See: https://zlib.net/zlib_faq.html#faq39 + // + // Note that we don't request this for HEAD requests, + // due to a bug in nginx: + // http://trac.nginx.org/nginx/ticket/358 + // https://golang.org/issue/5522 + // + // We don't request gzip if the request is for a range, since + // auto-decoding a portion of a gzipped document will just fail + // anyway. See https://golang.org/issue/8923 + cs.requestedGzip = true + } + + go cs.doRequest(req, streamf) waitDone := func() error { - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-cs.donec: - case <-ctx.Done(): - case <-cs.reqCancel: - default: - return false - } - return true - }) - } select { case <-cs.donec: return nil @@ -1398,24 +1379,7 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) return err } - if streamf != nil { - streamf(cs) - } - for { - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-cs.respHeaderRecv: - case <-cs.abort: - case <-ctx.Done(): - case <-cs.reqCancel: - default: - return false - } - return true - }) - } select { case <-cs.respHeaderRecv: return handleResponseHeaders() @@ -1445,8 +1409,9 @@ func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) // doRequest runs for the duration of the request lifetime. // // It sends the request and performs post-request cleanup (closing Request.Body, etc.). -func (cs *clientStream) doRequest(req *http.Request) { - err := cs.writeRequest(req) +func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) { + cs.cc.t.markNewGoroutine() + err := cs.writeRequest(req, streamf) cs.cleanupWriteRequest(err) } @@ -1457,7 +1422,7 @@ func (cs *clientStream) doRequest(req *http.Request) { // // It returns non-nil if the request ends otherwise. // If the returned error is StreamError, the error Code may be used in resetting the stream. -func (cs *clientStream) writeRequest(req *http.Request) (err error) { +func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStream)) (err error) { cc := cs.cc ctx := cs.ctx @@ -1471,21 +1436,6 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) { if cc.reqHeaderMu == nil { panic("RoundTrip on uninitialized ClientConn") // for tests } - var newStreamHook func(*clientStream) - if cc.syncHooks != nil { - newStreamHook = cc.syncHooks.newstream - cc.syncHooks.blockUntil(func() bool { - select { - case cc.reqHeaderMu <- struct{}{}: - <-cc.reqHeaderMu - case <-cs.reqCancel: - case <-ctx.Done(): - default: - return false - } - return true - }) - } select { case cc.reqHeaderMu <- struct{}{}: case <-cs.reqCancel: @@ -1510,28 +1460,8 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) { } cc.mu.Unlock() - if newStreamHook != nil { - newStreamHook(cs) - } - - // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? - if !cc.t.disableCompression() && - req.Header.Get("Accept-Encoding") == "" && - req.Header.Get("Range") == "" && - !cs.isHead { - // Request gzip only, not deflate. Deflate is ambiguous and - // not as universally supported anyway. - // See: https://zlib.net/zlib_faq.html#faq39 - // - // Note that we don't request this for HEAD requests, - // due to a bug in nginx: - // http://trac.nginx.org/nginx/ticket/358 - // https://golang.org/issue/5522 - // - // We don't request gzip if the request is for a range, since - // auto-decoding a portion of a gzipped document will just fail - // anyway. See https://golang.org/issue/8923 - cs.requestedGzip = true + if streamf != nil { + streamf(cs) } continueTimeout := cc.t.expectContinueTimeout() @@ -1594,7 +1524,7 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) { var respHeaderTimer <-chan time.Time var respHeaderRecv chan struct{} if d := cc.responseHeaderTimeout(); d != 0 { - timer := cc.newTimer(d) + timer := cc.t.newTimer(d) defer timer.Stop() respHeaderTimer = timer.C() respHeaderRecv = cs.respHeaderRecv @@ -1603,21 +1533,6 @@ func (cs *clientStream) writeRequest(req *http.Request) (err error) { // or until the request is aborted (via context, error, or otherwise), // whichever comes first. for { - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-cs.peerClosed: - case <-respHeaderTimer: - case <-respHeaderRecv: - case <-cs.abort: - case <-ctx.Done(): - case <-cs.reqCancel: - default: - return false - } - return true - }) - } select { case <-cs.peerClosed: return nil @@ -1766,7 +1681,7 @@ func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error { return nil } cc.pendingRequests++ - cc.condWait() + cc.cond.Wait() cc.pendingRequests-- select { case <-cs.abort: @@ -2028,7 +1943,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) cs.flow.take(take) return take, nil } - cc.condWait() + cc.cond.Wait() } } @@ -2311,7 +2226,7 @@ func (cc *ClientConn) forgetStreamID(id uint32) { } // Wake up writeRequestBody via clientStream.awaitFlowControl and // wake up RoundTrip if there is a pending request. - cc.condBroadcast() + cc.cond.Broadcast() closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 { @@ -2333,6 +2248,7 @@ type clientConnReadLoop struct { // readLoop runs in its own goroutine and reads and dispatches frames. func (cc *ClientConn) readLoop() { + cc.t.markNewGoroutine() rl := &clientConnReadLoop{cc: cc} defer rl.cleanup() cc.readerErr = rl.run() @@ -2399,7 +2315,7 @@ func (rl *clientConnReadLoop) cleanup() { cs.abortStreamLocked(err) } } - cc.condBroadcast() + cc.cond.Broadcast() cc.mu.Unlock() } @@ -2436,7 +2352,7 @@ func (rl *clientConnReadLoop) run() error { readIdleTimeout := cc.t.ReadIdleTimeout var t timer if readIdleTimeout != 0 { - t = cc.afterFunc(readIdleTimeout, cc.healthCheck) + t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck) } for { f, err := cc.fr.ReadFrame() @@ -3034,7 +2950,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error { for _, cs := range cc.streams { cs.flow.add(delta) } - cc.condBroadcast() + cc.cond.Broadcast() cc.initialWindowSize = s.Val case SettingHeaderTableSize: @@ -3089,7 +3005,7 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { return ConnectionError(ErrCodeFlowControl) } - cc.condBroadcast() + cc.cond.Broadcast() return nil } @@ -3133,7 +3049,8 @@ func (cc *ClientConn) Ping(ctx context.Context) error { } var pingError error errc := make(chan struct{}) - cc.goRun(func() { + go func() { + cc.t.markNewGoroutine() cc.wmu.Lock() defer cc.wmu.Unlock() if pingError = cc.fr.WritePing(false, p); pingError != nil { @@ -3144,20 +3061,7 @@ func (cc *ClientConn) Ping(ctx context.Context) error { close(errc) return } - }) - if cc.syncHooks != nil { - cc.syncHooks.blockUntil(func() bool { - select { - case <-c: - case <-errc: - case <-ctx.Done(): - case <-cc.readerDone: - default: - return false - } - return true - }) - } + }() select { case <-c: return nil |