summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go54
1 files changed, 33 insertions, 21 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
index 7bdb699ca..408fddc8d 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
@@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
- "go.opentelemetry.io/otel/sdk/metric/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
@@ -37,17 +36,24 @@ type instrumentSync struct {
compAgg aggregate.ComputeAggregation
}
-func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
+func newPipeline(
+ res *resource.Resource,
+ reader Reader,
+ views []View,
+ exemplarFilter exemplar.Filter,
+ cardinalityLimit int,
+) *pipeline {
if res == nil {
res = resource.Empty()
}
return &pipeline{
- resource: res,
- reader: reader,
- views: views,
- int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
- float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
- exemplarFilter: exemplarFilter,
+ resource: res,
+ reader: reader,
+ views: views,
+ int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
+ float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
+ exemplarFilter: exemplarFilter,
+ cardinalityLimit: cardinalityLimit,
// aggregations is lazy allocated when needed.
}
}
@@ -65,12 +71,13 @@ type pipeline struct {
views []View
sync.Mutex
- int64Measures map[observableID[int64]][]aggregate.Measure[int64]
- float64Measures map[observableID[float64]][]aggregate.Measure[float64]
- aggregations map[instrumentation.Scope][]instrumentSync
- callbacks []func(context.Context) error
- multiCallbacks list.List
- exemplarFilter exemplar.Filter
+ int64Measures map[observableID[int64]][]aggregate.Measure[int64]
+ float64Measures map[observableID[float64]][]aggregate.Measure[float64]
+ aggregations map[instrumentation.Scope][]instrumentSync
+ callbacks []func(context.Context) error
+ multiCallbacks list.List
+ exemplarFilter exemplar.Filter
+ cardinalityLimit int
}
// addInt64Measure adds a new int64 measure to the pipeline for each observer.
@@ -388,10 +395,9 @@ func (i *inserter[N]) cachedAggregator(
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
// limits for the builder (an all the created aggregates).
- // CardinalityLimit.Lookup returns 0 by default if unset (or
+ // cardinalityLimit will be 0 by default if unset (or
// unrecognized input). Use that value directly.
- b.AggregationLimit, _ = x.CardinalityLimit.Lookup()
-
+ b.AggregationLimit = i.pipeline.cardinalityLimit
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
if err != nil {
return aggVal[N]{0, nil, err}
@@ -426,7 +432,7 @@ func (i *inserter[N]) logConflict(id instID) {
}
const msg = "duplicate metric stream definitions"
- args := []interface{}{
+ args := []any{
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
@@ -460,7 +466,7 @@ func (i *inserter[N]) logConflict(id instID) {
global.Warn(msg, args...)
}
-func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
+func (*inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
var zero N
return instID{
Name: stream.Name,
@@ -590,10 +596,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
// measurement.
type pipelines []*pipeline
-func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
+func newPipelines(
+ res *resource.Resource,
+ readers []Reader,
+ views []View,
+ exemplarFilter exemplar.Filter,
+ cardinalityLimit int,
+) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for _, r := range readers {
- p := newPipeline(res, r, views, exemplarFilter)
+ p := newPipeline(res, r, views, exemplarFilter, cardinalityLimit)
r.register(p)
pipes = append(pipes, p)
}