summaryrefslogtreecommitdiff
path: root/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go')
-rw-r--r--vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go122
1 files changed, 111 insertions, 11 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
index 6966ed861..9bc3e525d 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
@@ -6,24 +6,35 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"context"
"errors"
+ "fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
+ "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
+ "go.opentelemetry.io/otel/metric"
+ "go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/internal/env"
+ "go.opentelemetry.io/otel/sdk/trace/internal/x"
+ semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
+ "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
"go.opentelemetry.io/otel/trace"
)
// Defaults for BatchSpanProcessorOptions.
const (
- DefaultMaxQueueSize = 2048
- DefaultScheduleDelay = 5000
+ DefaultMaxQueueSize = 2048
+ // DefaultScheduleDelay is the delay interval between two consecutive exports, in milliseconds.
+ DefaultScheduleDelay = 5000
+ // DefaultExportTimeout is the duration after which an export is cancelled, in milliseconds.
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
)
+var queueFull = otelconv.ErrorTypeAttr("queue_full")
+
// BatchSpanProcessorOption configures a BatchSpanProcessor.
type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
@@ -67,6 +78,11 @@ type batchSpanProcessor struct {
queue chan ReadOnlySpan
dropped uint32
+ selfObservabilityEnabled bool
+ callbackRegistration metric.Registration
+ spansProcessedCounter otelconv.SDKProcessorSpanProcessed
+ componentNameAttr attribute.KeyValue
+
batch []ReadOnlySpan
batchMutex sync.Mutex
timer *time.Timer
@@ -87,11 +103,7 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
if maxExportBatchSize > maxQueueSize {
- if DefaultMaxExportBatchSize > maxQueueSize {
- maxExportBatchSize = maxQueueSize
- } else {
- maxExportBatchSize = DefaultMaxExportBatchSize
- }
+ maxExportBatchSize = min(DefaultMaxExportBatchSize, maxQueueSize)
}
o := BatchSpanProcessorOptions{
@@ -112,6 +124,21 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
stopCh: make(chan struct{}),
}
+ if x.SelfObservability.Enabled() {
+ bsp.selfObservabilityEnabled = true
+ bsp.componentNameAttr = componentName()
+
+ var err error
+ bsp.spansProcessedCounter, bsp.callbackRegistration, err = newBSPObs(
+ bsp.componentNameAttr,
+ func() int64 { return int64(len(bsp.queue)) },
+ int64(bsp.o.MaxQueueSize),
+ )
+ if err != nil {
+ otel.Handle(err)
+ }
+ }
+
bsp.stopWait.Add(1)
go func() {
defer bsp.stopWait.Done()
@@ -122,8 +149,61 @@ func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorO
return bsp
}
+var processorIDCounter atomic.Int64
+
+// nextProcessorID returns an identifier for this batch span processor,
+// starting with 0 and incrementing by 1 each time it is called.
+func nextProcessorID() int64 {
+ return processorIDCounter.Add(1) - 1
+}
+
+func componentName() attribute.KeyValue {
+ id := nextProcessorID()
+ name := fmt.Sprintf("%s/%d", otelconv.ComponentTypeBatchingSpanProcessor, id)
+ return semconv.OTelComponentName(name)
+}
+
+// newBSPObs creates and returns a new set of metrics instruments and a
+// registration for a BatchSpanProcessor. It is the caller's responsibility
+// to unregister the registration when it is no longer needed.
+func newBSPObs(
+ cmpnt attribute.KeyValue,
+ qLen func() int64,
+ qMax int64,
+) (otelconv.SDKProcessorSpanProcessed, metric.Registration, error) {
+ meter := otel.GetMeterProvider().Meter(
+ selfObsScopeName,
+ metric.WithInstrumentationVersion(sdk.Version()),
+ metric.WithSchemaURL(semconv.SchemaURL),
+ )
+
+ qCap, err := otelconv.NewSDKProcessorSpanQueueCapacity(meter)
+
+ qSize, e := otelconv.NewSDKProcessorSpanQueueSize(meter)
+ err = errors.Join(err, e)
+
+ spansProcessed, e := otelconv.NewSDKProcessorSpanProcessed(meter)
+ err = errors.Join(err, e)
+
+ cmpntT := semconv.OTelComponentTypeBatchingSpanProcessor
+ attrs := metric.WithAttributes(cmpnt, cmpntT)
+
+ reg, e := meter.RegisterCallback(
+ func(_ context.Context, o metric.Observer) error {
+ o.ObserveInt64(qSize.Inst(), qLen(), attrs)
+ o.ObserveInt64(qCap.Inst(), qMax, attrs)
+ return nil
+ },
+ qSize.Inst(),
+ qCap.Inst(),
+ )
+ err = errors.Join(err, e)
+
+ return spansProcessed, reg, err
+}
+
// OnStart method does nothing.
-func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
+func (*batchSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
// OnEnd method enqueues a ReadOnlySpan for later processing.
func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
@@ -162,6 +242,9 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
case <-ctx.Done():
err = ctx.Err()
}
+ if bsp.selfObservabilityEnabled {
+ err = errors.Join(err, bsp.callbackRegistration.Unregister())
+ }
})
return err
}
@@ -171,7 +254,7 @@ type forceFlushSpan struct {
flushed chan struct{}
}
-func (f forceFlushSpan) SpanContext() trace.SpanContext {
+func (forceFlushSpan) SpanContext() trace.SpanContext {
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
}
@@ -274,6 +357,11 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
+ if bsp.selfObservabilityEnabled {
+ bsp.spansProcessedCounter.Add(ctx, int64(l),
+ bsp.componentNameAttr,
+ bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor))
+ }
err := bsp.e.ExportSpans(ctx, bsp.batch)
// A new batch is always created after exporting, even if the batch failed to be exported.
@@ -382,11 +470,17 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
case bsp.queue <- sd:
return true
case <-ctx.Done():
+ if bsp.selfObservabilityEnabled {
+ bsp.spansProcessedCounter.Add(ctx, 1,
+ bsp.componentNameAttr,
+ bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
+ bsp.spansProcessedCounter.AttrErrorType(queueFull))
+ }
return false
}
}
-func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
+func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
if !sd.SpanContext().IsSampled() {
return false
}
@@ -396,12 +490,18 @@ func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) b
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
+ if bsp.selfObservabilityEnabled {
+ bsp.spansProcessedCounter.Add(ctx, 1,
+ bsp.componentNameAttr,
+ bsp.spansProcessedCounter.AttrComponentType(otelconv.ComponentTypeBatchingSpanProcessor),
+ bsp.spansProcessedCounter.AttrErrorType(queueFull))
+ }
}
return false
}
// MarshalLog is the marshaling function used by the logging system to represent this Span Processor.
-func (bsp *batchSpanProcessor) MarshalLog() interface{} {
+func (bsp *batchSpanProcessor) MarshalLog() any {
return struct {
Type string
SpanExporter SpanExporter