diff options
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go')
-rw-r--r-- | vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go | 138 |
1 files changed, 0 insertions, 138 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go deleted file mode 100644 index 3f7e4ccc1..000000000 --- a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueueof.go +++ /dev/null @@ -1,138 +0,0 @@ -//go:build go1.19 -// +build go1.19 - -package xsync - -import ( - "runtime" - "sync/atomic" - "unsafe" -) - -// A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent -// queue. It's a generic version of MPMCQueue. -// -// MPMCQueueOf instances must be created with NewMPMCQueueOf function. -// A MPMCQueueOf must not be copied after first use. -// -// Based on the data structure from the following C++ library: -// https://github.com/rigtorp/MPMCQueue -type MPMCQueueOf[I any] struct { - cap uint64 - head uint64 - //lint:ignore U1000 prevents false sharing - hpad [cacheLineSize - 8]byte - tail uint64 - //lint:ignore U1000 prevents false sharing - tpad [cacheLineSize - 8]byte - slots []slotOfPadded[I] -} - -type slotOfPadded[I any] struct { - slotOf[I] - // Unfortunately, proper padding like the below one: - // - // pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte - // - // won't compile, so here we add a best-effort padding for items up to - // 56 bytes size. - //lint:ignore U1000 prevents false sharing - pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte -} - -type slotOf[I any] struct { - // atomic.Uint64 is used here to get proper 8 byte alignment on - // 32-bit archs. - turn atomic.Uint64 - item I -} - -// NewMPMCQueueOf creates a new MPMCQueueOf instance with the given -// capacity. -func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] { - if capacity < 1 { - panic("capacity must be positive number") - } - return &MPMCQueueOf[I]{ - cap: uint64(capacity), - slots: make([]slotOfPadded[I], capacity), - } -} - -// Enqueue inserts the given item into the queue. -// Blocks, if the queue is full. -// -// Deprecated: use TryEnqueue in combination with runtime.Gosched(). -func (q *MPMCQueueOf[I]) Enqueue(item I) { - head := atomic.AddUint64(&q.head, 1) - 1 - slot := &q.slots[q.idx(head)] - turn := q.turn(head) * 2 - for slot.turn.Load() != turn { - runtime.Gosched() - } - slot.item = item - slot.turn.Store(turn + 1) -} - -// Dequeue retrieves and removes the item from the head of the queue. -// Blocks, if the queue is empty. -// -// Deprecated: use TryDequeue in combination with runtime.Gosched(). -func (q *MPMCQueueOf[I]) Dequeue() I { - var zeroI I - tail := atomic.AddUint64(&q.tail, 1) - 1 - slot := &q.slots[q.idx(tail)] - turn := q.turn(tail)*2 + 1 - for slot.turn.Load() != turn { - runtime.Gosched() - } - item := slot.item - slot.item = zeroI - slot.turn.Store(turn + 1) - return item -} - -// TryEnqueue inserts the given item into the queue. Does not block -// and returns immediately. The result indicates that the queue isn't -// full and the item was inserted. -func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { - head := atomic.LoadUint64(&q.head) - slot := &q.slots[q.idx(head)] - turn := q.turn(head) * 2 - if slot.turn.Load() == turn { - if atomic.CompareAndSwapUint64(&q.head, head, head+1) { - slot.item = item - slot.turn.Store(turn + 1) - return true - } - } - return false -} - -// TryDequeue retrieves and removes the item from the head of the -// queue. Does not block and returns immediately. The ok result -// indicates that the queue isn't empty and an item was retrieved. -func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { - tail := atomic.LoadUint64(&q.tail) - slot := &q.slots[q.idx(tail)] - turn := q.turn(tail)*2 + 1 - if slot.turn.Load() == turn { - if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { - var zeroI I - item = slot.item - ok = true - slot.item = zeroI - slot.turn.Store(turn + 1) - return - } - } - return -} - -func (q *MPMCQueueOf[I]) idx(i uint64) uint64 { - return i % q.cap -} - -func (q *MPMCQueueOf[I]) turn(i uint64) uint64 { - return i / q.cap -} |