diff options
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go')
-rw-r--r-- | vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go | 62 |
1 files changed, 45 insertions, 17 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go index 1e52ff0d1..02de2483f 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go @@ -20,31 +20,55 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) +type sumValue[N int64 | float64] struct { + n N + res exemplar.Reservoir[N] +} + // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - values map[attribute.Set]N + newRes func() exemplar.Reservoir[N] + limit limiter[sumValue[N]] + values map[attribute.Set]sumValue[N] } -func newValueMap[N int64 | float64]() *valueMap[N] { - return &valueMap[N]{values: make(map[attribute.Set]N)} +func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] { + return &valueMap[N]{ + newRes: r, + limit: newLimiter[sumValue[N]](limit), + values: make(map[attribute.Set]sumValue[N]), + } } -func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) { +func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { + t := now() + s.Lock() - s.values[attr] += value - s.Unlock() + defer s.Unlock() + + attr := s.limit.Attributes(fltrAttr, s.values) + v, ok := s.values[attr] + if !ok { + v.res = s.newRes() + } + + v.n += value + v.res.Offer(ctx, t, value, droppedAttr) + + s.values[attr] = v } // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *sum[N] { return &sum[N]{ - valueMap: newValueMap[N](), + valueMap: newValueMap[N](limit, r), monotonic: monotonic, start: now(), } @@ -74,11 +98,12 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { + for attr, val := range s.values { dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = value + dPts[i].Value = val.n + val.res.Collect(&dPts[i].Exemplars) // Do not report stale values. delete(s.values, attr) i++ @@ -112,7 +137,8 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = value + dPts[i].Value = value.n + value.res.Collect(&dPts[i].Exemplars) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -129,9 +155,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observatrions as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ - valueMap: newValueMap[N](), + valueMap: newValueMap[N](limit, r), monotonic: monotonic, start: now(), } @@ -165,14 +191,15 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { var i int for attr, value := range s.values { - delta := value - s.reported[attr] + delta := value.n - s.reported[attr] dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = delta + value.res.Collect(&dPts[i].Exemplars) - newReported[attr] = value + newReported[attr] = value.n // Unused attribute sets do not report. delete(s.values, attr) i++ @@ -204,11 +231,12 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { + for attr, val := range s.values { dPts[i].Attributes = attr dPts[i].StartTime = s.start dPts[i].Time = t - dPts[i].Value = value + dPts[i].Value = val.n + val.res.Collect(&dPts[i].Exemplars) // Unused attribute sets do not report. delete(s.values, attr) |