Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go')
-rw-r--r--  vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go | 68
1 file changed, 28 insertions(+), 40 deletions(-)
diff --git a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
index 43d5b0423..c9c7effbf 100644
--- a/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
+++ b/vendor/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go
@@ -16,7 +16,6 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
 
 import (
 	"context"
-	"runtime"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -84,6 +83,7 @@ type batchSpanProcessor struct {
 	stopWait sync.WaitGroup
 	stopOnce sync.Once
 	stopCh   chan struct{}
+	stopped  atomic.Bool
 }
 
 var _ SpanProcessor = (*batchSpanProcessor)(nil)
@@ -137,6 +137,11 @@ func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan)
 
 // OnEnd method enqueues a ReadOnlySpan for later processing.
 func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
+	// Do not enqueue spans after Shutdown.
+	if bsp.stopped.Load() {
+		return
+	}
+
 	// Do not enqueue spans if we are just going to drop them.
 	if bsp.e == nil {
 		return
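
Taken together with the new stopped field above, this guard replaces the old panic-recovery machinery with a cheap lock-free check on the hot path. A minimal sketch of the pattern in isolation (the proc type and onEnd are illustrative stand-ins, not the SDK's API):

package main

import (
	"fmt"
	"sync/atomic"
)

// proc mimics the guard: once stopped is set, enqueueing is a no-op.
type proc struct {
	stopped atomic.Bool // lock-free shutdown flag (atomic.Bool needs Go 1.19+)
	queue   chan string
}

func (p *proc) onEnd(name string) {
	if p.stopped.Load() { // do not enqueue after shutdown
		return
	}
	p.queue <- name
}

func main() {
	p := &proc{queue: make(chan string, 4)}
	p.onEnd("span-a")
	p.stopped.Store(true)
	p.onEnd("span-b")         // dropped silently
	fmt.Println(len(p.queue)) // 1
}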
@@ -149,6 +154,7 @@ func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
 func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
 	var err error
 	bsp.stopOnce.Do(func() {
+		bsp.stopped.Store(true)
 		wait := make(chan struct{})
 		go func() {
 			close(bsp.stopCh)
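
Setting stopped inside stopOnce.Do, before stopCh is closed, means no new span can be enqueued once shutdown begins. The rest of Shutdown (outside this hunk) follows a common Go pattern: run the blocking drain in a goroutine and race it against the caller's context. A simplified sketch under that assumption, with toy types in place of the processor's:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func shutdown(ctx context.Context, stopCh chan struct{}, wg *sync.WaitGroup) error {
	wait := make(chan struct{})
	go func() {
		close(stopCh) // tell the worker to drain and exit
		wg.Wait()     // block until it has finished
		close(wait)
	}()
	select {
	case <-wait:
		return nil
	case <-ctx.Done():
		return ctx.Err() // caller gave up waiting; worker keeps draining
	}
}

func main() {
	stopCh := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { <-stopCh; wg.Done() }() // stand-in for the worker loop
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(shutdown(ctx, stopCh, &wg)) // <nil>
}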
@@ -181,11 +187,24 @@ func (f forceFlushSpan) SpanContext() trace.SpanContext {
 
 // ForceFlush exports all ended spans that have not yet been exported.
 func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
+	// Interrupt if context is already canceled.
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
+	// Do nothing after Shutdown.
+	if bsp.stopped.Load() {
+		return nil
+	}
+
 	var err error
 	if bsp.e != nil {
 		flushCh := make(chan struct{})
 		if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
 			select {
+			case <-bsp.stopCh:
+				// The batchSpanProcessor is Shutdown.
+				return nil
 			case <-flushCh:
 				// Processed any items in queue prior to ForceFlush being called
 			case <-ctx.Done():
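
The new stopCh case is what keeps a ForceFlush that races with Shutdown from blocking forever: once the processor stops, the flush marker may never be processed, so the wait needs a third exit. A condensed sketch of just the wait logic (waitFlush is a hypothetical helper, not SDK API):

package main

import (
	"context"
	"fmt"
)

// waitFlush mirrors ForceFlush's wait: whichever of the three events
// happens first decides the result.
func waitFlush(ctx context.Context, flushed, stopCh <-chan struct{}) error {
	if err := ctx.Err(); err != nil { // interrupt if already canceled
		return err
	}
	select {
	case <-stopCh: // processor shut down mid-flush: nothing left to do
		return nil
	case <-flushed: // queued spans were exported
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	flushed := make(chan struct{})
	close(flushed) // pretend the flush marker was already processed
	fmt.Println(waitFlush(context.Background(), flushed, make(chan struct{}))) // <nil>
}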
@@ -326,11 +345,9 @@ func (bsp *batchSpanProcessor) drainQueue() {
 	for {
 		select {
 		case sd := <-bsp.queue:
-			if sd == nil {
-				if err := bsp.exportSpans(ctx); err != nil {
-					otel.Handle(err)
-				}
-				return
+			if _, ok := sd.(forceFlushSpan); ok {
+				// Ignore flush requests as they are not valid spans.
+				continue
 			}
 
 			bsp.batchMutex.Lock()
@@ -344,7 +361,11 @@ func (bsp *batchSpanProcessor) drainQueue() {
 				}
 			}
 		default:
-			close(bsp.queue)
+			// There are no more enqueued spans. Make final export.
+			if err := bsp.exportSpans(ctx); err != nil {
+				otel.Handle(err)
+			}
+			return
 		}
 	}
 }
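
The reworked drainQueue never closes bsp.queue: it consumes whatever is buffered, skips forceFlushSpan markers, and uses the select default case to detect an empty queue, export once, and return. Because the channel stays open for its whole lifetime, a concurrent send can no longer panic, which is what lets the recover helper below be deleted. The termination idea in isolation (toy types, not the SDK's):

package main

import "fmt"

// drain empties a queue that is never closed: consume what is buffered,
// then flush once and return when a receive would block.
func drain(queue chan int, flush func([]int)) {
	var batch []int
	for {
		select {
		case v := <-queue:
			batch = append(batch, v)
		default:
			flush(batch) // no more enqueued items: make final export
			return
		}
	}
}

func main() {
	q := make(chan int, 8)
	q <- 1
	q <- 2
	drain(q, func(b []int) { fmt.Println(b) }) // [1 2]
}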
@@ -358,34 +379,11 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
 	}
 }
 
-func recoverSendOnClosedChan() {
-	x := recover()
-	switch err := x.(type) {
-	case nil:
-		return
-	case runtime.Error:
-		if err.Error() == "send on closed channel" {
-			return
-		}
-	}
-	panic(x)
-}
-
 func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
 	if !sd.SpanContext().IsSampled() {
 		return false
 	}
 
-	// This ensures the bsp.queue<- below does not panic as the
-	// processor shuts down.
-	defer recoverSendOnClosedChan()
-
-	select {
-	case <-bsp.stopCh:
-		return false
-	default:
-	}
-
 	select {
 	case bsp.queue <- sd:
 		return true
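
With the queue never closed and OnEnd already rejecting spans after shutdown, the blocking enqueue reduces to a plain two-way select: send, or give up when the context ends. The shape in isolation (hypothetical helper with a simplified element type):

package main

import (
	"context"
	"fmt"
)

// enqueueBlock blocks until the item is queued or the context ends.
func enqueueBlock(ctx context.Context, queue chan<- int, v int) bool {
	select {
	case queue <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	q := make(chan int, 1)
	fmt.Println(enqueueBlock(context.Background(), q, 1)) // true
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(enqueueBlock(ctx, q, 2)) // false: queue full, ctx canceled
}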
@@ -399,16 +397,6 @@ func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan)
 		return false
 	}
 
-	// This ensures the bsp.queue<- below does not panic as the
-	// processor shuts down.
-	defer recoverSendOnClosedChan()
-
-	select {
-	case <-bsp.stopCh:
-		return false
-	default:
-	}
-
 	select {
 	case bsp.queue <- sd:
 		return true
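
enqueueDrop gets the same simplification. The remainder of the function falls outside this hunk; in this version of the SDK it uses a non-blocking send and counts spans it could not queue. A sketch of that shape, assuming an atomic drop counter (the counter is an assumption here, modeled on the processor's code outside this diff):

package main

import (
	"fmt"
	"sync/atomic"
)

// enqueueDrop never blocks: when the queue is full, it records a drop
// instead of waiting for space.
func enqueueDrop(queue chan<- int, v int, dropped *uint32) bool {
	select {
	case queue <- v:
		return true
	default:
		atomic.AddUint32(dropped, 1) // queue full: count the dropped item
		return false
	}
}

func main() {
	q := make(chan int, 1)
	var dropped uint32
	fmt.Println(enqueueDrop(q, 1, &dropped)) // true
	fmt.Println(enqueueDrop(q, 2, &dropped)) // false (queue full)
	fmt.Println(atomic.LoadUint32(&dropped)) // 1
}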