summaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
diff options
context:
space:
mode:
authorLibravatar Daenney <daenney@users.noreply.github.com>2024-03-11 15:34:34 +0100
committerLibravatar GitHub <noreply@github.com>2024-03-11 15:34:34 +0100
commit5e871e81a87a638b07d540c15d1b95608843255d (patch)
tree62db65c7de651bac3d8894f4f70e0fe8de853a5e /vendor/google.golang.org/grpc/internal/buffer/unbounded.go
parent[chore]: Bump github.com/minio/minio-go/v7 from 7.0.67 to 7.0.69 (#2748) (diff)
downloadgotosocial-5e871e81a87a638b07d540c15d1b95608843255d.tar.xz
[chore] Update usage of OTEL libraries (#2725)
* otel to 1.24 * prometheus exporter to 0.46 * bunotel to 1.1.17 Also: * Use schemaless URL for metrics * Add software version to tracing schema
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/buffer/unbounded.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/buffer/unbounded.go41
1 files changed, 26 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
index 4399c3df4..11f91668a 100644
--- a/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
+++ b/vendor/google.golang.org/grpc/internal/buffer/unbounded.go
@@ -18,7 +18,10 @@
// Package buffer provides an implementation of an unbounded buffer.
package buffer
-import "sync"
+import (
+ "errors"
+ "sync"
+)
// Unbounded is an implementation of an unbounded buffer which does not use
// extra goroutines. This is typically used for passing updates from one entity
@@ -36,6 +39,7 @@ import "sync"
type Unbounded struct {
c chan any
closed bool
+ closing bool
mu sync.Mutex
backlog []any
}
@@ -45,32 +49,32 @@ func NewUnbounded() *Unbounded {
return &Unbounded{c: make(chan any, 1)}
}
+var errBufferClosed = errors.New("Put called on closed buffer.Unbounded")
+
// Put adds t to the unbounded buffer.
-func (b *Unbounded) Put(t any) {
+func (b *Unbounded) Put(t any) error {
b.mu.Lock()
defer b.mu.Unlock()
- if b.closed {
- return
+ if b.closing {
+ return errBufferClosed
}
if len(b.backlog) == 0 {
select {
case b.c <- t:
- return
+ return nil
default:
}
}
b.backlog = append(b.backlog, t)
+ return nil
}
-// Load sends the earliest buffered data, if any, onto the read channel
-// returned by Get(). Users are expected to call this every time they read a
+// Load sends the earliest buffered data, if any, onto the read channel returned
+// by Get(). Users are expected to call this every time they successfully read a
// value from the read channel.
func (b *Unbounded) Load() {
b.mu.Lock()
defer b.mu.Unlock()
- if b.closed {
- return
- }
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
@@ -78,6 +82,8 @@ func (b *Unbounded) Load() {
b.backlog = b.backlog[1:]
default:
}
+ } else if b.closing && !b.closed {
+ close(b.c)
}
}
@@ -88,18 +94,23 @@ func (b *Unbounded) Load() {
// send the next buffered value onto the channel if there is any.
//
// If the unbounded buffer is closed, the read channel returned by this method
-// is closed.
+// is closed after all data is drained.
func (b *Unbounded) Get() <-chan any {
return b.c
}
-// Close closes the unbounded buffer.
+// Close closes the unbounded buffer. No subsequent data may be Put(), and the
+// channel returned from Get() will be closed after all the data is read and
+// Load() is called for the final time.
func (b *Unbounded) Close() {
b.mu.Lock()
defer b.mu.Unlock()
- if b.closed {
+ if b.closing {
return
}
- b.closed = true
- close(b.c)
+ b.closing = true
+ if len(b.backlog) == 0 {
+ b.closed = true
+ close(b.c)
+ }
}