summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
blob: aef8cec1ab0cdeba1834b65ebb3ab3659735ad4e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*
 *
 * Copyright 2023 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpcsync

import (
	"context"
	"sync"
)

// Subscriber represents an entity that is subscribed to messages published on
// a PubSub. It wraps the callback to be invoked by the PubSub when a new
// message is published.
type Subscriber interface {
	// OnMessage is invoked when a new message is published. Implementations
	// must not block in this method.
	OnMessage(msg any)
}

// PubSub is a simple one-to-many publish-subscribe system that supports
// messages of arbitrary type. It guarantees that messages are delivered in
// the same order in which they were published.
//
// Publisher invokes the Publish() method to publish new messages, while
// subscribers interested in receiving these messages register a callback
// via the Subscribe() method.
//
// Once a PubSub is stopped, no more messages can be published, but any pending
// published messages will be delivered to the subscribers.  Done may be used
// to determine when all published messages have been delivered.
type PubSub struct {
	cs *CallbackSerializer

	// Access to the below fields are guarded by this mutex.
	mu          sync.Mutex
	msg         any
	subscribers map[Subscriber]bool
}

// NewPubSub returns a new PubSub instance.  Users should cancel the
// provided context to shutdown the PubSub.
func NewPubSub(ctx context.Context) *PubSub {
	return &PubSub{
		cs:          NewCallbackSerializer(ctx),
		subscribers: map[Subscriber]bool{},
	}
}

// Subscribe registers the provided Subscriber to the PubSub.
//
// If the PubSub contains a previously published message, the Subscriber's
// OnMessage() callback will be invoked asynchronously with the existing
// message to begin with, and subsequently for every newly published message.
//
// The caller is responsible for invoking the returned cancel function to
// unsubscribe itself from the PubSub.
func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	ps.subscribers[sub] = true

	if ps.msg != nil {
		msg := ps.msg
		ps.cs.Schedule(func(context.Context) {
			ps.mu.Lock()
			defer ps.mu.Unlock()
			if !ps.subscribers[sub] {
				return
			}
			sub.OnMessage(msg)
		})
	}

	return func() {
		ps.mu.Lock()
		defer ps.mu.Unlock()
		delete(ps.subscribers, sub)
	}
}

// Publish publishes the provided message to the PubSub, and invokes
// callbacks registered by subscribers asynchronously.
func (ps *PubSub) Publish(msg any) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	ps.msg = msg
	for sub := range ps.subscribers {
		s := sub
		ps.cs.Schedule(func(context.Context) {
			ps.mu.Lock()
			defer ps.mu.Unlock()
			if !ps.subscribers[s] {
				return
			}
			s.OnMessage(msg)
		})
	}
}

// Done returns a channel that is closed after the context passed to NewPubSub
// is canceled and all updates have been sent to subscribers.
func (ps *PubSub) Done() <-chan struct{} {
	return ps.cs.Done()
}