diff options
| author | 2024-03-11 15:34:34 +0100 | |
|---|---|---|
| committer | 2024-03-11 15:34:34 +0100 | |
| commit | 5e871e81a87a638b07d540c15d1b95608843255d (patch) | |
| tree | 62db65c7de651bac3d8894f4f70e0fe8de853a5e /vendor/google.golang.org/grpc/internal/buffer | |
| parent | [chore]: Bump github.com/minio/minio-go/v7 from 7.0.67 to 7.0.69 (#2748) (diff) | |
| download | gotosocial-5e871e81a87a638b07d540c15d1b95608843255d.tar.xz | |
[chore] Update usage of OTEL libraries (#2725)
* otel to 1.24
* prometheus exporter to 0.46
* bunotel to 1.1.17
Also:
* Use schemaless URL for metrics
* Add software version to tracing schema
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/buffer')
| -rw-r--r-- | vendor/google.golang.org/grpc/internal/buffer/unbounded.go | 41 | 
1 files changed, 26 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 4399c3df4..11f91668a 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -18,7 +18,10 @@  // Package buffer provides an implementation of an unbounded buffer.  package buffer -import "sync" +import ( +	"errors" +	"sync" +)  // Unbounded is an implementation of an unbounded buffer which does not use  // extra goroutines. This is typically used for passing updates from one entity @@ -36,6 +39,7 @@ import "sync"  type Unbounded struct {  	c       chan any  	closed  bool +	closing bool  	mu      sync.Mutex  	backlog []any  } @@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded {  	return &Unbounded{c: make(chan any, 1)}  } +var errBufferClosed = errors.New("Put called on closed buffer.Unbounded") +  // Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t any) { +func (b *Unbounded) Put(t any) error {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return +	if b.closing { +		return errBufferClosed  	}  	if len(b.backlog) == 0 {  		select {  		case b.c <- t: -			return +			return nil  		default:  		}  	}  	b.backlog = append(b.backlog, t) +	return nil  } -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a +// Load sends the earliest buffered data, if any, onto the read channel returned +// by Get(). Users are expected to call this every time they successfully read a  // value from the read channel.  func (b *Unbounded) Load() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { -		return -	}  	if len(b.backlog) > 0 {  		select {  		case b.c <- b.backlog[0]: @@ -78,6 +82,8 @@ func (b *Unbounded) Load() {  			b.backlog = b.backlog[1:]  		default:  		} +	} else if b.closing && !b.closed { +		close(b.c)  	}  } @@ -88,18 +94,23 @@ func (b *Unbounded) Load() {  // send the next buffered value onto the channel if there is any.  //  // If the unbounded buffer is closed, the read channel returned by this method -// is closed. +// is closed after all data is drained.  func (b *Unbounded) Get() <-chan any {  	return b.c  } -// Close closes the unbounded buffer. +// Close closes the unbounded buffer. No subsequent data may be Put(), and the +// channel returned from Get() will be closed after all the data is read and +// Load() is called for the final time.  func (b *Unbounded) Close() {  	b.mu.Lock()  	defer b.mu.Unlock() -	if b.closed { +	if b.closing {  		return  	} -	b.closed = true -	close(b.c) +	b.closing = true +	if len(b.backlog) == 0 { +		b.closed = true +		close(b.c) +	}  }  | 
