summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/meter.go')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/meter.go123
1 files changed, 65 insertions, 58 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
index 2309e5b2b..a6ccd117b 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
@@ -150,6 +150,11 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
continue
}
inst.appendMeasures(in)
+
+ // Add the measures to the pipeline. It is required to maintain
+ // measures per pipeline to avoid calling the measure that
+ // is not part of the pipeline.
+ insert.pipeline.addInt64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
fn := cback
@@ -309,6 +314,11 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
continue
}
inst.appendMeasures(in)
+
+ // Add the measures to the pipeline. It is required to maintain
+ // measures per pipeline to avoid calling the measure that
+ // is not part of the pipeline.
+ insert.pipeline.addFloat64Measure(inst.observableID, in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
fn := cback
@@ -441,73 +451,80 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
return noopRegister{}, nil
}
- reg := newObserver()
- var errs multierror
+ var err error
+ validInstruments := make([]metric.Observable, 0, len(insts))
for _, inst := range insts {
- // Unwrap any global.
- if u, ok := inst.(interface {
- Unwrap() metric.Observable
- }); ok {
- inst = u.Unwrap()
- }
-
switch o := inst.(type) {
case int64Observable:
- if err := o.registerable(m); err != nil {
- if !errors.Is(err, errEmptyAgg) {
- errs.append(err)
+ if e := o.registerable(m); e != nil {
+ if !errors.Is(e, errEmptyAgg) {
+ err = errors.Join(err, e)
}
continue
}
- reg.registerInt64(o.observablID)
+
+ validInstruments = append(validInstruments, inst)
case float64Observable:
- if err := o.registerable(m); err != nil {
- if !errors.Is(err, errEmptyAgg) {
- errs.append(err)
+ if e := o.registerable(m); e != nil {
+ if !errors.Is(e, errEmptyAgg) {
+ err = errors.Join(err, e)
}
continue
}
- reg.registerFloat64(o.observablID)
+
+ validInstruments = append(validInstruments, inst)
default:
// Instrument external to the SDK.
- return nil, fmt.Errorf("invalid observable: from different implementation")
+ return nil, errors.New("invalid observable: from different implementation")
}
}
- err := errs.errorOrNil()
- if reg.len() == 0 {
+ if len(validInstruments) == 0 {
// All insts use drop aggregation or are invalid.
return noopRegister{}, err
}
- // Some or all instruments were valid.
- cback := func(ctx context.Context) error { return f(ctx, reg) }
- return m.pipes.registerMultiCallback(cback), err
+ unregs := make([]func(), len(m.pipes))
+ for ix, pipe := range m.pipes {
+ reg := newObserver(pipe)
+ for _, inst := range validInstruments {
+ switch o := inst.(type) {
+ case int64Observable:
+ reg.registerInt64(o.observableID)
+ case float64Observable:
+ reg.registerFloat64(o.observableID)
+ }
+ }
+
+ // Some or all instruments were valid.
+ cBack := func(ctx context.Context) error { return f(ctx, reg) }
+ unregs[ix] = pipe.addMultiCallback(cBack)
+ }
+
+ return unregisterFuncs{f: unregs}, err
}
type observer struct {
embedded.Observer
- float64 map[observablID[float64]]struct{}
- int64 map[observablID[int64]]struct{}
+ pipe *pipeline
+ float64 map[observableID[float64]]struct{}
+ int64 map[observableID[int64]]struct{}
}
-func newObserver() observer {
+func newObserver(p *pipeline) observer {
return observer{
- float64: make(map[observablID[float64]]struct{}),
- int64: make(map[observablID[int64]]struct{}),
+ pipe: p,
+ float64: make(map[observableID[float64]]struct{}),
+ int64: make(map[observableID[int64]]struct{}),
}
}
-func (r observer) len() int {
- return len(r.float64) + len(r.int64)
-}
-
-func (r observer) registerFloat64(id observablID[float64]) {
+func (r observer) registerFloat64(id observableID[float64]) {
r.float64[id] = struct{}{}
}
-func (r observer) registerInt64(id observablID[int64]) {
+func (r observer) registerInt64(id observableID[int64]) {
r.int64[id] = struct{}{}
}
@@ -521,22 +538,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
switch conv := o.(type) {
case float64Observable:
oImpl = conv
- case interface {
- Unwrap() metric.Observable
- }:
- // Unwrap any global.
- async := conv.Unwrap()
- var ok bool
- if oImpl, ok = async.(float64Observable); !ok {
- global.Error(errUnknownObserver, "failed to record asynchronous")
- return
- }
default:
global.Error(errUnknownObserver, "failed to record")
return
}
- if _, registered := r.float64[oImpl.observablID]; !registered {
+ if _, registered := r.float64[oImpl.observableID]; !registered {
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
@@ -548,7 +555,12 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
return
}
c := metric.NewObserveConfig(opts)
- oImpl.observe(v, c.Attributes())
+ // Access to r.pipe.float64Measure is already guarded by a lock in pipeline.produce.
+ // TODO (#5946): Refactor pipeline and observable measures.
+ measures := r.pipe.float64Measures[oImpl.observableID]
+ for _, m := range measures {
+ m(context.Background(), v, c.Attributes())
+ }
}
func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) {
@@ -556,22 +568,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
switch conv := o.(type) {
case int64Observable:
oImpl = conv
- case interface {
- Unwrap() metric.Observable
- }:
- // Unwrap any global.
- async := conv.Unwrap()
- var ok bool
- if oImpl, ok = async.(int64Observable); !ok {
- global.Error(errUnknownObserver, "failed to record asynchronous")
- return
- }
default:
global.Error(errUnknownObserver, "failed to record")
return
}
- if _, registered := r.int64[oImpl.observablID]; !registered {
+ if _, registered := r.int64[oImpl.observableID]; !registered {
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
@@ -583,7 +585,12 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
return
}
c := metric.NewObserveConfig(opts)
- oImpl.observe(v, c.Attributes())
+ // Access to r.pipe.int64Measures is already guarded b a lock in pipeline.produce.
+ // TODO (#5946): Refactor pipeline and observable measures.
+ measures := r.pipe.int64Measures[oImpl.observableID]
+ for _, m := range measures {
+ m(context.Background(), v, c.Attributes())
+ }
}
type noopRegister struct{ embedded.Registration }