path: root/vendor/google.golang.org/grpc/server.go
author dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> 2023-10-26 11:20:49 +0200
committer GitHub <noreply@github.com> 2023-10-26 11:20:49 +0200
commit 03732aca3befa6e892c129f63d3de5e928a5b26d (patch)
tree d3e427b31ab3b1fc0c2d84aabb7ae88efa0eed20 /vendor/google.golang.org/grpc/server.go
parent [feature] attach any request errors if found, only set level=ERROR if code >=... (diff)
download gotosocial-03732aca3befa6e892c129f63d3de5e928a5b26d.tar.xz
[chore]: Bump google.golang.org/grpc from 1.58.2 to 1.58.3 (#2301)
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.58.2 to 1.58.3.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.58.2...v1.58.3)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r-- vendor/google.golang.org/grpc/server.go | 71
1 file changed, 50 insertions, 21 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 244123c6c..eeae92fbe 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -115,12 +115,6 @@ type serviceInfo struct {
mdata any
}
-type serverWorkerData struct {
- st transport.ServerTransport
- wg *sync.WaitGroup
- stream *transport.Stream
-}
-
// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions
@@ -145,7 +139,7 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData
- serverWorkerChannel chan *serverWorkerData
+ serverWorkerChannel chan func()
}
type serverOptions struct {
@@ -179,6 +173,7 @@ type serverOptions struct {
}
var defaultServerOptions = serverOptions{
+ maxConcurrentStreams: math.MaxUint32,
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
connectionTimeout: 120 * time.Second,
@@ -404,6 +399,9 @@ func MaxSendMsgSize(m int) ServerOption {
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
+ if n == 0 {
+ n = math.MaxUint32
+ }
return newFuncServerOption(func(o *serverOptions) {
o.maxConcurrentStreams = n
})
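With this change, passing 0 to MaxConcurrentStreams is treated as "no limit" (math.MaxUint32) instead of silently blocking all streams. As a minimal sketch of how a caller might set the option (the listener address is hypothetical and service registration is omitted), using only the public grpc API:

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	// Hypothetical listener address; any free port would do.
	lis, err := net.Listen("tcp", "localhost:50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}

	// Cap each server transport at 100 concurrent streams.
	// With this patch, MaxConcurrentStreams(0) means "unlimited".
	srv := grpc.NewServer(grpc.MaxConcurrentStreams(100))

	// Service registration omitted; this only illustrates the option.
	if err := srv.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
```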
@@ -605,24 +603,19 @@ const serverWorkerResetThreshold = 1 << 16
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker() {
for completed := 0; completed < serverWorkerResetThreshold; completed++ {
- data, ok := <-s.serverWorkerChannel
+ f, ok := <-s.serverWorkerChannel
if !ok {
return
}
- s.handleSingleStream(data)
+ f()
}
go s.serverWorker()
}
-func (s *Server) handleSingleStream(data *serverWorkerData) {
- defer data.wg.Done()
- s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
-}
-
// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
- s.serverWorkerChannel = make(chan *serverWorkerData)
+ s.serverWorkerChannel = make(chan func())
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
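The worker channel now carries ready-to-run closures (`chan func()`) rather than a `serverWorkerData` struct, so the producer decides what each job captures. A standalone sketch of that consumer-side pattern, not the grpc internals themselves (pool size and job count are made up):

```go
package main

import (
	"fmt"
	"sync"
)

// worker drains a channel of closures; sending a func instead of a
// data struct lets the sender bundle whatever state the job needs.
func worker(jobs <-chan func(), wg *sync.WaitGroup) {
	defer wg.Done()
	for f := range jobs {
		f()
	}
}

func main() {
	jobs := make(chan func())
	var wg sync.WaitGroup

	const numWorkers = 4 // hypothetical pool size
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go worker(jobs, &wg)
	}

	for i := 0; i < 10; i++ {
		i := i
		jobs <- func() { fmt.Println("handled job", i) }
	}
	close(jobs)
	wg.Wait()
}
```

The real grpc workers additionally restart themselves after serverWorkerResetThreshold jobs to reset their stacks; that detail is omitted here.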
@@ -982,21 +975,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close(errors.New("finished serving streams for the server transport"))
var wg sync.WaitGroup
+ streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
+
+ streamQuota.acquire()
+ f := func() {
+ defer streamQuota.release()
+ defer wg.Done()
+ s.handleStream(st, stream, s.traceInfo(st, stream))
+ }
+
if s.opts.numServerWorkers > 0 {
- data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
- case s.serverWorkerChannel <- data:
+ case s.serverWorkerChannel <- f:
return
default:
// If all stream workers are busy, fallback to the default code path.
}
}
- go func() {
- defer wg.Done()
- s.handleStream(st, stream, s.traceInfo(st, stream))
- }()
+ go f()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
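The producer side of serveStreams first acquires stream quota, then tries to hand the closure to an idle worker without blocking, falling back to a fresh goroutine via `select`/`default`. A hedged, self-contained sketch of that dispatch pattern (the single worker and stream count are illustrative stand-ins for s.serverWorkerChannel and real streams):

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch hands f to an idle worker if one is ready; otherwise it
// falls back to a new goroutine, mirroring the select/default above.
func dispatch(workerCh chan<- func(), f func()) {
	select {
	case workerCh <- f:
		// An idle worker took the job.
	default:
		// No worker was ready: run it on its own goroutine.
		go f()
	}
}

func main() {
	workerCh := make(chan func())
	go func() { // single worker, so some jobs take the fallback path
		for f := range workerCh {
			f()
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		dispatch(workerCh, func() {
			defer wg.Done()
			fmt.Println("stream", i, "handled")
		})
	}
	wg.Wait()
	close(workerCh)
}
```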
@@ -2091,3 +2089,34 @@ func validateSendCompressor(name, clientCompressors string) error {
}
return fmt.Errorf("client does not support compressor %q", name)
}
+
+// atomicSemaphore implements a blocking, counting semaphore. acquire should be
+// called synchronously; release may be called asynchronously.
+type atomicSemaphore struct {
+ n atomic.Int64
+ wait chan struct{}
+}
+
+func (q *atomicSemaphore) acquire() {
+ if q.n.Add(-1) < 0 {
+ // We ran out of quota. Block until a release happens.
+ <-q.wait
+ }
+}
+
+func (q *atomicSemaphore) release() {
+ // N.B. the "<= 0" check below should allow for this to work with multiple
+ // concurrent calls to acquire, but also note that with synchronous calls to
+ // acquire, as our system does, n will never be less than -1. There are
+ // fairness issues (queuing) to consider if this was to be generalized.
+ if q.n.Add(1) <= 0 {
+ // An acquire was waiting on us. Unblock it.
+ q.wait <- struct{}{}
+ }
+}
+
+func newHandlerQuota(n uint32) *atomicSemaphore {
+ a := &atomicSemaphore{wait: make(chan struct{}, 1)}
+ a.n.Store(int64(n))
+ return a
+}
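
To make the new semaphore's behavior concrete, here is a standalone sketch that reuses the same acquire/release logic to cap in-flight handlers, the way serveStreams uses newHandlerQuota: acquire is called synchronously from one loop, release from any handler goroutine. The quota value and loop count are made up for illustration:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// atomicSemaphore mirrors the type added in the diff above.
type atomicSemaphore struct {
	n    atomic.Int64
	wait chan struct{}
}

func (q *atomicSemaphore) acquire() {
	if q.n.Add(-1) < 0 {
		<-q.wait // out of quota: block until a release
	}
}

func (q *atomicSemaphore) release() {
	if q.n.Add(1) <= 0 {
		q.wait <- struct{}{} // wake the single blocked acquirer
	}
}

func newHandlerQuota(n uint32) *atomicSemaphore {
	a := &atomicSemaphore{wait: make(chan struct{}, 1)}
	a.n.Store(int64(n))
	return a
}

func main() {
	quota := newHandlerQuota(2) // at most 2 handlers in flight
	var wg sync.WaitGroup

	for i := 0; i < 6; i++ {
		quota.acquire() // called synchronously, as in serveStreams
		wg.Add(1)
		i := i
		go func() {
			defer wg.Done()
			defer quota.release()
			fmt.Println("handling stream", i)
		}()
	}
	wg.Wait()
}
```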