summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go132
1 files changed, 68 insertions, 64 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
index 823bf2fe3..775e24526 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
@@ -8,14 +8,13 @@ import (
"context"
"errors"
"fmt"
- "strings"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/internal/global"
- "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
+ "go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
@@ -38,14 +37,17 @@ type instrumentSync struct {
compAgg aggregate.ComputeAggregation
}
-func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
+func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
if res == nil {
res = resource.Empty()
}
return &pipeline{
- resource: res,
- reader: reader,
- views: views,
+ resource: res,
+ reader: reader,
+ views: views,
+ int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
+ float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
+ exemplarFilter: exemplarFilter,
// aggregations is lazy allocated when needed.
}
}
@@ -63,9 +65,26 @@ type pipeline struct {
views []View
sync.Mutex
- aggregations map[instrumentation.Scope][]instrumentSync
- callbacks []func(context.Context) error
- multiCallbacks list.List
+ int64Measures map[observableID[int64]][]aggregate.Measure[int64]
+ float64Measures map[observableID[float64]][]aggregate.Measure[float64]
+ aggregations map[instrumentation.Scope][]instrumentSync
+ callbacks []func(context.Context) error
+ multiCallbacks list.List
+ exemplarFilter exemplar.Filter
+}
+
+// addInt64Measure adds a new int64 measure to the pipeline for each observer.
+func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
+ p.Lock()
+ defer p.Unlock()
+ p.int64Measures[id] = m
+}
+
+// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
+func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
+ p.Lock()
+ defer p.Unlock()
+ p.float64Measures[id] = m
}
// addSync adds the instrumentSync to pipeline p with scope. This method is not
@@ -105,14 +124,15 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
p.Lock()
defer p.Unlock()
- var errs multierror
+ var err error
for _, c := range p.callbacks {
// TODO make the callbacks parallel. ( #3034 )
- if err := c(ctx); err != nil {
- errs.append(err)
+ if e := c(ctx); e != nil {
+ err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
rm.Resource = nil
+ clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
@@ -120,12 +140,13 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(multiCallback)
- if err := f(ctx); err != nil {
- errs.append(err)
+ if e := f(ctx); e != nil {
+ err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
rm.Resource = nil
+ clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
@@ -157,7 +178,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
rm.ScopeMetrics = rm.ScopeMetrics[:i]
- return errs.errorOrNil()
+ return err
}
// inserter facilitates inserting of new instruments from a single scope into a
@@ -219,7 +240,7 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures []aggregate.Measure[N]
)
- errs := &multierror{wrapped: errCreatingAggregators}
+ var err error
seen := make(map[uint64]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
@@ -227,9 +248,9 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
continue
}
matched = true
- in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
- if err != nil {
- errs.append(err)
+ in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
+ if e != nil {
+ err = errors.Join(err, e)
}
if in == nil { // Drop aggregation.
continue
@@ -242,8 +263,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures = append(measures, in)
}
+ if err != nil {
+ err = errors.Join(errCreatingAggregators, err)
+ }
+
if matched {
- return measures, errs.errorOrNil()
+ return measures, err
}
// Apply implicit default view if no explicit matched.
@@ -252,15 +277,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
Description: inst.Description,
Unit: inst.Unit,
}
- in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
- if err != nil {
- errs.append(err)
+ in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
+ if e != nil {
+ if err == nil {
+ err = errCreatingAggregators
+ }
+ err = errors.Join(err, e)
}
if in != nil {
// Ensured to have not seen given matched was false.
measures = append(measures, in)
}
- return measures, errs.errorOrNil()
+ return measures, err
}
// addCallback registers a single instrument callback to be run when
@@ -329,6 +357,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// The view explicitly requested the default aggregation.
stream.Aggregation = DefaultAggregationSelector(kind)
}
+ if stream.ExemplarReservoirProviderSelector == nil {
+ stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector
+ }
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
return nil, 0, fmt.Errorf(
@@ -349,7 +380,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
- ReservoirFunc: reservoirFunc[N](stream.Aggregation),
+ ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter),
}
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
@@ -552,24 +583,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
// measurement.
type pipelines []*pipeline
-func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
+func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for _, r := range readers {
- p := newPipeline(res, r, views)
+ p := newPipeline(res, r, views, exemplarFilter)
r.register(p)
pipes = append(pipes, p)
}
return pipes
}
-func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
- unregs := make([]func(), len(p))
- for i, pipe := range p {
- unregs[i] = pipe.addMultiCallback(c)
- }
- return unregisterFuncs{f: unregs}
-}
-
type unregisterFuncs struct {
embedded.Registration
f []func()
@@ -602,15 +625,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]
- errs := &multierror{}
+ var err error
for _, i := range r.inserters {
- in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
- if err != nil {
- errs.append(err)
+ in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
+ if e != nil {
+ err = errors.Join(err, e)
}
measures = append(measures, in...)
}
- return measures, errs.errorOrNil()
+ return measures, err
}
// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
@@ -619,37 +642,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]
- errs := &multierror{}
+ var err error
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
- in, err := i.Instrument(id, agg)
- if err != nil {
- errs.append(err)
+ in, e := i.Instrument(id, agg)
+ if e != nil {
+ err = errors.Join(err, e)
}
measures = append(measures, in...)
}
- return measures, errs.errorOrNil()
-}
-
-type multierror struct {
- wrapped error
- errors []string
-}
-
-func (m *multierror) errorOrNil() error {
- if len(m.errors) == 0 {
- return nil
- }
- if m.wrapped == nil {
- return errors.New(strings.Join(m.errors, "; "))
- }
- return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
-}
-
-func (m *multierror) append(err error) {
- m.errors = append(m.errors, err.Error())
+ return measures, err
}