Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--  vendor/google.golang.org/grpc/server.go  52
1 file changed, 22 insertions(+), 30 deletions(-)
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 087b9ad7c..76d152a69 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -43,7 +43,6 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
- "google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/transport"
@@ -146,7 +145,7 @@ type Server struct {
channelzID *channelz.Identifier
czData *channelzData
- serverWorkerChannels []chan *serverWorkerData
+ serverWorkerChannel chan *serverWorkerData
}
type serverOptions struct {
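The serverWorkerData type carried by this field is not shown in the diff. Judging from how it is constructed in the serveStreams hunk further down (st, wg, stream), it presumably looks roughly like the sketch below, reproduced here only for context; the field comments are illustrative, not taken from the source:

type serverWorkerData struct {
	st     transport.ServerTransport // transport the stream arrived on
	wg     *sync.WaitGroup           // lets serveStreams wait for in-flight handlers
	stream *transport.Stream         // the stream to hand to handleStream
}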
@@ -561,40 +560,38 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
const serverWorkerResetThreshold = 1 << 16
// serverWorkers blocks on a *transport.Stream channel forever and waits for
-// data to be fed by serveStreams. This allows different requests to be
+// data to be fed by serveStreams. This allows multiple requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
-func (s *Server) serverWorker(ch chan *serverWorkerData) {
- // To make sure all server workers don't reset at the same time, choose a
- // random number of iterations before resetting.
- threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
- for completed := 0; completed < threshold; completed++ {
- data, ok := <-ch
+func (s *Server) serverWorker() {
+ for completed := 0; completed < serverWorkerResetThreshold; completed++ {
+ data, ok := <-s.serverWorkerChannel
if !ok {
return
}
- s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
- data.wg.Done()
+ s.handleSingleStream(data)
}
- go s.serverWorker(ch)
+ go s.serverWorker()
}
-// initServerWorkers creates worker goroutines and channels to process incoming
+func (s *Server) handleSingleStream(data *serverWorkerData) {
+ defer data.wg.Done()
+ s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
+}
+
+// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
- s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
+ s.serverWorkerChannel = make(chan *serverWorkerData)
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
- s.serverWorkerChannels[i] = make(chan *serverWorkerData)
- go s.serverWorker(s.serverWorkerChannels[i])
+ go s.serverWorker()
}
}
func (s *Server) stopServerWorkers() {
- for i := uint32(0); i < s.opts.numServerWorkers; i++ {
- close(s.serverWorkerChannels[i])
- }
+ close(s.serverWorkerChannel)
}
// NewServer creates a gRPC server which has no service registered and has not
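Taken together, the hunk above and the serveStreams hunk below replace one channel per worker plus round-robin dispatch with a single shared unbuffered channel that every worker receives from. The following is a minimal, self-contained sketch of that pattern, using illustrative names (pool, dispatch, workerResetThreshold) rather than the actual gRPC internals:

package main

import (
	"fmt"
	"sync"
)

// After workerResetThreshold handled tasks a worker replaces itself with a
// fresh goroutine, so a stack grown by one large request is eventually freed
// (the runtime.morestack concern cited in the comment above).
const workerResetThreshold = 1 << 16

// pool stands in for the relevant Server fields: one shared unbuffered
// channel that every worker goroutine receives from.
type pool struct {
	tasks chan func()
}

// newPool mirrors initServerWorkers: start n workers on the same channel.
func newPool(n int) *pool {
	p := &pool{tasks: make(chan func())}
	for i := 0; i < n; i++ {
		go p.worker()
	}
	return p
}

// worker mirrors serverWorker: loop until the reset threshold, then respawn;
// return when the channel is closed.
func (p *pool) worker() {
	for completed := 0; completed < workerResetThreshold; completed++ {
		task, ok := <-p.tasks
		if !ok {
			return
		}
		task()
	}
	go p.worker()
}

// dispatch mirrors the select in serveStreams: hand the task to an idle
// worker if one is ready to receive, otherwise fall back to a plain
// goroutine so the caller never blocks on a busy pool.
func (p *pool) dispatch(wg *sync.WaitGroup, task func()) {
	wg.Add(1)
	wrapped := func() {
		defer wg.Done()
		task()
	}
	select {
	case p.tasks <- wrapped:
	default:
		go wrapped()
	}
}

// stop mirrors stopServerWorkers: closing the one shared channel stops
// every worker.
func (p *pool) stop() { close(p.tasks) }

func main() {
	p := newPool(4)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		i := i
		p.dispatch(&wg, func() { fmt.Println("handled stream", i) })
	}
	wg.Wait()
	p.stop()
}

Because the channel is unbuffered, a send succeeds only when some worker is already blocked receiving, so the select/default never queues work behind a busy worker; and closing the single channel is enough to stop every worker, which is why stopServerWorkers shrinks to one close call.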
@@ -946,26 +943,21 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close(errors.New("finished serving streams for the server transport"))
var wg sync.WaitGroup
- var roundRobinCounter uint32
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
- case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
+ case s.serverWorkerChannel <- data:
+ return
default:
// If all stream workers are busy, fallback to the default code path.
- go func() {
- s.handleStream(st, stream, s.traceInfo(st, stream))
- wg.Done()
- }()
}
- } else {
- go func() {
- defer wg.Done()
- s.handleStream(st, stream, s.traceInfo(st, stream))
- }()
}
+ go func() {
+ defer wg.Done()
+ s.handleStream(st, stream, s.traceInfo(st, stream))
+ }()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx