Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go')
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go  45
1 file changed, 36 insertions(+), 9 deletions(-)
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
index 8dec14237..4060a2f76 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
@@ -19,6 +19,7 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -44,17 +45,43 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
+ // ReservoirFunc is the factory function used by aggregate functions to
+ // create a new exemplar reservoir for each newly seen attribute set.
+ //
+ // If this is not provided, a default factory function that returns an
+ // exemplar.Drop reservoir is used.
+ ReservoirFunc func() exemplar.Reservoir[N]
+ // AggregationLimit is the cardinality limit of measurement attributes. Once
+ // the limit has been reached, any measurement made with a new attribute
+ // set will be aggregated into a single aggregate for the
+ // "otel.metric.overflow" attribute.
+ //
+ // If AggregationLimit is less than or equal to zero, no aggregation limit
+ // is imposed (i.e. attribute sets are unlimited).
+ AggregationLimit int
+}
+
+func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] {
+ if b.ReservoirFunc != nil {
+ return b.ReservoirFunc
+ }
+
+ return exemplar.Drop[N]
}
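
The resFunc accessor above falls back to exemplar.Drop when no ReservoirFunc is supplied. A minimal sketch of that factory-with-default pattern, using a hypothetical reservoir interface (the real exemplar.Reservoir lives in the SDK-internal exemplar package and has a richer API):

package main

import "fmt"

// reservoir is a hypothetical stand-in for the SDK-internal
// exemplar.Reservoir interface; the real interface is richer.
type reservoir[N int64 | float64] interface {
	Offer(value N)
}

// dropReservoir discards every offered measurement, mirroring the
// role exemplar.Drop plays as the default.
type dropReservoir[N int64 | float64] struct{}

func (dropReservoir[N]) Offer(N) {}

func newDrop[N int64 | float64]() reservoir[N] { return dropReservoir[N]{} }

// resolve returns the supplied factory when non-nil, otherwise the
// drop fallback -- the same nil-check pattern as Builder.resFunc.
func resolve[N int64 | float64](f func() reservoir[N]) func() reservoir[N] {
	if f != nil {
		return f
	}
	return newDrop[N]
}

func main() {
	r := resolve[float64](nil)() // no factory supplied: drop reservoir
	r.Offer(3.14)                // silently discarded
	fmt.Printf("%T\n", r)        // main.dropReservoir[float64]
}
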
-func (b Builder[N]) filter(f Measure[N]) Measure[N] {
+type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
+
+func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(ctx context.Context, n N, a attribute.Set) {
- fAttr, _ := a.Filter(fltr)
- f(ctx, n, fAttr)
+ fAttr, dropped := a.Filter(fltr)
+ f(ctx, n, fAttr, dropped)
}
}
- return f
+ return func(ctx context.Context, n N, a attribute.Set) {
+ f(ctx, n, a, nil)
+ }
}
// LastValue returns a last-value aggregate function input and output.
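
The filter change above is the notable part of this hunk: attributes removed by the configured filter are no longer discarded but forwarded as the droppedAttr argument, so exemplar reservoirs can still record the complete attribute set. The split itself comes from the public attribute.Set.Filter API, shown in this small runnable example:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	set := attribute.NewSet(
		attribute.String("host", "node-1"),
		attribute.String("user", "alice"),
	)

	// Keep only "host"; "user" comes back in the dropped slice, which
	// Builder.filter now forwards instead of throwing away.
	keep := attribute.Filter(func(kv attribute.KeyValue) bool {
		return kv.Key == "host"
	})
	fAttr, dropped := set.Filter(keep)

	fmt.Println(fAttr.Encoded(attribute.DefaultEncoder())) // host=node-1
	for _, kv := range dropped {
		fmt.Println(kv.Key, "=", kv.Value.AsString()) // user = alice
	}
}
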
@@ -63,7 +90,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
- lv := newLastValue[N]()
+ lv := newLastValue[N](b.AggregationLimit, b.resFunc())
return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
@@ -79,7 +106,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
- s := newPrecomputedSum[N](monotonic)
+ s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
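
Every constructor in this file now threads b.AggregationLimit into the aggregate state. A hedged sketch of the overflow behavior described by the field's documentation, with a hypothetical limiter type (the SDK's internal implementation differs in detail):

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// overflowSet is the attribute set named in the AggregationLimit docs:
// once the limit is hit, new series collapse into this single set.
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))

// limiter is a hypothetical sketch of cardinality limiting: attrs
// returns the measurement's own set while capacity remains, and the
// overflow set once the limit of distinct sets has been reached.
type limiter struct {
	limit int
	seen  map[attribute.Distinct]struct{}
}

func (l *limiter) attrs(a attribute.Set) attribute.Set {
	if l.limit <= 0 {
		return a // non-positive limit: unlimited attribute sets
	}
	if _, ok := l.seen[a.Equivalent()]; ok {
		return a // existing series are unaffected by the limit
	}
	// Reserve one slot for the overflow set itself.
	if len(l.seen) < l.limit-1 {
		l.seen[a.Equivalent()] = struct{}{}
		return a
	}
	return overflowSet
}

func main() {
	l := &limiter{limit: 2, seen: map[attribute.Distinct]struct{}{}}
	a := attribute.NewSet(attribute.String("k", "a"))
	b := attribute.NewSet(attribute.String("k", "b"))
	fmt.Println(l.attrs(a).Encoded(attribute.DefaultEncoder())) // k=a
	fmt.Println(l.attrs(b).Encoded(attribute.DefaultEncoder())) // otel.metric.overflow=true
}
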
@@ -90,7 +117,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati
// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
- s := newSum[N](monotonic)
+ s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(s.measure), s.delta
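
Sum and PrecomputedSum both branch on b.Temporality to pick the output function; the difference is what each collection cycle reports. A plain illustrative sketch, not SDK code:

package main

import "fmt"

func main() {
	// Running totals of a counter observed at two successive collections.
	collections := []int64{10, 25}

	var prev int64
	for i, total := range collections {
		// Cumulative temporality reports the total since start; delta
		// temporality reports only the change since the last collection.
		fmt.Printf("collect %d: cumulative=%d delta=%d\n", i+1, total, total-prev)
		prev = total
	}
	// collect 1: cumulative=10 delta=10
	// collect 2: cumulative=25 delta=15
}
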
@@ -102,7 +129,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
- h := newHistogram[N](boundaries, noMinMax, noSum)
+ h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
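
For reference, the boundaries argument defines bucket upper bounds. A sketch of the upper-inclusive bucketing an explicit-bucket histogram performs, consistent with the OpenTelemetry data model (the SDK's own lookup may differ in detail):

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Boundaries b0 < b1 < ... define the buckets
	// (-inf, b0], (b0, b1], ..., (bn-1, +inf).
	boundaries := []float64{0, 5, 10}
	counts := make([]uint64, len(boundaries)+1)

	for _, v := range []float64{-1, 3, 5, 7, 42} {
		// Smallest index i with v <= boundaries[i]; len(boundaries)
		// when v exceeds every boundary (the overflow bucket).
		i := sort.SearchFloat64s(boundaries, v)
		counts[i]++
	}
	fmt.Println(counts) // [1 2 1 1]
}
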
@@ -114,7 +141,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
// ExponentialBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
- h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
+ h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
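
ExponentialBucketHistogram's maxScale controls bucket resolution: the base is 2^(2^-scale), and a positive value v lands in the bucket covering (base^i, base^(i+1)]. A sketch of that mapping taken from the OpenTelemetry data-model specification, not the SDK's optimized implementation (bucketIndex is a name introduced here for illustration):

package main

import (
	"fmt"
	"math"
)

// bucketIndex maps a positive value to its bucket at the given scale,
// per the reference mapping in the OpenTelemetry data model:
// base = 2^(2^-scale); index = ceil(log(v)/log(base)) - 1, so bucket i
// covers (base^i, base^(i+1)]. Exact powers of the base need care with
// floating-point rounding; the sample values below avoid them.
func bucketIndex(v float64, scale int32) int64 {
	base := math.Exp2(math.Exp2(-float64(scale)))
	return int64(math.Ceil(math.Log(v)/math.Log(base))) - 1
}

func main() {
	// At scale 0 the base is 2: buckets (1,2], (2,4], (4,8], (8,16], ...
	for _, v := range []float64{1.5, 3, 5, 20} {
		fmt.Printf("v=%v -> index %d\n", v, bucketIndex(v, 0))
	}
	// v=1.5 -> index 0, v=3 -> index 1, v=5 -> index 2, v=20 -> index 4
}
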