Diffstat (limited to 'vendor/github.com/sourcegraph/conc/pool')
6 files changed, 685 insertions, 0 deletions
diff --git a/vendor/github.com/sourcegraph/conc/pool/context_pool.go b/vendor/github.com/sourcegraph/conc/pool/context_pool.go
new file mode 100644
index 000000000..85c34e5ae
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/context_pool.go
@@ -0,0 +1,104 @@
+package pool
+
+import (
+	"context"
+)
+
+// ContextPool is a pool that runs tasks that take a context.
+// A new ContextPool should be created with `New().WithContext(ctx)`.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ContextPool struct {
+	errorPool ErrorPool
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	cancelOnError bool
+}
+
+// Go submits a task. If it returns an error, the error will be
+// collected and returned by Wait(). If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ContextPool) Go(f func(ctx context.Context) error) {
+	p.errorPool.Go(func() error {
+		if p.cancelOnError {
+			// If we are cancelling on error, then we also want to cancel if a
+			// panic is raised. To do this, we need to recover, cancel, and then
+			// re-throw the caught panic.
+			defer func() {
+				if r := recover(); r != nil {
+					p.cancel()
+					panic(r)
+				}
+			}()
+		}
+
+		err := f(p.ctx)
+		if err != nil && p.cancelOnError {
+			// Leaky abstraction warning: We add the error directly because
+			// otherwise, canceling could cause another goroutine to exit and
+			// return an error before this error was added, which breaks the
+			// expectations of WithFirstError().
+			p.errorPool.addErr(err)
+			p.cancel()
+			return nil
+		}
+		return err
+	})
+}
+
+// Wait cleans up all spawned goroutines, propagates any panics, and
+// returns an error if any of the tasks errored.
+func (p *ContextPool) Wait() error {
+	// Make sure we call cancel after pool is done to avoid memory leakage.
+	defer p.cancel()
+	return p.errorPool.Wait()
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+// This is particularly useful for (*ContextPool).WithCancelOnError(),
+// where all errors after the first are likely to be context.Canceled.
+func (p *ContextPool) WithFirstError() *ContextPool {
+	p.panicIfInitialized()
+	p.errorPool.WithFirstError()
+	return p
+}
+
+// WithCancelOnError configures the pool to cancel its context as soon as
+// any task returns an error or panics. By default, the pool's context is not
+// canceled until the parent context is canceled.
+//
+// In this case, all errors returned from the pool after the first will
+// likely be context.Canceled - you may want to also use
+// (*ContextPool).WithFirstError() to configure the pool to only return
+// the first error.
+func (p *ContextPool) WithCancelOnError() *ContextPool {
+	p.panicIfInitialized()
+	p.cancelOnError = true
+	return p
+}
+
+// WithFailFast is an alias for the combination of WithFirstError and
+// WithCancelOnError. By default, the errors from all tasks are returned and
+// the pool's context is not canceled until the parent context is canceled.
+func (p *ContextPool) WithFailFast() *ContextPool {
+	p.panicIfInitialized()
+	p.WithFirstError()
+	p.WithCancelOnError()
+	return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool {
+	p.panicIfInitialized()
+	p.errorPool.WithMaxGoroutines(n)
+	return p
+}
+
+func (p *ContextPool) panicIfInitialized() {
+	p.errorPool.panicIfInitialized()
+}
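
A minimal caller-side sketch of the fail-fast ContextPool above; the task bodies are illustrative and not part of the vendored package:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	// WithFailFast = WithFirstError + WithCancelOnError: the first failing
	// task cancels the shared context and Wait returns only that error.
	p := pool.New().WithContext(context.Background()).WithFailFast()

	p.Go(func(ctx context.Context) error {
		return errors.New("boom") // illustrative failure
	})
	p.Go(func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // canceled by the failing task above
		case <-time.After(10 * time.Second):
			return nil
		}
	})

	fmt.Println(p.Wait()) // boom
}
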
diff --git a/vendor/github.com/sourcegraph/conc/pool/error_pool.go b/vendor/github.com/sourcegraph/conc/pool/error_pool.go
new file mode 100644
index 000000000..e1789e61b
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/error_pool.go
@@ -0,0 +1,100 @@
+package pool
+
+import (
+	"context"
+	"errors"
+	"sync"
+)
+
+// ErrorPool is a pool that runs tasks that may return an error.
+// Errors are collected and returned by Wait().
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+//
+// A new ErrorPool should be created using `New().WithErrors()`.
+type ErrorPool struct {
+	pool Pool
+
+	onlyFirstError bool
+
+	mu   sync.Mutex
+	errs []error
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ErrorPool) Go(f func() error) {
+	p.pool.Go(func() {
+		p.addErr(f())
+	})
+}
+
+// Wait cleans up any spawned goroutines, propagating any panics and
+// returning any errors from tasks.
+func (p *ErrorPool) Wait() error {
+	p.pool.Wait()
+
+	errs := p.errs
+	p.errs = nil // reset errs
+
+	if len(errs) == 0 {
+		return nil
+	} else if p.onlyFirstError {
+		return errs[0]
+	} else {
+		return errors.Join(errs...)
+	}
+}
+
+// WithContext converts the pool to a ContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool {
+	p.panicIfInitialized()
+	ctx, cancel := context.WithCancel(ctx)
+	return &ContextPool{
+		errorPool: p.deref(),
+		ctx:       ctx,
+		cancel:    cancel,
+	}
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ErrorPool) WithFirstError() *ErrorPool {
+	p.panicIfInitialized()
+	p.onlyFirstError = true
+	return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ErrorPool) WithMaxGoroutines(n int) *ErrorPool {
+	p.panicIfInitialized()
+	p.pool.WithMaxGoroutines(n)
+	return p
+}
+
+// deref is a helper that creates a shallow copy of the pool with the same
+// settings. We don't want to just dereference the pointer because that makes
+// the copylock lint angry.
+func (p *ErrorPool) deref() ErrorPool {
+	return ErrorPool{
+		pool:           p.pool.deref(),
+		onlyFirstError: p.onlyFirstError,
+	}
+}
+
+func (p *ErrorPool) panicIfInitialized() {
+	p.pool.panicIfInitialized()
+}
+
+func (p *ErrorPool) addErr(err error) {
+	if err != nil {
+		p.mu.Lock()
+		p.errs = append(p.errs, err)
+		p.mu.Unlock()
+	}
+}
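
A minimal caller-side sketch of ErrorPool; the task bodies are illustrative:

package main

import (
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	// Every task error is collected; Wait joins them with errors.Join.
	p := pool.New().WithErrors().WithMaxGoroutines(2)
	for _, name := range []string{"a", "b", "c"} {
		name := name // capture loop variable (pre-Go 1.22)
		p.Go(func() error {
			return fmt.Errorf("processing %q failed", name) // illustrative task body
		})
	}
	err := p.Wait()
	fmt.Println(err) // the three failures joined into one error
}
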
diff --git a/vendor/github.com/sourcegraph/conc/pool/pool.go b/vendor/github.com/sourcegraph/conc/pool/pool.go
new file mode 100644
index 000000000..8f4494efb
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/pool.go
@@ -0,0 +1,174 @@
+package pool
+
+import (
+	"context"
+	"sync"
+
+	"github.com/sourcegraph/conc"
+)
+
+// New creates a new Pool.
+func New() *Pool {
+	return &Pool{}
+}
+
+// Pool is a pool of goroutines used to execute tasks concurrently.
+//
+// Tasks are submitted with Go(). Once all your tasks have been submitted, you
+// must call Wait() to clean up any spawned goroutines and propagate any
+// panics.
+//
+// Goroutines are started lazily, so creating a new pool is cheap. There will
+// never be more goroutines spawned than there are tasks submitted.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+//
+// Pool is efficient, but not zero cost. It should not be used for very short
+// tasks. Startup and teardown come with an overhead of around 1µs, and each
+// task has an overhead of around 300ns.
+type Pool struct {
+	handle   conc.WaitGroup
+	limiter  limiter
+	tasks    chan func()
+	initOnce sync.Once
+}
+
+// Go submits a task to be run in the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *Pool) Go(f func()) {
+	p.init()
+
+	if p.limiter == nil {
+		// No limit on the number of goroutines.
+		select {
+		case p.tasks <- f:
+			// A goroutine was available to handle the task.
+		default:
+			// No goroutine was available to handle the task.
+			// Spawn a new one and send it the task.
+			p.handle.Go(func() {
+				p.worker(f)
+			})
+		}
+	} else {
+		select {
+		case p.limiter <- struct{}{}:
+			// If we are below our limit, spawn a new worker rather
+			// than waiting for one to become available.
+			p.handle.Go(func() {
+				p.worker(f)
+			})
+		case p.tasks <- f:
+			// A worker is available and has accepted the task.
+			return
+		}
+	}
+
+}
+
+// Wait cleans up spawned goroutines, propagating any panics that were
+// raised by a tasks.
+func (p *Pool) Wait() {
+	p.init()
+
+	close(p.tasks)
+
+	// After Wait() returns, reset the struct so tasks will be reinitialized on
+	// next use. This better matches the behavior of sync.WaitGroup
+	defer func() { p.initOnce = sync.Once{} }()
+
+	p.handle.Wait()
+}
+
+// MaxGoroutines returns the maximum size of the pool.
+func (p *Pool) MaxGoroutines() int {
+	return p.limiter.limit()
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *Pool) WithMaxGoroutines(n int) *Pool {
+	p.panicIfInitialized()
+	if n < 1 {
+		panic("max goroutines in a pool must be greater than zero")
+	}
+	p.limiter = make(limiter, n)
+	return p
+}
+
+// init ensures that the pool is initialized before use. This makes the
+// zero value of the pool usable.
+func (p *Pool) init() {
+	p.initOnce.Do(func() {
+		p.tasks = make(chan func())
+	})
+}
+
+// panicIfInitialized will trigger a panic if a configuration method is called
+// after the pool has started any goroutines for the first time. In the case that
+// new settings are needed, a new pool should be created.
+func (p *Pool) panicIfInitialized() {
+	if p.tasks != nil {
+		panic("pool can not be reconfigured after calling Go() for the first time")
+	}
+}
+
+// WithErrors converts the pool to an ErrorPool so the submitted tasks can
+// return errors.
+func (p *Pool) WithErrors() *ErrorPool {
+	p.panicIfInitialized()
+	return &ErrorPool{
+		pool: p.deref(),
+	}
+}
+
+// deref is a helper that creates a shallow copy of the pool with the same
+// settings. We don't want to just dereference the pointer because that makes
+// the copylock lint angry.
+func (p *Pool) deref() Pool {
+	p.panicIfInitialized()
+	return Pool{
+		limiter: p.limiter,
+	}
+}
+
+// WithContext converts the pool to a ContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *Pool) WithContext(ctx context.Context) *ContextPool {
+	p.panicIfInitialized()
+	ctx, cancel := context.WithCancel(ctx)
+	return &ContextPool{
+		errorPool: p.WithErrors().deref(),
+		ctx:       ctx,
+		cancel:    cancel,
+	}
+}
+
+func (p *Pool) worker(initialFunc func()) {
+	// The only time this matters is if the task panics.
+	// This makes it possible to spin up new workers in that case.
+	defer p.limiter.release()
+
+	if initialFunc != nil {
+		initialFunc()
+	}
+
+	for f := range p.tasks {
+		f()
+	}
+}
+
+type limiter chan struct{}
+
+func (l limiter) limit() int {
+	return cap(l)
+}
+
+func (l limiter) release() {
+	if l != nil {
+		<-l
+	}
+}
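
A minimal caller-side sketch of a bounded Pool; the counter is illustrative:

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	var done atomic.Int64

	// WithMaxGoroutines(4) caps the pool at 4 concurrent workers;
	// extra Go() calls block until a worker frees up.
	p := pool.New().WithMaxGoroutines(4)
	for i := 0; i < 100; i++ {
		p.Go(func() {
			done.Add(1) // illustrative task body
		})
	}
	p.Wait()
	fmt.Println(done.Load()) // 100
}
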
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go
new file mode 100644
index 000000000..6bc30dd63
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go
@@ -0,0 +1,85 @@
+package pool
+
+import (
+	"context"
+)
+
+// ResultContextPool is a pool that runs tasks that take a context and return a
+// result. The context passed to the task will be canceled if any of the tasks
+// return an error, which makes its functionality different than just capturing
+// a context with the task closure.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ResultContextPool[T any] struct {
+	contextPool    ContextPool
+	agg            resultAggregator[T]
+	collectErrored bool
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
+	idx := p.agg.nextIndex()
+	p.contextPool.Go(func(ctx context.Context) error {
+		res, err := f(ctx)
+		p.agg.save(idx, res, err != nil)
+		return err
+	})
+}
+
+// Wait cleans up all spawned goroutines, propagates any panics, and
+// returns an error if any of the tasks errored.
+func (p *ResultContextPool[T]) Wait() ([]T, error) {
+	err := p.contextPool.Wait()
+	results := p.agg.collect(p.collectErrored)
+	p.agg = resultAggregator[T]{}
+	return results, err
+}
+
+// WithCollectErrored configures the pool to still collect the result of a task
+// even if the task returned an error. By default, the result of tasks that errored
+// are ignored and only the error is collected.
+func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T] {
+	p.panicIfInitialized()
+	p.collectErrored = true
+	return p
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T] {
+	p.panicIfInitialized()
+	p.contextPool.WithFirstError()
+	return p
+}
+
+// WithCancelOnError configures the pool to cancel its context as soon as
+// any task returns an error. By default, the pool's context is not
+// canceled until the parent context is canceled.
+func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T] {
+	p.panicIfInitialized()
+	p.contextPool.WithCancelOnError()
+	return p
+}
+
+// WithFailFast is an alias for the combination of WithFirstError and
+// WithCancelOnError. By default, the errors from all tasks are returned and
+// the pool's context is not canceled until the parent context is canceled.
+func (p *ResultContextPool[T]) WithFailFast() *ResultContextPool[T] {
+	p.panicIfInitialized()
+	p.contextPool.WithFailFast()
+	return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T] {
+	p.panicIfInitialized()
+	p.contextPool.WithMaxGoroutines(n)
+	return p
+}
+
+func (p *ResultContextPool[T]) panicIfInitialized() {
+	p.contextPool.panicIfInitialized()
}
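
A minimal caller-side sketch of ResultContextPool; the task bodies are illustrative:

package main

import (
	"context"
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	// Tasks receive the shared context and return a value; results are
	// stored by submission index, so the slice preserves submission order.
	p := pool.NewWithResults[int]().WithContext(context.Background()).WithCancelOnError()
	for i := 0; i < 5; i++ {
		i := i // capture loop variable (pre-Go 1.22)
		p.Go(func(ctx context.Context) (int, error) {
			return i * i, ctx.Err() // illustrative task body
		})
	}
	results, err := p.Wait()
	fmt.Println(results, err) // [0 1 4 9 16] <nil>
}
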
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go
new file mode 100644
index 000000000..832cd9bb4
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go
@@ -0,0 +1,80 @@
+package pool
+
+import (
+	"context"
+)
+
+// ResultErrorPool is a pool that executes tasks that return a generic result
+// type and an error. Tasks are executed in the pool with Go(), then the
+// results of the tasks are returned by Wait().
+//
+// The order of the results is guaranteed to be the same as the order the
+// tasks were submitted.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ResultErrorPool[T any] struct {
+	errorPool      ErrorPool
+	agg            resultAggregator[T]
+	collectErrored bool
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
+	idx := p.agg.nextIndex()
+	p.errorPool.Go(func() error {
+		res, err := f()
+		p.agg.save(idx, res, err != nil)
+		return err
+	})
+}
+
+// Wait cleans up any spawned goroutines, propagating any panics and
+// returning the results and any errors from tasks.
+func (p *ResultErrorPool[T]) Wait() ([]T, error) {
+	err := p.errorPool.Wait()
+	results := p.agg.collect(p.collectErrored)
+	p.agg = resultAggregator[T]{} // reset for reuse
+	return results, err
+}
+
+// WithCollectErrored configures the pool to still collect the result of a task
+// even if the task returned an error. By default, the result of tasks that errored
+// are ignored and only the error is collected.
+func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T] {
+	p.panicIfInitialized()
+	p.collectErrored = true
+	return p
+}
+
+// WithContext converts the pool to a ResultContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
+	p.panicIfInitialized()
+	return &ResultContextPool[T]{
+		contextPool: *p.errorPool.WithContext(ctx),
+	}
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T] {
+	p.panicIfInitialized()
+	p.errorPool.WithFirstError()
+	return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T] {
+	p.panicIfInitialized()
+	p.errorPool.WithMaxGoroutines(n)
+	return p
+}
+
+func (p *ResultErrorPool[T]) panicIfInitialized() {
+	p.errorPool.panicIfInitialized()
+}
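
A minimal caller-side sketch of ResultErrorPool; the hostnames are illustrative:

package main

import (
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	// Each task returns a value and an error; by default the values of
	// failed tasks are omitted from the slice (see WithCollectErrored).
	p := pool.NewWithResults[string]().WithErrors()
	for _, host := range []string{"db-1", "db-2"} {
		host := host // capture loop variable (pre-Go 1.22)
		p.Go(func() (string, error) {
			return "pong from " + host, nil // illustrative task body
		})
	}
	results, err := p.Wait()
	fmt.Println(results, err) // [pong from db-1 pong from db-2] <nil>
}
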
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_pool.go
new file mode 100644
index 000000000..f73a77261
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_pool.go
@@ -0,0 +1,142 @@
+package pool
+
+import (
+	"context"
+	"sort"
+	"sync"
+)
+
+// NewWithResults creates a new ResultPool for tasks with a result of type T.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+func NewWithResults[T any]() *ResultPool[T] {
+	return &ResultPool[T]{
+		pool: *New(),
+	}
+}
+
+// ResultPool is a pool that executes tasks that return a generic result type.
+// Tasks are executed in the pool with Go(), then the results of the tasks are
+// returned by Wait().
+//
+// The order of the results is guaranteed to be the same as the order the
+// tasks were submitted.
+type ResultPool[T any] struct {
+	pool Pool
+	agg  resultAggregator[T]
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultPool[T]) Go(f func() T) {
+	idx := p.agg.nextIndex()
+	p.pool.Go(func() {
+		p.agg.save(idx, f(), false)
+	})
+}
+
+// Wait cleans up all spawned goroutines, propagating any panics, and returning
+// a slice of results from tasks that did not panic.
+func (p *ResultPool[T]) Wait() []T {
+	p.pool.Wait()
+	results := p.agg.collect(true)
+	p.agg = resultAggregator[T]{} // reset for reuse
+	return results
+}
+
+// MaxGoroutines returns the maximum size of the pool.
+func (p *ResultPool[T]) MaxGoroutines() int {
+	return p.pool.MaxGoroutines()
+}
+
+// WithErrors converts the pool to an ResultErrorPool so the submitted tasks
+// can return errors.
+func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T] {
+	p.panicIfInitialized()
+	return &ResultErrorPool[T]{
+		errorPool: *p.pool.WithErrors(),
+	}
+}
+
+// WithContext converts the pool to a ResultContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
+	p.panicIfInitialized()
+	return &ResultContextPool[T]{
+		contextPool: *p.pool.WithContext(ctx),
+	}
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T] {
+	p.panicIfInitialized()
+	p.pool.WithMaxGoroutines(n)
+	return p
+}
+
+func (p *ResultPool[T]) panicIfInitialized() {
+	p.pool.panicIfInitialized()
+}
+
+// resultAggregator is a utility type that lets us safely append from multiple
+// goroutines. The zero value is valid and ready to use.
+type resultAggregator[T any] struct {
+	mu      sync.Mutex
+	len     int
+	results []T
+	errored []int
+}
+
+// nextIndex reserves a slot for a result. The returned value should be passed
+// to save() when adding a result to the aggregator.
+func (r *resultAggregator[T]) nextIndex() int {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	nextIdx := r.len
+	r.len += 1
+	return nextIdx
+}
+
+func (r *resultAggregator[T]) save(i int, res T, errored bool) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if i >= len(r.results) {
+		old := r.results
+		r.results = make([]T, r.len)
+		copy(r.results, old)
+	}
+
+	r.results[i] = res
+
+	if errored {
+		r.errored = append(r.errored, i)
+	}
+}
+
+// collect returns the set of aggregated results.
+func (r *resultAggregator[T]) collect(collectErrored bool) []T {
+	if !r.mu.TryLock() {
+		panic("collect should not be called until all goroutines have exited")
+	}
+
+	if collectErrored || len(r.errored) == 0 {
+		return r.results
+	}
+
+	filtered := r.results[:0]
+	sort.Ints(r.errored)
+	for i, e := range r.errored {
+		if i == 0 {
+			filtered = append(filtered, r.results[:e]...)
+		} else {
+			filtered = append(filtered, r.results[r.errored[i-1]+1:e]...)
+		}
+	}
+	return filtered
+}
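
A minimal caller-side sketch of ResultPool; the task bodies are illustrative:

package main

import (
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	// resultAggregator stores each result at the index reserved by
	// nextIndex(), so Wait returns results in submission order even
	// though tasks may finish in any order.
	p := pool.NewWithResults[int]().WithMaxGoroutines(8)
	for i := 0; i < 5; i++ {
		i := i // capture loop variable (pre-Go 1.22)
		p.Go(func() int {
			return i * 10 // illustrative task body
		})
	}
	fmt.Println(p.Wait()) // [0 10 20 30 40]
}
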
