summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/appengine/internal/api.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/appengine/internal/api.go')
-rw-r--r--vendor/google.golang.org/appengine/internal/api.go653
1 files changed, 0 insertions, 653 deletions
diff --git a/vendor/google.golang.org/appengine/internal/api.go b/vendor/google.golang.org/appengine/internal/api.go
deleted file mode 100644
index 0569f5dd4..000000000
--- a/vendor/google.golang.org/appengine/internal/api.go
+++ /dev/null
@@ -1,653 +0,0 @@
-// Copyright 2011 Google Inc. All rights reserved.
-// Use of this source code is governed by the Apache 2.0
-// license that can be found in the LICENSE file.
-
-//go:build !appengine
-// +build !appengine
-
-package internal
-
-import (
- "bytes"
- "context"
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "net"
- "net/http"
- "net/url"
- "os"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- basepb "google.golang.org/appengine/internal/base"
- logpb "google.golang.org/appengine/internal/log"
- remotepb "google.golang.org/appengine/internal/remote_api"
-)
-
-const (
- apiPath = "/rpc_http"
-)
-
-var (
- // Incoming headers.
- ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
- dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
- traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
- curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
- userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP")
- remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
- devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id")
-
- // Outgoing headers.
- apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
- apiEndpointHeaderValue = []string{"app-engine-apis"}
- apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
- apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"}
- apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
- apiContentType = http.CanonicalHeaderKey("Content-Type")
- apiContentTypeValue = []string{"application/octet-stream"}
- logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
-
- apiHTTPClient = &http.Client{
- Transport: &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- Dial: limitDial,
- MaxIdleConns: 1000,
- MaxIdleConnsPerHost: 10000,
- IdleConnTimeout: 90 * time.Second,
- },
- }
-)
-
-func apiURL(ctx context.Context) *url.URL {
- host, port := "appengine.googleapis.internal", "10001"
- if h := os.Getenv("API_HOST"); h != "" {
- host = h
- }
- if hostOverride := ctx.Value(apiHostOverrideKey); hostOverride != nil {
- host = hostOverride.(string)
- }
- if p := os.Getenv("API_PORT"); p != "" {
- port = p
- }
- if portOverride := ctx.Value(apiPortOverrideKey); portOverride != nil {
- port = portOverride.(string)
- }
- return &url.URL{
- Scheme: "http",
- Host: host + ":" + port,
- Path: apiPath,
- }
-}
-
-// Middleware wraps an http handler so that it can make GAE API calls
-func Middleware(next http.Handler) http.Handler {
- return handleHTTPMiddleware(executeRequestSafelyMiddleware(next))
-}
-
-func handleHTTPMiddleware(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- c := &aeContext{
- req: r,
- outHeader: w.Header(),
- }
- r = r.WithContext(withContext(r.Context(), c))
- c.req = r
-
- stopFlushing := make(chan int)
-
- // Patch up RemoteAddr so it looks reasonable.
- if addr := r.Header.Get(userIPHeader); addr != "" {
- r.RemoteAddr = addr
- } else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
- r.RemoteAddr = addr
- } else {
- // Should not normally reach here, but pick a sensible default anyway.
- r.RemoteAddr = "127.0.0.1"
- }
- // The address in the headers will most likely be of these forms:
- // 123.123.123.123
- // 2001:db8::1
- // net/http.Request.RemoteAddr is specified to be in "IP:port" form.
- if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
- // Assume the remote address is only a host; add a default port.
- r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
- }
-
- if logToLogservice() {
- // Start goroutine responsible for flushing app logs.
- // This is done after adding c to ctx.m (and stopped before removing it)
- // because flushing logs requires making an API call.
- go c.logFlusher(stopFlushing)
- }
-
- next.ServeHTTP(c, r)
- c.outHeader = nil // make sure header changes aren't respected any more
-
- flushed := make(chan struct{})
- if logToLogservice() {
- stopFlushing <- 1 // any logging beyond this point will be dropped
-
- // Flush any pending logs asynchronously.
- c.pendingLogs.Lock()
- flushes := c.pendingLogs.flushes
- if len(c.pendingLogs.lines) > 0 {
- flushes++
- }
- c.pendingLogs.Unlock()
- go func() {
- defer close(flushed)
- // Force a log flush, because with very short requests we
- // may not ever flush logs.
- c.flushLog(true)
- }()
- w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
- }
-
- // Avoid nil Write call if c.Write is never called.
- if c.outCode != 0 {
- w.WriteHeader(c.outCode)
- }
- if c.outBody != nil {
- w.Write(c.outBody)
- }
- if logToLogservice() {
- // Wait for the last flush to complete before returning,
- // otherwise the security ticket will not be valid.
- <-flushed
- }
- })
-}
-
-func executeRequestSafelyMiddleware(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- defer func() {
- if x := recover(); x != nil {
- c := w.(*aeContext)
- logf(c, 4, "%s", renderPanic(x)) // 4 == critical
- c.outCode = 500
- }
- }()
-
- next.ServeHTTP(w, r)
- })
-}
-
-func renderPanic(x interface{}) string {
- buf := make([]byte, 16<<10) // 16 KB should be plenty
- buf = buf[:runtime.Stack(buf, false)]
-
- // Remove the first few stack frames:
- // this func
- // the recover closure in the caller
- // That will root the stack trace at the site of the panic.
- const (
- skipStart = "internal.renderPanic"
- skipFrames = 2
- )
- start := bytes.Index(buf, []byte(skipStart))
- p := start
- for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
- p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
- if p < 0 {
- break
- }
- }
- if p >= 0 {
- // buf[start:p+1] is the block to remove.
- // Copy buf[p+1:] over buf[start:] and shrink buf.
- copy(buf[start:], buf[p+1:])
- buf = buf[:len(buf)-(p+1-start)]
- }
-
- // Add panic heading.
- head := fmt.Sprintf("panic: %v\n\n", x)
- if len(head) > len(buf) {
- // Extremely unlikely to happen.
- return head
- }
- copy(buf[len(head):], buf)
- copy(buf, head)
-
- return string(buf)
-}
-
-// aeContext represents the aeContext of an in-flight HTTP request.
-// It implements the appengine.Context and http.ResponseWriter interfaces.
-type aeContext struct {
- req *http.Request
-
- outCode int
- outHeader http.Header
- outBody []byte
-
- pendingLogs struct {
- sync.Mutex
- lines []*logpb.UserAppLogLine
- flushes int
- }
-}
-
-var contextKey = "holds a *context"
-
-// jointContext joins two contexts in a superficial way.
-// It takes values and timeouts from a base context, and only values from another context.
-type jointContext struct {
- base context.Context
- valuesOnly context.Context
-}
-
-func (c jointContext) Deadline() (time.Time, bool) {
- return c.base.Deadline()
-}
-
-func (c jointContext) Done() <-chan struct{} {
- return c.base.Done()
-}
-
-func (c jointContext) Err() error {
- return c.base.Err()
-}
-
-func (c jointContext) Value(key interface{}) interface{} {
- if val := c.base.Value(key); val != nil {
- return val
- }
- return c.valuesOnly.Value(key)
-}
-
-// fromContext returns the App Engine context or nil if ctx is not
-// derived from an App Engine context.
-func fromContext(ctx context.Context) *aeContext {
- c, _ := ctx.Value(&contextKey).(*aeContext)
- return c
-}
-
-func withContext(parent context.Context, c *aeContext) context.Context {
- ctx := context.WithValue(parent, &contextKey, c)
- if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
- ctx = withNamespace(ctx, ns)
- }
- return ctx
-}
-
-func toContext(c *aeContext) context.Context {
- return withContext(context.Background(), c)
-}
-
-func IncomingHeaders(ctx context.Context) http.Header {
- if c := fromContext(ctx); c != nil {
- return c.req.Header
- }
- return nil
-}
-
-func ReqContext(req *http.Request) context.Context {
- return req.Context()
-}
-
-func WithContext(parent context.Context, req *http.Request) context.Context {
- return jointContext{
- base: parent,
- valuesOnly: req.Context(),
- }
-}
-
-// RegisterTestRequest registers the HTTP request req for testing, such that
-// any API calls are sent to the provided URL.
-// It should only be used by aetest package.
-func RegisterTestRequest(req *http.Request, apiURL *url.URL, appID string) *http.Request {
- ctx := req.Context()
- ctx = withAPIHostOverride(ctx, apiURL.Hostname())
- ctx = withAPIPortOverride(ctx, apiURL.Port())
- ctx = WithAppIDOverride(ctx, appID)
-
- // use the unregistered request as a placeholder so that withContext can read the headers
- c := &aeContext{req: req}
- c.req = req.WithContext(withContext(ctx, c))
- return c.req
-}
-
-var errTimeout = &CallError{
- Detail: "Deadline exceeded",
- Code: int32(remotepb.RpcError_CANCELLED),
- Timeout: true,
-}
-
-func (c *aeContext) Header() http.Header { return c.outHeader }
-
-// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
-// codes do not permit a response body (nor response entity headers such as
-// Content-Length, Content-Type, etc).
-func bodyAllowedForStatus(status int) bool {
- switch {
- case status >= 100 && status <= 199:
- return false
- case status == 204:
- return false
- case status == 304:
- return false
- }
- return true
-}
-
-func (c *aeContext) Write(b []byte) (int, error) {
- if c.outCode == 0 {
- c.WriteHeader(http.StatusOK)
- }
- if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
- return 0, http.ErrBodyNotAllowed
- }
- c.outBody = append(c.outBody, b...)
- return len(b), nil
-}
-
-func (c *aeContext) WriteHeader(code int) {
- if c.outCode != 0 {
- logf(c, 3, "WriteHeader called multiple times on request.") // error level
- return
- }
- c.outCode = code
-}
-
-func post(ctx context.Context, body []byte, timeout time.Duration) (b []byte, err error) {
- apiURL := apiURL(ctx)
- hreq := &http.Request{
- Method: "POST",
- URL: apiURL,
- Header: http.Header{
- apiEndpointHeader: apiEndpointHeaderValue,
- apiMethodHeader: apiMethodHeaderValue,
- apiContentType: apiContentTypeValue,
- apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
- },
- Body: ioutil.NopCloser(bytes.NewReader(body)),
- ContentLength: int64(len(body)),
- Host: apiURL.Host,
- }
- c := fromContext(ctx)
- if c != nil {
- if info := c.req.Header.Get(dapperHeader); info != "" {
- hreq.Header.Set(dapperHeader, info)
- }
- if info := c.req.Header.Get(traceHeader); info != "" {
- hreq.Header.Set(traceHeader, info)
- }
- }
-
- tr := apiHTTPClient.Transport.(*http.Transport)
-
- var timedOut int32 // atomic; set to 1 if timed out
- t := time.AfterFunc(timeout, func() {
- atomic.StoreInt32(&timedOut, 1)
- tr.CancelRequest(hreq)
- })
- defer t.Stop()
- defer func() {
- // Check if timeout was exceeded.
- if atomic.LoadInt32(&timedOut) != 0 {
- err = errTimeout
- }
- }()
-
- hresp, err := apiHTTPClient.Do(hreq)
- if err != nil {
- return nil, &CallError{
- Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
- Code: int32(remotepb.RpcError_UNKNOWN),
- }
- }
- defer hresp.Body.Close()
- hrespBody, err := ioutil.ReadAll(hresp.Body)
- if hresp.StatusCode != 200 {
- return nil, &CallError{
- Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
- Code: int32(remotepb.RpcError_UNKNOWN),
- }
- }
- if err != nil {
- return nil, &CallError{
- Detail: fmt.Sprintf("service bridge response bad: %v", err),
- Code: int32(remotepb.RpcError_UNKNOWN),
- }
- }
- return hrespBody, nil
-}
-
-func Call(ctx context.Context, service, method string, in, out proto.Message) error {
- if ns := NamespaceFromContext(ctx); ns != "" {
- if fn, ok := NamespaceMods[service]; ok {
- fn(in, ns)
- }
- }
-
- if f, ctx, ok := callOverrideFromContext(ctx); ok {
- return f(ctx, service, method, in, out)
- }
-
- // Handle already-done contexts quickly.
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
- }
-
- c := fromContext(ctx)
-
- // Apply transaction modifications if we're in a transaction.
- if t := transactionFromContext(ctx); t != nil {
- if t.finished {
- return errors.New("transaction aeContext has expired")
- }
- applyTransaction(in, &t.transaction)
- }
-
- // Default RPC timeout is 60s.
- timeout := 60 * time.Second
- if deadline, ok := ctx.Deadline(); ok {
- timeout = deadline.Sub(time.Now())
- }
-
- data, err := proto.Marshal(in)
- if err != nil {
- return err
- }
-
- ticket := ""
- if c != nil {
- ticket = c.req.Header.Get(ticketHeader)
- if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" {
- ticket = dri
- }
- }
- req := &remotepb.Request{
- ServiceName: &service,
- Method: &method,
- Request: data,
- RequestId: &ticket,
- }
- hreqBody, err := proto.Marshal(req)
- if err != nil {
- return err
- }
-
- hrespBody, err := post(ctx, hreqBody, timeout)
- if err != nil {
- return err
- }
-
- res := &remotepb.Response{}
- if err := proto.Unmarshal(hrespBody, res); err != nil {
- return err
- }
- if res.RpcError != nil {
- ce := &CallError{
- Detail: res.RpcError.GetDetail(),
- Code: *res.RpcError.Code,
- }
- switch remotepb.RpcError_ErrorCode(ce.Code) {
- case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
- ce.Timeout = true
- }
- return ce
- }
- if res.ApplicationError != nil {
- return &APIError{
- Service: *req.ServiceName,
- Detail: res.ApplicationError.GetDetail(),
- Code: *res.ApplicationError.Code,
- }
- }
- if res.Exception != nil || res.JavaException != nil {
- // This shouldn't happen, but let's be defensive.
- return &CallError{
- Detail: "service bridge returned exception",
- Code: int32(remotepb.RpcError_UNKNOWN),
- }
- }
- return proto.Unmarshal(res.Response, out)
-}
-
-func (c *aeContext) Request() *http.Request {
- return c.req
-}
-
-func (c *aeContext) addLogLine(ll *logpb.UserAppLogLine) {
- // Truncate long log lines.
- // TODO(dsymonds): Check if this is still necessary.
- const lim = 8 << 10
- if len(*ll.Message) > lim {
- suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
- ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
- }
-
- c.pendingLogs.Lock()
- c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
- c.pendingLogs.Unlock()
-}
-
-var logLevelName = map[int64]string{
- 0: "DEBUG",
- 1: "INFO",
- 2: "WARNING",
- 3: "ERROR",
- 4: "CRITICAL",
-}
-
-func logf(c *aeContext, level int64, format string, args ...interface{}) {
- if c == nil {
- panic("not an App Engine aeContext")
- }
- s := fmt.Sprintf(format, args...)
- s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
- if logToLogservice() {
- c.addLogLine(&logpb.UserAppLogLine{
- TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
- Level: &level,
- Message: &s,
- })
- }
- // Log to stdout if not deployed
- if !IsSecondGen() {
- log.Print(logLevelName[level] + ": " + s)
- }
-}
-
-// flushLog attempts to flush any pending logs to the appserver.
-// It should not be called concurrently.
-func (c *aeContext) flushLog(force bool) (flushed bool) {
- c.pendingLogs.Lock()
- // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
- n, rem := 0, 30<<20
- for ; n < len(c.pendingLogs.lines); n++ {
- ll := c.pendingLogs.lines[n]
- // Each log line will require about 3 bytes of overhead.
- nb := proto.Size(ll) + 3
- if nb > rem {
- break
- }
- rem -= nb
- }
- lines := c.pendingLogs.lines[:n]
- c.pendingLogs.lines = c.pendingLogs.lines[n:]
- c.pendingLogs.Unlock()
-
- if len(lines) == 0 && !force {
- // Nothing to flush.
- return false
- }
-
- rescueLogs := false
- defer func() {
- if rescueLogs {
- c.pendingLogs.Lock()
- c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
- c.pendingLogs.Unlock()
- }
- }()
-
- buf, err := proto.Marshal(&logpb.UserAppLogGroup{
- LogLine: lines,
- })
- if err != nil {
- log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
- rescueLogs = true
- return false
- }
-
- req := &logpb.FlushRequest{
- Logs: buf,
- }
- res := &basepb.VoidProto{}
- c.pendingLogs.Lock()
- c.pendingLogs.flushes++
- c.pendingLogs.Unlock()
- if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
- log.Printf("internal.flushLog: Flush RPC: %v", err)
- rescueLogs = true
- return false
- }
- return true
-}
-
-const (
- // Log flushing parameters.
- flushInterval = 1 * time.Second
- forceFlushInterval = 60 * time.Second
-)
-
-func (c *aeContext) logFlusher(stop <-chan int) {
- lastFlush := time.Now()
- tick := time.NewTicker(flushInterval)
- for {
- select {
- case <-stop:
- // Request finished.
- tick.Stop()
- return
- case <-tick.C:
- force := time.Now().Sub(lastFlush) > forceFlushInterval
- if c.flushLog(force) {
- lastFlush = time.Now()
- }
- }
- }
-}
-
-func ContextForTesting(req *http.Request) context.Context {
- return toContext(&aeContext{req: req})
-}
-
-func logToLogservice() bool {
- // TODO: replace logservice with json structured logs to $LOG_DIR/app.log.json
- // where $LOG_DIR is /var/log in prod and some tmpdir in dev
- return os.Getenv("LOG_TO_LOGSERVICE") != "0"
-}