summaryrefslogtreecommitdiff
path: root/internal/transport/delivery/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/transport/delivery/worker.go')
-rw-r--r--internal/transport/delivery/worker.go80
1 files changed, 48 insertions, 32 deletions
diff --git a/internal/transport/delivery/worker.go b/internal/transport/delivery/worker.go
index ef31e94a6..d6d253769 100644
--- a/internal/transport/delivery/worker.go
+++ b/internal/transport/delivery/worker.go
@@ -19,6 +19,7 @@ package delivery
import (
"context"
+ "errors"
"slices"
"time"
@@ -160,6 +161,13 @@ func (w *Worker) process(ctx context.Context) bool {
loop:
for {
+ // Before trying to get
+ // next delivery, check
+ // context still valid.
+ if ctx.Err() != nil {
+ return true
+ }
+
// Get next delivery.
dlv, ok := w.next(ctx)
if !ok {
@@ -195,16 +203,30 @@ loop:
// Attempt delivery of AP request.
rsp, retry, err := w.Client.DoOnce(
- &dlv.Request,
+ dlv.Request,
)
- if err == nil {
+ switch {
+ case err == nil:
// Ensure body closed.
_ = rsp.Body.Close()
continue loop
- }
- if !retry {
+ case errors.Is(err, context.Canceled) &&
+ ctx.Err() != nil:
+ // In the case of our own context
+ // being cancelled, push delivery
+ // back onto queue for persisting.
+ //
+ // Note we specifically check against
+ // context.Canceled here as it will
+ // be faster than the mutex lock of
+ // ctx.Err(), so gives an initial
+ // faster check in the if-clause.
+ w.Queue.Push(dlv)
+ continue loop
+
+ case !retry:
// Drop deliveries when no
// retry requested, or they
// reached max (either).
@@ -222,42 +244,36 @@ loop:
// next gets the next available delivery, blocking until available if necessary.
func (w *Worker) next(ctx context.Context) (*Delivery, bool) {
-loop:
- for {
- // Try pop next queued.
- dlv, ok := w.Queue.Pop()
+ // Try a fast-pop of queued
+ // delivery before anything.
+ dlv, ok := w.Queue.Pop()
- if !ok {
- // Check the backlog.
- if len(w.backlog) > 0 {
+ if !ok {
+ // Check the backlog.
+ if len(w.backlog) > 0 {
- // Sort by 'next' time.
- sortDeliveries(w.backlog)
+ // Sort by 'next' time.
+ sortDeliveries(w.backlog)
- // Pop next delivery.
- dlv := w.popBacklog()
+ // Pop next delivery.
+ dlv := w.popBacklog()
- return dlv, true
- }
-
- select {
- // Backlog is empty, we MUST
- // block until next enqueued.
- case <-w.Queue.Wait():
- continue loop
+ return dlv, true
+ }
- // Worker was stopped.
- case <-ctx.Done():
- return nil, false
- }
+ // Block on next delivery push
+ // OR worker context canceled.
+ dlv, ok = w.Queue.PopCtx(ctx)
+ if !ok {
+ return nil, false
}
+ }
- // Replace request context for worker state canceling.
- ctx := gtscontext.WithValues(ctx, dlv.Request.Context())
- dlv.Request.Request = dlv.Request.Request.WithContext(ctx)
+ // Replace request context for worker state canceling.
+ ctx = gtscontext.WithValues(ctx, dlv.Request.Context())
+ dlv.Request.Request = dlv.Request.Request.WithContext(ctx)
- return dlv, true
- }
+ return dlv, true
}
// popBacklog pops next available from the backlog.