summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
diff options
context:
space:
mode:
authorLibravatar dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2023-09-18 13:47:28 +0100
committerLibravatar GitHub <noreply@github.com>2023-09-18 13:47:28 +0100
commitc6fdcd52fabb6984de280f763ec5dc2023613054 (patch)
tree939de6cc265fb0c73ef40c2129c8eb298fd93b0c /vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
parent[chore]: Bump github.com/miekg/dns from 1.1.55 to 1.1.56 (#2204) (diff)
downloadgotosocial-c6fdcd52fabb6984de280f763ec5dc2023613054.tar.xz
[chore]: Bump go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp from 1.17.0 to 1.18.0 (#2207)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go43
1 files changed, 14 insertions, 29 deletions
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
index f58b5ffa6..aef8cec1a 100644
--- a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
+++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
@@ -29,7 +29,7 @@ import (
type Subscriber interface {
// OnMessage is invoked when a new message is published. Implementations
// must not block in this method.
- OnMessage(msg interface{})
+ OnMessage(msg any)
}
// PubSub is a simple one-to-many publish-subscribe system that supports
@@ -40,25 +40,23 @@ type Subscriber interface {
// subscribers interested in receiving these messages register a callback
// via the Subscribe() method.
//
-// Once a PubSub is stopped, no more messages can be published, and
-// it is guaranteed that no more subscriber callback will be invoked.
+// Once a PubSub is stopped, no more messages can be published, but any pending
+// published messages will be delivered to the subscribers. Done may be used
+// to determine when all published messages have been delivered.
type PubSub struct {
- cs *CallbackSerializer
- cancel context.CancelFunc
+ cs *CallbackSerializer
// Access to the below fields are guarded by this mutex.
mu sync.Mutex
- msg interface{}
+ msg any
subscribers map[Subscriber]bool
- stopped bool
}
-// NewPubSub returns a new PubSub instance.
-func NewPubSub() *PubSub {
- ctx, cancel := context.WithCancel(context.Background())
+// NewPubSub returns a new PubSub instance. Users should cancel the
+// provided context to shutdown the PubSub.
+func NewPubSub(ctx context.Context) *PubSub {
return &PubSub{
cs: NewCallbackSerializer(ctx),
- cancel: cancel,
subscribers: map[Subscriber]bool{},
}
}
@@ -75,10 +73,6 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
ps.mu.Lock()
defer ps.mu.Unlock()
- if ps.stopped {
- return func() {}
- }
-
ps.subscribers[sub] = true
if ps.msg != nil {
@@ -102,14 +96,10 @@ func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
// Publish publishes the provided message to the PubSub, and invokes
// callbacks registered by subscribers asynchronously.
-func (ps *PubSub) Publish(msg interface{}) {
+func (ps *PubSub) Publish(msg any) {
ps.mu.Lock()
defer ps.mu.Unlock()
- if ps.stopped {
- return
- }
-
ps.msg = msg
for sub := range ps.subscribers {
s := sub
@@ -124,13 +114,8 @@ func (ps *PubSub) Publish(msg interface{}) {
}
}
-// Stop shuts down the PubSub and releases any resources allocated by it.
-// It is guaranteed that no subscriber callbacks would be invoked once this
-// method returns.
-func (ps *PubSub) Stop() {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- ps.stopped = true
-
- ps.cancel()
+// Done returns a channel that is closed after the context passed to NewPubSub
+// is canceled and all updates have been sent to subscribers.
+func (ps *PubSub) Done() <-chan struct{} {
+ return ps.cs.Done()
}