path: root/vendor/codeberg.org/gruf/go-structr/queue_ctx.go
author     kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com>  2024-04-26 13:50:46 +0100
committer  GitHub <noreply@github.com>  2024-04-26 13:50:46 +0100
commit     c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 (patch)
tree       dbd3409070765d5ca81448a574ccd32b4da1ffe6 /vendor/codeberg.org/gruf/go-structr/queue_ctx.go
parent     [chore] update Docker container to use new go swagger hash (#2872) (diff)
download   gotosocial-c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9.tar.xz
[performance] update remaining worker pools to use queues (#2865)
* start replacing client + federator + media workers with new worker + queue types
* refactor federatingDB.Delete(), drop queued messages when deleting account / status
* move all queue purging to the processor workers
* undo toolchain updates
* code comments, ensure dereferencer worker pool gets started
* update gruf libraries in readme
* start the job scheduler separately from the worker pools
* reshuffle ordering of server.go + remove duplicate worker start / stop
* update go-list version
* fix vendoring
* move queue invalidation to before wiping / deletion, to ensure queued work not dropped
* add logging to worker processing functions in testrig, don't start workers in unexpected places
* update go-structr to add (+ then rely on) QueueCtx{} type
* ensure more worker pools get started properly in tests
* fix remaining broken tests relying on worker queue logic
* fix account test suite queue popping logic, ensure noop workers do not pull from queue
* move back accidentally shuffled account deletion order
* ensure error (non-nil!!) gets passed in refactored federatingDB{}.Delete()
* silently drop deletes from accounts not permitted to
* don't warn log on forwarded deletes
* make if else clauses easier to parse
* use getFederatorMsg()
* improved code comment
* improved code comment re: requesting account delete checks
* remove boolean result from worker start / stop since false = already running or already stopped
* remove optional passed-in http.client
* remove worker starting from the admin CLI commands (we don't need to handle side-effects)
* update prune cli to start scheduler but not all of the workers
* fix rebase issues
* remove redundant return statements
* i'm sorry sir linter
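
As a rough sketch of the worker + queue pattern described above (not GoToSocial's actual worker code: the task type and runWorkers helper are hypothetical, and queue configuration is assumed to happen elsewhere), a pool of workers can simply block in the context-aware PopFront added by the QueueCtx{} type in the diff below, exiting once their context is cancelled:

package workerexample

import (
	"context"
	"log"
	"sync"

	"codeberg.org/gruf/go-structr"
)

// task is a placeholder message type, purely for illustration.
type task struct{ id int }

// runWorkers starts n worker goroutines that each block in PopFront(ctx)
// until a value is pushed, and return once ctx is cancelled. The queue is
// assumed to have been configured / initialised elsewhere.
func runWorkers(ctx context.Context, n int, q *structr.QueueCtx[*task], wg *sync.WaitGroup) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// PopFront blocks on ctx while the queue is empty;
				// ok == false means ctx was cancelled, so shut down.
				t, ok := q.PopFront(ctx)
				if !ok {
					return
				}
				log.Printf("processing task %d", t.id)
			}
		}()
	}
}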
Diffstat (limited to 'vendor/codeberg.org/gruf/go-structr/queue_ctx.go')
-rw-r--r--  vendor/codeberg.org/gruf/go-structr/queue_ctx.go  134
1 file changed, 134 insertions, 0 deletions
diff --git a/vendor/codeberg.org/gruf/go-structr/queue_ctx.go b/vendor/codeberg.org/gruf/go-structr/queue_ctx.go
new file mode 100644
index 000000000..d8843c1eb
--- /dev/null
+++ b/vendor/codeberg.org/gruf/go-structr/queue_ctx.go
@@ -0,0 +1,134 @@
+package structr
+
+import (
+ "context"
+)
+
+// QueueCtx is a context-aware form of Queue{}.
+type QueueCtx[StructType any] struct {
+ Queue[StructType]
+ ch chan struct{}
+}
+
+// PopFront pops the value at the front of the queue, blocking on ctx while the queue is empty.
+func (q *QueueCtx[T]) PopFront(ctx context.Context) (T, bool) {
+ return q.pop(ctx, func() *list_elem {
+ return q.queue.head
+ })
+}
+
+// PopBack pops the value at the back of the queue, blocking on ctx while the queue is empty.
+func (q *QueueCtx[T]) PopBack(ctx context.Context) (T, bool) {
+ return q.pop(ctx, func() *list_elem {
+ return q.queue.tail
+ })
+}
+
+// PushFront pushes values to front of queue.
+func (q *QueueCtx[T]) PushFront(values ...T) {
+ q.mutex.Lock()
+ for i := range values {
+ item := q.index(values[i])
+ q.queue.push_front(&item.elem)
+ }
+ if q.ch != nil {
+ close(q.ch)
+ q.ch = nil
+ }
+ q.mutex.Unlock()
+}
+
+// PushBack pushes values to back of queue.
+func (q *QueueCtx[T]) PushBack(values ...T) {
+ q.mutex.Lock()
+ for i := range values {
+ item := q.index(values[i])
+ q.queue.push_back(&item.elem)
+ }
+ if q.ch != nil {
+ close(q.ch)
+ q.ch = nil
+ }
+ q.mutex.Unlock()
+}
+
+// Wait returns the current notify channel,
+// which is closed on the next push to the queue.
+func (q *QueueCtx[T]) Wait() <-chan struct{} {
+ q.mutex.Lock()
+ if q.ch == nil {
+ q.ch = make(chan struct{})
+ }
+ ctx := q.ch
+ q.mutex.Unlock()
+ return ctx
+}
+
+func (q *QueueCtx[T]) pop(ctx context.Context, next func() *list_elem) (T, bool) {
+ if next == nil {
+ panic("nil fn")
+ } else if ctx == nil {
+ panic("nil ctx")
+ }
+
+ // Acquire lock.
+ q.mutex.Lock()
+
+ var elem *list_elem
+
+ for {
+ // Get element.
+ elem = next()
+ if elem != nil {
+ break
+ }
+
+ if q.ch == nil {
+ // Allocate new ctx channel.
+ q.ch = make(chan struct{})
+ }
+
+ // Get current
+ // ch pointer.
+ ch := q.ch
+
+ // Unlock queue.
+ q.mutex.Unlock()
+
+ select {
+ // Ctx cancelled.
+ case <-ctx.Done():
+ var z T
+ return z, false
+
+ // Pushed!
+ case <-ch:
+ }
+
+ // Relock queue.
+ q.mutex.Lock()
+ }
+
+ // Cast the indexed item from elem.
+ item := (*indexed_item)(elem.data)
+
+ // Extract item value.
+ value := item.data.(T)
+
+ // Delete queued.
+ q.delete(item)
+
+ // Get func ptrs.
+ pop := q.Queue.pop
+
+ // Done with lock.
+ q.mutex.Unlock()
+
+ if pop != nil {
+ // Pass to
+ // user hook.
+ pop(value)
+ }
+
+ return value, true
+}
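
For illustration, here is a minimal sketch of the Wait() pattern this file introduces (the awaitPush helper and message type are hypothetical, not part of the library): Wait() hands back a channel that is closed on the next PushFront / PushBack, so a caller can combine queue readiness with its own context or timers:

package waitexample

import (
	"context"

	"codeberg.org/gruf/go-structr"
)

// message is a placeholder value type, purely for illustration.
type message struct{ body string }

// awaitPush blocks until either the next value is pushed to the
// queue or ctx is cancelled, reporting which of the two happened.
func awaitPush(ctx context.Context, q *structr.QueueCtx[*message]) bool {
	select {
	case <-q.Wait(): // closed by the next push to the queue
		return true
	case <-ctx.Done(): // caller gave up waiting
		return false
	}
}

A caller would typically check the queue for work first and only fall back to Wait() when it is empty, then re-check once the channel closes; the internal pop() loop above follows exactly that check / wait / re-check sequence.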