path: root/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
author     Terin Stock <terinjokes@gmail.com>  2025-03-09 17:47:56 +0100
committer  Terin Stock <terinjokes@gmail.com>  2025-03-10 01:59:49 +0100
commit     3ac1ee16f377d31a0fb80c8dae28b6239ac4229e (patch)
tree       f61faa581feaaeaba2542b9f2b8234a590684413 /vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
parent     [chore] update URLs to forked source (diff)
download   gotosocial-3ac1ee16f377d31a0fb80c8dae28b6239ac4229e.tar.xz
[chore] remove vendor
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go')
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go  414
1 file changed, 0 insertions, 414 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
deleted file mode 100644
index ccc97e1b6..000000000
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
+++ /dev/null
@@ -1,414 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package trace // import "go.opentelemetry.io/otel/sdk/trace"
-
-import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/internal/global"
- "go.opentelemetry.io/otel/sdk/internal/env"
- "go.opentelemetry.io/otel/trace"
-)
-
-// Defaults for BatchSpanProcessorOptions.
-const (
- DefaultMaxQueueSize = 2048
- DefaultScheduleDelay = 5000
- DefaultExportTimeout = 30000
- DefaultMaxExportBatchSize = 512
-)
-
-// BatchSpanProcessorOption configures a BatchSpanProcessor.
-type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
-
-// BatchSpanProcessorOptions contains the configuration settings for a
-// BatchSpanProcessor.
-type BatchSpanProcessorOptions struct {
- // MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
- // queue gets full it drops the spans. Use BlockOnQueueFull to change this behavior.
- // The default value of MaxQueueSize is 2048.
- MaxQueueSize int
-
- // BatchTimeout is the maximum duration for constructing a batch. The
- // processor forcefully sends the available spans when the timeout is reached.
- // The default value of BatchTimeout is 5000 msec.
- BatchTimeout time.Duration
-
- // ExportTimeout specifies the maximum duration for exporting spans. If the timeout
- // is reached, the export will be cancelled.
- // The default value of ExportTimeout is 30000 msec.
- ExportTimeout time.Duration
-
- // MaxExportBatchSize is the maximum number of spans to process in a single batch.
- // If there is more than one batch worth of spans, the processor exports the
- // batches one after another without any delay.
- // The default value of MaxExportBatchSize is 512.
- MaxExportBatchSize int
-
- // BlockOnQueueFull blocks the OnEnd() and OnStart() methods if the queue is full
- // and BlockOnQueueFull is set to true.
- // The blocking option should be used carefully, as it can severely affect the
- // performance of an application.
- BlockOnQueueFull bool
-}
-
-// batchSpanProcessor is a SpanProcessor that batches asynchronously-received
-// spans and sends them to a SpanExporter when complete.
-type batchSpanProcessor struct {
- e SpanExporter
- o BatchSpanProcessorOptions
-
- queue chan ReadOnlySpan
- dropped uint32
-
- batch []ReadOnlySpan
- batchMutex sync.Mutex
- timer *time.Timer
- stopWait sync.WaitGroup
- stopOnce sync.Once
- stopCh chan struct{}
- stopped atomic.Bool
-}
-
-var _ SpanProcessor = (*batchSpanProcessor)(nil)
-
-// NewBatchSpanProcessor creates a new SpanProcessor that will send completed
-// span batches to the exporter with the supplied options.
-//
-// If the exporter is nil, the span processor will perform no action.
-func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
- maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
- maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
-
- if maxExportBatchSize > maxQueueSize {
- if DefaultMaxExportBatchSize > maxQueueSize {
- maxExportBatchSize = maxQueueSize
- } else {
- maxExportBatchSize = DefaultMaxExportBatchSize
- }
- }
-
- o := BatchSpanProcessorOptions{
- BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
- ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
- MaxQueueSize: maxQueueSize,
- MaxExportBatchSize: maxExportBatchSize,
- }
- for _, opt := range options {
- opt(&o)
- }
- bsp := &batchSpanProcessor{
- e: exporter,
- o: o,
- batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
- timer: time.NewTimer(o.BatchTimeout),
- queue: make(chan ReadOnlySpan, o.MaxQueueSize),
- stopCh: make(chan struct{}),
- }
-
- bsp.stopWait.Add(1)
- go func() {
- defer bsp.stopWait.Done()
- bsp.processQueue()
- bsp.drainQueue()
- }()
-
- return bsp
-}
-
-// OnStart method does nothing.
-func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
-
-// OnEnd method enqueues a ReadOnlySpan for later processing.
-func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
- // Do not enqueue spans after Shutdown.
- if bsp.stopped.Load() {
- return
- }
-
- // Do not enqueue spans if we are just going to drop them.
- if bsp.e == nil {
- return
- }
- bsp.enqueue(s)
-}
-
-// Shutdown flushes the queue and waits until all spans are processed.
-// It only executes once. Subsequent calls do nothing.
-func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
- var err error
- bsp.stopOnce.Do(func() {
- bsp.stopped.Store(true)
- wait := make(chan struct{})
- go func() {
- close(bsp.stopCh)
- bsp.stopWait.Wait()
- if bsp.e != nil {
- if err := bsp.e.Shutdown(ctx); err != nil {
- otel.Handle(err)
- }
- }
- close(wait)
- }()
- // Wait until the wait group is done or the context is cancelled
- select {
- case <-wait:
- case <-ctx.Done():
- err = ctx.Err()
- }
- })
- return err
-}
-
-type forceFlushSpan struct {
- ReadOnlySpan
- flushed chan struct{}
-}
-
-func (f forceFlushSpan) SpanContext() trace.SpanContext {
- return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
-}
-
-// ForceFlush exports all ended spans that have not yet been exported.
-func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
- // Interrupt if context is already canceled.
- if err := ctx.Err(); err != nil {
- return err
- }
-
- // Do nothing after Shutdown.
- if bsp.stopped.Load() {
- return nil
- }
-
- var err error
- if bsp.e != nil {
- flushCh := make(chan struct{})
- if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
- select {
- case <-bsp.stopCh:
- // The batchSpanProcessor is Shutdown.
- return nil
- case <-flushCh:
- // Processed any items in queue prior to ForceFlush being called
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- wait := make(chan error)
- go func() {
- wait <- bsp.exportSpans(ctx)
- close(wait)
- }()
- // Wait until the export is finished or the context is cancelled/timed out
- select {
- case err = <-wait:
- case <-ctx.Done():
- err = ctx.Err()
- }
- }
- return err
-}
-
-// WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
-// maximum queue size allowed for a BatchSpanProcessor.
-func WithMaxQueueSize(size int) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.MaxQueueSize = size
- }
-}
-
-// WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures
-// the maximum export batch size allowed for a BatchSpanProcessor.
-func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.MaxExportBatchSize = size
- }
-}
-
-// WithBatchTimeout returns a BatchSpanProcessorOption that configures the
-// maximum delay allowed for a BatchSpanProcessor before it will export any
-// held span (whether the queue is full or not).
-func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.BatchTimeout = delay
- }
-}
-
-// WithExportTimeout returns a BatchSpanProcessorOption that configures the
-// amount of time a BatchSpanProcessor waits for an exporter to export before
-// abandoning the export.
-func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.ExportTimeout = timeout
- }
-}
-
-// WithBlocking returns a BatchSpanProcessorOption that configures a
-// BatchSpanProcessor to wait for enqueue operations to succeed instead of
-// dropping data when the queue is full.
-func WithBlocking() BatchSpanProcessorOption {
- return func(o *BatchSpanProcessorOptions) {
- o.BlockOnQueueFull = true
- }
-}
-
-// exportSpans exports the current batch; it is used by both processQueue and drainQueue.
-func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
- bsp.timer.Reset(bsp.o.BatchTimeout)
-
- bsp.batchMutex.Lock()
- defer bsp.batchMutex.Unlock()
-
- if bsp.o.ExportTimeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
- defer cancel()
- }
-
- if l := len(bsp.batch); l > 0 {
- global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
- err := bsp.e.ExportSpans(ctx, bsp.batch)
-
- // A new batch is always created after exporting, even if the batch failed to be exported.
- //
- // It is up to the exporter to implement any type of retry logic if a batch is failing
- // to be exported, since it is specific to the protocol and backend being sent to.
- clear(bsp.batch) // Erase elements to let GC collect objects
- bsp.batch = bsp.batch[:0]
-
- if err != nil {
- return err
- }
- }
- return nil
-}
-
-// processQueue removes spans from the `queue` channel until the processor
-// is shut down. It calls the exporter in batches of up to MaxExportBatchSize,
-// waiting up to BatchTimeout to form a batch.
-func (bsp *batchSpanProcessor) processQueue() {
- defer bsp.timer.Stop()
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- for {
- select {
- case <-bsp.stopCh:
- return
- case <-bsp.timer.C:
- if err := bsp.exportSpans(ctx); err != nil {
- otel.Handle(err)
- }
- case sd := <-bsp.queue:
- if ffs, ok := sd.(forceFlushSpan); ok {
- close(ffs.flushed)
- continue
- }
- bsp.batchMutex.Lock()
- bsp.batch = append(bsp.batch, sd)
- shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
- bsp.batchMutex.Unlock()
- if shouldExport {
- if !bsp.timer.Stop() {
- // Handle both GODEBUG=asynctimerchan=[0|1] properly.
- select {
- case <-bsp.timer.C:
- default:
- }
- }
- if err := bsp.exportSpans(ctx); err != nil {
- otel.Handle(err)
- }
- }
- }
- }
-}
-
-// drainQueue drains any remaining spans from the queue, then exports
-// the final batch.
-func (bsp *batchSpanProcessor) drainQueue() {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- for {
- select {
- case sd := <-bsp.queue:
- if _, ok := sd.(forceFlushSpan); ok {
- // Ignore flush requests as they are not valid spans.
- continue
- }
-
- bsp.batchMutex.Lock()
- bsp.batch = append(bsp.batch, sd)
- shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
- bsp.batchMutex.Unlock()
-
- if shouldExport {
- if err := bsp.exportSpans(ctx); err != nil {
- otel.Handle(err)
- }
- }
- default:
- // There are no more enqueued spans. Make final export.
- if err := bsp.exportSpans(ctx); err != nil {
- otel.Handle(err)
- }
- return
- }
- }
-}
-
-func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
- ctx := context.TODO()
- if bsp.o.BlockOnQueueFull {
- bsp.enqueueBlockOnQueueFull(ctx, sd)
- } else {
- bsp.enqueueDrop(ctx, sd)
- }
-}
-
-func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
- if !sd.SpanContext().IsSampled() {
- return false
- }
-
- select {
- case bsp.queue <- sd:
- return true
- case <-ctx.Done():
- return false
- }
-}
-
-func (bsp *batchSpanProcessor) enqueueDrop(_ context.Context, sd ReadOnlySpan) bool {
- if !sd.SpanContext().IsSampled() {
- return false
- }
-
- select {
- case bsp.queue <- sd:
- return true
- default:
- atomic.AddUint32(&bsp.dropped, 1)
- }
- return false
-}
-
-// MarshalLog is the marshaling function used by the logging system to represent this Span Processor.
-func (bsp *batchSpanProcessor) MarshalLog() interface{} {
- return struct {
- Type string
- SpanExporter SpanExporter
- Config BatchSpanProcessorOptions
- }{
- Type: "BatchSpanProcessor",
- SpanExporter: bsp.e,
- Config: bsp.o,
- }
-}
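
As context for the file removed above, the following is a minimal sketch of how this batch span processor is typically wired into a tracer provider. It is illustrative only: the stdouttrace exporter and the SDK's NewTracerProvider/WithSpanProcessor calls come from other OpenTelemetry Go packages, not from this file, so treat the exact setup as an assumption rather than part of this change.

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

func main() {
	ctx := context.Background()

	// Assumed exporter, used only for illustration; any SpanExporter works.
	exporter, err := stdouttrace.New()
	if err != nil {
		log.Fatal(err)
	}

	// Batch spans before export, overriding a few of the defaults documented
	// in the removed file (queue size 2048, 5s batch timeout, batch size 512).
	bsp := sdktrace.NewBatchSpanProcessor(exporter,
		sdktrace.WithMaxQueueSize(4096),
		sdktrace.WithMaxExportBatchSize(512),
		sdktrace.WithBatchTimeout(2*time.Second),
	)

	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(bsp))
	otel.SetTracerProvider(tp)
	defer func() {
		// Shutdown flushes queued spans and shuts down the exporter.
		if err := tp.Shutdown(ctx); err != nil {
			log.Println(err)
		}
	}()

	// Create one span so the processor has something to batch and export.
	_, span := otel.Tracer("example").Start(ctx, "demo-span")
	span.End()
}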