summaryrefslogtreecommitdiff
path: root/vendor/github.com/puzpuzpuz/xsync/v3/spscqueue.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/spscqueue.go')
-rw-r--r--vendor/github.com/puzpuzpuz/xsync/v3/spscqueue.go92
1 files changed, 92 insertions, 0 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/spscqueue.go b/vendor/github.com/puzpuzpuz/xsync/v3/spscqueue.go
new file mode 100644
index 000000000..6e4f84bc0
--- /dev/null
+++ b/vendor/github.com/puzpuzpuz/xsync/v3/spscqueue.go
@@ -0,0 +1,92 @@
+package xsync
+
+import (
+ "sync/atomic"
+)
+
+// A SPSCQueue is a bounded single-producer single-consumer concurrent
+// queue. This means that not more than a single goroutine must be
+// publishing items to the queue while not more than a single goroutine
+// must be consuming those items.
+//
+// SPSCQueue instances must be created with NewSPSCQueue function.
+// A SPSCQueue must not be copied after first use.
+//
+// Based on the data structure from the following article:
+// https://rigtorp.se/ringbuffer/
+type SPSCQueue struct {
+ cap uint64
+ pidx uint64
+ //lint:ignore U1000 prevents false sharing
+ pad0 [cacheLineSize - 8]byte
+ pcachedIdx uint64
+ //lint:ignore U1000 prevents false sharing
+ pad1 [cacheLineSize - 8]byte
+ cidx uint64
+ //lint:ignore U1000 prevents false sharing
+ pad2 [cacheLineSize - 8]byte
+ ccachedIdx uint64
+ //lint:ignore U1000 prevents false sharing
+ pad3 [cacheLineSize - 8]byte
+ items []interface{}
+}
+
+// NewSPSCQueue creates a new SPSCQueue instance with the given
+// capacity.
+func NewSPSCQueue(capacity int) *SPSCQueue {
+ if capacity < 1 {
+ panic("capacity must be positive number")
+ }
+ return &SPSCQueue{
+ cap: uint64(capacity + 1),
+ items: make([]interface{}, capacity+1),
+ }
+}
+
+// TryEnqueue inserts the given item into the queue. Does not block
+// and returns immediately. The result indicates that the queue isn't
+// full and the item was inserted.
+func (q *SPSCQueue) TryEnqueue(item interface{}) bool {
+ // relaxed memory order would be enough here
+ idx := atomic.LoadUint64(&q.pidx)
+ nextIdx := idx + 1
+ if nextIdx == q.cap {
+ nextIdx = 0
+ }
+ cachedIdx := q.ccachedIdx
+ if nextIdx == cachedIdx {
+ cachedIdx = atomic.LoadUint64(&q.cidx)
+ q.ccachedIdx = cachedIdx
+ if nextIdx == cachedIdx {
+ return false
+ }
+ }
+ q.items[idx] = item
+ atomic.StoreUint64(&q.pidx, nextIdx)
+ return true
+}
+
+// TryDequeue retrieves and removes the item from the head of the
+// queue. Does not block and returns immediately. The ok result
+// indicates that the queue isn't empty and an item was retrieved.
+func (q *SPSCQueue) TryDequeue() (item interface{}, ok bool) {
+ // relaxed memory order would be enough here
+ idx := atomic.LoadUint64(&q.cidx)
+ cachedIdx := q.pcachedIdx
+ if idx == cachedIdx {
+ cachedIdx = atomic.LoadUint64(&q.pidx)
+ q.pcachedIdx = cachedIdx
+ if idx == cachedIdx {
+ return
+ }
+ }
+ item = q.items[idx]
+ q.items[idx] = nil
+ ok = true
+ nextIdx := idx + 1
+ if nextIdx == q.cap {
+ nextIdx = 0
+ }
+ atomic.StoreUint64(&q.cidx, nextIdx)
+ return
+}