diff options
author | 2024-03-11 15:34:34 +0100 | |
---|---|---|
committer | 2024-03-11 15:34:34 +0100 | |
commit | 5e871e81a87a638b07d540c15d1b95608843255d (patch) | |
tree | 62db65c7de651bac3d8894f4f70e0fe8de853a5e /vendor/google.golang.org/grpc/internal/buffer/unbounded.go | |
parent | [chore]: Bump github.com/minio/minio-go/v7 from 7.0.67 to 7.0.69 (#2748) (diff) | |
download | gotosocial-5e871e81a87a638b07d540c15d1b95608843255d.tar.xz |
[chore] Update usage of OTEL libraries (#2725)
* otel to 1.24
* prometheus exporter to 0.46
* bunotel to 1.1.17
Also:
* Use schemaless URL for metrics
* Add software version to tracing schema
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/buffer/unbounded.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/buffer/unbounded.go | 41 |
1 files changed, 26 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go index 4399c3df4..11f91668a 100644 --- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go +++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go @@ -18,7 +18,10 @@ // Package buffer provides an implementation of an unbounded buffer. package buffer -import "sync" +import ( + "errors" + "sync" +) // Unbounded is an implementation of an unbounded buffer which does not use // extra goroutines. This is typically used for passing updates from one entity @@ -36,6 +39,7 @@ import "sync" type Unbounded struct { c chan any closed bool + closing bool mu sync.Mutex backlog []any } @@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded { return &Unbounded{c: make(chan any, 1)} } +var errBufferClosed = errors.New("Put called on closed buffer.Unbounded") + // Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t any) { +func (b *Unbounded) Put(t any) error { b.mu.Lock() defer b.mu.Unlock() - if b.closed { - return + if b.closing { + return errBufferClosed } if len(b.backlog) == 0 { select { case b.c <- t: - return + return nil default: } } b.backlog = append(b.backlog, t) + return nil } -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a +// Load sends the earliest buffered data, if any, onto the read channel returned +// by Get(). Users are expected to call this every time they successfully read a // value from the read channel. func (b *Unbounded) Load() { b.mu.Lock() defer b.mu.Unlock() - if b.closed { - return - } if len(b.backlog) > 0 { select { case b.c <- b.backlog[0]: @@ -78,6 +82,8 @@ func (b *Unbounded) Load() { b.backlog = b.backlog[1:] default: } + } else if b.closing && !b.closed { + close(b.c) } } @@ -88,18 +94,23 @@ func (b *Unbounded) Load() { // send the next buffered value onto the channel if there is any. // // If the unbounded buffer is closed, the read channel returned by this method -// is closed. +// is closed after all data is drained. func (b *Unbounded) Get() <-chan any { return b.c } -// Close closes the unbounded buffer. +// Close closes the unbounded buffer. No subsequent data may be Put(), and the +// channel returned from Get() will be closed after all the data is read and +// Load() is called for the final time. func (b *Unbounded) Close() { b.mu.Lock() defer b.mu.Unlock() - if b.closed { + if b.closing { return } - b.closed = true - close(b.c) + b.closing = true + if len(b.backlog) == 0 { + b.closed = true + close(b.c) + } } |