diff options
| author | 2024-04-26 13:50:46 +0100 | |
|---|---|---|
| committer | 2024-04-26 13:50:46 +0100 | |
| commit | c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9 (patch) | |
| tree | dbd3409070765d5ca81448a574ccd32b4da1ffe6 /internal/transport | |
| parent | [chore] update Docker container to use new go swagger hash (#2872) (diff) | |
| download | gotosocial-c9c0773f2c2363dcfa37e675b83ec3f0b49bd0d9.tar.xz | |
[performance] update remaining worker pools to use queues (#2865)
* start replacing client + federator + media workers with new worker + queue types
* refactor federatingDB.Delete(), drop queued messages when deleting account / status
* move all queue purging to the processor workers
* undo toolchain updates
* code comments, ensure dereferencer worker pool gets started
* update gruf libraries in readme
* start the job scheduler separately to the worker pools
* reshuffle ordering or server.go + remove duplicate worker start / stop
* update go-list version
* fix vendoring
* move queue invalidation to before wipeing / deletion, to ensure queued work not dropped
* add logging to worker processing functions in testrig, don't start workers in unexpected places
* update go-structr to add (+then rely on) QueueCtx{} type
* ensure more worker pools get started properly in tests
* fix remaining broken tests relying on worker queue logic
* fix account test suite queue popping logic, ensure noop workers do not pull from queue
* move back accidentally shuffled account deletion order
* ensure error (non nil!!) gets passed in refactored federatingDB{}.Delete()
* silently drop deletes from accounts not permitted to
* don't warn log on forwarded deletes
* make if else clauses easier to parse
* use getFederatorMsg()
* improved code comment
* improved code comment re: requesting account delete checks
* remove boolean result from worker start / stop since false = already running or already stopped
* remove optional passed-in http.client
* remove worker starting from the admin CLI commands (we don't need to handle side-effects)
* update prune cli to start scheduler but not all of the workers
* fix rebase issues
* remove redundant return statements
* i'm sorry sir linter
Diffstat (limited to 'internal/transport')
| -rw-r--r-- | internal/transport/delivery/delivery.go | 81 | ||||
| -rw-r--r-- | internal/transport/delivery/delivery_test.go | 4 |
2 files changed, 48 insertions, 37 deletions
diff --git a/internal/transport/delivery/delivery.go b/internal/transport/delivery/delivery.go index 27281399f..286f2abd2 100644 --- a/internal/transport/delivery/delivery.go +++ b/internal/transport/delivery/delivery.go @@ -28,6 +28,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/httpclient" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/queue" + "github.com/superseriousbusiness/gotosocial/internal/util" ) // Delivery wraps an httpclient.Request{} @@ -99,29 +100,51 @@ func (p *WorkerPool) Init(client *httpclient.Client) { } // Start will attempt to start 'n' Worker{}s. -func (p *WorkerPool) Start(n int) (ok bool) { - if ok = (len(p.workers) == 0); ok { - p.workers = make([]*Worker, n) - for i := range p.workers { - p.workers[i] = new(Worker) - p.workers[i].Client = p.Client - p.workers[i].Queue = &p.Queue - ok = p.workers[i].Start() && ok - } +func (p *WorkerPool) Start(n int) { + // Check whether workers are + // set (is already running). + ok := (len(p.workers) > 0) + if ok { + return + } + + // Allocate new workers slice. + p.workers = make([]*Worker, n) + for i := range p.workers { + + // Allocate new Worker{}. + p.workers[i] = new(Worker) + p.workers[i].Client = p.Client + p.workers[i].Queue = &p.Queue + + // Attempt to start worker. + // Return bool not useful + // here, as true = started, + // false = already running. + _ = p.workers[i].Start() } - return } // Stop will attempt to stop contained Worker{}s. -func (p *WorkerPool) Stop() (ok bool) { - if ok = (len(p.workers) > 0); ok { - for i := range p.workers { - ok = p.workers[i].Stop() && ok - p.workers[i] = nil - } - p.workers = p.workers[:0] +func (p *WorkerPool) Stop() { + // Check whether workers are + // set (is currently running). + ok := (len(p.workers) == 0) + if ok { + return } - return + + // Stop all running workers. + for i := range p.workers { + + // return bool not useful + // here, as true = stopped, + // false = never running. + _ = p.workers[i].Stop() + } + + // Unset workers slice. + p.workers = p.workers[:0] } // Worker wraps an httpclient.Client{} to feed @@ -158,23 +181,13 @@ func (w *Worker) run(ctx context.Context) { if w.Client == nil || w.Queue == nil { panic("not yet initialized") } - log.Infof(ctx, "%p: started delivery worker", w) - defer log.Infof(ctx, "%p: stopped delivery worker", w) - for returned := false; !returned; { - func() { - defer func() { - if r := recover(); r != nil { - log.Errorf(ctx, "recovered panic: %v", r) - } - }() - w.process(ctx) - returned = true - }() - } + log.Infof(ctx, "%p: starting worker", w) + defer log.Infof(ctx, "%p: stopped worker", w) + util.Must(func() { w.process(ctx) }) } // process is the main delivery worker processing routine. -func (w *Worker) process(ctx context.Context) { +func (w *Worker) process(ctx context.Context) bool { if w.Client == nil || w.Queue == nil { // we perform this check here just // to ensure the compiler knows these @@ -188,7 +201,7 @@ loop: // Get next delivery. dlv, ok := w.next(ctx) if !ok { - return + return true } // Check whether backoff required. @@ -203,7 +216,7 @@ loop: // Main ctx // cancelled. backoff.Stop() - return + return true case <-w.Queue.Wait(): // A new message was diff --git a/internal/transport/delivery/delivery_test.go b/internal/transport/delivery/delivery_test.go index 852c6f6f3..48831f098 100644 --- a/internal/transport/delivery/delivery_test.go +++ b/internal/transport/delivery/delivery_test.go @@ -32,9 +32,7 @@ func testDeliveryWorkerPool(t *testing.T, sz int, input []*testrequest) { "127.0.0.0/8", }), })) - if !wp.Start(sz) { - t.Fatal("failed starting pool") - } + wp.Start(sz) defer wp.Stop() test(t, &wp.Queue, input) } |
