diff options
| author | 2025-01-27 15:54:51 +0000 | |
|---|---|---|
| committer | 2025-01-27 15:54:51 +0000 | |
| commit | 3617e27afa181392763258240bc276f3da4b44e2 (patch) | |
| tree | 9a65f9a361fd9ddf53de7d32ca35c81aade6cb84 /vendor/github.com/puzpuzpuz/xsync/v3/spscqueueof.go | |
| parent | [feature/frontend] Add login button to index page which reiterates info about... (diff) | |
| download | gotosocial-3617e27afa181392763258240bc276f3da4b44e2.tar.xz | |
bumps uptrace/bun deps to v1.2.8 (#3698)
Diffstat (limited to 'vendor/github.com/puzpuzpuz/xsync/v3/spscqueueof.go')
| -rw-r--r-- | vendor/github.com/puzpuzpuz/xsync/v3/spscqueueof.go | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/vendor/github.com/puzpuzpuz/xsync/v3/spscqueueof.go b/vendor/github.com/puzpuzpuz/xsync/v3/spscqueueof.go new file mode 100644 index 000000000..3ae132e50 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v3/spscqueueof.go @@ -0,0 +1,96 @@ +//go:build go1.19 +// +build go1.19 + +package xsync + +import ( + "sync/atomic" +) + +// A SPSCQueueOf is a bounded single-producer single-consumer concurrent +// queue. This means that not more than a single goroutine must be +// publishing items to the queue while not more than a single goroutine +// must be consuming those items. +// +// SPSCQueueOf instances must be created with NewSPSCQueueOf function. +// A SPSCQueueOf must not be copied after first use. +// +// Based on the data structure from the following article: +// https://rigtorp.se/ringbuffer/ +type SPSCQueueOf[I any] struct { + cap uint64 + pidx uint64 + //lint:ignore U1000 prevents false sharing + pad0 [cacheLineSize - 8]byte + pcachedIdx uint64 + //lint:ignore U1000 prevents false sharing + pad1 [cacheLineSize - 8]byte + cidx uint64 + //lint:ignore U1000 prevents false sharing + pad2 [cacheLineSize - 8]byte + ccachedIdx uint64 + //lint:ignore U1000 prevents false sharing + pad3 [cacheLineSize - 8]byte + items []I +} + +// NewSPSCQueueOf creates a new SPSCQueueOf instance with the given +// capacity. +func NewSPSCQueueOf[I any](capacity int) *SPSCQueueOf[I] { + if capacity < 1 { + panic("capacity must be positive number") + } + return &SPSCQueueOf[I]{ + cap: uint64(capacity + 1), + items: make([]I, capacity+1), + } +} + +// TryEnqueue inserts the given item into the queue. Does not block +// and returns immediately. The result indicates that the queue isn't +// full and the item was inserted. +func (q *SPSCQueueOf[I]) TryEnqueue(item I) bool { + // relaxed memory order would be enough here + idx := atomic.LoadUint64(&q.pidx) + next_idx := idx + 1 + if next_idx == q.cap { + next_idx = 0 + } + cached_idx := q.ccachedIdx + if next_idx == cached_idx { + cached_idx = atomic.LoadUint64(&q.cidx) + q.ccachedIdx = cached_idx + if next_idx == cached_idx { + return false + } + } + q.items[idx] = item + atomic.StoreUint64(&q.pidx, next_idx) + return true +} + +// TryDequeue retrieves and removes the item from the head of the +// queue. Does not block and returns immediately. The ok result +// indicates that the queue isn't empty and an item was retrieved. +func (q *SPSCQueueOf[I]) TryDequeue() (item I, ok bool) { + // relaxed memory order would be enough here + idx := atomic.LoadUint64(&q.cidx) + cached_idx := q.pcachedIdx + if idx == cached_idx { + cached_idx = atomic.LoadUint64(&q.pidx) + q.pcachedIdx = cached_idx + if idx == cached_idx { + return + } + } + var zeroI I + item = q.items[idx] + q.items[idx] = zeroI + ok = true + next_idx := idx + 1 + if next_idx == q.cap { + next_idx = 0 + } + atomic.StoreUint64(&q.cidx, next_idx) + return +} |
