diff options
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go')
| -rw-r--r-- | vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go | 21 |
1 files changed, 8 insertions, 13 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go index 2240c26e9..7bdb699ca 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go @@ -121,6 +121,14 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) { // // This method is safe to call concurrently. func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error { + // Only check if context is already cancelled before starting, not inside or after callback loops. + // If this method returns after executing some callbacks but before running all aggregations, + // internal aggregation state can be corrupted and result in incorrect data returned + // by future produce calls. + if err := ctx.Err(); err != nil { + return err + } + p.Lock() defer p.Unlock() @@ -130,12 +138,6 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) if e := c(ctx); e != nil { err = errors.Join(err, e) } - if err := ctx.Err(); err != nil { - rm.Resource = nil - clear(rm.ScopeMetrics) // Erase elements to let GC collect objects. - rm.ScopeMetrics = rm.ScopeMetrics[:0] - return err - } } for e := p.multiCallbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) @@ -143,13 +145,6 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) if e := f(ctx); e != nil { err = errors.Join(err, e) } - if err := ctx.Err(); err != nil { - // This means the context expired before we finished running callbacks. - rm.Resource = nil - clear(rm.ScopeMetrics) // Erase elements to let GC collect objects. - rm.ScopeMetrics = rm.ScopeMetrics[:0] - return err - } } rm.Resource = p.resource |
