summaryrefslogtreecommitdiff
path: root/internal/transport/delivery/delivery.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/transport/delivery/delivery.go')
-rw-r--r--internal/transport/delivery/delivery.go81
1 files changed, 47 insertions, 34 deletions
diff --git a/internal/transport/delivery/delivery.go b/internal/transport/delivery/delivery.go
index 27281399f..286f2abd2 100644
--- a/internal/transport/delivery/delivery.go
+++ b/internal/transport/delivery/delivery.go
@@ -28,6 +28,7 @@ import (
"github.com/superseriousbusiness/gotosocial/internal/httpclient"
"github.com/superseriousbusiness/gotosocial/internal/log"
"github.com/superseriousbusiness/gotosocial/internal/queue"
+ "github.com/superseriousbusiness/gotosocial/internal/util"
)
// Delivery wraps an httpclient.Request{}
@@ -99,29 +100,51 @@ func (p *WorkerPool) Init(client *httpclient.Client) {
}
// Start will attempt to start 'n' Worker{}s.
-func (p *WorkerPool) Start(n int) (ok bool) {
- if ok = (len(p.workers) == 0); ok {
- p.workers = make([]*Worker, n)
- for i := range p.workers {
- p.workers[i] = new(Worker)
- p.workers[i].Client = p.Client
- p.workers[i].Queue = &p.Queue
- ok = p.workers[i].Start() && ok
- }
+func (p *WorkerPool) Start(n int) {
+ // Check whether workers are
+ // set (is already running).
+ ok := (len(p.workers) > 0)
+ if ok {
+ return
+ }
+
+ // Allocate new workers slice.
+ p.workers = make([]*Worker, n)
+ for i := range p.workers {
+
+ // Allocate new Worker{}.
+ p.workers[i] = new(Worker)
+ p.workers[i].Client = p.Client
+ p.workers[i].Queue = &p.Queue
+
+ // Attempt to start worker.
+ // Return bool not useful
+ // here, as true = started,
+ // false = already running.
+ _ = p.workers[i].Start()
}
- return
}
// Stop will attempt to stop contained Worker{}s.
-func (p *WorkerPool) Stop() (ok bool) {
- if ok = (len(p.workers) > 0); ok {
- for i := range p.workers {
- ok = p.workers[i].Stop() && ok
- p.workers[i] = nil
- }
- p.workers = p.workers[:0]
+func (p *WorkerPool) Stop() {
+ // Check whether workers are
+ // set (is currently running).
+ ok := (len(p.workers) == 0)
+ if ok {
+ return
}
- return
+
+ // Stop all running workers.
+ for i := range p.workers {
+
+ // return bool not useful
+ // here, as true = stopped,
+ // false = never running.
+ _ = p.workers[i].Stop()
+ }
+
+ // Unset workers slice.
+ p.workers = p.workers[:0]
}
// Worker wraps an httpclient.Client{} to feed
@@ -158,23 +181,13 @@ func (w *Worker) run(ctx context.Context) {
if w.Client == nil || w.Queue == nil {
panic("not yet initialized")
}
- log.Infof(ctx, "%p: started delivery worker", w)
- defer log.Infof(ctx, "%p: stopped delivery worker", w)
- for returned := false; !returned; {
- func() {
- defer func() {
- if r := recover(); r != nil {
- log.Errorf(ctx, "recovered panic: %v", r)
- }
- }()
- w.process(ctx)
- returned = true
- }()
- }
+ log.Infof(ctx, "%p: starting worker", w)
+ defer log.Infof(ctx, "%p: stopped worker", w)
+ util.Must(func() { w.process(ctx) })
}
// process is the main delivery worker processing routine.
-func (w *Worker) process(ctx context.Context) {
+func (w *Worker) process(ctx context.Context) bool {
if w.Client == nil || w.Queue == nil {
// we perform this check here just
// to ensure the compiler knows these
@@ -188,7 +201,7 @@ loop:
// Get next delivery.
dlv, ok := w.next(ctx)
if !ok {
- return
+ return true
}
// Check whether backoff required.
@@ -203,7 +216,7 @@ loop:
// Main ctx
// cancelled.
backoff.Stop()
- return
+ return true
case <-w.Queue.Wait():
// A new message was