author    Terin Stock <terinjokes@gmail.com>    2025-03-09 17:47:56 +0100
committer Terin Stock <terinjokes@gmail.com>    2025-03-10 01:59:49 +0100
commit   3ac1ee16f377d31a0fb80c8dae28b6239ac4229e (patch)
tree     f61faa581feaaeaba2542b9f2b8234a590684413 /vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate
parent   [chore] update URLs to forked source (diff)
download gotosocial-3ac1ee16f377d31a0fb80c8dae28b6239ac4229e.tar.xz
[chore] remove vendor
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate')
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go             | 153
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go                   |   7
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go                  |  27
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go              |  43
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go | 443
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go    |  50
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go             | 232
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go             | 161
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go                 |  42
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go                   | 237
10 files changed, 0 insertions, 1395 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
deleted file mode 100644
index fde219333..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "context"
- "time"
-
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/metricdata"
-)
-
-// now is used to return the current local time while allowing tests to
-// override the default time.Now function.
-var now = time.Now
-
-// Measure receives measurements to be aggregated.
-type Measure[N int64 | float64] func(context.Context, N, attribute.Set)
-
-// ComputeAggregation stores the aggregate of measurements into dest and
-// returns the number of aggregate data-points output.
-type ComputeAggregation func(dest *metricdata.Aggregation) int
-
-// Builder builds an aggregate function.
-type Builder[N int64 | float64] struct {
- // Temporality is the temporality used for the returned aggregate function.
- //
- // If this is not provided, a default of cumulative will be used (except for
- // the last-value aggregate function where delta is the only appropriate
- // temporality).
- Temporality metricdata.Temporality
- // Filter is the attribute filter the aggregate function will use on the
- // input of measurements.
- Filter attribute.Filter
- // ReservoirFunc is the factory function used by aggregate functions to
- // create new exemplar reservoirs for each newly seen attribute set.
- //
- // If this is not provided, a default factory function that returns a
- // dropReservoir will be used.
- ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
- // AggregationLimit is the cardinality limit of measurement attributes. Once
- // the limit has been reached, any measurement made with a new attribute set
- // will be aggregated into a single aggregate for the "otel.metric.overflow"
- // attribute.
- //
- // If AggregationLimit is less than or equal to zero, no aggregation limit
- // is imposed (i.e. unlimited attribute sets).
- AggregationLimit int
-}
-
-func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
- if b.ReservoirFunc != nil {
- return b.ReservoirFunc
- }
-
- return dropReservoir
-}
-
-type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
-
-func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
- if b.Filter != nil {
- fltr := b.Filter // Copy to make it immutable after assignment.
- return func(ctx context.Context, n N, a attribute.Set) {
- fAttr, dropped := a.Filter(fltr)
- f(ctx, n, fAttr, dropped)
- }
- }
- return func(ctx context.Context, n N, a attribute.Set) {
- f(ctx, n, a, nil)
- }
-}
-
-// LastValue returns a last-value aggregate function input and output.
-func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
- lv := newLastValue[N](b.AggregationLimit, b.resFunc())
- switch b.Temporality {
- case metricdata.DeltaTemporality:
- return b.filter(lv.measure), lv.delta
- default:
- return b.filter(lv.measure), lv.cumulative
- }
-}
-
-// PrecomputedLastValue returns a last-value aggregate function input and
-// output. The aggregation produced by the returned ComputeAggregation
-// function will only contain values from the previous collection cycle.
-func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
- lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
- switch b.Temporality {
- case metricdata.DeltaTemporality:
- return b.filter(lv.measure), lv.delta
- default:
- return b.filter(lv.measure), lv.cumulative
- }
-}
-
-// PrecomputedSum returns a sum aggregate function input and output. The
-// arguments passed to the input are expected to be the precomputed sum values.
-func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
- s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
- switch b.Temporality {
- case metricdata.DeltaTemporality:
- return b.filter(s.measure), s.delta
- default:
- return b.filter(s.measure), s.cumulative
- }
-}
-
-// Sum returns a sum aggregate function input and output.
-func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
- s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
- switch b.Temporality {
- case metricdata.DeltaTemporality:
- return b.filter(s.measure), s.delta
- default:
- return b.filter(s.measure), s.cumulative
- }
-}
-
-// ExplicitBucketHistogram returns a histogram aggregate function input and
-// output.
-func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
- h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
- switch b.Temporality {
- case metricdata.DeltaTemporality:
- return b.filter(h.measure), h.delta
- default:
- return b.filter(h.measure), h.cumulative
- }
-}
-
-// ExponentialBucketHistogram returns a histogram aggregate function input and
-// output.
-func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
- h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
- switch b.Temporality {
- case metricdata.DeltaTemporality:
- return b.filter(h.measure), h.delta
- default:
- return b.filter(h.measure), h.cumulative
- }
-}
-
-// reset ensures s has capacity and sets its length. If the capacity of s is
-// too small, a new slice is returned with the specified capacity and length.
-func reset[T any](s []T, length, capacity int) []T {
- if cap(s) < capacity {
- return make([]T, length, capacity)
- }
- return s[:length]
-}
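
For reference, the Builder in the deleted aggregate.go wraps every Measure function with an attribute filter before measurements reach the aggregate. Below is a minimal sketch of that filter-wrapping pattern using only the public attribute package; the measure closure and the attribute names are illustrative assumptions, not part of the removed code.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	// Keep only the "host" attribute; anything else is dropped and would be
	// forwarded to the exemplar reservoir as droppedAttr.
	fltr := attribute.Filter(func(kv attribute.KeyValue) bool {
		return kv.Key == "host"
	})

	// measure stands in for the fltrMeasure an aggregate registers.
	measure := func(ctx context.Context, v int64, a attribute.Set) {
		fAttr, dropped := a.Filter(fltr)
		fmt.Println("aggregated under:", fAttr.Encoded(attribute.DefaultEncoder()))
		fmt.Println("dropped:", dropped)
		_, _ = ctx, v
	}

	set := attribute.NewSet(attribute.String("host", "a"), attribute.String("user", "u1"))
	measure(context.Background(), 42, set)
}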
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
deleted file mode 100644
index 7b7225e6e..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-// Package aggregate provides aggregate types used to compute aggregations and
-// cycle the state of metric measurements made by the SDK. These types and
-// functionality are meant only for internal SDK use.
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go
deleted file mode 100644
index 8396faaa4..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "context"
-
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/exemplar"
-)
-
-// dropReservoir returns a [FilteredExemplarReservoir] that drops all measurements it is offered.
-func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
- return &dropRes[N]{}
-}
-
-type dropRes[N int64 | float64] struct{}
-
-// Offer does nothing; all measurements offered will be dropped.
-func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
-
-// Collect resets dest. No exemplars will ever be returned.
-func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
- clear(*dest) // Erase elements to let GC collect objects
- *dest = (*dest)[:0]
-}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go
deleted file mode 100644
index 25d709948..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "sync"
-
- "go.opentelemetry.io/otel/sdk/metric/exemplar"
- "go.opentelemetry.io/otel/sdk/metric/metricdata"
-)
-
-var exemplarPool = sync.Pool{
- New: func() any { return new([]exemplar.Exemplar) },
-}
-
-func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
- dest := exemplarPool.Get().(*[]exemplar.Exemplar)
- defer func() {
- clear(*dest) // Erase elements to let GC collect objects.
- *dest = (*dest)[:0]
- exemplarPool.Put(dest)
- }()
-
- *dest = reset(*dest, len(*out), cap(*out))
-
- f(dest)
-
- *out = reset(*out, len(*dest), cap(*dest))
- for i, e := range *dest {
- (*out)[i].FilteredAttributes = e.FilteredAttributes
- (*out)[i].Time = e.Time
- (*out)[i].SpanID = e.SpanID
- (*out)[i].TraceID = e.TraceID
-
- switch e.Value.Type() {
- case exemplar.Int64ValueType:
- (*out)[i].Value = N(e.Value.Int64())
- case exemplar.Float64ValueType:
- (*out)[i].Value = N(e.Value.Float64())
- }
- }
-}
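
The deleted collectExemplars is an instance of a pooled scratch-buffer pattern: borrow a slice from a sync.Pool, let a callback fill it, copy the results into the caller's destination, then clear and return the buffer. A minimal self-contained sketch of the same pattern, with plain ints standing in for exemplars:

package main

import (
	"fmt"
	"sync"
)

var pool = sync.Pool{New: func() any { return new([]int) }}

// collect mirrors the shape of collectExemplars: fill a pooled scratch
// slice, copy it out, and always return a cleared buffer to the pool.
func collect(out *[]int, fill func(*[]int)) {
	dest := pool.Get().(*[]int)
	defer func() {
		clear(*dest)        // erase elements so the GC can reclaim them
		*dest = (*dest)[:0] // keep the capacity for the next borrower
		pool.Put(dest)
	}()

	fill(dest)

	*out = append((*out)[:0], *dest...) // copy into the caller's slice
}

func main() {
	var got []int
	collect(&got, func(d *[]int) { *d = append(*d, 1, 2, 3) })
	fmt.Println(got) // [1 2 3]
}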
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
deleted file mode 100644
index 336ea91d1..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
+++ /dev/null
@@ -1,443 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "context"
- "errors"
- "math"
- "sync"
- "time"
-
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/metricdata"
-)
-
-const (
- expoMaxScale = 20
- expoMinScale = -10
-
- smallestNonZeroNormalFloat64 = 0x1p-1022
-
- // These redefine the math constants with an explicit type, so the compiler
- // won't coerce them into an int on 32-bit platforms.
- maxInt64 int64 = math.MaxInt64
- minInt64 int64 = math.MinInt64
-)
-
-// expoHistogramDataPoint is a single data point in an exponential histogram.
-type expoHistogramDataPoint[N int64 | float64] struct {
- attrs attribute.Set
- res FilteredExemplarReservoir[N]
-
- count uint64
- min N
- max N
- sum N
-
- maxSize int
- noMinMax bool
- noSum bool
-
- scale int32
-
- posBuckets expoBuckets
- negBuckets expoBuckets
- zeroCount uint64
-}
-
-func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize int, maxScale int32, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
- f := math.MaxFloat64
- ma := N(f) // if N is int64, max will overflow to -9223372036854775808
- mi := N(-f)
- if N(maxInt64) > N(f) {
- ma = N(maxInt64)
- mi = N(minInt64)
- }
- return &expoHistogramDataPoint[N]{
- attrs: attrs,
- min: ma,
- max: mi,
- maxSize: maxSize,
- noMinMax: noMinMax,
- noSum: noSum,
- scale: maxScale,
- }
-}
-
-// record adds a new measurement to the histogram. It will rescale the buckets if needed.
-func (p *expoHistogramDataPoint[N]) record(v N) {
- p.count++
-
- if !p.noMinMax {
- if v < p.min {
- p.min = v
- }
- if v > p.max {
- p.max = v
- }
- }
- if !p.noSum {
- p.sum += v
- }
-
- absV := math.Abs(float64(v))
-
- if absV == 0.0 {
- p.zeroCount++
- return
- }
-
- bin := p.getBin(absV)
-
- bucket := &p.posBuckets
- if v < 0 {
- bucket = &p.negBuckets
- }
-
- // If the new bin would make the counts larger than maxSize, we need to
- // downscale current measurements.
- if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
- if p.scale-scaleDelta < expoMinScale {
- // With a scale of -10 there are only two buckets for the whole range of float64 values.
- // This can only happen if there is a max size of 1.
- otel.Handle(errors.New("exponential histogram scale underflow"))
- return
- }
- // Downscale
- p.scale -= scaleDelta
- p.posBuckets.downscale(scaleDelta)
- p.negBuckets.downscale(scaleDelta)
-
- bin = p.getBin(absV)
- }
-
- bucket.record(bin)
-}
-
-// getBin returns the bin v should be recorded into.
-func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 {
- frac, expInt := math.Frexp(v)
- // 11-bit exponential.
- exp := int32(expInt) // nolint: gosec
- if p.scale <= 0 {
- // Because of the choice of fraction, the exponent is always 1 power of two higher than we want.
- var correction int32 = 1
- if frac == .5 {
- // If v is an exact power of two the frac will be .5 and the exp
- // will be one higher than we want.
- correction = 2
- }
- return (exp - correction) >> (-p.scale)
- }
- return exp<<p.scale + int32(math.Log(frac)*scaleFactors[p.scale]) - 1
-}
-
-// scaleFactors are constants used in calculating the logarithm index. They are
-// equivalent to 2^index/log(2).
-var scaleFactors = [21]float64{
- math.Ldexp(math.Log2E, 0),
- math.Ldexp(math.Log2E, 1),
- math.Ldexp(math.Log2E, 2),
- math.Ldexp(math.Log2E, 3),
- math.Ldexp(math.Log2E, 4),
- math.Ldexp(math.Log2E, 5),
- math.Ldexp(math.Log2E, 6),
- math.Ldexp(math.Log2E, 7),
- math.Ldexp(math.Log2E, 8),
- math.Ldexp(math.Log2E, 9),
- math.Ldexp(math.Log2E, 10),
- math.Ldexp(math.Log2E, 11),
- math.Ldexp(math.Log2E, 12),
- math.Ldexp(math.Log2E, 13),
- math.Ldexp(math.Log2E, 14),
- math.Ldexp(math.Log2E, 15),
- math.Ldexp(math.Log2E, 16),
- math.Ldexp(math.Log2E, 17),
- math.Ldexp(math.Log2E, 18),
- math.Ldexp(math.Log2E, 19),
- math.Ldexp(math.Log2E, 20),
-}
-
-// scaleChange returns the magnitude of the scale change needed to fit bin in
-// the bucket. If no scale change is needed, 0 is returned.
-func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 {
- if length == 0 {
- // No need to rescale if there are no buckets.
- return 0
- }
-
- low := int(startBin)
- high := int(bin)
- if startBin >= bin {
- low = int(bin)
- high = int(startBin) + length - 1
- }
-
- var count int32
- for high-low >= p.maxSize {
- low = low >> 1
- high = high >> 1
- count++
- if count > expoMaxScale-expoMinScale {
- return count
- }
- }
- return count
-}
-
-// expoBuckets is a set of buckets in an exponential histogram.
-type expoBuckets struct {
- startBin int32
- counts []uint64
-}
-
-// record increments the count for the given bin, and expands the buckets if needed.
-// Size changes must be done before calling this function.
-func (b *expoBuckets) record(bin int32) {
- if len(b.counts) == 0 {
- b.counts = []uint64{1}
- b.startBin = bin
- return
- }
-
- endBin := int(b.startBin) + len(b.counts) - 1
-
- // if the new bin is inside the current range
- if bin >= b.startBin && int(bin) <= endBin {
- b.counts[bin-b.startBin]++
- return
- }
- // if the new bin is before the current start, add spaces to the counts
- if bin < b.startBin {
- origLen := len(b.counts)
- newLength := endBin - int(bin) + 1
- shift := b.startBin - bin
-
- if newLength > cap(b.counts) {
- b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
- }
-
- copy(b.counts[shift:origLen+int(shift)], b.counts[:])
- b.counts = b.counts[:newLength]
- for i := 1; i < int(shift); i++ {
- b.counts[i] = 0
- }
- b.startBin = bin
- b.counts[0] = 1
- return
- }
- // if the new bin is after the end, add spaces to the end
- if int(bin) > endBin {
- if int(bin-b.startBin) < cap(b.counts) {
- b.counts = b.counts[:bin-b.startBin+1]
- for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
- b.counts[i] = 0
- }
- b.counts[bin-b.startBin] = 1
- return
- }
-
- end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
- b.counts = append(b.counts, end...)
- b.counts[bin-b.startBin] = 1
- }
-}
-
-// downscale shrinks the buckets by a factor of 2^delta. It sums counts into
-// the correct lower-resolution buckets.
-func (b *expoBuckets) downscale(delta int32) {
- // Example
- // delta = 2
- // Original offset: -6
- // Counts: [ 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- // bins: -6 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4
- // new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0, 1
- // new Offset: -2
- // new Counts: [4, 14, 30, 10]
-
- if len(b.counts) <= 1 || delta < 1 {
- b.startBin = b.startBin >> delta
- return
- }
-
- steps := int32(1) << delta
- offset := b.startBin % steps
- offset = (offset + steps) % steps // to make offset positive
- for i := 1; i < len(b.counts); i++ {
- idx := i + int(offset)
- if idx%int(steps) == 0 {
- b.counts[idx/int(steps)] = b.counts[i]
- continue
- }
- b.counts[idx/int(steps)] += b.counts[i]
- }
-
- lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
- b.counts = b.counts[:lastIdx+1]
- b.startBin = b.startBin >> delta
-}
-
-// newExponentialHistogram returns an Aggregator that summarizes a set of
-// measurements as an exponential histogram. Each histogram is scoped by attributes
-// and the aggregation cycle the measurements were made in.
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
- return &expoHistogram[N]{
- noSum: noSum,
- noMinMax: noMinMax,
- maxSize: int(maxSize),
- maxScale: maxScale,
-
- newRes: r,
- limit: newLimiter[*expoHistogramDataPoint[N]](limit),
- values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
-
- start: now(),
- }
-}
-
-// expoHistogram summarizes a set of measurements as a histogram with exponentially
-// defined buckets.
-type expoHistogram[N int64 | float64] struct {
- noSum bool
- noMinMax bool
- maxSize int
- maxScale int32
-
- newRes func(attribute.Set) FilteredExemplarReservoir[N]
- limit limiter[*expoHistogramDataPoint[N]]
- values map[attribute.Distinct]*expoHistogramDataPoint[N]
- valuesMu sync.Mutex
-
- start time.Time
-}
-
-func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
- // Ignore NaN and infinity.
- if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
- return
- }
-
- e.valuesMu.Lock()
- defer e.valuesMu.Unlock()
-
- attr := e.limit.Attributes(fltrAttr, e.values)
- v, ok := e.values[attr.Equivalent()]
- if !ok {
- v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
- v.res = e.newRes(attr)
-
- e.values[attr.Equivalent()] = v
- }
- v.record(value)
- v.res.Offer(ctx, value, droppedAttr)
-}
-
-func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
- t := now()
-
- // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
- // In that case, use the zero-value h and hope for better alignment next cycle.
- h, _ := (*dest).(metricdata.ExponentialHistogram[N])
- h.Temporality = metricdata.DeltaTemporality
-
- e.valuesMu.Lock()
- defer e.valuesMu.Unlock()
-
- n := len(e.values)
- hDPts := reset(h.DataPoints, n, n)
-
- var i int
- for _, val := range e.values {
- hDPts[i].Attributes = val.attrs
- hDPts[i].StartTime = e.start
- hDPts[i].Time = t
- hDPts[i].Count = val.count
- hDPts[i].Scale = val.scale
- hDPts[i].ZeroCount = val.zeroCount
- hDPts[i].ZeroThreshold = 0.0
-
- hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
- hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
- copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
-
- hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
- hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
- copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
-
- if !e.noSum {
- hDPts[i].Sum = val.sum
- }
- if !e.noMinMax {
- hDPts[i].Min = metricdata.NewExtrema(val.min)
- hDPts[i].Max = metricdata.NewExtrema(val.max)
- }
-
- collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
-
- i++
- }
- // Unused attribute sets do not report.
- clear(e.values)
-
- e.start = t
- h.DataPoints = hDPts
- *dest = h
- return n
-}
-
-func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
- t := now()
-
- // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
- // In that case, use the zero-value h and hope for better alignment next cycle.
- h, _ := (*dest).(metricdata.ExponentialHistogram[N])
- h.Temporality = metricdata.CumulativeTemporality
-
- e.valuesMu.Lock()
- defer e.valuesMu.Unlock()
-
- n := len(e.values)
- hDPts := reset(h.DataPoints, n, n)
-
- var i int
- for _, val := range e.values {
- hDPts[i].Attributes = val.attrs
- hDPts[i].StartTime = e.start
- hDPts[i].Time = t
- hDPts[i].Count = val.count
- hDPts[i].Scale = val.scale
- hDPts[i].ZeroCount = val.zeroCount
- hDPts[i].ZeroThreshold = 0.0
-
- hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
- hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
- copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
-
- hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
- hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
- copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
-
- if !e.noSum {
- hDPts[i].Sum = val.sum
- }
- if !e.noMinMax {
- hDPts[i].Min = metricdata.NewExtrema(val.min)
- hDPts[i].Max = metricdata.NewExtrema(val.max)
- }
-
- collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
-
- i++
- // TODO (#3006): This will use an unbounded amount of memory if there
- // are unbounded number of attribute sets being aggregated. Attribute
- // sets that become "stale" need to be forgotten so this will not
- // overload the system.
- }
-
- h.DataPoints = hDPts
- *dest = h
- return n
-}
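
The bucket-index math in the deleted getBin is easiest to see at scale 0, where bucket i covers (2^i, 2^(i+1)] and exact powers of two need a correction because math.Frexp reports them as 0.5 * 2^(e+1). A runnable sketch of that non-positive-scale branch:

package main

import (
	"fmt"
	"math"
)

// bin reproduces the scale <= 0 branch of the deleted getBin.
func bin(v float64, scale int32) int32 {
	frac, expInt := math.Frexp(v)
	exp := int32(expInt)
	var correction int32 = 1
	if frac == .5 {
		// v is an exact power of two: Frexp(4) = (0.5, 3), so the exponent
		// is one higher than the bucket we want.
		correction = 2
	}
	return (exp - correction) >> (-scale)
}

func main() {
	for _, v := range []float64{3, 4, 5, 8, 9} {
		fmt.Printf("v=%v -> bin %d at scale 0\n", v, bin(v, 0))
	}
	// 3 and 4 land in bin 1 (2,4]; 5 and 8 in bin 2 (4,8]; 9 in bin 3 (8,16].
}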
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go
deleted file mode 100644
index 691a91060..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go
+++ /dev/null
@@ -1,50 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "context"
- "time"
-
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/exemplar"
-)
-
-// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
-type FilteredExemplarReservoir[N int64 | float64] interface {
- // Offer accepts the parameters associated with a measurement. The
- // parameters will be stored as an exemplar if the filter decides to
- // sample the measurement.
- //
- // The passed ctx needs to contain any baggage or span that were active
- // when the measurement was made. This information may be used by the
- // Reservoir in making a sampling decision.
- Offer(ctx context.Context, val N, attr []attribute.KeyValue)
- // Collect returns all the held exemplars in the reservoir.
- Collect(dest *[]exemplar.Exemplar)
-}
-
-// filteredExemplarReservoir filters measurements before offering them to the wrapped reservoir.
-type filteredExemplarReservoir[N int64 | float64] struct {
- filter exemplar.Filter
- reservoir exemplar.Reservoir
-}
-
-// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
-// that are allowed by the filter.
-func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
- return &filteredExemplarReservoir[N]{
- filter: f,
- reservoir: r,
- }
-}
-
-func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
- if f.filter(ctx) {
- // only record the current time if we are sampling this measurement.
- f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
- }
-}
-
-func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }
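
A sketch of the gating the deleted filteredExemplarReservoir performed: every Offer is guarded by an exemplar.Filter, and time.Now() is only taken for sampled measurements. It assumes NewFixedSizeReservoir from the public exemplar package; the always-sample filter literal and attribute are stand-ins.

package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

func main() {
	filter := exemplar.Filter(func(ctx context.Context) bool { return true })
	res := exemplar.NewFixedSizeReservoir(4)

	ctx := context.Background()
	if filter(ctx) {
		// Only record the current time when the measurement is sampled.
		res.Offer(ctx, time.Now(), exemplar.NewValue(int64(42)),
			[]attribute.KeyValue{attribute.String("dropped.attr", "v")})
	}

	var out []exemplar.Exemplar
	res.Collect(&out)
	fmt.Println("collected exemplars:", len(out))
}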
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
deleted file mode 100644
index d577ae2c1..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
+++ /dev/null
@@ -1,232 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "context"
- "slices"
- "sort"
- "sync"
- "time"
-
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/metricdata"
-)
-
-type buckets[N int64 | float64] struct {
- attrs attribute.Set
- res FilteredExemplarReservoir[N]
-
- counts []uint64
- count uint64
- total N
- min, max N
-}
-
-// newBuckets returns buckets with n bins.
-func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
- return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
-}
-
-func (b *buckets[N]) sum(value N) { b.total += value }
-
-func (b *buckets[N]) bin(idx int, value N) {
- b.counts[idx]++
- b.count++
- if value < b.min {
- b.min = value
- } else if value > b.max {
- b.max = value
- }
-}
-
-// histValues summarizes a set of measurements as a histogram with
-// explicitly defined buckets.
-type histValues[N int64 | float64] struct {
- noSum bool
- bounds []float64
-
- newRes func(attribute.Set) FilteredExemplarReservoir[N]
- limit limiter[*buckets[N]]
- values map[attribute.Distinct]*buckets[N]
- valuesMu sync.Mutex
-}
-
-func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
- // Keeping all buckets correctly associated with the passed boundaries is
- // ultimately this type's responsibility. Make a copy here so we can always
- // guarantee this, or, in the case of failure, have complete control over
- // the fix.
- b := slices.Clone(bounds)
- slices.Sort(b)
- return &histValues[N]{
- noSum: noSum,
- bounds: b,
- newRes: r,
- limit: newLimiter[*buckets[N]](limit),
- values: make(map[attribute.Distinct]*buckets[N]),
- }
-}
-
-// measure records the measurement value, scoped by attr, and aggregates it
-// into a histogram.
-func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
- // This search will return an index in the range [0, len(s.bounds)], where
- // it will return len(s.bounds) if value is greater than the last element
- // of s.bounds. This aligns with the buckets in that the length of buckets
- // is len(s.bounds)+1, with the last bucket representing:
- // (s.bounds[len(s.bounds)-1], +∞).
- idx := sort.SearchFloat64s(s.bounds, float64(value))
-
- s.valuesMu.Lock()
- defer s.valuesMu.Unlock()
-
- attr := s.limit.Attributes(fltrAttr, s.values)
- b, ok := s.values[attr.Equivalent()]
- if !ok {
- // N+1 buckets. For example:
- //
- // bounds = [0, 5, 10]
- //
- // Then,
- //
- // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
- b = newBuckets[N](attr, len(s.bounds)+1)
- b.res = s.newRes(attr)
-
- // Ensure min and max are recorded values (not zero) for new buckets.
- b.min, b.max = value, value
- s.values[attr.Equivalent()] = b
- }
- b.bin(idx, value)
- if !s.noSum {
- b.sum(value)
- }
- b.res.Offer(ctx, value, droppedAttr)
-}
-
-// newHistogram returns an Aggregator that summarizes a set of measurements as
-// a histogram.
-func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
- return &histogram[N]{
- histValues: newHistValues[N](boundaries, noSum, limit, r),
- noMinMax: noMinMax,
- start: now(),
- }
-}
-
-// histogram summarizes a set of measurements as a histogram with explicitly
-// defined buckets.
-type histogram[N int64 | float64] struct {
- *histValues[N]
-
- noMinMax bool
- start time.Time
-}
-
-func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
- t := now()
-
- // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
- // case, use the zero-value h and hope for better alignment next cycle.
- h, _ := (*dest).(metricdata.Histogram[N])
- h.Temporality = metricdata.DeltaTemporality
-
- s.valuesMu.Lock()
- defer s.valuesMu.Unlock()
-
- // Do not allow modification of our copy of bounds.
- bounds := slices.Clone(s.bounds)
-
- n := len(s.values)
- hDPts := reset(h.DataPoints, n, n)
-
- var i int
- for _, val := range s.values {
- hDPts[i].Attributes = val.attrs
- hDPts[i].StartTime = s.start
- hDPts[i].Time = t
- hDPts[i].Count = val.count
- hDPts[i].Bounds = bounds
- hDPts[i].BucketCounts = val.counts
-
- if !s.noSum {
- hDPts[i].Sum = val.total
- }
-
- if !s.noMinMax {
- hDPts[i].Min = metricdata.NewExtrema(val.min)
- hDPts[i].Max = metricdata.NewExtrema(val.max)
- }
-
- collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
-
- i++
- }
- // Unused attribute sets do not report.
- clear(s.values)
- // The delta collection cycle resets.
- s.start = t
-
- h.DataPoints = hDPts
- *dest = h
-
- return n
-}
-
-func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
- t := now()
-
- // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
- // case, use the zero-value h and hope for better alignment next cycle.
- h, _ := (*dest).(metricdata.Histogram[N])
- h.Temporality = metricdata.CumulativeTemporality
-
- s.valuesMu.Lock()
- defer s.valuesMu.Unlock()
-
- // Do not allow modification of our copy of bounds.
- bounds := slices.Clone(s.bounds)
-
- n := len(s.values)
- hDPts := reset(h.DataPoints, n, n)
-
- var i int
- for _, val := range s.values {
- hDPts[i].Attributes = val.attrs
- hDPts[i].StartTime = s.start
- hDPts[i].Time = t
- hDPts[i].Count = val.count
- hDPts[i].Bounds = bounds
-
- // The HistogramDataPoint field values returned need to be copies of
- // the buckets value as we will keep updating them.
- //
- // TODO (#3047): Making copies for bounds and counts incurs a large
- // memory allocation footprint. Alternatives should be explored.
- hDPts[i].BucketCounts = slices.Clone(val.counts)
-
- if !s.noSum {
- hDPts[i].Sum = val.total
- }
-
- if !s.noMinMax {
- hDPts[i].Min = metricdata.NewExtrema(val.min)
- hDPts[i].Max = metricdata.NewExtrema(val.max)
- }
-
- collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
-
- i++
- // TODO (#3006): This will use an unbounded amount of memory if there
- // are unbounded number of attribute sets being aggregated. Attribute
- // sets that become "stale" need to be forgotten so this will not
- // overload the system.
- }
-
- h.DataPoints = hDPts
- *dest = h
-
- return n
-}
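
The bucket lookup in the deleted histValues.measure relies on sort.SearchFloat64s returning an index in [0, len(bounds)], which maps each value into one of len(bounds)+1 buckets. A small demonstration of that rule with the bounds from the code comment:

package main

import (
	"fmt"
	"sort"
)

func main() {
	bounds := []float64{0, 5, 10}
	// Four buckets: (-inf, 0], (0, 5], (5, 10], (10, +inf).
	for _, v := range []float64{-1, 0, 3, 5, 7, 10, 11} {
		// SearchFloat64s returns the smallest i with bounds[i] >= v,
		// or len(bounds) if v is greater than every bound.
		idx := sort.SearchFloat64s(bounds, v)
		fmt.Printf("value %v -> bucket %d\n", v, idx)
	}
}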
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
deleted file mode 100644
index d3a93f085..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
+++ /dev/null
@@ -1,161 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "context"
- "sync"
- "time"
-
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/metricdata"
-)
-
-// datapoint is timestamped measurement data.
-type datapoint[N int64 | float64] struct {
- attrs attribute.Set
- value N
- res FilteredExemplarReservoir[N]
-}
-
-func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
- return &lastValue[N]{
- newRes: r,
- limit: newLimiter[datapoint[N]](limit),
- values: make(map[attribute.Distinct]datapoint[N]),
- start: now(),
- }
-}
-
-// lastValue summarizes a set of measurements as the last one made.
-type lastValue[N int64 | float64] struct {
- sync.Mutex
-
- newRes func(attribute.Set) FilteredExemplarReservoir[N]
- limit limiter[datapoint[N]]
- values map[attribute.Distinct]datapoint[N]
- start time.Time
-}
-
-func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
- s.Lock()
- defer s.Unlock()
-
- attr := s.limit.Attributes(fltrAttr, s.values)
- d, ok := s.values[attr.Equivalent()]
- if !ok {
- d.res = s.newRes(attr)
- }
-
- d.attrs = attr
- d.value = value
- d.res.Offer(ctx, value, droppedAttr)
-
- s.values[attr.Equivalent()] = d
-}
-
-func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
- t := now()
- // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
- // the DataPoints is missed (better luck next time).
- gData, _ := (*dest).(metricdata.Gauge[N])
-
- s.Lock()
- defer s.Unlock()
-
- n := s.copyDpts(&gData.DataPoints, t)
- // Do not report stale values.
- clear(s.values)
- // Update start time for delta temporality.
- s.start = t
-
- *dest = gData
-
- return n
-}
-
-func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
- t := now()
- // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
- // the DataPoints is missed (better luck next time).
- gData, _ := (*dest).(metricdata.Gauge[N])
-
- s.Lock()
- defer s.Unlock()
-
- n := s.copyDpts(&gData.DataPoints, t)
- // TODO (#3006): This will use an unbounded amount of memory if there
- // are unbounded number of attribute sets being aggregated. Attribute
- // sets that become "stale" need to be forgotten so this will not
- // overload the system.
- *dest = gData
-
- return n
-}
-
-// copyDpts copies the datapoints held by s into dest. The number of datapoints
-// copied is returned.
-func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
- n := len(s.values)
- *dest = reset(*dest, n, n)
-
- var i int
- for _, v := range s.values {
- (*dest)[i].Attributes = v.attrs
- (*dest)[i].StartTime = s.start
- (*dest)[i].Time = t
- (*dest)[i].Value = v.value
- collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
- i++
- }
- return n
-}
-
-// newPrecomputedLastValue returns an aggregator that summarizes a set of
-// observations as the last one made.
-func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
- return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
-}
-
-// precomputedLastValue summarizes a set of observations as the last one made.
-type precomputedLastValue[N int64 | float64] struct {
- *lastValue[N]
-}
-
-func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
- t := now()
- // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
- // the DataPoints is missed (better luck next time).
- gData, _ := (*dest).(metricdata.Gauge[N])
-
- s.Lock()
- defer s.Unlock()
-
- n := s.copyDpts(&gData.DataPoints, t)
- // Do not report stale values.
- clear(s.values)
- // Update start time for delta temporality.
- s.start = t
-
- *dest = gData
-
- return n
-}
-
-func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
- t := now()
- // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
- // the DataPoints is missed (better luck next time).
- gData, _ := (*dest).(metricdata.Gauge[N])
-
- s.Lock()
- defer s.Unlock()
-
- n := s.copyDpts(&gData.DataPoints, t)
- // Do not report stale values.
- clear(s.values)
- *dest = gData
-
- return n
-}
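
The deleted lastValue keeps exactly one entry per distinct attribute set, keyed by attribute.Set.Equivalent(), with each new measurement overwriting the previous one. A minimal sketch of that storage scheme (the record helper and attribute names are illustrative):

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	values := make(map[attribute.Distinct]float64)

	// record stands in for lastValue.measure: last write wins per stream.
	record := func(v float64, attrs ...attribute.KeyValue) {
		set := attribute.NewSet(attrs...)
		values[set.Equivalent()] = v
	}

	record(1.0, attribute.String("host", "a"))
	record(2.5, attribute.String("host", "a")) // overwrites 1.0
	record(7.0, attribute.String("host", "b"))

	fmt.Println("streams:", len(values)) // 2
}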
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go
deleted file mode 100644
index 9ea0251ed..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import "go.opentelemetry.io/otel/attribute"
-
-// overflowSet is the attribute set used to record a measurement when adding
-// another distinct attribute set to the aggregate would exceed the aggregate
-// limit.
-var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))
-
-// limiter limits aggregate values.
-type limiter[V any] struct {
- // aggLimit is the maximum number of metric streams that can be aggregated.
- //
- // Any metric stream with attributes distinct from any set already
- // aggregated, once the aggLimit has been met, will instead be aggregated
- // into an "overflow" metric stream. That stream will only contain the
- // "otel.metric.overflow"=true attribute.
- aggLimit int
-}
-
-// newLimiter returns a new limiter with the provided aggregation limit.
-func newLimiter[V any](aggregation int) limiter[V] {
- return limiter[V]{aggLimit: aggregation}
-}
-
-// Attributes checks if adding a measurement for attrs will exceed the
-// aggregation cardinality limit for the existing measurements. If it will,
-// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
-// limit is not set (limit <= 0), attrs is returned.
-func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
- if l.aggLimit > 0 {
- _, exists := measurements[attrs.Equivalent()]
- if !exists && len(measurements) >= l.aggLimit-1 {
- return overflowSet
- }
- }
-
- return attrs
-}
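
With an aggregation limit of n, the deleted limiter stores at most n-1 distinct attribute sets and collapses every further new set into the overflow stream. A runnable sketch of that rule (the limited helper reimplements limiter.Attributes for illustration):

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))

// limited reimplements the deleted limiter.Attributes check.
func limited(limit int, attrs attribute.Set, values map[attribute.Distinct]struct{}) attribute.Set {
	if limit > 0 {
		if _, ok := values[attrs.Equivalent()]; !ok && len(values) >= limit-1 {
			return overflowSet
		}
	}
	return attrs
}

func main() {
	values := make(map[attribute.Distinct]struct{})
	for _, host := range []string{"a", "b", "c"} {
		set := limited(3, attribute.NewSet(attribute.String("host", host)), values)
		values[set.Equivalent()] = struct{}{}
		fmt.Println(host, "->", set.Encoded(attribute.DefaultEncoder()))
	}
	// Hosts a and b keep their own streams; c collapses into
	// otel.metric.overflow=true, for three streams total.
}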
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
deleted file mode 100644
index 8e132ad61..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
+++ /dev/null
@@ -1,237 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
- "context"
- "sync"
- "time"
-
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/metricdata"
-)
-
-type sumValue[N int64 | float64] struct {
- n N
- res FilteredExemplarReservoir[N]
- attrs attribute.Set
-}
-
-// valueMap is the storage for sums.
-type valueMap[N int64 | float64] struct {
- sync.Mutex
- newRes func(attribute.Set) FilteredExemplarReservoir[N]
- limit limiter[sumValue[N]]
- values map[attribute.Distinct]sumValue[N]
-}
-
-func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
- return &valueMap[N]{
- newRes: r,
- limit: newLimiter[sumValue[N]](limit),
- values: make(map[attribute.Distinct]sumValue[N]),
- }
-}
-
-func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
- s.Lock()
- defer s.Unlock()
-
- attr := s.limit.Attributes(fltrAttr, s.values)
- v, ok := s.values[attr.Equivalent()]
- if !ok {
- v.res = s.newRes(attr)
- }
-
- v.attrs = attr
- v.n += value
- v.res.Offer(ctx, value, droppedAttr)
-
- s.values[attr.Equivalent()] = v
-}
-
-// newSum returns an aggregator that summarizes a set of measurements as their
-// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
-// the measurements were made in.
-func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] {
- return &sum[N]{
- valueMap: newValueMap[N](limit, r),
- monotonic: monotonic,
- start: now(),
- }
-}
-
-// sum summarizes a set of measurements as their arithmetic sum.
-type sum[N int64 | float64] struct {
- *valueMap[N]
-
- monotonic bool
- start time.Time
-}
-
-func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
- t := now()
-
- // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
- // use the zero-value sData and hope for better alignment next cycle.
- sData, _ := (*dest).(metricdata.Sum[N])
- sData.Temporality = metricdata.DeltaTemporality
- sData.IsMonotonic = s.monotonic
-
- s.Lock()
- defer s.Unlock()
-
- n := len(s.values)
- dPts := reset(sData.DataPoints, n, n)
-
- var i int
- for _, val := range s.values {
- dPts[i].Attributes = val.attrs
- dPts[i].StartTime = s.start
- dPts[i].Time = t
- dPts[i].Value = val.n
- collectExemplars(&dPts[i].Exemplars, val.res.Collect)
- i++
- }
- // Do not report stale values.
- clear(s.values)
- // The delta collection cycle resets.
- s.start = t
-
- sData.DataPoints = dPts
- *dest = sData
-
- return n
-}
-
-func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
- t := now()
-
- // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
- // use the zero-value sData and hope for better alignment next cycle.
- sData, _ := (*dest).(metricdata.Sum[N])
- sData.Temporality = metricdata.CumulativeTemporality
- sData.IsMonotonic = s.monotonic
-
- s.Lock()
- defer s.Unlock()
-
- n := len(s.values)
- dPts := reset(sData.DataPoints, n, n)
-
- var i int
- for _, value := range s.values {
- dPts[i].Attributes = value.attrs
- dPts[i].StartTime = s.start
- dPts[i].Time = t
- dPts[i].Value = value.n
- collectExemplars(&dPts[i].Exemplars, value.res.Collect)
- // TODO (#3006): This will use an unbounded amount of memory if there
- // are unbounded number of attribute sets being aggregated. Attribute
- // sets that become "stale" need to be forgotten so this will not
- // overload the system.
- i++
- }
-
- sData.DataPoints = dPts
- *dest = sData
-
- return n
-}
-
-// newPrecomputedSum returns an aggregator that summarizes a set of
-// observations as their arithmetic sum. Each sum is scoped by attributes and
-// the aggregation cycle the measurements were made in.
-func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] {
- return &precomputedSum[N]{
- valueMap: newValueMap[N](limit, r),
- monotonic: monotonic,
- start: now(),
- }
-}
-
-// precomputedSum summarizes a set of observations as their arithmetic sum.
-type precomputedSum[N int64 | float64] struct {
- *valueMap[N]
-
- monotonic bool
- start time.Time
-
- reported map[attribute.Distinct]N
-}
-
-func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
- t := now()
- newReported := make(map[attribute.Distinct]N)
-
- // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
- // use the zero-value sData and hope for better alignment next cycle.
- sData, _ := (*dest).(metricdata.Sum[N])
- sData.Temporality = metricdata.DeltaTemporality
- sData.IsMonotonic = s.monotonic
-
- s.Lock()
- defer s.Unlock()
-
- n := len(s.values)
- dPts := reset(sData.DataPoints, n, n)
-
- var i int
- for key, value := range s.values {
- delta := value.n - s.reported[key]
-
- dPts[i].Attributes = value.attrs
- dPts[i].StartTime = s.start
- dPts[i].Time = t
- dPts[i].Value = delta
- collectExemplars(&dPts[i].Exemplars, value.res.Collect)
-
- newReported[key] = value.n
- i++
- }
- // Unused attribute sets do not report.
- clear(s.values)
- s.reported = newReported
- // The delta collection cycle resets.
- s.start = t
-
- sData.DataPoints = dPts
- *dest = sData
-
- return n
-}
-
-func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
- t := now()
-
- // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
- // use the zero-value sData and hope for better alignment next cycle.
- sData, _ := (*dest).(metricdata.Sum[N])
- sData.Temporality = metricdata.CumulativeTemporality
- sData.IsMonotonic = s.monotonic
-
- s.Lock()
- defer s.Unlock()
-
- n := len(s.values)
- dPts := reset(sData.DataPoints, n, n)
-
- var i int
- for _, val := range s.values {
- dPts[i].Attributes = val.attrs
- dPts[i].StartTime = s.start
- dPts[i].Time = t
- dPts[i].Value = val.n
- collectExemplars(&dPts[i].Exemplars, val.res.Collect)
-
- i++
- }
- // Unused attribute sets do not report.
- clear(s.values)
-
- sData.DataPoints = dPts
- *dest = sData
-
- return n
-}
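
The deleted precomputedSum.delta converts precomputed (cumulative) inputs into deltas by subtracting the value reported in the previous cycle. A minimal sketch of that bookkeeping, with string keys standing in for attribute sets:

package main

import "fmt"

func main() {
	// reported holds the value sent in the previous cycle per stream.
	reported := map[string]float64{}

	collectDelta := func(current map[string]float64) map[string]float64 {
		out := make(map[string]float64, len(current))
		next := make(map[string]float64, len(current))
		for k, v := range current {
			out[k] = v - reported[k] // a missing key reads as 0
			next[k] = v
		}
		reported = next
		return out
	}

	fmt.Println(collectDelta(map[string]float64{"stream": 10})) // map[stream:10]
	fmt.Println(collectDelta(map[string]float64{"stream": 15})) // map[stream:5]
}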