diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/buffer')
| -rw-r--r-- | vendor/google.golang.org/grpc/internal/buffer/unbounded.go | 41 | 
1 files changed, 26 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 4399c3df4..11f91668a 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -18,7 +18,10 @@  // Package buffer provides an implementation of an unbounded buffer.  package buffer -import "sync" +import ( +	"errors" +	"sync" +)  // Unbounded is an implementation of an unbounded buffer which does not use  // extra goroutines. This is typically used for passing updates from one entity @@ -36,6 +39,7 @@ import "sync"  type Unbounded struct {  	c       chan any  	closed  bool +	closing bool  	mu      sync.Mutex  	backlog []any  } @@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded {  	return &Unbounded{c: make(chan any, 1)}  } +var errBufferClosed = errors.New("Put called on closed buffer.Unbounded") +  // Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t any) { +func (b *Unbounded) Put(t any) error {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return +	if b.closing { +		return errBufferClosed  	}  	if len(b.backlog) == 0 {  		select {  		case b.c <- t: -			return +			return nil  		default:  		}  	}  	b.backlog = append(b.backlog, t) +	return nil  } -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a +// Load sends the earliest buffered data, if any, onto the read channel returned +// by Get(). Users are expected to call this every time they successfully read a  // value from the read channel.  func (b *Unbounded) Load() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return -	}  	if len(b.backlog) > 0 {  		select {  		case b.c <- b.backlog[0]: @@ -78,6 +82,8 @@ func (b *Unbounded) Load() {  			b.backlog = b.backlog[1:]  		default:  		} +	} else if b.closing && !b.closed { +		close(b.c)  	}  } @@ -88,18 +94,23 @@ func (b *Unbounded) Load() {  // send the next buffered value onto the channel if there is any.  //  // If the unbounded buffer is closed, the read channel returned by this method -// is closed. +// is closed after all data is drained.  func (b *Unbounded) Get() <-chan any {  	return b.c  } -// Close closes the unbounded buffer. +// Close closes the unbounded buffer. No subsequent data may be Put(), and the +// channel returned from Get() will be closed after all the data is read and +// Load() is called for the final time.  func (b *Unbounded) Close() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { +	if b.closing {  		return  	} -	b.closed = true -	close(b.c) +	b.closing = true +	if len(b.backlog) == 0 { +		b.closed = true +		close(b.c) +	}  }  | 
