Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go')
-rw-r--r-- | vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go | 84
1 file changed, 36 insertions(+), 48 deletions(-)
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
index 02de2483f..891366922 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
@@ -1,16 +1,5 @@
 // Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
 
 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
 
@@ -25,48 +14,48 @@ import (
 )
 
 type sumValue[N int64 | float64] struct {
-	n   N
-	res exemplar.Reservoir[N]
+	n     N
+	res   exemplar.FilteredReservoir[N]
+	attrs attribute.Set
 }
 
 // valueMap is the storage for sums.
 type valueMap[N int64 | float64] struct {
 	sync.Mutex
-	newRes func() exemplar.Reservoir[N]
+	newRes func() exemplar.FilteredReservoir[N]
 	limit  limiter[sumValue[N]]
-	values map[attribute.Set]sumValue[N]
+	values map[attribute.Distinct]sumValue[N]
 }
 
-func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] {
+func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] {
 	return &valueMap[N]{
 		newRes: r,
 		limit:  newLimiter[sumValue[N]](limit),
-		values: make(map[attribute.Set]sumValue[N]),
+		values: make(map[attribute.Distinct]sumValue[N]),
 	}
 }
 
 func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
-	t := now()
-
 	s.Lock()
 	defer s.Unlock()
 
 	attr := s.limit.Attributes(fltrAttr, s.values)
-	v, ok := s.values[attr]
+	v, ok := s.values[attr.Equivalent()]
 	if !ok {
 		v.res = s.newRes()
 	}
 
+	v.attrs = attr
 	v.n += value
-	v.res.Offer(ctx, t, value, droppedAttr)
+	v.res.Offer(ctx, value, droppedAttr)
 
-	s.values[attr] = v
+	s.values[attr.Equivalent()] = v
 }
 
 // newSum returns an aggregator that summarizes a set of measurements as their
 // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
 // the measurements were made in.
-func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *sum[N] {
+func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] {
 	return &sum[N]{
 		valueMap:  newValueMap[N](limit, r),
 		monotonic: monotonic,
@@ -98,16 +87,16 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
 	dPts := reset(sData.DataPoints, n, n)
 
 	var i int
-	for attr, val := range s.values {
-		dPts[i].Attributes = attr
+	for _, val := range s.values {
+		dPts[i].Attributes = val.attrs
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
 		dPts[i].Value = val.n
-		val.res.Collect(&dPts[i].Exemplars)
-		// Do not report stale values.
-		delete(s.values, attr)
+		collectExemplars(&dPts[i].Exemplars, val.res.Collect)
 		i++
 	}
+	// Do not report stale values.
+	clear(s.values)
 	// The delta collection cycle resets.
 	s.start = t
 
@@ -133,12 +122,12 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
 	dPts := reset(sData.DataPoints, n, n)
 
 	var i int
-	for attr, value := range s.values {
-		dPts[i].Attributes = attr
+	for _, value := range s.values {
+		dPts[i].Attributes = value.attrs
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
 		dPts[i].Value = value.n
-		value.res.Collect(&dPts[i].Exemplars)
+		collectExemplars(&dPts[i].Exemplars, value.res.Collect)
 		// TODO (#3006): This will use an unbounded amount of memory if there
 		// are unbounded number of attribute sets being aggregated. Attribute
 		// sets that become "stale" need to be forgotten so this will not
@@ -155,7 +144,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
 // newPrecomputedSum returns an aggregator that summarizes a set of
 // observatrions as their arithmetic sum. Each sum is scoped by attributes and
 // the aggregation cycle the measurements were made in.
-func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *precomputedSum[N] {
+func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] {
 	return &precomputedSum[N]{
 		valueMap:  newValueMap[N](limit, r),
 		monotonic: monotonic,
@@ -170,12 +159,12 @@ type precomputedSum[N int64 | float64] struct {
 	monotonic bool
 	start     time.Time
 
-	reported map[attribute.Set]N
+	reported map[attribute.Distinct]N
 }
 
 func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
 	t := now()
-	newReported := make(map[attribute.Set]N)
+	newReported := make(map[attribute.Distinct]N)
 
 	// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
 	// use the zero-value sData and hope for better alignment next cycle.
@@ -190,21 +179,20 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
 	dPts := reset(sData.DataPoints, n, n)
 
 	var i int
-	for attr, value := range s.values {
-		delta := value.n - s.reported[attr]
+	for key, value := range s.values {
+		delta := value.n - s.reported[key]
 
-		dPts[i].Attributes = attr
+		dPts[i].Attributes = value.attrs
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
 		dPts[i].Value = delta
-		value.res.Collect(&dPts[i].Exemplars)
+		collectExemplars(&dPts[i].Exemplars, value.res.Collect)
 
-		newReported[attr] = value.n
-		// Unused attribute sets do not report.
-		delete(s.values, attr)
+		newReported[key] = value.n
 		i++
 	}
-	// Unused attribute sets are forgotten.
+	// Unused attribute sets do not report.
+	clear(s.values)
 	s.reported = newReported
 	// The delta collection cycle resets.
 	s.start = t
@@ -231,17 +219,17 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
 	dPts := reset(sData.DataPoints, n, n)
 
 	var i int
-	for attr, val := range s.values {
-		dPts[i].Attributes = attr
+	for _, val := range s.values {
+		dPts[i].Attributes = val.attrs
 		dPts[i].StartTime = s.start
 		dPts[i].Time = t
 		dPts[i].Value = val.n
-		val.res.Collect(&dPts[i].Exemplars)
+		collectExemplars(&dPts[i].Exemplars, val.res.Collect)
 
-		// Unused attribute sets do not report.
-		delete(s.values, attr)
 		i++
 	}
+	// Unused attribute sets do not report.
+	clear(s.values)
 
 	sData.DataPoints = dPts
 	*dest = sData
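Editor's note on the change above: the aggregate state maps move from map[attribute.Set]sumValue[N] to map[attribute.Distinct]sumValue[N], with the full attribute.Set kept in the new attrs field so collection can still attach it to exported data points (attribute.Distinct is a compact, comparable identity for a set and cannot reconstruct the attributes by itself). Exemplar storage moves from exemplar.Reservoir[N] to exemplar.FilteredReservoir[N], whose Offer takes no timestamp, which is why the t := now() call disappears from measure; and the per-key delete calls inside the collection loops are replaced by a single clear(s.values) after each loop, using the builtin added in Go 1.21. What follows is a minimal, self-contained sketch of the map-key pattern, assuming only the public go.opentelemetry.io/otel/attribute package; the entry type and record helper are illustrative stand-ins for the SDK's internal sumValue and measure, not its actual code.

// sketch.go - illustrative only; mirrors the map-key pattern in the diff above.
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// entry is a stand-in for sumValue: the comparable attribute.Distinct keys the
// map, while the full attribute.Set is retained for reporting, since the key
// alone cannot rebuild the attributes.
type entry struct {
	n     float64
	attrs attribute.Set
}

func main() {
	values := make(map[attribute.Distinct]entry)

	record := func(attrs attribute.Set, v float64) {
		key := attrs.Equivalent() // stable, comparable identity of the set
		e := values[key]
		e.attrs = attrs
		e.n += v
		values[key] = e
	}

	// Equivalent sets map to the same Distinct, so they share one accumulator.
	record(attribute.NewSet(attribute.String("host", "a")), 1)
	record(attribute.NewSet(attribute.String("host", "a")), 2)
	record(attribute.NewSet(attribute.String("host", "b")), 5)

	for _, e := range values {
		fmt.Println(e.attrs.Encoded(attribute.DefaultEncoder()), "=>", e.n)
	}

	// At the end of a delta collection cycle the diff resets state with the
	// Go 1.21 builtin clear rather than deleting keys one at a time.
	clear(values)
}

Running the sketch prints one accumulated line per distinct attribute set ("host=a" totals 3, "host=b" totals 5), confirming that equivalent sets collapse into a single map entry even though each record call constructs a fresh attribute.Set.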