diff options
author | 2024-04-11 10:45:35 +0100 | |
---|---|---|
committer | 2024-04-11 11:45:35 +0200 | |
commit | a483bd9e38333b153df1d4df95276cca38f99ff5 (patch) | |
tree | b2fdb6f53ef248b31719a15adc93e767eba3d5c4 /internal/queue/wrappers.go | |
parent | [chore]: Bump github.com/yuin/goldmark from 1.7.0 to 1.7.1 (#2819) (diff) | |
download | gotosocial-a483bd9e38333b153df1d4df95276cca38f99ff5.tar.xz |
[performance] massively improved ActivityPub delivery worker efficiency (#2812)
* add delivery worker type that pulls from queue to httpclient package
* finish up some code commenting, bodge a vendored activity library change, integrate the deliverypool changes into transportcontroller
* hook up queue deletion logic
* support deleting queued http requests by target ID
* don't index APRequest by hostname in the queue
* use gorun
* use the original context's values when wrapping msg type as delivery{}
* actually log in the AP delivery worker ...
* add uncommitted changes
* use errors.AsV2()
* use errorsv2.AsV2()
* finish adding some code comments, add bad host handling to delivery workers
* slightly tweak deliveryworkerpool API, use advanced sender multiplier
* remove PopCtx() method, let others instead rely on Wait()
* shuffle things around to move delivery stuff into transport/ subpkg
* remove dead code
* formatting
* validate request before queueing for delivery
* finish adding code comments, fix up backoff code
* finish adding more code comments
* clamp minimum no. senders to 1
* add start/stop logging to delivery worker, some slight changes
* remove double logging
* use worker ptrs
* expose the embedded log fields in httpclient.Request{}
* ensure request context values are preserved when updating ctx
* add delivery worker tests
* fix linter issues
* ensure delivery worker gets inited in testrig
* fix tests to delivering messages to check worker delivery queue
* update error type to use ptr instead of value receiver
* fix test calling Workers{}.Start() instead of testrig.StartWorkers()
* update docs for advanced-sender-multiplier
* update to the latest activity library version
* add comment about not using httptest.Server{}
Diffstat (limited to 'internal/queue/wrappers.go')
-rw-r--r-- | internal/queue/wrappers.go | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/internal/queue/wrappers.go b/internal/queue/wrappers.go new file mode 100644 index 000000000..e07984f84 --- /dev/null +++ b/internal/queue/wrappers.go @@ -0,0 +1,96 @@ +// GoToSocial +// Copyright (C) GoToSocial Authors admin@gotosocial.org +// SPDX-License-Identifier: AGPL-3.0-or-later +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package queue + +import ( + "sync/atomic" + + "codeberg.org/gruf/go-structr" +) + +// StructQueue wraps a structr.Queue{} to +// provide simple index caching by name. +type StructQueue[StructType any] struct { + queue structr.Queue[StructType] + index map[string]*structr.Index + wait atomic.Pointer[chan struct{}] +} + +// Init initializes queue with structr.QueueConfig{}. +func (q *StructQueue[T]) Init(config structr.QueueConfig[T]) { + q.index = make(map[string]*structr.Index, len(config.Indices)) + q.queue = structr.Queue[T]{} + q.queue.Init(config) + for _, cfg := range config.Indices { + q.index[cfg.Fields] = q.queue.Index(cfg.Fields) + } +} + +// Pop: see structr.Queue{}.PopFront(). +func (q *StructQueue[T]) Pop() (value T, ok bool) { + return q.queue.PopFront() +} + +// Push wraps structr.Queue{}.PushBack() to awaken those blocking on <-.Wait(). +func (q *StructQueue[T]) Push(values ...T) { + q.queue.PushBack(values...) + q.broadcast() +} + +// Delete pops (and drops!) all queued entries under index with key. +func (q *StructQueue[T]) Delete(index string, key ...any) { + i := q.index[index] + _ = q.queue.Pop(i, i.Key(key...)) +} + +// Len: see structr.Queue{}.Len(). +func (q *StructQueue[T]) Len() int { + return q.queue.Len() +} + +// Wait returns current wait channel, which may be +// blocked on to awaken when new value pushed to queue. +func (q *StructQueue[T]) Wait() <-chan struct{} { + var ch chan struct{} + + for { + // Get channel ptr. + ptr := q.wait.Load() + if ptr != nil { + return *ptr + } + + if ch == nil { + // Allocate new channel. + ch = make(chan struct{}) + } + + // Try set the new wait channel ptr. + if q.wait.CompareAndSwap(ptr, &ch) { + return ch + } + } +} + +// broadcast safely closes wait channel if +// currently set, releasing waiting goroutines. +func (q *StructQueue[T]) broadcast() { + if ptr := q.wait.Swap(nil); ptr != nil { + close(*ptr) + } +} |