summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/grpcsync
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/grpcsync')
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go112
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/event.go61
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/oncefunc.go32
-rw-r--r--vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go121
4 files changed, 0 insertions, 326 deletions
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
deleted file mode 100644
index 8e8e86128..000000000
--- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- *
- * Copyright 2022 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpcsync
-
-import (
- "context"
-
- "google.golang.org/grpc/internal/buffer"
-)
-
-// CallbackSerializer provides a mechanism to schedule callbacks in a
-// synchronized manner. It provides a FIFO guarantee on the order of execution
-// of scheduled callbacks. New callbacks can be scheduled by invoking the
-// Schedule() method.
-//
-// This type is safe for concurrent access.
-type CallbackSerializer struct {
- // done is closed once the serializer is shut down completely, i.e all
- // scheduled callbacks are executed and the serializer has deallocated all
- // its resources.
- done chan struct{}
-
- callbacks *buffer.Unbounded
-}
-
-// NewCallbackSerializer returns a new CallbackSerializer instance. The provided
-// context will be passed to the scheduled callbacks. Users should cancel the
-// provided context to shutdown the CallbackSerializer. It is guaranteed that no
-// callbacks will be added once this context is canceled, and any pending un-run
-// callbacks will be executed before the serializer is shut down.
-func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
- cs := &CallbackSerializer{
- done: make(chan struct{}),
- callbacks: buffer.NewUnbounded(),
- }
- go cs.run(ctx)
- return cs
-}
-
-// TrySchedule tries to schedule the provided callback function f to be
-// executed in the order it was added. This is a best-effort operation. If the
-// context passed to NewCallbackSerializer was canceled before this method is
-// called, the callback will not be scheduled.
-//
-// Callbacks are expected to honor the context when performing any blocking
-// operations, and should return early when the context is canceled.
-func (cs *CallbackSerializer) TrySchedule(f func(ctx context.Context)) {
- cs.callbacks.Put(f)
-}
-
-// ScheduleOr schedules the provided callback function f to be executed in the
-// order it was added. If the context passed to NewCallbackSerializer has been
-// canceled before this method is called, the onFailure callback will be
-// executed inline instead.
-//
-// Callbacks are expected to honor the context when performing any blocking
-// operations, and should return early when the context is canceled.
-func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure func()) {
- if cs.callbacks.Put(f) != nil {
- onFailure()
- }
-}
-
-func (cs *CallbackSerializer) run(ctx context.Context) {
- defer close(cs.done)
-
- // TODO: when Go 1.21 is the oldest supported version, this loop and Close
- // can be replaced with:
- //
- // context.AfterFunc(ctx, cs.callbacks.Close)
- for ctx.Err() == nil {
- select {
- case <-ctx.Done():
- // Do nothing here. Next iteration of the for loop will not happen,
- // since ctx.Err() would be non-nil.
- case cb := <-cs.callbacks.Get():
- cs.callbacks.Load()
- cb.(func(context.Context))(ctx)
- }
- }
-
- // Close the buffer to prevent new callbacks from being added.
- cs.callbacks.Close()
-
- // Run all pending callbacks.
- for cb := range cs.callbacks.Get() {
- cs.callbacks.Load()
- cb.(func(context.Context))(ctx)
- }
-}
-
-// Done returns a channel that is closed after the context passed to
-// NewCallbackSerializer is canceled and all callbacks have been executed.
-func (cs *CallbackSerializer) Done() <-chan struct{} {
- return cs.done
-}
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/event.go b/vendor/google.golang.org/grpc/internal/grpcsync/event.go
deleted file mode 100644
index fbe697c37..000000000
--- a/vendor/google.golang.org/grpc/internal/grpcsync/event.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-// Package grpcsync implements additional synchronization primitives built upon
-// the sync package.
-package grpcsync
-
-import (
- "sync"
- "sync/atomic"
-)
-
-// Event represents a one-time event that may occur in the future.
-type Event struct {
- fired int32
- c chan struct{}
- o sync.Once
-}
-
-// Fire causes e to complete. It is safe to call multiple times, and
-// concurrently. It returns true iff this call to Fire caused the signaling
-// channel returned by Done to close.
-func (e *Event) Fire() bool {
- ret := false
- e.o.Do(func() {
- atomic.StoreInt32(&e.fired, 1)
- close(e.c)
- ret = true
- })
- return ret
-}
-
-// Done returns a channel that will be closed when Fire is called.
-func (e *Event) Done() <-chan struct{} {
- return e.c
-}
-
-// HasFired returns true if Fire has been called.
-func (e *Event) HasFired() bool {
- return atomic.LoadInt32(&e.fired) == 1
-}
-
-// NewEvent returns a new, ready-to-use Event.
-func NewEvent() *Event {
- return &Event{c: make(chan struct{})}
-}
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/oncefunc.go b/vendor/google.golang.org/grpc/internal/grpcsync/oncefunc.go
deleted file mode 100644
index 6635f7bca..000000000
--- a/vendor/google.golang.org/grpc/internal/grpcsync/oncefunc.go
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Copyright 2022 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpcsync
-
-import (
- "sync"
-)
-
-// OnceFunc returns a function wrapping f which ensures f is only executed
-// once even if the returned function is executed multiple times.
-func OnceFunc(f func()) func() {
- var once sync.Once
- return func() {
- once.Do(f)
- }
-}
diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
deleted file mode 100644
index 6d8c2f518..000000000
--- a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Copyright 2023 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package grpcsync
-
-import (
- "context"
- "sync"
-)
-
-// Subscriber represents an entity that is subscribed to messages published on
-// a PubSub. It wraps the callback to be invoked by the PubSub when a new
-// message is published.
-type Subscriber interface {
- // OnMessage is invoked when a new message is published. Implementations
- // must not block in this method.
- OnMessage(msg any)
-}
-
-// PubSub is a simple one-to-many publish-subscribe system that supports
-// messages of arbitrary type. It guarantees that messages are delivered in
-// the same order in which they were published.
-//
-// Publisher invokes the Publish() method to publish new messages, while
-// subscribers interested in receiving these messages register a callback
-// via the Subscribe() method.
-//
-// Once a PubSub is stopped, no more messages can be published, but any pending
-// published messages will be delivered to the subscribers. Done may be used
-// to determine when all published messages have been delivered.
-type PubSub struct {
- cs *CallbackSerializer
-
- // Access to the below fields are guarded by this mutex.
- mu sync.Mutex
- msg any
- subscribers map[Subscriber]bool
-}
-
-// NewPubSub returns a new PubSub instance. Users should cancel the
-// provided context to shutdown the PubSub.
-func NewPubSub(ctx context.Context) *PubSub {
- return &PubSub{
- cs: NewCallbackSerializer(ctx),
- subscribers: map[Subscriber]bool{},
- }
-}
-
-// Subscribe registers the provided Subscriber to the PubSub.
-//
-// If the PubSub contains a previously published message, the Subscriber's
-// OnMessage() callback will be invoked asynchronously with the existing
-// message to begin with, and subsequently for every newly published message.
-//
-// The caller is responsible for invoking the returned cancel function to
-// unsubscribe itself from the PubSub.
-func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
-
- ps.subscribers[sub] = true
-
- if ps.msg != nil {
- msg := ps.msg
- ps.cs.TrySchedule(func(context.Context) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- if !ps.subscribers[sub] {
- return
- }
- sub.OnMessage(msg)
- })
- }
-
- return func() {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- delete(ps.subscribers, sub)
- }
-}
-
-// Publish publishes the provided message to the PubSub, and invokes
-// callbacks registered by subscribers asynchronously.
-func (ps *PubSub) Publish(msg any) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
-
- ps.msg = msg
- for sub := range ps.subscribers {
- s := sub
- ps.cs.TrySchedule(func(context.Context) {
- ps.mu.Lock()
- defer ps.mu.Unlock()
- if !ps.subscribers[s] {
- return
- }
- s.OnMessage(msg)
- })
- }
-}
-
-// Done returns a channel that is closed after the context passed to NewPubSub
-// is canceled and all updates have been sent to subscribers.
-func (ps *PubSub) Done() <-chan struct{} {
- return ps.cs.Done()
-}