summaryrefslogtreecommitdiff
path: root/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go
blob: 96584e698a431a3bb43d9301c703e08f178ef7fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package xsync

import (
	"runtime"
	"sync/atomic"
	"unsafe"
)

// An MPMCQueue is a bounded multi-producer multi-consumer concurrent
// queue.
//
// MPMCQueue instances must be created with the NewMPMCQueue function.
// An MPMCQueue must not be copied after first use.
//
// Producers and consumers each draw a ticket from their own counter
// (head and tail respectively). A ticket selects a slot (ticket modulo
// cap) and a lap (ticket divided by cap); the slot's turn field then
// decides when the holder of the ticket may proceed.
//
// Based on the data structure from the following C++ library:
// https://github.com/rigtorp/MPMCQueue
type MPMCQueue struct {
	// cap is the number of slots, fixed by NewMPMCQueue.
	// head is the producer ticket counter: enqueue operations claim
	// its current value and advance it by one.
	cap  uint64
	head uint64
	//lint:ignore U1000 prevents false sharing
	hpad [cacheLineSize - 8]byte
	// tail is the consumer ticket counter, claimed by dequeue
	// operations in the same way producers claim head.
	tail uint64
	//lint:ignore U1000 prevents false sharing
	tpad  [cacheLineSize - 8]byte
	slots []slotPadded // storage for the queued items, cap entries long
}

// slotPadded is a slot followed by filler bytes that bring the
// combined size of slot and pad to cacheLineSize bytes.
// NOTE(review): cacheLineSize is declared elsewhere in the package;
// presumably it is the CPU cache line size, so that adjacent elements
// of MPMCQueue.slots never share a cache line.
type slotPadded struct {
	slot
	//lint:ignore U1000 prevents false sharing
	pad [cacheLineSize - unsafe.Sizeof(slot{})]byte
}

// slot is one cell of the queue's storage.
type slot struct {
	// turn says who may touch the slot next. For lap number n of the
	// ring, the value 2*n means the slot is empty and waiting for the
	// producer of that lap, and 2*n+1 means it holds an item waiting
	// for the consumer of that lap. It is only accessed atomically.
	turn uint64
	// item is the stored value; consumers reset it to nil on removal.
	item interface{}
}

// NewMPMCQueue allocates an MPMCQueue able to hold up to capacity
// items at once. It panics if capacity is less than one.
func NewMPMCQueue(capacity int) *MPMCQueue {
	if capacity <= 0 {
		panic("capacity must be positive number")
	}
	q := new(MPMCQueue)
	q.cap = uint64(capacity)
	q.slots = make([]slotPadded, capacity)
	return q
}

// Enqueue adds item to the queue, waiting for as long as the queue
// stays full.
//
// The caller first claims a ticket by atomically advancing head, which
// fixes both the slot it will write to and the lap on which that slot
// becomes available. It then yields the processor until the slot's
// turn shows that it is free for that lap.
func (q *MPMCQueue) Enqueue(item interface{}) {
	ticket := atomic.AddUint64(&q.head, 1) - 1
	cell := &q.slots[q.idx(ticket)]
	// An even turn value marks the slot as free for this lap's producer.
	want := 2 * q.turn(ticket)
	for {
		if atomic.LoadUint64(&cell.turn) == want {
			break
		}
		runtime.Gosched()
	}
	cell.item = item
	// The odd turn value hands the slot over to a consumer, so it is
	// stored only after the item has been written.
	atomic.StoreUint64(&cell.turn, want+1)
}

// Dequeue removes and returns the oldest item in the queue, waiting
// for as long as the queue stays empty.
//
// The caller claims a ticket by atomically advancing tail and then
// yields the processor until the matching slot has been filled by the
// producer of the same lap.
func (q *MPMCQueue) Dequeue() interface{} {
	ticket := atomic.AddUint64(&q.tail, 1) - 1
	cell := &q.slots[q.idx(ticket)]
	// An odd turn value marks the slot as filled for this lap's consumer.
	want := 2*q.turn(ticket) + 1
	for {
		if atomic.LoadUint64(&cell.turn) == want {
			break
		}
		runtime.Gosched()
	}
	taken := cell.item
	// Clear the slot so it no longer references the removed value.
	cell.item = nil
	// The next even turn value releases the slot to the producer of
	// the following lap; it is stored only after the slot is cleared.
	atomic.StoreUint64(&cell.turn, want+1)
	return taken
}

// TryEnqueue inserts the given item into the queue without waiting for
// space to become available. It returns true if the item was inserted
// and false if the queue was found to be full.
//
// Unlike Enqueue, head is advanced with a compare-and-swap only after
// the target slot has been observed to be free, so an unsuccessful
// call leaves the queue unmodified. The call may loop a few times, but
// only while other producers keep moving head or are in the middle of
// claiming the same slot.
func (q *MPMCQueue) TryEnqueue(item interface{}) bool {
	head := atomic.LoadUint64(&q.head)
	for {
		slot := &q.slots[q.idx(head)]
		// An even turn of 2*lap marks the slot as free for this lap.
		turn := q.turn(head) * 2
		if atomic.LoadUint64(&slot.turn) == turn {
			// The slot is free: claim it by moving head forward. A failed
			// CAS means another producer took this position first; head
			// is not reloaded here, so the next iteration re-checks the
			// same slot and falls into the else branch once that producer
			// has published its item.
			if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
				slot.item = item
				// Store the odd turn last so that a consumer sees the slot
				// as filled only after item has been written.
				atomic.StoreUint64(&slot.turn, turn+1)
				return true
			}
		} else {
			// The slot is not free for this position. If head has not
			// moved since it was last read, report the queue as full;
			// otherwise retry against the fresh head value.
			prevHead := head
			head = atomic.LoadUint64(&q.head)
			if head == prevHead {
				return false
			}
		}
		runtime.Gosched()
	}
}

// TryDequeue retrieves and removes the item from the head of the
// queue without waiting for an item to arrive. The ok result is true
// if an item was retrieved and false if the queue was found to be
// empty, in which case item is nil.
//
// Unlike Dequeue, tail is advanced with a compare-and-swap only after
// the target slot has been observed to be filled, so an unsuccessful
// call leaves the queue unmodified. The call may loop a few times, but
// only while other consumers keep moving tail or are in the middle of
// claiming the same slot.
func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) {
	tail := atomic.LoadUint64(&q.tail)
	for {
		slot := &q.slots[q.idx(tail)]
		// An odd turn of 2*lap+1 marks the slot as filled for this lap.
		turn := q.turn(tail)*2 + 1
		if atomic.LoadUint64(&slot.turn) == turn {
			// The slot holds an item: claim it by moving tail forward. A
			// failed CAS means another consumer took this position first;
			// tail is not reloaded here, so the next iteration re-checks
			// the same slot.
			if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
				item = slot.item
				ok = true
				// Clear the slot so it no longer references the value.
				slot.item = nil
				// Store the next even turn last so that a producer sees
				// the slot as free only after it has been cleared.
				atomic.StoreUint64(&slot.turn, turn+1)
				return
			}
		} else {
			// The slot is not filled for this position. If tail has not
			// moved since it was last read, report the queue as empty
			// (item and ok still hold their zero values); otherwise
			// retry against the fresh tail value.
			prevTail := tail
			tail = atomic.LoadUint64(&q.tail)
			if tail == prevTail {
				return
			}
		}
		runtime.Gosched()
	}
}

// idx maps the ticket i to the index of its slot, wrapping around the
// queue's capacity.
func (q *MPMCQueue) idx(i uint64) uint64 {
	size := q.cap
	return i % size
}

// turn returns the lap number of the ticket i, that is, how many full
// passes over the slots precede it.
func (q *MPMCQueue) turn(i uint64) uint64 {
	size := q.cap
	return i / size
}