diff options
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/internal')
20 files changed, 546 insertions, 444 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go index 4060a2f76..b18ee719b 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" @@ -50,7 +39,7 @@ type Builder[N int64 | float64] struct { // // If this is not provided a default factory function that returns an // exemplar.Drop reservoir will be used. - ReservoirFunc func() exemplar.Reservoir[N] + ReservoirFunc func() exemplar.FilteredReservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -61,12 +50,12 @@ type Builder[N int64 | float64] struct { AggregationLimit int } -func (b Builder[N]) resFunc() func() exemplar.Reservoir[N] { +func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] { if b.ReservoirFunc != nil { return b.ReservoirFunc } - return exemplar.Drop[N] + return exemplar.Drop } type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) @@ -85,21 +74,26 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] { } // LastValue returns a last-value aggregate function input and output. -// -// The Builder.Temporality is ignored and delta is use always. func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { - // Delta temporality is the only temporality that makes semantic sense for - // a last-value aggregate. lv := newLastValue[N](b.AggregationLimit, b.resFunc()) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(lv.measure), lv.delta + default: + return b.filter(lv.measure), lv.cumulative + } +} - return b.filter(lv.measure), func(dest *metricdata.Aggregation) int { - // Ignore if dest is not a metricdata.Gauge. The chance for memory - // reuse of the DataPoints is missed (better luck next time). - gData, _ := (*dest).(metricdata.Gauge[N]) - lv.computeAggregation(&gData.DataPoints) - *dest = gData - - return len(gData.DataPoints) +// PrecomputedLastValue returns a last-value aggregate function input and +// output. The aggregation returned from the returned ComputeAggregation +// function will always only return values from the previous collection cycle. +func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) { + lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc()) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(lv.measure), lv.delta + default: + return b.filter(lv.measure), lv.cumulative } } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go index e83a2693f..7b7225e6e 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // Package aggregate provides aggregate types used compute aggregations and // cycle the state of metric measurements made by the SDK. These types and diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go new file mode 100644 index 000000000..170ae8e58 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "sync" + + "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +var exemplarPool = sync.Pool{ + New: func() any { return new([]exemplar.Exemplar) }, +} + +func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) { + dest := exemplarPool.Get().(*[]exemplar.Exemplar) + defer func() { + *dest = (*dest)[:0] + exemplarPool.Put(dest) + }() + + *dest = reset(*dest, len(*out), cap(*out)) + + f(dest) + + *out = reset(*out, len(*dest), cap(*dest)) + for i, e := range *dest { + (*out)[i].FilteredAttributes = e.FilteredAttributes + (*out)[i].Time = e.Time + (*out)[i].SpanID = e.SpanID + (*out)[i].TraceID = e.TraceID + + switch e.Value.Type() { + case exemplar.Int64ValueType: + (*out)[i].Value = N(e.Value.Int64()) + case exemplar.Float64ValueType: + (*out)[i].Value = N(e.Value.Float64()) + } + } +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go index 4139a6d15..707342408 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" @@ -41,7 +30,8 @@ const ( // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { - res exemplar.Reservoir[N] + attrs attribute.Set + res exemplar.FilteredReservoir[N] count uint64 min N @@ -52,14 +42,14 @@ type expoHistogramDataPoint[N int64 | float64] struct { noMinMax bool noSum bool - scale int + scale int32 posBuckets expoBuckets negBuckets expoBuckets zeroCount uint64 } -func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] { +func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize int, maxScale int32, noMinMax, noSum bool) *expoHistogramDataPoint[N] { f := math.MaxFloat64 max := N(f) // if N is int64, max will overflow to -9223372036854775808 min := N(-f) @@ -68,6 +58,7 @@ func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMa min = N(minInt64) } return &expoHistogramDataPoint[N]{ + attrs: attrs, min: max, max: min, maxSize: maxSize, @@ -128,11 +119,13 @@ func (p *expoHistogramDataPoint[N]) record(v N) { } // getBin returns the bin v should be recorded into. -func (p *expoHistogramDataPoint[N]) getBin(v float64) int { - frac, exp := math.Frexp(v) +func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 { + frac, expInt := math.Frexp(v) + // 11-bit exponential. + exp := int32(expInt) // nolint: gosec if p.scale <= 0 { // Because of the choice of fraction is always 1 power of two higher than we want. - correction := 1 + var correction int32 = 1 if frac == .5 { // If v is an exact power of two the frac will be .5 and the exp // will be one higher than we want. @@ -140,7 +133,7 @@ func (p *expoHistogramDataPoint[N]) getBin(v float64) int { } return (exp - correction) >> (-p.scale) } - return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1 + return exp<<p.scale + int32(math.Log(frac)*scaleFactors[p.scale]) - 1 } // scaleFactors are constants used in calculating the logarithm index. They are @@ -171,20 +164,20 @@ var scaleFactors = [21]float64{ // scaleChange returns the magnitude of the scale change needed to fit bin in // the bucket. If no scale change is needed 0 is returned. -func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int { +func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 { if length == 0 { // No need to rescale if there are no buckets. return 0 } - low := startBin - high := bin + low := int(startBin) + high := int(bin) if startBin >= bin { - low = bin - high = startBin + length - 1 + low = int(bin) + high = int(startBin) + length - 1 } - count := 0 + var count int32 for high-low >= p.maxSize { low = low >> 1 high = high >> 1 @@ -198,39 +191,39 @@ func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int { // expoBuckets is a set of buckets in an exponential histogram. type expoBuckets struct { - startBin int + startBin int32 counts []uint64 } // record increments the count for the given bin, and expands the buckets if needed. // Size changes must be done before calling this function. -func (b *expoBuckets) record(bin int) { +func (b *expoBuckets) record(bin int32) { if len(b.counts) == 0 { b.counts = []uint64{1} b.startBin = bin return } - endBin := b.startBin + len(b.counts) - 1 + endBin := int(b.startBin) + len(b.counts) - 1 // if the new bin is inside the current range - if bin >= b.startBin && bin <= endBin { + if bin >= b.startBin && int(bin) <= endBin { b.counts[bin-b.startBin]++ return } // if the new bin is before the current start add spaces to the counts if bin < b.startBin { origLen := len(b.counts) - newLength := endBin - bin + 1 + newLength := endBin - int(bin) + 1 shift := b.startBin - bin if newLength > cap(b.counts) { b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...) } - copy(b.counts[shift:origLen+shift], b.counts[:]) + copy(b.counts[shift:origLen+int(shift)], b.counts[:]) b.counts = b.counts[:newLength] - for i := 1; i < shift; i++ { + for i := 1; i < int(shift); i++ { b.counts[i] = 0 } b.startBin = bin @@ -238,17 +231,17 @@ func (b *expoBuckets) record(bin int) { return } // if the new is after the end add spaces to the end - if bin > endBin { - if bin-b.startBin < cap(b.counts) { + if int(bin) > endBin { + if int(bin-b.startBin) < cap(b.counts) { b.counts = b.counts[:bin-b.startBin+1] - for i := endBin + 1 - b.startBin; i < len(b.counts); i++ { + for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ { b.counts[i] = 0 } b.counts[bin-b.startBin] = 1 return } - end := make([]uint64, bin-b.startBin-len(b.counts)+1) + end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1) b.counts = append(b.counts, end...) b.counts[bin-b.startBin] = 1 } @@ -256,7 +249,7 @@ func (b *expoBuckets) record(bin int) { // downscale shrinks a bucket by a factor of 2*s. It will sum counts into the // correct lower resolution bucket. -func (b *expoBuckets) downscale(delta int) { +func (b *expoBuckets) downscale(delta int32) { // Example // delta = 2 // Original offset: -6 @@ -271,19 +264,19 @@ func (b *expoBuckets) downscale(delta int) { return } - steps := 1 << delta + steps := int32(1) << delta offset := b.startBin % steps offset = (offset + steps) % steps // to make offset positive for i := 1; i < len(b.counts); i++ { - idx := i + offset - if idx%steps == 0 { - b.counts[idx/steps] = b.counts[i] + idx := i + int(offset) + if idx%int(steps) == 0 { + b.counts[idx/int(steps)] = b.counts[i] continue } - b.counts[idx/steps] += b.counts[i] + b.counts[idx/int(steps)] += b.counts[i] } - lastIdx := (len(b.counts) - 1 + offset) / steps + lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps) b.counts = b.counts[:lastIdx+1] b.startBin = b.startBin >> delta } @@ -291,16 +284,16 @@ func (b *expoBuckets) downscale(delta int) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. -func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, maxSize: int(maxSize), - maxScale: int(maxScale), + maxScale: maxScale, newRes: r, limit: newLimiter[*expoHistogramDataPoint[N]](limit), - values: make(map[attribute.Set]*expoHistogramDataPoint[N]), + values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]), start: now(), } @@ -312,11 +305,11 @@ type expoHistogram[N int64 | float64] struct { noSum bool noMinMax bool maxSize int - maxScale int + maxScale int32 - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.FilteredReservoir[N] limit limiter[*expoHistogramDataPoint[N]] - values map[attribute.Set]*expoHistogramDataPoint[N] + values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex start time.Time @@ -328,21 +321,19 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib return } - t := now() - e.valuesMu.Lock() defer e.valuesMu.Unlock() attr := e.limit.Attributes(fltrAttr, e.values) - v, ok := e.values[attr] + v, ok := e.values[attr.Equivalent()] if !ok { - v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum) + v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) v.res = e.newRes() - e.values[attr] = v + e.values[attr.Equivalent()] = v } v.record(value) - v.res.Offer(ctx, t, value, droppedAttr) + v.res.Offer(ctx, value, droppedAttr) } func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { @@ -360,36 +351,38 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range e.values { - hDPts[i].Attributes = a + for _, val := range e.values { + hDPts[i].Attributes = val.attrs hDPts[i].StartTime = e.start hDPts[i].Time = t - hDPts[i].Count = b.count - hDPts[i].Scale = int32(b.scale) - hDPts[i].ZeroCount = b.zeroCount + hDPts[i].Count = val.count + hDPts[i].Scale = val.scale + hDPts[i].ZeroCount = val.zeroCount hDPts[i].ZeroThreshold = 0.0 - hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin) - hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts)) - copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts) + hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin + hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts)) + copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) - hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin) - hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts)) - copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts) + hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin + hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts)) + copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { - hDPts[i].Sum = b.sum + hDPts[i].Sum = val.sum } if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) - delete(e.values, a) i++ } + // Unused attribute sets do not report. + clear(e.values) + e.start = t h.DataPoints = hDPts *dest = h @@ -411,32 +404,32 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int { hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range e.values { - hDPts[i].Attributes = a + for _, val := range e.values { + hDPts[i].Attributes = val.attrs hDPts[i].StartTime = e.start hDPts[i].Time = t - hDPts[i].Count = b.count - hDPts[i].Scale = int32(b.scale) - hDPts[i].ZeroCount = b.zeroCount + hDPts[i].Count = val.count + hDPts[i].Scale = val.scale + hDPts[i].ZeroCount = val.zeroCount hDPts[i].ZeroThreshold = 0.0 - hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin) - hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts)) - copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts) + hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin + hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts)) + copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts) - hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin) - hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts)) - copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts) + hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin + hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts)) + copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts) if !e.noSum { - hDPts[i].Sum = b.sum + hDPts[i].Sum = val.sum } if !e.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ // TODO (#3006): This will use an unbounded amount of memory if there diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go index a9a4706bf..ade0941f5 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go @@ -1,21 +1,11 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( "context" + "slices" "sort" "sync" "time" @@ -26,7 +16,8 @@ import ( ) type buckets[N int64 | float64] struct { - res exemplar.Reservoir[N] + attrs attribute.Set + res exemplar.FilteredReservoir[N] counts []uint64 count uint64 @@ -35,8 +26,8 @@ type buckets[N int64 | float64] struct { } // newBuckets returns buckets with n bins. -func newBuckets[N int64 | float64](n int) *buckets[N] { - return &buckets[N]{counts: make([]uint64, n)} +func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] { + return &buckets[N]{attrs: attrs, counts: make([]uint64, n)} } func (b *buckets[N]) sum(value N) { b.total += value } @@ -57,26 +48,25 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.FilteredReservoir[N] limit limiter[*buckets[N]] - values map[attribute.Set]*buckets[N] + values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have // complete control over the fix. - b := make([]float64, len(bounds)) - copy(b, bounds) - sort.Float64s(b) + b := slices.Clone(bounds) + slices.Sort(b) return &histValues[N]{ noSum: noSum, bounds: b, newRes: r, limit: newLimiter[*buckets[N]](limit), - values: make(map[attribute.Set]*buckets[N]), + values: make(map[attribute.Distinct]*buckets[N]), } } @@ -90,13 +80,11 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // (s.bounds[len(s.bounds)-1], +∞). idx := sort.SearchFloat64s(s.bounds, float64(value)) - t := now() - s.valuesMu.Lock() defer s.valuesMu.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) - b, ok := s.values[attr] + b, ok := s.values[attr.Equivalent()] if !ok { // N+1 buckets. For example: // @@ -105,23 +93,23 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // Then, // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) - b = newBuckets[N](len(s.bounds) + 1) + b = newBuckets[N](attr, len(s.bounds)+1) b.res = s.newRes() // Ensure min and max are recorded values (not zero), for new buckets. b.min, b.max = value, value - s.values[attr] = b + s.values[attr.Equivalent()] = b } b.bin(idx, value) if !s.noSum { b.sum(value) } - b.res.Offer(ctx, t, value, droppedAttr) + b.res.Offer(ctx, value, droppedAttr) } // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir[N]) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, @@ -150,36 +138,35 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int { defer s.valuesMu.Unlock() // Do not allow modification of our copy of bounds. - bounds := make([]float64, len(s.bounds)) - copy(bounds, s.bounds) + bounds := slices.Clone(s.bounds) n := len(s.values) hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range s.values { - hDPts[i].Attributes = a + for _, val := range s.values { + hDPts[i].Attributes = val.attrs hDPts[i].StartTime = s.start hDPts[i].Time = t - hDPts[i].Count = b.count + hDPts[i].Count = val.count hDPts[i].Bounds = bounds - hDPts[i].BucketCounts = b.counts + hDPts[i].BucketCounts = val.counts if !s.noSum { - hDPts[i].Sum = b.total + hDPts[i].Sum = val.total } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) - // Unused attribute sets do not report. - delete(s.values, a) i++ } + // Unused attribute sets do not report. + clear(s.values) // The delta collection cycle resets. s.start = t @@ -201,39 +188,36 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int { defer s.valuesMu.Unlock() // Do not allow modification of our copy of bounds. - bounds := make([]float64, len(s.bounds)) - copy(bounds, s.bounds) + bounds := slices.Clone(s.bounds) n := len(s.values) hDPts := reset(h.DataPoints, n, n) var i int - for a, b := range s.values { + for _, val := range s.values { + hDPts[i].Attributes = val.attrs + hDPts[i].StartTime = s.start + hDPts[i].Time = t + hDPts[i].Count = val.count + hDPts[i].Bounds = bounds + // The HistogramDataPoint field values returned need to be copies of // the buckets value as we will keep updating them. // // TODO (#3047): Making copies for bounds and counts incurs a large // memory allocation footprint. Alternatives should be explored. - counts := make([]uint64, len(b.counts)) - copy(counts, b.counts) - - hDPts[i].Attributes = a - hDPts[i].StartTime = s.start - hDPts[i].Time = t - hDPts[i].Count = b.count - hDPts[i].Bounds = bounds - hDPts[i].BucketCounts = counts + hDPts[i].BucketCounts = slices.Clone(val.counts) if !s.noSum { - hDPts[i].Sum = b.total + hDPts[i].Sum = val.total } if !s.noMinMax { - hDPts[i].Min = metricdata.NewExtrema(b.min) - hDPts[i].Max = metricdata.NewExtrema(b.max) + hDPts[i].Min = metricdata.NewExtrema(val.min) + hDPts[i].Max = metricdata.NewExtrema(val.max) } - b.res.Collect(&hDPts[i].Exemplars) + collectExemplars(&hDPts[i].Exemplars, val.res.Collect) i++ // TODO (#3006): This will use an unbounded amount of memory if there diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go index 5699e728f..c35936840 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" @@ -26,16 +15,17 @@ import ( // datapoint is timestamped measurement data. type datapoint[N int64 | float64] struct { - timestamp time.Time - value N - res exemplar.Reservoir[N] + attrs attribute.Set + value N + res exemplar.FilteredReservoir[N] } -func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), - values: make(map[attribute.Set]datapoint[N]), + values: make(map[attribute.Distinct]datapoint[N]), + start: now(), } } @@ -43,47 +33,130 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) type lastValue[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.FilteredReservoir[N] limit limiter[datapoint[N]] - values map[attribute.Set]datapoint[N] + values map[attribute.Distinct]datapoint[N] + start time.Time } func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - t := now() - s.Lock() defer s.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) - d, ok := s.values[attr] + d, ok := s.values[attr.Equivalent()] if !ok { d.res = s.newRes() } - d.timestamp = t + d.attrs = attr d.value = value - d.res.Offer(ctx, t, value, droppedAttr) + d.res.Offer(ctx, value, droppedAttr) - s.values[attr] = d + s.values[attr.Equivalent()] = d } -func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { +func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { + t := now() + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + s.Lock() defer s.Unlock() + n := s.copyDpts(&gData.DataPoints, t) + // Do not report stale values. + clear(s.values) + // Update start time for delta temporality. + s.start = t + + *dest = gData + + return n +} + +func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints, t) + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + *dest = gData + + return n +} + +// copyDpts copies the datapoints held by s into dest. The number of datapoints +// copied is returned. +func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int { n := len(s.values) *dest = reset(*dest, n, n) var i int - for a, v := range s.values { - (*dest)[i].Attributes = a - // The event time is the only meaningful timestamp, StartTime is - // ignored. - (*dest)[i].Time = v.timestamp + for _, v := range s.values { + (*dest)[i].Attributes = v.attrs + (*dest)[i].StartTime = s.start + (*dest)[i].Time = t (*dest)[i].Value = v.value - v.res.Collect(&(*dest)[i].Exemplars) - // Do not report stale values. - delete(s.values, a) + collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ } + return n +} + +// newPrecomputedLastValue returns an aggregator that summarizes a set of +// observations as the last one made. +func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] { + return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} +} + +// precomputedLastValue summarizes a set of observations as the last one made. +type precomputedLastValue[N int64 | float64] struct { + *lastValue[N] +} + +func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { + t := now() + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints, t) + // Do not report stale values. + clear(s.values) + // Update start time for delta temporality. + s.start = t + + *dest = gData + + return n +} + +func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints, t) + // Do not report stale values. + clear(s.values) + *dest = gData + + return n } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go index d3de84272..9ea0251ed 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" @@ -41,9 +30,9 @@ func newLimiter[V any](aggregation int) limiter[V] { // aggregation cardinality limit for the existing measurements. If it will, // overflowSet is returned. Otherwise, if it will not exceed the limit, or the // limit is not set (limit <= 0), attr is returned. -func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set { +func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set { if l.aggLimit > 0 { - _, exists := measurements[attrs] + _, exists := measurements[attrs.Equivalent()] if !exists && len(measurements) >= l.aggLimit-1 { return overflowSet } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go index 02de2483f..891366922 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" @@ -25,48 +14,48 @@ import ( ) type sumValue[N int64 | float64] struct { - n N - res exemplar.Reservoir[N] + n N + res exemplar.FilteredReservoir[N] + attrs attribute.Set } // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.Reservoir[N] + newRes func() exemplar.FilteredReservoir[N] limit limiter[sumValue[N]] - values map[attribute.Set]sumValue[N] + values map[attribute.Distinct]sumValue[N] } -func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir[N]) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), - values: make(map[attribute.Set]sumValue[N]), + values: make(map[attribute.Distinct]sumValue[N]), } } func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { - t := now() - s.Lock() defer s.Unlock() attr := s.limit.Attributes(fltrAttr, s.values) - v, ok := s.values[attr] + v, ok := s.values[attr.Equivalent()] if !ok { v.res = s.newRes() } + v.attrs = attr v.n += value - v.res.Offer(ctx, t, value, droppedAttr) + v.res.Offer(ctx, value, droppedAttr) - s.values[attr] = v + s.values[attr.Equivalent()] = v } // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -98,16 +87,16 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, val := range s.values { - dPts[i].Attributes = attr + for _, val := range s.values { + dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n - val.res.Collect(&dPts[i].Exemplars) - // Do not report stale values. - delete(s.values, attr) + collectExemplars(&dPts[i].Exemplars, val.res.Collect) i++ } + // Do not report stale values. + clear(s.values) // The delta collection cycle resets. s.start = t @@ -133,12 +122,12 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { - dPts[i].Attributes = attr + for _, value := range s.values { + dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = value.n - value.res.Collect(&dPts[i].Exemplars) + collectExemplars(&dPts[i].Exemplars, value.res.Collect) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -155,7 +144,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observatrions as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir[N]) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -170,12 +159,12 @@ type precomputedSum[N int64 | float64] struct { monotonic bool start time.Time - reported map[attribute.Set]N + reported map[attribute.Distinct]N } func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { t := now() - newReported := make(map[attribute.Set]N) + newReported := make(map[attribute.Distinct]N) // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, // use the zero-value sData and hope for better alignment next cycle. @@ -190,21 +179,20 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, value := range s.values { - delta := value.n - s.reported[attr] + for key, value := range s.values { + delta := value.n - s.reported[key] - dPts[i].Attributes = attr + dPts[i].Attributes = value.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = delta - value.res.Collect(&dPts[i].Exemplars) + collectExemplars(&dPts[i].Exemplars, value.res.Collect) - newReported[attr] = value.n - // Unused attribute sets do not report. - delete(s.values, attr) + newReported[key] = value.n i++ } - // Unused attribute sets are forgotten. + // Unused attribute sets do not report. + clear(s.values) s.reported = newReported // The delta collection cycle resets. s.start = t @@ -231,17 +219,17 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { dPts := reset(sData.DataPoints, n, n) var i int - for attr, val := range s.values { - dPts[i].Attributes = attr + for _, val := range s.values { + dPts[i].Attributes = val.attrs dPts[i].StartTime = s.start dPts[i].Time = t dPts[i].Value = val.n - val.res.Collect(&dPts[i].Exemplars) + collectExemplars(&dPts[i].Exemplars, val.res.Collect) - // Unused attribute sets do not report. - delete(s.values, attr) i++ } + // Unused attribute sets do not report. + clear(s.values) sData.DataPoints = dPts *dest = sData diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go index 3caeb542c..5394f48e0 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // Package exemplar provides an implementation of the OpenTelemetry exemplar // reservoir to be used in metric collection pipelines. diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go index 39bf37b9e..5a0f39ae1 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go @@ -1,36 +1,23 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" import ( "context" - "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -// Drop returns a [Reservoir] that drops all measurements it is offered. -func Drop[N int64 | float64]() Reservoir[N] { return &dropRes[N]{} } +// Drop returns a [FilteredReservoir] that drops all measurements it is offered. +func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} } type dropRes[N int64 | float64] struct{} // Offer does nothing, all measurements offered will be dropped. -func (r *dropRes[N]) Offer(context.Context, time.Time, N, []attribute.KeyValue) {} +func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {} // Collect resets dest. No exemplars will ever be returned. -func (r *dropRes[N]) Collect(dest *[]metricdata.Exemplar[N]) { +func (r *dropRes[N]) Collect(dest *[]Exemplar) { *dest = (*dest)[:0] } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go new file mode 100644 index 000000000..fcaa6a469 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "time" + + "go.opentelemetry.io/otel/attribute" +) + +// Exemplar is a measurement sampled from a timeseries providing a typical +// example. +type Exemplar struct { + // FilteredAttributes are the attributes recorded with the measurement but + // filtered out of the timeseries' aggregated data. + FilteredAttributes []attribute.KeyValue + // Time is the time when the measurement was recorded. + Time time.Time + // Value is the measured value. + Value Value + // SpanID is the ID of the span that was active during the measurement. If + // no span was active or the span was not sampled this will be empty. + SpanID []byte `json:",omitempty"` + // TraceID is the ID of the trace the active span belonged to during the + // measurement. If no span was active or the span was not sampled this will + // be empty. + TraceID []byte `json:",omitempty"` +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go index 4f5946fb9..152a069a0 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go @@ -1,40 +1,29 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" import ( "context" - "time" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) -// SampledFilter returns a [Reservoir] wrapping r that will only offer measurements -// to r if the passed context associated with the measurement contains a sampled -// [go.opentelemetry.io/otel/trace.SpanContext]. -func SampledFilter[N int64 | float64](r Reservoir[N]) Reservoir[N] { - return filtered[N]{Reservoir: r} -} +// Filter determines if a measurement should be offered. +// +// The passed ctx needs to contain any baggage or span that were active +// when the measurement was made. This information may be used by the +// Reservoir in making a sampling decision. +type Filter func(context.Context) bool -type filtered[N int64 | float64] struct { - Reservoir[N] +// SampledFilter is a [Filter] that will only offer measurements +// if the passed context associated with the measurement contains a sampled +// [go.opentelemetry.io/otel/trace.SpanContext]. +func SampledFilter(ctx context.Context) bool { + return trace.SpanContextFromContext(ctx).IsSampled() } -func (f filtered[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { - if trace.SpanContextFromContext(ctx).IsSampled() { - f.Reservoir.Offer(ctx, t, n, a) - } +// AlwaysOnFilter is a [Filter] that always offers measurements. +func AlwaysOnFilter(ctx context.Context) bool { + return true } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go new file mode 100644 index 000000000..9fedfa4be --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" +) + +// FilteredReservoir wraps a [Reservoir] with a filter. +type FilteredReservoir[N int64 | float64] interface { + // Offer accepts the parameters associated with a measurement. The + // parameters will be stored as an exemplar if the filter decides to + // sample the measurement. + // + // The passed ctx needs to contain any baggage or span that were active + // when the measurement was made. This information may be used by the + // Reservoir in making a sampling decision. + Offer(ctx context.Context, val N, attr []attribute.KeyValue) + // Collect returns all the held exemplars in the reservoir. + Collect(dest *[]Exemplar) +} + +// filteredReservoir handles the pre-sampled exemplar of measurements made. +type filteredReservoir[N int64 | float64] struct { + filter Filter + reservoir Reservoir +} + +// NewFilteredReservoir creates a [FilteredReservoir] which only offers values +// that are allowed by the filter. +func NewFilteredReservoir[N int64 | float64](f Filter, r Reservoir) FilteredReservoir[N] { + return &filteredReservoir[N]{ + filter: f, + reservoir: r, + } +} + +func (f *filteredReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { + if f.filter(ctx) { + // only record the current time if we are sampling this measurment. + f.reservoir.Offer(ctx, time.Now(), NewValue(val), attr) + } +} + +func (f *filteredReservoir[N]) Collect(dest *[]Exemplar) { f.reservoir.Collect(dest) } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go index 6f4fe5524..a6ff86d02 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go @@ -1,21 +1,11 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" import ( "context" + "slices" "sort" "time" @@ -27,21 +17,30 @@ import ( // by bounds. // // The passed bounds will be sorted by this function. -func Histogram[N int64 | float64](bounds []float64) Reservoir[N] { - sort.Float64s(bounds) - return &histRes[N]{ +func Histogram(bounds []float64) Reservoir { + slices.Sort(bounds) + return &histRes{ bounds: bounds, - storage: newStorage[N](len(bounds) + 1), + storage: newStorage(len(bounds) + 1), } } -type histRes[N int64 | float64] struct { - *storage[N] +type histRes struct { + *storage // bounds are bucket bounds in ascending order. bounds []float64 } -func (r *histRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { - r.store[sort.SearchFloat64s(r.bounds, float64(n))] = newMeasurement(ctx, t, n, a) +func (r *histRes) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) { + var x float64 + switch v.Type() { + case Int64ValueType: + x = float64(v.Int64()) + case Float64ValueType: + x = v.Float64() + default: + panic("unknown value type") + } + r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a) } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go index 7f9fda5b4..199a2608f 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" @@ -18,17 +7,21 @@ import ( "context" "math" "math/rand" + "sync" "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) -// rng is used to make sampling decisions. -// -// Do not use crypto/rand. There is no reason for the decrease in performance -// given this is not a security sensitive decision. -var rng = rand.New(rand.NewSource(time.Now().UnixNano())) +var ( + // rng is used to make sampling decisions. + // + // Do not use crypto/rand. There is no reason for the decrease in performance + // given this is not a security sensitive decision. + rng = rand.New(rand.NewSource(time.Now().UnixNano())) + // Ensure concurrent safe accecess to rng and its underlying source. + rngMu sync.Mutex +) // random returns, as a float64, a uniform pseudo-random number in the open // interval (0.0,1.0). @@ -50,6 +43,9 @@ func random() float64 { // // There are likely many other methods to explore here as well. + rngMu.Lock() + defer rngMu.Unlock() + f := rng.Float64() for f == 0 { f = rng.Float64() @@ -61,14 +57,14 @@ func random() float64 { // are k or less measurements made, the Reservoir will sample each one. If // there are more than k, the Reservoir will then randomly sample all // additional measurement with a decreasing probability. -func FixedSize[N int64 | float64](k int) Reservoir[N] { - r := &randRes[N]{storage: newStorage[N](k)} +func FixedSize(k int) Reservoir { + r := &randRes{storage: newStorage(k)} r.reset() return r } -type randRes[N int64 | float64] struct { - *storage[N] +type randRes struct { + *storage // count is the number of measurement seen. count int64 @@ -80,7 +76,7 @@ type randRes[N int64 | float64] struct { w float64 } -func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) { +func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) { // The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December // 1994). "Reservoir-Sampling Algorithms of Time Complexity // O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4): @@ -136,7 +132,7 @@ func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute. } // reset resets r to the initial state. -func (r *randRes[N]) reset() { +func (r *randRes) reset() { // This resets the number of exemplars known. r.count = 0 // Random index inserts should only happen after the storage is full. @@ -158,7 +154,7 @@ func (r *randRes[N]) reset() { // advance updates the count at which the offered measurement will overwrite an // existing exemplar. -func (r *randRes[N]) advance() { +func (r *randRes) advance() { // Calculate the next value in the random number series. // // The current value of r.w is based on the max of a distribution of random @@ -185,7 +181,7 @@ func (r *randRes[N]) advance() { r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1 } -func (r *randRes[N]) Collect(dest *[]metricdata.Exemplar[N]) { +func (r *randRes) Collect(dest *[]Exemplar) { r.storage.Collect(dest) // Call reset here even though it will reset r.count and restart the random // number series. This will persist any old exemplars as long as no new diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go index 7d5276a34..80fa59554 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" @@ -19,11 +8,10 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // Reservoir holds the sampled exemplar of measurements made. -type Reservoir[N int64 | float64] interface { +type Reservoir interface { // Offer accepts the parameters associated with a measurement. The // parameters will be stored as an exemplar if the Reservoir decides to // sample the measurement. @@ -35,10 +23,10 @@ type Reservoir[N int64 | float64] interface { // The time t is the time when the measurement was made. The val and attr // parameters are the value and dropped (filtered) attributes of the // measurement respectively. - Offer(ctx context.Context, t time.Time, val N, attr []attribute.KeyValue) + Offer(ctx context.Context, t time.Time, val Value, attr []attribute.KeyValue) // Collect returns all the held exemplars. // // The Reservoir state is preserved after this call. - Collect(dest *[]metricdata.Exemplar[N]) + Collect(dest *[]Exemplar) } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go index e2c2b90a3..10b2976f7 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" @@ -19,27 +8,26 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/trace" ) // storage is an exemplar storage for [Reservoir] implementations. -type storage[N int64 | float64] struct { +type storage struct { // store are the measurements sampled. // // This does not use []metricdata.Exemplar because it potentially would // require an allocation for trace and span IDs in the hot path of Offer. - store []measurement[N] + store []measurement } -func newStorage[N int64 | float64](n int) *storage[N] { - return &storage[N]{store: make([]measurement[N], n)} +func newStorage(n int) *storage { + return &storage{store: make([]measurement, n)} } // Collect returns all the held exemplars. // // The Reservoir state is preserved after this call. -func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) { +func (r *storage) Collect(dest *[]Exemplar) { *dest = reset(*dest, len(r.store), len(r.store)) var n int for _, m := range r.store { @@ -54,13 +42,13 @@ func (r *storage[N]) Collect(dest *[]metricdata.Exemplar[N]) { } // measurement is a measurement made by a telemetry system. -type measurement[N int64 | float64] struct { +type measurement struct { // FilteredAttributes are the attributes dropped during the measurement. FilteredAttributes []attribute.KeyValue // Time is the time when the measurement was made. Time time.Time // Value is the value of the measurement. - Value N + Value Value // SpanContext is the SpanContext active when a measurement was made. SpanContext trace.SpanContext @@ -68,8 +56,8 @@ type measurement[N int64 | float64] struct { } // newMeasurement returns a new non-empty Measurement. -func newMeasurement[N int64 | float64](ctx context.Context, ts time.Time, v N, droppedAttr []attribute.KeyValue) measurement[N] { - return measurement[N]{ +func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement { + return measurement{ FilteredAttributes: droppedAttr, Time: ts, Value: v, @@ -78,8 +66,8 @@ func newMeasurement[N int64 | float64](ctx context.Context, ts time.Time, v N, d } } -// Exemplar returns m as a [metricdata.Exemplar]. -func (m measurement[N]) Exemplar(dest *metricdata.Exemplar[N]) { +// Exemplar returns m as an [Exemplar]. +func (m measurement) Exemplar(dest *Exemplar) { dest.FilteredAttributes = m.FilteredAttributes dest.Time = m.Time dest.Value = m.Value diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go new file mode 100644 index 000000000..1957d6b1e --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import "math" + +// ValueType identifies the type of value used in exemplar data. +type ValueType uint8 + +const ( + // UnknownValueType should not be used. It represents a misconfigured + // Value. + UnknownValueType ValueType = 0 + // Int64ValueType represents a Value with int64 data. + Int64ValueType ValueType = 1 + // Float64ValueType represents a Value with float64 data. + Float64ValueType ValueType = 2 +) + +// Value is the value of data held by an exemplar. +type Value struct { + t ValueType + val uint64 +} + +// NewValue returns a new [Value] for the provided value. +func NewValue[N int64 | float64](value N) Value { + switch v := any(value).(type) { + case int64: + return Value{t: Int64ValueType, val: uint64(v)} + case float64: + return Value{t: Float64ValueType, val: math.Float64bits(v)} + } + return Value{} +} + +// Type returns the [ValueType] of data held by v. +func (v Value) Type() ValueType { return v.t } + +// Int64 returns the value of v as an int64. If the ValueType of v is not an +// Int64ValueType, 0 is returned. +func (v Value) Int64() int64 { + if v.t == Int64ValueType { + // Assumes the correct int64 was stored in v.val based on type. + return int64(v.val) // nolint: gosec + } + return 0 +} + +// Float64 returns the value of v as an float64. If the ValueType of v is not +// an Float64ValueType, 0 is returned. +func (v Value) Float64() float64 { + if v.t == Float64ValueType { + return math.Float64frombits(v.val) + } + return 0 +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go index 9695492b0..19ec6806f 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go index 541160f94..8cd2f3741 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go @@ -1,16 +1,5 @@ // Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 // Package x contains support for OTel metric SDK experimental features. // |