summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go381
1 files changed, 381 insertions, 0 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
new file mode 100644
index 000000000..ff86999c7
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
@@ -0,0 +1,381 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/internal/global"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// Default periodic reader timing.
+const (
+ defaultTimeout = time.Millisecond * 30000
+ defaultInterval = time.Millisecond * 60000
+)
+
+// periodicReaderConfig contains configuration options for a PeriodicReader.
+type periodicReaderConfig struct {
+ interval time.Duration
+ timeout time.Duration
+ producers []Producer
+}
+
+// newPeriodicReaderConfig returns a periodicReaderConfig configured with
+// options.
+func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
+ c := periodicReaderConfig{
+ interval: envDuration(envInterval, defaultInterval),
+ timeout: envDuration(envTimeout, defaultTimeout),
+ }
+ for _, o := range options {
+ c = o.applyPeriodic(c)
+ }
+ return c
+}
+
+// PeriodicReaderOption applies a configuration option value to a PeriodicReader.
+type PeriodicReaderOption interface {
+ applyPeriodic(periodicReaderConfig) periodicReaderConfig
+}
+
+// periodicReaderOptionFunc applies a set of options to a periodicReaderConfig.
+type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig
+
+// applyPeriodic returns a periodicReaderConfig with option(s) applied.
+func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig {
+ return o(conf)
+}
+
+// WithTimeout configures the time a PeriodicReader waits for an export to
+// complete before canceling it. This includes an export which occurs as part
+// of Shutdown or ForceFlush if the user passed context does not have a
+// deadline. If the user passed context does have a deadline, it will be used
+// instead.
+//
+// This option overrides any value set for the
+// OTEL_METRIC_EXPORT_TIMEOUT environment variable.
+//
+// If this option is not used or d is less than or equal to zero, 30 seconds
+// is used as the default.
+func WithTimeout(d time.Duration) PeriodicReaderOption {
+ return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
+ if d <= 0 {
+ return conf
+ }
+ conf.timeout = d
+ return conf
+ })
+}
+
+// WithInterval configures the intervening time between exports for a
+// PeriodicReader.
+//
+// This option overrides any value set for the
+// OTEL_METRIC_EXPORT_INTERVAL environment variable.
+//
+// If this option is not used or d is less than or equal to zero, 60 seconds
+// is used as the default.
+func WithInterval(d time.Duration) PeriodicReaderOption {
+ return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
+ if d <= 0 {
+ return conf
+ }
+ conf.interval = d
+ return conf
+ })
+}
+
+// NewPeriodicReader returns a Reader that collects and exports metric data to
+// the exporter at a defined interval. By default, the returned Reader will
+// collect and export data every 60 seconds, and will cancel any attempts that
+// exceed 30 seconds, collect and export combined. The collect and export time
+// are not counted towards the interval between attempts.
+//
+// The Collect method of the returned Reader continues to gather and return
+// metric data to the user. It will not automatically send that data to the
+// exporter. That is left to the user to accomplish.
+func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {
+ conf := newPeriodicReaderConfig(options)
+ ctx, cancel := context.WithCancel(context.Background())
+ r := &PeriodicReader{
+ interval: conf.interval,
+ timeout: conf.timeout,
+ exporter: exporter,
+ flushCh: make(chan chan error),
+ cancel: cancel,
+ done: make(chan struct{}),
+ rmPool: sync.Pool{
+ New: func() interface{} {
+ return &metricdata.ResourceMetrics{}
+ },
+ },
+ }
+ r.externalProducers.Store(conf.producers)
+
+ go func() {
+ defer func() { close(r.done) }()
+ r.run(ctx, conf.interval)
+ }()
+
+ return r
+}
+
+// PeriodicReader is a Reader that continuously collects and exports metric
+// data at a set interval.
+type PeriodicReader struct {
+ sdkProducer atomic.Value
+
+ mu sync.Mutex
+ isShutdown bool
+ externalProducers atomic.Value
+
+ interval time.Duration
+ timeout time.Duration
+ exporter Exporter
+ flushCh chan chan error
+
+ done chan struct{}
+ cancel context.CancelFunc
+ shutdownOnce sync.Once
+
+ rmPool sync.Pool
+}
+
+// Compile time check the periodicReader implements Reader and is comparable.
+var _ = map[Reader]struct{}{&PeriodicReader{}: {}}
+
+// newTicker allows testing override.
+var newTicker = time.NewTicker
+
+// run continuously collects and exports metric data at the specified
+// interval. This will run until ctx is canceled or times out.
+func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) {
+ ticker := newTicker(interval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ err := r.collectAndExport(ctx)
+ if err != nil {
+ otel.Handle(err)
+ }
+ case errCh := <-r.flushCh:
+ errCh <- r.collectAndExport(ctx)
+ ticker.Reset(interval)
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// register registers p as the producer of this reader.
+func (r *PeriodicReader) register(p sdkProducer) {
+ // Only register once. If producer is already set, do nothing.
+ if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
+ msg := "did not register periodic reader"
+ global.Error(errDuplicateRegister, msg)
+ }
+}
+
+// temporality reports the Temporality for the instrument kind provided.
+func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
+ return r.exporter.Temporality(kind)
+}
+
+// aggregation returns what Aggregation to use for kind.
+func (r *PeriodicReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type.
+ return r.exporter.Aggregation(kind)
+}
+
+// collectAndExport gather all metric data related to the periodicReader r from
+// the SDK and exports it with r's exporter.
+func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
+ ctx, cancel := context.WithTimeout(ctx, r.timeout)
+ defer cancel()
+
+ // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
+ rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
+ err := r.Collect(ctx, rm)
+ if err == nil {
+ err = r.export(ctx, rm)
+ }
+ r.rmPool.Put(rm)
+ return err
+}
+
+// Collect gathers all metric data related to the Reader from
+// the SDK and other Producers and stores the result in rm. The metric
+// data is not exported to the configured exporter, it is left to the caller to
+// handle that if desired.
+//
+// Collect will return an error if called after shutdown.
+// Collect will return an error if rm is a nil ResourceMetrics.
+// Collect will return an error if the context's Done channel is closed.
+//
+// This method is safe to call concurrently.
+func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
+ if rm == nil {
+ return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
+ }
+ // TODO (#3047): When collect is updated to accept output as param, pass rm.
+ return r.collect(ctx, r.sdkProducer.Load(), rm)
+}
+
+// collect unwraps p as a produceHolder and returns its produce results.
+func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
+ if p == nil {
+ return ErrReaderNotRegistered
+ }
+
+ ph, ok := p.(produceHolder)
+ if !ok {
+ // The atomic.Value is entirely in the periodicReader's control so
+ // this should never happen. In the unforeseen case that this does
+ // happen, return an error instead of panicking so a users code does
+ // not halt in the processes.
+ err := fmt.Errorf("periodic reader: invalid producer: %T", p)
+ return err
+ }
+
+ err := ph.produce(ctx, rm)
+ if err != nil {
+ return err
+ }
+ var errs []error
+ for _, producer := range r.externalProducers.Load().([]Producer) {
+ externalMetrics, err := producer.Produce(ctx)
+ if err != nil {
+ errs = append(errs, err)
+ }
+ rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
+ }
+
+ global.Debug("PeriodicReader collection", "Data", rm)
+
+ return unifyErrors(errs)
+}
+
+// export exports metric data m using r's exporter.
+func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
+ return r.exporter.Export(ctx, m)
+}
+
+// ForceFlush flushes pending telemetry.
+//
+// This method is safe to call concurrently.
+func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
+ // Prioritize the ctx timeout if it is set.
+ if _, ok := ctx.Deadline(); !ok {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, r.timeout)
+ defer cancel()
+ }
+
+ errCh := make(chan error, 1)
+ select {
+ case r.flushCh <- errCh:
+ select {
+ case err := <-errCh:
+ if err != nil {
+ return err
+ }
+ close(errCh)
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ case <-r.done:
+ return ErrReaderShutdown
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ return r.exporter.ForceFlush(ctx)
+}
+
+// Shutdown flushes pending telemetry and then stops the export pipeline.
+//
+// This method is safe to call concurrently.
+func (r *PeriodicReader) Shutdown(ctx context.Context) error {
+ err := ErrReaderShutdown
+ r.shutdownOnce.Do(func() {
+ // Prioritize the ctx timeout if it is set.
+ if _, ok := ctx.Deadline(); !ok {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, r.timeout)
+ defer cancel()
+ }
+
+ // Stop the run loop.
+ r.cancel()
+ <-r.done
+
+ // Any future call to Collect will now return ErrReaderShutdown.
+ ph := r.sdkProducer.Swap(produceHolder{
+ produce: shutdownProducer{}.produce,
+ })
+
+ if ph != nil { // Reader was registered.
+ // Flush pending telemetry.
+ m := r.rmPool.Get().(*metricdata.ResourceMetrics)
+ err = r.collect(ctx, ph, m)
+ if err == nil {
+ err = r.export(ctx, m)
+ }
+ r.rmPool.Put(m)
+ }
+
+ sErr := r.exporter.Shutdown(ctx)
+ if err == nil || err == ErrReaderShutdown {
+ err = sErr
+ }
+
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.isShutdown = true
+ // release references to Producer(s)
+ r.externalProducers.Store([]Producer{})
+ })
+ return err
+}
+
+// MarshalLog returns logging data about the PeriodicReader.
+func (r *PeriodicReader) MarshalLog() interface{} {
+ r.mu.Lock()
+ down := r.isShutdown
+ r.mu.Unlock()
+ return struct {
+ Type string
+ Exporter Exporter
+ Registered bool
+ Shutdown bool
+ Interval time.Duration
+ Timeout time.Duration
+ }{
+ Type: "PeriodicReader",
+ Exporter: r.exporter,
+ Registered: r.sdkProducer.Load() != nil,
+ Shutdown: down,
+ Interval: r.interval,
+ Timeout: r.timeout,
+ }
+}