summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org')
-rw-r--r--vendor/google.golang.org/grpc/internal/transport/http2_server.go11
-rw-r--r--vendor/google.golang.org/grpc/server.go71
-rw-r--r--vendor/google.golang.org/grpc/version.go2
3 files changed, 54 insertions, 30 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
index 8d3a353c1..c06db679d 100644
--- a/vendor/google.golang.org/grpc/internal/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go
@@ -171,15 +171,10 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
- // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
- // permitted in the HTTP2 spec.
- maxStreams := config.MaxStreams
- if maxStreams == 0 {
- maxStreams = math.MaxUint32
- } else {
+ if config.MaxStreams != math.MaxUint32 {
isettings = append(isettings, http2.Setting{
ID: http2.SettingMaxConcurrentStreams,
- Val: maxStreams,
+ Val: config.MaxStreams,
})
}
dynamicWindow := true
@@ -258,7 +253,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
framer: framer,
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
- maxStreams: maxStreams,
+ maxStreams: config.MaxStreams,
inTapHandle: config.InTapHandle,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 244123c6c..eeae92fbe 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -115,12 +115,6 @@ type serviceInfo struct {
mdata any
}
-type serverWorkerData struct {
- st transport.ServerTransport
- wg *sync.WaitGroup
- stream *transport.Stream
-}
-
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions
@@ -145,7 +139,7 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData
- serverWorkerChannel chan *serverWorkerData
+ serverWorkerChannel chan func()
}
type serverOptions struct {
@@ -179,6 +173,7 @@ type serverOptions struct {
}
var defaultServerOptions = serverOptions{
+ maxConcurrentStreams: math.MaxUint32,
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
@@ -404,6 +399,9 @@ func MaxSendMsgSize(m int) ServerOption {
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
+ if n == 0 {
+ n = math.MaxUint32
+ }
return newFuncServerOption(func(o *serverOptions) {
o.maxConcurrentStreams = n
})
@@ -605,24 +603,19 @@ const serverWorkerResetThreshold = 1 << 16
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker() {
for completed := 0; completed < serverWorkerResetThreshold; completed++ {
- data, ok := <-s.serverWorkerChannel
+ f, ok := <-s.serverWorkerChannel
if !ok {
return
}
- s.handleSingleStream(data)
+ f()
}
go s.serverWorker()
}
-func (s *Server) handleSingleStream(data *serverWorkerData) {
- defer data.wg.Done()
- s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
-}
-
// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
- s.serverWorkerChannel = make(chan *serverWorkerData)
+ s.serverWorkerChannel = make(chan func())
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
@@ -982,21 +975,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close(errors.New("finished serving streams for the server transport"))
var wg sync.WaitGroup
+ streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
+
+ streamQuota.acquire()
+ f := func() {
+ defer streamQuota.release()
+ defer wg.Done()
+ s.handleStream(st, stream, s.traceInfo(st, stream))
+ }
+
if s.opts.numServerWorkers > 0 {
- data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
- case s.serverWorkerChannel <- data:
+ case s.serverWorkerChannel <- f:
return
default:
// If all stream workers are busy, fallback to the default code path.
}
}
- go func() {
- defer wg.Done()
- s.handleStream(st, stream, s.traceInfo(st, stream))
- }()
+ go f()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
@@ -2091,3 +2089,34 @@ func validateSendCompressor(name, clientCompressors string) error {
}
return fmt.Errorf("client does not support compressor %q", name)
}
+
+// atomicSemaphore implements a blocking, counting semaphore. acquire should be
+// called synchronously; release may be called asynchronously.
+type atomicSemaphore struct {
+ n atomic.Int64
+ wait chan struct{}
+}
+
+func (q *atomicSemaphore) acquire() {
+ if q.n.Add(-1) < 0 {
+ // We ran out of quota. Block until a release happens.
+ <-q.wait
+ }
+}
+
+func (q *atomicSemaphore) release() {
+ // N.B. the "<= 0" check below should allow for this to work with multiple
+ // concurrent calls to acquire, but also note that with synchronous calls to
+ // acquire, as our system does, n will never be less than -1. There are
+ // fairness issues (queuing) to consider if this was to be generalized.
+ if q.n.Add(1) <= 0 {
+ // An acquire was waiting on us. Unblock it.
+ q.wait <- struct{}{}
+ }
+}
+
+func newHandlerQuota(n uint32) *atomicSemaphore {
+ a := &atomicSemaphore{wait: make(chan struct{}, 1)}
+ a.n.Store(int64(n))
+ return a
+}
diff --git a/vendor/google.golang.org/grpc/version.go b/vendor/google.golang.org/grpc/version.go
index d3f5bcbfc..724ad2102 100644
--- a/vendor/google.golang.org/grpc/version.go
+++ b/vendor/google.golang.org/grpc/version.go
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
-const Version = "1.58.2"
+const Version = "1.58.3"