summaryrefslogtreecommitdiff
path: root/vendor/github.com/sourcegraph/conc/pool
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/sourcegraph/conc/pool')
-rw-r--r--vendor/github.com/sourcegraph/conc/pool/context_pool.go104
-rw-r--r--vendor/github.com/sourcegraph/conc/pool/error_pool.go100
-rw-r--r--vendor/github.com/sourcegraph/conc/pool/pool.go174
-rw-r--r--vendor/github.com/sourcegraph/conc/pool/result_context_pool.go85
-rw-r--r--vendor/github.com/sourcegraph/conc/pool/result_error_pool.go80
-rw-r--r--vendor/github.com/sourcegraph/conc/pool/result_pool.go142
6 files changed, 685 insertions, 0 deletions
diff --git a/vendor/github.com/sourcegraph/conc/pool/context_pool.go b/vendor/github.com/sourcegraph/conc/pool/context_pool.go
new file mode 100644
index 000000000..85c34e5ae
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/context_pool.go
@@ -0,0 +1,104 @@
+package pool
+
+import (
+ "context"
+)
+
+// ContextPool is a pool that runs tasks that take a context.
+// A new ContextPool should be created with `New().WithContext(ctx)`.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ContextPool struct {
+ errorPool ErrorPool
+
+ ctx context.Context
+ cancel context.CancelFunc
+
+ cancelOnError bool
+}
+
+// Go submits a task. If it returns an error, the error will be
+// collected and returned by Wait(). If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ContextPool) Go(f func(ctx context.Context) error) {
+ p.errorPool.Go(func() error {
+ if p.cancelOnError {
+ // If we are cancelling on error, then we also want to cancel if a
+ // panic is raised. To do this, we need to recover, cancel, and then
+ // re-throw the caught panic.
+ defer func() {
+ if r := recover(); r != nil {
+ p.cancel()
+ panic(r)
+ }
+ }()
+ }
+
+ err := f(p.ctx)
+ if err != nil && p.cancelOnError {
+ // Leaky abstraction warning: We add the error directly because
+ // otherwise, canceling could cause another goroutine to exit and
+ // return an error before this error was added, which breaks the
+ // expectations of WithFirstError().
+ p.errorPool.addErr(err)
+ p.cancel()
+ return nil
+ }
+ return err
+ })
+}
+
+// Wait cleans up all spawned goroutines, propagates any panics, and
+// returns an error if any of the tasks errored.
+func (p *ContextPool) Wait() error {
+ // Make sure we call cancel after pool is done to avoid memory leakage.
+ defer p.cancel()
+ return p.errorPool.Wait()
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+// This is particularly useful for (*ContextPool).WithCancelOnError(),
+// where all errors after the first are likely to be context.Canceled.
+func (p *ContextPool) WithFirstError() *ContextPool {
+ p.panicIfInitialized()
+ p.errorPool.WithFirstError()
+ return p
+}
+
+// WithCancelOnError configures the pool to cancel its context as soon as
+// any task returns an error or panics. By default, the pool's context is not
+// canceled until the parent context is canceled.
+//
+// In this case, all errors returned from the pool after the first will
+// likely be context.Canceled - you may want to also use
+// (*ContextPool).WithFirstError() to configure the pool to only return
+// the first error.
+func (p *ContextPool) WithCancelOnError() *ContextPool {
+ p.panicIfInitialized()
+ p.cancelOnError = true
+ return p
+}
+
+// WithFailFast is an alias for the combination of WithFirstError and
+// WithCancelOnError. By default, the errors from all tasks are returned and
+// the pool's context is not canceled until the parent context is canceled.
+func (p *ContextPool) WithFailFast() *ContextPool {
+ p.panicIfInitialized()
+ p.WithFirstError()
+ p.WithCancelOnError()
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool {
+ p.panicIfInitialized()
+ p.errorPool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ContextPool) panicIfInitialized() {
+ p.errorPool.panicIfInitialized()
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/error_pool.go b/vendor/github.com/sourcegraph/conc/pool/error_pool.go
new file mode 100644
index 000000000..e1789e61b
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/error_pool.go
@@ -0,0 +1,100 @@
+package pool
+
+import (
+ "context"
+ "errors"
+ "sync"
+)
+
+// ErrorPool is a pool that runs tasks that may return an error.
+// Errors are collected and returned by Wait().
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+//
+// A new ErrorPool should be created using `New().WithErrors()`.
+type ErrorPool struct {
+ pool Pool
+
+ onlyFirstError bool
+
+ mu sync.Mutex
+ errs []error
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ErrorPool) Go(f func() error) {
+ p.pool.Go(func() {
+ p.addErr(f())
+ })
+}
+
+// Wait cleans up any spawned goroutines, propagating any panics and
+// returning any errors from tasks.
+func (p *ErrorPool) Wait() error {
+ p.pool.Wait()
+
+ errs := p.errs
+ p.errs = nil // reset errs
+
+ if len(errs) == 0 {
+ return nil
+ } else if p.onlyFirstError {
+ return errs[0]
+ } else {
+ return errors.Join(errs...)
+ }
+}
+
+// WithContext converts the pool to a ContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool {
+ p.panicIfInitialized()
+ ctx, cancel := context.WithCancel(ctx)
+ return &ContextPool{
+ errorPool: p.deref(),
+ ctx: ctx,
+ cancel: cancel,
+ }
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ErrorPool) WithFirstError() *ErrorPool {
+ p.panicIfInitialized()
+ p.onlyFirstError = true
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ErrorPool) WithMaxGoroutines(n int) *ErrorPool {
+ p.panicIfInitialized()
+ p.pool.WithMaxGoroutines(n)
+ return p
+}
+
+// deref is a helper that creates a shallow copy of the pool with the same
+// settings. We don't want to just dereference the pointer because that makes
+// the copylock lint angry.
+func (p *ErrorPool) deref() ErrorPool {
+ return ErrorPool{
+ pool: p.pool.deref(),
+ onlyFirstError: p.onlyFirstError,
+ }
+}
+
+func (p *ErrorPool) panicIfInitialized() {
+ p.pool.panicIfInitialized()
+}
+
+func (p *ErrorPool) addErr(err error) {
+ if err != nil {
+ p.mu.Lock()
+ p.errs = append(p.errs, err)
+ p.mu.Unlock()
+ }
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/pool.go b/vendor/github.com/sourcegraph/conc/pool/pool.go
new file mode 100644
index 000000000..8f4494efb
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/pool.go
@@ -0,0 +1,174 @@
+package pool
+
+import (
+ "context"
+ "sync"
+
+ "github.com/sourcegraph/conc"
+)
+
+// New creates a new Pool.
+func New() *Pool {
+ return &Pool{}
+}
+
+// Pool is a pool of goroutines used to execute tasks concurrently.
+//
+// Tasks are submitted with Go(). Once all your tasks have been submitted, you
+// must call Wait() to clean up any spawned goroutines and propagate any
+// panics.
+//
+// Goroutines are started lazily, so creating a new pool is cheap. There will
+// never be more goroutines spawned than there are tasks submitted.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+//
+// Pool is efficient, but not zero cost. It should not be used for very short
+// tasks. Startup and teardown come with an overhead of around 1µs, and each
+// task has an overhead of around 300ns.
+type Pool struct {
+ handle conc.WaitGroup
+ limiter limiter
+ tasks chan func()
+ initOnce sync.Once
+}
+
+// Go submits a task to be run in the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *Pool) Go(f func()) {
+ p.init()
+
+ if p.limiter == nil {
+ // No limit on the number of goroutines.
+ select {
+ case p.tasks <- f:
+ // A goroutine was available to handle the task.
+ default:
+ // No goroutine was available to handle the task.
+ // Spawn a new one and send it the task.
+ p.handle.Go(func() {
+ p.worker(f)
+ })
+ }
+ } else {
+ select {
+ case p.limiter <- struct{}{}:
+ // If we are below our limit, spawn a new worker rather
+ // than waiting for one to become available.
+ p.handle.Go(func() {
+ p.worker(f)
+ })
+ case p.tasks <- f:
+ // A worker is available and has accepted the task.
+ return
+ }
+ }
+
+}
+
+// Wait cleans up spawned goroutines, propagating any panics that were
+// raised by a tasks.
+func (p *Pool) Wait() {
+ p.init()
+
+ close(p.tasks)
+
+ // After Wait() returns, reset the struct so tasks will be reinitialized on
+ // next use. This better matches the behavior of sync.WaitGroup
+ defer func() { p.initOnce = sync.Once{} }()
+
+ p.handle.Wait()
+}
+
+// MaxGoroutines returns the maximum size of the pool.
+func (p *Pool) MaxGoroutines() int {
+ return p.limiter.limit()
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *Pool) WithMaxGoroutines(n int) *Pool {
+ p.panicIfInitialized()
+ if n < 1 {
+ panic("max goroutines in a pool must be greater than zero")
+ }
+ p.limiter = make(limiter, n)
+ return p
+}
+
+// init ensures that the pool is initialized before use. This makes the
+// zero value of the pool usable.
+func (p *Pool) init() {
+ p.initOnce.Do(func() {
+ p.tasks = make(chan func())
+ })
+}
+
+// panicIfInitialized will trigger a panic if a configuration method is called
+// after the pool has started any goroutines for the first time. In the case that
+// new settings are needed, a new pool should be created.
+func (p *Pool) panicIfInitialized() {
+ if p.tasks != nil {
+ panic("pool can not be reconfigured after calling Go() for the first time")
+ }
+}
+
+// WithErrors converts the pool to an ErrorPool so the submitted tasks can
+// return errors.
+func (p *Pool) WithErrors() *ErrorPool {
+ p.panicIfInitialized()
+ return &ErrorPool{
+ pool: p.deref(),
+ }
+}
+
+// deref is a helper that creates a shallow copy of the pool with the same
+// settings. We don't want to just dereference the pointer because that makes
+// the copylock lint angry.
+func (p *Pool) deref() Pool {
+ p.panicIfInitialized()
+ return Pool{
+ limiter: p.limiter,
+ }
+}
+
+// WithContext converts the pool to a ContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *Pool) WithContext(ctx context.Context) *ContextPool {
+ p.panicIfInitialized()
+ ctx, cancel := context.WithCancel(ctx)
+ return &ContextPool{
+ errorPool: p.WithErrors().deref(),
+ ctx: ctx,
+ cancel: cancel,
+ }
+}
+
+func (p *Pool) worker(initialFunc func()) {
+ // The only time this matters is if the task panics.
+ // This makes it possible to spin up new workers in that case.
+ defer p.limiter.release()
+
+ if initialFunc != nil {
+ initialFunc()
+ }
+
+ for f := range p.tasks {
+ f()
+ }
+}
+
+type limiter chan struct{}
+
+func (l limiter) limit() int {
+ return cap(l)
+}
+
+func (l limiter) release() {
+ if l != nil {
+ <-l
+ }
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go
new file mode 100644
index 000000000..6bc30dd63
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go
@@ -0,0 +1,85 @@
+package pool
+
+import (
+ "context"
+)
+
+// ResultContextPool is a pool that runs tasks that take a context and return a
+// result. The context passed to the task will be canceled if any of the tasks
+// return an error, which makes its functionality different than just capturing
+// a context with the task closure.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ResultContextPool[T any] struct {
+ contextPool ContextPool
+ agg resultAggregator[T]
+ collectErrored bool
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
+ idx := p.agg.nextIndex()
+ p.contextPool.Go(func(ctx context.Context) error {
+ res, err := f(ctx)
+ p.agg.save(idx, res, err != nil)
+ return err
+ })
+}
+
+// Wait cleans up all spawned goroutines, propagates any panics, and
+// returns an error if any of the tasks errored.
+func (p *ResultContextPool[T]) Wait() ([]T, error) {
+ err := p.contextPool.Wait()
+ results := p.agg.collect(p.collectErrored)
+ p.agg = resultAggregator[T]{}
+ return results, err
+}
+
+// WithCollectErrored configures the pool to still collect the result of a task
+// even if the task returned an error. By default, the result of tasks that errored
+// are ignored and only the error is collected.
+func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.collectErrored = true
+ return p
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.contextPool.WithFirstError()
+ return p
+}
+
+// WithCancelOnError configures the pool to cancel its context as soon as
+// any task returns an error. By default, the pool's context is not
+// canceled until the parent context is canceled.
+func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.contextPool.WithCancelOnError()
+ return p
+}
+
+// WithFailFast is an alias for the combination of WithFirstError and
+// WithCancelOnError. By default, the errors from all tasks are returned and
+// the pool's context is not canceled until the parent context is canceled.
+func (p *ResultContextPool[T]) WithFailFast() *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.contextPool.WithFailFast()
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.contextPool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ResultContextPool[T]) panicIfInitialized() {
+ p.contextPool.panicIfInitialized()
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go
new file mode 100644
index 000000000..832cd9bb4
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go
@@ -0,0 +1,80 @@
+package pool
+
+import (
+ "context"
+)
+
+// ResultErrorPool is a pool that executes tasks that return a generic result
+// type and an error. Tasks are executed in the pool with Go(), then the
+// results of the tasks are returned by Wait().
+//
+// The order of the results is guaranteed to be the same as the order the
+// tasks were submitted.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ResultErrorPool[T any] struct {
+ errorPool ErrorPool
+ agg resultAggregator[T]
+ collectErrored bool
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
+ idx := p.agg.nextIndex()
+ p.errorPool.Go(func() error {
+ res, err := f()
+ p.agg.save(idx, res, err != nil)
+ return err
+ })
+}
+
+// Wait cleans up any spawned goroutines, propagating any panics and
+// returning the results and any errors from tasks.
+func (p *ResultErrorPool[T]) Wait() ([]T, error) {
+ err := p.errorPool.Wait()
+ results := p.agg.collect(p.collectErrored)
+ p.agg = resultAggregator[T]{} // reset for reuse
+ return results, err
+}
+
+// WithCollectErrored configures the pool to still collect the result of a task
+// even if the task returned an error. By default, the result of tasks that errored
+// are ignored and only the error is collected.
+func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ p.collectErrored = true
+ return p
+}
+
+// WithContext converts the pool to a ResultContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
+ p.panicIfInitialized()
+ return &ResultContextPool[T]{
+ contextPool: *p.errorPool.WithContext(ctx),
+ }
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ p.errorPool.WithFirstError()
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ p.errorPool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ResultErrorPool[T]) panicIfInitialized() {
+ p.errorPool.panicIfInitialized()
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_pool.go
new file mode 100644
index 000000000..f73a77261
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_pool.go
@@ -0,0 +1,142 @@
+package pool
+
+import (
+ "context"
+ "sort"
+ "sync"
+)
+
+// NewWithResults creates a new ResultPool for tasks with a result of type T.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+func NewWithResults[T any]() *ResultPool[T] {
+ return &ResultPool[T]{
+ pool: *New(),
+ }
+}
+
+// ResultPool is a pool that executes tasks that return a generic result type.
+// Tasks are executed in the pool with Go(), then the results of the tasks are
+// returned by Wait().
+//
+// The order of the results is guaranteed to be the same as the order the
+// tasks were submitted.
+type ResultPool[T any] struct {
+ pool Pool
+ agg resultAggregator[T]
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultPool[T]) Go(f func() T) {
+ idx := p.agg.nextIndex()
+ p.pool.Go(func() {
+ p.agg.save(idx, f(), false)
+ })
+}
+
+// Wait cleans up all spawned goroutines, propagating any panics, and returning
+// a slice of results from tasks that did not panic.
+func (p *ResultPool[T]) Wait() []T {
+ p.pool.Wait()
+ results := p.agg.collect(true)
+ p.agg = resultAggregator[T]{} // reset for reuse
+ return results
+}
+
+// MaxGoroutines returns the maximum size of the pool.
+func (p *ResultPool[T]) MaxGoroutines() int {
+ return p.pool.MaxGoroutines()
+}
+
+// WithErrors converts the pool to an ResultErrorPool so the submitted tasks
+// can return errors.
+func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ return &ResultErrorPool[T]{
+ errorPool: *p.pool.WithErrors(),
+ }
+}
+
+// WithContext converts the pool to a ResultContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
+ p.panicIfInitialized()
+ return &ResultContextPool[T]{
+ contextPool: *p.pool.WithContext(ctx),
+ }
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T] {
+ p.panicIfInitialized()
+ p.pool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ResultPool[T]) panicIfInitialized() {
+ p.pool.panicIfInitialized()
+}
+
+// resultAggregator is a utility type that lets us safely append from multiple
+// goroutines. The zero value is valid and ready to use.
+type resultAggregator[T any] struct {
+ mu sync.Mutex
+ len int
+ results []T
+ errored []int
+}
+
+// nextIndex reserves a slot for a result. The returned value should be passed
+// to save() when adding a result to the aggregator.
+func (r *resultAggregator[T]) nextIndex() int {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ nextIdx := r.len
+ r.len += 1
+ return nextIdx
+}
+
+func (r *resultAggregator[T]) save(i int, res T, errored bool) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ if i >= len(r.results) {
+ old := r.results
+ r.results = make([]T, r.len)
+ copy(r.results, old)
+ }
+
+ r.results[i] = res
+
+ if errored {
+ r.errored = append(r.errored, i)
+ }
+}
+
+// collect returns the set of aggregated results.
+func (r *resultAggregator[T]) collect(collectErrored bool) []T {
+ if !r.mu.TryLock() {
+ panic("collect should not be called until all goroutines have exited")
+ }
+
+ if collectErrored || len(r.errored) == 0 {
+ return r.results
+ }
+
+ filtered := r.results[:0]
+ sort.Ints(r.errored)
+ for i, e := range r.errored {
+ if i == 0 {
+ filtered = append(filtered, r.results[:e]...)
+ } else {
+ filtered = append(filtered, r.results[r.errored[i-1]+1:e]...)
+ }
+ }
+ return filtered
+}