diff options
author | 2023-11-20 17:43:55 +0200 | |
---|---|---|
committer | 2023-11-20 16:43:55 +0100 | |
commit | 1ba3e14b36c8f00475bdd41cd4a487ef7636836e (patch) | |
tree | 17405b2de75bb6faaefb9bbfc6487fd7a5efc35a /vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate | |
parent | [bugfix] self-referencing collection pages for status replies (#2364) (diff) | |
download | gotosocial-1ba3e14b36c8f00475bdd41cd4a487ef7636836e.tar.xz |
[feature] Initial Prometheus metrics implementation (#2334)
* feat: Initial OTEL metrics
* docs: add metrics documentation
* fix: metrics endpoint conditional check
* feat: metrics endpoint basic auth
* fix: make metrics-auth-enabled default false
* fix: go fmt helpers.gen.go
* fix: add metric-related env vars to envparsing.sh
* fix: metrics docs
* fix: metrics related stuff in envparsing.sh
* fix: metrics docs
* chore: metrics docs wording
* fix: metrics stuff in envparsing?
* bump otel versions
---------
Co-authored-by: Tsuribori <user@acertaindebian>
Co-authored-by: Tsuribori <none@example.org>
Co-authored-by: tsmethurst <tobi.smethurst@protonmail.com>
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate')
6 files changed, 1104 insertions, 0 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go new file mode 100644 index 000000000..8dec14237 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go @@ -0,0 +1,133 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// now is used to return the current local time while allowing tests to +// override the default time.Now function. +var now = time.Now + +// Measure receives measurements to be aggregated. +type Measure[N int64 | float64] func(context.Context, N, attribute.Set) + +// ComputeAggregation stores the aggregate of measurements into dest and +// returns the number of aggregate data-points output. +type ComputeAggregation func(dest *metricdata.Aggregation) int + +// Builder builds an aggregate function. +type Builder[N int64 | float64] struct { + // Temporality is the temporality used for the returned aggregate function. + // + // If this is not provided a default of cumulative will be used (except for + // the last-value aggregate function where delta is the only appropriate + // temporality). + Temporality metricdata.Temporality + // Filter is the attribute filter the aggregate function will use on the + // input of measurements. + Filter attribute.Filter +} + +func (b Builder[N]) filter(f Measure[N]) Measure[N] { + if b.Filter != nil { + fltr := b.Filter // Copy to make it immutable after assignment. + return func(ctx context.Context, n N, a attribute.Set) { + fAttr, _ := a.Filter(fltr) + f(ctx, n, fAttr) + } + } + return f +} + +// LastValue returns a last-value aggregate function input and output. +// +// The Builder.Temporality is ignored and delta is use always. +func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { + // Delta temporality is the only temporality that makes semantic sense for + // a last-value aggregate. + lv := newLastValue[N]() + + return b.filter(lv.measure), func(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory + // reuse of the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + lv.computeAggregation(&gData.DataPoints) + *dest = gData + + return len(gData.DataPoints) + } +} + +// PrecomputedSum returns a sum aggregate function input and output. The +// arguments passed to the input are expected to be the precomputed sum values. +func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) { + s := newPrecomputedSum[N](monotonic) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(s.measure), s.delta + default: + return b.filter(s.measure), s.cumulative + } +} + +// Sum returns a sum aggregate function input and output. +func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { + s := newSum[N](monotonic) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(s.measure), s.delta + default: + return b.filter(s.measure), s.cumulative + } +} + +// ExplicitBucketHistogram returns a histogram aggregate function input and +// output. +func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) { + h := newHistogram[N](boundaries, noMinMax, noSum) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(h.measure), h.delta + default: + return b.filter(h.measure), h.cumulative + } +} + +// ExponentialBucketHistogram returns a histogram aggregate function input and +// output. +func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) { + h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(h.measure), h.delta + default: + return b.filter(h.measure), h.cumulative + } +} + +// reset ensures s has capacity and sets it length. If the capacity of s too +// small, a new slice is returned with the specified capacity and length. +func reset[T any](s []T, length, capacity int) []T { + if cap(s) < capacity { + return make([]T, length, capacity) + } + return s[:length] +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go new file mode 100644 index 000000000..e83a2693f --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package aggregate provides aggregate types used compute aggregations and +// cycle the state of metric measurements made by the SDK. These types and +// functionality are meant only for internal SDK use. +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go new file mode 100644 index 000000000..98b7dc1e0 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go @@ -0,0 +1,432 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + "errors" + "math" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +const ( + expoMaxScale = 20 + expoMinScale = -10 + + smallestNonZeroNormalFloat64 = 0x1p-1022 + + // These redefine the Math constants with a type, so the compiler won't coerce + // them into an int on 32 bit platforms. + maxInt64 int64 = math.MaxInt64 + minInt64 int64 = math.MinInt64 +) + +// expoHistogramDataPoint is a single data point in an exponential histogram. +type expoHistogramDataPoint[N int64 | float64] struct { + count uint64 + min N + max N + sum N + + maxSize int + noMinMax bool + noSum bool + + scale int + + posBuckets expoBuckets + negBuckets expoBuckets + zeroCount uint64 +} + +func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] { + f := math.MaxFloat64 + max := N(f) // if N is int64, max will overflow to -9223372036854775808 + min := N(-f) + if N(maxInt64) > N(f) { + max = N(maxInt64) + min = N(minInt64) + } + return &expoHistogramDataPoint[N]{ + min: max, + max: min, + maxSize: maxSize, + noMinMax: noMinMax, + noSum: noSum, + scale: maxScale, + } +} + +// record adds a new measurement to the histogram. It will rescale the buckets if needed. +func (p *expoHistogramDataPoint[N]) record(v N) { + p.count++ + + if !p.noMinMax { + if v < p.min { + p.min = v + } + if v > p.max { + p.max = v + } + } + if !p.noSum { + p.sum += v + } + + absV := math.Abs(float64(v)) + + if float64(absV) == 0.0 { + p.zeroCount++ + return + } + + bin := p.getBin(absV) + + bucket := &p.posBuckets + if v < 0 { + bucket = &p.negBuckets + } + + // If the new bin would make the counts larger than maxScale, we need to + // downscale current measurements. + if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 { + if p.scale-scaleDelta < expoMinScale { + // With a scale of -10 there is only two buckets for the whole range of float64 values. + // This can only happen if there is a max size of 1. + otel.Handle(errors.New("exponential histogram scale underflow")) + return + } + // Downscale + p.scale -= scaleDelta + p.posBuckets.downscale(scaleDelta) + p.negBuckets.downscale(scaleDelta) + + bin = p.getBin(absV) + } + + bucket.record(bin) +} + +// getBin returns the bin v should be recorded into. +func (p *expoHistogramDataPoint[N]) getBin(v float64) int { + frac, exp := math.Frexp(v) + if p.scale <= 0 { + // Because of the choice of fraction is always 1 power of two higher than we want. + correction := 1 + if frac == .5 { + // If v is an exact power of two the frac will be .5 and the exp + // will be one higher than we want. + correction = 2 + } + return (exp - correction) >> (-p.scale) + } + return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1 +} + +// scaleFactors are constants used in calculating the logarithm index. They are +// equivalent to 2^index/log(2). +var scaleFactors = [21]float64{ + math.Ldexp(math.Log2E, 0), + math.Ldexp(math.Log2E, 1), + math.Ldexp(math.Log2E, 2), + math.Ldexp(math.Log2E, 3), + math.Ldexp(math.Log2E, 4), + math.Ldexp(math.Log2E, 5), + math.Ldexp(math.Log2E, 6), + math.Ldexp(math.Log2E, 7), + math.Ldexp(math.Log2E, 8), + math.Ldexp(math.Log2E, 9), + math.Ldexp(math.Log2E, 10), + math.Ldexp(math.Log2E, 11), + math.Ldexp(math.Log2E, 12), + math.Ldexp(math.Log2E, 13), + math.Ldexp(math.Log2E, 14), + math.Ldexp(math.Log2E, 15), + math.Ldexp(math.Log2E, 16), + math.Ldexp(math.Log2E, 17), + math.Ldexp(math.Log2E, 18), + math.Ldexp(math.Log2E, 19), + math.Ldexp(math.Log2E, 20), +} + +// scaleChange returns the magnitude of the scale change needed to fit bin in +// the bucket. If no scale change is needed 0 is returned. +func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int { + if length == 0 { + // No need to rescale if there are no buckets. + return 0 + } + + low := startBin + high := bin + if startBin >= bin { + low = bin + high = startBin + length - 1 + } + + count := 0 + for high-low >= p.maxSize { + low = low >> 1 + high = high >> 1 + count++ + if count > expoMaxScale-expoMinScale { + return count + } + } + return count +} + +// expoBuckets is a set of buckets in an exponential histogram. +type expoBuckets struct { + startBin int + counts []uint64 +} + +// record increments the count for the given bin, and expands the buckets if needed. +// Size changes must be done before calling this function. +func (b *expoBuckets) record(bin int) { + if len(b.counts) == 0 { + b.counts = []uint64{1} + b.startBin = bin + return + } + + endBin := b.startBin + len(b.counts) - 1 + + // if the new bin is inside the current range + if bin >= b.startBin && bin <= endBin { + b.counts[bin-b.startBin]++ + return + } + // if the new bin is before the current start add spaces to the counts + if bin < b.startBin { + origLen := len(b.counts) + newLength := endBin - bin + 1 + shift := b.startBin - bin + + if newLength > cap(b.counts) { + b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...) + } + + copy(b.counts[shift:origLen+shift], b.counts[:]) + b.counts = b.counts[:newLength] + for i := 1; i < shift; i++ { + b.counts[i] = 0 + } + b.startBin = bin + b.counts[0] = 1 + return + } + // if the new is after the end add spaces to the end + if bin > endBin { + if bin-b.startBin < cap(b.counts) { + b.counts = b.counts[:bin-b.startBin+1] + for i := endBin + 1 - b.startBin; i < len(b.counts); i++ { + b.counts[i] = 0 + } + b.counts[bin-b.startBin] = 1 + return + } + + end := make([]uint64, bin-b.startBin-len(b.counts)+1) + b.counts = append(b.counts, end...) + b.counts[bin-b.startBin] = 1 + } +} + +// downscale shrinks a bucket by a factor of 2*s. It will sum counts into the +// correct lower resolution bucket. +func (b *expoBuckets) downscale(delta int) { + // Example + // delta = 2 + // Original offset: -6 + // Counts: [ 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + // bins: -6 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4 + // new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0, 1 + // new Offset: -2 + // new Counts: [4, 14, 30, 10] + + if len(b.counts) <= 1 || delta < 1 { + b.startBin = b.startBin >> delta + return + } + + steps := 1 << delta + offset := b.startBin % steps + offset = (offset + steps) % steps // to make offset positive + for i := 1; i < len(b.counts); i++ { + idx := i + offset + if idx%steps == 0 { + b.counts[idx/steps] = b.counts[i] + continue + } + b.counts[idx/steps] += b.counts[i] + } + + lastIdx := (len(b.counts) - 1 + offset) / steps + b.counts = b.counts[:lastIdx+1] + b.startBin = b.startBin >> delta +} + +// newExponentialHistogram returns an Aggregator that summarizes a set of +// measurements as an exponential histogram. Each histogram is scoped by attributes +// and the aggregation cycle the measurements were made in. +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] { + return &expoHistogram[N]{ + noSum: noSum, + noMinMax: noMinMax, + maxSize: int(maxSize), + maxScale: int(maxScale), + + values: make(map[attribute.Set]*expoHistogramDataPoint[N]), + + start: now(), + } +} + +// expoHistogram summarizes a set of measurements as an histogram with exponentially +// defined buckets. +type expoHistogram[N int64 | float64] struct { + noSum bool + noMinMax bool + maxSize int + maxScale int + + values map[attribute.Set]*expoHistogramDataPoint[N] + valuesMu sync.Mutex + + start time.Time +} + +func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) { + // Ignore NaN and infinity. + if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) { + return + } + + e.valuesMu.Lock() + defer e.valuesMu.Unlock() + + v, ok := e.values[attr] + if !ok { + v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum) + e.values[attr] = v + } + v.record(value) +} + +func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed. + // In that case, use the zero-value h and hope for better alignment next cycle. + h, _ := (*dest).(metricdata.ExponentialHistogram[N]) + h.Temporality = metricdata.DeltaTemporality + + e.valuesMu.Lock() + defer e.valuesMu.Unlock() + + n := len(e.values) + hDPts := reset(h.DataPoints, n, n) + + var i int + for a, b := range e.values { + hDPts[i].Attributes = a + hDPts[i].StartTime = e.start + hDPts[i].Time = t + hDPts[i].Count = b.count + hDPts[i].Scale = int32(b.scale) + hDPts[i].ZeroCount = b.zeroCount + hDPts[i].ZeroThreshold = 0.0 + + hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin) + hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts)) + copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts) + + hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin) + hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts)) + + if !e.noSum { + hDPts[i].Sum = b.sum + } + if !e.noMinMax { + hDPts[i].Min = metricdata.NewExtrema(b.min) + hDPts[i].Max = metricdata.NewExtrema(b.max) + } + + delete(e.values, a) + i++ + } + e.start = t + h.DataPoints = hDPts + *dest = h + return n +} + +func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed. + // In that case, use the zero-value h and hope for better alignment next cycle. + h, _ := (*dest).(metricdata.ExponentialHistogram[N]) + h.Temporality = metricdata.CumulativeTemporality + + e.valuesMu.Lock() + defer e.valuesMu.Unlock() + + n := len(e.values) + hDPts := reset(h.DataPoints, n, n) + + var i int + for a, b := range e.values { + hDPts[i].Attributes = a + hDPts[i].StartTime = e.start + hDPts[i].Time = t + hDPts[i].Count = b.count + hDPts[i].Scale = int32(b.scale) + hDPts[i].ZeroCount = b.zeroCount + hDPts[i].ZeroThreshold = 0.0 + + hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin) + hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts)) + copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts) + + hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin) + hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts)) + + if !e.noSum { + hDPts[i].Sum = b.sum + } + if !e.noMinMax { + hDPts[i].Min = metricdata.NewExtrema(b.min) + hDPts[i].Max = metricdata.NewExtrema(b.max) + } + + i++ + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + } + + h.DataPoints = hDPts + *dest = h + return n +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go new file mode 100644 index 000000000..62ec51e1f --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go @@ -0,0 +1,231 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + "sort" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +type buckets[N int64 | float64] struct { + counts []uint64 + count uint64 + total N + min, max N +} + +// newBuckets returns buckets with n bins. +func newBuckets[N int64 | float64](n int) *buckets[N] { + return &buckets[N]{counts: make([]uint64, n)} +} + +func (b *buckets[N]) sum(value N) { b.total += value } + +func (b *buckets[N]) bin(idx int, value N) { + b.counts[idx]++ + b.count++ + if value < b.min { + b.min = value + } else if value > b.max { + b.max = value + } +} + +// histValues summarizes a set of measurements as an histValues with +// explicitly defined buckets. +type histValues[N int64 | float64] struct { + noSum bool + bounds []float64 + + values map[attribute.Set]*buckets[N] + valuesMu sync.Mutex +} + +func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] { + // The responsibility of keeping all buckets correctly associated with the + // passed boundaries is ultimately this type's responsibility. Make a copy + // here so we can always guarantee this. Or, in the case of failure, have + // complete control over the fix. + b := make([]float64, len(bounds)) + copy(b, bounds) + sort.Float64s(b) + return &histValues[N]{ + noSum: noSum, + bounds: b, + values: make(map[attribute.Set]*buckets[N]), + } +} + +// Aggregate records the measurement value, scoped by attr, and aggregates it +// into a histogram. +func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) { + // This search will return an index in the range [0, len(s.bounds)], where + // it will return len(s.bounds) if value is greater than the last element + // of s.bounds. This aligns with the buckets in that the length of buckets + // is len(s.bounds)+1, with the last bucket representing: + // (s.bounds[len(s.bounds)-1], +∞). + idx := sort.SearchFloat64s(s.bounds, float64(value)) + + s.valuesMu.Lock() + defer s.valuesMu.Unlock() + + b, ok := s.values[attr] + if !ok { + // N+1 buckets. For example: + // + // bounds = [0, 5, 10] + // + // Then, + // + // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) + b = newBuckets[N](len(s.bounds) + 1) + // Ensure min and max are recorded values (not zero), for new buckets. + b.min, b.max = value, value + s.values[attr] = b + } + b.bin(idx, value) + if !s.noSum { + b.sum(value) + } +} + +// newHistogram returns an Aggregator that summarizes a set of measurements as +// an histogram. +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] { + return &histogram[N]{ + histValues: newHistValues[N](boundaries, noSum), + noMinMax: noMinMax, + start: now(), + } +} + +// histogram summarizes a set of measurements as an histogram with explicitly +// defined buckets. +type histogram[N int64 | float64] struct { + *histValues[N] + + noMinMax bool + start time.Time +} + +func (s *histogram[N]) delta(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.Histogram, memory reuse is missed. In that + // case, use the zero-value h and hope for better alignment next cycle. + h, _ := (*dest).(metricdata.Histogram[N]) + h.Temporality = metricdata.DeltaTemporality + + s.valuesMu.Lock() + defer s.valuesMu.Unlock() + + // Do not allow modification of our copy of bounds. + bounds := make([]float64, len(s.bounds)) + copy(bounds, s.bounds) + + n := len(s.values) + hDPts := reset(h.DataPoints, n, n) + + var i int + for a, b := range s.values { + hDPts[i].Attributes = a + hDPts[i].StartTime = s.start + hDPts[i].Time = t + hDPts[i].Count = b.count + hDPts[i].Bounds = bounds + hDPts[i].BucketCounts = b.counts + + if !s.noSum { + hDPts[i].Sum = b.total + } + + if !s.noMinMax { + hDPts[i].Min = metricdata.NewExtrema(b.min) + hDPts[i].Max = metricdata.NewExtrema(b.max) + } + + // Unused attribute sets do not report. + delete(s.values, a) + i++ + } + // The delta collection cycle resets. + s.start = t + + h.DataPoints = hDPts + *dest = h + + return n +} + +func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.Histogram, memory reuse is missed. In that + // case, use the zero-value h and hope for better alignment next cycle. + h, _ := (*dest).(metricdata.Histogram[N]) + h.Temporality = metricdata.CumulativeTemporality + + s.valuesMu.Lock() + defer s.valuesMu.Unlock() + + // Do not allow modification of our copy of bounds. + bounds := make([]float64, len(s.bounds)) + copy(bounds, s.bounds) + + n := len(s.values) + hDPts := reset(h.DataPoints, n, n) + + var i int + for a, b := range s.values { + // The HistogramDataPoint field values returned need to be copies of + // the buckets value as we will keep updating them. + // + // TODO (#3047): Making copies for bounds and counts incurs a large + // memory allocation footprint. Alternatives should be explored. + counts := make([]uint64, len(b.counts)) + copy(counts, b.counts) + + hDPts[i].Attributes = a + hDPts[i].StartTime = s.start + hDPts[i].Time = t + hDPts[i].Count = b.count + hDPts[i].Bounds = bounds + hDPts[i].BucketCounts = counts + + if !s.noSum { + hDPts[i].Sum = b.total + } + + if !s.noMinMax { + hDPts[i].Min = metricdata.NewExtrema(b.min) + hDPts[i].Max = metricdata.NewExtrema(b.max) + } + i++ + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + } + + h.DataPoints = hDPts + *dest = h + + return n +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go new file mode 100644 index 000000000..6af2d6061 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// datapoint is timestamped measurement data. +type datapoint[N int64 | float64] struct { + timestamp time.Time + value N +} + +func newLastValue[N int64 | float64]() *lastValue[N] { + return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])} +} + +// lastValue summarizes a set of measurements as the last one made. +type lastValue[N int64 | float64] struct { + sync.Mutex + + values map[attribute.Set]datapoint[N] +} + +func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) { + d := datapoint[N]{timestamp: now(), value: value} + s.Lock() + s.values[attr] = d + s.Unlock() +} + +func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { + s.Lock() + defer s.Unlock() + + n := len(s.values) + *dest = reset(*dest, n, n) + + var i int + for a, v := range s.values { + (*dest)[i].Attributes = a + // The event time is the only meaningful timestamp, StartTime is + // ignored. + (*dest)[i].Time = v.timestamp + (*dest)[i].Value = v.value + // Do not report stale values. + delete(s.values, a) + i++ + } +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go new file mode 100644 index 000000000..1e52ff0d1 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go @@ -0,0 +1,222 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + "sync" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// valueMap is the storage for sums. +type valueMap[N int64 | float64] struct { + sync.Mutex + values map[attribute.Set]N +} + +func newValueMap[N int64 | float64]() *valueMap[N] { + return &valueMap[N]{values: make(map[attribute.Set]N)} +} + +func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) { + s.Lock() + s.values[attr] += value + s.Unlock() +} + +// newSum returns an aggregator that summarizes a set of measurements as their +// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle +// the measurements were made in. +func newSum[N int64 | float64](monotonic bool) *sum[N] { + return &sum[N]{ + valueMap: newValueMap[N](), + monotonic: monotonic, + start: now(), + } +} + +// sum summarizes a set of measurements made as their arithmetic sum. +type sum[N int64 | float64] struct { + *valueMap[N] + + monotonic bool + start time.Time +} + +func (s *sum[N]) delta(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.DeltaTemporality + sData.IsMonotonic = s.monotonic + + s.Lock() + defer s.Unlock() + + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) + + var i int + for attr, value := range s.values { + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = value + // Do not report stale values. + delete(s.values, attr) + i++ + } + // The delta collection cycle resets. + s.start = t + + sData.DataPoints = dPts + *dest = sData + + return n +} + +func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.CumulativeTemporality + sData.IsMonotonic = s.monotonic + + s.Lock() + defer s.Unlock() + + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) + + var i int + for attr, value := range s.values { + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = value + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + i++ + } + + sData.DataPoints = dPts + *dest = sData + + return n +} + +// newPrecomputedSum returns an aggregator that summarizes a set of +// observatrions as their arithmetic sum. Each sum is scoped by attributes and +// the aggregation cycle the measurements were made in. +func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] { + return &precomputedSum[N]{ + valueMap: newValueMap[N](), + monotonic: monotonic, + start: now(), + } +} + +// precomputedSum summarizes a set of observatrions as their arithmetic sum. +type precomputedSum[N int64 | float64] struct { + *valueMap[N] + + monotonic bool + start time.Time + + reported map[attribute.Set]N +} + +func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int { + t := now() + newReported := make(map[attribute.Set]N) + + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.DeltaTemporality + sData.IsMonotonic = s.monotonic + + s.Lock() + defer s.Unlock() + + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) + + var i int + for attr, value := range s.values { + delta := value - s.reported[attr] + + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = delta + + newReported[attr] = value + // Unused attribute sets do not report. + delete(s.values, attr) + i++ + } + // Unused attribute sets are forgotten. + s.reported = newReported + // The delta collection cycle resets. + s.start = t + + sData.DataPoints = dPts + *dest = sData + + return n +} + +func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() + + // If *dest is not a metricdata.Sum, memory reuse is missed. In that case, + // use the zero-value sData and hope for better alignment next cycle. + sData, _ := (*dest).(metricdata.Sum[N]) + sData.Temporality = metricdata.CumulativeTemporality + sData.IsMonotonic = s.monotonic + + s.Lock() + defer s.Unlock() + + n := len(s.values) + dPts := reset(sData.DataPoints, n, n) + + var i int + for attr, value := range s.values { + dPts[i].Attributes = attr + dPts[i].StartTime = s.start + dPts[i].Time = t + dPts[i].Value = value + + // Unused attribute sets do not report. + delete(s.values, attr) + i++ + } + + sData.DataPoints = dPts + *dest = sData + + return n +} |