diff options
Diffstat (limited to 'internal/transport/delivery/worker.go')
-rw-r--r-- | internal/transport/delivery/worker.go | 80 |
1 files changed, 48 insertions, 32 deletions
diff --git a/internal/transport/delivery/worker.go b/internal/transport/delivery/worker.go index ef31e94a6..d6d253769 100644 --- a/internal/transport/delivery/worker.go +++ b/internal/transport/delivery/worker.go @@ -19,6 +19,7 @@ package delivery import ( "context" + "errors" "slices" "time" @@ -160,6 +161,13 @@ func (w *Worker) process(ctx context.Context) bool { loop: for { + // Before trying to get + // next delivery, check + // context still valid. + if ctx.Err() != nil { + return true + } + // Get next delivery. dlv, ok := w.next(ctx) if !ok { @@ -195,16 +203,30 @@ loop: // Attempt delivery of AP request. rsp, retry, err := w.Client.DoOnce( - &dlv.Request, + dlv.Request, ) - if err == nil { + switch { + case err == nil: // Ensure body closed. _ = rsp.Body.Close() continue loop - } - if !retry { + case errors.Is(err, context.Canceled) && + ctx.Err() != nil: + // In the case of our own context + // being cancelled, push delivery + // back onto queue for persisting. + // + // Note we specifically check against + // context.Canceled here as it will + // be faster than the mutex lock of + // ctx.Err(), so gives an initial + // faster check in the if-clause. + w.Queue.Push(dlv) + continue loop + + case !retry: // Drop deliveries when no // retry requested, or they // reached max (either). @@ -222,42 +244,36 @@ loop: // next gets the next available delivery, blocking until available if necessary. func (w *Worker) next(ctx context.Context) (*Delivery, bool) { -loop: - for { - // Try pop next queued. - dlv, ok := w.Queue.Pop() + // Try a fast-pop of queued + // delivery before anything. + dlv, ok := w.Queue.Pop() - if !ok { - // Check the backlog. - if len(w.backlog) > 0 { + if !ok { + // Check the backlog. + if len(w.backlog) > 0 { - // Sort by 'next' time. - sortDeliveries(w.backlog) + // Sort by 'next' time. + sortDeliveries(w.backlog) - // Pop next delivery. - dlv := w.popBacklog() + // Pop next delivery. + dlv := w.popBacklog() - return dlv, true - } - - select { - // Backlog is empty, we MUST - // block until next enqueued. - case <-w.Queue.Wait(): - continue loop + return dlv, true + } - // Worker was stopped. - case <-ctx.Done(): - return nil, false - } + // Block on next delivery push + // OR worker context canceled. + dlv, ok = w.Queue.PopCtx(ctx) + if !ok { + return nil, false } + } - // Replace request context for worker state canceling. - ctx := gtscontext.WithValues(ctx, dlv.Request.Context()) - dlv.Request.Request = dlv.Request.Request.WithContext(ctx) + // Replace request context for worker state canceling. + ctx = gtscontext.WithValues(ctx, dlv.Request.Context()) + dlv.Request.Request = dlv.Request.Request.WithContext(ctx) - return dlv, true - } + return dlv, true } // popBacklog pops next available from the backlog. |