summaryrefslogtreecommitdiff
path: root/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go
blob: c5fd2623795ef1555063ed792e649af3f2afa0e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package xsync

import (
	"runtime"
	"sync/atomic"
	"unsafe"
)

// A MPMCQueue is a bounded multi-producer multi-consumer concurrent
// queue.
//
// MPMCQueue instances must be created with NewMPMCQueue function.
// A MPMCQueue must not be copied after first use.
//
// Based on the data structure from the following C++ library:
// https://github.com/rigtorp/MPMCQueue
type MPMCQueue struct {
	cap  uint64
	head uint64
	//lint:ignore U1000 prevents false sharing
	hpad [cacheLineSize - 8]byte
	tail uint64
	//lint:ignore U1000 prevents false sharing
	tpad  [cacheLineSize - 8]byte
	slots []slotPadded
}

type slotPadded struct {
	slot
	//lint:ignore U1000 prevents false sharing
	pad [cacheLineSize - unsafe.Sizeof(slot{})]byte
}

type slot struct {
	turn uint64
	item interface{}
}

// NewMPMCQueue creates a new MPMCQueue instance with the given
// capacity.
func NewMPMCQueue(capacity int) *MPMCQueue {
	if capacity < 1 {
		panic("capacity must be positive number")
	}
	return &MPMCQueue{
		cap:   uint64(capacity),
		slots: make([]slotPadded, capacity),
	}
}

// Enqueue inserts the given item into the queue.
// Blocks, if the queue is full.
//
// Deprecated: use TryEnqueue in combination with runtime.Gosched().
func (q *MPMCQueue) Enqueue(item interface{}) {
	head := atomic.AddUint64(&q.head, 1) - 1
	slot := &q.slots[q.idx(head)]
	turn := q.turn(head) * 2
	for atomic.LoadUint64(&slot.turn) != turn {
		runtime.Gosched()
	}
	slot.item = item
	atomic.StoreUint64(&slot.turn, turn+1)
}

// Dequeue retrieves and removes the item from the head of the queue.
// Blocks, if the queue is empty.
//
// Deprecated: use TryDequeue in combination with runtime.Gosched().
func (q *MPMCQueue) Dequeue() interface{} {
	tail := atomic.AddUint64(&q.tail, 1) - 1
	slot := &q.slots[q.idx(tail)]
	turn := q.turn(tail)*2 + 1
	for atomic.LoadUint64(&slot.turn) != turn {
		runtime.Gosched()
	}
	item := slot.item
	slot.item = nil
	atomic.StoreUint64(&slot.turn, turn+1)
	return item
}

// TryEnqueue inserts the given item into the queue. Does not block
// and returns immediately. The result indicates that the queue isn't
// full and the item was inserted.
func (q *MPMCQueue) TryEnqueue(item interface{}) bool {
	head := atomic.LoadUint64(&q.head)
	slot := &q.slots[q.idx(head)]
	turn := q.turn(head) * 2
	if atomic.LoadUint64(&slot.turn) == turn {
		if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
			slot.item = item
			atomic.StoreUint64(&slot.turn, turn+1)
			return true
		}
	}
	return false
}

// TryDequeue retrieves and removes the item from the head of the
// queue. Does not block and returns immediately. The ok result
// indicates that the queue isn't empty and an item was retrieved.
func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) {
	tail := atomic.LoadUint64(&q.tail)
	slot := &q.slots[q.idx(tail)]
	turn := q.turn(tail)*2 + 1
	if atomic.LoadUint64(&slot.turn) == turn {
		if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
			item = slot.item
			ok = true
			slot.item = nil
			atomic.StoreUint64(&slot.turn, turn+1)
			return
		}
	}
	return
}

func (q *MPMCQueue) idx(i uint64) uint64 {
	return i % q.cap
}

func (q *MPMCQueue) turn(i uint64) uint64 {
	return i / q.cap
}