path: root/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go')
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go  32
1 file changed, 17 insertions, 15 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
index 48abcc8a7..da39ab961 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
@@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+ "go.opentelemetry.io/otel/sdk/metric/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
@@ -93,14 +94,6 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
p.aggregations[scope] = append(p.aggregations[scope], iSync)
}
-// addCallback registers a single instrument callback to be run when
-// `produce()` is called.
-func (p *pipeline) addCallback(cback func(context.Context) error) {
- p.Lock()
- defer p.Unlock()
- p.callbacks = append(p.callbacks, cback)
-}
-
type multiCallback func(context.Context) error
// addMultiCallback registers a multi-instrument callback to be run when
@@ -281,6 +274,14 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
return measures, errs.errorOrNil()
}
+// addCallback registers a single instrument callback to be run when
+// `produce()` is called.
+func (i *inserter[N]) addCallback(cback func(context.Context) error) {
+ i.pipeline.Lock()
+ defer i.pipeline.Unlock()
+ i.pipeline.callbacks = append(i.pipeline.callbacks, cback)
+}
+
var aggIDCount uint64
// aggVal is the cached value in an aggregator's cache.
@@ -358,9 +359,16 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
normID := id.normalize()
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{
- Temporality: i.pipeline.reader.temporality(kind),
+ Temporality: i.pipeline.reader.temporality(kind),
+ ReservoirFunc: reservoirFunc[N](stream.Aggregation),
}
b.Filter = stream.AttributeFilter
+ // A value less than or equal to zero will disable the aggregation
+ // limits for the builder (and all the created aggregates).
+ // CardinalityLimit.Lookup returns 0 by default if unset (or
+ // unrecognized input). Use that value directly.
+ b.AggregationLimit, _ = x.CardinalityLimit.Lookup()
+
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
if err != nil {
return aggVal[N]{0, nil, err}
@@ -557,12 +565,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli
return pipes
}
-func (p pipelines) registerCallback(cback func(context.Context) error) {
- for _, pipe := range p {
- pipe.addCallback(cback)
- }
-}
-
func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
unregs := make([]func(), len(p))
for i, pipe := range p {
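
The cachedAggregator hunk above wires an experimental cardinality limit into aggregate.Builder through x.CardinalityLimit.Lookup(). The following is a minimal sketch of opting in from application code, not part of this diff; it assumes the limit is driven by the OTEL_GO_X_CARDINALITY_LIMIT environment variable, following the internal/x feature-flag convention.

package main

import (
	"context"
	"log"
	"os"

	"go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// Assumption: the experimental limit is read from this variable by the
	// SDK's internal/x package. A value <= 0 (or an unset/unparsable value)
	// keeps the aggregation limits disabled, matching the comment in the
	// diff above.
	os.Setenv("OTEL_GO_X_CARDINALITY_LIMIT", "1000")

	provider := metric.NewMeterProvider()
	defer func() {
		if err := provider.Shutdown(context.Background()); err != nil {
			log.Print(err)
		}
	}()

	meter := provider.Meter("cardinality-limit-example")
	counter, err := meter.Int64Counter("requests")
	if err != nil {
		log.Fatal(err)
	}
	counter.Add(context.Background(), 1)
}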