diff options
Diffstat (limited to 'internal/processing/processor.go')
-rw-r--r-- | internal/processing/processor.go | 20 |
1 files changed, 12 insertions, 8 deletions
diff --git a/internal/processing/processor.go b/internal/processing/processor.go index ad485b9ae..3e3854f69 100644 --- a/internal/processing/processor.go +++ b/internal/processing/processor.go @@ -138,20 +138,24 @@ func NewProcessor( return processor } -func (p *Processor) EnqueueClientAPI(ctx context.Context, msg messages.FromClientAPI) { - log.WithContext(ctx).WithField("msg", msg).Trace("enqueuing client API") +func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) { + log.Trace(ctx, "enqueuing client API") _ = p.state.Workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) { - if err := p.ProcessFromClientAPI(ctx, msg); err != nil { - log.Errorf(ctx, "error processing client API message: %v", err) + for _, msg := range msgs { + if err := p.ProcessFromClientAPI(ctx, msg); err != nil { + log.WithContext(ctx).WithField("msg", msg).Errorf("error processing client API message: %v", err) + } } }) } -func (p *Processor) EnqueueFederator(ctx context.Context, msg messages.FromFederator) { - log.WithContext(ctx).WithField("msg", msg).Trace("enqueuing federator") +func (p *Processor) EnqueueFederator(ctx context.Context, msgs ...messages.FromFederator) { + log.Trace(ctx, "enqueuing federator") _ = p.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) { - if err := p.ProcessFromFederator(ctx, msg); err != nil { - log.Errorf(ctx, "error processing federator message: %v", err) + for _, msg := range msgs { + if err := p.ProcessFromFederator(ctx, msg); err != nil { + log.WithContext(ctx).WithField("msg", msg).Errorf("error processing federator message: %v", err) + } } }) } |