diff options
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go')
-rw-r--r-- | vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go | 64 |
1 files changed, 26 insertions, 38 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go index 38a8fa3c6..3f7e4ccc1 100644 --- a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go +++ b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go @@ -12,7 +12,7 @@ import ( // A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent // queue. It's a generic version of MPMCQueue. // -// MPMCQueue instances must be created with NewMPMCQueueOf function. +// MPMCQueueOf instances must be created with NewMPMCQueueOf function. // A MPMCQueueOf must not be copied after first use. // // Based on the data structure from the following C++ library: @@ -61,6 +61,8 @@ func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] { // Enqueue inserts the given item into the queue. // Blocks, if the queue is full. +// +// Deprecated: use TryEnqueue in combination with runtime.Gosched(). func (q *MPMCQueueOf[I]) Enqueue(item I) { head := atomic.AddUint64(&q.head, 1) - 1 slot := &q.slots[q.idx(head)] @@ -74,8 +76,10 @@ func (q *MPMCQueueOf[I]) Enqueue(item I) { // Dequeue retrieves and removes the item from the head of the queue. // Blocks, if the queue is empty. +// +// Deprecated: use TryDequeue in combination with runtime.Gosched(). func (q *MPMCQueueOf[I]) Dequeue() I { - var zeroedI I + var zeroI I tail := atomic.AddUint64(&q.tail, 1) - 1 slot := &q.slots[q.idx(tail)] turn := q.turn(tail)*2 + 1 @@ -83,7 +87,7 @@ func (q *MPMCQueueOf[I]) Dequeue() I { runtime.Gosched() } item := slot.item - slot.item = zeroedI + slot.item = zeroI slot.turn.Store(turn + 1) return item } @@ -93,24 +97,16 @@ func (q *MPMCQueueOf[I]) Dequeue() I { // full and the item was inserted. func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { head := atomic.LoadUint64(&q.head) - for { - slot := &q.slots[q.idx(head)] - turn := q.turn(head) * 2 - if slot.turn.Load() == turn { - if atomic.CompareAndSwapUint64(&q.head, head, head+1) { - slot.item = item - slot.turn.Store(turn + 1) - return true - } - } else { - prevHead := head - head = atomic.LoadUint64(&q.head) - if head == prevHead { - return false - } + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if slot.turn.Load() == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + slot.turn.Store(turn + 1) + return true } - runtime.Gosched() } + return false } // TryDequeue retrieves and removes the item from the head of the @@ -118,27 +114,19 @@ func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { // indicates that the queue isn't empty and an item was retrieved. func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { tail := atomic.LoadUint64(&q.tail) - for { - slot := &q.slots[q.idx(tail)] - turn := q.turn(tail)*2 + 1 - if slot.turn.Load() == turn { - if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { - var zeroedI I - item = slot.item - ok = true - slot.item = zeroedI - slot.turn.Store(turn + 1) - return - } - } else { - prevTail := tail - tail = atomic.LoadUint64(&q.tail) - if tail == prevTail { - return - } + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if slot.turn.Load() == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + var zeroI I + item = slot.item + ok = true + slot.item = zeroI + slot.turn.Store(turn + 1) + return } - runtime.Gosched() } + return } func (q *MPMCQueueOf[I]) idx(i uint64) uint64 { |