summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go121
1 files changed, 0 insertions, 121 deletions
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
deleted file mode 100644
index 6d8c2f518..000000000
--- a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Copyright 2023 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpcsync
-
-import (
- "context"
- "sync"
-)
-
-// Subscriber represents an entity that is subscribed to messages published on
-// a PubSub. It wraps the callback to be invoked by the PubSub when a new
-// message is published.
-type Subscriber interface {
- // OnMessage is invoked when a new message is published. Implementations
- // must not block in this method.
- OnMessage(msg any)
-}
-
-// PubSub is a simple one-to-many publish-subscribe system that supports
-// messages of arbitrary type. It guarantees that messages are delivered in
-// the same order in which they were published.
-//
-// Publisher invokes the Publish() method to publish new messages, while
-// subscribers interested in receiving these messages register a callback
-// via the Subscribe() method.
-//
-// Once a PubSub is stopped, no more messages can be published, but any pending
-// published messages will be delivered to the subscribers. Done may be used
-// to determine when all published messages have been delivered.
-type PubSub struct {
- cs *CallbackSerializer
-
- // Access to the below fields are guarded by this mutex.
- mu sync.Mutex
- msg any
- subscribers map[Subscriber]bool
-}
-
-// NewPubSub returns a new PubSub instance. Users should cancel the
-// provided context to shutdown the PubSub.
-func NewPubSub(ctx context.Context) *PubSub {
- return &PubSub{
- cs: NewCallbackSerializer(ctx),
- subscribers: map[Subscriber]bool{},
- }
-}
-
-// Subscribe registers the provided Subscriber to the PubSub.
-//
-// If the PubSub contains a previously published message, the Subscriber's
-// OnMessage() callback will be invoked asynchronously with the existing
-// message to begin with, and subsequently for every newly published message.
-//
-// The caller is responsible for invoking the returned cancel function to
-// unsubscribe itself from the PubSub.
-func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
-
- ps.subscribers[sub] = true
-
- if ps.msg != nil {
- msg := ps.msg
- ps.cs.TrySchedule(func(context.Context) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- if !ps.subscribers[sub] {
- return
- }
- sub.OnMessage(msg)
- })
- }
-
- return func() {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- delete(ps.subscribers, sub)
- }
-}
-
-// Publish publishes the provided message to the PubSub, and invokes
-// callbacks registered by subscribers asynchronously.
-func (ps *PubSub) Publish(msg any) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
-
- ps.msg = msg
- for sub := range ps.subscribers {
- s := sub
- ps.cs.TrySchedule(func(context.Context) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- if !ps.subscribers[s] {
- return
- }
- s.OnMessage(msg)
- })
- }
-}
-
-// Done returns a channel that is closed after the context passed to NewPubSub
-// is canceled and all updates have been sent to subscribers.
-func (ps *PubSub) Done() <-chan struct{} {
- return ps.cs.Done()
-}