diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/grpcsync')
| -rw-r--r-- | vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go | 64 | ||||
| -rw-r--r-- | vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go | 136 | 
2 files changed, 195 insertions, 5 deletions
| diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go index 79993d343..37b8d4117 100644 --- a/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go +++ b/vendor/google.golang.org/grpc/internal/grpcsync/callback_serializer.go @@ -20,6 +20,7 @@ package grpcsync  import (  	"context" +	"sync"  	"google.golang.org/grpc/internal/buffer"  ) @@ -31,15 +32,26 @@ import (  //  // This type is safe for concurrent access.  type CallbackSerializer struct { +	// Done is closed once the serializer is shut down completely, i.e all +	// scheduled callbacks are executed and the serializer has deallocated all +	// its resources. +	Done chan struct{} +  	callbacks *buffer.Unbounded +	closedMu  sync.Mutex +	closed    bool  }  // NewCallbackSerializer returns a new CallbackSerializer instance. The provided  // context will be passed to the scheduled callbacks. Users should cancel the  // provided context to shutdown the CallbackSerializer. It is guaranteed that no -// callbacks will be executed once this context is canceled. +// callbacks will be added once this context is canceled, and any pending un-run +// callbacks will be executed before the serializer is shut down.  func NewCallbackSerializer(ctx context.Context) *CallbackSerializer { -	t := &CallbackSerializer{callbacks: buffer.NewUnbounded()} +	t := &CallbackSerializer{ +		Done:      make(chan struct{}), +		callbacks: buffer.NewUnbounded(), +	}  	go t.run(ctx)  	return t  } @@ -48,18 +60,60 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {  //  // Callbacks are expected to honor the context when performing any blocking  // operations, and should return early when the context is canceled. -func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) { +// +// Return value indicates if the callback was successfully added to the list of +// callbacks to be executed by the serializer. It is not possible to add +// callbacks once the context passed to NewCallbackSerializer is cancelled. +func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) bool { +	t.closedMu.Lock() +	defer t.closedMu.Unlock() + +	if t.closed { +		return false +	}  	t.callbacks.Put(f) +	return true  }  func (t *CallbackSerializer) run(ctx context.Context) { +	var backlog []func(context.Context) + +	defer close(t.Done)  	for ctx.Err() == nil {  		select {  		case <-ctx.Done(): -			return -		case callback := <-t.callbacks.Get(): +			// Do nothing here. Next iteration of the for loop will not happen, +			// since ctx.Err() would be non-nil. +		case callback, ok := <-t.callbacks.Get(): +			if !ok { +				return +			}  			t.callbacks.Load()  			callback.(func(ctx context.Context))(ctx)  		}  	} + +	// Fetch pending callbacks if any, and execute them before returning from +	// this method and closing t.Done. +	t.closedMu.Lock() +	t.closed = true +	backlog = t.fetchPendingCallbacks() +	t.callbacks.Close() +	t.closedMu.Unlock() +	for _, b := range backlog { +		b(ctx) +	} +} + +func (t *CallbackSerializer) fetchPendingCallbacks() []func(context.Context) { +	var backlog []func(context.Context) +	for { +		select { +		case b := <-t.callbacks.Get(): +			backlog = append(backlog, b.(func(context.Context))) +			t.callbacks.Load() +		default: +			return backlog +		} +	}  } diff --git a/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go new file mode 100644 index 000000000..f58b5ffa6 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/grpcsync/pubsub.go @@ -0,0 +1,136 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcsync + +import ( +	"context" +	"sync" +) + +// Subscriber represents an entity that is subscribed to messages published on +// a PubSub. It wraps the callback to be invoked by the PubSub when a new +// message is published. +type Subscriber interface { +	// OnMessage is invoked when a new message is published. Implementations +	// must not block in this method. +	OnMessage(msg interface{}) +} + +// PubSub is a simple one-to-many publish-subscribe system that supports +// messages of arbitrary type. It guarantees that messages are delivered in +// the same order in which they were published. +// +// Publisher invokes the Publish() method to publish new messages, while +// subscribers interested in receiving these messages register a callback +// via the Subscribe() method. +// +// Once a PubSub is stopped, no more messages can be published, and +// it is guaranteed that no more subscriber callback will be invoked. +type PubSub struct { +	cs     *CallbackSerializer +	cancel context.CancelFunc + +	// Access to the below fields are guarded by this mutex. +	mu          sync.Mutex +	msg         interface{} +	subscribers map[Subscriber]bool +	stopped     bool +} + +// NewPubSub returns a new PubSub instance. +func NewPubSub() *PubSub { +	ctx, cancel := context.WithCancel(context.Background()) +	return &PubSub{ +		cs:          NewCallbackSerializer(ctx), +		cancel:      cancel, +		subscribers: map[Subscriber]bool{}, +	} +} + +// Subscribe registers the provided Subscriber to the PubSub. +// +// If the PubSub contains a previously published message, the Subscriber's +// OnMessage() callback will be invoked asynchronously with the existing +// message to begin with, and subsequently for every newly published message. +// +// The caller is responsible for invoking the returned cancel function to +// unsubscribe itself from the PubSub. +func (ps *PubSub) Subscribe(sub Subscriber) (cancel func()) { +	ps.mu.Lock() +	defer ps.mu.Unlock() + +	if ps.stopped { +		return func() {} +	} + +	ps.subscribers[sub] = true + +	if ps.msg != nil { +		msg := ps.msg +		ps.cs.Schedule(func(context.Context) { +			ps.mu.Lock() +			defer ps.mu.Unlock() +			if !ps.subscribers[sub] { +				return +			} +			sub.OnMessage(msg) +		}) +	} + +	return func() { +		ps.mu.Lock() +		defer ps.mu.Unlock() +		delete(ps.subscribers, sub) +	} +} + +// Publish publishes the provided message to the PubSub, and invokes +// callbacks registered by subscribers asynchronously. +func (ps *PubSub) Publish(msg interface{}) { +	ps.mu.Lock() +	defer ps.mu.Unlock() + +	if ps.stopped { +		return +	} + +	ps.msg = msg +	for sub := range ps.subscribers { +		s := sub +		ps.cs.Schedule(func(context.Context) { +			ps.mu.Lock() +			defer ps.mu.Unlock() +			if !ps.subscribers[s] { +				return +			} +			s.OnMessage(msg) +		}) +	} +} + +// Stop shuts down the PubSub and releases any resources allocated by it. +// It is guaranteed that no subscriber callbacks would be invoked once this +// method returns. +func (ps *PubSub) Stop() { +	ps.mu.Lock() +	defer ps.mu.Unlock() +	ps.stopped = true + +	ps.cancel() +} | 
