diff options
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go')
-rw-r--r-- | vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go | 56 |
1 files changed, 22 insertions, 34 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go index 96584e698..c5fd26237 100644 --- a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go +++ b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go @@ -50,6 +50,8 @@ func NewMPMCQueue(capacity int) *MPMCQueue { // Enqueue inserts the given item into the queue. // Blocks, if the queue is full. +// +// Deprecated: use TryEnqueue in combination with runtime.Gosched(). func (q *MPMCQueue) Enqueue(item interface{}) { head := atomic.AddUint64(&q.head, 1) - 1 slot := &q.slots[q.idx(head)] @@ -63,6 +65,8 @@ func (q *MPMCQueue) Enqueue(item interface{}) { // Dequeue retrieves and removes the item from the head of the queue. // Blocks, if the queue is empty. +// +// Deprecated: use TryDequeue in combination with runtime.Gosched(). func (q *MPMCQueue) Dequeue() interface{} { tail := atomic.AddUint64(&q.tail, 1) - 1 slot := &q.slots[q.idx(tail)] @@ -81,24 +85,16 @@ func (q *MPMCQueue) Dequeue() interface{} { // full and the item was inserted. func (q *MPMCQueue) TryEnqueue(item interface{}) bool { head := atomic.LoadUint64(&q.head) - for { - slot := &q.slots[q.idx(head)] - turn := q.turn(head) * 2 - if atomic.LoadUint64(&slot.turn) == turn { - if atomic.CompareAndSwapUint64(&q.head, head, head+1) { - slot.item = item - atomic.StoreUint64(&slot.turn, turn+1) - return true - } - } else { - prevHead := head - head = atomic.LoadUint64(&q.head) - if head == prevHead { - return false - } + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) + return true } - runtime.Gosched() } + return false } // TryDequeue retrieves and removes the item from the head of the @@ -106,26 +102,18 @@ func (q *MPMCQueue) TryEnqueue(item interface{}) bool { // indicates that the queue isn't empty and an item was retrieved. func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) { tail := atomic.LoadUint64(&q.tail) - for { - slot := &q.slots[q.idx(tail)] - turn := q.turn(tail)*2 + 1 - if atomic.LoadUint64(&slot.turn) == turn { - if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { - item = slot.item - ok = true - slot.item = nil - atomic.StoreUint64(&slot.turn, turn+1) - return - } - } else { - prevTail := tail - tail = atomic.LoadUint64(&q.tail) - if tail == prevTail { - return - } + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + item = slot.item + ok = true + slot.item = nil + atomic.StoreUint64(&slot.turn, turn+1) + return } - runtime.Gosched() } + return } func (q *MPMCQueue) idx(i uint64) uint64 { |