summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/metric
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/metric')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/LICENSE201
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/aggregation.go201
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/cache.go54
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/config.go148
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/doc.go47
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/env.go50
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go88
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go348
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/instrumentkind_string.go29
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go133
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go18
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go432
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go231
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go68
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go222
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go24
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go214
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/meter.go595
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/data.go293
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality.go41
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality_string.go25
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go381
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go656
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/provider.go154
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/reader.go200
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/version.go20
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/metric/view.go128
27 files changed, 5001 insertions, 0 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/LICENSE b/vendor/go.opentelemetry.io/otel/sdk/metric/LICENSE
new file mode 100644
index 000000000..261eeb9e9
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/aggregation.go b/vendor/go.opentelemetry.io/otel/sdk/metric/aggregation.go
new file mode 100644
index 000000000..faddbb0b6
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/aggregation.go
@@ -0,0 +1,201 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "errors"
+ "fmt"
+)
+
+// errAgg is wrapped by misconfigured aggregations.
+var errAgg = errors.New("aggregation")
+
+// Aggregation is the aggregation used to summarize recorded measurements.
+type Aggregation interface {
+ // copy returns a deep copy of the Aggregation.
+ copy() Aggregation
+
+ // err returns an error for any misconfigured Aggregation.
+ err() error
+}
+
+// AggregationDrop is an Aggregation that drops all recorded data.
+type AggregationDrop struct{} // AggregationDrop has no parameters.
+
+var _ Aggregation = AggregationDrop{}
+
+// copy returns a deep copy of d.
+func (d AggregationDrop) copy() Aggregation { return d }
+
+// err returns an error for any misconfiguration. A drop aggregation has no
+// parameters and cannot be misconfigured, therefore this always returns nil.
+func (AggregationDrop) err() error { return nil }
+
+// AggregationDefault is an Aggregation that uses the default instrument kind selection
+// mapping to select another Aggregation. A metric reader can be configured to
+// make an aggregation selection based on instrument kind that differs from
+// the default. This Aggregation ensures the default is used.
+//
+// See the [DefaultAggregationSelector] for information about the default
+// instrument kind selection mapping.
+type AggregationDefault struct{} // AggregationDefault has no parameters.
+
+var _ Aggregation = AggregationDefault{}
+
+// copy returns a deep copy of d.
+func (d AggregationDefault) copy() Aggregation { return d }
+
+// err returns an error for any misconfiguration. A default aggregation has no
+// parameters and cannot be misconfigured, therefore this always returns nil.
+func (AggregationDefault) err() error { return nil }
+
+// AggregationSum is an Aggregation that summarizes a set of measurements as their
+// arithmetic sum.
+type AggregationSum struct{} // AggregationSum has no parameters.
+
+var _ Aggregation = AggregationSum{}
+
+// copy returns a deep copy of s.
+func (s AggregationSum) copy() Aggregation { return s }
+
+// err returns an error for any misconfiguration. A sum aggregation has no
+// parameters and cannot be misconfigured, therefore this always returns nil.
+func (AggregationSum) err() error { return nil }
+
+// AggregationLastValue is an Aggregation that summarizes a set of measurements as the
+// last one made.
+type AggregationLastValue struct{} // AggregationLastValue has no parameters.
+
+var _ Aggregation = AggregationLastValue{}
+
+// copy returns a deep copy of l.
+func (l AggregationLastValue) copy() Aggregation { return l }
+
+// err returns an error for any misconfiguration. A last-value aggregation has
+// no parameters and cannot be misconfigured, therefore this always returns
+// nil.
+func (AggregationLastValue) err() error { return nil }
+
+// AggregationExplicitBucketHistogram is an Aggregation that summarizes a set of
+// measurements as an histogram with explicitly defined buckets.
+type AggregationExplicitBucketHistogram struct {
+ // Boundaries are the increasing bucket boundary values. Boundary values
+ // define bucket upper bounds. Buckets are exclusive of their lower
+ // boundary and inclusive of their upper bound (except at positive
+ // infinity). A measurement is defined to fall into the greatest-numbered
+ // bucket with a boundary that is greater than or equal to the
+ // measurement. As an example, boundaries defined as:
+ //
+ // []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}
+ //
+ // Will define these buckets:
+ //
+ // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0],
+ // (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
+ // (500.0, 1000.0], (1000.0, +∞)
+ Boundaries []float64
+ // NoMinMax indicates whether to not record the min and max of the
+ // distribution. By default, these extrema are recorded.
+ //
+ // Recording these extrema for cumulative data is expected to have little
+ // value, they will represent the entire life of the instrument instead of
+ // just the current collection cycle. It is recommended to set this to true
+ // for that type of data to avoid computing the low-value extrema.
+ NoMinMax bool
+}
+
+var _ Aggregation = AggregationExplicitBucketHistogram{}
+
+// errHist is returned by misconfigured ExplicitBucketHistograms.
+var errHist = fmt.Errorf("%w: explicit bucket histogram", errAgg)
+
+// err returns an error for any misconfiguration.
+func (h AggregationExplicitBucketHistogram) err() error {
+ if len(h.Boundaries) <= 1 {
+ return nil
+ }
+
+ // Check boundaries are monotonic.
+ i := h.Boundaries[0]
+ for _, j := range h.Boundaries[1:] {
+ if i >= j {
+ return fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, h.Boundaries)
+ }
+ i = j
+ }
+
+ return nil
+}
+
+// copy returns a deep copy of h.
+func (h AggregationExplicitBucketHistogram) copy() Aggregation {
+ b := make([]float64, len(h.Boundaries))
+ copy(b, h.Boundaries)
+ return AggregationExplicitBucketHistogram{
+ Boundaries: b,
+ NoMinMax: h.NoMinMax,
+ }
+}
+
+// AggregationBase2ExponentialHistogram is an Aggregation that summarizes a set of
+// measurements as an histogram with bucket widths that grow exponentially.
+type AggregationBase2ExponentialHistogram struct {
+ // MaxSize is the maximum number of buckets to use for the histogram.
+ MaxSize int32
+ // MaxScale is the maximum resolution scale to use for the histogram.
+ //
+ // MaxScale has a maximum value of 20. Using a value of 20 means the
+ // maximum number of buckets that can fit within the range of a
+ // signed 32-bit integer index could be used.
+ //
+ // MaxScale has a minimum value of -10. Using a value of -10 means only
+ // two buckets will be used.
+ MaxScale int32
+
+ // NoMinMax indicates whether to not record the min and max of the
+ // distribution. By default, these extrema are recorded.
+ //
+ // Recording these extrema for cumulative data is expected to have little
+ // value, they will represent the entire life of the instrument instead of
+ // just the current collection cycle. It is recommended to set this to true
+ // for that type of data to avoid computing the low-value extrema.
+ NoMinMax bool
+}
+
+var _ Aggregation = AggregationBase2ExponentialHistogram{}
+
+// copy returns a deep copy of the Aggregation.
+func (e AggregationBase2ExponentialHistogram) copy() Aggregation {
+ return e
+}
+
+const (
+ expoMaxScale = 20
+ expoMinScale = -10
+)
+
+// errExpoHist is returned by misconfigured Base2ExponentialBucketHistograms.
+var errExpoHist = fmt.Errorf("%w: exponential histogram", errAgg)
+
+// err returns an error for any misconfigured Aggregation.
+func (e AggregationBase2ExponentialHistogram) err() error {
+ if e.MaxScale > expoMaxScale {
+ return fmt.Errorf("%w: max size %d is greater than maximum scale %d", errExpoHist, e.MaxSize, expoMaxScale)
+ }
+ if e.MaxSize <= 0 {
+ return fmt.Errorf("%w: max size %d is less than or equal to zero", errExpoHist, e.MaxSize)
+ }
+ return nil
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/cache.go b/vendor/go.opentelemetry.io/otel/sdk/metric/cache.go
new file mode 100644
index 000000000..de9d6f001
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/cache.go
@@ -0,0 +1,54 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "sync"
+)
+
+// cache is a locking storage used to quickly return already computed values.
+//
+// The zero value of a cache is empty and ready to use.
+//
+// A cache must not be copied after first use.
+//
+// All methods of a cache are safe to call concurrently.
+type cache[K comparable, V any] struct {
+ sync.Mutex
+ data map[K]V
+}
+
+// Lookup returns the value stored in the cache with the associated key if it
+// exists. Otherwise, f is called and its returned value is set in the cache
+// for key and returned.
+//
+// Lookup is safe to call concurrently. It will hold the cache lock, so f
+// should not block excessively.
+func (c *cache[K, V]) Lookup(key K, f func() V) V {
+ c.Lock()
+ defer c.Unlock()
+
+ if c.data == nil {
+ val := f()
+ c.data = map[K]V{key: val}
+ return val
+ }
+ if v, ok := c.data[key]; ok {
+ return v
+ }
+ val := f()
+ c.data[key] = val
+ return val
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/config.go b/vendor/go.opentelemetry.io/otel/sdk/metric/config.go
new file mode 100644
index 000000000..0b1911284
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/config.go
@@ -0,0 +1,148 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "go.opentelemetry.io/otel/sdk/resource"
+)
+
+// config contains configuration options for a MeterProvider.
+type config struct {
+ res *resource.Resource
+ readers []Reader
+ views []View
+}
+
+// readerSignals returns a force-flush and shutdown function for a
+// MeterProvider to call in their respective options. All Readers c contains
+// will have their force-flush and shutdown methods unified into returned
+// single functions.
+func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
+ var fFuncs, sFuncs []func(context.Context) error
+ for _, r := range c.readers {
+ sFuncs = append(sFuncs, r.Shutdown)
+ if f, ok := r.(interface{ ForceFlush(context.Context) error }); ok {
+ fFuncs = append(fFuncs, f.ForceFlush)
+ }
+ }
+
+ return unify(fFuncs), unifyShutdown(sFuncs)
+}
+
+// unify unifies calling all of funcs into a single function call. All errors
+// returned from calls to funcs will be unify into a single error return
+// value.
+func unify(funcs []func(context.Context) error) func(context.Context) error {
+ return func(ctx context.Context) error {
+ var errs []error
+ for _, f := range funcs {
+ if err := f(ctx); err != nil {
+ errs = append(errs, err)
+ }
+ }
+ return unifyErrors(errs)
+ }
+}
+
+// unifyErrors combines multiple errors into a single error.
+func unifyErrors(errs []error) error {
+ switch len(errs) {
+ case 0:
+ return nil
+ case 1:
+ return errs[0]
+ default:
+ return fmt.Errorf("%v", errs)
+ }
+}
+
+// unifyShutdown unifies calling all of funcs once for a shutdown. If called
+// more than once, an ErrReaderShutdown error is returned.
+func unifyShutdown(funcs []func(context.Context) error) func(context.Context) error {
+ f := unify(funcs)
+ var once sync.Once
+ return func(ctx context.Context) error {
+ err := ErrReaderShutdown
+ once.Do(func() { err = f(ctx) })
+ return err
+ }
+}
+
+// newConfig returns a config configured with options.
+func newConfig(options []Option) config {
+ conf := config{res: resource.Default()}
+ for _, o := range options {
+ conf = o.apply(conf)
+ }
+ return conf
+}
+
+// Option applies a configuration option value to a MeterProvider.
+type Option interface {
+ apply(config) config
+}
+
+// optionFunc applies a set of options to a config.
+type optionFunc func(config) config
+
+// apply returns a config with option(s) applied.
+func (o optionFunc) apply(conf config) config {
+ return o(conf)
+}
+
+// WithResource associates a Resource with a MeterProvider. This Resource
+// represents the entity producing telemetry and is associated with all Meters
+// the MeterProvider will create.
+//
+// By default, if this Option is not used, the default Resource from the
+// go.opentelemetry.io/otel/sdk/resource package will be used.
+func WithResource(res *resource.Resource) Option {
+ return optionFunc(func(conf config) config {
+ conf.res = res
+ return conf
+ })
+}
+
+// WithReader associates Reader r with a MeterProvider.
+//
+// By default, if this option is not used, the MeterProvider will perform no
+// operations; no data will be exported without a Reader.
+func WithReader(r Reader) Option {
+ return optionFunc(func(cfg config) config {
+ if r == nil {
+ return cfg
+ }
+ cfg.readers = append(cfg.readers, r)
+ return cfg
+ })
+}
+
+// WithView associates views a MeterProvider.
+//
+// Views are appended to existing ones in a MeterProvider if this option is
+// used multiple times.
+//
+// By default, if this option is not used, the MeterProvider will use the
+// default view.
+func WithView(views ...View) Option {
+ return optionFunc(func(cfg config) config {
+ cfg.views = append(cfg.views, views...)
+ return cfg
+ })
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/doc.go
new file mode 100644
index 000000000..53f80c428
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/doc.go
@@ -0,0 +1,47 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package metric provides an implementation of the OpenTelemetry metrics SDK.
+//
+// See https://opentelemetry.io/docs/concepts/signals/metrics/ for information
+// about the concept of OpenTelemetry metrics and
+// https://opentelemetry.io/docs/concepts/components/ for more information
+// about OpenTelemetry SDKs.
+//
+// The entry point for the metric package is the MeterProvider. It is the
+// object that all API calls use to create Meters, instruments, and ultimately
+// make metric measurements. Also, it is an object that should be used to
+// control the life-cycle (start, flush, and shutdown) of the SDK.
+//
+// A MeterProvider needs to be configured to export the measured data, this is
+// done by configuring it with a Reader implementation (using the WithReader
+// MeterProviderOption). Readers take two forms: ones that push to an endpoint
+// (NewPeriodicReader), and ones that an endpoint pulls from. See
+// [go.opentelemetry.io/otel/exporters] for exporters that can be used as
+// or with these Readers.
+//
+// Each Reader, when registered with the MeterProvider, can be augmented with a
+// View. Views allow users that run OpenTelemetry instrumented code to modify
+// the generated data of that instrumentation.
+//
+// The data generated by a MeterProvider needs to include information about its
+// origin. A MeterProvider needs to be configured with a Resource, using the
+// WithResource MeterProviderOption, to include this information. This Resource
+// should be used to describe the unique runtime environment instrumented code
+// is being run on. That way when multiple instances of the code are collected
+// at a single endpoint their origin is decipherable.
+//
+// See [go.opentelemetry.io/otel/metric] for more information about
+// the metric API.
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/env.go b/vendor/go.opentelemetry.io/otel/sdk/metric/env.go
new file mode 100644
index 000000000..940ba8159
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/env.go
@@ -0,0 +1,50 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "os"
+ "strconv"
+ "time"
+
+ "go.opentelemetry.io/otel/internal/global"
+)
+
+// Environment variable names.
+const (
+ // The time interval (in milliseconds) between the start of two export attempts.
+ envInterval = "OTEL_METRIC_EXPORT_INTERVAL"
+ // Maximum allowed time (in milliseconds) to export data.
+ envTimeout = "OTEL_METRIC_EXPORT_TIMEOUT"
+)
+
+// envDuration returns an environment variable's value as duration in milliseconds if it is exists,
+// or the defaultValue if the environment variable is not defined or the value is not valid.
+func envDuration(key string, defaultValue time.Duration) time.Duration {
+ v := os.Getenv(key)
+ if v == "" {
+ return defaultValue
+ }
+ d, err := strconv.Atoi(v)
+ if err != nil {
+ global.Error(err, "parse duration", "environment variable", key, "value", v)
+ return defaultValue
+ }
+ if d <= 0 {
+ global.Error(errNonPositiveDuration, "non-positive duration", "environment variable", key, "value", v)
+ return defaultValue
+ }
+ return time.Duration(d) * time.Millisecond
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go
new file mode 100644
index 000000000..da8941b37
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/exporter.go
@@ -0,0 +1,88 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "fmt"
+
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// ErrExporterShutdown is returned if Export or Shutdown are called after an
+// Exporter has been Shutdown.
+var ErrExporterShutdown = fmt.Errorf("exporter is shutdown")
+
+// Exporter handles the delivery of metric data to external receivers. This is
+// the final component in the metric push pipeline.
+type Exporter interface {
+ // Temporality returns the Temporality to use for an instrument kind.
+ //
+ // This method needs to be concurrent safe with itself and all the other
+ // Exporter methods.
+ Temporality(InstrumentKind) metricdata.Temporality
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+
+ // Aggregation returns the Aggregation to use for an instrument kind.
+ //
+ // This method needs to be concurrent safe with itself and all the other
+ // Exporter methods.
+ Aggregation(InstrumentKind) Aggregation
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+
+ // Export serializes and transmits metric data to a receiver.
+ //
+ // This is called synchronously, there is no concurrency safety
+ // requirement. Because of this, it is critical that all timeouts and
+ // cancellations of the passed context be honored.
+ //
+ // All retry logic must be contained in this function. The SDK does not
+ // implement any retry logic. All errors returned by this function are
+ // considered unrecoverable and will be reported to a configured error
+ // Handler.
+ //
+ // The passed ResourceMetrics may be reused when the call completes. If an
+ // exporter needs to hold this data after it returns, it needs to make a
+ // copy.
+ Export(context.Context, *metricdata.ResourceMetrics) error
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+
+ // ForceFlush flushes any metric data held by an exporter.
+ //
+ // The deadline or cancellation of the passed context must be honored. An
+ // appropriate error should be returned in these situations.
+ //
+ // This method needs to be concurrent safe.
+ ForceFlush(context.Context) error
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+
+ // Shutdown flushes all metric data held by an exporter and releases any
+ // held computational resources.
+ //
+ // The deadline or cancellation of the passed context must be honored. An
+ // appropriate error should be returned in these situations.
+ //
+ // After Shutdown is called, calls to Export will perform no operation and
+ // instead will return an error indicating the shutdown state.
+ //
+ // This method needs to be concurrent safe.
+ Shutdown(context.Context) error
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go b/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go
new file mode 100644
index 000000000..bb52f6ec7
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/instrument.go
@@ -0,0 +1,348 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/metric"
+ "go.opentelemetry.io/otel/metric/embedded"
+ "go.opentelemetry.io/otel/sdk/instrumentation"
+ "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+)
+
+var (
+ zeroInstrumentKind InstrumentKind
+ zeroScope instrumentation.Scope
+)
+
+// InstrumentKind is the identifier of a group of instruments that all
+// performing the same function.
+type InstrumentKind uint8
+
+const (
+ // instrumentKindUndefined is an undefined instrument kind, it should not
+ // be used by any initialized type.
+ instrumentKindUndefined InstrumentKind = iota // nolint:deadcode,varcheck,unused
+ // InstrumentKindCounter identifies a group of instruments that record
+ // increasing values synchronously with the code path they are measuring.
+ InstrumentKindCounter
+ // InstrumentKindUpDownCounter identifies a group of instruments that
+ // record increasing and decreasing values synchronously with the code path
+ // they are measuring.
+ InstrumentKindUpDownCounter
+ // InstrumentKindHistogram identifies a group of instruments that record a
+ // distribution of values synchronously with the code path they are
+ // measuring.
+ InstrumentKindHistogram
+ // InstrumentKindObservableCounter identifies a group of instruments that
+ // record increasing values in an asynchronous callback.
+ InstrumentKindObservableCounter
+ // InstrumentKindObservableUpDownCounter identifies a group of instruments
+ // that record increasing and decreasing values in an asynchronous
+ // callback.
+ InstrumentKindObservableUpDownCounter
+ // InstrumentKindObservableGauge identifies a group of instruments that
+ // record current values in an asynchronous callback.
+ InstrumentKindObservableGauge
+)
+
+type nonComparable [0]func() // nolint: unused // This is indeed used.
+
+// Instrument describes properties an instrument is created with.
+type Instrument struct {
+ // Name is the human-readable identifier of the instrument.
+ Name string
+ // Description describes the purpose of the instrument.
+ Description string
+ // Kind defines the functional group of the instrument.
+ Kind InstrumentKind
+ // Unit is the unit of measurement recorded by the instrument.
+ Unit string
+ // Scope identifies the instrumentation that created the instrument.
+ Scope instrumentation.Scope
+
+ // Ensure forward compatibility if non-comparable fields need to be added.
+ nonComparable // nolint: unused
+}
+
+// empty returns if all fields of i are their zero-value.
+func (i Instrument) empty() bool {
+ return i.Name == "" &&
+ i.Description == "" &&
+ i.Kind == zeroInstrumentKind &&
+ i.Unit == "" &&
+ i.Scope == zeroScope
+}
+
+// matches returns whether all the non-zero-value fields of i match the
+// corresponding fields of other. If i is empty it will match all other, and
+// true will always be returned.
+func (i Instrument) matches(other Instrument) bool {
+ return i.matchesName(other) &&
+ i.matchesDescription(other) &&
+ i.matchesKind(other) &&
+ i.matchesUnit(other) &&
+ i.matchesScope(other)
+}
+
+// matchesName returns true if the Name of i is "" or it equals the Name of
+// other, otherwise false.
+func (i Instrument) matchesName(other Instrument) bool {
+ return i.Name == "" || i.Name == other.Name
+}
+
+// matchesDescription returns true if the Description of i is "" or it equals
+// the Description of other, otherwise false.
+func (i Instrument) matchesDescription(other Instrument) bool {
+ return i.Description == "" || i.Description == other.Description
+}
+
+// matchesKind returns true if the Kind of i is its zero-value or it equals the
+// Kind of other, otherwise false.
+func (i Instrument) matchesKind(other Instrument) bool {
+ return i.Kind == zeroInstrumentKind || i.Kind == other.Kind
+}
+
+// matchesUnit returns true if the Unit of i is its zero-value or it equals the
+// Unit of other, otherwise false.
+func (i Instrument) matchesUnit(other Instrument) bool {
+ return i.Unit == "" || i.Unit == other.Unit
+}
+
+// matchesScope returns true if the Scope of i is its zero-value or it equals
+// the Scope of other, otherwise false.
+func (i Instrument) matchesScope(other Instrument) bool {
+ return (i.Scope.Name == "" || i.Scope.Name == other.Scope.Name) &&
+ (i.Scope.Version == "" || i.Scope.Version == other.Scope.Version) &&
+ (i.Scope.SchemaURL == "" || i.Scope.SchemaURL == other.Scope.SchemaURL)
+}
+
+// Stream describes the stream of data an instrument produces.
+type Stream struct {
+ // Name is the human-readable identifier of the stream.
+ Name string
+ // Description describes the purpose of the data.
+ Description string
+ // Unit is the unit of measurement recorded.
+ Unit string
+ // Aggregation the stream uses for an instrument.
+ Aggregation Aggregation
+ // AttributeFilter is an attribute Filter applied to the attributes
+ // recorded for an instrument's measurement. If the filter returns false
+ // the attribute will not be recorded, otherwise, if it returns true, it
+ // will record the attribute.
+ //
+ // Use NewAllowKeysFilter from "go.opentelemetry.io/otel/attribute" to
+ // provide an allow-list of attribute keys here.
+ AttributeFilter attribute.Filter
+}
+
+// instID are the identifying properties of a instrument.
+type instID struct {
+ // Name is the name of the stream.
+ Name string
+ // Description is the description of the stream.
+ Description string
+ // Kind defines the functional group of the instrument.
+ Kind InstrumentKind
+ // Unit is the unit of the stream.
+ Unit string
+ // Number is the number type of the stream.
+ Number string
+}
+
+// Returns a normalized copy of the instID i.
+//
+// Instrument names are considered case-insensitive. Standardize the instrument
+// name to always be lowercase for the returned instID so it can be compared
+// without the name casing affecting the comparison.
+func (i instID) normalize() instID {
+ i.Name = strings.ToLower(i.Name)
+ return i
+}
+
+type int64Inst struct {
+ measures []aggregate.Measure[int64]
+
+ embedded.Int64Counter
+ embedded.Int64UpDownCounter
+ embedded.Int64Histogram
+}
+
+var (
+ _ metric.Int64Counter = (*int64Inst)(nil)
+ _ metric.Int64UpDownCounter = (*int64Inst)(nil)
+ _ metric.Int64Histogram = (*int64Inst)(nil)
+)
+
+func (i *int64Inst) Add(ctx context.Context, val int64, opts ...metric.AddOption) {
+ c := metric.NewAddConfig(opts)
+ i.aggregate(ctx, val, c.Attributes())
+}
+
+func (i *int64Inst) Record(ctx context.Context, val int64, opts ...metric.RecordOption) {
+ c := metric.NewRecordConfig(opts)
+ i.aggregate(ctx, val, c.Attributes())
+}
+
+func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) { // nolint:revive // okay to shadow pkg with method.
+ if err := ctx.Err(); err != nil {
+ return
+ }
+ for _, in := range i.measures {
+ in(ctx, val, s)
+ }
+}
+
+type float64Inst struct {
+ measures []aggregate.Measure[float64]
+
+ embedded.Float64Counter
+ embedded.Float64UpDownCounter
+ embedded.Float64Histogram
+}
+
+var (
+ _ metric.Float64Counter = (*float64Inst)(nil)
+ _ metric.Float64UpDownCounter = (*float64Inst)(nil)
+ _ metric.Float64Histogram = (*float64Inst)(nil)
+)
+
+func (i *float64Inst) Add(ctx context.Context, val float64, opts ...metric.AddOption) {
+ c := metric.NewAddConfig(opts)
+ i.aggregate(ctx, val, c.Attributes())
+}
+
+func (i *float64Inst) Record(ctx context.Context, val float64, opts ...metric.RecordOption) {
+ c := metric.NewRecordConfig(opts)
+ i.aggregate(ctx, val, c.Attributes())
+}
+
+func (i *float64Inst) aggregate(ctx context.Context, val float64, s attribute.Set) {
+ if err := ctx.Err(); err != nil {
+ return
+ }
+ for _, in := range i.measures {
+ in(ctx, val, s)
+ }
+}
+
+// observablID is a comparable unique identifier of an observable.
+type observablID[N int64 | float64] struct {
+ name string
+ description string
+ kind InstrumentKind
+ unit string
+ scope instrumentation.Scope
+}
+
+type float64Observable struct {
+ metric.Float64Observable
+ *observable[float64]
+
+ embedded.Float64ObservableCounter
+ embedded.Float64ObservableUpDownCounter
+ embedded.Float64ObservableGauge
+}
+
+var (
+ _ metric.Float64ObservableCounter = float64Observable{}
+ _ metric.Float64ObservableUpDownCounter = float64Observable{}
+ _ metric.Float64ObservableGauge = float64Observable{}
+)
+
+func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
+ return float64Observable{
+ observable: newObservable(m, kind, name, desc, u, meas),
+ }
+}
+
+type int64Observable struct {
+ metric.Int64Observable
+ *observable[int64]
+
+ embedded.Int64ObservableCounter
+ embedded.Int64ObservableUpDownCounter
+ embedded.Int64ObservableGauge
+}
+
+var (
+ _ metric.Int64ObservableCounter = int64Observable{}
+ _ metric.Int64ObservableUpDownCounter = int64Observable{}
+ _ metric.Int64ObservableGauge = int64Observable{}
+)
+
+func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
+ return int64Observable{
+ observable: newObservable(m, kind, name, desc, u, meas),
+ }
+}
+
+type observable[N int64 | float64] struct {
+ metric.Observable
+ observablID[N]
+
+ meter *meter
+ measures []aggregate.Measure[N]
+}
+
+func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
+ return &observable[N]{
+ observablID: observablID[N]{
+ name: name,
+ description: desc,
+ kind: kind,
+ unit: u,
+ scope: m.scope,
+ },
+ meter: m,
+ measures: meas,
+ }
+}
+
+// observe records the val for the set of attrs.
+func (o *observable[N]) observe(val N, s attribute.Set) {
+ for _, in := range o.measures {
+ in(context.Background(), val, s)
+ }
+}
+
+var errEmptyAgg = errors.New("no aggregators for observable instrument")
+
+// registerable returns an error if the observable o should not be registered,
+// and nil if it should. An errEmptyAgg error is returned if o is effectively a
+// no-op because it does not have any aggregators. Also, an error is returned
+// if scope defines a Meter other than the one o was created by.
+func (o *observable[N]) registerable(m *meter) error {
+ if len(o.measures) == 0 {
+ return errEmptyAgg
+ }
+ if m != o.meter {
+ return fmt.Errorf(
+ "invalid registration: observable %q from Meter %q, registered with Meter %q",
+ o.name,
+ o.scope.Name,
+ m.scope.Name,
+ )
+ }
+ return nil
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/instrumentkind_string.go b/vendor/go.opentelemetry.io/otel/sdk/metric/instrumentkind_string.go
new file mode 100644
index 000000000..d5f9e982c
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/instrumentkind_string.go
@@ -0,0 +1,29 @@
+// Code generated by "stringer -type=InstrumentKind -trimprefix=InstrumentKind"; DO NOT EDIT.
+
+package metric
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[instrumentKindUndefined-0]
+ _ = x[InstrumentKindCounter-1]
+ _ = x[InstrumentKindUpDownCounter-2]
+ _ = x[InstrumentKindHistogram-3]
+ _ = x[InstrumentKindObservableCounter-4]
+ _ = x[InstrumentKindObservableUpDownCounter-5]
+ _ = x[InstrumentKindObservableGauge-6]
+}
+
+const _InstrumentKind_name = "instrumentKindUndefinedCounterUpDownCounterHistogramObservableCounterObservableUpDownCounterObservableGauge"
+
+var _InstrumentKind_index = [...]uint8{0, 23, 30, 43, 52, 69, 92, 107}
+
+func (i InstrumentKind) String() string {
+ if i >= InstrumentKind(len(_InstrumentKind_index)-1) {
+ return "InstrumentKind(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _InstrumentKind_name[_InstrumentKind_index[i]:_InstrumentKind_index[i+1]]
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
new file mode 100644
index 000000000..8dec14237
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
@@ -0,0 +1,133 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// now is used to return the current local time while allowing tests to
+// override the default time.Now function.
+var now = time.Now
+
+// Measure receives measurements to be aggregated.
+type Measure[N int64 | float64] func(context.Context, N, attribute.Set)
+
+// ComputeAggregation stores the aggregate of measurements into dest and
+// returns the number of aggregate data-points output.
+type ComputeAggregation func(dest *metricdata.Aggregation) int
+
+// Builder builds an aggregate function.
+type Builder[N int64 | float64] struct {
+ // Temporality is the temporality used for the returned aggregate function.
+ //
+ // If this is not provided a default of cumulative will be used (except for
+ // the last-value aggregate function where delta is the only appropriate
+ // temporality).
+ Temporality metricdata.Temporality
+ // Filter is the attribute filter the aggregate function will use on the
+ // input of measurements.
+ Filter attribute.Filter
+}
+
+func (b Builder[N]) filter(f Measure[N]) Measure[N] {
+ if b.Filter != nil {
+ fltr := b.Filter // Copy to make it immutable after assignment.
+ return func(ctx context.Context, n N, a attribute.Set) {
+ fAttr, _ := a.Filter(fltr)
+ f(ctx, n, fAttr)
+ }
+ }
+ return f
+}
+
+// LastValue returns a last-value aggregate function input and output.
+//
+// The Builder.Temporality is ignored and delta is use always.
+func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
+ // Delta temporality is the only temporality that makes semantic sense for
+ // a last-value aggregate.
+ lv := newLastValue[N]()
+
+ return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
+ // Ignore if dest is not a metricdata.Gauge. The chance for memory
+ // reuse of the DataPoints is missed (better luck next time).
+ gData, _ := (*dest).(metricdata.Gauge[N])
+ lv.computeAggregation(&gData.DataPoints)
+ *dest = gData
+
+ return len(gData.DataPoints)
+ }
+}
+
+// PrecomputedSum returns a sum aggregate function input and output. The
+// arguments passed to the input are expected to be the precomputed sum values.
+func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
+ s := newPrecomputedSum[N](monotonic)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(s.measure), s.delta
+ default:
+ return b.filter(s.measure), s.cumulative
+ }
+}
+
+// Sum returns a sum aggregate function input and output.
+func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
+ s := newSum[N](monotonic)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(s.measure), s.delta
+ default:
+ return b.filter(s.measure), s.cumulative
+ }
+}
+
+// ExplicitBucketHistogram returns a histogram aggregate function input and
+// output.
+func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
+ h := newHistogram[N](boundaries, noMinMax, noSum)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(h.measure), h.delta
+ default:
+ return b.filter(h.measure), h.cumulative
+ }
+}
+
+// ExponentialBucketHistogram returns a histogram aggregate function input and
+// output.
+func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
+ h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
+ switch b.Temporality {
+ case metricdata.DeltaTemporality:
+ return b.filter(h.measure), h.delta
+ default:
+ return b.filter(h.measure), h.cumulative
+ }
+}
+
+// reset ensures s has capacity and sets it length. If the capacity of s too
+// small, a new slice is returned with the specified capacity and length.
+func reset[T any](s []T, length, capacity int) []T {
+ if cap(s) < capacity {
+ return make([]T, length, capacity)
+ }
+ return s[:length]
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
new file mode 100644
index 000000000..e83a2693f
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
@@ -0,0 +1,18 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package aggregate provides aggregate types used compute aggregations and
+// cycle the state of metric measurements made by the SDK. These types and
+// functionality are meant only for internal SDK use.
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
new file mode 100644
index 000000000..98b7dc1e0
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -0,0 +1,432 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "errors"
+ "math"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+const (
+ expoMaxScale = 20
+ expoMinScale = -10
+
+ smallestNonZeroNormalFloat64 = 0x1p-1022
+
+ // These redefine the Math constants with a type, so the compiler won't coerce
+ // them into an int on 32 bit platforms.
+ maxInt64 int64 = math.MaxInt64
+ minInt64 int64 = math.MinInt64
+)
+
+// expoHistogramDataPoint is a single data point in an exponential histogram.
+type expoHistogramDataPoint[N int64 | float64] struct {
+ count uint64
+ min N
+ max N
+ sum N
+
+ maxSize int
+ noMinMax bool
+ noSum bool
+
+ scale int
+
+ posBuckets expoBuckets
+ negBuckets expoBuckets
+ zeroCount uint64
+}
+
+func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
+ f := math.MaxFloat64
+ max := N(f) // if N is int64, max will overflow to -9223372036854775808
+ min := N(-f)
+ if N(maxInt64) > N(f) {
+ max = N(maxInt64)
+ min = N(minInt64)
+ }
+ return &expoHistogramDataPoint[N]{
+ min: max,
+ max: min,
+ maxSize: maxSize,
+ noMinMax: noMinMax,
+ noSum: noSum,
+ scale: maxScale,
+ }
+}
+
+// record adds a new measurement to the histogram. It will rescale the buckets if needed.
+func (p *expoHistogramDataPoint[N]) record(v N) {
+ p.count++
+
+ if !p.noMinMax {
+ if v < p.min {
+ p.min = v
+ }
+ if v > p.max {
+ p.max = v
+ }
+ }
+ if !p.noSum {
+ p.sum += v
+ }
+
+ absV := math.Abs(float64(v))
+
+ if float64(absV) == 0.0 {
+ p.zeroCount++
+ return
+ }
+
+ bin := p.getBin(absV)
+
+ bucket := &p.posBuckets
+ if v < 0 {
+ bucket = &p.negBuckets
+ }
+
+ // If the new bin would make the counts larger than maxScale, we need to
+ // downscale current measurements.
+ if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
+ if p.scale-scaleDelta < expoMinScale {
+ // With a scale of -10 there is only two buckets for the whole range of float64 values.
+ // This can only happen if there is a max size of 1.
+ otel.Handle(errors.New("exponential histogram scale underflow"))
+ return
+ }
+ // Downscale
+ p.scale -= scaleDelta
+ p.posBuckets.downscale(scaleDelta)
+ p.negBuckets.downscale(scaleDelta)
+
+ bin = p.getBin(absV)
+ }
+
+ bucket.record(bin)
+}
+
+// getBin returns the bin v should be recorded into.
+func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
+ frac, exp := math.Frexp(v)
+ if p.scale <= 0 {
+ // Because of the choice of fraction is always 1 power of two higher than we want.
+ correction := 1
+ if frac == .5 {
+ // If v is an exact power of two the frac will be .5 and the exp
+ // will be one higher than we want.
+ correction = 2
+ }
+ return (exp - correction) >> (-p.scale)
+ }
+ return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1
+}
+
+// scaleFactors are constants used in calculating the logarithm index. They are
+// equivalent to 2^index/log(2).
+var scaleFactors = [21]float64{
+ math.Ldexp(math.Log2E, 0),
+ math.Ldexp(math.Log2E, 1),
+ math.Ldexp(math.Log2E, 2),
+ math.Ldexp(math.Log2E, 3),
+ math.Ldexp(math.Log2E, 4),
+ math.Ldexp(math.Log2E, 5),
+ math.Ldexp(math.Log2E, 6),
+ math.Ldexp(math.Log2E, 7),
+ math.Ldexp(math.Log2E, 8),
+ math.Ldexp(math.Log2E, 9),
+ math.Ldexp(math.Log2E, 10),
+ math.Ldexp(math.Log2E, 11),
+ math.Ldexp(math.Log2E, 12),
+ math.Ldexp(math.Log2E, 13),
+ math.Ldexp(math.Log2E, 14),
+ math.Ldexp(math.Log2E, 15),
+ math.Ldexp(math.Log2E, 16),
+ math.Ldexp(math.Log2E, 17),
+ math.Ldexp(math.Log2E, 18),
+ math.Ldexp(math.Log2E, 19),
+ math.Ldexp(math.Log2E, 20),
+}
+
+// scaleChange returns the magnitude of the scale change needed to fit bin in
+// the bucket. If no scale change is needed 0 is returned.
+func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
+ if length == 0 {
+ // No need to rescale if there are no buckets.
+ return 0
+ }
+
+ low := startBin
+ high := bin
+ if startBin >= bin {
+ low = bin
+ high = startBin + length - 1
+ }
+
+ count := 0
+ for high-low >= p.maxSize {
+ low = low >> 1
+ high = high >> 1
+ count++
+ if count > expoMaxScale-expoMinScale {
+ return count
+ }
+ }
+ return count
+}
+
+// expoBuckets is a set of buckets in an exponential histogram.
+type expoBuckets struct {
+ startBin int
+ counts []uint64
+}
+
+// record increments the count for the given bin, and expands the buckets if needed.
+// Size changes must be done before calling this function.
+func (b *expoBuckets) record(bin int) {
+ if len(b.counts) == 0 {
+ b.counts = []uint64{1}
+ b.startBin = bin
+ return
+ }
+
+ endBin := b.startBin + len(b.counts) - 1
+
+ // if the new bin is inside the current range
+ if bin >= b.startBin && bin <= endBin {
+ b.counts[bin-b.startBin]++
+ return
+ }
+ // if the new bin is before the current start add spaces to the counts
+ if bin < b.startBin {
+ origLen := len(b.counts)
+ newLength := endBin - bin + 1
+ shift := b.startBin - bin
+
+ if newLength > cap(b.counts) {
+ b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
+ }
+
+ copy(b.counts[shift:origLen+shift], b.counts[:])
+ b.counts = b.counts[:newLength]
+ for i := 1; i < shift; i++ {
+ b.counts[i] = 0
+ }
+ b.startBin = bin
+ b.counts[0] = 1
+ return
+ }
+ // if the new is after the end add spaces to the end
+ if bin > endBin {
+ if bin-b.startBin < cap(b.counts) {
+ b.counts = b.counts[:bin-b.startBin+1]
+ for i := endBin + 1 - b.startBin; i < len(b.counts); i++ {
+ b.counts[i] = 0
+ }
+ b.counts[bin-b.startBin] = 1
+ return
+ }
+
+ end := make([]uint64, bin-b.startBin-len(b.counts)+1)
+ b.counts = append(b.counts, end...)
+ b.counts[bin-b.startBin] = 1
+ }
+}
+
+// downscale shrinks a bucket by a factor of 2*s. It will sum counts into the
+// correct lower resolution bucket.
+func (b *expoBuckets) downscale(delta int) {
+ // Example
+ // delta = 2
+ // Original offset: -6
+ // Counts: [ 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+ // bins: -6 -5, -4, -3, -2, -1, 0, 1, 2, 3, 4
+ // new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0, 1
+ // new Offset: -2
+ // new Counts: [4, 14, 30, 10]
+
+ if len(b.counts) <= 1 || delta < 1 {
+ b.startBin = b.startBin >> delta
+ return
+ }
+
+ steps := 1 << delta
+ offset := b.startBin % steps
+ offset = (offset + steps) % steps // to make offset positive
+ for i := 1; i < len(b.counts); i++ {
+ idx := i + offset
+ if idx%steps == 0 {
+ b.counts[idx/steps] = b.counts[i]
+ continue
+ }
+ b.counts[idx/steps] += b.counts[i]
+ }
+
+ lastIdx := (len(b.counts) - 1 + offset) / steps
+ b.counts = b.counts[:lastIdx+1]
+ b.startBin = b.startBin >> delta
+}
+
+// newExponentialHistogram returns an Aggregator that summarizes a set of
+// measurements as an exponential histogram. Each histogram is scoped by attributes
+// and the aggregation cycle the measurements were made in.
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
+ return &expoHistogram[N]{
+ noSum: noSum,
+ noMinMax: noMinMax,
+ maxSize: int(maxSize),
+ maxScale: int(maxScale),
+
+ values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
+
+ start: now(),
+ }
+}
+
+// expoHistogram summarizes a set of measurements as an histogram with exponentially
+// defined buckets.
+type expoHistogram[N int64 | float64] struct {
+ noSum bool
+ noMinMax bool
+ maxSize int
+ maxScale int
+
+ values map[attribute.Set]*expoHistogramDataPoint[N]
+ valuesMu sync.Mutex
+
+ start time.Time
+}
+
+func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) {
+ // Ignore NaN and infinity.
+ if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
+ return
+ }
+
+ e.valuesMu.Lock()
+ defer e.valuesMu.Unlock()
+
+ v, ok := e.values[attr]
+ if !ok {
+ v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
+ e.values[attr] = v
+ }
+ v.record(value)
+}
+
+func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
+ // In that case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.ExponentialHistogram[N])
+ h.Temporality = metricdata.DeltaTemporality
+
+ e.valuesMu.Lock()
+ defer e.valuesMu.Unlock()
+
+ n := len(e.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range e.values {
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = e.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Scale = int32(b.scale)
+ hDPts[i].ZeroCount = b.zeroCount
+ hDPts[i].ZeroThreshold = 0.0
+
+ hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
+ hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
+ copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
+
+ hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
+ hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
+
+ if !e.noSum {
+ hDPts[i].Sum = b.sum
+ }
+ if !e.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+
+ delete(e.values, a)
+ i++
+ }
+ e.start = t
+ h.DataPoints = hDPts
+ *dest = h
+ return n
+}
+
+func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
+ // In that case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.ExponentialHistogram[N])
+ h.Temporality = metricdata.CumulativeTemporality
+
+ e.valuesMu.Lock()
+ defer e.valuesMu.Unlock()
+
+ n := len(e.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range e.values {
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = e.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Scale = int32(b.scale)
+ hDPts[i].ZeroCount = b.zeroCount
+ hDPts[i].ZeroThreshold = 0.0
+
+ hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
+ hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
+ copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
+
+ hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
+ hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
+
+ if !e.noSum {
+ hDPts[i].Sum = b.sum
+ }
+ if !e.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+
+ i++
+ // TODO (#3006): This will use an unbounded amount of memory if there
+ // are unbounded number of attribute sets being aggregated. Attribute
+ // sets that become "stale" need to be forgotten so this will not
+ // overload the system.
+ }
+
+ h.DataPoints = hDPts
+ *dest = h
+ return n
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
new file mode 100644
index 000000000..62ec51e1f
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
@@ -0,0 +1,231 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "sort"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+type buckets[N int64 | float64] struct {
+ counts []uint64
+ count uint64
+ total N
+ min, max N
+}
+
+// newBuckets returns buckets with n bins.
+func newBuckets[N int64 | float64](n int) *buckets[N] {
+ return &buckets[N]{counts: make([]uint64, n)}
+}
+
+func (b *buckets[N]) sum(value N) { b.total += value }
+
+func (b *buckets[N]) bin(idx int, value N) {
+ b.counts[idx]++
+ b.count++
+ if value < b.min {
+ b.min = value
+ } else if value > b.max {
+ b.max = value
+ }
+}
+
+// histValues summarizes a set of measurements as an histValues with
+// explicitly defined buckets.
+type histValues[N int64 | float64] struct {
+ noSum bool
+ bounds []float64
+
+ values map[attribute.Set]*buckets[N]
+ valuesMu sync.Mutex
+}
+
+func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
+ // The responsibility of keeping all buckets correctly associated with the
+ // passed boundaries is ultimately this type's responsibility. Make a copy
+ // here so we can always guarantee this. Or, in the case of failure, have
+ // complete control over the fix.
+ b := make([]float64, len(bounds))
+ copy(b, bounds)
+ sort.Float64s(b)
+ return &histValues[N]{
+ noSum: noSum,
+ bounds: b,
+ values: make(map[attribute.Set]*buckets[N]),
+ }
+}
+
+// Aggregate records the measurement value, scoped by attr, and aggregates it
+// into a histogram.
+func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
+ // This search will return an index in the range [0, len(s.bounds)], where
+ // it will return len(s.bounds) if value is greater than the last element
+ // of s.bounds. This aligns with the buckets in that the length of buckets
+ // is len(s.bounds)+1, with the last bucket representing:
+ // (s.bounds[len(s.bounds)-1], +∞).
+ idx := sort.SearchFloat64s(s.bounds, float64(value))
+
+ s.valuesMu.Lock()
+ defer s.valuesMu.Unlock()
+
+ b, ok := s.values[attr]
+ if !ok {
+ // N+1 buckets. For example:
+ //
+ // bounds = [0, 5, 10]
+ //
+ // Then,
+ //
+ // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
+ b = newBuckets[N](len(s.bounds) + 1)
+ // Ensure min and max are recorded values (not zero), for new buckets.
+ b.min, b.max = value, value
+ s.values[attr] = b
+ }
+ b.bin(idx, value)
+ if !s.noSum {
+ b.sum(value)
+ }
+}
+
+// newHistogram returns an Aggregator that summarizes a set of measurements as
+// an histogram.
+func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] {
+ return &histogram[N]{
+ histValues: newHistValues[N](boundaries, noSum),
+ noMinMax: noMinMax,
+ start: now(),
+ }
+}
+
+// histogram summarizes a set of measurements as an histogram with explicitly
+// defined buckets.
+type histogram[N int64 | float64] struct {
+ *histValues[N]
+
+ noMinMax bool
+ start time.Time
+}
+
+func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
+ // case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.Histogram[N])
+ h.Temporality = metricdata.DeltaTemporality
+
+ s.valuesMu.Lock()
+ defer s.valuesMu.Unlock()
+
+ // Do not allow modification of our copy of bounds.
+ bounds := make([]float64, len(s.bounds))
+ copy(bounds, s.bounds)
+
+ n := len(s.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range s.values {
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = s.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Bounds = bounds
+ hDPts[i].BucketCounts = b.counts
+
+ if !s.noSum {
+ hDPts[i].Sum = b.total
+ }
+
+ if !s.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+
+ // Unused attribute sets do not report.
+ delete(s.values, a)
+ i++
+ }
+ // The delta collection cycle resets.
+ s.start = t
+
+ h.DataPoints = hDPts
+ *dest = h
+
+ return n
+}
+
+func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
+ // case, use the zero-value h and hope for better alignment next cycle.
+ h, _ := (*dest).(metricdata.Histogram[N])
+ h.Temporality = metricdata.CumulativeTemporality
+
+ s.valuesMu.Lock()
+ defer s.valuesMu.Unlock()
+
+ // Do not allow modification of our copy of bounds.
+ bounds := make([]float64, len(s.bounds))
+ copy(bounds, s.bounds)
+
+ n := len(s.values)
+ hDPts := reset(h.DataPoints, n, n)
+
+ var i int
+ for a, b := range s.values {
+ // The HistogramDataPoint field values returned need to be copies of
+ // the buckets value as we will keep updating them.
+ //
+ // TODO (#3047): Making copies for bounds and counts incurs a large
+ // memory allocation footprint. Alternatives should be explored.
+ counts := make([]uint64, len(b.counts))
+ copy(counts, b.counts)
+
+ hDPts[i].Attributes = a
+ hDPts[i].StartTime = s.start
+ hDPts[i].Time = t
+ hDPts[i].Count = b.count
+ hDPts[i].Bounds = bounds
+ hDPts[i].BucketCounts = counts
+
+ if !s.noSum {
+ hDPts[i].Sum = b.total
+ }
+
+ if !s.noMinMax {
+ hDPts[i].Min = metricdata.NewExtrema(b.min)
+ hDPts[i].Max = metricdata.NewExtrema(b.max)
+ }
+ i++
+ // TODO (#3006): This will use an unbounded amount of memory if there
+ // are unbounded number of attribute sets being aggregated. Attribute
+ // sets that become "stale" need to be forgotten so this will not
+ // overload the system.
+ }
+
+ h.DataPoints = hDPts
+ *dest = h
+
+ return n
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
new file mode 100644
index 000000000..6af2d6061
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
@@ -0,0 +1,68 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// datapoint is timestamped measurement data.
+type datapoint[N int64 | float64] struct {
+ timestamp time.Time
+ value N
+}
+
+func newLastValue[N int64 | float64]() *lastValue[N] {
+ return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
+}
+
+// lastValue summarizes a set of measurements as the last one made.
+type lastValue[N int64 | float64] struct {
+ sync.Mutex
+
+ values map[attribute.Set]datapoint[N]
+}
+
+func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
+ d := datapoint[N]{timestamp: now(), value: value}
+ s.Lock()
+ s.values[attr] = d
+ s.Unlock()
+}
+
+func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ *dest = reset(*dest, n, n)
+
+ var i int
+ for a, v := range s.values {
+ (*dest)[i].Attributes = a
+ // The event time is the only meaningful timestamp, StartTime is
+ // ignored.
+ (*dest)[i].Time = v.timestamp
+ (*dest)[i].Value = v.value
+ // Do not report stale values.
+ delete(s.values, a)
+ i++
+ }
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
new file mode 100644
index 000000000..1e52ff0d1
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
@@ -0,0 +1,222 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// valueMap is the storage for sums.
+type valueMap[N int64 | float64] struct {
+ sync.Mutex
+ values map[attribute.Set]N
+}
+
+func newValueMap[N int64 | float64]() *valueMap[N] {
+ return &valueMap[N]{values: make(map[attribute.Set]N)}
+}
+
+func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
+ s.Lock()
+ s.values[attr] += value
+ s.Unlock()
+}
+
+// newSum returns an aggregator that summarizes a set of measurements as their
+// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
+// the measurements were made in.
+func newSum[N int64 | float64](monotonic bool) *sum[N] {
+ return &sum[N]{
+ valueMap: newValueMap[N](),
+ monotonic: monotonic,
+ start: now(),
+ }
+}
+
+// sum summarizes a set of measurements made as their arithmetic sum.
+type sum[N int64 | float64] struct {
+ *valueMap[N]
+
+ monotonic bool
+ start time.Time
+}
+
+func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.DeltaTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = value
+ // Do not report stale values.
+ delete(s.values, attr)
+ i++
+ }
+ // The delta collection cycle resets.
+ s.start = t
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
+
+func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.CumulativeTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = value
+ // TODO (#3006): This will use an unbounded amount of memory if there
+ // are unbounded number of attribute sets being aggregated. Attribute
+ // sets that become "stale" need to be forgotten so this will not
+ // overload the system.
+ i++
+ }
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
+
+// newPrecomputedSum returns an aggregator that summarizes a set of
+// observatrions as their arithmetic sum. Each sum is scoped by attributes and
+// the aggregation cycle the measurements were made in.
+func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] {
+ return &precomputedSum[N]{
+ valueMap: newValueMap[N](),
+ monotonic: monotonic,
+ start: now(),
+ }
+}
+
+// precomputedSum summarizes a set of observatrions as their arithmetic sum.
+type precomputedSum[N int64 | float64] struct {
+ *valueMap[N]
+
+ monotonic bool
+ start time.Time
+
+ reported map[attribute.Set]N
+}
+
+func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
+ t := now()
+ newReported := make(map[attribute.Set]N)
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.DeltaTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ delta := value - s.reported[attr]
+
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = delta
+
+ newReported[attr] = value
+ // Unused attribute sets do not report.
+ delete(s.values, attr)
+ i++
+ }
+ // Unused attribute sets are forgotten.
+ s.reported = newReported
+ // The delta collection cycle resets.
+ s.start = t
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
+
+func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
+ t := now()
+
+ // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
+ // use the zero-value sData and hope for better alignment next cycle.
+ sData, _ := (*dest).(metricdata.Sum[N])
+ sData.Temporality = metricdata.CumulativeTemporality
+ sData.IsMonotonic = s.monotonic
+
+ s.Lock()
+ defer s.Unlock()
+
+ n := len(s.values)
+ dPts := reset(sData.DataPoints, n, n)
+
+ var i int
+ for attr, value := range s.values {
+ dPts[i].Attributes = attr
+ dPts[i].StartTime = s.start
+ dPts[i].Time = t
+ dPts[i].Value = value
+
+ // Unused attribute sets do not report.
+ delete(s.values, attr)
+ i++
+ }
+
+ sData.DataPoints = dPts
+ *dest = sData
+
+ return n
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go
new file mode 100644
index 000000000..9695492b0
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/internal/reuse_slice.go
@@ -0,0 +1,24 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
+
+// ReuseSlice returns a zeroed view of slice if its capacity is greater than or
+// equal to n. Otherwise, it returns a new []T with capacity equal to n.
+func ReuseSlice[T any](slice []T, n int) []T {
+ if cap(slice) >= n {
+ return slice[:n]
+ }
+ return make([]T, n)
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go
new file mode 100644
index 000000000..7d524de9e
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/manual_reader.go
@@ -0,0 +1,214 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ "go.opentelemetry.io/otel/internal/global"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// ManualReader is a simple Reader that allows an application to
+// read metrics on demand.
+type ManualReader struct {
+ sdkProducer atomic.Value
+ shutdownOnce sync.Once
+
+ mu sync.Mutex
+ isShutdown bool
+ externalProducers atomic.Value
+
+ temporalitySelector TemporalitySelector
+ aggregationSelector AggregationSelector
+}
+
+// Compile time check the manualReader implements Reader and is comparable.
+var _ = map[Reader]struct{}{&ManualReader{}: {}}
+
+// NewManualReader returns a Reader which is directly called to collect metrics.
+func NewManualReader(opts ...ManualReaderOption) *ManualReader {
+ cfg := newManualReaderConfig(opts)
+ r := &ManualReader{
+ temporalitySelector: cfg.temporalitySelector,
+ aggregationSelector: cfg.aggregationSelector,
+ }
+ r.externalProducers.Store(cfg.producers)
+ return r
+}
+
+// register stores the sdkProducer which enables the caller
+// to read metrics from the SDK on demand.
+func (mr *ManualReader) register(p sdkProducer) {
+ // Only register once. If producer is already set, do nothing.
+ if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
+ msg := "did not register manual reader"
+ global.Error(errDuplicateRegister, msg)
+ }
+}
+
+// temporality reports the Temporality for the instrument kind provided.
+func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
+ return mr.temporalitySelector(kind)
+}
+
+// aggregation returns what Aggregation to use for kind.
+func (mr *ManualReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type.
+ return mr.aggregationSelector(kind)
+}
+
+// Shutdown closes any connections and frees any resources used by the reader.
+//
+// This method is safe to call concurrently.
+func (mr *ManualReader) Shutdown(context.Context) error {
+ err := ErrReaderShutdown
+ mr.shutdownOnce.Do(func() {
+ // Any future call to Collect will now return ErrReaderShutdown.
+ mr.sdkProducer.Store(produceHolder{
+ produce: shutdownProducer{}.produce,
+ })
+ mr.mu.Lock()
+ defer mr.mu.Unlock()
+ mr.isShutdown = true
+ // release references to Producer(s)
+ mr.externalProducers.Store([]Producer{})
+ err = nil
+ })
+ return err
+}
+
+// Collect gathers all metric data related to the Reader from
+// the SDK and other Producers and stores the result in rm.
+//
+// Collect will return an error if called after shutdown.
+// Collect will return an error if rm is a nil ResourceMetrics.
+// Collect will return an error if the context's Done channel is closed.
+//
+// This method is safe to call concurrently.
+func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
+ if rm == nil {
+ return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
+ }
+ p := mr.sdkProducer.Load()
+ if p == nil {
+ return ErrReaderNotRegistered
+ }
+
+ ph, ok := p.(produceHolder)
+ if !ok {
+ // The atomic.Value is entirely in the periodicReader's control so
+ // this should never happen. In the unforeseen case that this does
+ // happen, return an error instead of panicking so a users code does
+ // not halt in the processes.
+ err := fmt.Errorf("manual reader: invalid producer: %T", p)
+ return err
+ }
+
+ err := ph.produce(ctx, rm)
+ if err != nil {
+ return err
+ }
+ var errs []error
+ for _, producer := range mr.externalProducers.Load().([]Producer) {
+ externalMetrics, err := producer.Produce(ctx)
+ if err != nil {
+ errs = append(errs, err)
+ }
+ rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
+ }
+
+ global.Debug("ManualReader collection", "Data", rm)
+
+ return unifyErrors(errs)
+}
+
+// MarshalLog returns logging data about the ManualReader.
+func (r *ManualReader) MarshalLog() interface{} {
+ r.mu.Lock()
+ down := r.isShutdown
+ r.mu.Unlock()
+ return struct {
+ Type string
+ Registered bool
+ Shutdown bool
+ }{
+ Type: "ManualReader",
+ Registered: r.sdkProducer.Load() != nil,
+ Shutdown: down,
+ }
+}
+
+// manualReaderConfig contains configuration options for a ManualReader.
+type manualReaderConfig struct {
+ temporalitySelector TemporalitySelector
+ aggregationSelector AggregationSelector
+ producers []Producer
+}
+
+// newManualReaderConfig returns a manualReaderConfig configured with options.
+func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig {
+ cfg := manualReaderConfig{
+ temporalitySelector: DefaultTemporalitySelector,
+ aggregationSelector: DefaultAggregationSelector,
+ }
+ for _, opt := range opts {
+ cfg = opt.applyManual(cfg)
+ }
+ return cfg
+}
+
+// ManualReaderOption applies a configuration option value to a ManualReader.
+type ManualReaderOption interface {
+ applyManual(manualReaderConfig) manualReaderConfig
+}
+
+// WithTemporalitySelector sets the TemporalitySelector a reader will use to
+// determine the Temporality of an instrument based on its kind. If this
+// option is not used, the reader will use the DefaultTemporalitySelector.
+func WithTemporalitySelector(selector TemporalitySelector) ManualReaderOption {
+ return temporalitySelectorOption{selector: selector}
+}
+
+type temporalitySelectorOption struct {
+ selector func(instrument InstrumentKind) metricdata.Temporality
+}
+
+// applyManual returns a manualReaderConfig with option applied.
+func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig {
+ mrc.temporalitySelector = t.selector
+ return mrc
+}
+
+// WithAggregationSelector sets the AggregationSelector a reader will use to
+// determine the aggregation to use for an instrument based on its kind. If
+// this option is not used, the reader will use the DefaultAggregationSelector
+// or the aggregation explicitly passed for a view matching an instrument.
+func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {
+ return aggregationSelectorOption{selector: selector}
+}
+
+type aggregationSelectorOption struct {
+ selector AggregationSelector
+}
+
+// applyManual returns a manualReaderConfig with option applied.
+func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig {
+ c.aggregationSelector = t.selector
+ return c
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
new file mode 100644
index 000000000..7f51ec512
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/meter.go
@@ -0,0 +1,595 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "go.opentelemetry.io/otel/internal/global"
+ "go.opentelemetry.io/otel/metric"
+ "go.opentelemetry.io/otel/metric/embedded"
+ "go.opentelemetry.io/otel/sdk/instrumentation"
+ "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+)
+
+// ErrInstrumentName indicates the created instrument has an invalid name.
+// Valid names must consist of 255 or fewer characters including alphanumeric, _, ., -, / and start with a letter.
+var ErrInstrumentName = errors.New("invalid instrument name")
+
+// meter handles the creation and coordination of all metric instruments. A
+// meter represents a single instrumentation scope; all metric telemetry
+// produced by an instrumentation scope will use metric instruments from a
+// single meter.
+type meter struct {
+ embedded.Meter
+
+ scope instrumentation.Scope
+ pipes pipelines
+
+ int64Resolver resolver[int64]
+ float64Resolver resolver[float64]
+}
+
+func newMeter(s instrumentation.Scope, p pipelines) *meter {
+ // viewCache ensures instrument conflicts, including number conflicts, this
+ // meter is asked to create are logged to the user.
+ var viewCache cache[string, instID]
+
+ return &meter{
+ scope: s,
+ pipes: p,
+ int64Resolver: newResolver[int64](p, &viewCache),
+ float64Resolver: newResolver[float64](p, &viewCache),
+ }
+}
+
+// Compile-time check meter implements metric.Meter.
+var _ metric.Meter = (*meter)(nil)
+
+// Int64Counter returns a new instrument identified by name and configured with
+// options. The instrument is used to synchronously record increasing int64
+// measurements during a computational operation.
+func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
+ cfg := metric.NewInt64CounterConfig(options...)
+ const kind = InstrumentKindCounter
+ p := int64InstProvider{m}
+ i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return i, err
+ }
+
+ return i, validateInstrumentName(name)
+}
+
+// Int64UpDownCounter returns a new instrument identified by name and
+// configured with options. The instrument is used to synchronously record
+// int64 measurements during a computational operation.
+func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
+ cfg := metric.NewInt64UpDownCounterConfig(options...)
+ const kind = InstrumentKindUpDownCounter
+ p := int64InstProvider{m}
+ i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return i, err
+ }
+
+ return i, validateInstrumentName(name)
+}
+
+// Int64Histogram returns a new instrument identified by name and configured
+// with options. The instrument is used to synchronously record the
+// distribution of int64 measurements during a computational operation.
+func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
+ cfg := metric.NewInt64HistogramConfig(options...)
+ p := int64InstProvider{m}
+ i, err := p.lookupHistogram(name, cfg)
+ if err != nil {
+ return i, err
+ }
+
+ return i, validateInstrumentName(name)
+}
+
+// Int64ObservableCounter returns a new instrument identified by name and
+// configured with options. The instrument is used to asynchronously record
+// increasing int64 measurements once per a measurement collection cycle.
+// Only the measurements recorded during the collection cycle are exported.
+func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
+ cfg := metric.NewInt64ObservableCounterConfig(options...)
+ const kind = InstrumentKindObservableCounter
+ p := int64ObservProvider{m}
+ inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return nil, err
+ }
+ p.registerCallbacks(inst, cfg.Callbacks())
+ return inst, validateInstrumentName(name)
+}
+
+// Int64ObservableUpDownCounter returns a new instrument identified by name and
+// configured with options. The instrument is used to asynchronously record
+// int64 measurements once per a measurement collection cycle. Only the
+// measurements recorded during the collection cycle are exported.
+func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
+ cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
+ const kind = InstrumentKindObservableUpDownCounter
+ p := int64ObservProvider{m}
+ inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return nil, err
+ }
+ p.registerCallbacks(inst, cfg.Callbacks())
+ return inst, validateInstrumentName(name)
+}
+
+// Int64ObservableGauge returns a new instrument identified by name and
+// configured with options. The instrument is used to asynchronously record
+// instantaneous int64 measurements once per a measurement collection cycle.
+// Only the measurements recorded during the collection cycle are exported.
+func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
+ cfg := metric.NewInt64ObservableGaugeConfig(options...)
+ const kind = InstrumentKindObservableGauge
+ p := int64ObservProvider{m}
+ inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return nil, err
+ }
+ p.registerCallbacks(inst, cfg.Callbacks())
+ return inst, validateInstrumentName(name)
+}
+
+// Float64Counter returns a new instrument identified by name and configured
+// with options. The instrument is used to synchronously record increasing
+// float64 measurements during a computational operation.
+func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
+ cfg := metric.NewFloat64CounterConfig(options...)
+ const kind = InstrumentKindCounter
+ p := float64InstProvider{m}
+ i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return i, err
+ }
+
+ return i, validateInstrumentName(name)
+}
+
+// Float64UpDownCounter returns a new instrument identified by name and
+// configured with options. The instrument is used to synchronously record
+// float64 measurements during a computational operation.
+func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
+ cfg := metric.NewFloat64UpDownCounterConfig(options...)
+ const kind = InstrumentKindUpDownCounter
+ p := float64InstProvider{m}
+ i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return i, err
+ }
+
+ return i, validateInstrumentName(name)
+}
+
+// Float64Histogram returns a new instrument identified by name and configured
+// with options. The instrument is used to synchronously record the
+// distribution of float64 measurements during a computational operation.
+func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
+ cfg := metric.NewFloat64HistogramConfig(options...)
+ p := float64InstProvider{m}
+ i, err := p.lookupHistogram(name, cfg)
+ if err != nil {
+ return i, err
+ }
+
+ return i, validateInstrumentName(name)
+}
+
+// Float64ObservableCounter returns a new instrument identified by name and
+// configured with options. The instrument is used to asynchronously record
+// increasing float64 measurements once per a measurement collection cycle.
+// Only the measurements recorded during the collection cycle are exported.
+func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
+ cfg := metric.NewFloat64ObservableCounterConfig(options...)
+ const kind = InstrumentKindObservableCounter
+ p := float64ObservProvider{m}
+ inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return nil, err
+ }
+ p.registerCallbacks(inst, cfg.Callbacks())
+ return inst, validateInstrumentName(name)
+}
+
+// Float64ObservableUpDownCounter returns a new instrument identified by name
+// and configured with options. The instrument is used to asynchronously record
+// float64 measurements once per a measurement collection cycle. Only the
+// measurements recorded during the collection cycle are exported.
+func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
+ cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
+ const kind = InstrumentKindObservableUpDownCounter
+ p := float64ObservProvider{m}
+ inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return nil, err
+ }
+ p.registerCallbacks(inst, cfg.Callbacks())
+ return inst, validateInstrumentName(name)
+}
+
+// Float64ObservableGauge returns a new instrument identified by name and
+// configured with options. The instrument is used to asynchronously record
+// instantaneous float64 measurements once per a measurement collection cycle.
+// Only the measurements recorded during the collection cycle are exported.
+func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
+ cfg := metric.NewFloat64ObservableGaugeConfig(options...)
+ const kind = InstrumentKindObservableGauge
+ p := float64ObservProvider{m}
+ inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
+ if err != nil {
+ return nil, err
+ }
+ p.registerCallbacks(inst, cfg.Callbacks())
+ return inst, validateInstrumentName(name)
+}
+
+func validateInstrumentName(name string) error {
+ if len(name) == 0 {
+ return fmt.Errorf("%w: %s: is empty", ErrInstrumentName, name)
+ }
+ if len(name) > 255 {
+ return fmt.Errorf("%w: %s: longer than 255 characters", ErrInstrumentName, name)
+ }
+ if !isAlpha([]rune(name)[0]) {
+ return fmt.Errorf("%w: %s: must start with a letter", ErrInstrumentName, name)
+ }
+ if len(name) == 1 {
+ return nil
+ }
+ for _, c := range name[1:] {
+ if !isAlphanumeric(c) && c != '_' && c != '.' && c != '-' && c != '/' {
+ return fmt.Errorf("%w: %s: must only contain [A-Za-z0-9_.-/]", ErrInstrumentName, name)
+ }
+ }
+ return nil
+}
+
+func isAlpha(c rune) bool {
+ return ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z')
+}
+
+func isAlphanumeric(c rune) bool {
+ return isAlpha(c) || ('0' <= c && c <= '9')
+}
+
+// RegisterCallback registers f to be called each collection cycle so it will
+// make observations for insts during those cycles.
+//
+// The only instruments f can make observations for are insts. All other
+// observations will be dropped and an error will be logged.
+//
+// Only instruments from this meter can be registered with f, an error is
+// returned if other instrument are provided.
+//
+// Only observations made in the callback will be exported. Unlike synchronous
+// instruments, asynchronous callbacks can "forget" attribute sets that are no
+// longer relevant by omitting the observation during the callback.
+//
+// The returned Registration can be used to unregister f.
+func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
+ if len(insts) == 0 {
+ // Don't allocate a observer if not needed.
+ return noopRegister{}, nil
+ }
+
+ reg := newObserver()
+ var errs multierror
+ for _, inst := range insts {
+ // Unwrap any global.
+ if u, ok := inst.(interface {
+ Unwrap() metric.Observable
+ }); ok {
+ inst = u.Unwrap()
+ }
+
+ switch o := inst.(type) {
+ case int64Observable:
+ if err := o.registerable(m); err != nil {
+ if !errors.Is(err, errEmptyAgg) {
+ errs.append(err)
+ }
+ continue
+ }
+ reg.registerInt64(o.observablID)
+ case float64Observable:
+ if err := o.registerable(m); err != nil {
+ if !errors.Is(err, errEmptyAgg) {
+ errs.append(err)
+ }
+ continue
+ }
+ reg.registerFloat64(o.observablID)
+ default:
+ // Instrument external to the SDK.
+ return nil, fmt.Errorf("invalid observable: from different implementation")
+ }
+ }
+
+ err := errs.errorOrNil()
+ if reg.len() == 0 {
+ // All insts use drop aggregation or are invalid.
+ return noopRegister{}, err
+ }
+
+ // Some or all instruments were valid.
+ cback := func(ctx context.Context) error { return f(ctx, reg) }
+ return m.pipes.registerMultiCallback(cback), err
+}
+
+type observer struct {
+ embedded.Observer
+
+ float64 map[observablID[float64]]struct{}
+ int64 map[observablID[int64]]struct{}
+}
+
+func newObserver() observer {
+ return observer{
+ float64: make(map[observablID[float64]]struct{}),
+ int64: make(map[observablID[int64]]struct{}),
+ }
+}
+
+func (r observer) len() int {
+ return len(r.float64) + len(r.int64)
+}
+
+func (r observer) registerFloat64(id observablID[float64]) {
+ r.float64[id] = struct{}{}
+}
+
+func (r observer) registerInt64(id observablID[int64]) {
+ r.int64[id] = struct{}{}
+}
+
+var (
+ errUnknownObserver = errors.New("unknown observable instrument")
+ errUnregObserver = errors.New("observable instrument not registered for callback")
+)
+
+func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...metric.ObserveOption) {
+ var oImpl float64Observable
+ switch conv := o.(type) {
+ case float64Observable:
+ oImpl = conv
+ case interface {
+ Unwrap() metric.Observable
+ }:
+ // Unwrap any global.
+ async := conv.Unwrap()
+ var ok bool
+ if oImpl, ok = async.(float64Observable); !ok {
+ global.Error(errUnknownObserver, "failed to record asynchronous")
+ return
+ }
+ default:
+ global.Error(errUnknownObserver, "failed to record")
+ return
+ }
+
+ if _, registered := r.float64[oImpl.observablID]; !registered {
+ global.Error(errUnregObserver, "failed to record",
+ "name", oImpl.name,
+ "description", oImpl.description,
+ "unit", oImpl.unit,
+ "number", fmt.Sprintf("%T", float64(0)),
+ )
+ return
+ }
+ c := metric.NewObserveConfig(opts)
+ oImpl.observe(v, c.Attributes())
+}
+
+func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric.ObserveOption) {
+ var oImpl int64Observable
+ switch conv := o.(type) {
+ case int64Observable:
+ oImpl = conv
+ case interface {
+ Unwrap() metric.Observable
+ }:
+ // Unwrap any global.
+ async := conv.Unwrap()
+ var ok bool
+ if oImpl, ok = async.(int64Observable); !ok {
+ global.Error(errUnknownObserver, "failed to record asynchronous")
+ return
+ }
+ default:
+ global.Error(errUnknownObserver, "failed to record")
+ return
+ }
+
+ if _, registered := r.int64[oImpl.observablID]; !registered {
+ global.Error(errUnregObserver, "failed to record",
+ "name", oImpl.name,
+ "description", oImpl.description,
+ "unit", oImpl.unit,
+ "number", fmt.Sprintf("%T", int64(0)),
+ )
+ return
+ }
+ c := metric.NewObserveConfig(opts)
+ oImpl.observe(v, c.Attributes())
+}
+
+type noopRegister struct{ embedded.Registration }
+
+func (noopRegister) Unregister() error {
+ return nil
+}
+
+// int64InstProvider provides int64 OpenTelemetry instruments.
+type int64InstProvider struct{ *meter }
+
+func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
+ inst := Instrument{
+ Name: name,
+ Description: desc,
+ Unit: u,
+ Kind: kind,
+ Scope: p.scope,
+ }
+ return p.int64Resolver.Aggregators(inst)
+}
+
+func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramConfig) ([]aggregate.Measure[int64], error) {
+ boundaries := cfg.ExplicitBucketBoundaries()
+ aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err()
+ if aggError != nil {
+ // If boundaries are invalid, ignore them.
+ boundaries = nil
+ }
+ inst := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindHistogram,
+ Scope: p.scope,
+ }
+ measures, err := p.int64Resolver.HistogramAggregators(inst, boundaries)
+ return measures, errors.Join(aggError, err)
+}
+
+// lookup returns the resolved instrumentImpl.
+func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
+ aggs, err := p.aggs(kind, name, desc, u)
+ return &int64Inst{measures: aggs}, err
+}
+
+// lookupHistogram returns the resolved instrumentImpl.
+func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
+ aggs, err := p.histogramAggs(name, cfg)
+ return &int64Inst{measures: aggs}, err
+}
+
+// float64InstProvider provides float64 OpenTelemetry instruments.
+type float64InstProvider struct{ *meter }
+
+func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
+ inst := Instrument{
+ Name: name,
+ Description: desc,
+ Unit: u,
+ Kind: kind,
+ Scope: p.scope,
+ }
+ return p.float64Resolver.Aggregators(inst)
+}
+
+func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64HistogramConfig) ([]aggregate.Measure[float64], error) {
+ boundaries := cfg.ExplicitBucketBoundaries()
+ aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err()
+ if aggError != nil {
+ // If boundaries are invalid, ignore them.
+ boundaries = nil
+ }
+ inst := Instrument{
+ Name: name,
+ Description: cfg.Description(),
+ Unit: cfg.Unit(),
+ Kind: InstrumentKindHistogram,
+ Scope: p.scope,
+ }
+ measures, err := p.float64Resolver.HistogramAggregators(inst, boundaries)
+ return measures, errors.Join(aggError, err)
+}
+
+// lookup returns the resolved instrumentImpl.
+func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
+ aggs, err := p.aggs(kind, name, desc, u)
+ return &float64Inst{measures: aggs}, err
+}
+
+// lookupHistogram returns the resolved instrumentImpl.
+func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
+ aggs, err := p.histogramAggs(name, cfg)
+ return &float64Inst{measures: aggs}, err
+}
+
+type int64ObservProvider struct{ *meter }
+
+func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
+ aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
+ return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
+}
+
+func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
+ if inst.observable == nil || len(inst.measures) == 0 {
+ // Drop aggregator.
+ return
+ }
+
+ for _, cBack := range cBacks {
+ p.pipes.registerCallback(p.callback(inst, cBack))
+ }
+}
+
+func (p int64ObservProvider) callback(i int64Observable, f metric.Int64Callback) func(context.Context) error {
+ inst := int64Observer{int64Observable: i}
+ return func(ctx context.Context) error { return f(ctx, inst) }
+}
+
+type int64Observer struct {
+ embedded.Int64Observer
+ int64Observable
+}
+
+func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
+ c := metric.NewObserveConfig(opts)
+ o.observe(val, c.Attributes())
+}
+
+type float64ObservProvider struct{ *meter }
+
+func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
+ aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
+ return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
+}
+
+func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
+ if inst.observable == nil || len(inst.measures) == 0 {
+ // Drop aggregator.
+ return
+ }
+
+ for _, cBack := range cBacks {
+ p.pipes.registerCallback(p.callback(inst, cBack))
+ }
+}
+
+func (p float64ObservProvider) callback(i float64Observable, f metric.Float64Callback) func(context.Context) error {
+ inst := float64Observer{float64Observable: i}
+ return func(ctx context.Context) error { return f(ctx, inst) }
+}
+
+type float64Observer struct {
+ embedded.Float64Observer
+ float64Observable
+}
+
+func (o float64Observer) Observe(val float64, opts ...metric.ObserveOption) {
+ c := metric.NewObserveConfig(opts)
+ o.observe(val, c.Attributes())
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/data.go b/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/data.go
new file mode 100644
index 000000000..995d42b38
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/data.go
@@ -0,0 +1,293 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metricdata // import "go.opentelemetry.io/otel/sdk/metric/metricdata"
+
+import (
+ "time"
+
+ "go.opentelemetry.io/otel/attribute"
+ "go.opentelemetry.io/otel/sdk/instrumentation"
+ "go.opentelemetry.io/otel/sdk/resource"
+)
+
+// ResourceMetrics is a collection of ScopeMetrics and the associated Resource
+// that created them.
+type ResourceMetrics struct {
+ // Resource represents the entity that collected the metrics.
+ Resource *resource.Resource
+ // ScopeMetrics are the collection of metrics with unique Scopes.
+ ScopeMetrics []ScopeMetrics
+}
+
+// ScopeMetrics is a collection of Metrics Produces by a Meter.
+type ScopeMetrics struct {
+ // Scope is the Scope that the Meter was created with.
+ Scope instrumentation.Scope
+ // Metrics are a list of aggregations created by the Meter.
+ Metrics []Metrics
+}
+
+// Metrics is a collection of one or more aggregated timeseries from an Instrument.
+type Metrics struct {
+ // Name is the name of the Instrument that created this data.
+ Name string
+ // Description is the description of the Instrument, which can be used in documentation.
+ Description string
+ // Unit is the unit in which the Instrument reports.
+ Unit string
+ // Data is the aggregated data from an Instrument.
+ Data Aggregation
+}
+
+// Aggregation is the store of data reported by an Instrument.
+// It will be one of: Gauge, Sum, Histogram.
+type Aggregation interface {
+ privateAggregation()
+}
+
+// Gauge represents a measurement of the current value of an instrument.
+type Gauge[N int64 | float64] struct {
+ // DataPoints are the individual aggregated measurements with unique
+ // Attributes.
+ DataPoints []DataPoint[N]
+}
+
+func (Gauge[N]) privateAggregation() {}
+
+// Sum represents the sum of all measurements of values from an instrument.
+type Sum[N int64 | float64] struct {
+ // DataPoints are the individual aggregated measurements with unique
+ // Attributes.
+ DataPoints []DataPoint[N]
+ // Temporality describes if the aggregation is reported as the change from the
+ // last report time, or the cumulative changes since a fixed start time.
+ Temporality Temporality
+ // IsMonotonic represents if this aggregation only increases or decreases.
+ IsMonotonic bool
+}
+
+func (Sum[N]) privateAggregation() {}
+
+// DataPoint is a single data point in a timeseries.
+type DataPoint[N int64 | float64] struct {
+ // Attributes is the set of key value pairs that uniquely identify the
+ // timeseries.
+ Attributes attribute.Set
+ // StartTime is when the timeseries was started. (optional)
+ StartTime time.Time `json:",omitempty"`
+ // Time is the time when the timeseries was recorded. (optional)
+ Time time.Time `json:",omitempty"`
+ // Value is the value of this data point.
+ Value N
+
+ // Exemplars is the sampled Exemplars collected during the timeseries.
+ Exemplars []Exemplar[N] `json:",omitempty"`
+}
+
+// Histogram represents the histogram of all measurements of values from an instrument.
+type Histogram[N int64 | float64] struct {
+ // DataPoints are the individual aggregated measurements with unique
+ // Attributes.
+ DataPoints []HistogramDataPoint[N]
+ // Temporality describes if the aggregation is reported as the change from the
+ // last report time, or the cumulative changes since a fixed start time.
+ Temporality Temporality
+}
+
+func (Histogram[N]) privateAggregation() {}
+
+// HistogramDataPoint is a single histogram data point in a timeseries.
+type HistogramDataPoint[N int64 | float64] struct {
+ // Attributes is the set of key value pairs that uniquely identify the
+ // timeseries.
+ Attributes attribute.Set
+ // StartTime is when the timeseries was started.
+ StartTime time.Time
+ // Time is the time when the timeseries was recorded.
+ Time time.Time
+
+ // Count is the number of updates this histogram has been calculated with.
+ Count uint64
+ // Bounds are the upper bounds of the buckets of the histogram. Because the
+ // last boundary is +infinity this one is implied.
+ Bounds []float64
+ // BucketCounts is the count of each of the buckets.
+ BucketCounts []uint64
+
+ // Min is the minimum value recorded. (optional)
+ Min Extrema[N]
+ // Max is the maximum value recorded. (optional)
+ Max Extrema[N]
+ // Sum is the sum of the values recorded.
+ Sum N
+
+ // Exemplars is the sampled Exemplars collected during the timeseries.
+ Exemplars []Exemplar[N] `json:",omitempty"`
+}
+
+// ExponentialHistogram represents the histogram of all measurements of values from an instrument.
+type ExponentialHistogram[N int64 | float64] struct {
+ // DataPoints are the individual aggregated measurements with unique
+ // attributes.
+ DataPoints []ExponentialHistogramDataPoint[N]
+ // Temporality describes if the aggregation is reported as the change from the
+ // last report time, or the cumulative changes since a fixed start time.
+ Temporality Temporality
+}
+
+func (ExponentialHistogram[N]) privateAggregation() {}
+
+// ExponentialHistogramDataPoint is a single exponential histogram data point in a timeseries.
+type ExponentialHistogramDataPoint[N int64 | float64] struct {
+ // Attributes is the set of key value pairs that uniquely identify the
+ // timeseries.
+ Attributes attribute.Set
+ // StartTime is when the timeseries was started.
+ StartTime time.Time
+ // Time is the time when the timeseries was recorded.
+ Time time.Time
+
+ // Count is the number of updates this histogram has been calculated with.
+ Count uint64
+ // Min is the minimum value recorded. (optional)
+ Min Extrema[N]
+ // Max is the maximum value recorded. (optional)
+ Max Extrema[N]
+ // Sum is the sum of the values recorded.
+ Sum N
+
+ // Scale describes the resolution of the histogram. Boundaries are
+ // located at powers of the base, where:
+ //
+ // base = 2 ^ (2 ^ -Scale)
+ Scale int32
+ // ZeroCount is the number of values whose absolute value
+ // is less than or equal to [ZeroThreshold].
+ // When ZeroThreshold is 0, this is the number of values that
+ // cannot be expressed using the standard exponential formula
+ // as well as values that have been rounded to zero.
+ // ZeroCount represents the special zero count bucket.
+ ZeroCount uint64
+
+ // PositiveBucket is range of positive value bucket counts.
+ PositiveBucket ExponentialBucket
+ // NegativeBucket is range of negative value bucket counts.
+ NegativeBucket ExponentialBucket
+
+ // ZeroThreshold is the width of the zero region. Where the zero region is
+ // defined as the closed interval [-ZeroThreshold, ZeroThreshold].
+ ZeroThreshold float64
+
+ // Exemplars is the sampled Exemplars collected during the timeseries.
+ Exemplars []Exemplar[N] `json:",omitempty"`
+}
+
+// ExponentialBucket are a set of bucket counts, encoded in a contiguous array
+// of counts.
+type ExponentialBucket struct {
+ // Offset is the bucket index of the first entry in the Counts slice.
+ Offset int32
+ // Counts is an slice where Counts[i] carries the count of the bucket at
+ // index (Offset+i). Counts[i] is the count of values greater than
+ // base^(Offset+i) and less than or equal to base^(Offset+i+1).
+ Counts []uint64
+}
+
+// Extrema is the minimum or maximum value of a dataset.
+type Extrema[N int64 | float64] struct {
+ value N
+ valid bool
+}
+
+// NewExtrema returns an Extrema set to v.
+func NewExtrema[N int64 | float64](v N) Extrema[N] {
+ return Extrema[N]{value: v, valid: true}
+}
+
+// Value returns the Extrema value and true if the Extrema is defined.
+// Otherwise, if the Extrema is its zero-value, defined will be false.
+func (e Extrema[N]) Value() (v N, defined bool) {
+ return e.value, e.valid
+}
+
+// Exemplar is a measurement sampled from a timeseries providing a typical
+// example.
+type Exemplar[N int64 | float64] struct {
+ // FilteredAttributes are the attributes recorded with the measurement but
+ // filtered out of the timeseries' aggregated data.
+ FilteredAttributes []attribute.KeyValue
+ // Time is the time when the measurement was recorded.
+ Time time.Time
+ // Value is the measured value.
+ Value N
+ // SpanID is the ID of the span that was active during the measurement. If
+ // no span was active or the span was not sampled this will be empty.
+ SpanID []byte `json:",omitempty"`
+ // TraceID is the ID of the trace the active span belonged to during the
+ // measurement. If no span was active or the span was not sampled this will
+ // be empty.
+ TraceID []byte `json:",omitempty"`
+}
+
+// Summary metric data are used to convey quantile summaries,
+// a Prometheus (see: https://prometheus.io/docs/concepts/metric_types/#summary)
+// data type.
+//
+// These data points cannot always be merged in a meaningful way. The Summary
+// type is only used by bridges from other metrics libraries, and cannot be
+// produced using OpenTelemetry instrumentation.
+type Summary struct {
+ // DataPoints are the individual aggregated measurements with unique
+ // attributes.
+ DataPoints []SummaryDataPoint
+}
+
+func (Summary) privateAggregation() {}
+
+// SummaryDataPoint is a single data point in a timeseries that describes the
+// time-varying values of a Summary metric.
+type SummaryDataPoint struct {
+ // Attributes is the set of key value pairs that uniquely identify the
+ // timeseries.
+ Attributes attribute.Set
+
+ // StartTime is when the timeseries was started.
+ StartTime time.Time
+ // Time is the time when the timeseries was recorded.
+ Time time.Time
+
+ // Count is the number of updates this summary has been calculated with.
+ Count uint64
+
+ // Sum is the sum of the values recorded.
+ Sum float64
+
+ // (Optional) list of values at different quantiles of the distribution calculated
+ // from the current snapshot. The quantiles must be strictly increasing.
+ QuantileValues []QuantileValue
+}
+
+// QuantileValue is the value at a given quantile of a summary.
+type QuantileValue struct {
+ // Quantile is the quantile of this value.
+ //
+ // Must be in the interval [0.0, 1.0].
+ Quantile float64
+
+ // Value is the value at the given quantile of a summary.
+ //
+ // Quantile values must NOT be negative.
+ Value float64
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality.go b/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality.go
new file mode 100644
index 000000000..9fceb18cb
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality.go
@@ -0,0 +1,41 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:generate stringer -type=Temporality
+
+package metricdata // import "go.opentelemetry.io/otel/sdk/metric/metricdata"
+
+// Temporality defines the window that an aggregation was calculated over.
+type Temporality uint8
+
+const (
+ // undefinedTemporality represents an unset Temporality.
+ //nolint:deadcode,unused,varcheck
+ undefinedTemporality Temporality = iota
+
+ // CumulativeTemporality defines a measurement interval that continues to
+ // expand forward in time from a starting point. New measurements are
+ // added to all previous measurements since a start time.
+ CumulativeTemporality
+
+ // DeltaTemporality defines a measurement interval that resets each cycle.
+ // Measurements from one cycle are recorded independently, measurements
+ // from other cycles do not affect them.
+ DeltaTemporality
+)
+
+// MarshalText returns the byte encoded of t.
+func (t Temporality) MarshalText() ([]byte, error) {
+ return []byte(t.String()), nil
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality_string.go b/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality_string.go
new file mode 100644
index 000000000..4da833cdc
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/metricdata/temporality_string.go
@@ -0,0 +1,25 @@
+// Code generated by "stringer -type=Temporality"; DO NOT EDIT.
+
+package metricdata
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[undefinedTemporality-0]
+ _ = x[CumulativeTemporality-1]
+ _ = x[DeltaTemporality-2]
+}
+
+const _Temporality_name = "undefinedTemporalityCumulativeTemporalityDeltaTemporality"
+
+var _Temporality_index = [...]uint8{0, 20, 41, 57}
+
+func (i Temporality) String() string {
+ if i >= Temporality(len(_Temporality_index)-1) {
+ return "Temporality(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _Temporality_name[_Temporality_index[i]:_Temporality_index[i+1]]
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
new file mode 100644
index 000000000..ff86999c7
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/periodic_reader.go
@@ -0,0 +1,381 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/internal/global"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// Default periodic reader timing.
+const (
+ defaultTimeout = time.Millisecond * 30000
+ defaultInterval = time.Millisecond * 60000
+)
+
+// periodicReaderConfig contains configuration options for a PeriodicReader.
+type periodicReaderConfig struct {
+ interval time.Duration
+ timeout time.Duration
+ producers []Producer
+}
+
+// newPeriodicReaderConfig returns a periodicReaderConfig configured with
+// options.
+func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
+ c := periodicReaderConfig{
+ interval: envDuration(envInterval, defaultInterval),
+ timeout: envDuration(envTimeout, defaultTimeout),
+ }
+ for _, o := range options {
+ c = o.applyPeriodic(c)
+ }
+ return c
+}
+
+// PeriodicReaderOption applies a configuration option value to a PeriodicReader.
+type PeriodicReaderOption interface {
+ applyPeriodic(periodicReaderConfig) periodicReaderConfig
+}
+
+// periodicReaderOptionFunc applies a set of options to a periodicReaderConfig.
+type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig
+
+// applyPeriodic returns a periodicReaderConfig with option(s) applied.
+func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig {
+ return o(conf)
+}
+
+// WithTimeout configures the time a PeriodicReader waits for an export to
+// complete before canceling it. This includes an export which occurs as part
+// of Shutdown or ForceFlush if the user passed context does not have a
+// deadline. If the user passed context does have a deadline, it will be used
+// instead.
+//
+// This option overrides any value set for the
+// OTEL_METRIC_EXPORT_TIMEOUT environment variable.
+//
+// If this option is not used or d is less than or equal to zero, 30 seconds
+// is used as the default.
+func WithTimeout(d time.Duration) PeriodicReaderOption {
+ return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
+ if d <= 0 {
+ return conf
+ }
+ conf.timeout = d
+ return conf
+ })
+}
+
+// WithInterval configures the intervening time between exports for a
+// PeriodicReader.
+//
+// This option overrides any value set for the
+// OTEL_METRIC_EXPORT_INTERVAL environment variable.
+//
+// If this option is not used or d is less than or equal to zero, 60 seconds
+// is used as the default.
+func WithInterval(d time.Duration) PeriodicReaderOption {
+ return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
+ if d <= 0 {
+ return conf
+ }
+ conf.interval = d
+ return conf
+ })
+}
+
+// NewPeriodicReader returns a Reader that collects and exports metric data to
+// the exporter at a defined interval. By default, the returned Reader will
+// collect and export data every 60 seconds, and will cancel any attempts that
+// exceed 30 seconds, collect and export combined. The collect and export time
+// are not counted towards the interval between attempts.
+//
+// The Collect method of the returned Reader continues to gather and return
+// metric data to the user. It will not automatically send that data to the
+// exporter. That is left to the user to accomplish.
+func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {
+ conf := newPeriodicReaderConfig(options)
+ ctx, cancel := context.WithCancel(context.Background())
+ r := &PeriodicReader{
+ interval: conf.interval,
+ timeout: conf.timeout,
+ exporter: exporter,
+ flushCh: make(chan chan error),
+ cancel: cancel,
+ done: make(chan struct{}),
+ rmPool: sync.Pool{
+ New: func() interface{} {
+ return &metricdata.ResourceMetrics{}
+ },
+ },
+ }
+ r.externalProducers.Store(conf.producers)
+
+ go func() {
+ defer func() { close(r.done) }()
+ r.run(ctx, conf.interval)
+ }()
+
+ return r
+}
+
+// PeriodicReader is a Reader that continuously collects and exports metric
+// data at a set interval.
+type PeriodicReader struct {
+ sdkProducer atomic.Value
+
+ mu sync.Mutex
+ isShutdown bool
+ externalProducers atomic.Value
+
+ interval time.Duration
+ timeout time.Duration
+ exporter Exporter
+ flushCh chan chan error
+
+ done chan struct{}
+ cancel context.CancelFunc
+ shutdownOnce sync.Once
+
+ rmPool sync.Pool
+}
+
+// Compile time check the periodicReader implements Reader and is comparable.
+var _ = map[Reader]struct{}{&PeriodicReader{}: {}}
+
+// newTicker allows testing override.
+var newTicker = time.NewTicker
+
+// run continuously collects and exports metric data at the specified
+// interval. This will run until ctx is canceled or times out.
+func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) {
+ ticker := newTicker(interval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ err := r.collectAndExport(ctx)
+ if err != nil {
+ otel.Handle(err)
+ }
+ case errCh := <-r.flushCh:
+ errCh <- r.collectAndExport(ctx)
+ ticker.Reset(interval)
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// register registers p as the producer of this reader.
+func (r *PeriodicReader) register(p sdkProducer) {
+ // Only register once. If producer is already set, do nothing.
+ if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
+ msg := "did not register periodic reader"
+ global.Error(errDuplicateRegister, msg)
+ }
+}
+
+// temporality reports the Temporality for the instrument kind provided.
+func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
+ return r.exporter.Temporality(kind)
+}
+
+// aggregation returns what Aggregation to use for kind.
+func (r *PeriodicReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type.
+ return r.exporter.Aggregation(kind)
+}
+
+// collectAndExport gather all metric data related to the periodicReader r from
+// the SDK and exports it with r's exporter.
+func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
+ ctx, cancel := context.WithTimeout(ctx, r.timeout)
+ defer cancel()
+
+ // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
+ rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
+ err := r.Collect(ctx, rm)
+ if err == nil {
+ err = r.export(ctx, rm)
+ }
+ r.rmPool.Put(rm)
+ return err
+}
+
+// Collect gathers all metric data related to the Reader from
+// the SDK and other Producers and stores the result in rm. The metric
+// data is not exported to the configured exporter, it is left to the caller to
+// handle that if desired.
+//
+// Collect will return an error if called after shutdown.
+// Collect will return an error if rm is a nil ResourceMetrics.
+// Collect will return an error if the context's Done channel is closed.
+//
+// This method is safe to call concurrently.
+func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
+ if rm == nil {
+ return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
+ }
+ // TODO (#3047): When collect is updated to accept output as param, pass rm.
+ return r.collect(ctx, r.sdkProducer.Load(), rm)
+}
+
+// collect unwraps p as a produceHolder and returns its produce results.
+func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
+ if p == nil {
+ return ErrReaderNotRegistered
+ }
+
+ ph, ok := p.(produceHolder)
+ if !ok {
+ // The atomic.Value is entirely in the periodicReader's control so
+ // this should never happen. In the unforeseen case that this does
+ // happen, return an error instead of panicking so a users code does
+ // not halt in the processes.
+ err := fmt.Errorf("periodic reader: invalid producer: %T", p)
+ return err
+ }
+
+ err := ph.produce(ctx, rm)
+ if err != nil {
+ return err
+ }
+ var errs []error
+ for _, producer := range r.externalProducers.Load().([]Producer) {
+ externalMetrics, err := producer.Produce(ctx)
+ if err != nil {
+ errs = append(errs, err)
+ }
+ rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
+ }
+
+ global.Debug("PeriodicReader collection", "Data", rm)
+
+ return unifyErrors(errs)
+}
+
+// export exports metric data m using r's exporter.
+func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
+ return r.exporter.Export(ctx, m)
+}
+
+// ForceFlush flushes pending telemetry.
+//
+// This method is safe to call concurrently.
+func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
+ // Prioritize the ctx timeout if it is set.
+ if _, ok := ctx.Deadline(); !ok {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, r.timeout)
+ defer cancel()
+ }
+
+ errCh := make(chan error, 1)
+ select {
+ case r.flushCh <- errCh:
+ select {
+ case err := <-errCh:
+ if err != nil {
+ return err
+ }
+ close(errCh)
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ case <-r.done:
+ return ErrReaderShutdown
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ return r.exporter.ForceFlush(ctx)
+}
+
+// Shutdown flushes pending telemetry and then stops the export pipeline.
+//
+// This method is safe to call concurrently.
+func (r *PeriodicReader) Shutdown(ctx context.Context) error {
+ err := ErrReaderShutdown
+ r.shutdownOnce.Do(func() {
+ // Prioritize the ctx timeout if it is set.
+ if _, ok := ctx.Deadline(); !ok {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, r.timeout)
+ defer cancel()
+ }
+
+ // Stop the run loop.
+ r.cancel()
+ <-r.done
+
+ // Any future call to Collect will now return ErrReaderShutdown.
+ ph := r.sdkProducer.Swap(produceHolder{
+ produce: shutdownProducer{}.produce,
+ })
+
+ if ph != nil { // Reader was registered.
+ // Flush pending telemetry.
+ m := r.rmPool.Get().(*metricdata.ResourceMetrics)
+ err = r.collect(ctx, ph, m)
+ if err == nil {
+ err = r.export(ctx, m)
+ }
+ r.rmPool.Put(m)
+ }
+
+ sErr := r.exporter.Shutdown(ctx)
+ if err == nil || err == ErrReaderShutdown {
+ err = sErr
+ }
+
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.isShutdown = true
+ // release references to Producer(s)
+ r.externalProducers.Store([]Producer{})
+ })
+ return err
+}
+
+// MarshalLog returns logging data about the PeriodicReader.
+func (r *PeriodicReader) MarshalLog() interface{} {
+ r.mu.Lock()
+ down := r.isShutdown
+ r.mu.Unlock()
+ return struct {
+ Type string
+ Exporter Exporter
+ Registered bool
+ Shutdown bool
+ Interval time.Duration
+ Timeout time.Duration
+ }{
+ Type: "PeriodicReader",
+ Exporter: r.exporter,
+ Registered: r.sdkProducer.Load() != nil,
+ Shutdown: down,
+ Interval: r.interval,
+ Timeout: r.timeout,
+ }
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
new file mode 100644
index 000000000..48abcc8a7
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
@@ -0,0 +1,656 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "container/list"
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+ "sync"
+ "sync/atomic"
+
+ "go.opentelemetry.io/otel/internal/global"
+ "go.opentelemetry.io/otel/metric"
+ "go.opentelemetry.io/otel/metric/embedded"
+ "go.opentelemetry.io/otel/sdk/instrumentation"
+ "go.opentelemetry.io/otel/sdk/metric/internal"
+ "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+ "go.opentelemetry.io/otel/sdk/resource"
+)
+
+var (
+ errCreatingAggregators = errors.New("could not create all aggregators")
+ errIncompatibleAggregation = errors.New("incompatible aggregation")
+ errUnknownAggregation = errors.New("unrecognized aggregation")
+)
+
+// instrumentSync is a synchronization point between a pipeline and an
+// instrument's aggregate function.
+type instrumentSync struct {
+ name string
+ description string
+ unit string
+ compAgg aggregate.ComputeAggregation
+}
+
+func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
+ if res == nil {
+ res = resource.Empty()
+ }
+ return &pipeline{
+ resource: res,
+ reader: reader,
+ views: views,
+ // aggregations is lazy allocated when needed.
+ }
+}
+
+// pipeline connects all of the instruments created by a meter provider to a Reader.
+// This is the object that will be `Reader.register()` when a meter provider is created.
+//
+// As instruments are created the instrument should be checked if it exists in
+// the views of a the Reader, and if so each aggregate function should be added
+// to the pipeline.
+type pipeline struct {
+ resource *resource.Resource
+
+ reader Reader
+ views []View
+
+ sync.Mutex
+ aggregations map[instrumentation.Scope][]instrumentSync
+ callbacks []func(context.Context) error
+ multiCallbacks list.List
+}
+
+// addSync adds the instrumentSync to pipeline p with scope. This method is not
+// idempotent. Duplicate calls will result in duplicate additions, it is the
+// callers responsibility to ensure this is called with unique values.
+func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
+ p.Lock()
+ defer p.Unlock()
+ if p.aggregations == nil {
+ p.aggregations = map[instrumentation.Scope][]instrumentSync{
+ scope: {iSync},
+ }
+ return
+ }
+ p.aggregations[scope] = append(p.aggregations[scope], iSync)
+}
+
+// addCallback registers a single instrument callback to be run when
+// `produce()` is called.
+func (p *pipeline) addCallback(cback func(context.Context) error) {
+ p.Lock()
+ defer p.Unlock()
+ p.callbacks = append(p.callbacks, cback)
+}
+
+type multiCallback func(context.Context) error
+
+// addMultiCallback registers a multi-instrument callback to be run when
+// `produce()` is called.
+func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
+ p.Lock()
+ defer p.Unlock()
+ e := p.multiCallbacks.PushBack(c)
+ return func() {
+ p.Lock()
+ p.multiCallbacks.Remove(e)
+ p.Unlock()
+ }
+}
+
+// produce returns aggregated metrics from a single collection.
+//
+// This method is safe to call concurrently.
+func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
+ p.Lock()
+ defer p.Unlock()
+
+ var errs multierror
+ for _, c := range p.callbacks {
+ // TODO make the callbacks parallel. ( #3034 )
+ if err := c(ctx); err != nil {
+ errs.append(err)
+ }
+ if err := ctx.Err(); err != nil {
+ rm.Resource = nil
+ rm.ScopeMetrics = rm.ScopeMetrics[:0]
+ return err
+ }
+ }
+ for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
+ // TODO make the callbacks parallel. ( #3034 )
+ f := e.Value.(multiCallback)
+ if err := f(ctx); err != nil {
+ errs.append(err)
+ }
+ if err := ctx.Err(); err != nil {
+ // This means the context expired before we finished running callbacks.
+ rm.Resource = nil
+ rm.ScopeMetrics = rm.ScopeMetrics[:0]
+ return err
+ }
+ }
+
+ rm.Resource = p.resource
+ rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations))
+
+ i := 0
+ for scope, instruments := range p.aggregations {
+ rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
+ j := 0
+ for _, inst := range instruments {
+ data := rm.ScopeMetrics[i].Metrics[j].Data
+ if n := inst.compAgg(&data); n > 0 {
+ rm.ScopeMetrics[i].Metrics[j].Name = inst.name
+ rm.ScopeMetrics[i].Metrics[j].Description = inst.description
+ rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
+ rm.ScopeMetrics[i].Metrics[j].Data = data
+ j++
+ }
+ }
+ rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j]
+ if len(rm.ScopeMetrics[i].Metrics) > 0 {
+ rm.ScopeMetrics[i].Scope = scope
+ i++
+ }
+ }
+
+ rm.ScopeMetrics = rm.ScopeMetrics[:i]
+
+ return errs.errorOrNil()
+}
+
+// inserter facilitates inserting of new instruments from a single scope into a
+// pipeline.
+type inserter[N int64 | float64] struct {
+ // aggregators is a cache that holds aggregate function inputs whose
+ // outputs have been inserted into the underlying reader pipeline. This
+ // cache ensures no duplicate aggregate functions are inserted into the
+ // reader pipeline and if a new request during an instrument creation asks
+ // for the same aggregate function input the same instance is returned.
+ aggregators *cache[instID, aggVal[N]]
+
+ // views is a cache that holds instrument identifiers for all the
+ // instruments a Meter has created, it is provided from the Meter that owns
+ // this inserter. This cache ensures during the creation of instruments
+ // with the same name but different options (e.g. description, unit) a
+ // warning message is logged.
+ views *cache[string, instID]
+
+ pipeline *pipeline
+}
+
+func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
+ if vc == nil {
+ vc = &cache[string, instID]{}
+ }
+ return &inserter[N]{
+ aggregators: &cache[instID, aggVal[N]]{},
+ views: vc,
+ pipeline: p,
+ }
+}
+
+// Instrument inserts the instrument inst with instUnit into a pipeline. All
+// views the pipeline contains are matched against, and any matching view that
+// creates a unique aggregate function will have its output inserted into the
+// pipeline and its input included in the returned slice.
+//
+// The returned aggregate function inputs are ensured to be deduplicated and
+// unique. If another view in another pipeline that is cached by this
+// inserter's cache has already inserted the same aggregate function for the
+// same instrument, that functions input instance is returned.
+//
+// If another instrument has already been inserted by this inserter, or any
+// other using the same cache, and it conflicts with the instrument being
+// inserted in this call, an aggregate function input matching the arguments
+// will still be returned but an Info level log message will also be logged to
+// the OTel global logger.
+//
+// If the passed instrument would result in an incompatible aggregate function,
+// an error is returned and that aggregate function output is not inserted nor
+// is its input returned.
+//
+// If an instrument is determined to use a Drop aggregation, that instrument is
+// not inserted nor returned.
+func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) ([]aggregate.Measure[N], error) {
+ var (
+ matched bool
+ measures []aggregate.Measure[N]
+ )
+
+ errs := &multierror{wrapped: errCreatingAggregators}
+ seen := make(map[uint64]struct{})
+ for _, v := range i.pipeline.views {
+ stream, match := v(inst)
+ if !match {
+ continue
+ }
+ matched = true
+ in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
+ if err != nil {
+ errs.append(err)
+ }
+ if in == nil { // Drop aggregation.
+ continue
+ }
+ if _, ok := seen[id]; ok {
+ // This aggregate function has already been added.
+ continue
+ }
+ seen[id] = struct{}{}
+ measures = append(measures, in)
+ }
+
+ if matched {
+ return measures, errs.errorOrNil()
+ }
+
+ // Apply implicit default view if no explicit matched.
+ stream := Stream{
+ Name: inst.Name,
+ Description: inst.Description,
+ Unit: inst.Unit,
+ }
+ in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
+ if err != nil {
+ errs.append(err)
+ }
+ if in != nil {
+ // Ensured to have not seen given matched was false.
+ measures = append(measures, in)
+ }
+ return measures, errs.errorOrNil()
+}
+
+var aggIDCount uint64
+
+// aggVal is the cached value in an aggregators cache.
+type aggVal[N int64 | float64] struct {
+ ID uint64
+ Measure aggregate.Measure[N]
+ Err error
+}
+
+// readerDefaultAggregation returns the default aggregation for the instrument
+// kind based on the reader's aggregation preferences. This is used unless the
+// aggregation is overridden with a view.
+func (i *inserter[N]) readerDefaultAggregation(kind InstrumentKind) Aggregation {
+ aggregation := i.pipeline.reader.aggregation(kind)
+ switch aggregation.(type) {
+ case nil, AggregationDefault:
+ // If the reader returns default or nil use the default selector.
+ aggregation = DefaultAggregationSelector(kind)
+ default:
+ // Deep copy and validate before using.
+ aggregation = aggregation.copy()
+ if err := aggregation.err(); err != nil {
+ orig := aggregation
+ aggregation = DefaultAggregationSelector(kind)
+ global.Error(
+ err, "using default aggregation instead",
+ "aggregation", orig,
+ "replacement", aggregation,
+ )
+ }
+ }
+ return aggregation
+}
+
+// cachedAggregator returns the appropriate aggregate input and output
+// functions for an instrument configuration. If the exact instrument has been
+// created within the inst.Scope, those aggregate function instances will be
+// returned. Otherwise, new computed aggregate functions will be cached and
+// returned.
+//
+// If the instrument configuration conflicts with an instrument that has
+// already been created (e.g. description, unit, data type) a warning will be
+// logged at the "Info" level with the global OTel logger. Valid new aggregate
+// functions for the instrument configuration will still be returned without an
+// error.
+//
+// If the instrument defines an unknown or incompatible aggregation, an error
+// is returned.
+func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) {
+ switch stream.Aggregation.(type) {
+ case nil:
+ // The aggregation was not overridden with a view. Use the aggregation
+ // provided by the reader.
+ stream.Aggregation = readerAggregation
+ case AggregationDefault:
+ // The view explicitly requested the default aggregation.
+ stream.Aggregation = DefaultAggregationSelector(kind)
+ }
+
+ if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
+ return nil, 0, fmt.Errorf(
+ "creating aggregator with instrumentKind: %d, aggregation %v: %w",
+ kind, stream.Aggregation, err,
+ )
+ }
+
+ id := i.instID(kind, stream)
+ // If there is a conflict, the specification says the view should
+ // still be applied and a warning should be logged.
+ i.logConflict(id)
+
+ // If there are requests for the same instrument with different name
+ // casing, the first-seen needs to be returned. Use a normalize ID for the
+ // cache lookup to ensure the correct comparison.
+ normID := id.normalize()
+ cv := i.aggregators.Lookup(normID, func() aggVal[N] {
+ b := aggregate.Builder[N]{
+ Temporality: i.pipeline.reader.temporality(kind),
+ }
+ b.Filter = stream.AttributeFilter
+ in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
+ if err != nil {
+ return aggVal[N]{0, nil, err}
+ }
+ if in == nil { // Drop aggregator.
+ return aggVal[N]{0, nil, nil}
+ }
+ i.pipeline.addSync(scope, instrumentSync{
+ // Use the first-seen name casing for this and all subsequent
+ // requests of this instrument.
+ name: stream.Name,
+ description: stream.Description,
+ unit: stream.Unit,
+ compAgg: out,
+ })
+ id := atomic.AddUint64(&aggIDCount, 1)
+ return aggVal[N]{id, in, err}
+ })
+ return cv.Measure, cv.ID, cv.Err
+}
+
+// logConflict validates if an instrument with the same case-insensitive name
+// as id has already been created. If that instrument conflicts with id, a
+// warning is logged.
+func (i *inserter[N]) logConflict(id instID) {
+ // The API specification defines names as case-insensitive. If there is a
+ // different casing of a name it needs to be a conflict.
+ name := id.normalize().Name
+ existing := i.views.Lookup(name, func() instID { return id })
+ if id == existing {
+ return
+ }
+
+ const msg = "duplicate metric stream definitions"
+ args := []interface{}{
+ "names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
+ "descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
+ "kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
+ "units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
+ "numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
+ }
+
+ // The specification recommends logging a suggested view to resolve
+ // conflicts if possible.
+ //
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#duplicate-instrument-registration
+ if id.Unit != existing.Unit || id.Number != existing.Number {
+ // There is no view resolution for these, don't make a suggestion.
+ global.Warn(msg, args...)
+ return
+ }
+
+ var stream string
+ if id.Name != existing.Name || id.Kind != existing.Kind {
+ stream = `Stream{Name: "{{NEW_NAME}}"}`
+ } else if id.Description != existing.Description {
+ stream = fmt.Sprintf("Stream{Description: %q}", existing.Description)
+ }
+
+ inst := fmt.Sprintf(
+ "Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}",
+ id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit,
+ )
+ args = append(args, "suggested.view", fmt.Sprintf("NewView(%s, %s)", inst, stream))
+
+ global.Warn(msg, args...)
+}
+
+func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
+ var zero N
+ return instID{
+ Name: stream.Name,
+ Description: stream.Description,
+ Unit: stream.Unit,
+ Kind: kind,
+ Number: fmt.Sprintf("%T", zero),
+ }
+}
+
+// aggregateFunc returns new aggregate functions matching agg, kind, and
+// monotonic. If the agg is unknown or temporality is invalid, an error is
+// returned.
+func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) {
+ switch a := agg.(type) {
+ case AggregationDefault:
+ return i.aggregateFunc(b, DefaultAggregationSelector(kind), kind)
+ case AggregationDrop:
+ // Return nil in and out to signify the drop aggregator.
+ case AggregationLastValue:
+ meas, comp = b.LastValue()
+ case AggregationSum:
+ switch kind {
+ case InstrumentKindObservableCounter:
+ meas, comp = b.PrecomputedSum(true)
+ case InstrumentKindObservableUpDownCounter:
+ meas, comp = b.PrecomputedSum(false)
+ case InstrumentKindCounter, InstrumentKindHistogram:
+ meas, comp = b.Sum(true)
+ default:
+ // InstrumentKindUpDownCounter, InstrumentKindObservableGauge, and
+ // instrumentKindUndefined or other invalid instrument kinds.
+ meas, comp = b.Sum(false)
+ }
+ case AggregationExplicitBucketHistogram:
+ var noSum bool
+ switch kind {
+ case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
+ // The sum should not be collected for any instrument that can make
+ // negative measurements:
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
+ noSum = true
+ }
+ meas, comp = b.ExplicitBucketHistogram(a.Boundaries, a.NoMinMax, noSum)
+ case AggregationBase2ExponentialHistogram:
+ var noSum bool
+ switch kind {
+ case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
+ // The sum should not be collected for any instrument that can make
+ // negative measurements:
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
+ noSum = true
+ }
+ meas, comp = b.ExponentialBucketHistogram(a.MaxSize, a.MaxScale, a.NoMinMax, noSum)
+
+ default:
+ err = errUnknownAggregation
+ }
+
+ return meas, comp, err
+}
+
+// isAggregatorCompatible checks if the aggregation can be used by the instrument.
+// Current compatibility:
+//
+// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
+// |--------------------------|------|-----------|-----|-----------|-----------------------|
+// | Counter | ✓ | | ✓ | ✓ | ✓ |
+// | UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
+// | Histogram | ✓ | | ✓ | ✓ | ✓ |
+// | Observable Counter | ✓ | | ✓ | ✓ | ✓ |
+// | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
+// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |.
+func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
+ switch agg.(type) {
+ case AggregationDefault:
+ return nil
+ case AggregationExplicitBucketHistogram, AggregationBase2ExponentialHistogram:
+ switch kind {
+ case InstrumentKindCounter,
+ InstrumentKindUpDownCounter,
+ InstrumentKindHistogram,
+ InstrumentKindObservableCounter,
+ InstrumentKindObservableUpDownCounter,
+ InstrumentKindObservableGauge:
+ return nil
+ default:
+ return errIncompatibleAggregation
+ }
+ case AggregationSum:
+ switch kind {
+ case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter:
+ return nil
+ default:
+ // TODO: review need for aggregation check after
+ // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
+ return errIncompatibleAggregation
+ }
+ case AggregationLastValue:
+ if kind == InstrumentKindObservableGauge {
+ return nil
+ }
+ // TODO: review need for aggregation check after
+ // https://github.com/open-telemetry/opentelemetry-specification/issues/2710
+ return errIncompatibleAggregation
+ case AggregationDrop:
+ return nil
+ default:
+ // This is used passed checking for default, it should be an error at this point.
+ return fmt.Errorf("%w: %v", errUnknownAggregation, agg)
+ }
+}
+
+// pipelines is the group of pipelines connecting Readers with instrument
+// measurement.
+type pipelines []*pipeline
+
+func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
+ pipes := make([]*pipeline, 0, len(readers))
+ for _, r := range readers {
+ p := newPipeline(res, r, views)
+ r.register(p)
+ pipes = append(pipes, p)
+ }
+ return pipes
+}
+
+func (p pipelines) registerCallback(cback func(context.Context) error) {
+ for _, pipe := range p {
+ pipe.addCallback(cback)
+ }
+}
+
+func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
+ unregs := make([]func(), len(p))
+ for i, pipe := range p {
+ unregs[i] = pipe.addMultiCallback(c)
+ }
+ return unregisterFuncs{f: unregs}
+}
+
+type unregisterFuncs struct {
+ embedded.Registration
+ f []func()
+}
+
+func (u unregisterFuncs) Unregister() error {
+ for _, f := range u.f {
+ f()
+ }
+ return nil
+}
+
+// resolver facilitates resolving aggregate functions an instrument calls to
+// aggregate measurements with while updating all pipelines that need to pull
+// from those aggregations.
+type resolver[N int64 | float64] struct {
+ inserters []*inserter[N]
+}
+
+func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
+ in := make([]*inserter[N], len(p))
+ for i := range in {
+ in[i] = newInserter[N](p[i], vc)
+ }
+ return resolver[N]{in}
+}
+
+// Aggregators returns the Aggregators that must be updated by the instrument
+// defined by key.
+func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
+ var measures []aggregate.Measure[N]
+
+ errs := &multierror{}
+ for _, i := range r.inserters {
+ in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
+ if err != nil {
+ errs.append(err)
+ }
+ measures = append(measures, in...)
+ }
+ return measures, errs.errorOrNil()
+}
+
+// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
+// defined by key. If boundaries were provided on instrument instantiation, those take precedence
+// over boundaries provided by the reader.
+func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
+ var measures []aggregate.Measure[N]
+
+ errs := &multierror{}
+ for _, i := range r.inserters {
+ agg := i.readerDefaultAggregation(id.Kind)
+ if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
+ histAgg.Boundaries = boundaries
+ agg = histAgg
+ }
+ in, err := i.Instrument(id, agg)
+ if err != nil {
+ errs.append(err)
+ }
+ measures = append(measures, in...)
+ }
+ return measures, errs.errorOrNil()
+}
+
+type multierror struct {
+ wrapped error
+ errors []string
+}
+
+func (m *multierror) errorOrNil() error {
+ if len(m.errors) == 0 {
+ return nil
+ }
+ if m.wrapped == nil {
+ return errors.New(strings.Join(m.errors, "; "))
+ }
+ return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
+}
+
+func (m *multierror) append(err error) {
+ m.errors = append(m.errors, err.Error())
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go b/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go
new file mode 100644
index 000000000..7d1a9183c
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/provider.go
@@ -0,0 +1,154 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "sync/atomic"
+
+ "go.opentelemetry.io/otel/internal/global"
+ "go.opentelemetry.io/otel/metric"
+ "go.opentelemetry.io/otel/metric/embedded"
+ "go.opentelemetry.io/otel/metric/noop"
+ "go.opentelemetry.io/otel/sdk/instrumentation"
+)
+
+// MeterProvider handles the creation and coordination of Meters. All Meters
+// created by a MeterProvider will be associated with the same Resource, have
+// the same Views applied to them, and have their produced metric telemetry
+// passed to the configured Readers.
+type MeterProvider struct {
+ embedded.MeterProvider
+
+ pipes pipelines
+ meters cache[instrumentation.Scope, *meter]
+
+ forceFlush, shutdown func(context.Context) error
+ stopped atomic.Bool
+}
+
+// Compile-time check MeterProvider implements metric.MeterProvider.
+var _ metric.MeterProvider = (*MeterProvider)(nil)
+
+// NewMeterProvider returns a new and configured MeterProvider.
+//
+// By default, the returned MeterProvider is configured with the default
+// Resource and no Readers. Readers cannot be added after a MeterProvider is
+// created. This means the returned MeterProvider, one created with no
+// Readers, will perform no operations.
+func NewMeterProvider(options ...Option) *MeterProvider {
+ conf := newConfig(options)
+ flush, sdown := conf.readerSignals()
+
+ mp := &MeterProvider{
+ pipes: newPipelines(conf.res, conf.readers, conf.views),
+ forceFlush: flush,
+ shutdown: sdown,
+ }
+ // Log after creation so all readers show correctly they are registered.
+ global.Info("MeterProvider created",
+ "Resource", conf.res,
+ "Readers", conf.readers,
+ "Views", len(conf.views),
+ )
+ return mp
+}
+
+// Meter returns a Meter with the given name and configured with options.
+//
+// The name should be the name of the instrumentation scope creating
+// telemetry. This name may be the same as the instrumented code only if that
+// code provides built-in instrumentation.
+//
+// Calls to the Meter method after Shutdown has been called will return Meters
+// that perform no operations.
+//
+// This method is safe to call concurrently.
+func (mp *MeterProvider) Meter(name string, options ...metric.MeterOption) metric.Meter {
+ if name == "" {
+ global.Warn("Invalid Meter name.", "name", name)
+ }
+
+ if mp.stopped.Load() {
+ return noop.Meter{}
+ }
+
+ c := metric.NewMeterConfig(options...)
+ s := instrumentation.Scope{
+ Name: name,
+ Version: c.InstrumentationVersion(),
+ SchemaURL: c.SchemaURL(),
+ }
+
+ global.Info("Meter created",
+ "Name", s.Name,
+ "Version", s.Version,
+ "SchemaURL", s.SchemaURL,
+ )
+
+ return mp.meters.Lookup(s, func() *meter {
+ return newMeter(s, mp.pipes)
+ })
+}
+
+// ForceFlush flushes all pending telemetry.
+//
+// This method honors the deadline or cancellation of ctx. An appropriate
+// error will be returned in these situations. There is no guaranteed that all
+// telemetry be flushed or all resources have been released in these
+// situations.
+//
+// ForceFlush calls ForceFlush(context.Context) error
+// on all Readers that implements this method.
+//
+// This method is safe to call concurrently.
+func (mp *MeterProvider) ForceFlush(ctx context.Context) error {
+ if mp.forceFlush != nil {
+ return mp.forceFlush(ctx)
+ }
+ return nil
+}
+
+// Shutdown shuts down the MeterProvider flushing all pending telemetry and
+// releasing any held computational resources.
+//
+// This call is idempotent. The first call will perform all flush and
+// releasing operations. Subsequent calls will perform no action and will
+// return an error stating this.
+//
+// Measurements made by instruments from meters this MeterProvider created
+// will not be exported after Shutdown is called.
+//
+// This method honors the deadline or cancellation of ctx. An appropriate
+// error will be returned in these situations. There is no guaranteed that all
+// telemetry be flushed or all resources have been released in these
+// situations.
+//
+// This method is safe to call concurrently.
+func (mp *MeterProvider) Shutdown(ctx context.Context) error {
+ // Even though it may seem like there is a synchronization issue between the
+ // call to `Store` and checking `shutdown`, the Go concurrency model ensures
+ // that is not the case, as all the atomic operations executed in a program
+ // behave as though executed in some sequentially consistent order. This
+ // definition provides the same semantics as C++'s sequentially consistent
+ // atomics and Java's volatile variables.
+ // See https://go.dev/ref/mem#atomic and https://pkg.go.dev/sync/atomic.
+
+ mp.stopped.Store(true)
+ if mp.shutdown != nil {
+ return mp.shutdown(ctx)
+ }
+ return nil
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go b/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go
new file mode 100644
index 000000000..65cedaf3c
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/reader.go
@@ -0,0 +1,200 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "context"
+ "fmt"
+
+ "go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+// errDuplicateRegister is logged by a Reader when an attempt to registered it
+// more than once occurs.
+var errDuplicateRegister = fmt.Errorf("duplicate reader registration")
+
+// ErrReaderNotRegistered is returned if Collect or Shutdown are called before
+// the reader is registered with a MeterProvider.
+var ErrReaderNotRegistered = fmt.Errorf("reader is not registered")
+
+// ErrReaderShutdown is returned if Collect or Shutdown are called after a
+// reader has been Shutdown once.
+var ErrReaderShutdown = fmt.Errorf("reader is shutdown")
+
+// errNonPositiveDuration is logged when an environmental variable
+// has non-positive value.
+var errNonPositiveDuration = fmt.Errorf("non-positive duration")
+
+// Reader is the interface used between the SDK and an
+// exporter. Control flow is bi-directional through the
+// Reader, since the SDK initiates ForceFlush and Shutdown
+// while the exporter initiates collection. The Register() method here
+// informs the Reader that it can begin reading, signaling the
+// start of bi-directional control flow.
+//
+// Typically, push-based exporters that are periodic will
+// implement PeroidicExporter themselves and construct a
+// PeriodicReader to satisfy this interface.
+//
+// Pull-based exporters will typically implement Register
+// themselves, since they read on demand.
+//
+// Warning: methods may be added to this interface in minor releases.
+type Reader interface {
+ // register registers a Reader with a MeterProvider.
+ // The producer argument allows the Reader to signal the sdk to collect
+ // and send aggregated metric measurements.
+ register(sdkProducer)
+
+ // temporality reports the Temporality for the instrument kind provided.
+ //
+ // This method needs to be concurrent safe with itself and all the other
+ // Reader methods.
+ temporality(InstrumentKind) metricdata.Temporality
+
+ // aggregation returns what Aggregation to use for an instrument kind.
+ //
+ // This method needs to be concurrent safe with itself and all the other
+ // Reader methods.
+ aggregation(InstrumentKind) Aggregation // nolint:revive // import-shadow for method scoped by type.
+
+ // Collect gathers and returns all metric data related to the Reader from
+ // the SDK and stores it in out. An error is returned if this is called
+ // after Shutdown or if out is nil.
+ //
+ // This method needs to be concurrent safe, and the cancellation of the
+ // passed context is expected to be honored.
+ Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+
+ // Shutdown flushes all metric measurements held in an export pipeline and releases any
+ // held computational resources.
+ //
+ // This deadline or cancellation of the passed context are honored. An appropriate
+ // error will be returned in these situations. There is no guaranteed that all
+ // telemetry be flushed or all resources have been released in these
+ // situations.
+ //
+ // After Shutdown is called, calls to Collect will perform no operation and instead will return
+ // an error indicating the shutdown state.
+ //
+ // This method needs to be concurrent safe.
+ Shutdown(context.Context) error
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+}
+
+// sdkProducer produces metrics for a Reader.
+type sdkProducer interface {
+ // produce returns aggregated metrics from a single collection.
+ //
+ // This method is safe to call concurrently.
+ produce(context.Context, *metricdata.ResourceMetrics) error
+}
+
+// Producer produces metrics for a Reader from an external source.
+type Producer interface {
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+
+ // Produce returns aggregated metrics from an external source.
+ //
+ // This method should be safe to call concurrently.
+ Produce(context.Context) ([]metricdata.ScopeMetrics, error)
+ // DO NOT CHANGE: any modification will not be backwards compatible and
+ // must never be done outside of a new major release.
+}
+
+// produceHolder is used as an atomic.Value to wrap the non-concrete producer
+// type.
+type produceHolder struct {
+ produce func(context.Context, *metricdata.ResourceMetrics) error
+}
+
+// shutdownProducer produces an ErrReaderShutdown error always.
+type shutdownProducer struct{}
+
+// produce returns an ErrReaderShutdown error.
+func (p shutdownProducer) produce(context.Context, *metricdata.ResourceMetrics) error {
+ return ErrReaderShutdown
+}
+
+// TemporalitySelector selects the temporality to use based on the InstrumentKind.
+type TemporalitySelector func(InstrumentKind) metricdata.Temporality
+
+// DefaultTemporalitySelector is the default TemporalitySelector used if
+// WithTemporalitySelector is not provided. CumulativeTemporality will be used
+// for all instrument kinds if this TemporalitySelector is used.
+func DefaultTemporalitySelector(InstrumentKind) metricdata.Temporality {
+ return metricdata.CumulativeTemporality
+}
+
+// AggregationSelector selects the aggregation and the parameters to use for
+// that aggregation based on the InstrumentKind.
+//
+// If the Aggregation returned is nil or DefaultAggregation, the selection from
+// DefaultAggregationSelector will be used.
+type AggregationSelector func(InstrumentKind) Aggregation
+
+// DefaultAggregationSelector returns the default aggregation and parameters
+// that will be used to summarize measurement made from an instrument of
+// InstrumentKind. This AggregationSelector using the following selection
+// mapping: Counter ⇨ Sum, Observable Counter ⇨ Sum, UpDownCounter ⇨ Sum,
+// Observable UpDownCounter ⇨ Sum, Observable Gauge ⇨ LastValue,
+// Histogram ⇨ ExplicitBucketHistogram.
+func DefaultAggregationSelector(ik InstrumentKind) Aggregation {
+ switch ik {
+ case InstrumentKindCounter, InstrumentKindUpDownCounter, InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter:
+ return AggregationSum{}
+ case InstrumentKindObservableGauge:
+ return AggregationLastValue{}
+ case InstrumentKindHistogram:
+ return AggregationExplicitBucketHistogram{
+ Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
+ NoMinMax: false,
+ }
+ }
+ panic("unknown instrument kind")
+}
+
+// ReaderOption is an option which can be applied to manual or Periodic
+// readers.
+type ReaderOption interface {
+ PeriodicReaderOption
+ ManualReaderOption
+}
+
+// WithProducers registers producers as an external Producer of metric data
+// for this Reader.
+func WithProducer(p Producer) ReaderOption {
+ return producerOption{p: p}
+}
+
+type producerOption struct {
+ p Producer
+}
+
+// applyManual returns a manualReaderConfig with option applied.
+func (o producerOption) applyManual(c manualReaderConfig) manualReaderConfig {
+ c.producers = append(c.producers, o.p)
+ return c
+}
+
+// applyPeriodic returns a periodicReaderConfig with option applied.
+func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig {
+ c.producers = append(c.producers, o.p)
+ return c
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/version.go b/vendor/go.opentelemetry.io/otel/sdk/metric/version.go
new file mode 100644
index 000000000..edcf7cfc8
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/version.go
@@ -0,0 +1,20 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+// version is the current release version of the metric SDK in use.
+func version() string {
+ return "1.21.0"
+}
diff --git a/vendor/go.opentelemetry.io/otel/sdk/metric/view.go b/vendor/go.opentelemetry.io/otel/sdk/metric/view.go
new file mode 100644
index 000000000..65f243bef
--- /dev/null
+++ b/vendor/go.opentelemetry.io/otel/sdk/metric/view.go
@@ -0,0 +1,128 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metric // import "go.opentelemetry.io/otel/sdk/metric"
+
+import (
+ "errors"
+ "regexp"
+ "strings"
+
+ "go.opentelemetry.io/otel/internal/global"
+)
+
+var (
+ errMultiInst = errors.New("name replacement for multiple instruments")
+ errEmptyView = errors.New("no criteria provided for view")
+
+ emptyView = func(Instrument) (Stream, bool) { return Stream{}, false }
+)
+
+// View is an override to the default behavior of the SDK. It defines how data
+// should be collected for certain instruments. It returns true and the exact
+// Stream to use for matching Instruments. Otherwise, if the view does not
+// match, false is returned.
+type View func(Instrument) (Stream, bool)
+
+// NewView returns a View that applies the Stream mask for all instruments that
+// match criteria. The returned View will only apply mask if all non-zero-value
+// fields of criteria match the corresponding Instrument passed to the view. If
+// no criteria are provided, all field of criteria are their zero-values, a
+// view that matches no instruments is returned. If you need to match a
+// zero-value field, create a View directly.
+//
+// The Name field of criteria supports wildcard pattern matching. The "*"
+// wildcard is recognized as matching zero or more characters, and "?" is
+// recognized as matching exactly one character. For example, a pattern of "*"
+// matches all instrument names.
+//
+// The Stream mask only applies updates for non-zero-value fields. By default,
+// the Instrument the View matches against will be use for the Name,
+// Description, and Unit of the returned Stream and no Aggregation or
+// AttributeFilter are set. All non-zero-value fields of mask are used instead
+// of the default. If you need to zero out an Stream field returned from a
+// View, create a View directly.
+func NewView(criteria Instrument, mask Stream) View {
+ if criteria.empty() {
+ global.Error(
+ errEmptyView, "dropping view",
+ "mask", mask,
+ )
+ return emptyView
+ }
+
+ var matchFunc func(Instrument) bool
+ if strings.ContainsAny(criteria.Name, "*?") {
+ if mask.Name != "" {
+ global.Error(
+ errMultiInst, "dropping view",
+ "criteria", criteria,
+ "mask", mask,
+ )
+ return emptyView
+ }
+
+ // Handle branching here in NewView instead of criteria.matches so
+ // criteria.matches remains inlinable for the simple case.
+ pattern := regexp.QuoteMeta(criteria.Name)
+ pattern = "^" + pattern + "$"
+ pattern = strings.ReplaceAll(pattern, `\?`, ".")
+ pattern = strings.ReplaceAll(pattern, `\*`, ".*")
+ re := regexp.MustCompile(pattern)
+ matchFunc = func(i Instrument) bool {
+ return re.MatchString(i.Name) &&
+ criteria.matchesDescription(i) &&
+ criteria.matchesKind(i) &&
+ criteria.matchesUnit(i) &&
+ criteria.matchesScope(i)
+ }
+ } else {
+ matchFunc = criteria.matches
+ }
+
+ var agg Aggregation
+ if mask.Aggregation != nil {
+ agg = mask.Aggregation.copy()
+ if err := agg.err(); err != nil {
+ global.Error(
+ err, "not using aggregation with view",
+ "criteria", criteria,
+ "mask", mask,
+ )
+ agg = nil
+ }
+ }
+
+ return func(i Instrument) (Stream, bool) {
+ if matchFunc(i) {
+ return Stream{
+ Name: nonZero(mask.Name, i.Name),
+ Description: nonZero(mask.Description, i.Description),
+ Unit: nonZero(mask.Unit, i.Unit),
+ Aggregation: agg,
+ AttributeFilter: mask.AttributeFilter,
+ }, true
+ }
+ return Stream{}, false
+ }
+}
+
+// nonZero returns v if it is non-zero-valued, otherwise alt.
+func nonZero[T comparable](v, alt T) T {
+ var zero T
+ if v != zero {
+ return v
+ }
+ return alt
+}