Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go')
-rw-r--r--  vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go  150
1 file changed, 150 insertions, 0 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go
new file mode 100644
index 000000000..38a8fa3c6
--- /dev/null
+++ b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go
@@ -0,0 +1,150 @@
+//go:build go1.19
+// +build go1.19
+
+package xsync
+
+import (
+ "runtime"
+ "sync/atomic"
+ "unsafe"
+)
+
+// A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent
+// queue. It's a generic version of MPMCQueue.
+//
+// MPMCQueueOf instances must be created with the NewMPMCQueueOf function.
+// A MPMCQueueOf must not be copied after first use.
+//
+// Based on the data structure from the following C++ library:
+// https://github.com/rigtorp/MPMCQueue
+type MPMCQueueOf[I any] struct {
+ cap uint64
+ head uint64
+ //lint:ignore U1000 prevents false sharing
+ hpad [cacheLineSize - 8]byte
+ tail uint64
+ //lint:ignore U1000 prevents false sharing
+ tpad [cacheLineSize - 8]byte
+ slots []slotOfPadded[I]
+}
+
+type slotOfPadded[I any] struct {
+ slotOf[I]
+ // Unfortunately, proper padding like the below one:
+ //
+ // pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte
+ //
+ // won't compile, so here we add best-effort padding for items of up to
+ // 56 bytes in size.
+ //lint:ignore U1000 prevents false sharing
+ pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
+}
+
+type slotOf[I any] struct {
+ // atomic.Uint64 is used here to get proper 8-byte alignment on
+ // 32-bit archs.
+ turn atomic.Uint64
+ item I
+}
+
+// NewMPMCQueueOf creates a new MPMCQueueOf instance with the given
+// capacity.
+func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] {
+ if capacity < 1 {
+ panic("capacity must be a positive number")
+ }
+ return &MPMCQueueOf[I]{
+ cap: uint64(capacity),
+ slots: make([]slotOfPadded[I], capacity),
+ }
+}
+
+// Enqueue inserts the given item into the queue.
+// Blocks if the queue is full.
+func (q *MPMCQueueOf[I]) Enqueue(item I) {
+ head := atomic.AddUint64(&q.head, 1) - 1
+ slot := &q.slots[q.idx(head)]
+ turn := q.turn(head) * 2
+ for slot.turn.Load() != turn {
+ runtime.Gosched()
+ }
+ slot.item = item
+ slot.turn.Store(turn + 1)
+}
+
+// Dequeue retrieves and removes the item from the head of the queue.
+// Blocks if the queue is empty.
+func (q *MPMCQueueOf[I]) Dequeue() I {
+ var zeroedI I
+ tail := atomic.AddUint64(&q.tail, 1) - 1
+ slot := &q.slots[q.idx(tail)]
+ turn := q.turn(tail)*2 + 1
+ for slot.turn.Load() != turn {
+ runtime.Gosched()
+ }
+ item := slot.item
+ slot.item = zeroedI
+ slot.turn.Store(turn + 1)
+ return item
+}
+
+// TryEnqueue inserts the given item into the queue. Does not block
+// and returns immediately. The result indicates whether the item was
+// inserted, i.e. that the queue wasn't full.
+func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool {
+ head := atomic.LoadUint64(&q.head)
+ for {
+ slot := &q.slots[q.idx(head)]
+ turn := q.turn(head) * 2
+ if slot.turn.Load() == turn {
+ if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
+ slot.item = item
+ slot.turn.Store(turn + 1)
+ return true
+ }
+ } else {
+ prevHead := head
+ head = atomic.LoadUint64(&q.head)
+ if head == prevHead {
+ return false
+ }
+ }
+ runtime.Gosched()
+ }
+}
+
+// TryDequeue retrieves and removes the item from the head of the
+// queue. Does not block and returns immediately. The ok result
+// indicates whether an item was retrieved, i.e. that the queue wasn't empty.
+func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) {
+ tail := atomic.LoadUint64(&q.tail)
+ for {
+ slot := &q.slots[q.idx(tail)]
+ turn := q.turn(tail)*2 + 1
+ if slot.turn.Load() == turn {
+ if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
+ var zeroedI I
+ item = slot.item
+ ok = true
+ slot.item = zeroedI
+ slot.turn.Store(turn + 1)
+ return
+ }
+ } else {
+ prevTail := tail
+ tail = atomic.LoadUint64(&q.tail)
+ if tail == prevTail {
+ return
+ }
+ }
+ runtime.Gosched()
+ }
+}
+
+func (q *MPMCQueueOf[I]) idx(i uint64) uint64 {
+ return i % q.cap
+}
+
+func (q *MPMCQueueOf[I]) turn(i uint64) uint64 {
+ return i / q.cap
+}