summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/metric/internal
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/internal')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go133
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go18
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go432
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go231
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go68
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go222
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go24
7 files changed, 1128 insertions, 0 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
new file mode 100644
index 000000000..8dec14237
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
@@ -0,0 +1,133 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// now is used to return the current local time while allowing tests to
+// override the default time.Now function.
+var now = time.Now
+
+// Measure receives measurements to be aggregated.
+type Measure[N int64 | float64] func(context.Context, N, attribute.Set)
+
+// ComputeAggregation stores the aggregate of measurements into dest and
+// returns the number of aggregate data-points output.
+type ComputeAggregation func(dest *metricdata.Aggregation) int
+
+// Builder builds an aggregate function.
+type Builder[N int64 | float64] struct {
+ // Temporality is the temporality used for the returned aggregate function.
+ //
+ // If this is not provided a default of cumulative will be used (except for
+ // the last-value aggregate function where delta is the only appropriate
+ // temporality).
+ Temporality metricdata.Temporality
+ // Filter is the attribute filter the aggregate function will use on the
+ // input of measurements.
+ Filter attribute.Filter
+}
+
+func (b Builder[N]) filter(f Measure[N]) Measure[N] {
+ if b.Filter != nil {
+ fltr := b.Filter // Copy to make it immutable after assignment.
+ return func(ctx context.Context, n N, a attribute.Set) {
+ fAttr, _ := a.Filter(fltr)
+ f(ctx, n, fAttr)
+ }
+ }
+ return f
+}
+
+// LastValue returns a last-value aggregate function input and output.
+//
+// The Builder.Temporality is ignored and delta is use always.
+func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
+ // Delta temporality is the only temporality that makes semantic sense for
+ // a last-value aggregate.
+ lv := newLastValue[N]()
+
+ return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
+ // Ignore if dest is not a metricdata.Gauge. The chance for memory
+ // reuse of the DataPoints is missed (better luck next time).
+ gData, _ := (*dest).(metricdata.Gauge[N])
+ lv.computeAggregation(&gData.DataPoints)
+ *dest = gData
+
+ return len(gData.DataPoints)
+ }
+}
+
+// PrecomputedSum returns a sum aggregate function input and output. The
+// arguments passed to the input are expected to be the precomputed sum values.
+func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
+ s := newPrecomputedSum[N](monotonic)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(s.measure), s.delta
+ default:
+ return b.filter(s.measure), s.cumulative
+ }
+}
+
+// Sum returns a sum aggregate function input and output.
+func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
+ s := newSum[N](monotonic)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(s.measure), s.delta
+ default:
+ return b.filter(s.measure), s.cumulative
+ }
+}
+
+// ExplicitBucketHistogram returns a histogram aggregate function input and
+// output.
+func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
+ h := newHistogram[N](boundaries, noMinMax, noSum)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(h.measure), h.delta
+ default:
+ return b.filter(h.measure), h.cumulative
+ }
+}
+
+// ExponentialBucketHistogram returns a histogram aggregate function input and
+// output.
+func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
+ h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(h.measure), h.delta
+ default:
+ return b.filter(h.measure), h.cumulative
+ }
+}
+
+// reset ensures s has capacity and sets it length. If the capacity of s too
+// small, a new slice is returned with the specified capacity and length.
+func reset[T any](s []T, length, capacity int) []T {
+ if cap(s) < capacity {
+ return make([]T, length, capacity)
+ }
+ return s[:length]
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
new file mode 100644
index 000000000..e83a2693f
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
@@ -0,0 +1,18 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package aggregate provides aggregate types used compute aggregations and
+// cycle the state of metric measurements made by the SDK. These types and
+// functionality are meant only for internal SDK use.
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
new file mode 100644
index 000000000..98b7dc1e0
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -0,0 +1,432 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "errors"
+ "math"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+const (
+ expoMaxScale = 20
+ expoMinScale = -10
+
+ smallestNonZeroNormalFloat64 = 0x1p-1022
+
+ // These redefine the Math constants with a type, so the compiler won't coerce
+ // them into an int on 32 bit platforms.
+ maxInt64 int64 = math.MaxInt64
+ minInt64 int64 = math.MinInt64
+)
+
+// expoHistogramDataPoint is a single data point in an exponential histogram.
+type expoHistogramDataPoint[N int64 | float64] struct {
+ count uint64
+ min N
+ max N
+ sum N
+
+ maxSize int
+ noMinMax bool
+ noSum bool
+
+ scale int
+
+ posBuckets expoBuckets
+ negBuckets expoBuckets
+ zeroCount uint64
+}
+
+func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
+ f := math.MaxFloat64
+ max := N(f) // if N is int64, max will overflow to -9223372036854775808
+ min := N(-f)
+ if N(maxInt64) > N(f) {
+ max = N(maxInt64)
+ min = N(minInt64)
+ }
+ return &expoHistogramDataPoint[N]{
+ min: max,
+ max: min,
+ maxSize: maxSize,
+ noMinMax: noMinMax,
+ noSum: noSum,
+ scale: maxScale,
+ }
+}
+
+// record adds a new measurement to the histogram. It will rescale the buckets if needed.
+func (p *expoHistogramDataPoint[N]) record(v N) {
+ p.count++
+
+ if !p.noMinMax {
+ if v < p.min {
+ p.min = v
+ }
+ if v > p.max {
+ p.max = v
+ }
+ }
+ if !p.noSum {
+ p.sum += v
+ }
+
+ absV := math.Abs(float64(v))
+
+ if float64(absV) == 0.0 {
+ p.zeroCount++
+ return
+ }
+
+ bin := p.getBin(absV)
+
+ bucket := &p.posBuckets
+ if v < 0 {
+ bucket = &p.negBuckets
+ }
+
+ // If the new bin would make the counts larger than maxScale, we need to
+ // downscale current measurements.
+ if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
+ if p.scale-scaleDelta < expoMinScale {
+ // With a scale of -10 there is only two buckets for the whole range of float64 values.
+ // This can only happen if there is a max size of 1.
+ otel.Handle(errors.New("exponential histogram scale underflow"))
+ return
+ }
+ // Downscale
+ p.scale -= scaleDelta
+ p.posBuckets.downscale(scaleDelta)
+ p.negBuckets.downscale(scaleDelta)
+
+ bin = p.getBin(absV)
+ }
+
+ bucket.record(bin)
+}
+
+// getBin returns the bin v should be recorded into.
+func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
+ frac, exp := math.Frexp(v)
+ if p.scale <= 0 {
+ // Because of the choice of fraction is always 1 power of two higher than we want.
+ correction := 1
+ if frac == .5 {
+ // If v is an exact power of two the frac will be .5 and the exp
+ // will be one higher than we want.
+ correction = 2
+ }
+ return (exp - correction) >> (-p.scale)
+ }
+ return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1
+}
+
+// scaleFactors are constants used in calculating the logarithm index. They are
+// equivalent to 2^index/log(2).
+var scaleFactors = [21]float64{
+ math.Ldexp(math.Log2E, 0),
+ math.Ldexp(math.Log2E, 1),
+ math.Ldexp(math.Log2E, 2),
+ math.Ldexp(math.Log2E, 3),
+ math.Ldexp(math.Log2E, 4),
+ math.Ldexp(math.Log2E, 5),
+ math.Ldexp(math.Log2E, 6),
+ math.Ldexp(math.Log2E, 7),
+ math.Ldexp(math.Log2E, 8),
+ math.Ldexp(math.Log2E, 9),
+ math.Ldexp(math.Log2E, 10),
+ math.Ldexp(math.Log2E, 11),
+ math.Ldexp(math.Log2E, 12),
+ math.Ldexp(math.Log2E, 13),
+ math.Ldexp(math.Log2E, 14),
+ math.Ldexp(math.Log2E, 15),
+ math.Ldexp(math.Log2E, 16),
+ math.Ldexp(math.Log2E, 17),
+ math.Ldexp(math.Log2E, 18),
+ math.Ldexp(math.Log2E, 19),
+ math.Ldexp(math.Log2E, 20),
+}
+
+// scaleChange returns the magnitude of the scale change needed to fit bin in
+// the bucket. If no scale change is needed 0 is returned.
+func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
+ if length == 0 {
+ // No need to rescale if there are no buckets.
+ return 0
+ }
+
+ low := startBin
+ high := bin
+ if startBin >= bin {
+ low = bin
+ high = startBin + length - 1
+ }
+
+ count := 0
+ for high-low >= p.maxSize {
+ low = low >> 1
+ high = high >> 1
+ count++
+ if count > expoMaxScale-expoMinScale {
+ return count
+ }
+ }
+ return count
+}
+
+// expoBuckets is a set of buckets in an exponential histogram.
+type expoBuckets struct {
+ startBin int
+ counts []uint64
+}
+
+// record increments the count for the given bin, and expands the buckets if needed.
+// Size changes must be done before calling this function.
+func (b *expoBuckets) record(bin int) {
+ if len(b.counts) == 0 {
+ b.counts = []uint64{1}
+ b.startBin = bin
+ return
+ }
+
+ endBin := b.startBin + len(b.counts) - 1
+
+ // if the new bin is inside the current range
+ if bin >= b.startBin && bin <= endBin {
+ b.counts[bin-b.startBin]++
+ return
+ }
+ // if the new bin is before the current start add spaces to the counts
+ if bin < b.startBin {
+ origLen := len(b.counts)
+ newLength := endBin - bin + 1
+ shift := b.startBin - bin
+
+ if newLength > cap(b.counts) {
+ b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
+ }
+
+ copy(b.counts[shift:origLen+shift], b.counts[:])
+ b.counts = b.counts[:newLength]
+ for i := 1; i < shift; i++ {
+ b.counts[i] = 0
+ }
+ b.startBin = bin
+ b.counts[0] = 1
+ return
+ }
+ // if the new is after the end add spaces to the end
+ if bin > endBin {
+ if bin-b.startBin < cap(b.counts) {
+ b.counts = b.counts[:bin-b.startBin+1]
+ for i := endBin + 1 - b.startBin; i < len(b.counts); i++ {
+ b.counts[i] = 0
+ }
+ b.counts[bin-b.startBin] = 1
+ return
+ }
+
+ end := make([]uint64, bin-b.startBin-len(b.counts)+1)
+ b.counts = append(b.counts, end...)
+ b.counts[bin-b.startBin] = 1
+ }
+}
+
+// downscale shrinks a bucket by a factor of 2*s. It will sum counts into the
+// correct lower resolution bucket.
+func (b *expoBuckets) downscale(delta int) {
+ // Example
+ // delta = 2
+ // Original offset: -6
+ // Counts: [ 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+ // bins: -6 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4
+ // new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0, 1
+ // new Offset: -2
+ // new Counts: [4, 14, 30, 10]
+
+ if len(b.counts) <= 1 || delta < 1 {
+ b.startBin = b.startBin >> delta
+ return
+ }
+
+ steps := 1 << delta
+ offset := b.startBin % steps
+ offset = (offset + steps) % steps // to make offset positive
+ for i := 1; i < len(b.counts); i++ {
+ idx := i + offset
+ if idx%steps == 0 {
+ b.counts[idx/steps] = b.counts[i]
+ continue
+ }
+ b.counts[idx/steps] += b.counts[i]
+ }
+
+ lastIdx := (len(b.counts) - 1 + offset) / steps
+ b.counts = b.counts[:lastIdx+1]
+ b.startBin = b.startBin >> delta
+}
+
+// newExponentialHistogram returns an Aggregator that summarizes a set of
+// measurements as an exponential histogram. Each histogram is scoped by attributes
+// and the aggregation cycle the measurements were made in.
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
+ return &expoHistogram[N]{
+ noSum: noSum,
+ noMinMax: noMinMax,
+ maxSize: int(maxSize),
+ maxScale: int(maxScale),
+
+ values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
+
+ start: now(),
+ }
+}
+
+// expoHistogram summarizes a set of measurements as an histogram with exponentially
+// defined buckets.
+type expoHistogram[N int64 | float64] struct {
+ noSum bool
+ noMinMax bool
+ maxSize int
+ maxScale int
+
+ values map[attribute.Set]*expoHistogramDataPoint[N]
+ valuesMu sync.Mutex
+
+ start time.Time
+}
+
+func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) {
+ // Ignore NaN and infinity.
+ if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
+ return
+ }
+
+ e.valuesMu.Lock()
+ defer e.valuesMu.Unlock()
+
+ v, ok := e.values[attr]
+ if !ok {
+ v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
+ e.values[attr] = v
+ }
+ v.record(value)
+}
+
+func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
+ // In that case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.ExponentialHistogram[N])
+ h.Temporality = metricdata.DeltaTemporality
+
+ e.valuesMu.Lock()
+ defer e.valuesMu.Unlock()
+
+ n := len(e.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range e.values {
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = e.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Scale = int32(b.scale)
+ hDPts[i].ZeroCount = b.zeroCount
+ hDPts[i].ZeroThreshold = 0.0
+
+ hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
+ hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
+ copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
+
+ hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
+ hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
+
+ if !e.noSum {
+ hDPts[i].Sum = b.sum
+ }
+ if !e.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+
+ delete(e.values, a)
+ i++
+ }
+ e.start = t
+ h.DataPoints = hDPts
+ *dest = h
+ return n
+}
+
+func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
+ // In that case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.ExponentialHistogram[N])
+ h.Temporality = metricdata.CumulativeTemporality
+
+ e.valuesMu.Lock()
+ defer e.valuesMu.Unlock()
+
+ n := len(e.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range e.values {
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = e.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Scale = int32(b.scale)
+ hDPts[i].ZeroCount = b.zeroCount
+ hDPts[i].ZeroThreshold = 0.0
+
+ hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
+ hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
+ copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
+
+ hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
+ hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
+
+ if !e.noSum {
+ hDPts[i].Sum = b.sum
+ }
+ if !e.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+
+ i++
+ // TODO (#3006): This will use an unbounded amount of memory if there
+ // are unbounded number of attribute sets being aggregated. Attribute
+ // sets that become "stale" need to be forgotten so this will not
+ // overload the system.
+ }
+
+ h.DataPoints = hDPts
+ *dest = h
+ return n
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
new file mode 100644
index 000000000..62ec51e1f
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
@@ -0,0 +1,231 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "sort"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+type buckets[N int64 | float64] struct {
+ counts []uint64
+ count uint64
+ total N
+ min, max N
+}
+
+// newBuckets returns buckets with n bins.
+func newBuckets[N int64 | float64](n int) *buckets[N] {
+ return &buckets[N]{counts: make([]uint64, n)}
+}
+
+func (b *buckets[N]) sum(value N) { b.total += value }
+
+func (b *buckets[N]) bin(idx int, value N) {
+ b.counts[idx]++
+ b.count++
+ if value < b.min {
+ b.min = value
+ } else if value > b.max {
+ b.max = value
+ }
+}
+
+// histValues summarizes a set of measurements as an histValues with
+// explicitly defined buckets.
+type histValues[N int64 | float64] struct {
+ noSum bool
+ bounds []float64
+
+ values map[attribute.Set]*buckets[N]
+ valuesMu sync.Mutex
+}
+
+func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
+ // The responsibility of keeping all buckets correctly associated with the
+ // passed boundaries is ultimately this type's responsibility. Make a copy
+ // here so we can always guarantee this. Or, in the case of failure, have
+ // complete control over the fix.
+ b := make([]float64, len(bounds))
+ copy(b, bounds)
+ sort.Float64s(b)
+ return &histValues[N]{
+ noSum: noSum,
+ bounds: b,
+ values: make(map[attribute.Set]*buckets[N]),
+ }
+}
+
+// Aggregate records the measurement value, scoped by attr, and aggregates it
+// into a histogram.
+func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
+ // This search will return an index in the range [0, len(s.bounds)], where
+ // it will return len(s.bounds) if value is greater than the last element
+ // of s.bounds. This aligns with the buckets in that the length of buckets
+ // is len(s.bounds)+1, with the last bucket representing:
+ // (s.bounds[len(s.bounds)-1], +∞).
+ idx := sort.SearchFloat64s(s.bounds, float64(value))
+
+ s.valuesMu.Lock()
+ defer s.valuesMu.Unlock()
+
+ b, ok := s.values[attr]
+ if !ok {
+ // N+1 buckets. For example:
+ //
+ // bounds = [0, 5, 10]
+ //
+ // Then,
+ //
+ // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
+ b = newBuckets[N](len(s.bounds) + 1)
+ // Ensure min and max are recorded values (not zero), for new buckets.
+ b.min, b.max = value, value
+ s.values[attr] = b
+ }
+ b.bin(idx, value)
+ if !s.noSum {
+ b.sum(value)
+ }
+}
+
+// newHistogram returns an Aggregator that summarizes a set of measurements as
+// an histogram.
+func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] {
+ return &histogram[N]{
+ histValues: newHistValues[N](boundaries, noSum),
+ noMinMax: noMinMax,
+ start: now(),
+ }
+}
+
+// histogram summarizes a set of measurements as an histogram with explicitly
+// defined buckets.
+type histogram[N int64 | float64] struct {
+ *histValues[N]
+
+ noMinMax bool
+ start time.Time
+}
+
+func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
+ // case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.Histogram[N])
+ h.Temporality = metricdata.DeltaTemporality
+
+ s.valuesMu.Lock()
+ defer s.valuesMu.Unlock()
+
+ // Do not allow modification of our copy of bounds.
+ bounds := make([]float64, len(s.bounds))
+ copy(bounds, s.bounds)
+
+ n := len(s.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range s.values {
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = s.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Bounds = bounds
+ hDPts[i].BucketCounts = b.counts
+
+ if !s.noSum {
+ hDPts[i].Sum = b.total
+ }
+
+ if !s.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+
+ // Unused attribute sets do not report.
+ delete(s.values, a)
+ i++
+ }
+ // The delta collection cycle resets.
+ s.start = t
+
+ h.DataPoints = hDPts
+ *dest = h
+
+ return n
+}
+
+func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
+ // case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.Histogram[N])
+ h.Temporality = metricdata.CumulativeTemporality
+
+ s.valuesMu.Lock()
+ defer s.valuesMu.Unlock()
+
+ // Do not allow modification of our copy of bounds.
+ bounds := make([]float64, len(s.bounds))
+ copy(bounds, s.bounds)
+
+ n := len(s.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range s.values {
+ // The HistogramDataPoint field values returned need to be copies of
+ // the buckets value as we will keep updating them.
+ //
+ // TODO (#3047): Making copies for bounds and counts incurs a large
+ // memory allocation footprint. Alternatives should be explored.
+ counts := make([]uint64, len(b.counts))
+ copy(counts, b.counts)
+
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = s.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Bounds = bounds
+ hDPts[i].BucketCounts = counts
+
+ if !s.noSum {
+ hDPts[i].Sum = b.total
+ }
+
+ if !s.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+ i++
+ // TODO (#3006): This will use an unbounded amount of memory if there
+ // are unbounded number of attribute sets being aggregated. Attribute
+ // sets that become "stale" need to be forgotten so this will not
+ // overload the system.
+ }
+
+ h.DataPoints = hDPts
+ *dest = h
+
+ return n
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
new file mode 100644
index 000000000..6af2d6061
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
@@ -0,0 +1,68 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// datapoint is timestamped measurement data.
+type datapoint[N int64 | float64] struct {
+ timestamp time.Time
+ value N
+}
+
+func newLastValue[N int64 | float64]() *lastValue[N] {
+ return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
+}
+
+// lastValue summarizes a set of measurements as the last one made.
+type lastValue[N int64 | float64] struct {
+ sync.Mutex
+
+ values map[attribute.Set]datapoint[N]
+}
+
+func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
+ d := datapoint[N]{timestamp: now(), value: value}
+ s.Lock()
+ s.values[attr] = d
+ s.Unlock()
+}
+
+func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ *dest = reset(*dest, n, n)
+
+ var i int
+ for a, v := range s.values {
+ (*dest)[i].Attributes = a
+ // The event time is the only meaningful timestamp, StartTime is
+ // ignored.
+ (*dest)[i].Time = v.timestamp
+ (*dest)[i].Value = v.value
+ // Do not report stale values.
+ delete(s.values, a)
+ i++
+ }
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
new file mode 100644
index 000000000..1e52ff0d1
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
@@ -0,0 +1,222 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// valueMap is the storage for sums.
+type valueMap[N int64 | float64] struct {
+ sync.Mutex
+ values map[attribute.Set]N
+}
+
+func newValueMap[N int64 | float64]() *valueMap[N] {
+ return &valueMap[N]{values: make(map[attribute.Set]N)}
+}
+
+func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
+ s.Lock()
+ s.values[attr] += value
+ s.Unlock()
+}
+
+// newSum returns an aggregator that summarizes a set of measurements as their
+// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
+// the measurements were made in.
+func newSum[N int64 | float64](monotonic bool) *sum[N] {
+ return &sum[N]{
+ valueMap: newValueMap[N](),
+ monotonic: monotonic,
+ start: now(),
+ }
+}
+
+// sum summarizes a set of measurements made as their arithmetic sum.
+type sum[N int64 | float64] struct {
+ *valueMap[N]
+
+ monotonic bool
+ start time.Time
+}
+
+func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.DeltaTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = value
+ // Do not report stale values.
+ delete(s.values, attr)
+ i++
+ }
+ // The delta collection cycle resets.
+ s.start = t
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
+
+func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.CumulativeTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = value
+ // TODO (#3006): This will use an unbounded amount of memory if there
+ // are unbounded number of attribute sets being aggregated. Attribute
+ // sets that become "stale" need to be forgotten so this will not
+ // overload the system.
+ i++
+ }
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
+
+// newPrecomputedSum returns an aggregator that summarizes a set of
+// observatrions as their arithmetic sum. Each sum is scoped by attributes and
+// the aggregation cycle the measurements were made in.
+func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] {
+ return &precomputedSum[N]{
+ valueMap: newValueMap[N](),
+ monotonic: monotonic,
+ start: now(),
+ }
+}
+
+// precomputedSum summarizes a set of observatrions as their arithmetic sum.
+type precomputedSum[N int64 | float64] struct {
+ *valueMap[N]
+
+ monotonic bool
+ start time.Time
+
+ reported map[attribute.Set]N
+}
+
+func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+ newReported := make(map[attribute.Set]N)
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.DeltaTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ delta := value - s.reported[attr]
+
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = delta
+
+ newReported[attr] = value
+ // Unused attribute sets do not report.
+ delete(s.values, attr)
+ i++
+ }
+ // Unused attribute sets are forgotten.
+ s.reported = newReported
+ // The delta collection cycle resets.
+ s.start = t
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
+
+func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.CumulativeTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = value
+
+ // Unused attribute sets do not report.
+ delete(s.values, attr)
+ i++
+ }
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go
new file mode 100644
index 000000000..9695492b0
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go
@@ -0,0 +1,24 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
+
+// ReuseSlice returns a zeroed view of slice if its capacity is greater than or
+// equal to n. Otherwise, it returns a new []T with capacity equal to n.
+func ReuseSlice[T any](slice []T, n int) []T {
+ if cap(slice) >= n {
+ return slice[:n]
+ }
+ return make([]T, n)
+}