diff options
| author | 2025-05-05 16:22:45 +0000 | |
|---|---|---|
| committer | 2025-05-05 16:22:45 +0000 | |
| commit | ecbdc4227ba49eca622812b7413aa877318fd7a0 (patch) | |
| tree | 37f441534f0f7673f72b8b1fa83ddc12db8ec95c /vendor/go.opentelemetry.io/otel/sdk/log/batch.go | |
| parent | [chore] Update goreleaser (#4133) (diff) | |
| download | gotosocial-ecbdc4227ba49eca622812b7413aa877318fd7a0.tar.xz | |
[chore] Simplify the OTEL setup (#4110)
# Description
This simplifies our OTEL setup by:
* Getting rid of some deprecated things.
* Using `autoexport` and letting things get configured by the `OTEL_` environment variables.
* Removing all the unnecessary config options.
## Checklist
Please put an x inside each checkbox to indicate that you've read and followed it: `[ ]` -> `[x]`
If this is a documentation change, only the first checkbox must be filled (you can delete the others if you want).
- [x] I/we have read the [GoToSocial contribution guidelines](https://codeberg.org/superseriousbusiness/gotosocial/src/branch/main/CONTRIBUTING.md).
- [x] I/we have discussed the proposed changes already, either in an issue on the repository, or in the Matrix chat.
- [x] I/we have not leveraged AI to create the proposed changes.
- [x] I/we have performed a self-review of added code.
- [x] I/we have written code that is legible and maintainable by others.
- [ ] I/we have commented the added code, particularly in hard-to-understand areas.
- [x] I/we have made any necessary changes to documentation.
- [ ] I/we have added tests that cover new code.
- [x] I/we have run tests and they pass locally with the changes.
- [x] I/we have run `go fmt ./...` and `golangci-lint run`.
Reviewed-on: https://codeberg.org/superseriousbusiness/gotosocial/pulls/4110
Reviewed-by: tobi <kipvandenbos@noreply.codeberg.org>
Co-authored-by: Daenney <daenney@noreply.codeberg.org>
Co-committed-by: Daenney <daenney@noreply.codeberg.org>
Diffstat (limited to 'vendor/go.opentelemetry.io/otel/sdk/log/batch.go')
| -rw-r--r-- | vendor/go.opentelemetry.io/otel/sdk/log/batch.go | 477 |
1 files changed, 477 insertions, 0 deletions
diff --git a/vendor/go.opentelemetry.io/otel/sdk/log/batch.go b/vendor/go.opentelemetry.io/otel/sdk/log/batch.go new file mode 100644 index 000000000..28c969262 --- /dev/null +++ b/vendor/go.opentelemetry.io/otel/sdk/log/batch.go @@ -0,0 +1,477 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package log // import "go.opentelemetry.io/otel/sdk/log" + +import ( + "context" + "errors" + "slices" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/internal/global" +) + +const ( + dfltMaxQSize = 2048 + dfltExpInterval = time.Second + dfltExpTimeout = 30 * time.Second + dfltExpMaxBatchSize = 512 + dfltExpBufferSize = 1 + + envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE" + envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY" + envarExpTimeout = "OTEL_BLRP_EXPORT_TIMEOUT" + envarExpMaxBatchSize = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" +) + +// Compile-time check BatchProcessor implements Processor. +var _ Processor = (*BatchProcessor)(nil) + +// BatchProcessor is a processor that exports batches of log records. +// +// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor +// is shut down by default, no records will be batched or exported. +type BatchProcessor struct { + // The BatchProcessor is designed to provide the highest throughput of + // log records possible while being compatible with OpenTelemetry. The + // entry point of log records is the OnEmit method. This method is designed + // to receive records as fast as possible while still honoring shutdown + // commands. All records received are enqueued to queue. + // + // In order to block OnEmit as little as possible, a separate "poll" + // goroutine is spawned at the creation of a BatchProcessor. This + // goroutine is responsible for batching the queue at regular polled + // intervals, or when it is directly signaled to. + // + // To keep the polling goroutine from backing up, all batches it makes are + // exported with a bufferedExporter. This exporter allows the poll + // goroutine to enqueue an export payload that will be handled in a + // separate goroutine dedicated to the export. This asynchronous behavior + // allows the poll goroutine to maintain accurate interval polling. + // + // ___BatchProcessor____ __Poll Goroutine__ __Export Goroutine__ + // || || || || || || + // || ********** || || || || ********** || + // || Records=>* OnEmit * || || | - ticker || || * export * || + // || ********** || || | - trigger || || ********** || + // || || || || | || || || || + // || || || || | || || || || + // || __________\/___ || || |*********** || || ______/\_______ || + // || (____queue______)>=||=||===|* batch *===||=||=>[_export_buffer_] || + // || || || |*********** || || || + // ||_____________________|| ||__________________|| ||____________________|| + // + // + // The "release valve" in this processing is the record queue. This queue + // is a ring buffer. It will overwrite the oldest records first when writes + // to OnEmit are made faster than the queue can be flushed. If batches + // cannot be flushed to the export buffer, the records will remain in the + // queue. + + // exporter is the bufferedExporter all batches are exported with. + exporter *bufferExporter + + // q is the active queue of records that have not yet been exported. + q *queue + // batchSize is the minimum number of records needed before an export is + // triggered (unless the interval expires). + batchSize int + + // pollTrigger triggers the poll goroutine to flush a batch from the queue. + // This is sent to when it is known that the queue contains at least one + // complete batch. + // + // When a send is made to the channel, the poll loop will be reset after + // the flush. If there is still enough records in the queue for another + // batch the reset of the poll loop will automatically re-trigger itself. + // There is no need for the original sender to monitor and resend. + pollTrigger chan struct{} + // pollKill kills the poll goroutine. This is only expected to be closed + // once by the Shutdown method. + pollKill chan struct{} + // pollDone signals the poll goroutine has completed. + pollDone chan struct{} + + // stopped holds the stopped state of the BatchProcessor. + stopped atomic.Bool + + noCmp [0]func() //nolint: unused // This is indeed used. +} + +// NewBatchProcessor decorates the provided exporter +// so that the log records are batched before exporting. +// +// All of the exporter's methods are called synchronously. +func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchProcessor { + cfg := newBatchConfig(opts) + if exporter == nil { + // Do not panic on nil export. + exporter = defaultNoopExporter + } + // Order is important here. Wrap the timeoutExporter with the chunkExporter + // to ensure each export completes in timeout (instead of all chunked + // exports). + exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value) + // Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched + // appropriately on export. + exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value) + + b := &BatchProcessor{ + exporter: newBufferExporter(exporter, cfg.expBufferSize.Value), + + q: newQueue(cfg.maxQSize.Value), + batchSize: cfg.expMaxBatchSize.Value, + pollTrigger: make(chan struct{}, 1), + pollKill: make(chan struct{}), + } + b.pollDone = b.poll(cfg.expInterval.Value) + return b +} + +// poll spawns a goroutine to handle interval polling and batch exporting. The +// returned done chan is closed when the spawned goroutine completes. +func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { + done = make(chan struct{}) + + ticker := time.NewTicker(interval) + // TODO: investigate using a sync.Pool instead of cloning. + buf := make([]Record, b.batchSize) + go func() { + defer close(done) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-b.pollTrigger: + ticker.Reset(interval) + case <-b.pollKill: + return + } + + if d := b.q.Dropped(); d > 0 { + global.Warn("dropped log records", "dropped", d) + } + + qLen := b.q.TryDequeue(buf, func(r []Record) bool { + ok := b.exporter.EnqueueExport(r) + if ok { + buf = slices.Clone(buf) + } + return ok + }) + if qLen >= b.batchSize { + // There is another full batch ready. Immediately trigger + // another export attempt. + select { + case b.pollTrigger <- struct{}{}: + default: + // Another flush signal already received. + } + } + } + }() + return done +} + +// OnEmit batches provided log record. +func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { + if b.stopped.Load() || b.q == nil { + return nil + } + // The record is cloned so that changes done by subsequent processors + // are not going to lead to a data race. + if n := b.q.Enqueue(r.Clone()); n >= b.batchSize { + select { + case b.pollTrigger <- struct{}{}: + default: + // Flush chan full. The poll goroutine will handle this by + // re-sending any trigger until the queue has less than batchSize + // records. + } + } + return nil +} + +// Shutdown flushes queued log records and shuts down the decorated exporter. +func (b *BatchProcessor) Shutdown(ctx context.Context) error { + if b.stopped.Swap(true) || b.q == nil { + return nil + } + + // Stop the poll goroutine. + close(b.pollKill) + select { + case <-b.pollDone: + case <-ctx.Done(): + // Out of time. + return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx)) + } + + // Flush remaining queued before exporter shutdown. + err := b.exporter.Export(ctx, b.q.Flush()) + return errors.Join(err, b.exporter.Shutdown(ctx)) +} + +var errPartialFlush = errors.New("partial flush: export buffer full") + +// Used for testing. +var ctxErr = func(ctx context.Context) error { + return ctx.Err() +} + +// ForceFlush flushes queued log records and flushes the decorated exporter. +func (b *BatchProcessor) ForceFlush(ctx context.Context) error { + if b.stopped.Load() || b.q == nil { + return nil + } + + buf := make([]Record, b.q.cap) + notFlushed := func() bool { + var flushed bool + _ = b.q.TryDequeue(buf, func(r []Record) bool { + flushed = b.exporter.EnqueueExport(r) + return flushed + }) + return !flushed + } + var err error + // For as long as ctx allows, try to make a single flush of the queue. + for notFlushed() { + // Use ctxErr instead of calling ctx.Err directly so we can test + // the partial error return. + if e := ctxErr(ctx); e != nil { + err = errors.Join(e, errPartialFlush) + break + } + } + return errors.Join(err, b.exporter.ForceFlush(ctx)) +} + +// queue holds a queue of logging records. +// +// When the queue becomes full, the oldest records in the queue are +// overwritten. +type queue struct { + sync.Mutex + + dropped atomic.Uint64 + cap, len int + read, write *ring +} + +func newQueue(size int) *queue { + r := newRing(size) + return &queue{ + cap: size, + read: r, + write: r, + } +} + +// Dropped returns the number of Records dropped during enqueueing since the +// last time Dropped was called. +func (q *queue) Dropped() uint64 { + return q.dropped.Swap(0) +} + +// Enqueue adds r to the queue. The queue size, including the addition of r, is +// returned. +// +// If enqueueing r will exceed the capacity of q, the oldest Record held in q +// will be dropped and r retained. +func (q *queue) Enqueue(r Record) int { + q.Lock() + defer q.Unlock() + + q.write.Value = r + q.write = q.write.Next() + + q.len++ + if q.len > q.cap { + // Overflow. Advance read to be the new "oldest". + q.len = q.cap + q.read = q.read.Next() + q.dropped.Add(1) + } + return q.len +} + +// TryDequeue attempts to dequeue up to len(buf) Records. The available Records +// will be assigned into buf and passed to write. If write fails, returning +// false, the Records will not be removed from the queue. If write succeeds, +// returning true, the dequeued Records are removed from the queue. The number +// of Records remaining in the queue are returned. +// +// When write is called the lock of q is held. The write function must not call +// other methods of this q that acquire the lock. +func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { + q.Lock() + defer q.Unlock() + + origRead := q.read + + n := min(len(buf), q.len) + for i := 0; i < n; i++ { + buf[i] = q.read.Value + q.read = q.read.Next() + } + + if write(buf[:n]) { + q.len -= n + } else { + q.read = origRead + } + return q.len +} + +// Flush returns all the Records held in the queue and resets it to be +// empty. +func (q *queue) Flush() []Record { + q.Lock() + defer q.Unlock() + + out := make([]Record, q.len) + for i := range out { + out[i] = q.read.Value + q.read = q.read.Next() + } + q.len = 0 + + return out +} + +type batchConfig struct { + maxQSize setting[int] + expInterval setting[time.Duration] + expTimeout setting[time.Duration] + expMaxBatchSize setting[int] + expBufferSize setting[int] +} + +func newBatchConfig(options []BatchProcessorOption) batchConfig { + var c batchConfig + for _, o := range options { + c = o.apply(c) + } + + c.maxQSize = c.maxQSize.Resolve( + clearLessThanOne[int](), + getenv[int](envarMaxQSize), + clearLessThanOne[int](), + fallback[int](dfltMaxQSize), + ) + c.expInterval = c.expInterval.Resolve( + clearLessThanOne[time.Duration](), + getenv[time.Duration](envarExpInterval), + clearLessThanOne[time.Duration](), + fallback[time.Duration](dfltExpInterval), + ) + c.expTimeout = c.expTimeout.Resolve( + clearLessThanOne[time.Duration](), + getenv[time.Duration](envarExpTimeout), + clearLessThanOne[time.Duration](), + fallback[time.Duration](dfltExpTimeout), + ) + c.expMaxBatchSize = c.expMaxBatchSize.Resolve( + clearLessThanOne[int](), + getenv[int](envarExpMaxBatchSize), + clearLessThanOne[int](), + clampMax[int](c.maxQSize.Value), + fallback[int](dfltExpMaxBatchSize), + ) + c.expBufferSize = c.expBufferSize.Resolve( + clearLessThanOne[int](), + fallback[int](dfltExpBufferSize), + ) + + return c +} + +// BatchProcessorOption applies a configuration to a [BatchProcessor]. +type BatchProcessorOption interface { + apply(batchConfig) batchConfig +} + +type batchOptionFunc func(batchConfig) batchConfig + +func (fn batchOptionFunc) apply(c batchConfig) batchConfig { + return fn(c) +} + +// WithMaxQueueSize sets the maximum queue size used by the Batcher. +// After the size is reached log records are dropped. +// +// If the OTEL_BLRP_MAX_QUEUE_SIZE environment variable is set, +// and this option is not passed, that variable value will be used. +// +// By default, if an environment variable is not set, and this option is not +// passed, 2048 will be used. +// The default value is also used when the provided value is less than one. +func WithMaxQueueSize(size int) BatchProcessorOption { + return batchOptionFunc(func(cfg batchConfig) batchConfig { + cfg.maxQSize = newSetting(size) + return cfg + }) +} + +// WithExportInterval sets the maximum duration between batched exports. +// +// If the OTEL_BLRP_SCHEDULE_DELAY environment variable is set, +// and this option is not passed, that variable value will be used. +// +// By default, if an environment variable is not set, and this option is not +// passed, 1s will be used. +// The default value is also used when the provided value is less than one. +func WithExportInterval(d time.Duration) BatchProcessorOption { + return batchOptionFunc(func(cfg batchConfig) batchConfig { + cfg.expInterval = newSetting(d) + return cfg + }) +} + +// WithExportTimeout sets the duration after which a batched export is canceled. +// +// If the OTEL_BLRP_EXPORT_TIMEOUT environment variable is set, +// and this option is not passed, that variable value will be used. +// +// By default, if an environment variable is not set, and this option is not +// passed, 30s will be used. +// The default value is also used when the provided value is less than one. +func WithExportTimeout(d time.Duration) BatchProcessorOption { + return batchOptionFunc(func(cfg batchConfig) batchConfig { + cfg.expTimeout = newSetting(d) + return cfg + }) +} + +// WithExportMaxBatchSize sets the maximum batch size of every export. +// A batch will be split into multiple exports to not exceed this size. +// +// If the OTEL_BLRP_MAX_EXPORT_BATCH_SIZE environment variable is set, +// and this option is not passed, that variable value will be used. +// +// By default, if an environment variable is not set, and this option is not +// passed, 512 will be used. +// The default value is also used when the provided value is less than one. +func WithExportMaxBatchSize(size int) BatchProcessorOption { + return batchOptionFunc(func(cfg batchConfig) batchConfig { + cfg.expMaxBatchSize = newSetting(size) + return cfg + }) +} + +// WithExportBufferSize sets the batch buffer size. +// Batches will be temporarily kept in a memory buffer until they are exported. +// +// By default, a value of 1 will be used. +// The default value is also used when the provided value is less than one. +func WithExportBufferSize(size int) BatchProcessorOption { + return batchOptionFunc(func(cfg batchConfig) batchConfig { + cfg.expBufferSize = newSetting(size) + return cfg + }) +} |
