summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-structr/queue_ctx.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/codeberg.org/gruf/go-structr/queue_ctx.go')
-rw-r--r--vendor/codeberg.org/gruf/go-structr/queue_ctx.go134
1 files changed, 134 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-structr/queue_ctx.go b/vendor/codeberg.org/gruf/go-structr/queue_ctx.go
new file mode 100644
index 000000000..d8843c1eb
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-structr/queue_ctx.go
@@ -0,0 +1,134 @@
+package structr
+
+import (
+ "context"
+)
+
+// QueueCtx is a context-aware form of Queue{}.
+type QueueCtx[StructType any] struct {
+ Queue[StructType]
+ ch chan struct{}
+}
+
+// PopFront pops the current value at front of the queue, else blocking on ctx.
+func (q *QueueCtx[T]) PopFront(ctx context.Context) (T, bool) {
+ return q.pop(ctx, func() *list_elem {
+ return q.queue.head
+ })
+}
+
+// PopBack pops the current value at back of the queue, else blocking on ctx.
+func (q *QueueCtx[T]) PopBack(ctx context.Context) (T, bool) {
+ return q.pop(ctx, func() *list_elem {
+ return q.queue.tail
+ })
+}
+
+// PushFront pushes values to front of queue.
+func (q *QueueCtx[T]) PushFront(values ...T) {
+ q.mutex.Lock()
+ for i := range values {
+ item := q.index(values[i])
+ q.queue.push_front(&item.elem)
+ }
+ if q.ch != nil {
+ close(q.ch)
+ q.ch = nil
+ }
+ q.mutex.Unlock()
+}
+
+// PushBack pushes values to back of queue.
+func (q *QueueCtx[T]) PushBack(values ...T) {
+ q.mutex.Lock()
+ for i := range values {
+ item := q.index(values[i])
+ q.queue.push_back(&item.elem)
+ }
+ if q.ch != nil {
+ close(q.ch)
+ q.ch = nil
+ }
+ q.mutex.Unlock()
+}
+
+// Wait returns a ptr to the current ctx channel,
+// this will block until next push to the queue.
+func (q *QueueCtx[T]) Wait() <-chan struct{} {
+ q.mutex.Lock()
+ if q.ch == nil {
+ q.ch = make(chan struct{})
+ }
+ ctx := q.ch
+ q.mutex.Unlock()
+ return ctx
+}
+
+func (q *QueueCtx[T]) pop(ctx context.Context, next func() *list_elem) (T, bool) {
+ if next == nil {
+ panic("nil fn")
+ } else if ctx == nil {
+ panic("nil ctx")
+ }
+
+ // Acquire lock.
+ q.mutex.Lock()
+
+ var elem *list_elem
+
+ for {
+ // Get element.
+ elem = next()
+ if elem != nil {
+ break
+ }
+
+ if q.ch == nil {
+ // Allocate new ctx channel.
+ q.ch = make(chan struct{})
+ }
+
+ // Get current
+ // ch pointer.
+ ch := q.ch
+
+ // Unlock queue.
+ q.mutex.Unlock()
+
+ select {
+ // Ctx cancelled.
+ case <-ctx.Done():
+ var z T
+ return z, false
+
+ // Pushed!
+ case <-ch:
+ }
+
+ // Relock queue.
+ q.mutex.Lock()
+ }
+
+ // Cast the indexed item from elem.
+ item := (*indexed_item)(elem.data)
+
+ // Extract item value.
+ value := item.data.(T)
+
+ // Delete queued.
+ q.delete(item)
+
+ // Get func ptrs.
+ pop := q.Queue.pop
+
+ // Done with lock.
+ q.mutex.Unlock()
+
+ if pop != nil {
+ // Pass to
+ // user hook.
+ pop(value)
+ }
+
+ return value, true
+}