summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/meter.go')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/meter.go323
1 files changed, 209 insertions, 114 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
index 7f51ec512..beb7876ec 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
@@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
+
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)
@@ -40,6 +41,11 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines
+ int64Insts *cacheWithErr[instID, *int64Inst]
+ float64Insts *cacheWithErr[instID, *float64Inst]
+ int64ObservableInsts *cacheWithErr[instID, int64Observable]
+ float64ObservableInsts *cacheWithErr[instID, float64Observable]
+
int64Resolver resolver[int64]
float64Resolver resolver[float64]
}
@@ -49,11 +55,20 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, instID]
+ var int64Insts cacheWithErr[instID, *int64Inst]
+ var float64Insts cacheWithErr[instID, *float64Inst]
+ var int64ObservableInsts cacheWithErr[instID, int64Observable]
+ var float64ObservableInsts cacheWithErr[instID, float64Observable]
+
return &meter{
- scope: s,
- pipes: p,
- int64Resolver: newResolver[int64](p, &viewCache),
- float64Resolver: newResolver[float64](p, &viewCache),
+ scope: s,
+ pipes: p,
+ int64Insts: &int64Insts,
+ float64Insts: &float64Insts,
+ int64ObservableInsts: &int64ObservableInsts,
+ float64ObservableInsts: &float64ObservableInsts,
+ int64Resolver: newResolver[int64](p, &viewCache),
+ float64Resolver: newResolver[float64](p, &viewCache),
}
}
@@ -104,20 +119,62 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
return i, validateInstrumentName(name)
}
+// int64ObservableInstrument returns a new observable identified by the Instrument.
+// It registers callbacks for each reader's pipeline.
+func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
+ key := instID{
+ Name: id.Name,
+ Description: id.Description,
+ Unit: id.Unit,
+ Kind: id.Kind,
+ }
+ if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
+ warnRepeatedObservableCallbacks(id)
+ }
+ return m.int64ObservableInsts.Lookup(key, func() (int64Observable, error) {
+ inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
+ for _, insert := range m.int64Resolver.inserters {
+ // Connect the measure functions for instruments in this pipeline with the
+ // callbacks for this pipeline.
+ in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
+ if err != nil {
+ return inst, err
+ }
+ // Drop aggregation
+ if len(in) == 0 {
+ inst.dropAggregation = true
+ continue
+ }
+ inst.appendMeasures(in)
+ for _, cback := range callbacks {
+ inst := int64Observer{measures: in}
+ fn := cback
+ insert.addCallback(func(ctx context.Context) error { return fn(ctx, inst) })
+ }
+ }
+ return inst, validateInstrumentName(id.Name)
+ })
+}
+
// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
+//
+// If Int64ObservableCounter is invoked repeatedly with the same Name,
+// Description, and Unit, only the first set of callbacks provided are used.
+// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
+// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
- const kind = InstrumentKindObservableCounter
- p := int64ObservProvider{m}
- inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
- if err != nil {
- return nil, err
+ id := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindObservableCounter,
+ Scope: m.scope,
}
- p.registerCallbacks(inst, cfg.Callbacks())
- return inst, validateInstrumentName(name)
+ return m.int64ObservableInstrument(id, cfg.Callbacks())
}
// Int64ObservableUpDownCounter returns a new instrument identified by name and
@@ -126,14 +183,14 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
// measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
- const kind = InstrumentKindObservableUpDownCounter
- p := int64ObservProvider{m}
- inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
- if err != nil {
- return nil, err
+ id := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindObservableUpDownCounter,
+ Scope: m.scope,
}
- p.registerCallbacks(inst, cfg.Callbacks())
- return inst, validateInstrumentName(name)
+ return m.int64ObservableInstrument(id, cfg.Callbacks())
}
// Int64ObservableGauge returns a new instrument identified by name and
@@ -142,14 +199,14 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
cfg := metric.NewInt64ObservableGaugeConfig(options...)
- const kind = InstrumentKindObservableGauge
- p := int64ObservProvider{m}
- inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
- if err != nil {
- return nil, err
+ id := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindObservableGauge,
+ Scope: m.scope,
}
- p.registerCallbacks(inst, cfg.Callbacks())
- return inst, validateInstrumentName(name)
+ return m.int64ObservableInstrument(id, cfg.Callbacks())
}
// Float64Counter returns a new instrument identified by name and configured
@@ -196,20 +253,62 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
return i, validateInstrumentName(name)
}
+// float64ObservableInstrument returns a new observable identified by the Instrument.
+// It registers callbacks for each reader's pipeline.
+func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
+ key := instID{
+ Name: id.Name,
+ Description: id.Description,
+ Unit: id.Unit,
+ Kind: id.Kind,
+ }
+ if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
+ warnRepeatedObservableCallbacks(id)
+ }
+ return m.float64ObservableInsts.Lookup(key, func() (float64Observable, error) {
+ inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
+ for _, insert := range m.float64Resolver.inserters {
+ // Connect the measure functions for instruments in this pipeline with the
+ // callbacks for this pipeline.
+ in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
+ if err != nil {
+ return inst, err
+ }
+ // Drop aggregation
+ if len(in) == 0 {
+ inst.dropAggregation = true
+ continue
+ }
+ inst.appendMeasures(in)
+ for _, cback := range callbacks {
+ inst := float64Observer{measures: in}
+ fn := cback
+ insert.addCallback(func(ctx context.Context) error { return fn(ctx, inst) })
+ }
+ }
+ return inst, validateInstrumentName(id.Name)
+ })
+}
+
// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
+//
+// If Float64ObservableCounter is invoked repeatedly with the same Name,
+// Description, and Unit, only the first set of callbacks provided are used.
+// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
+// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
- const kind = InstrumentKindObservableCounter
- p := float64ObservProvider{m}
- inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
- if err != nil {
- return nil, err
+ id := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindObservableCounter,
+ Scope: m.scope,
}
- p.registerCallbacks(inst, cfg.Callbacks())
- return inst, validateInstrumentName(name)
+ return m.float64ObservableInstrument(id, cfg.Callbacks())
}
// Float64ObservableUpDownCounter returns a new instrument identified by name
@@ -218,14 +317,14 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
// measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
- const kind = InstrumentKindObservableUpDownCounter
- p := float64ObservProvider{m}
- inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
- if err != nil {
- return nil, err
+ id := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindObservableUpDownCounter,
+ Scope: m.scope,
}
- p.registerCallbacks(inst, cfg.Callbacks())
- return inst, validateInstrumentName(name)
+ return m.float64ObservableInstrument(id, cfg.Callbacks())
}
// Float64ObservableGauge returns a new instrument identified by name and
@@ -234,14 +333,14 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
// Only the measurements recorded during the collection cycle are exported.
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
- const kind = InstrumentKindObservableGauge
- p := float64ObservProvider{m}
- inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
- if err != nil {
- return nil, err
+ id := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindObservableGauge,
+ Scope: m.scope,
}
- p.registerCallbacks(inst, cfg.Callbacks())
- return inst, validateInstrumentName(name)
+ return m.float64ObservableInstrument(id, cfg.Callbacks())
}
func validateInstrumentName(name string) error {
@@ -273,6 +372,16 @@ func isAlphanumeric(c rune) bool {
return isAlpha(c) || ('0' <= c && c <= '9')
}
+func warnRepeatedObservableCallbacks(id Instrument) {
+ inst := fmt.Sprintf(
+ "Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}",
+ id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit,
+ )
+ global.Warn("Repeated observable instrument creation with callbacks. Ignoring new callbacks. Use meter.RegisterCallback and Registration.Unregister to manage callbacks.",
+ "instrument", inst,
+ )
+}
+
// RegisterCallback registers f to be called each collection cycle so it will
// make observations for insts during those cycles.
//
@@ -389,12 +498,14 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
}
if _, registered := r.float64[oImpl.observablID]; !registered {
- global.Error(errUnregObserver, "failed to record",
- "name", oImpl.name,
- "description", oImpl.description,
- "unit", oImpl.unit,
- "number", fmt.Sprintf("%T", float64(0)),
- )
+ if !oImpl.dropAggregation {
+ global.Error(errUnregObserver, "failed to record",
+ "name", oImpl.name,
+ "description", oImpl.description,
+ "unit", oImpl.unit,
+ "number", fmt.Sprintf("%T", float64(0)),
+ )
+ }
return
}
c := metric.NewObserveConfig(opts)
@@ -422,12 +533,14 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
}
if _, registered := r.int64[oImpl.observablID]; !registered {
- global.Error(errUnregObserver, "failed to record",
- "name", oImpl.name,
- "description", oImpl.description,
- "unit", oImpl.unit,
- "number", fmt.Sprintf("%T", int64(0)),
- )
+ if !oImpl.dropAggregation {
+ global.Error(errUnregObserver, "failed to record",
+ "name", oImpl.name,
+ "description", oImpl.description,
+ "unit", oImpl.unit,
+ "number", fmt.Sprintf("%T", int64(0)),
+ )
+ }
return
}
c := metric.NewObserveConfig(opts)
@@ -474,14 +587,28 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC
// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
- aggs, err := p.aggs(kind, name, desc, u)
- return &int64Inst{measures: aggs}, err
+ return p.meter.int64Insts.Lookup(instID{
+ Name: name,
+ Description: desc,
+ Unit: u,
+ Kind: kind,
+ }, func() (*int64Inst, error) {
+ aggs, err := p.aggs(kind, name, desc, u)
+ return &int64Inst{measures: aggs}, err
+ })
}
// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
- aggs, err := p.histogramAggs(name, cfg)
- return &int64Inst{measures: aggs}, err
+ return p.meter.int64Insts.Lookup(instID{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindHistogram,
+ }, func() (*int64Inst, error) {
+ aggs, err := p.histogramAggs(name, cfg)
+ return &int64Inst{measures: aggs}, err
+ })
}
// float64InstProvider provides float64 OpenTelemetry instruments.
@@ -518,42 +645,33 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog
// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
- aggs, err := p.aggs(kind, name, desc, u)
- return &float64Inst{measures: aggs}, err
+ return p.meter.float64Insts.Lookup(instID{
+ Name: name,
+ Description: desc,
+ Unit: u,
+ Kind: kind,
+ }, func() (*float64Inst, error) {
+ aggs, err := p.aggs(kind, name, desc, u)
+ return &float64Inst{measures: aggs}, err
+ })
}
// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
- aggs, err := p.histogramAggs(name, cfg)
- return &float64Inst{measures: aggs}, err
-}
-
-type int64ObservProvider struct{ *meter }
-
-func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
- aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
- return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
-}
-
-func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
- if inst.observable == nil || len(inst.measures) == 0 {
- // Drop aggregator.
- return
- }
-
- for _, cBack := range cBacks {
- p.pipes.registerCallback(p.callback(inst, cBack))
- }
-}
-
-func (p int64ObservProvider) callback(i int64Observable, f metric.Int64Callback) func(context.Context) error {
- inst := int64Observer{int64Observable: i}
- return func(ctx context.Context) error { return f(ctx, inst) }
+ return p.meter.float64Insts.Lookup(instID{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindHistogram,
+ }, func() (*float64Inst, error) {
+ aggs, err := p.histogramAggs(name, cfg)
+ return &float64Inst{measures: aggs}, err
+ })
}
type int64Observer struct {
embedded.Int64Observer
- int64Observable
+ measures[int64]
}
func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
@@ -561,32 +679,9 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
o.observe(val, c.Attributes())
}
-type float64ObservProvider struct{ *meter }
-
-func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
- aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
- return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
-}
-
-func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
- if inst.observable == nil || len(inst.measures) == 0 {
- // Drop aggregator.
- return
- }
-
- for _, cBack := range cBacks {
- p.pipes.registerCallback(p.callback(inst, cBack))
- }
-}
-
-func (p float64ObservProvider) callback(i float64Observable, f metric.Float64Callback) func(context.Context) error {
- inst := float64Observer{float64Observable: i}
- return func(ctx context.Context) error { return f(ctx, inst) }
-}
-
type float64Observer struct {
embedded.Float64Observer
- float64Observable
+ measures[float64]
}
func (o float64Observer) Observe(val float64, opts ...metric.ObserveOption) {