diff options
| author | 2024-11-08 13:51:23 +0000 | |
|---|---|---|
| committer | 2024-11-08 13:51:23 +0000 | |
| commit | 29007b1b886e7455ece5aa6e4d477805209fad88 (patch) | |
| tree | 21b11cbf12faf71d44d43c61a2d4916b7bd82b80 /vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go | |
| parent | bump ncruces/go-sqlite3 to v0.20.2 (#3524) (diff) | |
| download | gotosocial-29007b1b886e7455ece5aa6e4d477805209fad88.tar.xz | |
[chore] update bun libraries to v1.2.5 (#3528)
* update bun libraries to v1.2.5
* pin old v1.29.0 of otel
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go')
| -rw-r--r-- | vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go new file mode 100644 index 000000000..96584e698 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v3/mpmcqueue.go @@ -0,0 +1,137 @@ +package xsync + +import ( + "runtime" + "sync/atomic" + "unsafe" +) + +// A MPMCQueue is a bounded multi-producer multi-consumer concurrent +// queue. +// +// MPMCQueue instances must be created with NewMPMCQueue function. +// A MPMCQueue must not be copied after first use. +// +// Based on the data structure from the following C++ library: +// https://github.com/rigtorp/MPMCQueue +type MPMCQueue struct { + cap uint64 + head uint64 + //lint:ignore U1000 prevents false sharing + hpad [cacheLineSize - 8]byte + tail uint64 + //lint:ignore U1000 prevents false sharing + tpad [cacheLineSize - 8]byte + slots []slotPadded +} + +type slotPadded struct { + slot + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - unsafe.Sizeof(slot{})]byte +} + +type slot struct { + turn uint64 + item interface{} +} + +// NewMPMCQueue creates a new MPMCQueue instance with the given +// capacity. +func NewMPMCQueue(capacity int) *MPMCQueue { + if capacity < 1 { + panic("capacity must be positive number") + } + return &MPMCQueue{ + cap: uint64(capacity), + slots: make([]slotPadded, capacity), + } +} + +// Enqueue inserts the given item into the queue. +// Blocks, if the queue is full. +func (q *MPMCQueue) Enqueue(item interface{}) { + head := atomic.AddUint64(&q.head, 1) - 1 + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + for atomic.LoadUint64(&slot.turn) != turn { + runtime.Gosched() + } + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) +} + +// Dequeue retrieves and removes the item from the head of the queue. +// Blocks, if the queue is empty. +func (q *MPMCQueue) Dequeue() interface{} { + tail := atomic.AddUint64(&q.tail, 1) - 1 + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + for atomic.LoadUint64(&slot.turn) != turn { + runtime.Gosched() + } + item := slot.item + slot.item = nil + atomic.StoreUint64(&slot.turn, turn+1) + return item +} + +// TryEnqueue inserts the given item into the queue. Does not block +// and returns immediately. The result indicates that the queue isn't +// full and the item was inserted. +func (q *MPMCQueue) TryEnqueue(item interface{}) bool { + head := atomic.LoadUint64(&q.head) + for { + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) + return true + } + } else { + prevHead := head + head = atomic.LoadUint64(&q.head) + if head == prevHead { + return false + } + } + runtime.Gosched() + } +} + +// TryDequeue retrieves and removes the item from the head of the +// queue. Does not block and returns immediately. The ok result +// indicates that the queue isn't empty and an item was retrieved. +func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) { + tail := atomic.LoadUint64(&q.tail) + for { + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + item = slot.item + ok = true + slot.item = nil + atomic.StoreUint64(&slot.turn, turn+1) + return + } + } else { + prevTail := tail + tail = atomic.LoadUint64(&q.tail) + if tail == prevTail { + return + } + } + runtime.Gosched() + } +} + +func (q *MPMCQueue) idx(i uint64) uint64 { + return i % q.cap +} + +func (q *MPMCQueue) turn(i uint64) uint64 { + return i / q.cap +} |
