summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/grpcsync
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/grpcsync')
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go54
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go43
2 files changed, 44 insertions, 53 deletions
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
index 37b8d4117..900917dbe 100644
--- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
@@ -32,10 +32,10 @@ import (
//
// This type is safe for concurrent access.
type CallbackSerializer struct {
- // Done is closed once the serializer is shut down completely, i.e all
+ // done is closed once the serializer is shut down completely, i.e all
// scheduled callbacks are executed and the serializer has deallocated all
// its resources.
- Done chan struct{}
+ done chan struct{}
callbacks *buffer.Unbounded
closedMu sync.Mutex
@@ -48,12 +48,12 @@ type CallbackSerializer struct {
// callbacks will be added once this context is canceled, and any pending un-run
// callbacks will be executed before the serializer is shut down.
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
- t := &CallbackSerializer{
- Done: make(chan struct{}),
+ cs := &CallbackSerializer{
+ done: make(chan struct{}),
callbacks: buffer.NewUnbounded(),
}
- go t.run(ctx)
- return t
+ go cs.run(ctx)
+ return cs
}
// Schedule adds a callback to be scheduled after existing callbacks are run.
@@ -64,56 +64,62 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
// Return value indicates if the callback was successfully added to the list of
// callbacks to be executed by the serializer. It is not possible to add
// callbacks once the context passed to NewCallbackSerializer is cancelled.
-func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
- t.closedMu.Lock()
- defer t.closedMu.Unlock()
+func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool {
+ cs.closedMu.Lock()
+ defer cs.closedMu.Unlock()
- if t.closed {
+ if cs.closed {
return false
}
- t.callbacks.Put(f)
+ cs.callbacks.Put(f)
return true
}
-func (t *CallbackSerializer) run(ctx context.Context) {
+func (cs *CallbackSerializer) run(ctx context.Context) {
var backlog []func(context.Context)
- defer close(t.Done)
+ defer close(cs.done)
for ctx.Err() == nil {
select {
case <-ctx.Done():
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
- case callback, ok := <-t.callbacks.Get():
+ case callback, ok := <-cs.callbacks.Get():
if !ok {
return
}
- t.callbacks.Load()
+ cs.callbacks.Load()
callback.(func(ctx context.Context))(ctx)
}
}
// Fetch pending callbacks if any, and execute them before returning from
- // this method and closing t.Done.
- t.closedMu.Lock()
- t.closed = true
- backlog = t.fetchPendingCallbacks()
- t.callbacks.Close()
- t.closedMu.Unlock()
+ // this method and closing cs.done.
+ cs.closedMu.Lock()
+ cs.closed = true
+ backlog = cs.fetchPendingCallbacks()
+ cs.callbacks.Close()
+ cs.closedMu.Unlock()
for _, b := range backlog {
b(ctx)
}
}
-func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
+func (cs *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) {
var backlog []func(context.Context)
for {
select {
- case b := <-t.callbacks.Get():
+ case b := <-cs.callbacks.Get():
backlog = append(backlog, b.(func(context.Context)))
- t.callbacks.Load()
+ cs.callbacks.Load()
default:
return backlog
}
}
}
+
+// Done returns a channel that is closed after the context passed to
+// NewCallbackSerializer is canceled and all callbacks have been executed.
+func (cs *CallbackSerializer) Done() <-chan struct{} {
+ return cs.done
+}
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
index f58b5ffa6..aef8cec1a 100644
--- a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
@@ -29,7 +29,7 @@ import (
type Subscriber interface {
// OnMessage is invoked when a new message is published. Implementations
// must not block in this method.
- OnMessage(msg interface{})
+ OnMessage(msg any)
}
// PubSub is a simple one-to-many publish-subscribe system that supports
@@ -40,25 +40,23 @@ type Subscriber interface {
// subscribers interested in receiving these messages register a callback
// via the Subscribe() method.
//
-// Once a PubSub is stopped, no more messages can be published, and
-// it is guaranteed that no more subscriber callback will be invoked.
+// Once a PubSub is stopped, no more messages can be published, but any pending
+// published messages will be delivered to the subscribers. Done may be used
+// to determine when all published messages have been delivered.
type PubSub struct {
- cs *CallbackSerializer
- cancel context.CancelFunc
+ cs *CallbackSerializer
// Access to the below fields are guarded by this mutex.
mu sync.Mutex
- msg interface{}
+ msg any
subscribers map[Subscriber]bool
- stopped bool
}
-// NewPubSub returns a new PubSub instance.
-func NewPubSub() *PubSub {
- ctx, cancel := context.WithCancel(context.Background())
+// NewPubSub returns a new PubSub instance. Users should cancel the
+// provided context to shutdown the PubSub.
+func NewPubSub(ctx context.Context) *PubSub {
return &PubSub{
cs: NewCallbackSerializer(ctx),
- cancel: cancel,
subscribers: map[Subscriber]bool{},
}
}
@@ -75,10 +73,6 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
ps.mu.Lock()
defer ps.mu.Unlock()
- if ps.stopped {
- return func() {}
- }
-
ps.subscribers[sub] = true
if ps.msg != nil {
@@ -102,14 +96,10 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
// Publish publishes the provided message to the PubSub, and invokes
// callbacks registered by subscribers asynchronously.
-func (ps *PubSub) Publish(msg interface{}) {
+func (ps *PubSub) Publish(msg any) {
ps.mu.Lock()
defer ps.mu.Unlock()
- if ps.stopped {
- return
- }
-
ps.msg = msg
for sub := range ps.subscribers {
s := sub
@@ -124,13 +114,8 @@ func (ps *PubSub) Publish(msg interface{}) {
}
}
-// Stop shuts down the PubSub and releases any resources allocated by it.
-// It is guaranteed that no subscriber callbacks would be invoked once this
-// method returns.
-func (ps *PubSub) Stop() {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- ps.stopped = true
-
- ps.cancel()
+// Done returns a channel that is closed after the context passed to NewPubSub
+// is canceled and all updates have been sent to subscribers.
+func (ps *PubSub) Done() <-chan struct{} {
+ return ps.cs.Done()
}