summaryrefslogtreecommitdiff
path: root/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go')
-rw-r--r--vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go56
1 files changed, 22 insertions, 34 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go
index 96584e698..c5fd26237 100644
--- a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go
+++ b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go
@@ -50,6 +50,8 @@ func NewMPMCQueue(capacity int) *MPMCQueue {
// Enqueue inserts the given item into the queue.
// Blocks, if the queue is full.
+//
+// Deprecated: use TryEnqueue in combination with runtime.Gosched().
func (q *MPMCQueue) Enqueue(item interface{}) {
head := atomic.AddUint64(&q.head, 1) - 1
slot := &q.slots[q.idx(head)]
@@ -63,6 +65,8 @@ func (q *MPMCQueue) Enqueue(item interface{}) {
// Dequeue retrieves and removes the item from the head of the queue.
// Blocks, if the queue is empty.
+//
+// Deprecated: use TryDequeue in combination with runtime.Gosched().
func (q *MPMCQueue) Dequeue() interface{} {
tail := atomic.AddUint64(&q.tail, 1) - 1
slot := &q.slots[q.idx(tail)]
@@ -81,24 +85,16 @@ func (q *MPMCQueue) Dequeue() interface{} {
// full and the item was inserted.
func (q *MPMCQueue) TryEnqueue(item interface{}) bool {
head := atomic.LoadUint64(&q.head)
- for {
- slot := &q.slots[q.idx(head)]
- turn := q.turn(head) * 2
- if atomic.LoadUint64(&slot.turn) == turn {
- if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
- slot.item = item
- atomic.StoreUint64(&slot.turn, turn+1)
- return true
- }
- } else {
- prevHead := head
- head = atomic.LoadUint64(&q.head)
- if head == prevHead {
- return false
- }
+ slot := &q.slots[q.idx(head)]
+ turn := q.turn(head) * 2
+ if atomic.LoadUint64(&slot.turn) == turn {
+ if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
+ slot.item = item
+ atomic.StoreUint64(&slot.turn, turn+1)
+ return true
}
- runtime.Gosched()
}
+ return false
}
// TryDequeue retrieves and removes the item from the head of the
@@ -106,26 +102,18 @@ func (q *MPMCQueue) TryEnqueue(item interface{}) bool {
// indicates that the queue isn't empty and an item was retrieved.
func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) {
tail := atomic.LoadUint64(&q.tail)
- for {
- slot := &q.slots[q.idx(tail)]
- turn := q.turn(tail)*2 + 1
- if atomic.LoadUint64(&slot.turn) == turn {
- if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
- item = slot.item
- ok = true
- slot.item = nil
- atomic.StoreUint64(&slot.turn, turn+1)
- return
- }
- } else {
- prevTail := tail
- tail = atomic.LoadUint64(&q.tail)
- if tail == prevTail {
- return
- }
+ slot := &q.slots[q.idx(tail)]
+ turn := q.turn(tail)*2 + 1
+ if atomic.LoadUint64(&slot.turn) == turn {
+ if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
+ item = slot.item
+ ok = true
+ slot.item = nil
+ atomic.StoreUint64(&slot.turn, turn+1)
+ return
}
- runtime.Gosched()
}
+ return
}
func (q *MPMCQueue) idx(i uint64) uint64 {