diff options
Diffstat (limited to 'internal/transport/delivery/delivery.go')
-rw-r--r-- | internal/transport/delivery/delivery.go | 81 |
1 files changed, 47 insertions, 34 deletions
diff --git a/internal/transport/delivery/delivery.go b/internal/transport/delivery/delivery.go index 27281399f..286f2abd2 100644 --- a/internal/transport/delivery/delivery.go +++ b/internal/transport/delivery/delivery.go @@ -28,6 +28,7 @@ import ( "github.com/superseriousbusiness/gotosocial/internal/httpclient" "github.com/superseriousbusiness/gotosocial/internal/log" "github.com/superseriousbusiness/gotosocial/internal/queue" + "github.com/superseriousbusiness/gotosocial/internal/util" ) // Delivery wraps an httpclient.Request{} @@ -99,29 +100,51 @@ func (p *WorkerPool) Init(client *httpclient.Client) { } // Start will attempt to start 'n' Worker{}s. -func (p *WorkerPool) Start(n int) (ok bool) { - if ok = (len(p.workers) == 0); ok { - p.workers = make([]*Worker, n) - for i := range p.workers { - p.workers[i] = new(Worker) - p.workers[i].Client = p.Client - p.workers[i].Queue = &p.Queue - ok = p.workers[i].Start() && ok - } +func (p *WorkerPool) Start(n int) { + // Check whether workers are + // set (is already running). + ok := (len(p.workers) > 0) + if ok { + return + } + + // Allocate new workers slice. + p.workers = make([]*Worker, n) + for i := range p.workers { + + // Allocate new Worker{}. + p.workers[i] = new(Worker) + p.workers[i].Client = p.Client + p.workers[i].Queue = &p.Queue + + // Attempt to start worker. + // Return bool not useful + // here, as true = started, + // false = already running. + _ = p.workers[i].Start() } - return } // Stop will attempt to stop contained Worker{}s. -func (p *WorkerPool) Stop() (ok bool) { - if ok = (len(p.workers) > 0); ok { - for i := range p.workers { - ok = p.workers[i].Stop() && ok - p.workers[i] = nil - } - p.workers = p.workers[:0] +func (p *WorkerPool) Stop() { + // Check whether workers are + // set (is currently running). + ok := (len(p.workers) == 0) + if ok { + return } - return + + // Stop all running workers. + for i := range p.workers { + + // return bool not useful + // here, as true = stopped, + // false = never running. + _ = p.workers[i].Stop() + } + + // Unset workers slice. + p.workers = p.workers[:0] } // Worker wraps an httpclient.Client{} to feed @@ -158,23 +181,13 @@ func (w *Worker) run(ctx context.Context) { if w.Client == nil || w.Queue == nil { panic("not yet initialized") } - log.Infof(ctx, "%p: started delivery worker", w) - defer log.Infof(ctx, "%p: stopped delivery worker", w) - for returned := false; !returned; { - func() { - defer func() { - if r := recover(); r != nil { - log.Errorf(ctx, "recovered panic: %v", r) - } - }() - w.process(ctx) - returned = true - }() - } + log.Infof(ctx, "%p: starting worker", w) + defer log.Infof(ctx, "%p: stopped worker", w) + util.Must(func() { w.process(ctx) }) } // process is the main delivery worker processing routine. -func (w *Worker) process(ctx context.Context) { +func (w *Worker) process(ctx context.Context) bool { if w.Client == nil || w.Queue == nil { // we perform this check here just // to ensure the compiler knows these @@ -188,7 +201,7 @@ loop: // Get next delivery. dlv, ok := w.next(ctx) if !ok { - return + return true } // Check whether backoff required. @@ -203,7 +216,7 @@ loop: // Main ctx // cancelled. backoff.Stop() - return + return true case <-w.Queue.Wait(): // A new message was |