summaryrefslogtreecommitdiff
path: root/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go')
-rw-r--r--vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go64
1 files changed, 26 insertions, 38 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go
index 38a8fa3c6..3f7e4ccc1 100644
--- a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go
+++ b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go
@@ -12,7 +12,7 @@ import (
// A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent
// queue. It's a generic version of MPMCQueue.
//
-// MPMCQueue instances must be created with NewMPMCQueueOf function.
+// MPMCQueueOf instances must be created with NewMPMCQueueOf function.
// A MPMCQueueOf must not be copied after first use.
//
// Based on the data structure from the following C++ library:
@@ -61,6 +61,8 @@ func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] {
// Enqueue inserts the given item into the queue.
// Blocks, if the queue is full.
+//
+// Deprecated: use TryEnqueue in combination with runtime.Gosched().
func (q *MPMCQueueOf[I]) Enqueue(item I) {
head := atomic.AddUint64(&q.head, 1) - 1
slot := &q.slots[q.idx(head)]
@@ -74,8 +76,10 @@ func (q *MPMCQueueOf[I]) Enqueue(item I) {
// Dequeue retrieves and removes the item from the head of the queue.
// Blocks, if the queue is empty.
+//
+// Deprecated: use TryDequeue in combination with runtime.Gosched().
func (q *MPMCQueueOf[I]) Dequeue() I {
- var zeroedI I
+ var zeroI I
tail := atomic.AddUint64(&q.tail, 1) - 1
slot := &q.slots[q.idx(tail)]
turn := q.turn(tail)*2 + 1
@@ -83,7 +87,7 @@ func (q *MPMCQueueOf[I]) Dequeue() I {
runtime.Gosched()
}
item := slot.item
- slot.item = zeroedI
+ slot.item = zeroI
slot.turn.Store(turn + 1)
return item
}
@@ -93,24 +97,16 @@ func (q *MPMCQueueOf[I]) Dequeue() I {
// full and the item was inserted.
func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool {
head := atomic.LoadUint64(&q.head)
- for {
- slot := &q.slots[q.idx(head)]
- turn := q.turn(head) * 2
- if slot.turn.Load() == turn {
- if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
- slot.item = item
- slot.turn.Store(turn + 1)
- return true
- }
- } else {
- prevHead := head
- head = atomic.LoadUint64(&q.head)
- if head == prevHead {
- return false
- }
+ slot := &q.slots[q.idx(head)]
+ turn := q.turn(head) * 2
+ if slot.turn.Load() == turn {
+ if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
+ slot.item = item
+ slot.turn.Store(turn + 1)
+ return true
}
- runtime.Gosched()
}
+ return false
}
// TryDequeue retrieves and removes the item from the head of the
@@ -118,27 +114,19 @@ func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool {
// indicates that the queue isn't empty and an item was retrieved.
func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) {
tail := atomic.LoadUint64(&q.tail)
- for {
- slot := &q.slots[q.idx(tail)]
- turn := q.turn(tail)*2 + 1
- if slot.turn.Load() == turn {
- if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
- var zeroedI I
- item = slot.item
- ok = true
- slot.item = zeroedI
- slot.turn.Store(turn + 1)
- return
- }
- } else {
- prevTail := tail
- tail = atomic.LoadUint64(&q.tail)
- if tail == prevTail {
- return
- }
+ slot := &q.slots[q.idx(tail)]
+ turn := q.turn(tail)*2 + 1
+ if slot.turn.Load() == turn {
+ if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
+ var zeroI I
+ item = slot.item
+ ok = true
+ slot.item = zeroI
+ slot.turn.Store(turn + 1)
+ return
}
- runtime.Gosched()
}
+ return
}
func (q *MPMCQueueOf[I]) idx(i uint64) uint64 {