diff options
author | 2025-02-06 12:14:37 +0100 | |
---|---|---|
committer | 2025-02-06 12:14:37 +0100 | |
commit | dd094e401282e135989f57c0ca3dee7dea3f5207 (patch) | |
tree | 74cb77830f621840273255a17565ced73b4fa997 /vendor/go.opentelemetry.io/otel/sdk | |
parent | [feature] Use `X-Robots-Tag` headers to instruct scrapers/crawlers (#3737) (diff) | |
download | gotosocial-dd094e401282e135989f57c0ca3dee7dea3f5207.tar.xz |
[chore] update otel libraries (#3740)
* chore: update otel dependencies
* refactor: combine tracing & metrics in observability package
* chore: update example tracing compose file
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk')
45 files changed, 805 insertions, 587 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go b/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go index 728115045..34852a47b 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go +++ b/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go @@ -3,6 +3,8 @@ package instrumentation // import "go.opentelemetry.io/otel/sdk/instrumentation" +import "go.opentelemetry.io/otel/attribute" + // Scope represents the instrumentation scope. type Scope struct { // Name is the name of the instrumentation scope. This should be the @@ -12,4 +14,6 @@ type Scope struct { Version string // SchemaURL of the telemetry emitted by the scope. SchemaURL string + // Attributes of the telemetry emitted by the scope. + Attributes attribute.Set } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/config.go b/vendor/go.opentelemetry.io/otel/sdk/metric/config.go index bbe7bf671..203cd9d65 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/config.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/config.go @@ -5,17 +5,22 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "fmt" + "errors" + "os" + "strings" "sync" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/resource" ) // config contains configuration options for a MeterProvider. type config struct { - res *resource.Resource - readers []Reader - views []View + res *resource.Resource + readers []Reader + views []View + exemplarFilter exemplar.Filter } // readerSignals returns a force-flush and shutdown function for a @@ -39,25 +44,13 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro // value. func unify(funcs []func(context.Context) error) func(context.Context) error { return func(ctx context.Context) error { - var errs []error + var err error for _, f := range funcs { - if err := f(ctx); err != nil { - errs = append(errs, err) + if e := f(ctx); e != nil { + err = errors.Join(err, e) } } - return unifyErrors(errs) - } -} - -// unifyErrors combines multiple errors into a single error. -func unifyErrors(errs []error) error { - switch len(errs) { - case 0: - return nil - case 1: - return errs[0] - default: - return fmt.Errorf("%v", errs) + return err } } @@ -75,7 +68,13 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er // newConfig returns a config configured with options. func newConfig(options []Option) config { - conf := config{res: resource.Default()} + conf := config{ + res: resource.Default(), + exemplarFilter: exemplar.TraceBasedFilter, + } + for _, o := range meterProviderOptionsFromEnv() { + conf = o.apply(conf) + } for _, o := range options { conf = o.apply(conf) } @@ -103,7 +102,11 @@ func (o optionFunc) apply(conf config) config { // go.opentelemetry.io/otel/sdk/resource package will be used. func WithResource(res *resource.Resource) Option { return optionFunc(func(conf config) config { - conf.res = res + var err error + conf.res, err = resource.Merge(resource.Environment(), res) + if err != nil { + otel.Handle(err) + } return conf }) } @@ -135,3 +138,35 @@ func WithView(views ...View) Option { return cfg }) } + +// WithExemplarFilter configures the exemplar filter. +// +// The exemplar filter determines which measurements are offered to the +// exemplar reservoir, but the exemplar reservoir makes the final decision of +// whether to store an exemplar. +// +// By default, the [exemplar.SampledFilter] +// is used. Exemplars can be entirely disabled by providing the +// [exemplar.AlwaysOffFilter]. +func WithExemplarFilter(filter exemplar.Filter) Option { + return optionFunc(func(cfg config) config { + cfg.exemplarFilter = filter + return cfg + }) +} + +func meterProviderOptionsFromEnv() []Option { + var opts []Option + // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar + const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" + + switch strings.ToLower(strings.TrimSpace(os.Getenv(filterEnvKey))) { + case "always_on": + opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter)) + case "always_off": + opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter)) + case "trace_based": + opts = append(opts, WithExemplarFilter(exemplar.TraceBasedFilter)) + } + return opts +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go index 82619da78..0335b8ae4 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go @@ -4,51 +4,49 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "os" "runtime" - "slices" - "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" - "go.opentelemetry.io/otel/sdk/metric/internal/x" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" + "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" ) -// reservoirFunc returns the appropriately configured exemplar reservoir -// creation func based on the passed InstrumentKind and user defined -// environment variables. -// -// Note: This will only return non-nil values when the experimental exemplar -// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable -// is not set to always_off. -func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] { - if !x.Exemplars.Enabled() { - return nil - } - // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar - const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" +// ExemplarReservoirProviderSelector selects the +// [exemplar.ReservoirProvider] to use +// based on the [Aggregation] of the metric. +type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider - var filter exemplar.Filter - - switch os.Getenv(filterEnvKey) { - case "always_on": - filter = exemplar.AlwaysOnFilter - case "always_off": - return exemplar.Drop - case "trace_based": - fallthrough - default: - filter = exemplar.SampledFilter +// reservoirFunc returns the appropriately configured exemplar reservoir +// creation func based on the passed InstrumentKind and filter configuration. +func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] { + return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] { + return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs)) } +} +// DefaultExemplarReservoirProviderSelector returns the default +// [exemplar.ReservoirProvider] for the +// provided [Aggregation]. +// +// For explicit bucket histograms with more than 1 bucket, it uses the +// [exemplar.HistogramReservoirProvider]. +// For exponential histograms, it uses the +// [exemplar.FixedSizeReservoirProvider] +// with a size of min(20, max_buckets). +// For all other aggregations, it uses the +// [exemplar.FixedSizeReservoirProvider] +// with a size equal to the number of CPUs. +// +// Exemplar default reservoirs MAY change in a minor version bump. No +// guarantees are made on the shape or statistical properties of returned +// exemplars. +func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider { // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults // Explicit bucket histogram aggregation with more than 1 bucket will // use AlignedHistogramBucketExemplarReservoir. a, ok := agg.(AggregationExplicitBucketHistogram) if ok && len(a.Boundaries) > 0 { - cp := slices.Clone(a.Boundaries) - return func() exemplar.FilteredReservoir[N] { - bounds := cp - return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds)) - } + return exemplar.HistogramReservoirProvider(a.Boundaries) } var n int @@ -75,7 +73,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR } } - return func() exemplar.FilteredReservoir[N] { - return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n)) - } + return exemplar.FixedSizeReservoirProvider(n) } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/README.md b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/README.md new file mode 100644 index 000000000..d1025f5eb --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/README.md @@ -0,0 +1,3 @@ +# Metric SDK Exemplars + +[](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/exemplar) diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/doc.go index 5394f48e0..9f2389376 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/doc.go @@ -3,4 +3,4 @@ // Package exemplar provides an implementation of the OpenTelemetry exemplar // reservoir to be used in metric collection pipelines. -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/exemplar.go index fcaa6a469..1ab694678 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/exemplar.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "time" diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/filter.go index 152a069a0..b595e2ace 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/filter.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "context" @@ -16,10 +16,10 @@ import ( // Reservoir in making a sampling decision. type Filter func(context.Context) bool -// SampledFilter is a [Filter] that will only offer measurements +// TraceBasedFilter is a [Filter] that will only offer measurements // if the passed context associated with the measurement contains a sampled // [go.opentelemetry.io/otel/trace.SpanContext]. -func SampledFilter(ctx context.Context) bool { +func TraceBasedFilter(ctx context.Context) bool { return trace.SpanContextFromContext(ctx).IsSampled() } @@ -27,3 +27,8 @@ func SampledFilter(ctx context.Context) bool { func AlwaysOnFilter(ctx context.Context) bool { return true } + +// AlwaysOffFilter is a [Filter] that never offers measurements. +func AlwaysOffFilter(ctx context.Context) bool { + return false +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/fixed_size_reservoir.go index 199a2608f..d4aab0aad 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/fixed_size_reservoir.go @@ -1,31 +1,69 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "context" "math" "math/rand" - "sync" "time" "go.opentelemetry.io/otel/attribute" ) -var ( +// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir]. +func FixedSizeReservoirProvider(k int) ReservoirProvider { + return func(_ attribute.Set) Reservoir { + return NewFixedSizeReservoir(k) + } +} + +// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most +// k exemplars. If there are k or less measurements made, the Reservoir will +// sample each one. If there are more than k, the Reservoir will then randomly +// sample all additional measurement with a decreasing probability. +func NewFixedSizeReservoir(k int) *FixedSizeReservoir { + return newFixedSizeReservoir(newStorage(k)) +} + +var _ Reservoir = &FixedSizeReservoir{} + +// FixedSizeReservoir is a [Reservoir] that samples at most k exemplars. If +// there are k or less measurements made, the Reservoir will sample each one. +// If there are more than k, the Reservoir will then randomly sample all +// additional measurement with a decreasing probability. +type FixedSizeReservoir struct { + *storage + + // count is the number of measurement seen. + count int64 + // next is the next count that will store a measurement at a random index + // once the reservoir has been filled. + next int64 + // w is the largest random number in a distribution that is used to compute + // the next next. + w float64 + // rng is used to make sampling decisions. // // Do not use crypto/rand. There is no reason for the decrease in performance // given this is not a security sensitive decision. - rng = rand.New(rand.NewSource(time.Now().UnixNano())) - // Ensure concurrent safe accecess to rng and its underlying source. - rngMu sync.Mutex -) + rng *rand.Rand +} -// random returns, as a float64, a uniform pseudo-random number in the open -// interval (0.0,1.0). -func random() float64 { +func newFixedSizeReservoir(s *storage) *FixedSizeReservoir { + r := &FixedSizeReservoir{ + storage: s, + rng: rand.New(rand.NewSource(time.Now().UnixNano())), + } + r.reset() + return r +} + +// randomFloat64 returns, as a float64, a uniform pseudo-random number in the +// open interval (0.0,1.0). +func (r *FixedSizeReservoir) randomFloat64() float64 { // TODO: This does not return a uniform number. rng.Float64 returns a // uniformly random int in [0,2^53) that is divided by 2^53. Meaning it // returns multiples of 2^-53, and not all floating point numbers between 0 @@ -43,40 +81,25 @@ func random() float64 { // // There are likely many other methods to explore here as well. - rngMu.Lock() - defer rngMu.Unlock() - - f := rng.Float64() + f := r.rng.Float64() for f == 0 { - f = rng.Float64() + f = r.rng.Float64() } return f } -// FixedSize returns a [Reservoir] that samples at most k exemplars. If there -// are k or less measurements made, the Reservoir will sample each one. If -// there are more than k, the Reservoir will then randomly sample all -// additional measurement with a decreasing probability. -func FixedSize(k int) Reservoir { - r := &randRes{storage: newStorage(k)} - r.reset() - return r -} - -type randRes struct { - *storage - - // count is the number of measurement seen. - count int64 - // next is the next count that will store a measurement at a random index - // once the reservoir has been filled. - next int64 - // w is the largest random number in a distribution that is used to compute - // the next next. - w float64 -} - -func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) { +// Offer accepts the parameters associated with a measurement. The +// parameters will be stored as an exemplar if the Reservoir decides to +// sample the measurement. +// +// The passed ctx needs to contain any baggage or span that were active +// when the measurement was made. This information may be used by the +// Reservoir in making a sampling decision. +// +// The time t is the time when the measurement was made. The v and a +// parameters are the value and dropped (filtered) attributes of the +// measurement respectively. +func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) { // The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December // 1994). "Reservoir-Sampling Algorithms of Time Complexity // O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4): @@ -123,7 +146,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute } else { if r.count == r.next { // Overwrite a random existing measurement with the one offered. - idx := int(rng.Int63n(int64(cap(r.store)))) + idx := int(r.rng.Int63n(int64(cap(r.store)))) r.store[idx] = newMeasurement(ctx, t, n, a) r.advance() } @@ -132,7 +155,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute } // reset resets r to the initial state. -func (r *randRes) reset() { +func (r *FixedSizeReservoir) reset() { // This resets the number of exemplars known. r.count = 0 // Random index inserts should only happen after the storage is full. @@ -147,14 +170,14 @@ func (r *randRes) reset() { // This maps the uniform random number in (0,1) to a geometric distribution // over the same interval. The mean of the distribution is inversely // proportional to the storage capacity. - r.w = math.Exp(math.Log(random()) / float64(cap(r.store))) + r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store))) r.advance() } // advance updates the count at which the offered measurement will overwrite an // existing exemplar. -func (r *randRes) advance() { +func (r *FixedSizeReservoir) advance() { // Calculate the next value in the random number series. // // The current value of r.w is based on the max of a distribution of random @@ -167,7 +190,7 @@ func (r *randRes) advance() { // therefore the next r.w will be based on the same distribution (i.e. // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by // computing the next random number `u` and take r.w as `w * u^(1/k)`. - r.w *= math.Exp(math.Log(random()) / float64(cap(r.store))) + r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store))) // Use the new random number in the series to calculate the count of the // next measurement that will be stored. // @@ -178,10 +201,13 @@ func (r *randRes) advance() { // // Important to note, the new r.next will always be at least 1 more than // the last r.next. - r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1 + r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1 } -func (r *randRes) Collect(dest *[]Exemplar) { +// Collect returns all the held exemplars. +// +// The Reservoir state is preserved after this call. +func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) { r.storage.Collect(dest) // Call reset here even though it will reset r.count and restart the random // number series. This will persist any old exemplars as long as no new diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/histogram_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/histogram_reservoir.go new file mode 100644 index 000000000..3b76cf305 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/histogram_reservoir.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" + +import ( + "context" + "slices" + "sort" + "time" + + "go.opentelemetry.io/otel/attribute" +) + +// HistogramReservoirProvider is a provider of [HistogramReservoir]. +func HistogramReservoirProvider(bounds []float64) ReservoirProvider { + cp := slices.Clone(bounds) + slices.Sort(cp) + return func(_ attribute.Set) Reservoir { + return NewHistogramReservoir(cp) + } +} + +// NewHistogramReservoir returns a [HistogramReservoir] that samples the last +// measurement that falls within a histogram bucket. The histogram bucket +// upper-boundaries are define by bounds. +// +// The passed bounds must be sorted before calling this function. +func NewHistogramReservoir(bounds []float64) *HistogramReservoir { + return &HistogramReservoir{ + bounds: bounds, + storage: newStorage(len(bounds) + 1), + } +} + +var _ Reservoir = &HistogramReservoir{} + +// HistogramReservoir is a [Reservoir] that samples the last measurement that +// falls within a histogram bucket. The histogram bucket upper-boundaries are +// define by bounds. +type HistogramReservoir struct { + *storage + + // bounds are bucket bounds in ascending order. + bounds []float64 +} + +// Offer accepts the parameters associated with a measurement. The +// parameters will be stored as an exemplar if the Reservoir decides to +// sample the measurement. +// +// The passed ctx needs to contain any baggage or span that were active +// when the measurement was made. This information may be used by the +// Reservoir in making a sampling decision. +// +// The time t is the time when the measurement was made. The v and a +// parameters are the value and dropped (filtered) attributes of the +// measurement respectively. +func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) { + var x float64 + switch v.Type() { + case Int64ValueType: + x = float64(v.Int64()) + case Float64ValueType: + x = v.Float64() + default: + panic("unknown value type") + } + r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a) +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/reservoir.go index 80fa59554..ba5cd1a6b 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/reservoir.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "context" @@ -30,3 +30,11 @@ type Reservoir interface { // The Reservoir state is preserved after this call. Collect(dest *[]Exemplar) } + +// ReservoirProvider creates new [Reservoir]s. +// +// The attributes provided are attributes which are kept by the aggregation, and +// are exclusive with attributes passed to Offer. The combination of these +// attributes and the attributes passed to Offer is the complete set of +// attributes a measurement was made with. +type ReservoirProvider func(attr attribute.Set) Reservoir diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/storage.go index 10b2976f7..0e2e26dfb 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/storage.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "context" @@ -35,7 +35,7 @@ func (r *storage) Collect(dest *[]Exemplar) { continue } - m.Exemplar(&(*dest)[n]) + m.exemplar(&(*dest)[n]) n++ } *dest = (*dest)[:n] @@ -66,8 +66,8 @@ func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []at } } -// Exemplar returns m as an [Exemplar]. -func (m measurement) Exemplar(dest *Exemplar) { +// exemplar returns m as an [Exemplar]. +func (m measurement) exemplar(dest *Exemplar) { dest.FilteredAttributes = m.FilteredAttributes dest.Time = m.Time dest.Value = m.Value diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/value.go index 1957d6b1e..590b089a8 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/value.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import "math" @@ -28,7 +28,8 @@ type Value struct { func NewValue[N int64 | float64](value N) Value { switch v := any(value).(type) { case int64: - return Value{t: Int64ValueType, val: uint64(v)} + // This can be later converted back to int64 (overflow not checked). + return Value{t: Int64ValueType, val: uint64(v)} // nolint:gosec case float64: return Value{t: Float64ValueType, val: math.Float64bits(v)} } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go index 1a3cccb67..1969cb42c 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go @@ -5,14 +5,14 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "fmt" + "errors" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // ErrExporterShutdown is returned if Export or Shutdown are called after an // Exporter has been Shutdown. -var ErrExporterShutdown = fmt.Errorf("exporter is shutdown") +var ErrExporterShutdown = errors.New("exporter is shutdown") // Exporter handles the delivery of metric data to external receivers. This is // the final component in the metric push pipeline. diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go b/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go index b52a330b3..c33e1a28c 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + "go.opentelemetry.io/otel/sdk/metric/internal/x" ) var zeroScope instrumentation.Scope @@ -144,6 +145,12 @@ type Stream struct { // Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to // provide an allow-list of attribute keys here. AttributeFilter attribute.Filter + // ExemplarReservoirProvider selects the + // [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based + // on the [Aggregation]. + // + // If unspecified, [DefaultExemplarReservoirProviderSelector] is used. + ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector } // instID are the identifying properties of a instrument. @@ -184,6 +191,7 @@ var ( _ metric.Int64UpDownCounter = (*int64Inst)(nil) _ metric.Int64Histogram = (*int64Inst)(nil) _ metric.Int64Gauge = (*int64Inst)(nil) + _ x.EnabledInstrument = (*int64Inst)(nil) ) func (i *int64Inst) Add(ctx context.Context, val int64, opts ...metric.AddOption) { @@ -196,6 +204,10 @@ func (i *int64Inst) Record(ctx context.Context, val int64, opts ...metric.Record i.aggregate(ctx, val, c.Attributes()) } +func (i *int64Inst) Enabled(_ context.Context) bool { + return len(i.measures) != 0 +} + func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) { // nolint:revive // okay to shadow pkg with method. for _, in := range i.measures { in(ctx, val, s) @@ -216,6 +228,7 @@ var ( _ metric.Float64UpDownCounter = (*float64Inst)(nil) _ metric.Float64Histogram = (*float64Inst)(nil) _ metric.Float64Gauge = (*float64Inst)(nil) + _ x.EnabledInstrument = (*float64Inst)(nil) ) func (i *float64Inst) Add(ctx context.Context, val float64, opts ...metric.AddOption) { @@ -228,14 +241,18 @@ func (i *float64Inst) Record(ctx context.Context, val float64, opts ...metric.Re i.aggregate(ctx, val, c.Attributes()) } +func (i *float64Inst) Enabled(_ context.Context) bool { + return len(i.measures) != 0 +} + func (i *float64Inst) aggregate(ctx context.Context, val float64, s attribute.Set) { for _, in := range i.measures { in(ctx, val, s) } } -// observablID is a comparable unique identifier of an observable. -type observablID[N int64 | float64] struct { +// observableID is a comparable unique identifier of an observable. +type observableID[N int64 | float64] struct { name string description string kind InstrumentKind @@ -287,7 +304,7 @@ func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int type observable[N int64 | float64] struct { metric.Observable - observablID[N] + observableID[N] meter *meter measures measures[N] @@ -296,7 +313,7 @@ type observable[N int64 | float64] struct { func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] { return &observable[N]{ - observablID: observablID[N]{ + observableID: observableID[N]{ name: name, description: desc, kind: kind, diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go index b18ee719b..fde219333 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go @@ -8,7 +8,6 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -38,8 +37,8 @@ type Builder[N int64 | float64] struct { // create new exemplar reservoirs for a new seen attribute set. // // If this is not provided a default factory function that returns an - // exemplar.Drop reservoir will be used. - ReservoirFunc func() exemplar.FilteredReservoir[N] + // dropReservoir reservoir will be used. + ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -50,12 +49,12 @@ type Builder[N int64 | float64] struct { AggregationLimit int } -func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] { +func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] { if b.ReservoirFunc != nil { return b.ReservoirFunc } - return exemplar.Drop + return dropReservoir } type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go new file mode 100644 index 000000000..8396faaa4 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" +) + +// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered. +func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] { + return &dropRes[N]{} +} + +type dropRes[N int64 | float64] struct{} + +// Offer does nothing, all measurements offered will be dropped. +func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {} + +// Collect resets dest. No exemplars will ever be returned. +func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) { + clear(*dest) // Erase elements to let GC collect objects + *dest = (*dest)[:0] +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go index 170ae8e58..25d709948 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go @@ -6,7 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "sync" - "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -17,6 +17,7 @@ var exemplarPool = sync.Pool{ func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) { dest := exemplarPool.Get().(*[]exemplar.Exemplar) defer func() { + clear(*dest) // Erase elements to let GC collect objects. *dest = (*dest)[:0] exemplarPool.Put(dest) }() diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go index 707342408..336ea91d1 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go @@ -12,7 +12,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -31,7 +30,7 @@ const ( // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { attrs attribute.Set - res exemplar.FilteredReservoir[N] + res FilteredExemplarReservoir[N] count uint64 min N @@ -51,16 +50,16 @@ type expoHistogramDataPoint[N int64 | float64] struct { func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize int, maxScale int32, noMinMax, noSum bool) *expoHistogramDataPoint[N] { f := math.MaxFloat64 - max := N(f) // if N is int64, max will overflow to -9223372036854775808 - min := N(-f) + ma := N(f) // if N is int64, max will overflow to -9223372036854775808 + mi := N(-f) if N(maxInt64) > N(f) { - max = N(maxInt64) - min = N(minInt64) + ma = N(maxInt64) + mi = N(minInt64) } return &expoHistogramDataPoint[N]{ attrs: attrs, - min: max, - max: min, + min: ma, + max: mi, maxSize: maxSize, noMinMax: noMinMax, noSum: noSum, @@ -284,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. -func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, @@ -307,7 +306,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int32 - newRes func() exemplar.FilteredReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -328,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib v, ok := e.values[attr.Equivalent()] if !ok { v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) - v.res = e.newRes() + v.res = e.newRes(attr) e.values[attr.Equivalent()] = v } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go new file mode 100644 index 000000000..691a91060 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go @@ -0,0 +1,50 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" +) + +// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter. +type FilteredExemplarReservoir[N int64 | float64] interface { + // Offer accepts the parameters associated with a measurement. The + // parameters will be stored as an exemplar if the filter decides to + // sample the measurement. + // + // The passed ctx needs to contain any baggage or span that were active + // when the measurement was made. This information may be used by the + // Reservoir in making a sampling decision. + Offer(ctx context.Context, val N, attr []attribute.KeyValue) + // Collect returns all the held exemplars in the reservoir. + Collect(dest *[]exemplar.Exemplar) +} + +// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made. +type filteredExemplarReservoir[N int64 | float64] struct { + filter exemplar.Filter + reservoir exemplar.Reservoir +} + +// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values +// that are allowed by the filter. +func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] { + return &filteredExemplarReservoir[N]{ + filter: f, + reservoir: r, + } +} + +func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { + if f.filter(ctx) { + // only record the current time if we are sampling this measurement. + f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr) + } +} + +func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go index ade0941f5..d577ae2c1 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go @@ -11,13 +11,12 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) type buckets[N int64 | float64] struct { attrs attribute.Set - res exemplar.FilteredReservoir[N] + res FilteredExemplarReservoir[N] counts []uint64 count uint64 @@ -48,13 +47,13 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - newRes func() exemplar.FilteredReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -94,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) b = newBuckets[N](attr, len(s.bounds)+1) - b.res = s.newRes() + b.res = s.newRes(attr) // Ensure min and max are recorded values (not zero), for new buckets. b.min, b.max = value, value @@ -109,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go index c35936840..d3a93f085 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go @@ -9,7 +9,6 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -17,10 +16,10 @@ import ( type datapoint[N int64 | float64] struct { attrs attribute.Set value N - res exemplar.FilteredReservoir[N] + res FilteredExemplarReservoir[N] } -func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), @@ -33,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReserv type lastValue[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.FilteredReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] start time.Time @@ -46,7 +45,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. attr := s.limit.Attributes(fltrAttr, s.values) d, ok := s.values[attr.Equivalent()] if !ok { - d.res = s.newRes() + d.res = s.newRes(attr) } d.attrs = attr @@ -115,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in // newPrecomputedLastValue returns an aggregator that summarizes a set of // observations as the last one made. -func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] { +func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] { return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go index 891366922..8e132ad61 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go @@ -9,25 +9,24 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) type sumValue[N int64 | float64] struct { n N - res exemplar.FilteredReservoir[N] + res FilteredExemplarReservoir[N] attrs attribute.Set } // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - newRes func() exemplar.FilteredReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[sumValue[N]] values map[attribute.Distinct]sumValue[N] } -func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), @@ -42,7 +41,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S attr := s.limit.Attributes(fltrAttr, s.values) v, ok := s.values[attr.Equivalent()] if !ok { - v.res = s.newRes() + v.res = s.newRes(attr) } v.attrs = attr @@ -55,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -142,9 +141,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { } // newPrecomputedSum returns an aggregator that summarizes a set of -// observatrions as their arithmetic sum. Each sum is scoped by attributes and +// observations as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -152,7 +151,7 @@ func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() ex } } -// precomputedSum summarizes a set of observatrions as their arithmetic sum. +// precomputedSum summarizes a set of observations as their arithmetic sum. type precomputedSum[N int64 | float64] struct { *valueMap[N] diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go deleted file mode 100644 index 5a0f39ae1..000000000 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" - -import ( - "context" - - "go.opentelemetry.io/otel/attribute" -) - -// Drop returns a [FilteredReservoir] that drops all measurements it is offered. -func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} } - -type dropRes[N int64 | float64] struct{} - -// Offer does nothing, all measurements offered will be dropped. -func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {} - -// Collect resets dest. No exemplars will ever be returned. -func (r *dropRes[N]) Collect(dest *[]Exemplar) { - *dest = (*dest)[:0] -} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go deleted file mode 100644 index 9fedfa4be..000000000 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" - -import ( - "context" - "time" - - "go.opentelemetry.io/otel/attribute" -) - -// FilteredReservoir wraps a [Reservoir] with a filter. -type FilteredReservoir[N int64 | float64] interface { - // Offer accepts the parameters associated with a measurement. The - // parameters will be stored as an exemplar if the filter decides to - // sample the measurement. - // - // The passed ctx needs to contain any baggage or span that were active - // when the measurement was made. This information may be used by the - // Reservoir in making a sampling decision. - Offer(ctx context.Context, val N, attr []attribute.KeyValue) - // Collect returns all the held exemplars in the reservoir. - Collect(dest *[]Exemplar) -} - -// filteredReservoir handles the pre-sampled exemplar of measurements made. -type filteredReservoir[N int64 | float64] struct { - filter Filter - reservoir Reservoir -} - -// NewFilteredReservoir creates a [FilteredReservoir] which only offers values -// that are allowed by the filter. -func NewFilteredReservoir[N int64 | float64](f Filter, r Reservoir) FilteredReservoir[N] { - return &filteredReservoir[N]{ - filter: f, - reservoir: r, - } -} - -func (f *filteredReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { - if f.filter(ctx) { - // only record the current time if we are sampling this measurment. - f.reservoir.Offer(ctx, time.Now(), NewValue(val), attr) - } -} - -func (f *filteredReservoir[N]) Collect(dest *[]Exemplar) { f.reservoir.Collect(dest) } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go deleted file mode 100644 index a6ff86d02..000000000 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" - -import ( - "context" - "slices" - "sort" - "time" - - "go.opentelemetry.io/otel/attribute" -) - -// Histogram returns a [Reservoir] that samples the last measurement that falls -// within a histogram bucket. The histogram bucket upper-boundaries are define -// by bounds. -// -// The passed bounds will be sorted by this function. -func Histogram(bounds []float64) Reservoir { - slices.Sort(bounds) - return &histRes{ - bounds: bounds, - storage: newStorage(len(bounds) + 1), - } -} - -type histRes struct { - *storage - - // bounds are bucket bounds in ascending order. - bounds []float64 -} - -func (r *histRes) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) { - var x float64 - switch v.Type() { - case Int64ValueType: - x = float64(v.Int64()) - case Float64ValueType: - x = v.Float64() - default: - panic("unknown value type") - } - r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a) -} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md index aba69d654..59f736b73 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md @@ -10,6 +10,7 @@ See the [Compatibility and Stability](#compatibility-and-stability) section for - [Cardinality Limit](#cardinality-limit) - [Exemplars](#exemplars) +- [Instrument Enabled](#instrument-enabled) ### Cardinality Limit @@ -102,6 +103,24 @@ Revert to the default exemplar filter (`"trace_based"`) unset OTEL_METRICS_EXEMPLAR_FILTER ``` +### Instrument Enabled + +To help users avoid performing computationally expensive operations when recording measurements, synchronous instruments provide an `Enabled` method. + +#### Examples + +The following code shows an example of how to check if an instrument implements the `EnabledInstrument` interface before using the `Enabled` function to avoid doing an expensive computation: + +```go +type enabledInstrument interface { Enabled(context.Context) bool } + +ctr, err := m.Int64Counter("expensive-counter") +c, ok := ctr.(enabledInstrument) +if !ok || c.Enabled(context.Background()) { + c.Add(expensiveComputation()) +} +``` + ## Compatibility and Stability Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md). diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go index 8cd2f3741..a98606238 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go @@ -8,41 +8,26 @@ package x // import "go.opentelemetry.io/otel/sdk/metric/internal/x" import ( + "context" "os" "strconv" - "strings" ) -var ( - // Exemplars is an experimental feature flag that defines if exemplars - // should be recorded for metric data-points. - // - // To enable this feature set the OTEL_GO_X_EXEMPLAR environment variable - // to the case-insensitive string value of "true" (i.e. "True" and "TRUE" - // will also enable this). - Exemplars = newFeature("EXEMPLAR", func(v string) (string, bool) { - if strings.ToLower(v) == "true" { - return v, true - } - return "", false - }) - - // CardinalityLimit is an experimental feature flag that defines if - // cardinality limits should be applied to the recorded metric data-points. - // - // To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment - // variable to the integer limit value you want to use. - // - // Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0 - // will disable the cardinality limits. - CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) { - n, err := strconv.Atoi(v) - if err != nil { - return 0, false - } - return n, true - }) -) +// CardinalityLimit is an experimental feature flag that defines if +// cardinality limits should be applied to the recorded metric data-points. +// +// To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment +// variable to the integer limit value you want to use. +// +// Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0 +// will disable the cardinality limits. +var CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) { + n, err := strconv.Atoi(v) + if err != nil { + return 0, false + } + return n, true +}) // Feature is an experimental feature control flag. It provides a uniform way // to interact with these feature flags and parse their values. @@ -83,3 +68,14 @@ func (f Feature[T]) Enabled() bool { _, ok := f.Lookup() return ok } + +// EnabledInstrument informs whether the instrument is enabled. +// +// EnabledInstrument interface is implemented by synchronous instruments. +type EnabledInstrument interface { + // Enabled returns whether the instrument will process measurements for the given context. + // + // This function can be used in places where measuring an instrument + // would result in computationally expensive operations. + Enabled(context.Context) bool +} diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go index e0fd86ca7..c495985bc 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go @@ -113,18 +113,17 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr if err != nil { return err } - var errs []error for _, producer := range mr.externalProducers.Load().([]Producer) { - externalMetrics, err := producer.Produce(ctx) - if err != nil { - errs = append(errs, err) + externalMetrics, e := producer.Produce(ctx) + if e != nil { + err = errors.Join(err, e) } rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) } global.Debug("ManualReader collection", "Data", rm) - return unifyErrors(errs) + return err } // MarshalLog returns logging data about the ManualReader. diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go index 2309e5b2b..a6ccd117b 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go @@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6 continue } inst.appendMeasures(in) + + // Add the measures to the pipeline. It is required to maintain + // measures per pipeline to avoid calling the measure that + // is not part of the pipeline. + insert.pipeline.addInt64Measure(inst.observableID, in) for _, cback := range callbacks { inst := int64Observer{measures: in} fn := cback @@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl continue } inst.appendMeasures(in) + + // Add the measures to the pipeline. It is required to maintain + // measures per pipeline to avoid calling the measure that + // is not part of the pipeline. + insert.pipeline.addFloat64Measure(inst.observableID, in) for _, cback := range callbacks { inst := float64Observer{measures: in} fn := cback @@ -441,73 +451,80 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) return noopRegister{}, nil } - reg := newObserver() - var errs multierror + var err error + validInstruments := make([]metric.Observable, 0, len(insts)) for _, inst := range insts { - // Unwrap any global. - if u, ok := inst.(interface { - Unwrap() metric.Observable - }); ok { - inst = u.Unwrap() - } - switch o := inst.(type) { case int64Observable: - if err := o.registerable(m); err != nil { - if !errors.Is(err, errEmptyAgg) { - errs.append(err) + if e := o.registerable(m); e != nil { + if !errors.Is(e, errEmptyAgg) { + err = errors.Join(err, e) } continue } - reg.registerInt64(o.observablID) + + validInstruments = append(validInstruments, inst) case float64Observable: - if err := o.registerable(m); err != nil { - if !errors.Is(err, errEmptyAgg) { - errs.append(err) + if e := o.registerable(m); e != nil { + if !errors.Is(e, errEmptyAgg) { + err = errors.Join(err, e) } continue } - reg.registerFloat64(o.observablID) + + validInstruments = append(validInstruments, inst) default: // Instrument external to the SDK. - return nil, fmt.Errorf("invalid observable: from different implementation") + return nil, errors.New("invalid observable: from different implementation") } } - err := errs.errorOrNil() - if reg.len() == 0 { + if len(validInstruments) == 0 { // All insts use drop aggregation or are invalid. return noopRegister{}, err } - // Some or all instruments were valid. - cback := func(ctx context.Context) error { return f(ctx, reg) } - return m.pipes.registerMultiCallback(cback), err + unregs := make([]func(), len(m.pipes)) + for ix, pipe := range m.pipes { + reg := newObserver(pipe) + for _, inst := range validInstruments { + switch o := inst.(type) { + case int64Observable: + reg.registerInt64(o.observableID) + case float64Observable: + reg.registerFloat64(o.observableID) + } + } + + // Some or all instruments were valid. + cBack := func(ctx context.Context) error { return f(ctx, reg) } + unregs[ix] = pipe.addMultiCallback(cBack) + } + + return unregisterFuncs{f: unregs}, err } type observer struct { embedded.Observer - float64 map[observablID[float64]]struct{} - int64 map[observablID[int64]]struct{} + pipe *pipeline + float64 map[observableID[float64]]struct{} + int64 map[observableID[int64]]struct{} } -func newObserver() observer { +func newObserver(p *pipeline) observer { return observer{ - float64: make(map[observablID[float64]]struct{}), - int64: make(map[observablID[int64]]struct{}), + pipe: p, + float64: make(map[observableID[float64]]struct{}), + int64: make(map[observableID[int64]]struct{}), } } -func (r observer) len() int { - return len(r.float64) + len(r.int64) -} - -func (r observer) registerFloat64(id observablID[float64]) { +func (r observer) registerFloat64(id observableID[float64]) { r.float64[id] = struct{}{} } -func (r observer) registerInt64(id observablID[int64]) { +func (r observer) registerInt64(id observableID[int64]) { r.int64[id] = struct{}{} } @@ -521,22 +538,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ... switch conv := o.(type) { case float64Observable: oImpl = conv - case interface { - Unwrap() metric.Observable - }: - // Unwrap any global. - async := conv.Unwrap() - var ok bool - if oImpl, ok = async.(float64Observable); !ok { - global.Error(errUnknownObserver, "failed to record asynchronous") - return - } default: global.Error(errUnknownObserver, "failed to record") return } - if _, registered := r.float64[oImpl.observablID]; !registered { + if _, registered := r.float64[oImpl.observableID]; !registered { if !oImpl.dropAggregation { global.Error(errUnregObserver, "failed to record", "name", oImpl.name, @@ -548,7 +555,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ... return } c := metric.NewObserveConfig(opts) - oImpl.observe(v, c.Attributes()) + // Access to r.pipe.float64Measure is already guarded by a lock in pipeline.produce. + // TODO (#5946): Refactor pipeline and observable measures. + measures := r.pipe.float64Measures[oImpl.observableID] + for _, m := range measures { + m(context.Background(), v, c.Attributes()) + } } func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) { @@ -556,22 +568,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric switch conv := o.(type) { case int64Observable: oImpl = conv - case interface { - Unwrap() metric.Observable - }: - // Unwrap any global. - async := conv.Unwrap() - var ok bool - if oImpl, ok = async.(int64Observable); !ok { - global.Error(errUnknownObserver, "failed to record asynchronous") - return - } default: global.Error(errUnknownObserver, "failed to record") return } - if _, registered := r.int64[oImpl.observablID]; !registered { + if _, registered := r.int64[oImpl.observableID]; !registered { if !oImpl.dropAggregation { global.Error(errUnregObserver, "failed to record", "name", oImpl.name, @@ -583,7 +585,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric return } c := metric.NewObserveConfig(opts) - oImpl.observe(v, c.Attributes()) + // Access to r.pipe.int64Measures is already guarded b a lock in pipeline.produce. + // TODO (#5946): Refactor pipeline and observable measures. + measures := r.pipe.int64Measures[oImpl.observableID] + for _, m := range measures { + m(context.Background(), v, c.Attributes()) + } } type noopRegister struct{ embedded.Registration } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go index 67ee1b11a..dcd2182d9 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go @@ -251,18 +251,17 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd if err != nil { return err } - var errs []error for _, producer := range r.externalProducers.Load().([]Producer) { - externalMetrics, err := producer.Produce(ctx) - if err != nil { - errs = append(errs, err) + externalMetrics, e := producer.Produce(ctx) + if e != nil { + err = errors.Join(err, e) } rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) } global.Debug("PeriodicReader collection", "Data", rm) - return unifyErrors(errs) + return err } // export exports metric data m using r's exporter. diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go index 823bf2fe3..775e24526 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go @@ -8,14 +8,13 @@ import ( "context" "errors" "fmt" - "strings" "sync" "sync/atomic" "go.opentelemetry.io/otel/internal/global" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/internal/x" @@ -38,14 +37,17 @@ type instrumentSync struct { compAgg aggregate.ComputeAggregation } -func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline { +func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ - resource: res, - reader: reader, - views: views, + resource: res, + reader: reader, + views: views, + int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{}, + float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{}, + exemplarFilter: exemplarFilter, // aggregations is lazy allocated when needed. } } @@ -63,9 +65,26 @@ type pipeline struct { views []View sync.Mutex - aggregations map[instrumentation.Scope][]instrumentSync - callbacks []func(context.Context) error - multiCallbacks list.List + int64Measures map[observableID[int64]][]aggregate.Measure[int64] + float64Measures map[observableID[float64]][]aggregate.Measure[float64] + aggregations map[instrumentation.Scope][]instrumentSync + callbacks []func(context.Context) error + multiCallbacks list.List + exemplarFilter exemplar.Filter +} + +// addInt64Measure adds a new int64 measure to the pipeline for each observer. +func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) { + p.Lock() + defer p.Unlock() + p.int64Measures[id] = m +} + +// addFloat64Measure adds a new float64 measure to the pipeline for each observer. +func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) { + p.Lock() + defer p.Unlock() + p.float64Measures[id] = m } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -105,14 +124,15 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) p.Lock() defer p.Unlock() - var errs multierror + var err error for _, c := range p.callbacks { // TODO make the callbacks parallel. ( #3034 ) - if err := c(ctx); err != nil { - errs.append(err) + if e := c(ctx); e != nil { + err = errors.Join(err, e) } if err := ctx.Err(); err != nil { rm.Resource = nil + clear(rm.ScopeMetrics) // Erase elements to let GC collect objects. rm.ScopeMetrics = rm.ScopeMetrics[:0] return err } @@ -120,12 +140,13 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) for e := p.multiCallbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) f := e.Value.(multiCallback) - if err := f(ctx); err != nil { - errs.append(err) + if e := f(ctx); e != nil { + err = errors.Join(err, e) } if err := ctx.Err(); err != nil { // This means the context expired before we finished running callbacks. rm.Resource = nil + clear(rm.ScopeMetrics) // Erase elements to let GC collect objects. rm.ScopeMetrics = rm.ScopeMetrics[:0] return err } @@ -157,7 +178,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) rm.ScopeMetrics = rm.ScopeMetrics[:i] - return errs.errorOrNil() + return err } // inserter facilitates inserting of new instruments from a single scope into a @@ -219,7 +240,7 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) measures []aggregate.Measure[N] ) - errs := &multierror{wrapped: errCreatingAggregators} + var err error seen := make(map[uint64]struct{}) for _, v := range i.pipeline.views { stream, match := v(inst) @@ -227,9 +248,9 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) continue } matched = true - in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) - if err != nil { - errs.append(err) + in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) + if e != nil { + err = errors.Join(err, e) } if in == nil { // Drop aggregation. continue @@ -242,8 +263,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) measures = append(measures, in) } + if err != nil { + err = errors.Join(errCreatingAggregators, err) + } + if matched { - return measures, errs.errorOrNil() + return measures, err } // Apply implicit default view if no explicit matched. @@ -252,15 +277,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) Description: inst.Description, Unit: inst.Unit, } - in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) - if err != nil { - errs.append(err) + in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) + if e != nil { + if err == nil { + err = errCreatingAggregators + } + err = errors.Join(err, e) } if in != nil { // Ensured to have not seen given matched was false. measures = append(measures, in) } - return measures, errs.errorOrNil() + return measures, err } // addCallback registers a single instrument callback to be run when @@ -329,6 +357,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum // The view explicitly requested the default aggregation. stream.Aggregation = DefaultAggregationSelector(kind) } + if stream.ExemplarReservoirProviderSelector == nil { + stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector + } if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil { return nil, 0, fmt.Errorf( @@ -349,7 +380,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ Temporality: i.pipeline.reader.temporality(kind), - ReservoirFunc: reservoirFunc[N](stream.Aggregation), + ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter), } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation @@ -552,24 +583,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error { // measurement. type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines { +func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines { pipes := make([]*pipeline, 0, len(readers)) for _, r := range readers { - p := newPipeline(res, r, views) + p := newPipeline(res, r, views, exemplarFilter) r.register(p) pipes = append(pipes, p) } return pipes } -func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration { - unregs := make([]func(), len(p)) - for i, pipe := range p { - unregs[i] = pipe.addMultiCallback(c) - } - return unregisterFuncs{f: unregs} -} - type unregisterFuncs struct { embedded.Registration f []func() @@ -602,15 +625,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) { var measures []aggregate.Measure[N] - errs := &multierror{} + var err error for _, i := range r.inserters { - in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind)) - if err != nil { - errs.append(err) + in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind)) + if e != nil { + err = errors.Join(err, e) } measures = append(measures, in...) } - return measures, errs.errorOrNil() + return measures, err } // HistogramAggregators returns the histogram Aggregators that must be updated by the instrument @@ -619,37 +642,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) { var measures []aggregate.Measure[N] - errs := &multierror{} + var err error for _, i := range r.inserters { agg := i.readerDefaultAggregation(id.Kind) if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 { histAgg.Boundaries = boundaries agg = histAgg } - in, err := i.Instrument(id, agg) - if err != nil { - errs.append(err) + in, e := i.Instrument(id, agg) + if e != nil { + err = errors.Join(err, e) } measures = append(measures, in...) } - return measures, errs.errorOrNil() -} - -type multierror struct { - wrapped error - errors []string -} - -func (m *multierror) errorOrNil() error { - if len(m.errors) == 0 { - return nil - } - if m.wrapped == nil { - return errors.New(strings.Join(m.errors, "; ")) - } - return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; ")) -} - -func (m *multierror) append(err error) { - m.errors = append(m.errors, err.Error()) + return measures, err } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go b/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go index a82af538e..2fca89e5a 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go @@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() mp := &MeterProvider{ - pipes: newPipelines(conf.res, conf.readers, conf.views), + pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter), forceFlush: flush, shutdown: sdown, } @@ -76,15 +76,17 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri c := metric.NewMeterConfig(options...) s := instrumentation.Scope{ - Name: name, - Version: c.InstrumentationVersion(), - SchemaURL: c.SchemaURL(), + Name: name, + Version: c.InstrumentationVersion(), + SchemaURL: c.SchemaURL(), + Attributes: c.InstrumentationAttributes(), } global.Info("Meter created", "Name", s.Name, "Version", s.Version, "SchemaURL", s.SchemaURL, + "Attributes", s.Attributes, ) return mp.meters.Lookup(s, func() *meter { diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go index d94bdee75..d13a70697 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go @@ -5,26 +5,26 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" - "fmt" + "errors" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) // errDuplicateRegister is logged by a Reader when an attempt to registered it // more than once occurs. -var errDuplicateRegister = fmt.Errorf("duplicate reader registration") +var errDuplicateRegister = errors.New("duplicate reader registration") // ErrReaderNotRegistered is returned if Collect or Shutdown are called before // the reader is registered with a MeterProvider. -var ErrReaderNotRegistered = fmt.Errorf("reader is not registered") +var ErrReaderNotRegistered = errors.New("reader is not registered") // ErrReaderShutdown is returned if Collect or Shutdown are called after a // reader has been Shutdown once. -var ErrReaderShutdown = fmt.Errorf("reader is shutdown") +var ErrReaderShutdown = errors.New("reader is shutdown") // errNonPositiveDuration is logged when an environmental variable // has non-positive value. -var errNonPositiveDuration = fmt.Errorf("non-positive duration") +var errNonPositiveDuration = errors.New("non-positive duration") // Reader is the interface used between the SDK and an // exporter. Control flow is bi-directional through the @@ -60,8 +60,8 @@ type Reader interface { aggregation(InstrumentKind) Aggregation // nolint:revive // import-shadow for method scoped by type. // Collect gathers and returns all metric data related to the Reader from - // the SDK and stores it in out. An error is returned if this is called - // after Shutdown or if out is nil. + // the SDK and stores it in rm. An error is returned if this is called + // after Shutdown or if rm is nil. // // This method needs to be concurrent safe, and the cancellation of the // passed context is expected to be honored. diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/version.go b/vendor/go.opentelemetry.io/otel/sdk/metric/version.go index 44316caa1..7c4b8530d 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/version.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/version.go @@ -5,5 +5,5 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" // version is the current release version of the metric SDK in use. func version() string { - return "1.29.0" + return "1.34.0" } diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/view.go b/vendor/go.opentelemetry.io/otel/sdk/metric/view.go index cd08c6732..630890f42 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/view.go +++ b/vendor/go.opentelemetry.io/otel/sdk/metric/view.go @@ -96,11 +96,12 @@ func NewView(criteria Instrument, mask Stream) View { return func(i Instrument) (Stream, bool) { if matchFunc(i) { return Stream{ - Name: nonZero(mask.Name, i.Name), - Description: nonZero(mask.Description, i.Description), - Unit: nonZero(mask.Unit, i.Unit), - Aggregation: agg, - AttributeFilter: mask.AttributeFilter, + Name: nonZero(mask.Name, i.Name), + Description: nonZero(mask.Description, i.Description), + Unit: nonZero(mask.Unit, i.Unit), + Aggregation: agg, + AttributeFilter: mask.AttributeFilter, + ExemplarReservoirProviderSelector: mask.ExemplarReservoirProviderSelector, }, true } return Stream{}, false diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go b/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go index 95a61d61d..c02aeefdd 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go +++ b/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go @@ -7,7 +7,6 @@ import ( "context" "errors" "fmt" - "strings" ) // ErrPartialResource is returned by a detector when complete source @@ -57,62 +56,37 @@ func Detect(ctx context.Context, detectors ...Detector) (*Resource, error) { // these errors will be returned. Otherwise, nil is returned. func detect(ctx context.Context, res *Resource, detectors []Detector) error { var ( - r *Resource - errs detectErrs - err error + r *Resource + err error + e error ) for _, detector := range detectors { if detector == nil { continue } - r, err = detector.Detect(ctx) - if err != nil { - errs = append(errs, err) - if !errors.Is(err, ErrPartialResource) { + r, e = detector.Detect(ctx) + if e != nil { + err = errors.Join(err, e) + if !errors.Is(e, ErrPartialResource) { continue } } - r, err = Merge(res, r) - if err != nil { - errs = append(errs, err) + r, e = Merge(res, r) + if e != nil { + err = errors.Join(err, e) } *res = *r } - if len(errs) == 0 { - return nil - } - if errors.Is(errs, ErrSchemaURLConflict) { - // If there has been a merge conflict, ensure the resource has no - // schema URL. - res.schemaURL = "" - } - return errs -} - -type detectErrs []error - -func (e detectErrs) Error() string { - errStr := make([]string, len(e)) - for i, err := range e { - errStr[i] = fmt.Sprintf("* %s", err) - } - - format := "%d errors occurred detecting resource:\n\t%s" - return fmt.Sprintf(format, len(e), strings.Join(errStr, "\n\t")) -} + if err != nil { + if errors.Is(err, ErrSchemaURLConflict) { + // If there has been a merge conflict, ensure the resource has no + // schema URL. + res.schemaURL = "" + } -func (e detectErrs) Unwrap() error { - switch len(e) { - case 0: - return nil - case 1: - return e[0] + err = fmt.Errorf("error detecting resource: %w", err) } - return e[1:] -} - -func (e detectErrs) Is(target error) bool { - return len(e) != 0 && errors.Is(e[0], target) + return err } diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go b/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go index 6ac1cdbf7..cf3c88e15 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go +++ b/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go @@ -20,15 +20,13 @@ type ( // telemetrySDK is a Detector that provides information about // the OpenTelemetry SDK used. This Detector is included as a // builtin. If these resource attributes are not wanted, use - // the WithTelemetrySDK(nil) or WithoutBuiltin() options to - // explicitly disable them. + // resource.New() to explicitly disable them. telemetrySDK struct{} // host is a Detector that provides information about the host // being run on. This Detector is included as a builtin. If // these resource attributes are not wanted, use the - // WithHost(nil) or WithoutBuiltin() options to explicitly - // disable them. + // resource.New() to explicitly disable them. host struct{} stringDetector struct { diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go b/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go index 71386e2da..3677c83d7 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go +++ b/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go @@ -10,17 +10,16 @@ import ( "golang.org/x/sys/windows/registry" ) -// implements hostIDReader +// implements hostIDReader. type hostIDReaderWindows struct{} -// read reads MachineGuid from the windows registry key: -// SOFTWARE\Microsoft\Cryptography +// read reads MachineGuid from the Windows registry key: +// SOFTWARE\Microsoft\Cryptography. func (*hostIDReaderWindows) read() (string, error) { k, err := registry.OpenKey( registry.LOCAL_MACHINE, `SOFTWARE\Microsoft\Cryptography`, registry.QUERY_VALUE|registry.WOW64_64KEY, ) - if err != nil { return "", err } diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go b/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go index 5e3d199d7..a6a5a53c0 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go +++ b/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go @@ -17,7 +17,6 @@ import ( func platformOSDescription() (string, error) { k, err := registry.OpenKey( registry.LOCAL_MACHINE, `SOFTWARE\Microsoft\Windows NT\CurrentVersion`, registry.QUERY_VALUE) - if err != nil { return "", err } diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go index 1d399a75d..ccc97e1b6 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go +++ b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go @@ -280,6 +280,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { // // It is up to the exporter to implement any type of retry logic if a batch is failing // to be exported, since it is specific to the protocol and backend being sent to. + clear(bsp.batch) // Erase elements to let GC collect objects bsp.batch = bsp.batch[:0] if err != nil { @@ -316,7 +317,11 @@ func (bsp *batchSpanProcessor) processQueue() { bsp.batchMutex.Unlock() if shouldExport { if !bsp.timer.Stop() { - <-bsp.timer.C + // Handle both GODEBUG=asynctimerchan=[0|1] properly. + select { + case <-bsp.timer.C: + default: + } } if err := bsp.exportSpans(ctx); err != nil { otel.Handle(err) diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go b/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go index 821c83faa..8c308dd60 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go +++ b/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go @@ -12,25 +12,26 @@ import ( // evictedQueue is a FIFO queue with a configurable capacity. type evictedQueue[T any] struct { - queue []T - capacity int - droppedCount int - logDropped func() + queue []T + capacity int + droppedCount int + logDroppedMsg string + logDroppedOnce sync.Once } func newEvictedQueueEvent(capacity int) evictedQueue[Event] { // Do not pre-allocate queue, do this lazily. return evictedQueue[Event]{ - capacity: capacity, - logDropped: sync.OnceFunc(func() { global.Warn("limit reached: dropping trace trace.Event") }), + capacity: capacity, + logDroppedMsg: "limit reached: dropping trace trace.Event", } } func newEvictedQueueLink(capacity int) evictedQueue[Link] { // Do not pre-allocate queue, do this lazily. return evictedQueue[Link]{ - capacity: capacity, - logDropped: sync.OnceFunc(func() { global.Warn("limit reached: dropping trace trace.Link") }), + capacity: capacity, + logDroppedMsg: "limit reached: dropping trace trace.Link", } } @@ -53,6 +54,10 @@ func (eq *evictedQueue[T]) add(value T) { eq.queue = append(eq.queue, value) } +func (eq *evictedQueue[T]) logDropped() { + eq.logDroppedOnce.Do(func() { global.Warn(eq.logDroppedMsg) }) +} + // copy returns a copy of the evictedQueue. func (eq *evictedQueue[T]) copy() []T { return slices.Clone(eq.queue) diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go b/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go index 14c2e5beb..185aa7c08 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go +++ b/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go @@ -139,9 +139,10 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T name = defaultTracerName } is := instrumentation.Scope{ - Name: name, - Version: c.InstrumentationVersion(), - SchemaURL: c.SchemaURL(), + Name: name, + Version: c.InstrumentationVersion(), + SchemaURL: c.SchemaURL(), + Attributes: c.InstrumentationAttributes(), } t, ok := func() (trace.Tracer, bool) { @@ -168,7 +169,7 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T // slowing down all tracing consumers. // - Logging code may be instrumented with tracing and deadlock because it could try // acquiring the same non-reentrant mutex. - global.Info("Tracer created", "name", name, "version", is.Version, "schemaURL", is.SchemaURL) + global.Info("Tracer created", "name", name, "version", is.Version, "schemaURL", is.SchemaURL, "attributes", is.Attributes) } return t } diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go b/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go index d2d1f7246..9b672a1d7 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go +++ b/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go @@ -5,7 +5,6 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "errors" - "fmt" "os" "strconv" "strings" @@ -26,7 +25,7 @@ const ( type errUnsupportedSampler string func (e errUnsupportedSampler) Error() string { - return fmt.Sprintf("unsupported sampler: %s", string(e)) + return "unsupported sampler: " + string(e) } var ( @@ -39,7 +38,7 @@ type samplerArgParseError struct { } func (e samplerArgParseError) Error() string { - return fmt.Sprintf("parsing sampler argument: %s", e.parseErr.Error()) + return "parsing sampler argument: " + e.parseErr.Error() } func (e samplerArgParseError) Unwrap() error { diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/span.go b/vendor/go.opentelemetry.io/otel/sdk/trace/span.go index 4945f5083..8f4fc3850 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/trace/span.go +++ b/vendor/go.opentelemetry.io/otel/sdk/trace/span.go @@ -174,6 +174,17 @@ func (s *recordingSpan) IsRecording() bool { s.mu.Lock() defer s.mu.Unlock() + return s.isRecording() +} + +// isRecording returns if this span is being recorded. If this span has ended +// this will return false. +// +// This method assumes s.mu.Lock is held by the caller. +func (s *recordingSpan) isRecording() bool { + if s == nil { + return false + } return s.endTime.IsZero() } @@ -182,11 +193,15 @@ func (s *recordingSpan) IsRecording() bool { // included in the set status when the code is for an error. If this span is // not being recorded than this method does nothing. func (s *recordingSpan) SetStatus(code codes.Code, description string) { - if !s.IsRecording() { + if s == nil { return } + s.mu.Lock() defer s.mu.Unlock() + if !s.isRecording() { + return + } if s.status.Code > code { return } @@ -210,12 +225,15 @@ func (s *recordingSpan) SetStatus(code codes.Code, description string) { // attributes the span is configured to have, the last added attributes will // be dropped. func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) { - if !s.IsRecording() { + if s == nil || len(attributes) == 0 { return } s.mu.Lock() defer s.mu.Unlock() + if !s.isRecording() { + return + } limit := s.tracer.provider.spanLimits.AttributeCountLimit if limit == 0 { @@ -233,7 +251,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) { // Otherwise, add without deduplication. When attributes are read they // will be deduplicated, optimizing the operation. - s.attributes = slices.Grow(s.attributes, len(s.attributes)+len(attributes)) + s.attributes = slices.Grow(s.attributes, len(attributes)) for _, a := range attributes { if !a.Valid() { // Drop all invalid attributes. @@ -280,13 +298,17 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) { // Do not set a capacity when creating this map. Benchmark testing has // showed this to only add unused memory allocations in general use. - exists := make(map[attribute.Key]int) - s.dedupeAttrsFromRecord(&exists) + exists := make(map[attribute.Key]int, len(s.attributes)) + s.dedupeAttrsFromRecord(exists) // Now that s.attributes is deduplicated, adding unique attributes up to // the capacity of s will not over allocate s.attributes. - sum := len(attrs) + len(s.attributes) - s.attributes = slices.Grow(s.attributes, min(sum, limit)) + + // max size = limit + maxCap := min(len(attrs)+len(s.attributes), limit) + if cap(s.attributes) < maxCap { + s.attributes = slices.Grow(s.attributes, maxCap-cap(s.attributes)) + } for _, a := range attrs { if !a.Valid() { // Drop all invalid attributes. @@ -296,6 +318,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) { if idx, ok := exists[a.Key]; ok { // Perform all updates before dropping, even when at capacity. + a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a) s.attributes[idx] = a continue } @@ -324,54 +347,99 @@ func truncateAttr(limit int, attr attribute.KeyValue) attribute.KeyValue { } switch attr.Value.Type() { case attribute.STRING: - if v := attr.Value.AsString(); len(v) > limit { - return attr.Key.String(safeTruncate(v, limit)) - } + v := attr.Value.AsString() + return attr.Key.String(truncate(limit, v)) case attribute.STRINGSLICE: v := attr.Value.AsStringSlice() for i := range v { - if len(v[i]) > limit { - v[i] = safeTruncate(v[i], limit) - } + v[i] = truncate(limit, v[i]) } return attr.Key.StringSlice(v) } return attr } -// safeTruncate truncates the string and guarantees valid UTF-8 is returned. -func safeTruncate(input string, limit int) string { - if trunc, ok := safeTruncateValidUTF8(input, limit); ok { - return trunc +// truncate returns a truncated version of s such that it contains less than +// the limit number of characters. Truncation is applied by returning the limit +// number of valid characters contained in s. +// +// If limit is negative, it returns the original string. +// +// UTF-8 is supported. When truncating, all invalid characters are dropped +// before applying truncation. +// +// If s already contains less than the limit number of bytes, it is returned +// unchanged. No invalid characters are removed. +func truncate(limit int, s string) string { + // This prioritize performance in the following order based on the most + // common expected use-cases. + // + // - Short values less than the default limit (128). + // - Strings with valid encodings that exceed the limit. + // - No limit. + // - Strings with invalid encodings that exceed the limit. + if limit < 0 || len(s) <= limit { + return s + } + + // Optimistically, assume all valid UTF-8. + var b strings.Builder + count := 0 + for i, c := range s { + if c != utf8.RuneError { + count++ + if count > limit { + return s[:i] + } + continue + } + + _, size := utf8.DecodeRuneInString(s[i:]) + if size == 1 { + // Invalid encoding. + b.Grow(len(s) - 1) + _, _ = b.WriteString(s[:i]) + s = s[i:] + break + } + } + + // Fast-path, no invalid input. + if b.Cap() == 0 { + return s } - trunc, _ := safeTruncateValidUTF8(strings.ToValidUTF8(input, ""), limit) - return trunc -} -// safeTruncateValidUTF8 returns a copy of the input string safely truncated to -// limit. The truncation is ensured to occur at the bounds of complete UTF-8 -// characters. If invalid encoding of UTF-8 is encountered, input is returned -// with false, otherwise, the truncated input will be returned with true. -func safeTruncateValidUTF8(input string, limit int) (string, bool) { - for cnt := 0; cnt <= limit; { - r, size := utf8.DecodeRuneInString(input[cnt:]) - if r == utf8.RuneError { - return input, false + // Truncate while validating UTF-8. + for i := 0; i < len(s) && count < limit; { + c := s[i] + if c < utf8.RuneSelf { + // Optimization for single byte runes (common case). + _ = b.WriteByte(c) + i++ + count++ + continue } - if cnt+size > limit { - return input[:cnt], true + _, size := utf8.DecodeRuneInString(s[i:]) + if size == 1 { + // We checked for all 1-byte runes above, this is a RuneError. + i++ + continue } - cnt += size + + _, _ = b.WriteString(s[i : i+size]) + i += size + count++ } - return input, true + + return b.String() } // End ends the span. This method does nothing if the span is already ended or // is not being recorded. // -// The only SpanOption currently supported is WithTimestamp which will set the -// end time for a Span's life-cycle. +// The only SpanEndOption currently supported are [trace.WithTimestamp], and +// [trace.WithStackTrace]. // // If this method is called while panicking an error event is added to the // Span before ending it and the panic is continued. @@ -386,9 +454,10 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { // the span's duration in case some operation below takes a while. et := monotonicEndTime(s.startTime) - // Do relative expensive check now that we have an end time and see if we - // need to do any more processing. - if !s.IsRecording() { + // Lock the span now that we have an end time and see if we need to do any more processing. + s.mu.Lock() + if !s.isRecording() { + s.mu.Unlock() return } @@ -413,10 +482,11 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) { } if s.executionTracerTaskEnd != nil { + s.mu.Unlock() s.executionTracerTaskEnd() + s.mu.Lock() } - s.mu.Lock() // Setting endTime to non-zero marks the span as ended and not recording. if config.Timestamp().IsZero() { s.endTime = et @@ -450,7 +520,13 @@ func monotonicEndTime(start time.Time) time.Time { // does not change the Span status. If this span is not being recorded or err is nil // than this method does nothing. func (s *recordingSpan) RecordError(err error, opts ...trace.EventOption) { - if s == nil || err == nil || !s.IsRecording() { + if s == nil || err == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + if !s.isRecording() { return } @@ -486,14 +562,23 @@ func recordStackTrace() string { } // AddEvent adds an event with the provided name and options. If this span is -// not being recorded than this method does nothing. +// not being recorded then this method does nothing. func (s *recordingSpan) AddEvent(name string, o ...trace.EventOption) { - if !s.IsRecording() { + if s == nil { + return + } + + s.mu.Lock() + defer s.mu.Unlock() + if !s.isRecording() { return } s.addEvent(name, o...) } +// addEvent adds an event with the provided name and options. +// +// This method assumes s.mu.Lock is held by the caller. func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) { c := trace.NewEventConfig(o...) e := Event{Name: name, Attributes: c.Attributes(), Time: c.Timestamp()} @@ -510,20 +595,21 @@ func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) { e.Attributes = e.Attributes[:limit] } - s.mu.Lock() s.events.add(e) - s.mu.Unlock() } // SetName sets the name of this span. If this span is not being recorded than // this method does nothing. func (s *recordingSpan) SetName(name string) { - if !s.IsRecording() { + if s == nil { return } s.mu.Lock() defer s.mu.Unlock() + if !s.isRecording() { + return + } s.name = name } @@ -579,29 +665,26 @@ func (s *recordingSpan) Attributes() []attribute.KeyValue { func (s *recordingSpan) dedupeAttrs() { // Do not set a capacity when creating this map. Benchmark testing has // showed this to only add unused memory allocations in general use. - exists := make(map[attribute.Key]int) - s.dedupeAttrsFromRecord(&exists) + exists := make(map[attribute.Key]int, len(s.attributes)) + s.dedupeAttrsFromRecord(exists) } // dedupeAttrsFromRecord deduplicates the attributes of s to fit capacity // using record as the record of unique attribute keys to their index. // // This method assumes s.mu.Lock is held by the caller. -func (s *recordingSpan) dedupeAttrsFromRecord(record *map[attribute.Key]int) { +func (s *recordingSpan) dedupeAttrsFromRecord(record map[attribute.Key]int) { // Use the fact that slices share the same backing array. unique := s.attributes[:0] for _, a := range s.attributes { - if idx, ok := (*record)[a.Key]; ok { + if idx, ok := record[a.Key]; ok { unique[idx] = a } else { unique = append(unique, a) - (*record)[a.Key] = len(unique) - 1 + record[a.Key] = len(unique) - 1 } } - // s.attributes have element types of attribute.KeyValue. These types are - // not pointers and they themselves do not contain pointer fields, - // therefore the duplicate values do not need to be zeroed for them to be - // garbage collected. + clear(s.attributes[len(unique):]) // Erase unneeded elements to let GC collect objects. s.attributes = unique } @@ -657,7 +740,7 @@ func (s *recordingSpan) Resource() *resource.Resource { } func (s *recordingSpan) AddLink(link trace.Link) { - if !s.IsRecording() { + if s == nil { return } if !link.SpanContext.IsValid() && len(link.Attributes) == 0 && @@ -665,6 +748,12 @@ func (s *recordingSpan) AddLink(link trace.Link) { return } + s.mu.Lock() + defer s.mu.Unlock() + if !s.isRecording() { + return + } + l := Link{SpanContext: link.SpanContext, Attributes: link.Attributes} // Discard attributes over limit. @@ -678,9 +767,7 @@ func (s *recordingSpan) AddLink(link trace.Link) { l.Attributes = l.Attributes[:limit] } - s.mu.Lock() s.links.add(l) - s.mu.Unlock() } // DroppedAttributes returns the number of attributes dropped by the span @@ -755,12 +842,16 @@ func (s *recordingSpan) snapshot() ReadOnlySpan { } func (s *recordingSpan) addChild() { - if !s.IsRecording() { + if s == nil { return } + s.mu.Lock() + defer s.mu.Unlock() + if !s.isRecording() { + return + } s.childSpanCount++ - s.mu.Unlock() } func (*recordingSpan) private() {} diff --git a/vendor/go.opentelemetry.io/otel/sdk/version.go b/vendor/go.opentelemetry.io/otel/sdk/version.go index b7cede891..6b4038510 100644 --- a/vendor/go.opentelemetry.io/otel/sdk/version.go +++ b/vendor/go.opentelemetry.io/otel/sdk/version.go @@ -5,5 +5,5 @@ package sdk // import "go.opentelemetry.io/otel/sdk" // Version is the current release version of the OpenTelemetry SDK in use. func Version() string { - return "1.29.0" + return "1.34.0" } |