diff options
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go')
-rw-r--r-- | vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go | 369 |
1 files changed, 0 insertions, 369 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go deleted file mode 100644 index dcd2182d9..000000000 --- a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go +++ /dev/null @@ -1,369 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package metric // import "go.opentelemetry.io/otel/sdk/metric" - -import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/internal/global" - "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -// Default periodic reader timing. -const ( - defaultTimeout = time.Millisecond * 30000 - defaultInterval = time.Millisecond * 60000 -) - -// periodicReaderConfig contains configuration options for a PeriodicReader. -type periodicReaderConfig struct { - interval time.Duration - timeout time.Duration - producers []Producer -} - -// newPeriodicReaderConfig returns a periodicReaderConfig configured with -// options. -func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { - c := periodicReaderConfig{ - interval: envDuration(envInterval, defaultInterval), - timeout: envDuration(envTimeout, defaultTimeout), - } - for _, o := range options { - c = o.applyPeriodic(c) - } - return c -} - -// PeriodicReaderOption applies a configuration option value to a PeriodicReader. -type PeriodicReaderOption interface { - applyPeriodic(periodicReaderConfig) periodicReaderConfig -} - -// periodicReaderOptionFunc applies a set of options to a periodicReaderConfig. -type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig - -// applyPeriodic returns a periodicReaderConfig with option(s) applied. -func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig { - return o(conf) -} - -// WithTimeout configures the time a PeriodicReader waits for an export to -// complete before canceling it. This includes an export which occurs as part -// of Shutdown or ForceFlush if the user passed context does not have a -// deadline. If the user passed context does have a deadline, it will be used -// instead. -// -// This option overrides any value set for the -// OTEL_METRIC_EXPORT_TIMEOUT environment variable. -// -// If this option is not used or d is less than or equal to zero, 30 seconds -// is used as the default. -func WithTimeout(d time.Duration) PeriodicReaderOption { - return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig { - if d <= 0 { - return conf - } - conf.timeout = d - return conf - }) -} - -// WithInterval configures the intervening time between exports for a -// PeriodicReader. -// -// This option overrides any value set for the -// OTEL_METRIC_EXPORT_INTERVAL environment variable. -// -// If this option is not used or d is less than or equal to zero, 60 seconds -// is used as the default. -func WithInterval(d time.Duration) PeriodicReaderOption { - return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig { - if d <= 0 { - return conf - } - conf.interval = d - return conf - }) -} - -// NewPeriodicReader returns a Reader that collects and exports metric data to -// the exporter at a defined interval. By default, the returned Reader will -// collect and export data every 60 seconds, and will cancel any attempts that -// exceed 30 seconds, collect and export combined. The collect and export time -// are not counted towards the interval between attempts. -// -// The Collect method of the returned Reader continues to gather and return -// metric data to the user. It will not automatically send that data to the -// exporter. That is left to the user to accomplish. -func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader { - conf := newPeriodicReaderConfig(options) - ctx, cancel := context.WithCancel(context.Background()) - r := &PeriodicReader{ - interval: conf.interval, - timeout: conf.timeout, - exporter: exporter, - flushCh: make(chan chan error), - cancel: cancel, - done: make(chan struct{}), - rmPool: sync.Pool{ - New: func() interface{} { - return &metricdata.ResourceMetrics{} - }, - }, - } - r.externalProducers.Store(conf.producers) - - go func() { - defer func() { close(r.done) }() - r.run(ctx, conf.interval) - }() - - return r -} - -// PeriodicReader is a Reader that continuously collects and exports metric -// data at a set interval. -type PeriodicReader struct { - sdkProducer atomic.Value - - mu sync.Mutex - isShutdown bool - externalProducers atomic.Value - - interval time.Duration - timeout time.Duration - exporter Exporter - flushCh chan chan error - - done chan struct{} - cancel context.CancelFunc - shutdownOnce sync.Once - - rmPool sync.Pool -} - -// Compile time check the periodicReader implements Reader and is comparable. -var _ = map[Reader]struct{}{&PeriodicReader{}: {}} - -// newTicker allows testing override. -var newTicker = time.NewTicker - -// run continuously collects and exports metric data at the specified -// interval. This will run until ctx is canceled or times out. -func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) { - ticker := newTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := r.collectAndExport(ctx) - if err != nil { - otel.Handle(err) - } - case errCh := <-r.flushCh: - errCh <- r.collectAndExport(ctx) - ticker.Reset(interval) - case <-ctx.Done(): - return - } - } -} - -// register registers p as the producer of this reader. -func (r *PeriodicReader) register(p sdkProducer) { - // Only register once. If producer is already set, do nothing. - if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) { - msg := "did not register periodic reader" - global.Error(errDuplicateRegister, msg) - } -} - -// temporality reports the Temporality for the instrument kind provided. -func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality { - return r.exporter.Temporality(kind) -} - -// aggregation returns what Aggregation to use for kind. -func (r *PeriodicReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type. - return r.exporter.Aggregation(kind) -} - -// collectAndExport gather all metric data related to the periodicReader r from -// the SDK and exports it with r's exporter. -func (r *PeriodicReader) collectAndExport(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, r.timeout) - defer cancel() - - // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. - rm := r.rmPool.Get().(*metricdata.ResourceMetrics) - err := r.Collect(ctx, rm) - if err == nil { - err = r.export(ctx, rm) - } - r.rmPool.Put(rm) - return err -} - -// Collect gathers all metric data related to the Reader from -// the SDK and other Producers and stores the result in rm. The metric -// data is not exported to the configured exporter, it is left to the caller to -// handle that if desired. -// -// Collect will return an error if called after shutdown. -// Collect will return an error if rm is a nil ResourceMetrics. -// Collect will return an error if the context's Done channel is closed. -// -// This method is safe to call concurrently. -func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error { - if rm == nil { - return errors.New("periodic reader: *metricdata.ResourceMetrics is nil") - } - // TODO (#3047): When collect is updated to accept output as param, pass rm. - return r.collect(ctx, r.sdkProducer.Load(), rm) -} - -// collect unwraps p as a produceHolder and returns its produce results. -func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error { - if p == nil { - return ErrReaderNotRegistered - } - - ph, ok := p.(produceHolder) - if !ok { - // The atomic.Value is entirely in the periodicReader's control so - // this should never happen. In the unforeseen case that this does - // happen, return an error instead of panicking so a users code does - // not halt in the processes. - err := fmt.Errorf("periodic reader: invalid producer: %T", p) - return err - } - - err := ph.produce(ctx, rm) - if err != nil { - return err - } - for _, producer := range r.externalProducers.Load().([]Producer) { - externalMetrics, e := producer.Produce(ctx) - if e != nil { - err = errors.Join(err, e) - } - rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) - } - - global.Debug("PeriodicReader collection", "Data", rm) - - return err -} - -// export exports metric data m using r's exporter. -func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error { - return r.exporter.Export(ctx, m) -} - -// ForceFlush flushes pending telemetry. -// -// This method is safe to call concurrently. -func (r *PeriodicReader) ForceFlush(ctx context.Context) error { - // Prioritize the ctx timeout if it is set. - if _, ok := ctx.Deadline(); !ok { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.timeout) - defer cancel() - } - - errCh := make(chan error, 1) - select { - case r.flushCh <- errCh: - select { - case err := <-errCh: - if err != nil { - return err - } - close(errCh) - case <-ctx.Done(): - return ctx.Err() - } - case <-r.done: - return ErrReaderShutdown - case <-ctx.Done(): - return ctx.Err() - } - return r.exporter.ForceFlush(ctx) -} - -// Shutdown flushes pending telemetry and then stops the export pipeline. -// -// This method is safe to call concurrently. -func (r *PeriodicReader) Shutdown(ctx context.Context) error { - err := ErrReaderShutdown - r.shutdownOnce.Do(func() { - // Prioritize the ctx timeout if it is set. - if _, ok := ctx.Deadline(); !ok { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.timeout) - defer cancel() - } - - // Stop the run loop. - r.cancel() - <-r.done - - // Any future call to Collect will now return ErrReaderShutdown. - ph := r.sdkProducer.Swap(produceHolder{ - produce: shutdownProducer{}.produce, - }) - - if ph != nil { // Reader was registered. - // Flush pending telemetry. - m := r.rmPool.Get().(*metricdata.ResourceMetrics) - err = r.collect(ctx, ph, m) - if err == nil { - err = r.export(ctx, m) - } - r.rmPool.Put(m) - } - - sErr := r.exporter.Shutdown(ctx) - if err == nil || errors.Is(err, ErrReaderShutdown) { - err = sErr - } - - r.mu.Lock() - defer r.mu.Unlock() - r.isShutdown = true - // release references to Producer(s) - r.externalProducers.Store([]Producer{}) - }) - return err -} - -// MarshalLog returns logging data about the PeriodicReader. -func (r *PeriodicReader) MarshalLog() interface{} { - r.mu.Lock() - down := r.isShutdown - r.mu.Unlock() - return struct { - Type string - Exporter Exporter - Registered bool - Shutdown bool - Interval time.Duration - Timeout time.Duration - }{ - Type: "PeriodicReader", - Exporter: r.exporter, - Registered: r.sdkProducer.Load() != nil, - Shutdown: down, - Interval: r.interval, - Timeout: r.timeout, - } -} |