author    Dominik Süß <dominik@suess.wtf>  2025-02-06 12:14:37 +0100
committer GitHub <noreply@github.com>  2025-02-06 12:14:37 +0100
commit    dd094e401282e135989f57c0ca3dee7dea3f5207 (patch)
tree      74cb77830f621840273255a17565ced73b4fa997 /vendor/go.opentelemetry.io/otel/sdk
parent    [feature] Use `X-Robots-Tag` headers to instruct scrapers/crawlers (#3737) (diff)
[chore] update otel libraries (#3740)
* chore: update otel dependencies
* refactor: combine tracing & metrics in observability package
* chore: update example tracing compose file
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk')
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go | 4
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/config.go | 79
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go | 68
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/README.md | 3
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/doc.go (renamed from vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go) | 2
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/exemplar.go (renamed from vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go) | 2
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/filter.go (renamed from vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go) | 11
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/fixed_size_reservoir.go (renamed from vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go) | 118
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/histogram_reservoir.go | 70
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/reservoir.go (renamed from vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go) | 10
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/storage.go (renamed from vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go) | 8
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/value.go (renamed from vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go) | 5
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go | 4
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go | 25
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go | 9
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go | 27
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go | 3
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go | 21
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go | 50
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go | 11
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go | 11
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go | 17
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go | 23
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go | 49
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go | 46
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md | 19
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go | 58
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go | 9
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/meter.go | 123
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go | 9
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go | 132
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/provider.go | 10
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/reader.go | 14
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/version.go | 2
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/metric/view.go | 11
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/resource/auto.go | 62
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go | 6
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go | 7
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go | 1
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go | 7
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go | 21
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/trace/provider.go | 9
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go | 5
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/trace/span.go | 209
-rw-r--r-- vendor/go.opentelemetry.io/otel/sdk/version.go | 2
45 files changed, 805 insertions, 587 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go b/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go
index 728115045..34852a47b 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/instrumentation/scope.go
@@ -3,6 +3,8 @@
package instrumentation // import "go.opentelemetry.io/otel/sdk/instrumentation"
+import "go.opentelemetry.io/otel/attribute"
+
// Scope represents the instrumentation scope.
type Scope struct {
// Name is the name of the instrumentation scope. This should be the
@@ -12,4 +14,6 @@ type Scope struct {
Version string
// SchemaURL of the telemetry emitted by the scope.
SchemaURL string
+ // Attributes of the telemetry emitted by the scope.
+ Attributes attribute.Set
}
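For orientation (not part of the diff): the hunk above adds an Attributes field to instrumentation.Scope. A minimal sketch of what a populated scope could look like after this change; the name, version, schema URL, and attribute values are illustrative, not taken from the upstream code.

```go
package example

import (
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/instrumentation"
)

// exampleScope shows the new scope-level Attributes field in use.
func exampleScope() instrumentation.Scope {
	return instrumentation.Scope{
		Name:      "github.com/example/mylib", // illustrative values
		Version:   "v0.1.0",
		SchemaURL: "https://opentelemetry.io/schemas/1.26.0",
		// New in this change: attributes attached to the scope itself.
		Attributes: attribute.NewSet(attribute.String("library.tier", "storage")),
	}
}
```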
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/config.go b/vendor/go.opentelemetry.io/otel/sdk/metric/config.go
index bbe7bf671..203cd9d65 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/config.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/config.go
@@ -5,17 +5,22 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
- "fmt"
+ "errors"
+ "os"
+ "strings"
"sync"
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/resource"
)
// config contains configuration options for a MeterProvider.
type config struct {
- res *resource.Resource
- readers []Reader
- views []View
+ res *resource.Resource
+ readers []Reader
+ views []View
+ exemplarFilter exemplar.Filter
}
// readerSignals returns a force-flush and shutdown function for a
@@ -39,25 +44,13 @@ func (c config) readerSignals() (forceFlush, shutdown func(context.Context) erro
// value.
func unify(funcs []func(context.Context) error) func(context.Context) error {
return func(ctx context.Context) error {
- var errs []error
+ var err error
for _, f := range funcs {
- if err := f(ctx); err != nil {
- errs = append(errs, err)
+ if e := f(ctx); e != nil {
+ err = errors.Join(err, e)
}
}
- return unifyErrors(errs)
- }
-}
-
-// unifyErrors combines multiple errors into a single error.
-func unifyErrors(errs []error) error {
- switch len(errs) {
- case 0:
- return nil
- case 1:
- return errs[0]
- default:
- return fmt.Errorf("%v", errs)
+ return err
}
}
@@ -75,7 +68,13 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er
// newConfig returns a config configured with options.
func newConfig(options []Option) config {
- conf := config{res: resource.Default()}
+ conf := config{
+ res: resource.Default(),
+ exemplarFilter: exemplar.TraceBasedFilter,
+ }
+ for _, o := range meterProviderOptionsFromEnv() {
+ conf = o.apply(conf)
+ }
for _, o := range options {
conf = o.apply(conf)
}
@@ -103,7 +102,11 @@ func (o optionFunc) apply(conf config) config {
// go.opentelemetry.io/otel/sdk/resource package will be used.
func WithResource(res *resource.Resource) Option {
return optionFunc(func(conf config) config {
- conf.res = res
+ var err error
+ conf.res, err = resource.Merge(resource.Environment(), res)
+ if err != nil {
+ otel.Handle(err)
+ }
return conf
})
}
@@ -135,3 +138,35 @@ func WithView(views ...View) Option {
return cfg
})
}
+
+// WithExemplarFilter configures the exemplar filter.
+//
+// The exemplar filter determines which measurements are offered to the
+// exemplar reservoir, but the exemplar reservoir makes the final decision of
+// whether to store an exemplar.
+//
+// By default, the [exemplar.SampledFilter]
+// is used. Exemplars can be entirely disabled by providing the
+// [exemplar.AlwaysOffFilter].
+func WithExemplarFilter(filter exemplar.Filter) Option {
+ return optionFunc(func(cfg config) config {
+ cfg.exemplarFilter = filter
+ return cfg
+ })
+}
+
+func meterProviderOptionsFromEnv() []Option {
+ var opts []Option
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
+ const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
+
+ switch strings.ToLower(strings.TrimSpace(os.Getenv(filterEnvKey))) {
+ case "always_on":
+ opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter))
+ case "always_off":
+ opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter))
+ case "trace_based":
+ opts = append(opts, WithExemplarFilter(exemplar.TraceBasedFilter))
+ }
+ return opts
+}
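A minimal sketch of how a consumer might use the new WithExemplarFilter option added above; the Reader is assumed to be constructed elsewhere, and setting OTEL_METRICS_EXEMPLAR_FILTER=always_off would have the same effect via meterProviderOptionsFromEnv.

```go
package example

import (
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// newMeterProvider builds a provider with exemplars disabled. Passing
// exemplar.TraceBasedFilter (the default) or exemplar.AlwaysOnFilter instead
// selects the other filters handled by the environment switch above.
func newMeterProvider(reader sdkmetric.Reader) *sdkmetric.MeterProvider {
	return sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(reader),
		sdkmetric.WithExemplarFilter(exemplar.AlwaysOffFilter),
	)
}
```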
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go
index 82619da78..0335b8ae4 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar.go
@@ -4,51 +4,49 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
- "os"
"runtime"
- "slices"
- "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
- "go.opentelemetry.io/otel/sdk/metric/internal/x"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/exemplar"
+ "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)
-// reservoirFunc returns the appropriately configured exemplar reservoir
-// creation func based on the passed InstrumentKind and user defined
-// environment variables.
-//
-// Note: This will only return non-nil values when the experimental exemplar
-// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
-// is not set to always_off.
-func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
- if !x.Exemplars.Enabled() {
- return nil
- }
- // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
- const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
+// ExemplarReservoirProviderSelector selects the
+// [exemplar.ReservoirProvider] to use
+// based on the [Aggregation] of the metric.
+type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider
- var filter exemplar.Filter
-
- switch os.Getenv(filterEnvKey) {
- case "always_on":
- filter = exemplar.AlwaysOnFilter
- case "always_off":
- return exemplar.Drop
- case "trace_based":
- fallthrough
- default:
- filter = exemplar.SampledFilter
+// reservoirFunc returns the appropriately configured exemplar reservoir
+// creation func based on the passed InstrumentKind and filter configuration.
+func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
+ return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
+ return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
+}
+// DefaultExemplarReservoirProviderSelector returns the default
+// [exemplar.ReservoirProvider] for the
+// provided [Aggregation].
+//
+// For explicit bucket histograms with more than 1 bucket, it uses the
+// [exemplar.HistogramReservoirProvider].
+// For exponential histograms, it uses the
+// [exemplar.FixedSizeReservoirProvider]
+// with a size of min(20, max_buckets).
+// For all other aggregations, it uses the
+// [exemplar.FixedSizeReservoirProvider]
+// with a size equal to the number of CPUs.
+//
+// Exemplar default reservoirs MAY change in a minor version bump. No
+// guarantees are made on the shape or statistical properties of returned
+// exemplars.
+func DefaultExemplarReservoirProviderSelector(agg Aggregation) exemplar.ReservoirProvider {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
- cp := slices.Clone(a.Boundaries)
- return func() exemplar.FilteredReservoir[N] {
- bounds := cp
- return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds))
- }
+ return exemplar.HistogramReservoirProvider(a.Boundaries)
}
var n int
@@ -75,7 +73,5 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
}
}
- return func() exemplar.FilteredReservoir[N] {
- return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n))
- }
+ return exemplar.FixedSizeReservoirProvider(n)
}
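A small sketch of a custom ExemplarReservoirProviderSelector built on the exported pieces above, deferring to the SDK default for everything except explicit-bucket histograms; the reservoir size of 5 is arbitrary.

```go
package example

import (
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// smallHistogramExemplars keeps at most 5 exemplars for explicit-bucket
// histograms and uses the SDK default selector for every other aggregation.
func smallHistogramExemplars(agg sdkmetric.Aggregation) exemplar.ReservoirProvider {
	if _, ok := agg.(sdkmetric.AggregationExplicitBucketHistogram); ok {
		return exemplar.FixedSizeReservoirProvider(5)
	}
	return sdkmetric.DefaultExemplarReservoirProviderSelector(agg)
}
```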
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/README.md b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/README.md
new file mode 100644
index 000000000..d1025f5eb
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/README.md
@@ -0,0 +1,3 @@
+# Metric SDK Exemplars
+
+[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/sdk/metric/exemplar)](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/exemplar)
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/doc.go
index 5394f48e0..9f2389376 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/doc.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/doc.go
@@ -3,4 +3,4 @@
// Package exemplar provides an implementation of the OpenTelemetry exemplar
// reservoir to be used in metric collection pipelines.
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/exemplar.go
index fcaa6a469..1ab694678 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/exemplar.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/exemplar.go
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"time"
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/filter.go
index 152a069a0..b595e2ace 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filter.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/filter.go
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
@@ -16,10 +16,10 @@ import (
// Reservoir in making a sampling decision.
type Filter func(context.Context) bool
-// SampledFilter is a [Filter] that will only offer measurements
+// TraceBasedFilter is a [Filter] that will only offer measurements
// if the passed context associated with the measurement contains a sampled
// [go.opentelemetry.io/otel/trace.SpanContext].
-func SampledFilter(ctx context.Context) bool {
+func TraceBasedFilter(ctx context.Context) bool {
return trace.SpanContextFromContext(ctx).IsSampled()
}
@@ -27,3 +27,8 @@ func SampledFilter(ctx context.Context) bool {
func AlwaysOnFilter(ctx context.Context) bool {
return true
}
+
+// AlwaysOffFilter is a [Filter] that never offers measurements.
+func AlwaysOffFilter(ctx context.Context) bool {
+ return false
+}
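Since a Filter is just a func(context.Context) bool, callers can also supply their own. A hypothetical example, not part of this change, that only offers exemplars while a recording span is active:

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel/trace"
)

// recordingOnlyFilter offers a measurement to the reservoir only when it was
// made inside a recording span. It satisfies the exemplar.Filter signature.
func recordingOnlyFilter(ctx context.Context) bool {
	return trace.SpanFromContext(ctx).IsRecording()
}
```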
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/fixed_size_reservoir.go
index 199a2608f..d4aab0aad 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/rand.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/fixed_size_reservoir.go
@@ -1,31 +1,69 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"math"
"math/rand"
- "sync"
"time"
"go.opentelemetry.io/otel/attribute"
)
-var (
+// FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir].
+func FixedSizeReservoirProvider(k int) ReservoirProvider {
+ return func(_ attribute.Set) Reservoir {
+ return NewFixedSizeReservoir(k)
+ }
+}
+
+// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
+// k exemplars. If there are k or less measurements made, the Reservoir will
+// sample each one. If there are more than k, the Reservoir will then randomly
+// sample all additional measurement with a decreasing probability.
+func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
+ return newFixedSizeReservoir(newStorage(k))
+}
+
+var _ Reservoir = &FixedSizeReservoir{}
+
+// FixedSizeReservoir is a [Reservoir] that samples at most k exemplars. If
+// there are k or less measurements made, the Reservoir will sample each one.
+// If there are more than k, the Reservoir will then randomly sample all
+// additional measurement with a decreasing probability.
+type FixedSizeReservoir struct {
+ *storage
+
+ // count is the number of measurement seen.
+ count int64
+ // next is the next count that will store a measurement at a random index
+ // once the reservoir has been filled.
+ next int64
+ // w is the largest random number in a distribution that is used to compute
+ // the next next.
+ w float64
+
// rng is used to make sampling decisions.
//
// Do not use crypto/rand. There is no reason for the decrease in performance
// given this is not a security sensitive decision.
- rng = rand.New(rand.NewSource(time.Now().UnixNano()))
- // Ensure concurrent safe accecess to rng and its underlying source.
- rngMu sync.Mutex
-)
+ rng *rand.Rand
+}
-// random returns, as a float64, a uniform pseudo-random number in the open
-// interval (0.0,1.0).
-func random() float64 {
+func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
+ r := &FixedSizeReservoir{
+ storage: s,
+ rng: rand.New(rand.NewSource(time.Now().UnixNano())),
+ }
+ r.reset()
+ return r
+}
+
+// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
+// open interval (0.0,1.0).
+func (r *FixedSizeReservoir) randomFloat64() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
@@ -43,40 +81,25 @@ func random() float64 {
//
// There are likely many other methods to explore here as well.
- rngMu.Lock()
- defer rngMu.Unlock()
-
- f := rng.Float64()
+ f := r.rng.Float64()
for f == 0 {
- f = rng.Float64()
+ f = r.rng.Float64()
}
return f
}
-// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
-// are k or less measurements made, the Reservoir will sample each one. If
-// there are more than k, the Reservoir will then randomly sample all
-// additional measurement with a decreasing probability.
-func FixedSize(k int) Reservoir {
- r := &randRes{storage: newStorage(k)}
- r.reset()
- return r
-}
-
-type randRes struct {
- *storage
-
- // count is the number of measurement seen.
- count int64
- // next is the next count that will store a measurement at a random index
- // once the reservoir has been filled.
- next int64
- // w is the largest random number in a distribution that is used to compute
- // the next next.
- w float64
-}
-
-func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
+// Offer accepts the parameters associated with a measurement. The
+// parameters will be stored as an exemplar if the Reservoir decides to
+// sample the measurement.
+//
+// The passed ctx needs to contain any baggage or span that were active
+// when the measurement was made. This information may be used by the
+// Reservoir in making a sampling decision.
+//
+// The time t is the time when the measurement was made. The v and a
+// parameters are the value and dropped (filtered) attributes of the
+// measurement respectively.
+func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
@@ -123,7 +146,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
} else {
if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
- idx := int(rng.Int63n(int64(cap(r.store))))
+ idx := int(r.rng.Int63n(int64(cap(r.store))))
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
@@ -132,7 +155,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
}
// reset resets r to the initial state.
-func (r *randRes) reset() {
+func (r *FixedSizeReservoir) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
@@ -147,14 +170,14 @@ func (r *randRes) reset() {
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
- r.w = math.Exp(math.Log(random()) / float64(cap(r.store)))
+ r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
r.advance()
}
// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
-func (r *randRes) advance() {
+func (r *FixedSizeReservoir) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
@@ -167,7 +190,7 @@ func (r *randRes) advance() {
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
- r.w *= math.Exp(math.Log(random()) / float64(cap(r.store)))
+ r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
@@ -178,10 +201,13 @@ func (r *randRes) advance() {
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
- r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1
+ r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}
-func (r *randRes) Collect(dest *[]Exemplar) {
+// Collect returns all the held exemplars.
+//
+// The Reservoir state is preserved after this call.
+func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
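As a standalone illustration of the "Algorithm L" reservoir sampling the comments above cite (not the SDK code itself): fill the first k slots, then skip ahead geometrically and overwrite a random slot, so every element of the stream is retained with equal probability k/n.

```go
package example

import (
	"math"
	"math/rand"
)

// sampleK returns k reservoir-sampled elements of stream using Algorithm L.
// Illustrative sketch only; FixedSizeReservoir keeps the equivalent state
// (count, next, w) incrementally across Offer calls.
func sampleK(stream []float64, k int, rng *rand.Rand) []float64 {
	u := func() float64 { // uniform in (0,1), avoiding 0 as the SDK does
		f := rng.Float64()
		for f == 0 {
			f = rng.Float64()
		}
		return f
	}
	if len(stream) <= k {
		return append([]float64(nil), stream...)
	}
	res := append([]float64(nil), stream[:k]...)
	w := math.Exp(math.Log(u()) / float64(k))
	next := int64(k-1) + int64(math.Log(u())/math.Log(1-w)) + 1
	for i := k; i < len(stream); i++ {
		if int64(i) == next {
			res[rng.Intn(k)] = stream[i] // overwrite a random existing sample
			w *= math.Exp(math.Log(u()) / float64(k))
			next += int64(math.Log(u())/math.Log(1-w)) + 1
		}
	}
	return res
}
```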
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/histogram_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/histogram_reservoir.go
new file mode 100644
index 000000000..3b76cf305
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/histogram_reservoir.go
@@ -0,0 +1,70 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
+
+import (
+ "context"
+ "slices"
+ "sort"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+)
+
+// HistogramReservoirProvider is a provider of [HistogramReservoir].
+func HistogramReservoirProvider(bounds []float64) ReservoirProvider {
+ cp := slices.Clone(bounds)
+ slices.Sort(cp)
+ return func(_ attribute.Set) Reservoir {
+ return NewHistogramReservoir(cp)
+ }
+}
+
+// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
+// measurement that falls within a histogram bucket. The histogram bucket
+// upper-boundaries are define by bounds.
+//
+// The passed bounds must be sorted before calling this function.
+func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
+ return &HistogramReservoir{
+ bounds: bounds,
+ storage: newStorage(len(bounds) + 1),
+ }
+}
+
+var _ Reservoir = &HistogramReservoir{}
+
+// HistogramReservoir is a [Reservoir] that samples the last measurement that
+// falls within a histogram bucket. The histogram bucket upper-boundaries are
+// define by bounds.
+type HistogramReservoir struct {
+ *storage
+
+ // bounds are bucket bounds in ascending order.
+ bounds []float64
+}
+
+// Offer accepts the parameters associated with a measurement. The
+// parameters will be stored as an exemplar if the Reservoir decides to
+// sample the measurement.
+//
+// The passed ctx needs to contain any baggage or span that were active
+// when the measurement was made. This information may be used by the
+// Reservoir in making a sampling decision.
+//
+// The time t is the time when the measurement was made. The v and a
+// parameters are the value and dropped (filtered) attributes of the
+// measurement respectively.
+func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
+ var x float64
+ switch v.Type() {
+ case Int64ValueType:
+ x = float64(v.Int64())
+ case Float64ValueType:
+ x = v.Float64()
+ default:
+ panic("unknown value type")
+ }
+ r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/reservoir.go
index 80fa59554..ba5cd1a6b 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/reservoir.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/reservoir.go
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
@@ -30,3 +30,11 @@ type Reservoir interface {
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}
+
+// ReservoirProvider creates new [Reservoir]s.
+//
+// The attributes provided are attributes which are kept by the aggregation, and
+// are exclusive with attributes passed to Offer. The combination of these
+// attributes and the attributes passed to Offer is the complete set of
+// attributes a measurement was made with.
+type ReservoirProvider func(attr attribute.Set) Reservoir
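Because a ReservoirProvider now receives the aggregation's attribute set, reservoir choice can depend on those attributes. A hypothetical provider as a sketch; the function name and the "http.route" key are assumptions, only the exemplar APIs come from this diff.

```go
package example

import (
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// routeAwareProvider uses a histogram reservoir for attribute sets carrying
// an http.route key and a small fixed-size reservoir for everything else.
func routeAwareProvider(bounds []float64) exemplar.ReservoirProvider {
	hist := exemplar.HistogramReservoirProvider(bounds)
	fixed := exemplar.FixedSizeReservoirProvider(4)
	return func(attrs attribute.Set) exemplar.Reservoir {
		if _, ok := attrs.Value(attribute.Key("http.route")); ok {
			return hist(attrs)
		}
		return fixed(attrs)
	}
}
```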
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/storage.go
index 10b2976f7..0e2e26dfb 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/storage.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/storage.go
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
@@ -35,7 +35,7 @@ func (r *storage) Collect(dest *[]Exemplar) {
continue
}
- m.Exemplar(&(*dest)[n])
+ m.exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
@@ -66,8 +66,8 @@ func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []at
}
}
-// Exemplar returns m as an [Exemplar].
-func (m measurement) Exemplar(dest *Exemplar) {
+// exemplar returns m as an [Exemplar].
+func (m measurement) exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/value.go
index 1957d6b1e..590b089a8 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/value.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exemplar/value.go
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import "math"
@@ -28,7 +28,8 @@ type Value struct {
func NewValue[N int64 | float64](value N) Value {
switch v := any(value).(type) {
case int64:
- return Value{t: Int64ValueType, val: uint64(v)}
+ // This can be later converted back to int64 (overflow not checked).
+ return Value{t: Int64ValueType, val: uint64(v)} // nolint:gosec
case float64:
return Value{t: Float64ValueType, val: math.Float64bits(v)}
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go
index 1a3cccb67..1969cb42c 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go
@@ -5,14 +5,14 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
- "fmt"
+ "errors"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// ErrExporterShutdown is returned if Export or Shutdown are called after an
// Exporter has been Shutdown.
-var ErrExporterShutdown = fmt.Errorf("exporter is shutdown")
+var ErrExporterShutdown = errors.New("exporter is shutdown")
// Exporter handles the delivery of metric data to external receivers. This is
// the final component in the metric push pipeline.
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go b/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go
index b52a330b3..c33e1a28c 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go
@@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+ "go.opentelemetry.io/otel/sdk/metric/internal/x"
)
var zeroScope instrumentation.Scope
@@ -144,6 +145,12 @@ type Stream struct {
// Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
// provide an allow-list of attribute keys here.
AttributeFilter attribute.Filter
+ // ExemplarReservoirProvider selects the
+ // [go.opentelemetry.io/otel/sdk/metric/exemplar.ReservoirProvider] based
+ // on the [Aggregation].
+ //
+ // If unspecified, [DefaultExemplarReservoirProviderSelector] is used.
+ ExemplarReservoirProviderSelector ExemplarReservoirProviderSelector
}
// instID are the identifying properties of a instrument.
@@ -184,6 +191,7 @@ var (
_ metric.Int64UpDownCounter = (*int64Inst)(nil)
_ metric.Int64Histogram = (*int64Inst)(nil)
_ metric.Int64Gauge = (*int64Inst)(nil)
+ _ x.EnabledInstrument = (*int64Inst)(nil)
)
func (i *int64Inst) Add(ctx context.Context, val int64, opts ...metric.AddOption) {
@@ -196,6 +204,10 @@ func (i *int64Inst) Record(ctx context.Context, val int64, opts ...metric.Record
i.aggregate(ctx, val, c.Attributes())
}
+func (i *int64Inst) Enabled(_ context.Context) bool {
+ return len(i.measures) != 0
+}
+
func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) { // nolint:revive // okay to shadow pkg with method.
for _, in := range i.measures {
in(ctx, val, s)
@@ -216,6 +228,7 @@ var (
_ metric.Float64UpDownCounter = (*float64Inst)(nil)
_ metric.Float64Histogram = (*float64Inst)(nil)
_ metric.Float64Gauge = (*float64Inst)(nil)
+ _ x.EnabledInstrument = (*float64Inst)(nil)
)
func (i *float64Inst) Add(ctx context.Context, val float64, opts ...metric.AddOption) {
@@ -228,14 +241,18 @@ func (i *float64Inst) Record(ctx context.Context, val float64, opts ...metric.Re
i.aggregate(ctx, val, c.Attributes())
}
+func (i *float64Inst) Enabled(_ context.Context) bool {
+ return len(i.measures) != 0
+}
+
func (i *float64Inst) aggregate(ctx context.Context, val float64, s attribute.Set) {
for _, in := range i.measures {
in(ctx, val, s)
}
}
-// observablID is a comparable unique identifier of an observable.
-type observablID[N int64 | float64] struct {
+// observableID is a comparable unique identifier of an observable.
+type observableID[N int64 | float64] struct {
name string
description string
kind InstrumentKind
@@ -287,7 +304,7 @@ func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string) int
type observable[N int64 | float64] struct {
metric.Observable
- observablID[N]
+ observableID[N]
meter *meter
measures measures[N]
@@ -296,7 +313,7 @@ type observable[N int64 | float64] struct {
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] {
return &observable[N]{
- observablID: observablID[N]{
+ observableID: observableID[N]{
name: name,
description: desc,
kind: kind,
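A sketch of wiring a per-instrument selector through a View using the new Stream.ExemplarReservoirProviderSelector field above; the instrument name is illustrative and smallHistogramExemplars is the hypothetical selector sketched earlier. The resulting view would be passed to the provider with sdkmetric.WithView.

```go
package example

import (
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

// latencyView overrides exemplar reservoir selection for one histogram.
var latencyView = sdkmetric.NewView(
	sdkmetric.Instrument{Name: "http.server.request.duration"},
	sdkmetric.Stream{
		ExemplarReservoirProviderSelector: smallHistogramExemplars,
	},
)
```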
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
index b18ee719b..fde219333 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
@@ -8,7 +8,6 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -38,8 +37,8 @@ type Builder[N int64 | float64] struct {
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
- // exemplar.Drop reservoir will be used.
- ReservoirFunc func() exemplar.FilteredReservoir[N]
+ // dropReservoir reservoir will be used.
+ ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -50,12 +49,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}
-func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
+func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
- return exemplar.Drop
+ return dropReservoir
}
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go
new file mode 100644
index 000000000..8396faaa4
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/drop.go
@@ -0,0 +1,27 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/exemplar"
+)
+
+// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
+func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
+ return &dropRes[N]{}
+}
+
+type dropRes[N int64 | float64] struct{}
+
+// Offer does nothing, all measurements offered will be dropped.
+func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
+
+// Collect resets dest. No exemplars will ever be returned.
+func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
+ clear(*dest) // Erase elements to let GC collect objects
+ *dest = (*dest)[:0]
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go
index 170ae8e58..25d709948 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go
@@ -6,7 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
import (
"sync"
- "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
+ "go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -17,6 +17,7 @@ var exemplarPool = sync.Pool{
func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
dest := exemplarPool.Get().(*[]exemplar.Exemplar)
defer func() {
+ clear(*dest) // Erase elements to let GC collect objects.
*dest = (*dest)[:0]
exemplarPool.Put(dest)
}()
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
index 707342408..336ea91d1 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -12,7 +12,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -31,7 +30,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
- res exemplar.FilteredReservoir[N]
+ res FilteredExemplarReservoir[N]
count uint64
min N
@@ -51,16 +50,16 @@ type expoHistogramDataPoint[N int64 | float64] struct {
func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize int, maxScale int32, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
f := math.MaxFloat64
- max := N(f) // if N is int64, max will overflow to -9223372036854775808
- min := N(-f)
+ ma := N(f) // if N is int64, max will overflow to -9223372036854775808
+ mi := N(-f)
if N(maxInt64) > N(f) {
- max = N(maxInt64)
- min = N(minInt64)
+ ma = N(maxInt64)
+ mi = N(minInt64)
}
return &expoHistogramDataPoint[N]{
attrs: attrs,
- min: max,
- max: min,
+ min: ma,
+ max: mi,
maxSize: maxSize,
noMinMax: noMinMax,
noSum: noSum,
@@ -284,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
@@ -307,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32
- newRes func() exemplar.FilteredReservoir[N]
+ newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
@@ -328,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
v, ok := e.values[attr.Equivalent()]
if !ok {
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
- v.res = e.newRes()
+ v.res = e.newRes(attr)
e.values[attr.Equivalent()] = v
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go
new file mode 100644
index 000000000..691a91060
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/filtered_reservoir.go
@@ -0,0 +1,50 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/exemplar"
+)
+
+// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
+type FilteredExemplarReservoir[N int64 | float64] interface {
+ // Offer accepts the parameters associated with a measurement. The
+ // parameters will be stored as an exemplar if the filter decides to
+ // sample the measurement.
+ //
+ // The passed ctx needs to contain any baggage or span that were active
+ // when the measurement was made. This information may be used by the
+ // Reservoir in making a sampling decision.
+ Offer(ctx context.Context, val N, attr []attribute.KeyValue)
+ // Collect returns all the held exemplars in the reservoir.
+ Collect(dest *[]exemplar.Exemplar)
+}
+
+// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
+type filteredExemplarReservoir[N int64 | float64] struct {
+ filter exemplar.Filter
+ reservoir exemplar.Reservoir
+}
+
+// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
+// that are allowed by the filter.
+func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
+ return &filteredExemplarReservoir[N]{
+ filter: f,
+ reservoir: r,
+ }
+}
+
+func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
+ if f.filter(ctx) {
+ // only record the current time if we are sampling this measurement.
+ f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
+ }
+}
+
+func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
index ade0941f5..d577ae2c1 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
@@ -11,13 +11,12 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type buckets[N int64 | float64] struct {
attrs attribute.Set
- res exemplar.FilteredReservoir[N]
+ res FilteredExemplarReservoir[N]
counts []uint64
count uint64
@@ -48,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64
- newRes func() exemplar.FilteredReservoir[N]
+ newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
-func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
+func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
@@ -94,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](attr, len(s.bounds)+1)
- b.res = s.newRes()
+ b.res = s.newRes(attr)
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
@@ -109,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
-func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
+func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
index c35936840..d3a93f085 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
@@ -9,7 +9,6 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -17,10 +16,10 @@ import (
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
- res exemplar.FilteredReservoir[N]
+ res FilteredExemplarReservoir[N]
}
-func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] {
+func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
@@ -33,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReserv
type lastValue[N int64 | float64] struct {
sync.Mutex
- newRes func() exemplar.FilteredReservoir[N]
+ newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
@@ -46,7 +45,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
attr := s.limit.Attributes(fltrAttr, s.values)
d, ok := s.values[attr.Equivalent()]
if !ok {
- d.res = s.newRes()
+ d.res = s.newRes(attr)
}
d.attrs = attr
@@ -115,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
-func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] {
+func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
index 891366922..8e132ad61 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
@@ -9,25 +9,24 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type sumValue[N int64 | float64] struct {
n N
- res exemplar.FilteredReservoir[N]
+ res FilteredExemplarReservoir[N]
attrs attribute.Set
}
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
- newRes func() exemplar.FilteredReservoir[N]
+ newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N]
}
-func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] {
+func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
@@ -42,7 +41,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
attr := s.limit.Attributes(fltrAttr, s.values)
v, ok := s.values[attr.Equivalent()]
if !ok {
- v.res = s.newRes()
+ v.res = s.newRes(attr)
}
v.attrs = attr
@@ -55,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
-func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] {
+func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@@ -142,9 +141,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
}
// newPrecomputedSum returns an aggregator that summarizes a set of
-// observatrions as their arithmetic sum. Each sum is scoped by attributes and
+// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
-func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] {
+func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@@ -152,7 +151,7 @@ func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() ex
}
}
-// precomputedSum summarizes a set of observatrions as their arithmetic sum.
+// precomputedSum summarizes a set of observations as their arithmetic sum.
type precomputedSum[N int64 | float64] struct {
*valueMap[N]
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go
deleted file mode 100644
index 5a0f39ae1..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/drop.go
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
-
-import (
- "context"
-
- "go.opentelemetry.io/otel/attribute"
-)
-
-// Drop returns a [FilteredReservoir] that drops all measurements it is offered.
-func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} }
-
-type dropRes[N int64 | float64] struct{}
-
-// Offer does nothing, all measurements offered will be dropped.
-func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
-
-// Collect resets dest. No exemplars will ever be returned.
-func (r *dropRes[N]) Collect(dest *[]Exemplar) {
- *dest = (*dest)[:0]
-}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go
deleted file mode 100644
index 9fedfa4be..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/filtered_reservoir.go
+++ /dev/null
@@ -1,49 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
-
-import (
- "context"
- "time"
-
- "go.opentelemetry.io/otel/attribute"
-)
-
-// FilteredReservoir wraps a [Reservoir] with a filter.
-type FilteredReservoir[N int64 | float64] interface {
- // Offer accepts the parameters associated with a measurement. The
- // parameters will be stored as an exemplar if the filter decides to
- // sample the measurement.
- //
- // The passed ctx needs to contain any baggage or span that were active
- // when the measurement was made. This information may be used by the
- // Reservoir in making a sampling decision.
- Offer(ctx context.Context, val N, attr []attribute.KeyValue)
- // Collect returns all the held exemplars in the reservoir.
- Collect(dest *[]Exemplar)
-}
-
-// filteredReservoir handles the pre-sampled exemplar of measurements made.
-type filteredReservoir[N int64 | float64] struct {
- filter Filter
- reservoir Reservoir
-}
-
-// NewFilteredReservoir creates a [FilteredReservoir] which only offers values
-// that are allowed by the filter.
-func NewFilteredReservoir[N int64 | float64](f Filter, r Reservoir) FilteredReservoir[N] {
- return &filteredReservoir[N]{
- filter: f,
- reservoir: r,
- }
-}
-
-func (f *filteredReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
- if f.filter(ctx) {
- // only record the current time if we are sampling this measurment.
- f.reservoir.Offer(ctx, time.Now(), NewValue(val), attr)
- }
-}
-
-func (f *filteredReservoir[N]) Collect(dest *[]Exemplar) { f.reservoir.Collect(dest) }
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go
deleted file mode 100644
index a6ff86d02..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/exemplar/hist.go
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
-
-import (
- "context"
- "slices"
- "sort"
- "time"
-
- "go.opentelemetry.io/otel/attribute"
-)
-
-// Histogram returns a [Reservoir] that samples the last measurement that falls
-// within a histogram bucket. The histogram bucket upper-boundaries are define
-// by bounds.
-//
-// The passed bounds will be sorted by this function.
-func Histogram(bounds []float64) Reservoir {
- slices.Sort(bounds)
- return &histRes{
- bounds: bounds,
- storage: newStorage(len(bounds) + 1),
- }
-}
-
-type histRes struct {
- *storage
-
- // bounds are bucket bounds in ascending order.
- bounds []float64
-}
-
-func (r *histRes) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
- var x float64
- switch v.Type() {
- case Int64ValueType:
- x = float64(v.Int64())
- case Float64ValueType:
- x = v.Float64()
- default:
- panic("unknown value type")
- }
- r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
-}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md
index aba69d654..59f736b73 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/README.md
@@ -10,6 +10,7 @@ See the [Compatibility and Stability](#compatibility-and-stability) section for
- [Cardinality Limit](#cardinality-limit)
- [Exemplars](#exemplars)
+- [Instrument Enabled](#instrument-enabled)
### Cardinality Limit
@@ -102,6 +103,24 @@ Revert to the default exemplar filter (`"trace_based"`)
unset OTEL_METRICS_EXEMPLAR_FILTER
```
+### Instrument Enabled
+
+To help users avoid performing computationally expensive operations when recording measurements, synchronous instruments provide an `Enabled` method.
+
+#### Examples
+
+The following code shows an example of how to check if an instrument implements the `EnabledInstrument` interface before using the `Enabled` function to avoid doing an expensive computation:
+
+```go
+type enabledInstrument interface { Enabled(context.Context) bool }
+
+ctr, err := m.Int64Counter("expensive-counter")
+c, ok := ctr.(enabledInstrument)
+if !ok || c.Enabled(context.Background()) {
+ c.Add(expensiveComputation())
+}
+```
+
## Compatibility and Stability
Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md).
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go
index 8cd2f3741..a98606238 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/x/x.go
@@ -8,41 +8,26 @@
package x // import "go.opentelemetry.io/otel/sdk/metric/internal/x"
import (
+ "context"
"os"
"strconv"
- "strings"
)
-var (
- // Exemplars is an experimental feature flag that defines if exemplars
- // should be recorded for metric data-points.
- //
- // To enable this feature set the OTEL_GO_X_EXEMPLAR environment variable
- // to the case-insensitive string value of "true" (i.e. "True" and "TRUE"
- // will also enable this).
- Exemplars = newFeature("EXEMPLAR", func(v string) (string, bool) {
- if strings.ToLower(v) == "true" {
- return v, true
- }
- return "", false
- })
-
- // CardinalityLimit is an experimental feature flag that defines if
- // cardinality limits should be applied to the recorded metric data-points.
- //
- // To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment
- // variable to the integer limit value you want to use.
- //
- // Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0
- // will disable the cardinality limits.
- CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) {
- n, err := strconv.Atoi(v)
- if err != nil {
- return 0, false
- }
- return n, true
- })
-)
+// CardinalityLimit is an experimental feature flag that defines if
+// cardinality limits should be applied to the recorded metric data-points.
+//
+// To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment
+// variable to the integer limit value you want to use.
+//
+// Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0
+// will disable the cardinality limits.
+var CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) {
+ n, err := strconv.Atoi(v)
+ if err != nil {
+ return 0, false
+ }
+ return n, true
+})
// Feature is an experimental feature control flag. It provides a uniform way
// to interact with these feature flags and parse their values.
@@ -83,3 +68,14 @@ func (f Feature[T]) Enabled() bool {
_, ok := f.Lookup()
return ok
}
+
+// EnabledInstrument informs whether the instrument is enabled.
+//
+// EnabledInstrument interface is implemented by synchronous instruments.
+type EnabledInstrument interface {
+ // Enabled returns whether the instrument will process measurements for the given context.
+ //
+ // This function can be used in places where measuring an instrument
+ // would result in computationally expensive operations.
+ Enabled(context.Context) bool
+}
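The doc comment above fully specifies the flag's contract. A standalone sketch of that contract, with the environment variable name and parsing rule taken from the comment (the real SDK reads it through `x.CardinalityLimit.Lookup()`):

```go
package example

import (
	"os"
	"strconv"
)

// cardinalityLimit mirrors the OTEL_GO_X_CARDINALITY_LIMIT contract described
// above: a positive integer enables the limit, while an unset, unparsable, or
// non-positive value leaves it disabled (reported as 0 here).
func cardinalityLimit() int {
	v, ok := os.LookupEnv("OTEL_GO_X_CARDINALITY_LIMIT")
	if !ok {
		return 0
	}
	n, err := strconv.Atoi(v)
	if err != nil || n <= 0 {
		return 0
	}
	return n
}
```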
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go
index e0fd86ca7..c495985bc 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go
@@ -113,18 +113,17 @@ func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
if err != nil {
return err
}
- var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
- externalMetrics, err := producer.Produce(ctx)
- if err != nil {
- errs = append(errs, err)
+ externalMetrics, e := producer.Produce(ctx)
+ if e != nil {
+ err = errors.Join(err, e)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
global.Debug("ManualReader collection", "Data", rm)
- return unifyErrors(errs)
+ return err
}
// MarshalLog returns logging data about the ManualReader.
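This hunk (and the matching one in `periodic_reader.go` further down) replaces the SDK's custom `multierror` with standard library error joining. The accumulation pattern in isolation, as a minimal sketch:

```go
package example

import "errors"

// joinAll runs each op and joins any failures. errors.Join drops nil inputs
// and returns nil when nothing failed, which is why no separate errs slice
// is needed anymore.
func joinAll(ops []func() error) error {
	var err error
	for _, op := range ops {
		if e := op(); e != nil {
			err = errors.Join(err, e)
		}
	}
	return err
}
```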
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
index 2309e5b2b..a6ccd117b 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
@@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
continue
}
inst.appendMeasures(in)
+
+ // Add the measures to the pipeline. Measures must be tracked
+ // per pipeline so that a measure belonging to another pipeline
+ // is never invoked.
+ insert.pipeline.addInt64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
fn := cback
@@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
continue
}
inst.appendMeasures(in)
+
+ // Add the measures to the pipeline. Measures must be tracked
+ // per pipeline so that a measure belonging to another pipeline
+ // is never invoked.
+ insert.pipeline.addFloat64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
fn := cback
@@ -441,73 +451,80 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
return noopRegister{}, nil
}
- reg := newObserver()
- var errs multierror
+ var err error
+ validInstruments := make([]metric.Observable, 0, len(insts))
for _, inst := range insts {
- // Unwrap any global.
- if u, ok := inst.(interface {
- Unwrap() metric.Observable
- }); ok {
- inst = u.Unwrap()
- }
-
switch o := inst.(type) {
case int64Observable:
- if err := o.registerable(m); err != nil {
- if !errors.Is(err, errEmptyAgg) {
- errs.append(err)
+ if e := o.registerable(m); e != nil {
+ if !errors.Is(e, errEmptyAgg) {
+ err = errors.Join(err, e)
}
continue
}
- reg.registerInt64(o.observablID)
+
+ validInstruments = append(validInstruments, inst)
case float64Observable:
- if err := o.registerable(m); err != nil {
- if !errors.Is(err, errEmptyAgg) {
- errs.append(err)
+ if e := o.registerable(m); e != nil {
+ if !errors.Is(e, errEmptyAgg) {
+ err = errors.Join(err, e)
}
continue
}
- reg.registerFloat64(o.observablID)
+
+ validInstruments = append(validInstruments, inst)
default:
// Instrument external to the SDK.
- return nil, fmt.Errorf("invalid observable: from different implementation")
+ return nil, errors.New("invalid observable: from different implementation")
}
}
- err := errs.errorOrNil()
- if reg.len() == 0 {
+ if len(validInstruments) == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}
- // Some or all instruments were valid.
- cback := func(ctx context.Context) error { return f(ctx, reg) }
- return m.pipes.registerMultiCallback(cback), err
+ unregs := make([]func(), len(m.pipes))
+ for ix, pipe := range m.pipes {
+ reg := newObserver(pipe)
+ for _, inst := range validInstruments {
+ switch o := inst.(type) {
+ case int64Observable:
+ reg.registerInt64(o.observableID)
+ case float64Observable:
+ reg.registerFloat64(o.observableID)
+ }
+ }
+
+ // Some or all instruments were valid.
+ cBack := func(ctx context.Context) error { return f(ctx, reg) }
+ unregs[ix] = pipe.addMultiCallback(cBack)
+ }
+
+ return unregisterFuncs{f: unregs}, err
}
type observer struct {
embedded.Observer
- float64 map[observablID[float64]]struct{}
- int64 map[observablID[int64]]struct{}
+ pipe *pipeline
+ float64 map[observableID[float64]]struct{}
+ int64 map[observableID[int64]]struct{}
}
-func newObserver() observer {
+func newObserver(p *pipeline) observer {
return observer{
- float64: make(map[observablID[float64]]struct{}),
- int64: make(map[observablID[int64]]struct{}),
+ pipe: p,
+ float64: make(map[observableID[float64]]struct{}),
+ int64: make(map[observableID[int64]]struct{}),
}
}
-func (r observer) len() int {
- return len(r.float64) + len(r.int64)
-}
-
-func (r observer) registerFloat64(id observablID[float64]) {
+func (r observer) registerFloat64(id observableID[float64]) {
r.float64[id] = struct{}{}
}
-func (r observer) registerInt64(id observablID[int64]) {
+func (r observer) registerInt64(id observableID[int64]) {
r.int64[id] = struct{}{}
}
@@ -521,22 +538,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
switch conv := o.(type) {
case float64Observable:
oImpl = conv
- case interface {
- Unwrap() metric.Observable
- }:
- // Unwrap any global.
- async := conv.Unwrap()
- var ok bool
- if oImpl, ok = async.(float64Observable); !ok {
- global.Error(errUnknownObserver, "failed to record asynchronous")
- return
- }
default:
global.Error(errUnknownObserver, "failed to record")
return
}
- if _, registered := r.float64[oImpl.observablID]; !registered {
+ if _, registered := r.float64[oImpl.observableID]; !registered {
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
@@ -548,7 +555,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
return
}
c := metric.NewObserveConfig(opts)
- oImpl.observe(v, c.Attributes())
+ // Access to r.pipe.float64Measures is already guarded by a lock in pipeline.produce.
+ // TODO (#5946): Refactor pipeline and observable measures.
+ measures := r.pipe.float64Measures[oImpl.observableID]
+ for _, m := range measures {
+ m(context.Background(), v, c.Attributes())
+ }
}
func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) {
@@ -556,22 +568,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
switch conv := o.(type) {
case int64Observable:
oImpl = conv
- case interface {
- Unwrap() metric.Observable
- }:
- // Unwrap any global.
- async := conv.Unwrap()
- var ok bool
- if oImpl, ok = async.(int64Observable); !ok {
- global.Error(errUnknownObserver, "failed to record asynchronous")
- return
- }
default:
global.Error(errUnknownObserver, "failed to record")
return
}
- if _, registered := r.int64[oImpl.observablID]; !registered {
+ if _, registered := r.int64[oImpl.observableID]; !registered {
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
@@ -583,7 +585,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
return
}
c := metric.NewObserveConfig(opts)
- oImpl.observe(v, c.Attributes())
+ // Access to r.pipe.int64Measures is already guarded by a lock in pipeline.produce.
+ // TODO (#5946): Refactor pipeline and observable measures.
+ measures := r.pipe.int64Measures[oImpl.observableID]
+ for _, m := range measures {
+ m(context.Background(), v, c.Attributes())
+ }
}
type noopRegister struct{ embedded.Registration }
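From the caller's side the `RegisterCallback` API is unchanged; the registration is now fanned out to one observer per pipeline internally. A minimal usage sketch (the `queueLen` helper is hypothetical):

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// queueLen is a hypothetical source of the observed value.
func queueLen() int64 { return 0 }

func register() (metric.Registration, error) {
	meter := otel.GetMeterProvider().Meter("example")
	gauge, err := meter.Int64ObservableGauge("queue.length")
	if err != nil {
		return nil, err
	}
	// The callback is registered once per pipeline under the hood, but only
	// a single Registration is returned to unregister them all.
	return meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
		o.ObserveInt64(gauge, queueLen())
		return nil
	}, gauge)
}
```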
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
index 67ee1b11a..dcd2182d9 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
@@ -251,18 +251,17 @@ func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricd
if err != nil {
return err
}
- var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
- externalMetrics, err := producer.Produce(ctx)
- if err != nil {
- errs = append(errs, err)
+ externalMetrics, e := producer.Produce(ctx)
+ if e != nil {
+ err = errors.Join(err, e)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
global.Debug("PeriodicReader collection", "Data", rm)
- return unifyErrors(errs)
+ return err
}
// export exports metric data m using r's exporter.
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
index 823bf2fe3..775e24526 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
@@ -8,14 +8,13 @@ import (
"context"
"errors"
"fmt"
- "strings"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/internal/global"
- "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
+ "go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
@@ -38,14 +37,17 @@ type instrumentSync struct {
compAgg aggregate.ComputeAggregation
}
-func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
+func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
if res == nil {
res = resource.Empty()
}
return &pipeline{
- resource: res,
- reader: reader,
- views: views,
+ resource: res,
+ reader: reader,
+ views: views,
+ int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
+ float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
+ exemplarFilter: exemplarFilter,
// aggregations is lazy allocated when needed.
}
}
@@ -63,9 +65,26 @@ type pipeline struct {
views []View
sync.Mutex
- aggregations map[instrumentation.Scope][]instrumentSync
- callbacks []func(context.Context) error
- multiCallbacks list.List
+ int64Measures map[observableID[int64]][]aggregate.Measure[int64]
+ float64Measures map[observableID[float64]][]aggregate.Measure[float64]
+ aggregations map[instrumentation.Scope][]instrumentSync
+ callbacks []func(context.Context) error
+ multiCallbacks list.List
+ exemplarFilter exemplar.Filter
+}
+
+// addInt64Measure adds a new int64 measure to the pipeline for each observer.
+func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
+ p.Lock()
+ defer p.Unlock()
+ p.int64Measures[id] = m
+}
+
+// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
+func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
+ p.Lock()
+ defer p.Unlock()
+ p.float64Measures[id] = m
}
// addSync adds the instrumentSync to pipeline p with scope. This method is not
@@ -105,14 +124,15 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
p.Lock()
defer p.Unlock()
- var errs multierror
+ var err error
for _, c := range p.callbacks {
// TODO make the callbacks parallel. ( #3034 )
- if err := c(ctx); err != nil {
- errs.append(err)
+ if e := c(ctx); e != nil {
+ err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
rm.Resource = nil
+ clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
@@ -120,12 +140,13 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(multiCallback)
- if err := f(ctx); err != nil {
- errs.append(err)
+ if e := f(ctx); e != nil {
+ err = errors.Join(err, e)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
rm.Resource = nil
+ clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
@@ -157,7 +178,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
rm.ScopeMetrics = rm.ScopeMetrics[:i]
- return errs.errorOrNil()
+ return err
}
// inserter facilitates inserting of new instruments from a single scope into a
@@ -219,7 +240,7 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures []aggregate.Measure[N]
)
- errs := &multierror{wrapped: errCreatingAggregators}
+ var err error
seen := make(map[uint64]struct{})
for _, v := range i.pipeline.views {
stream, match := v(inst)
@@ -227,9 +248,9 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
continue
}
matched = true
- in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
- if err != nil {
- errs.append(err)
+ in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
+ if e != nil {
+ err = errors.Join(err, e)
}
if in == nil { // Drop aggregation.
continue
@@ -242,8 +263,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
measures = append(measures, in)
}
+ if err != nil {
+ err = errors.Join(errCreatingAggregators, err)
+ }
+
if matched {
- return measures, errs.errorOrNil()
+ return measures, err
}
// Apply implicit default view if no explicit matched.
@@ -252,15 +277,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
Description: inst.Description,
Unit: inst.Unit,
}
- in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
- if err != nil {
- errs.append(err)
+ in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
+ if e != nil {
+ if err == nil {
+ err = errCreatingAggregators
+ }
+ err = errors.Join(err, e)
}
if in != nil {
// Ensured to have not seen given matched was false.
measures = append(measures, in)
}
- return measures, errs.errorOrNil()
+ return measures, err
}
// addCallback registers a single instrument callback to be run when
@@ -329,6 +357,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// The view explicitly requested the default aggregation.
stream.Aggregation = DefaultAggregationSelector(kind)
}
+ if stream.ExemplarReservoirProviderSelector == nil {
+ stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector
+ }
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
return nil, 0, fmt.Errorf(
@@ -349,7 +380,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
- ReservoirFunc: reservoirFunc[N](stream.Aggregation),
+ ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter),
}
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
@@ -552,24 +583,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
// measurement.
type pipelines []*pipeline
-func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
+func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for _, r := range readers {
- p := newPipeline(res, r, views)
+ p := newPipeline(res, r, views, exemplarFilter)
r.register(p)
pipes = append(pipes, p)
}
return pipes
}
-func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
- unregs := make([]func(), len(p))
- for i, pipe := range p {
- unregs[i] = pipe.addMultiCallback(c)
- }
- return unregisterFuncs{f: unregs}
-}
-
type unregisterFuncs struct {
embedded.Registration
f []func()
@@ -602,15 +625,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]
- errs := &multierror{}
+ var err error
for _, i := range r.inserters {
- in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
- if err != nil {
- errs.append(err)
+ in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
+ if e != nil {
+ err = errors.Join(err, e)
}
measures = append(measures, in...)
}
- return measures, errs.errorOrNil()
+ return measures, err
}
// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
@@ -619,37 +642,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]
- errs := &multierror{}
+ var err error
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
- in, err := i.Instrument(id, agg)
- if err != nil {
- errs.append(err)
+ in, e := i.Instrument(id, agg)
+ if e != nil {
+ err = errors.Join(err, e)
}
measures = append(measures, in...)
}
- return measures, errs.errorOrNil()
-}
-
-type multierror struct {
- wrapped error
- errors []string
-}
-
-func (m *multierror) errorOrNil() error {
- if len(m.errors) == 0 {
- return nil
- }
- if m.wrapped == nil {
- return errors.New(strings.Join(m.errors, "; "))
- }
- return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
-}
-
-func (m *multierror) append(err error) {
- m.errors = append(m.errors, err.Error())
+ return measures, err
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go b/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go
index a82af538e..2fca89e5a 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go
@@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
flush, sdown := conf.readerSignals()
mp := &MeterProvider{
- pipes: newPipelines(conf.res, conf.readers, conf.views),
+ pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter),
forceFlush: flush,
shutdown: sdown,
}
@@ -76,15 +76,17 @@ func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metri
c := metric.NewMeterConfig(options...)
s := instrumentation.Scope{
- Name: name,
- Version: c.InstrumentationVersion(),
- SchemaURL: c.SchemaURL(),
+ Name: name,
+ Version: c.InstrumentationVersion(),
+ SchemaURL: c.SchemaURL(),
+ Attributes: c.InstrumentationAttributes(),
}
global.Info("Meter created",
"Name", s.Name,
"Version", s.Version,
"SchemaURL", s.SchemaURL,
+ "Attributes", s.Attributes,
)
return mp.meters.Lookup(s, func() *meter {
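Both additions in this hunk surface through public configuration: the exemplar filter comes from the provider options and the scope attributes from the meter options. A hedged sketch, assuming the SDK's `WithExemplarFilter` option (which populates `conf.exemplarFilter`) and the API's `WithInstrumentationAttributes` meter option:

```go
package example

import (
	"go.opentelemetry.io/otel/attribute"
	api "go.opentelemetry.io/otel/metric"
	sdk "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

func newMeter() api.Meter {
	// The filter is handed to every pipeline via newPipelines above.
	provider := sdk.NewMeterProvider(sdk.WithExemplarFilter(exemplar.AlwaysOnFilter))

	// The attributes end up in the instrumentation.Scope built in Meter.
	return provider.Meter("example",
		api.WithInstrumentationVersion("0.1.0"),
		api.WithInstrumentationAttributes(attribute.String("tenant", "a")),
	)
}
```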
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go
index d94bdee75..d13a70697 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go
@@ -5,26 +5,26 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
- "fmt"
+ "errors"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// errDuplicateRegister is logged by a Reader when an attempt to registered it
// more than once occurs.
-var errDuplicateRegister = fmt.Errorf("duplicate reader registration")
+var errDuplicateRegister = errors.New("duplicate reader registration")
// ErrReaderNotRegistered is returned if Collect or Shutdown are called before
// the reader is registered with a MeterProvider.
-var ErrReaderNotRegistered = fmt.Errorf("reader is not registered")
+var ErrReaderNotRegistered = errors.New("reader is not registered")
// ErrReaderShutdown is returned if Collect or Shutdown are called after a
// reader has been Shutdown once.
-var ErrReaderShutdown = fmt.Errorf("reader is shutdown")
+var ErrReaderShutdown = errors.New("reader is shutdown")
// errNonPositiveDuration is logged when an environmental variable
// has non-positive value.
-var errNonPositiveDuration = fmt.Errorf("non-positive duration")
+var errNonPositiveDuration = errors.New("non-positive duration")
// Reader is the interface used between the SDK and an
// exporter. Control flow is bi-directional through the
@@ -60,8 +60,8 @@ type Reader interface {
aggregation(InstrumentKind) Aggregation // nolint:revive // import-shadow for method scoped by type.
// Collect gathers and returns all metric data related to the Reader from
- // the SDK and stores it in out. An error is returned if this is called
- // after Shutdown or if out is nil.
+ // the SDK and stores it in rm. An error is returned if this is called
+ // after Shutdown or if rm is nil.
//
// This method needs to be concurrent safe, and the cancellation of the
// passed context is expected to be honored.
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/version.go b/vendor/go.opentelemetry.io/otel/sdk/metric/version.go
index 44316caa1..7c4b8530d 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/version.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/version.go
@@ -5,5 +5,5 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
// version is the current release version of the metric SDK in use.
func version() string {
- return "1.29.0"
+ return "1.34.0"
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/view.go b/vendor/go.opentelemetry.io/otel/sdk/metric/view.go
index cd08c6732..630890f42 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/view.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/view.go
@@ -96,11 +96,12 @@ func NewView(criteria Instrument, mask Stream) View {
return func(i Instrument) (Stream, bool) {
if matchFunc(i) {
return Stream{
- Name: nonZero(mask.Name, i.Name),
- Description: nonZero(mask.Description, i.Description),
- Unit: nonZero(mask.Unit, i.Unit),
- Aggregation: agg,
- AttributeFilter: mask.AttributeFilter,
+ Name: nonZero(mask.Name, i.Name),
+ Description: nonZero(mask.Description, i.Description),
+ Unit: nonZero(mask.Unit, i.Unit),
+ Aggregation: agg,
+ AttributeFilter: mask.AttributeFilter,
+ ExemplarReservoirProviderSelector: mask.ExemplarReservoirProviderSelector,
}, true
}
return Stream{}, false
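With the new `Stream` field, a `View` can override how exemplar reservoirs are chosen for the instruments it matches. A minimal sketch, assuming the `FixedSizeReservoirProvider` helper from the now-public exemplar package:

```go
package example

import (
	sdk "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// latencyExemplars keeps at most 16 exemplars per data point for the matched
// instrument, regardless of the aggregation selected for it.
var latencyExemplars = sdk.NewView(
	sdk.Instrument{Name: "request.latency"},
	sdk.Stream{
		ExemplarReservoirProviderSelector: func(sdk.Aggregation) exemplar.ReservoirProvider {
			return exemplar.FixedSizeReservoirProvider(16)
		},
	},
)
```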
diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go b/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go
index 95a61d61d..c02aeefdd 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/resource/auto.go
@@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
- "strings"
)
// ErrPartialResource is returned by a detector when complete source
@@ -57,62 +56,37 @@ func Detect(ctx context.Context, detectors ...Detector) (*Resource, error) {
// these errors will be returned. Otherwise, nil is returned.
func detect(ctx context.Context, res *Resource, detectors []Detector) error {
var (
- r *Resource
- errs detectErrs
- err error
+ r *Resource
+ err error
+ e error
)
for _, detector := range detectors {
if detector == nil {
continue
}
- r, err = detector.Detect(ctx)
- if err != nil {
- errs = append(errs, err)
- if !errors.Is(err, ErrPartialResource) {
+ r, e = detector.Detect(ctx)
+ if e != nil {
+ err = errors.Join(err, e)
+ if !errors.Is(e, ErrPartialResource) {
continue
}
}
- r, err = Merge(res, r)
- if err != nil {
- errs = append(errs, err)
+ r, e = Merge(res, r)
+ if e != nil {
+ err = errors.Join(err, e)
}
*res = *r
}
- if len(errs) == 0 {
- return nil
- }
- if errors.Is(errs, ErrSchemaURLConflict) {
- // If there has been a merge conflict, ensure the resource has no
- // schema URL.
- res.schemaURL = ""
- }
- return errs
-}
-
-type detectErrs []error
-
-func (e detectErrs) Error() string {
- errStr := make([]string, len(e))
- for i, err := range e {
- errStr[i] = fmt.Sprintf("* %s", err)
- }
-
- format := "%d errors occurred detecting resource:\n\t%s"
- return fmt.Sprintf(format, len(e), strings.Join(errStr, "\n\t"))
-}
+ if err != nil {
+ if errors.Is(err, ErrSchemaURLConflict) {
+ // If there has been a merge conflict, ensure the resource has no
+ // schema URL.
+ res.schemaURL = ""
+ }
-func (e detectErrs) Unwrap() error {
- switch len(e) {
- case 0:
- return nil
- case 1:
- return e[0]
+ err = fmt.Errorf("error detecting resource: %w", err)
}
- return e[1:]
-}
-
-func (e detectErrs) Is(target error) bool {
- return len(e) != 0 && errors.Is(e[0], target)
+ return err
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go b/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go
index 6ac1cdbf7..cf3c88e15 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/resource/builtin.go
@@ -20,15 +20,13 @@ type (
// telemetrySDK is a Detector that provides information about
// the OpenTelemetry SDK used. This Detector is included as a
// builtin. If these resource attributes are not wanted, use
- // the WithTelemetrySDK(nil) or WithoutBuiltin() options to
- // explicitly disable them.
+ // resource.New() to explicitly disable them.
telemetrySDK struct{}
// host is a Detector that provides information about the host
// being run on. This Detector is included as a builtin. If
// these resource attributes are not wanted, use the
- // WithHost(nil) or WithoutBuiltin() options to explicitly
- // disable them.
+ // resource.New() to explicitly disable them.
host struct{}
stringDetector struct {
diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go b/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go
index 71386e2da..3677c83d7 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/resource/host_id_windows.go
@@ -10,17 +10,16 @@ import (
"golang.org/x/sys/windows/registry"
)
-// implements hostIDReader
+// implements hostIDReader.
type hostIDReaderWindows struct{}
-// read reads MachineGuid from the windows registry key:
-// SOFTWARE\Microsoft\Cryptography
+// read reads MachineGuid from the Windows registry key:
+// SOFTWARE\Microsoft\Cryptography.
func (*hostIDReaderWindows) read() (string, error) {
k, err := registry.OpenKey(
registry.LOCAL_MACHINE, `SOFTWARE\Microsoft\Cryptography`,
registry.QUERY_VALUE|registry.WOW64_64KEY,
)
-
if err != nil {
return "", err
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go b/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go
index 5e3d199d7..a6a5a53c0 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/resource/os_windows.go
@@ -17,7 +17,6 @@ import (
func platformOSDescription() (string, error) {
k, err := registry.OpenKey(
registry.LOCAL_MACHINE, `SOFTWARE\Microsoft\Windows NT\CurrentVersion`, registry.QUERY_VALUE)
-
if err != nil {
return "", err
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
index 1d399a75d..ccc97e1b6 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
@@ -280,6 +280,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
//
// It is up to the exporter to implement any type of retry logic if a batch is failing
// to be exported, since it is specific to the protocol and backend being sent to.
+ clear(bsp.batch) // Erase elements to let GC collect objects
bsp.batch = bsp.batch[:0]
if err != nil {
@@ -316,7 +317,11 @@ func (bsp *batchSpanProcessor) processQueue() {
bsp.batchMutex.Unlock()
if shouldExport {
if !bsp.timer.Stop() {
- <-bsp.timer.C
+ // Handle both GODEBUG=asynctimerchan=[0|1] properly.
+ select {
+ case <-bsp.timer.C:
+ default:
+ }
}
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
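The non-blocking drain added here is safe whether or not the timer already fired, which keeps the processor correct under either GODEBUG=asynctimerchan setting. The pattern in isolation:

```go
package example

import "time"

// stopAndDrain stops t and discards a pending tick without blocking when the
// channel is empty.
func stopAndDrain(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
}
```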
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go b/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go
index 821c83faa..8c308dd60 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/trace/evictedqueue.go
@@ -12,25 +12,26 @@ import (
// evictedQueue is a FIFO queue with a configurable capacity.
type evictedQueue[T any] struct {
- queue []T
- capacity int
- droppedCount int
- logDropped func()
+ queue []T
+ capacity int
+ droppedCount int
+ logDroppedMsg string
+ logDroppedOnce sync.Once
}
func newEvictedQueueEvent(capacity int) evictedQueue[Event] {
// Do not pre-allocate queue, do this lazily.
return evictedQueue[Event]{
- capacity: capacity,
- logDropped: sync.OnceFunc(func() { global.Warn("limit reached: dropping trace trace.Event") }),
+ capacity: capacity,
+ logDroppedMsg: "limit reached: dropping trace trace.Event",
}
}
func newEvictedQueueLink(capacity int) evictedQueue[Link] {
// Do not pre-allocate queue, do this lazily.
return evictedQueue[Link]{
- capacity: capacity,
- logDropped: sync.OnceFunc(func() { global.Warn("limit reached: dropping trace trace.Link") }),
+ capacity: capacity,
+ logDroppedMsg: "limit reached: dropping trace trace.Link",
}
}
@@ -53,6 +54,10 @@ func (eq *evictedQueue[T]) add(value T) {
eq.queue = append(eq.queue, value)
}
+func (eq *evictedQueue[T]) logDropped() {
+ eq.logDroppedOnce.Do(func() { global.Warn(eq.logDroppedMsg) })
+}
+
// copy returns a copy of the evictedQueue.
func (eq *evictedQueue[T]) copy() []T {
return slices.Clone(eq.queue)
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go b/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go
index 14c2e5beb..185aa7c08 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/trace/provider.go
@@ -139,9 +139,10 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T
name = defaultTracerName
}
is := instrumentation.Scope{
- Name: name,
- Version: c.InstrumentationVersion(),
- SchemaURL: c.SchemaURL(),
+ Name: name,
+ Version: c.InstrumentationVersion(),
+ SchemaURL: c.SchemaURL(),
+ Attributes: c.InstrumentationAttributes(),
}
t, ok := func() (trace.Tracer, bool) {
@@ -168,7 +169,7 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T
// slowing down all tracing consumers.
// - Logging code may be instrumented with tracing and deadlock because it could try
// acquiring the same non-reentrant mutex.
- global.Info("Tracer created", "name", name, "version", is.Version, "schemaURL", is.SchemaURL)
+ global.Info("Tracer created", "name", name, "version", is.Version, "schemaURL", is.SchemaURL, "attributes", is.Attributes)
}
return t
}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go b/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go
index d2d1f7246..9b672a1d7 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/trace/sampler_env.go
@@ -5,7 +5,6 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"errors"
- "fmt"
"os"
"strconv"
"strings"
@@ -26,7 +25,7 @@ const (
type errUnsupportedSampler string
func (e errUnsupportedSampler) Error() string {
- return fmt.Sprintf("unsupported sampler: %s", string(e))
+ return "unsupported sampler: " + string(e)
}
var (
@@ -39,7 +38,7 @@ type samplerArgParseError struct {
}
func (e samplerArgParseError) Error() string {
- return fmt.Sprintf("parsing sampler argument: %s", e.parseErr.Error())
+ return "parsing sampler argument: " + e.parseErr.Error()
}
func (e samplerArgParseError) Unwrap() error {
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/span.go b/vendor/go.opentelemetry.io/otel/sdk/trace/span.go
index 4945f5083..8f4fc3850 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/span.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/trace/span.go
@@ -174,6 +174,17 @@ func (s *recordingSpan) IsRecording() bool {
s.mu.Lock()
defer s.mu.Unlock()
+ return s.isRecording()
+}
+
+// isRecording returns if this span is being recorded. If this span has ended
+// this will return false.
+//
+// This method assumes s.mu.Lock is held by the caller.
+func (s *recordingSpan) isRecording() bool {
+ if s == nil {
+ return false
+ }
return s.endTime.IsZero()
}
@@ -182,11 +193,15 @@ func (s *recordingSpan) IsRecording() bool {
// included in the set status when the code is for an error. If this span is
// not being recorded than this method does nothing.
func (s *recordingSpan) SetStatus(code codes.Code, description string) {
- if !s.IsRecording() {
+ if s == nil {
return
}
+
s.mu.Lock()
defer s.mu.Unlock()
+ if !s.isRecording() {
+ return
+ }
if s.status.Code > code {
return
}
@@ -210,12 +225,15 @@ func (s *recordingSpan) SetStatus(code codes.Code, description string) {
// attributes the span is configured to have, the last added attributes will
// be dropped.
func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
- if !s.IsRecording() {
+ if s == nil || len(attributes) == 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
+ if !s.isRecording() {
+ return
+ }
limit := s.tracer.provider.spanLimits.AttributeCountLimit
if limit == 0 {
@@ -233,7 +251,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
// Otherwise, add without deduplication. When attributes are read they
// will be deduplicated, optimizing the operation.
- s.attributes = slices.Grow(s.attributes, len(s.attributes)+len(attributes))
+ s.attributes = slices.Grow(s.attributes, len(attributes))
for _, a := range attributes {
if !a.Valid() {
// Drop all invalid attributes.
@@ -280,13 +298,17 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
// Do not set a capacity when creating this map. Benchmark testing has
// showed this to only add unused memory allocations in general use.
- exists := make(map[attribute.Key]int)
- s.dedupeAttrsFromRecord(&exists)
+ exists := make(map[attribute.Key]int, len(s.attributes))
+ s.dedupeAttrsFromRecord(exists)
// Now that s.attributes is deduplicated, adding unique attributes up to
// the capacity of s will not over allocate s.attributes.
- sum := len(attrs) + len(s.attributes)
- s.attributes = slices.Grow(s.attributes, min(sum, limit))
+
+ // max size = limit
+ maxCap := min(len(attrs)+len(s.attributes), limit)
+ if cap(s.attributes) < maxCap {
+ s.attributes = slices.Grow(s.attributes, maxCap-cap(s.attributes))
+ }
for _, a := range attrs {
if !a.Valid() {
// Drop all invalid attributes.
@@ -296,6 +318,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
if idx, ok := exists[a.Key]; ok {
// Perform all updates before dropping, even when at capacity.
+ a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a)
s.attributes[idx] = a
continue
}
@@ -324,54 +347,99 @@ func truncateAttr(limit int, attr attribute.KeyValue) attribute.KeyValue {
}
switch attr.Value.Type() {
case attribute.STRING:
- if v := attr.Value.AsString(); len(v) > limit {
- return attr.Key.String(safeTruncate(v, limit))
- }
+ v := attr.Value.AsString()
+ return attr.Key.String(truncate(limit, v))
case attribute.STRINGSLICE:
v := attr.Value.AsStringSlice()
for i := range v {
- if len(v[i]) > limit {
- v[i] = safeTruncate(v[i], limit)
- }
+ v[i] = truncate(limit, v[i])
}
return attr.Key.StringSlice(v)
}
return attr
}
-// safeTruncate truncates the string and guarantees valid UTF-8 is returned.
-func safeTruncate(input string, limit int) string {
- if trunc, ok := safeTruncateValidUTF8(input, limit); ok {
- return trunc
+// truncate returns a truncated version of s such that it contains at most
+// the limit number of characters. Truncation is applied by returning at most
+// the limit number of valid characters contained in s.
+//
+// If limit is negative, it returns the original string.
+//
+// UTF-8 is supported. When truncating, all invalid characters are dropped
+// before applying truncation.
+//
+// If s already contains no more than the limit number of bytes, it is
+// returned unchanged. No invalid characters are removed.
+func truncate(limit int, s string) string {
+ // This prioritizes performance in the following order based on the most
+ // common expected use-cases.
+ //
+ // - Short values less than the default limit (128).
+ // - Strings with valid encodings that exceed the limit.
+ // - No limit.
+ // - Strings with invalid encodings that exceed the limit.
+ if limit < 0 || len(s) <= limit {
+ return s
+ }
+
+ // Optimistically, assume all valid UTF-8.
+ var b strings.Builder
+ count := 0
+ for i, c := range s {
+ if c != utf8.RuneError {
+ count++
+ if count > limit {
+ return s[:i]
+ }
+ continue
+ }
+
+ _, size := utf8.DecodeRuneInString(s[i:])
+ if size == 1 {
+ // Invalid encoding.
+ b.Grow(len(s) - 1)
+ _, _ = b.WriteString(s[:i])
+ s = s[i:]
+ break
+ }
+ }
+
+ // Fast-path, no invalid input.
+ if b.Cap() == 0 {
+ return s
}
- trunc, _ := safeTruncateValidUTF8(strings.ToValidUTF8(input, ""), limit)
- return trunc
-}
-// safeTruncateValidUTF8 returns a copy of the input string safely truncated to
-// limit. The truncation is ensured to occur at the bounds of complete UTF-8
-// characters. If invalid encoding of UTF-8 is encountered, input is returned
-// with false, otherwise, the truncated input will be returned with true.
-func safeTruncateValidUTF8(input string, limit int) (string, bool) {
- for cnt := 0; cnt <= limit; {
- r, size := utf8.DecodeRuneInString(input[cnt:])
- if r == utf8.RuneError {
- return input, false
+ // Truncate while validating UTF-8.
+ for i := 0; i < len(s) && count < limit; {
+ c := s[i]
+ if c < utf8.RuneSelf {
+ // Optimization for single byte runes (common case).
+ _ = b.WriteByte(c)
+ i++
+ count++
+ continue
}
- if cnt+size > limit {
- return input[:cnt], true
+ _, size := utf8.DecodeRuneInString(s[i:])
+ if size == 1 {
+ // We checked for all 1-byte runes above, this is a RuneError.
+ i++
+ continue
}
- cnt += size
+
+ _, _ = b.WriteString(s[i : i+size])
+ i += size
+ count++
}
- return input, true
+
+ return b.String()
}
// End ends the span. This method does nothing if the span is already ended or
// is not being recorded.
//
-// The only SpanOption currently supported is WithTimestamp which will set the
-// end time for a Span's life-cycle.
+// The only SpanEndOptions currently supported are [trace.WithTimestamp] and
+// [trace.WithStackTrace].
//
// If this method is called while panicking an error event is added to the
// Span before ending it and the panic is continued.
@@ -386,9 +454,10 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
// the span's duration in case some operation below takes a while.
et := monotonicEndTime(s.startTime)
- // Do relative expensive check now that we have an end time and see if we
- // need to do any more processing.
- if !s.IsRecording() {
+ // Lock the span now that we have an end time and see if we need to do any more processing.
+ s.mu.Lock()
+ if !s.isRecording() {
+ s.mu.Unlock()
return
}
@@ -413,10 +482,11 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
}
if s.executionTracerTaskEnd != nil {
+ s.mu.Unlock()
s.executionTracerTaskEnd()
+ s.mu.Lock()
}
- s.mu.Lock()
// Setting endTime to non-zero marks the span as ended and not recording.
if config.Timestamp().IsZero() {
s.endTime = et
@@ -450,7 +520,13 @@ func monotonicEndTime(start time.Time) time.Time {
// does not change the Span status. If this span is not being recorded or err is nil
// than this method does nothing.
func (s *recordingSpan) RecordError(err error, opts ...trace.EventOption) {
- if s == nil || err == nil || !s.IsRecording() {
+ if s == nil || err == nil {
+ return
+ }
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.isRecording() {
return
}
@@ -486,14 +562,23 @@ func recordStackTrace() string {
}
// AddEvent adds an event with the provided name and options. If this span is
-// not being recorded than this method does nothing.
+// not being recorded then this method does nothing.
func (s *recordingSpan) AddEvent(name string, o ...trace.EventOption) {
- if !s.IsRecording() {
+ if s == nil {
+ return
+ }
+
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.isRecording() {
return
}
s.addEvent(name, o...)
}
+// addEvent adds an event with the provided name and options.
+//
+// This method assumes s.mu.Lock is held by the caller.
func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) {
c := trace.NewEventConfig(o...)
e := Event{Name: name, Attributes: c.Attributes(), Time: c.Timestamp()}
@@ -510,20 +595,21 @@ func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) {
e.Attributes = e.Attributes[:limit]
}
- s.mu.Lock()
s.events.add(e)
- s.mu.Unlock()
}
// SetName sets the name of this span. If this span is not being recorded than
// this method does nothing.
func (s *recordingSpan) SetName(name string) {
- if !s.IsRecording() {
+ if s == nil {
return
}
s.mu.Lock()
defer s.mu.Unlock()
+ if !s.isRecording() {
+ return
+ }
s.name = name
}
@@ -579,29 +665,26 @@ func (s *recordingSpan) Attributes() []attribute.KeyValue {
func (s *recordingSpan) dedupeAttrs() {
// Do not set a capacity when creating this map. Benchmark testing has
// showed this to only add unused memory allocations in general use.
- exists := make(map[attribute.Key]int)
- s.dedupeAttrsFromRecord(&exists)
+ exists := make(map[attribute.Key]int, len(s.attributes))
+ s.dedupeAttrsFromRecord(exists)
}
// dedupeAttrsFromRecord deduplicates the attributes of s to fit capacity
// using record as the record of unique attribute keys to their index.
//
// This method assumes s.mu.Lock is held by the caller.
-func (s *recordingSpan) dedupeAttrsFromRecord(record *map[attribute.Key]int) {
+func (s *recordingSpan) dedupeAttrsFromRecord(record map[attribute.Key]int) {
// Use the fact that slices share the same backing array.
unique := s.attributes[:0]
for _, a := range s.attributes {
- if idx, ok := (*record)[a.Key]; ok {
+ if idx, ok := record[a.Key]; ok {
unique[idx] = a
} else {
unique = append(unique, a)
- (*record)[a.Key] = len(unique) - 1
+ record[a.Key] = len(unique) - 1
}
}
- // s.attributes have element types of attribute.KeyValue. These types are
- // not pointers and they themselves do not contain pointer fields,
- // therefore the duplicate values do not need to be zeroed for them to be
- // garbage collected.
+ clear(s.attributes[len(unique):]) // Erase unneeded elements to let GC collect objects.
s.attributes = unique
}
@@ -657,7 +740,7 @@ func (s *recordingSpan) Resource() *resource.Resource {
}
func (s *recordingSpan) AddLink(link trace.Link) {
- if !s.IsRecording() {
+ if s == nil {
return
}
if !link.SpanContext.IsValid() && len(link.Attributes) == 0 &&
@@ -665,6 +748,12 @@ func (s *recordingSpan) AddLink(link trace.Link) {
return
}
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.isRecording() {
+ return
+ }
+
l := Link{SpanContext: link.SpanContext, Attributes: link.Attributes}
// Discard attributes over limit.
@@ -678,9 +767,7 @@ func (s *recordingSpan) AddLink(link trace.Link) {
l.Attributes = l.Attributes[:limit]
}
- s.mu.Lock()
s.links.add(l)
- s.mu.Unlock()
}
// DroppedAttributes returns the number of attributes dropped by the span
@@ -755,12 +842,16 @@ func (s *recordingSpan) snapshot() ReadOnlySpan {
}
func (s *recordingSpan) addChild() {
- if !s.IsRecording() {
+ if s == nil {
return
}
+
s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.isRecording() {
+ return
+ }
s.childSpanCount++
- s.mu.Unlock()
}
func (*recordingSpan) private() {}
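The limit exercised by `truncate` above is the attribute value length limit configured on the tracer provider. A short usage sketch:

```go
package example

import sdktrace "go.opentelemetry.io/otel/sdk/trace"

// newProvider caps attribute string values at 128 characters, the limit that
// the truncate helper enforces per string and per string-slice element.
func newProvider() *sdktrace.TracerProvider {
	limits := sdktrace.NewSpanLimits()
	limits.AttributeValueLengthLimit = 128
	return sdktrace.NewTracerProvider(sdktrace.WithSpanLimits(limits))
}
```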
diff --git a/vendor/go.opentelemetry.io/otel/sdk/version.go b/vendor/go.opentelemetry.io/otel/sdk/version.go
index b7cede891..6b4038510 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/version.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/version.go
@@ -5,5 +5,5 @@ package sdk // import "go.opentelemetry.io/otel/sdk"
// Version is the current release version of the OpenTelemetry SDK in use.
func Version() string {
- return "1.29.0"
+ return "1.34.0"
}