diff options
author | 2024-04-26 13:50:46 +0100 | |
---|---|---|
committer | 2024-04-26 13:50:46 +0100 | |
commit | c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 (patch) | |
tree | dbd3409070765d5ca81448a574ccd32b4da1ffe6 /vendor/codeberg.org | |
parent | [chore] update Docker container to use new go swagger hash (#2872) (diff) | |
download | gotosocial-c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9.tar.xz |
[performance] update remaining worker pools to use queues (#2865)
* start replacing client + federator + media workers with new worker + queue types
* refactor federatingDB.Delete(), drop queued messages when deleting account / status
* move all queue purging to the processor workers
* undo toolchain updates
* code comments, ensure dereferencer worker pool gets started
* update gruf libraries in readme
* start the job scheduler separately to the worker pools
* reshuffle ordering or server.go + remove duplicate worker start / stop
* update go-list version
* fix vendoring
* move queue invalidation to before wipeing / deletion, to ensure queued work not dropped
* add logging to worker processing functions in testrig, don't start workers in unexpected places
* update go-structr to add (+then rely on) QueueCtx{} type
* ensure more worker pools get started properly in tests
* fix remaining broken tests relying on worker queue logic
* fix account test suite queue popping logic, ensure noop workers do not pull from queue
* move back accidentally shuffled account deletion order
* ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete()
* silently drop deletes from accounts not permitted to
* don't warn log on forwarded deletes
* make if else clauses easier to parse
* use getFederatorMsg()
* improved code comment
* improved code comment re: requesting account delete checks
* remove boolean result from worker start / stop since false = already running or already stopped
* remove optional passed-in http.client
* remove worker starting from the admin CLI commands (we don't need to handle side-effects)
* update prune cli to start scheduler but not all of the workers
* fix rebase issues
* remove redundant return statements
* i'm sorry sir linter
Diffstat (limited to 'vendor/codeberg.org')
-rw-r--r-- | vendor/codeberg.org/gruf/go-list/LICENSE | 9 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-list/README.md | 3 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-list/list.go | 204 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-structr/cache.go | 61 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-structr/queue_ctx.go | 134 | ||||
-rw-r--r-- | vendor/codeberg.org/gruf/go-structr/util.go | 13 |
6 files changed, 404 insertions, 20 deletions
diff --git a/vendor/codeberg.org/gruf/go-list/LICENSE b/vendor/codeberg.org/gruf/go-list/LICENSE new file mode 100644 index 000000000..d6f08d0ab --- /dev/null +++ b/vendor/codeberg.org/gruf/go-list/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) gruf + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/codeberg.org/gruf/go-list/README.md b/vendor/codeberg.org/gruf/go-list/README.md new file mode 100644 index 000000000..c5ae37cc0 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-list/README.md @@ -0,0 +1,3 @@ +# go-list + +a doubly-linked list library with generic support. diff --git a/vendor/codeberg.org/gruf/go-list/list.go b/vendor/codeberg.org/gruf/go-list/list.go new file mode 100644 index 000000000..5490fa636 --- /dev/null +++ b/vendor/codeberg.org/gruf/go-list/list.go @@ -0,0 +1,204 @@ +package list + +// Elem represents an element in a doubly-linked list. +type Elem[T any] struct { + Next *Elem[T] + Prev *Elem[T] + Value T +} + +// List implements a doubly-linked list, where: +// - Head = index 0 (i.e. the front) +// - Tail = index n-1 (i.e. the back) +type List[T any] struct { + Head *Elem[T] + Tail *Elem[T] + len int +} + +// Len returns the current list length. +func (l *List[T]) Len() int { + return l.len +} + +// PushFront adds 'v' to the beginning of the list. +func (l *List[T]) PushFront(v T) *Elem[T] { + elem := &Elem[T]{Value: v} + l.PushElemFront(elem) + return elem +} + +// PushBack adds 'v' to the end of the list. +func (l *List[T]) PushBack(v T) *Elem[T] { + elem := &Elem[T]{Value: v} + l.PushElemBack(elem) + return elem +} + +// InsertBefore adds 'v' into the list before 'at'. +func (l *List[T]) InsertBefore(v T, at *Elem[T]) *Elem[T] { + elem := &Elem[T]{Value: v} + l.InsertElemBefore(elem, at) + return elem +} + +// InsertAfter adds 'v' into the list after 'at'. +func (l *List[T]) InsertAfter(v T, at *Elem[T]) *Elem[T] { + elem := &Elem[T]{Value: v} + l.InsertElemAfter(elem, at) + return elem +} + +// PushFrontNode adds 'elem' to the front of the list. +func (l *List[T]) PushElemFront(elem *Elem[T]) { + if elem == l.Head { + return + } + + // Set new head. + oldHead := l.Head + l.Head = elem + + if oldHead != nil { + // Link to old head + elem.Next = oldHead + oldHead.Prev = elem + } else { + // First in list. + l.Tail = elem + } + + // Incr count + l.len++ +} + +// PushBackNode adds 'elem' to the back of the list. +func (l *List[T]) PushElemBack(elem *Elem[T]) { + if elem == l.Tail { + return + } + + // Set new tail. + oldTail := l.Tail + l.Tail = elem + + if oldTail != nil { + // Link to old tail + elem.Prev = oldTail + oldTail.Next = elem + } else { + // First in list. + l.Head = elem + } + + // Incr count + l.len++ +} + +// InsertElemAfter adds 'elem' into the list after 'at' (i.e. at.Next = elem). +func (l *List[T]) InsertElemAfter(elem *Elem[T], at *Elem[T]) { + if elem == at { + return + } + + // Set new 'next'. + oldNext := at.Next + at.Next = elem + + // Link to 'at'. + elem.Prev = at + + if oldNext == nil { + // Set new tail + l.Tail = elem + } else { + // Link to 'prev'. + oldNext.Prev = elem + elem.Next = oldNext + } + + // Incr count + l.len++ +} + +// InsertElemBefore adds 'elem' into the list before 'at' (i.e. at.Prev = elem). +func (l *List[T]) InsertElemBefore(elem *Elem[T], at *Elem[T]) { + if elem == at { + return + } + + // Set new 'prev'. + oldPrev := at.Prev + at.Prev = elem + + // Link to 'at'. + elem.Next = at + + if oldPrev == nil { + // Set new head + l.Head = elem + } else { + // Link to 'next'. + oldPrev.Next = elem + elem.Prev = oldPrev + } + + // Incr count + l.len++ +} + +// Remove removes the 'elem' from the list. +func (l *List[T]) Remove(elem *Elem[T]) { + // Get linked elems. + next := elem.Next + prev := elem.Prev + + // Unset elem. + elem.Next = nil + elem.Prev = nil + + switch { + // elem is ONLY one in list. + case next == nil && prev == nil: + l.Head = nil + l.Tail = nil + + // elem is front in list. + case next != nil && prev == nil: + l.Head = next + next.Prev = nil + + // elem is last in list. + case prev != nil && next == nil: + l.Tail = prev + prev.Next = nil + + // elem in middle of list. + default: + next.Prev = prev + prev.Next = next + } + + // Decr count + l.len-- +} + +// Range calls 'fn' on every element from head forward in list. +func (l *List[T]) Range(fn func(*Elem[T])) { + if fn == nil { + panic("nil function") + } + for elem := l.Head; elem != nil; elem = elem.Next { + fn(elem) + } +} + +// RangeReverse calls 'fn' on every element from tail backward in list. +func (l *List[T]) RangeReverse(fn func(*Elem[T])) { + if fn == nil { + panic("nil function") + } + for elem := l.Tail; elem != nil; elem = elem.Prev { + fn(elem) + } +} diff --git a/vendor/codeberg.org/gruf/go-structr/cache.go b/vendor/codeberg.org/gruf/go-structr/cache.go index 9d5e7f912..1a2a07257 100644 --- a/vendor/codeberg.org/gruf/go-structr/cache.go +++ b/vendor/codeberg.org/gruf/go-structr/cache.go @@ -150,10 +150,10 @@ func (c *Cache[T]) Get(index *Index, keys ...Key) []T { // Acquire lock. c.mutex.Lock() + defer c.mutex.Unlock() // Check cache init. if c.copy == nil { - c.mutex.Unlock() panic("not initialized") } @@ -173,9 +173,6 @@ func (c *Cache[T]) Get(index *Index, keys ...Key) []T { }) } - // Done with lock. - c.mutex.Unlock() - return values } @@ -185,12 +182,12 @@ func (c *Cache[T]) Put(values ...T) { // Acquire lock. c.mutex.Lock() - // Get func ptrs. - invalid := c.invalid + // Wrap unlock to only do once. + unlock := once(c.mutex.Unlock) + defer unlock() // Check cache init. if c.copy == nil { - c.mutex.Unlock() panic("not initialized") } @@ -203,8 +200,12 @@ func (c *Cache[T]) Put(values ...T) { ) } - // Done with lock. - c.mutex.Unlock() + // Get func ptrs. + invalid := c.invalid + + // Done with + // the lock. + unlock() if invalid != nil { // Pass all invalidated values @@ -241,13 +242,13 @@ func (c *Cache[T]) LoadOne(index *Index, key Key, load func() (T, error)) (T, er // Acquire lock. c.mutex.Lock() - // Get func ptrs. - ignore := c.ignore + // Wrap unlock to only do once. + unlock := once(c.mutex.Unlock) + defer unlock() // Check init'd. if c.copy == nil || - ignore == nil { - c.mutex.Unlock() + c.ignore == nil { panic("not initialized") } @@ -273,8 +274,12 @@ func (c *Cache[T]) LoadOne(index *Index, key Key, load func() (T, error)) (T, er } } - // Done with lock. - c.mutex.Unlock() + // Get func ptrs. + ignore := c.ignore + + // Done with + // the lock. + unlock() if ok { // item found! @@ -325,9 +330,12 @@ func (c *Cache[T]) Load(index *Index, keys []Key, load func([]Key) ([]T, error)) // Acquire lock. c.mutex.Lock() + // Wrap unlock to only do once. + unlock := once(c.mutex.Unlock) + defer unlock() + // Check init'd. if c.copy == nil { - c.mutex.Unlock() panic("not initialized") } @@ -365,8 +373,9 @@ func (c *Cache[T]) Load(index *Index, keys []Key, load func([]Key) ([]T, error)) i++ } - // Done with lock. - c.mutex.Unlock() + // Done with + // the lock. + unlock() // Load uncached values. uncached, err := load(keys) @@ -374,8 +383,20 @@ func (c *Cache[T]) Load(index *Index, keys []Key, load func([]Key) ([]T, error)) return nil, err } - // Insert uncached. - c.Put(uncached...) + // Acquire lock. + c.mutex.Lock() + + // Store all uncached values. + for i := range uncached { + c.store_value( + nil, + Key{}, + uncached[i], + ) + } + + // Done with lock. + c.mutex.Unlock() // Append uncached to return values. values = append(values, uncached...) diff --git a/vendor/codeberg.org/gruf/go-structr/queue_ctx.go b/vendor/codeberg.org/gruf/go-structr/queue_ctx.go new file mode 100644 index 000000000..d8843c1eb --- /dev/null +++ b/vendor/codeberg.org/gruf/go-structr/queue_ctx.go @@ -0,0 +1,134 @@ +package structr + +import ( + "context" +) + +// QueueCtx is a context-aware form of Queue{}. +type QueueCtx[StructType any] struct { + Queue[StructType] + ch chan struct{} +} + +// PopFront pops the current value at front of the queue, else blocking on ctx. +func (q *QueueCtx[T]) PopFront(ctx context.Context) (T, bool) { + return q.pop(ctx, func() *list_elem { + return q.queue.head + }) +} + +// PopBack pops the current value at back of the queue, else blocking on ctx. +func (q *QueueCtx[T]) PopBack(ctx context.Context) (T, bool) { + return q.pop(ctx, func() *list_elem { + return q.queue.tail + }) +} + +// PushFront pushes values to front of queue. +func (q *QueueCtx[T]) PushFront(values ...T) { + q.mutex.Lock() + for i := range values { + item := q.index(values[i]) + q.queue.push_front(&item.elem) + } + if q.ch != nil { + close(q.ch) + q.ch = nil + } + q.mutex.Unlock() +} + +// PushBack pushes values to back of queue. +func (q *QueueCtx[T]) PushBack(values ...T) { + q.mutex.Lock() + for i := range values { + item := q.index(values[i]) + q.queue.push_back(&item.elem) + } + if q.ch != nil { + close(q.ch) + q.ch = nil + } + q.mutex.Unlock() +} + +// Wait returns a ptr to the current ctx channel, +// this will block until next push to the queue. +func (q *QueueCtx[T]) Wait() <-chan struct{} { + q.mutex.Lock() + if q.ch == nil { + q.ch = make(chan struct{}) + } + ctx := q.ch + q.mutex.Unlock() + return ctx +} + +func (q *QueueCtx[T]) pop(ctx context.Context, next func() *list_elem) (T, bool) { + if next == nil { + panic("nil fn") + } else if ctx == nil { + panic("nil ctx") + } + + // Acquire lock. + q.mutex.Lock() + + var elem *list_elem + + for { + // Get element. + elem = next() + if elem != nil { + break + } + + if q.ch == nil { + // Allocate new ctx channel. + q.ch = make(chan struct{}) + } + + // Get current + // ch pointer. + ch := q.ch + + // Unlock queue. + q.mutex.Unlock() + + select { + // Ctx cancelled. + case <-ctx.Done(): + var z T + return z, false + + // Pushed! + case <-ch: + } + + // Relock queue. + q.mutex.Lock() + } + + // Cast the indexed item from elem. + item := (*indexed_item)(elem.data) + + // Extract item value. + value := item.data.(T) + + // Delete queued. + q.delete(item) + + // Get func ptrs. + pop := q.Queue.pop + + // Done with lock. + q.mutex.Unlock() + + if pop != nil { + // Pass to + // user hook. + pop(value) + } + + return value, true +} diff --git a/vendor/codeberg.org/gruf/go-structr/util.go b/vendor/codeberg.org/gruf/go-structr/util.go new file mode 100644 index 000000000..46535fcff --- /dev/null +++ b/vendor/codeberg.org/gruf/go-structr/util.go @@ -0,0 +1,13 @@ +package structr + +// once only executes 'fn' once. +func once(fn func()) func() { + var once int32 + return func() { + if once != 0 { + return + } + once = 1 + fn() + } +} |