summaryrefslogtreecommitdiff
path: root/internal/processing/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/processing/processor.go')
-rw-r--r--internal/processing/processor.go20
1 files changed, 12 insertions, 8 deletions
diff --git a/internal/processing/processor.go b/internal/processing/processor.go
index ad485b9ae..3e3854f69 100644
--- a/internal/processing/processor.go
+++ b/internal/processing/processor.go
@@ -138,20 +138,24 @@ func NewProcessor(
return processor
}
-func (p *Processor) EnqueueClientAPI(ctx context.Context, msg messages.FromClientAPI) {
- log.WithContext(ctx).WithField("msg", msg).Trace("enqueuing client API")
+func (p *Processor) EnqueueClientAPI(ctx context.Context, msgs ...messages.FromClientAPI) {
+ log.Trace(ctx, "enqueuing client API")
_ = p.state.Workers.ClientAPI.MustEnqueueCtx(ctx, func(ctx context.Context) {
- if err := p.ProcessFromClientAPI(ctx, msg); err != nil {
- log.Errorf(ctx, "error processing client API message: %v", err)
+ for _, msg := range msgs {
+ if err := p.ProcessFromClientAPI(ctx, msg); err != nil {
+ log.WithContext(ctx).WithField("msg", msg).Errorf("error processing client API message: %v", err)
+ }
}
})
}
-func (p *Processor) EnqueueFederator(ctx context.Context, msg messages.FromFederator) {
- log.WithContext(ctx).WithField("msg", msg).Trace("enqueuing federator")
+func (p *Processor) EnqueueFederator(ctx context.Context, msgs ...messages.FromFederator) {
+ log.Trace(ctx, "enqueuing federator")
_ = p.state.Workers.Federator.MustEnqueueCtx(ctx, func(ctx context.Context) {
- if err := p.ProcessFromFederator(ctx, msg); err != nil {
- log.Errorf(ctx, "error processing federator message: %v", err)
+ for _, msg := range msgs {
+ if err := p.ProcessFromFederator(ctx, msg); err != nil {
+ log.WithContext(ctx).WithField("msg", msg).Errorf("error processing federator message: %v", err)
+ }
}
})
}