diff options
Diffstat (limited to 'vendor/github.com/sourcegraph/conc')
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/.golangci.yml | 11 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/LICENSE | 21 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/Makefile | 24 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/README.md | 464 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/panics/panics.go | 102 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/panics/try.go | 11 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/pool/context_pool.go | 104 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/pool/error_pool.go | 100 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/pool/pool.go | 174 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/pool/result_context_pool.go | 85 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/pool/result_error_pool.go | 80 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/pool/result_pool.go | 142 | ||||
| -rw-r--r-- | vendor/github.com/sourcegraph/conc/waitgroup.go | 52 |
13 files changed, 0 insertions, 1370 deletions
diff --git a/vendor/github.com/sourcegraph/conc/.golangci.yml b/vendor/github.com/sourcegraph/conc/.golangci.yml deleted file mode 100644 index ae65a760a..000000000 --- a/vendor/github.com/sourcegraph/conc/.golangci.yml +++ /dev/null @@ -1,11 +0,0 @@ -linters: - disable-all: true - enable: - - errcheck - - godot - - gosimple - - govet - - ineffassign - - staticcheck - - typecheck - - unused diff --git a/vendor/github.com/sourcegraph/conc/LICENSE b/vendor/github.com/sourcegraph/conc/LICENSE deleted file mode 100644 index 1081f4ef4..000000000 --- a/vendor/github.com/sourcegraph/conc/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2023 Sourcegraph - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/vendor/github.com/sourcegraph/conc/Makefile b/vendor/github.com/sourcegraph/conc/Makefile deleted file mode 100644 index 3e0720a12..000000000 --- a/vendor/github.com/sourcegraph/conc/Makefile +++ /dev/null @@ -1,24 +0,0 @@ -.DEFAULT_GOAL := help - -GO_BIN ?= $(shell go env GOPATH)/bin - -.PHONY: help -help: - @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' - -$(GO_BIN)/golangci-lint: - @echo "==> Installing golangci-lint within "${GO_BIN}"" - @go install -v github.com/golangci/golangci-lint/cmd/golangci-lint@latest - -.PHONY: lint -lint: $(GO_BIN)/golangci-lint ## Run linting on Go files - @echo "==> Linting Go source files" - @golangci-lint run -v --fix -c .golangci.yml ./... - -.PHONY: test -test: ## Run tests - go test -race -v ./... -coverprofile ./coverage.txt - -.PHONY: bench -bench: ## Run benchmarks. See https://pkg.go.dev/cmd/go#hdr-Testing_flags - go test ./... -bench . -benchtime 5s -timeout 0 -run=XXX -cpu 1 -benchmem diff --git a/vendor/github.com/sourcegraph/conc/README.md b/vendor/github.com/sourcegraph/conc/README.md deleted file mode 100644 index 1c87c3c96..000000000 --- a/vendor/github.com/sourcegraph/conc/README.md +++ /dev/null @@ -1,464 +0,0 @@ - - -# `conc`: better structured concurrency for go - -[](https://pkg.go.dev/github.com/sourcegraph/conc) -[](https://sourcegraph.com/github.com/sourcegraph/conc) -[](https://goreportcard.com/report/github.com/sourcegraph/conc) -[](https://codecov.io/gh/sourcegraph/conc) -[](https://discord.gg/bvXQXmtRjN) - -`conc` is your toolbelt for structured concurrency in go, making common tasks -easier and safer. - -```sh -go get github.com/sourcegraph/conc -``` - -# At a glance - -- Use [`conc.WaitGroup`](https://pkg.go.dev/github.com/sourcegraph/conc#WaitGroup) if you just want a safer version of `sync.WaitGroup` -- Use [`pool.Pool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool) if you want a concurrency-limited task runner -- Use [`pool.ResultPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultPool) if you want a concurrent task runner that collects task results -- Use [`pool.(Result)?ErrorPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool) if your tasks are fallible -- Use [`pool.(Result)?ContextPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ContextPool) if your tasks should be canceled on failure -- Use [`stream.Stream`](https://pkg.go.dev/github.com/sourcegraph/conc/stream#Stream) if you want to process an ordered stream of tasks in parallel with serial callbacks -- Use [`iter.Map`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#Map) if you want to concurrently map a slice -- Use [`iter.ForEach`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#ForEach) if you want to concurrently iterate over a slice -- Use [`panics.Catcher`](https://pkg.go.dev/github.com/sourcegraph/conc/panics#Catcher) if you want to catch panics in your own goroutines - -All pools are created with -[`pool.New()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#New) -or -[`pool.NewWithResults[T]()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#NewWithResults), -then configured with methods: - -- [`p.WithMaxGoroutines()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.MaxGoroutines) configures the maximum number of goroutines in the pool -- [`p.WithErrors()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithErrors) configures the pool to run tasks that return errors -- [`p.WithContext(ctx)`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithContext) configures the pool to run tasks that should be canceled on first error -- [`p.WithFirstError()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool.WithFirstError) configures error pools to only keep the first returned error rather than an aggregated error -- [`p.WithCollectErrored()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultContextPool.WithCollectErrored) configures result pools to collect results even when the task errored - -# Goals - -The main goals of the package are: -1) Make it harder to leak goroutines -2) Handle panics gracefully -3) Make concurrent code easier to read - -## Goal #1: Make it harder to leak goroutines - -A common pain point when working with goroutines is cleaning them up. It's -really easy to fire off a `go` statement and fail to properly wait for it to -complete. - -`conc` takes the opinionated stance that all concurrency should be scoped. -That is, goroutines should have an owner and that owner should always -ensure that its owned goroutines exit properly. - -In `conc`, the owner of a goroutine is always a `conc.WaitGroup`. Goroutines -are spawned in a `WaitGroup` with `(*WaitGroup).Go()`, and -`(*WaitGroup).Wait()` should always be called before the `WaitGroup` goes out -of scope. - -In some cases, you might want a spawned goroutine to outlast the scope of the -caller. In that case, you could pass a `WaitGroup` into the spawning function. - -```go -func main() { - var wg conc.WaitGroup - defer wg.Wait() - - startTheThing(&wg) -} - -func startTheThing(wg *conc.WaitGroup) { - wg.Go(func() { ... }) -} -``` - -For some more discussion on why scoped concurrency is nice, check out [this -blog -post](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/). - -## Goal #2: Handle panics gracefully - -A frequent problem with goroutines in long-running applications is handling -panics. A goroutine spawned without a panic handler will crash the whole process -on panic. This is usually undesirable. - -However, if you do add a panic handler to a goroutine, what do you do with the -panic once you catch it? Some options: -1) Ignore it -2) Log it -3) Turn it into an error and return that to the goroutine spawner -4) Propagate the panic to the goroutine spawner - -Ignoring panics is a bad idea since panics usually mean there is actually -something wrong and someone should fix it. - -Just logging panics isn't great either because then there is no indication to the spawner -that something bad happened, and it might just continue on as normal even though your -program is in a really bad state. - -Both (3) and (4) are reasonable options, but both require the goroutine to have -an owner that can actually receive the message that something went wrong. This -is generally not true with a goroutine spawned with `go`, but in the `conc` -package, all goroutines have an owner that must collect the spawned goroutine. -In the conc package, any call to `Wait()` will panic if any of the spawned goroutines -panicked. Additionally, it decorates the panic value with a stacktrace from the child -goroutine so that you don't lose information about what caused the panic. - -Doing this all correctly every time you spawn something with `go` is not -trivial and it requires a lot of boilerplate that makes the important parts of -the code more difficult to read, so `conc` does this for you. - -<table> -<tr> -<th><code>stdlib</code></th> -<th><code>conc</code></th> -</tr> -<tr> -<td> - -```go -type caughtPanicError struct { - val any - stack []byte -} - -func (e *caughtPanicError) Error() string { - return fmt.Sprintf( - "panic: %q\n%s", - e.val, - string(e.stack) - ) -} - -func main() { - done := make(chan error) - go func() { - defer func() { - if v := recover(); v != nil { - done <- &caughtPanicError{ - val: v, - stack: debug.Stack() - } - } else { - done <- nil - } - }() - doSomethingThatMightPanic() - }() - err := <-done - if err != nil { - panic(err) - } -} -``` -</td> -<td> - -```go -func main() { - var wg conc.WaitGroup - wg.Go(doSomethingThatMightPanic) - // panics with a nice stacktrace - wg.Wait() -} -``` -</td> -</tr> -</table> - -## Goal #3: Make concurrent code easier to read - -Doing concurrency correctly is difficult. Doing it in a way that doesn't -obfuscate what the code is actually doing is more difficult. The `conc` package -attempts to make common operations easier by abstracting as much boilerplate -complexity as possible. - -Want to run a set of concurrent tasks with a bounded set of goroutines? Use -`pool.New()`. Want to process an ordered stream of results concurrently, but -still maintain order? Try `stream.New()`. What about a concurrent map over -a slice? Take a peek at `iter.Map()`. - -Browse some examples below for some comparisons with doing these by hand. - -# Examples - -Each of these examples forgoes propagating panics for simplicity. To see -what kind of complexity that would add, check out the "Goal #2" header above. - -Spawn a set of goroutines and waiting for them to finish: - -<table> -<tr> -<th><code>stdlib</code></th> -<th><code>conc</code></th> -</tr> -<tr> -<td> - -```go -func main() { - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - // crashes on panic! - doSomething() - }() - } - wg.Wait() -} -``` -</td> -<td> - -```go -func main() { - var wg conc.WaitGroup - for i := 0; i < 10; i++ { - wg.Go(doSomething) - } - wg.Wait() -} -``` -</td> -</tr> -</table> - -Process each element of a stream in a static pool of goroutines: - -<table> -<tr> -<th><code>stdlib</code></th> -<th><code>conc</code></th> -</tr> -<tr> -<td> - -```go -func process(stream chan int) { - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for elem := range stream { - handle(elem) - } - }() - } - wg.Wait() -} -``` -</td> -<td> - -```go -func process(stream chan int) { - p := pool.New().WithMaxGoroutines(10) - for elem := range stream { - elem := elem - p.Go(func() { - handle(elem) - }) - } - p.Wait() -} -``` -</td> -</tr> -</table> - -Process each element of a slice in a static pool of goroutines: - -<table> -<tr> -<th><code>stdlib</code></th> -<th><code>conc</code></th> -</tr> -<tr> -<td> - -```go -func process(values []int) { - feeder := make(chan int, 8) - - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for elem := range feeder { - handle(elem) - } - }() - } - - for _, value := range values { - feeder <- value - } - close(feeder) - wg.Wait() -} -``` -</td> -<td> - -```go -func process(values []int) { - iter.ForEach(values, handle) -} -``` -</td> -</tr> -</table> - -Concurrently map a slice: - -<table> -<tr> -<th><code>stdlib</code></th> -<th><code>conc</code></th> -</tr> -<tr> -<td> - -```go -func concMap( - input []int, - f func(int) int, -) []int { - res := make([]int, len(input)) - var idx atomic.Int64 - - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - for { - i := int(idx.Add(1) - 1) - if i >= len(input) { - return - } - - res[i] = f(input[i]) - } - }() - } - wg.Wait() - return res -} -``` -</td> -<td> - -```go -func concMap( - input []int, - f func(*int) int, -) []int { - return iter.Map(input, f) -} -``` -</td> -</tr> -</table> - -Process an ordered stream concurrently: - - -<table> -<tr> -<th><code>stdlib</code></th> -<th><code>conc</code></th> -</tr> -<tr> -<td> - -```go -func mapStream( - in chan int, - out chan int, - f func(int) int, -) { - tasks := make(chan func()) - taskResults := make(chan chan int) - - // Worker goroutines - var workerWg sync.WaitGroup - for i := 0; i < 10; i++ { - workerWg.Add(1) - go func() { - defer workerWg.Done() - for task := range tasks { - task() - } - }() - } - - // Ordered reader goroutines - var readerWg sync.WaitGroup - readerWg.Add(1) - go func() { - defer readerWg.Done() - for result := range taskResults { - item := <-result - out <- item - } - }() - - // Feed the workers with tasks - for elem := range in { - resultCh := make(chan int, 1) - taskResults <- resultCh - tasks <- func() { - resultCh <- f(elem) - } - } - - // We've exhausted input. - // Wait for everything to finish - close(tasks) - workerWg.Wait() - close(taskResults) - readerWg.Wait() -} -``` -</td> -<td> - -```go -func mapStream( - in chan int, - out chan int, - f func(int) int, -) { - s := stream.New().WithMaxGoroutines(10) - for elem := range in { - elem := elem - s.Go(func() stream.Callback { - res := f(elem) - return func() { out <- res } - }) - } - s.Wait() -} -``` -</td> -</tr> -</table> - -# Status - -This package is currently pre-1.0. There are likely to be minor breaking -changes before a 1.0 release as we stabilize the APIs and tweak defaults. -Please open an issue if you have questions, concerns, or requests that you'd -like addressed before the 1.0 release. Currently, a 1.0 is targeted for -March 2023. diff --git a/vendor/github.com/sourcegraph/conc/panics/panics.go b/vendor/github.com/sourcegraph/conc/panics/panics.go deleted file mode 100644 index abbed7fa0..000000000 --- a/vendor/github.com/sourcegraph/conc/panics/panics.go +++ /dev/null @@ -1,102 +0,0 @@ -package panics - -import ( - "fmt" - "runtime" - "runtime/debug" - "sync/atomic" -) - -// Catcher is used to catch panics. You can execute a function with Try, -// which will catch any spawned panic. Try can be called any number of times, -// from any number of goroutines. Once all calls to Try have completed, you can -// get the value of the first panic (if any) with Recovered(), or you can just -// propagate the panic (re-panic) with Repanic(). -type Catcher struct { - recovered atomic.Pointer[Recovered] -} - -// Try executes f, catching any panic it might spawn. It is safe -// to call from multiple goroutines simultaneously. -func (p *Catcher) Try(f func()) { - defer p.tryRecover() - f() -} - -func (p *Catcher) tryRecover() { - if val := recover(); val != nil { - rp := NewRecovered(1, val) - p.recovered.CompareAndSwap(nil, &rp) - } -} - -// Repanic panics if any calls to Try caught a panic. It will panic with the -// value of the first panic caught, wrapped in a panics.Recovered with caller -// information. -func (p *Catcher) Repanic() { - if val := p.Recovered(); val != nil { - panic(val) - } -} - -// Recovered returns the value of the first panic caught by Try, or nil if -// no calls to Try panicked. -func (p *Catcher) Recovered() *Recovered { - return p.recovered.Load() -} - -// NewRecovered creates a panics.Recovered from a panic value and a collected -// stacktrace. The skip parameter allows the caller to skip stack frames when -// collecting the stacktrace. Calling with a skip of 0 means include the call to -// NewRecovered in the stacktrace. -func NewRecovered(skip int, value any) Recovered { - // 64 frames should be plenty - var callers [64]uintptr - n := runtime.Callers(skip+1, callers[:]) - return Recovered{ - Value: value, - Callers: callers[:n], - Stack: debug.Stack(), - } -} - -// Recovered is a panic that was caught with recover(). -type Recovered struct { - // The original value of the panic. - Value any - // The caller list as returned by runtime.Callers when the panic was - // recovered. Can be used to produce a more detailed stack information with - // runtime.CallersFrames. - Callers []uintptr - // The formatted stacktrace from the goroutine where the panic was recovered. - // Easier to use than Callers. - Stack []byte -} - -// String renders a human-readable formatting of the panic. -func (p *Recovered) String() string { - return fmt.Sprintf("panic: %v\nstacktrace:\n%s\n", p.Value, p.Stack) -} - -// AsError casts the panic into an error implementation. The implementation -// is unwrappable with the cause of the panic, if the panic was provided one. -func (p *Recovered) AsError() error { - if p == nil { - return nil - } - return &ErrRecovered{*p} -} - -// ErrRecovered wraps a panics.Recovered in an error implementation. -type ErrRecovered struct{ Recovered } - -var _ error = (*ErrRecovered)(nil) - -func (p *ErrRecovered) Error() string { return p.String() } - -func (p *ErrRecovered) Unwrap() error { - if err, ok := p.Value.(error); ok { - return err - } - return nil -} diff --git a/vendor/github.com/sourcegraph/conc/panics/try.go b/vendor/github.com/sourcegraph/conc/panics/try.go deleted file mode 100644 index 4ded92a1c..000000000 --- a/vendor/github.com/sourcegraph/conc/panics/try.go +++ /dev/null @@ -1,11 +0,0 @@ -package panics - -// Try executes f, catching and returning any panic it might spawn. -// -// The recovered panic can be propagated with panic(), or handled as a normal error with -// (*panics.Recovered).AsError(). -func Try(f func()) *Recovered { - var c Catcher - c.Try(f) - return c.Recovered() -} diff --git a/vendor/github.com/sourcegraph/conc/pool/context_pool.go b/vendor/github.com/sourcegraph/conc/pool/context_pool.go deleted file mode 100644 index 85c34e5ae..000000000 --- a/vendor/github.com/sourcegraph/conc/pool/context_pool.go +++ /dev/null @@ -1,104 +0,0 @@ -package pool - -import ( - "context" -) - -// ContextPool is a pool that runs tasks that take a context. -// A new ContextPool should be created with `New().WithContext(ctx)`. -// -// The configuration methods (With*) will panic if they are used after calling -// Go() for the first time. -type ContextPool struct { - errorPool ErrorPool - - ctx context.Context - cancel context.CancelFunc - - cancelOnError bool -} - -// Go submits a task. If it returns an error, the error will be -// collected and returned by Wait(). If all goroutines in the pool -// are busy, a call to Go() will block until the task can be started. -func (p *ContextPool) Go(f func(ctx context.Context) error) { - p.errorPool.Go(func() error { - if p.cancelOnError { - // If we are cancelling on error, then we also want to cancel if a - // panic is raised. To do this, we need to recover, cancel, and then - // re-throw the caught panic. - defer func() { - if r := recover(); r != nil { - p.cancel() - panic(r) - } - }() - } - - err := f(p.ctx) - if err != nil && p.cancelOnError { - // Leaky abstraction warning: We add the error directly because - // otherwise, canceling could cause another goroutine to exit and - // return an error before this error was added, which breaks the - // expectations of WithFirstError(). - p.errorPool.addErr(err) - p.cancel() - return nil - } - return err - }) -} - -// Wait cleans up all spawned goroutines, propagates any panics, and -// returns an error if any of the tasks errored. -func (p *ContextPool) Wait() error { - // Make sure we call cancel after pool is done to avoid memory leakage. - defer p.cancel() - return p.errorPool.Wait() -} - -// WithFirstError configures the pool to only return the first error -// returned by a task. By default, Wait() will return a combined error. -// This is particularly useful for (*ContextPool).WithCancelOnError(), -// where all errors after the first are likely to be context.Canceled. -func (p *ContextPool) WithFirstError() *ContextPool { - p.panicIfInitialized() - p.errorPool.WithFirstError() - return p -} - -// WithCancelOnError configures the pool to cancel its context as soon as -// any task returns an error or panics. By default, the pool's context is not -// canceled until the parent context is canceled. -// -// In this case, all errors returned from the pool after the first will -// likely be context.Canceled - you may want to also use -// (*ContextPool).WithFirstError() to configure the pool to only return -// the first error. -func (p *ContextPool) WithCancelOnError() *ContextPool { - p.panicIfInitialized() - p.cancelOnError = true - return p -} - -// WithFailFast is an alias for the combination of WithFirstError and -// WithCancelOnError. By default, the errors from all tasks are returned and -// the pool's context is not canceled until the parent context is canceled. -func (p *ContextPool) WithFailFast() *ContextPool { - p.panicIfInitialized() - p.WithFirstError() - p.WithCancelOnError() - return p -} - -// WithMaxGoroutines limits the number of goroutines in a pool. -// Defaults to unlimited. Panics if n < 1. -func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool { - p.panicIfInitialized() - p.errorPool.WithMaxGoroutines(n) - return p -} - -func (p *ContextPool) panicIfInitialized() { - p.errorPool.panicIfInitialized() -} diff --git a/vendor/github.com/sourcegraph/conc/pool/error_pool.go b/vendor/github.com/sourcegraph/conc/pool/error_pool.go deleted file mode 100644 index e1789e61b..000000000 --- a/vendor/github.com/sourcegraph/conc/pool/error_pool.go +++ /dev/null @@ -1,100 +0,0 @@ -package pool - -import ( - "context" - "errors" - "sync" -) - -// ErrorPool is a pool that runs tasks that may return an error. -// Errors are collected and returned by Wait(). -// -// The configuration methods (With*) will panic if they are used after calling -// Go() for the first time. -// -// A new ErrorPool should be created using `New().WithErrors()`. -type ErrorPool struct { - pool Pool - - onlyFirstError bool - - mu sync.Mutex - errs []error -} - -// Go submits a task to the pool. If all goroutines in the pool -// are busy, a call to Go() will block until the task can be started. -func (p *ErrorPool) Go(f func() error) { - p.pool.Go(func() { - p.addErr(f()) - }) -} - -// Wait cleans up any spawned goroutines, propagating any panics and -// returning any errors from tasks. -func (p *ErrorPool) Wait() error { - p.pool.Wait() - - errs := p.errs - p.errs = nil // reset errs - - if len(errs) == 0 { - return nil - } else if p.onlyFirstError { - return errs[0] - } else { - return errors.Join(errs...) - } -} - -// WithContext converts the pool to a ContextPool for tasks that should -// run under the same context, such that they each respect shared cancellation. -// For example, WithCancelOnError can be configured on the returned pool to -// signal that all goroutines should be cancelled upon the first error. -func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool { - p.panicIfInitialized() - ctx, cancel := context.WithCancel(ctx) - return &ContextPool{ - errorPool: p.deref(), - ctx: ctx, - cancel: cancel, - } -} - -// WithFirstError configures the pool to only return the first error -// returned by a task. By default, Wait() will return a combined error. -func (p *ErrorPool) WithFirstError() *ErrorPool { - p.panicIfInitialized() - p.onlyFirstError = true - return p -} - -// WithMaxGoroutines limits the number of goroutines in a pool. -// Defaults to unlimited. Panics if n < 1. -func (p *ErrorPool) WithMaxGoroutines(n int) *ErrorPool { - p.panicIfInitialized() - p.pool.WithMaxGoroutines(n) - return p -} - -// deref is a helper that creates a shallow copy of the pool with the same -// settings. We don't want to just dereference the pointer because that makes -// the copylock lint angry. -func (p *ErrorPool) deref() ErrorPool { - return ErrorPool{ - pool: p.pool.deref(), - onlyFirstError: p.onlyFirstError, - } -} - -func (p *ErrorPool) panicIfInitialized() { - p.pool.panicIfInitialized() -} - -func (p *ErrorPool) addErr(err error) { - if err != nil { - p.mu.Lock() - p.errs = append(p.errs, err) - p.mu.Unlock() - } -} diff --git a/vendor/github.com/sourcegraph/conc/pool/pool.go b/vendor/github.com/sourcegraph/conc/pool/pool.go deleted file mode 100644 index 8f4494efb..000000000 --- a/vendor/github.com/sourcegraph/conc/pool/pool.go +++ /dev/null @@ -1,174 +0,0 @@ -package pool - -import ( - "context" - "sync" - - "github.com/sourcegraph/conc" -) - -// New creates a new Pool. -func New() *Pool { - return &Pool{} -} - -// Pool is a pool of goroutines used to execute tasks concurrently. -// -// Tasks are submitted with Go(). Once all your tasks have been submitted, you -// must call Wait() to clean up any spawned goroutines and propagate any -// panics. -// -// Goroutines are started lazily, so creating a new pool is cheap. There will -// never be more goroutines spawned than there are tasks submitted. -// -// The configuration methods (With*) will panic if they are used after calling -// Go() for the first time. -// -// Pool is efficient, but not zero cost. It should not be used for very short -// tasks. Startup and teardown come with an overhead of around 1µs, and each -// task has an overhead of around 300ns. -type Pool struct { - handle conc.WaitGroup - limiter limiter - tasks chan func() - initOnce sync.Once -} - -// Go submits a task to be run in the pool. If all goroutines in the pool -// are busy, a call to Go() will block until the task can be started. -func (p *Pool) Go(f func()) { - p.init() - - if p.limiter == nil { - // No limit on the number of goroutines. - select { - case p.tasks <- f: - // A goroutine was available to handle the task. - default: - // No goroutine was available to handle the task. - // Spawn a new one and send it the task. - p.handle.Go(func() { - p.worker(f) - }) - } - } else { - select { - case p.limiter <- struct{}{}: - // If we are below our limit, spawn a new worker rather - // than waiting for one to become available. - p.handle.Go(func() { - p.worker(f) - }) - case p.tasks <- f: - // A worker is available and has accepted the task. - return - } - } - -} - -// Wait cleans up spawned goroutines, propagating any panics that were -// raised by a tasks. -func (p *Pool) Wait() { - p.init() - - close(p.tasks) - - // After Wait() returns, reset the struct so tasks will be reinitialized on - // next use. This better matches the behavior of sync.WaitGroup - defer func() { p.initOnce = sync.Once{} }() - - p.handle.Wait() -} - -// MaxGoroutines returns the maximum size of the pool. -func (p *Pool) MaxGoroutines() int { - return p.limiter.limit() -} - -// WithMaxGoroutines limits the number of goroutines in a pool. -// Defaults to unlimited. Panics if n < 1. -func (p *Pool) WithMaxGoroutines(n int) *Pool { - p.panicIfInitialized() - if n < 1 { - panic("max goroutines in a pool must be greater than zero") - } - p.limiter = make(limiter, n) - return p -} - -// init ensures that the pool is initialized before use. This makes the -// zero value of the pool usable. -func (p *Pool) init() { - p.initOnce.Do(func() { - p.tasks = make(chan func()) - }) -} - -// panicIfInitialized will trigger a panic if a configuration method is called -// after the pool has started any goroutines for the first time. In the case that -// new settings are needed, a new pool should be created. -func (p *Pool) panicIfInitialized() { - if p.tasks != nil { - panic("pool can not be reconfigured after calling Go() for the first time") - } -} - -// WithErrors converts the pool to an ErrorPool so the submitted tasks can -// return errors. -func (p *Pool) WithErrors() *ErrorPool { - p.panicIfInitialized() - return &ErrorPool{ - pool: p.deref(), - } -} - -// deref is a helper that creates a shallow copy of the pool with the same -// settings. We don't want to just dereference the pointer because that makes -// the copylock lint angry. -func (p *Pool) deref() Pool { - p.panicIfInitialized() - return Pool{ - limiter: p.limiter, - } -} - -// WithContext converts the pool to a ContextPool for tasks that should -// run under the same context, such that they each respect shared cancellation. -// For example, WithCancelOnError can be configured on the returned pool to -// signal that all goroutines should be cancelled upon the first error. -func (p *Pool) WithContext(ctx context.Context) *ContextPool { - p.panicIfInitialized() - ctx, cancel := context.WithCancel(ctx) - return &ContextPool{ - errorPool: p.WithErrors().deref(), - ctx: ctx, - cancel: cancel, - } -} - -func (p *Pool) worker(initialFunc func()) { - // The only time this matters is if the task panics. - // This makes it possible to spin up new workers in that case. - defer p.limiter.release() - - if initialFunc != nil { - initialFunc() - } - - for f := range p.tasks { - f() - } -} - -type limiter chan struct{} - -func (l limiter) limit() int { - return cap(l) -} - -func (l limiter) release() { - if l != nil { - <-l - } -} diff --git a/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go deleted file mode 100644 index 6bc30dd63..000000000 --- a/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go +++ /dev/null @@ -1,85 +0,0 @@ -package pool - -import ( - "context" -) - -// ResultContextPool is a pool that runs tasks that take a context and return a -// result. The context passed to the task will be canceled if any of the tasks -// return an error, which makes its functionality different than just capturing -// a context with the task closure. -// -// The configuration methods (With*) will panic if they are used after calling -// Go() for the first time. -type ResultContextPool[T any] struct { - contextPool ContextPool - agg resultAggregator[T] - collectErrored bool -} - -// Go submits a task to the pool. If all goroutines in the pool -// are busy, a call to Go() will block until the task can be started. -func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) { - idx := p.agg.nextIndex() - p.contextPool.Go(func(ctx context.Context) error { - res, err := f(ctx) - p.agg.save(idx, res, err != nil) - return err - }) -} - -// Wait cleans up all spawned goroutines, propagates any panics, and -// returns an error if any of the tasks errored. -func (p *ResultContextPool[T]) Wait() ([]T, error) { - err := p.contextPool.Wait() - results := p.agg.collect(p.collectErrored) - p.agg = resultAggregator[T]{} - return results, err -} - -// WithCollectErrored configures the pool to still collect the result of a task -// even if the task returned an error. By default, the result of tasks that errored -// are ignored and only the error is collected. -func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T] { - p.panicIfInitialized() - p.collectErrored = true - return p -} - -// WithFirstError configures the pool to only return the first error -// returned by a task. By default, Wait() will return a combined error. -func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T] { - p.panicIfInitialized() - p.contextPool.WithFirstError() - return p -} - -// WithCancelOnError configures the pool to cancel its context as soon as -// any task returns an error. By default, the pool's context is not -// canceled until the parent context is canceled. -func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T] { - p.panicIfInitialized() - p.contextPool.WithCancelOnError() - return p -} - -// WithFailFast is an alias for the combination of WithFirstError and -// WithCancelOnError. By default, the errors from all tasks are returned and -// the pool's context is not canceled until the parent context is canceled. -func (p *ResultContextPool[T]) WithFailFast() *ResultContextPool[T] { - p.panicIfInitialized() - p.contextPool.WithFailFast() - return p -} - -// WithMaxGoroutines limits the number of goroutines in a pool. -// Defaults to unlimited. Panics if n < 1. -func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T] { - p.panicIfInitialized() - p.contextPool.WithMaxGoroutines(n) - return p -} - -func (p *ResultContextPool[T]) panicIfInitialized() { - p.contextPool.panicIfInitialized() -} diff --git a/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go deleted file mode 100644 index 832cd9bb4..000000000 --- a/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go +++ /dev/null @@ -1,80 +0,0 @@ -package pool - -import ( - "context" -) - -// ResultErrorPool is a pool that executes tasks that return a generic result -// type and an error. Tasks are executed in the pool with Go(), then the -// results of the tasks are returned by Wait(). -// -// The order of the results is guaranteed to be the same as the order the -// tasks were submitted. -// -// The configuration methods (With*) will panic if they are used after calling -// Go() for the first time. -type ResultErrorPool[T any] struct { - errorPool ErrorPool - agg resultAggregator[T] - collectErrored bool -} - -// Go submits a task to the pool. If all goroutines in the pool -// are busy, a call to Go() will block until the task can be started. -func (p *ResultErrorPool[T]) Go(f func() (T, error)) { - idx := p.agg.nextIndex() - p.errorPool.Go(func() error { - res, err := f() - p.agg.save(idx, res, err != nil) - return err - }) -} - -// Wait cleans up any spawned goroutines, propagating any panics and -// returning the results and any errors from tasks. -func (p *ResultErrorPool[T]) Wait() ([]T, error) { - err := p.errorPool.Wait() - results := p.agg.collect(p.collectErrored) - p.agg = resultAggregator[T]{} // reset for reuse - return results, err -} - -// WithCollectErrored configures the pool to still collect the result of a task -// even if the task returned an error. By default, the result of tasks that errored -// are ignored and only the error is collected. -func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T] { - p.panicIfInitialized() - p.collectErrored = true - return p -} - -// WithContext converts the pool to a ResultContextPool for tasks that should -// run under the same context, such that they each respect shared cancellation. -// For example, WithCancelOnError can be configured on the returned pool to -// signal that all goroutines should be cancelled upon the first error. -func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] { - p.panicIfInitialized() - return &ResultContextPool[T]{ - contextPool: *p.errorPool.WithContext(ctx), - } -} - -// WithFirstError configures the pool to only return the first error -// returned by a task. By default, Wait() will return a combined error. -func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T] { - p.panicIfInitialized() - p.errorPool.WithFirstError() - return p -} - -// WithMaxGoroutines limits the number of goroutines in a pool. -// Defaults to unlimited. Panics if n < 1. -func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T] { - p.panicIfInitialized() - p.errorPool.WithMaxGoroutines(n) - return p -} - -func (p *ResultErrorPool[T]) panicIfInitialized() { - p.errorPool.panicIfInitialized() -} diff --git a/vendor/github.com/sourcegraph/conc/pool/result_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_pool.go deleted file mode 100644 index f73a77261..000000000 --- a/vendor/github.com/sourcegraph/conc/pool/result_pool.go +++ /dev/null @@ -1,142 +0,0 @@ -package pool - -import ( - "context" - "sort" - "sync" -) - -// NewWithResults creates a new ResultPool for tasks with a result of type T. -// -// The configuration methods (With*) will panic if they are used after calling -// Go() for the first time. -func NewWithResults[T any]() *ResultPool[T] { - return &ResultPool[T]{ - pool: *New(), - } -} - -// ResultPool is a pool that executes tasks that return a generic result type. -// Tasks are executed in the pool with Go(), then the results of the tasks are -// returned by Wait(). -// -// The order of the results is guaranteed to be the same as the order the -// tasks were submitted. -type ResultPool[T any] struct { - pool Pool - agg resultAggregator[T] -} - -// Go submits a task to the pool. If all goroutines in the pool -// are busy, a call to Go() will block until the task can be started. -func (p *ResultPool[T]) Go(f func() T) { - idx := p.agg.nextIndex() - p.pool.Go(func() { - p.agg.save(idx, f(), false) - }) -} - -// Wait cleans up all spawned goroutines, propagating any panics, and returning -// a slice of results from tasks that did not panic. -func (p *ResultPool[T]) Wait() []T { - p.pool.Wait() - results := p.agg.collect(true) - p.agg = resultAggregator[T]{} // reset for reuse - return results -} - -// MaxGoroutines returns the maximum size of the pool. -func (p *ResultPool[T]) MaxGoroutines() int { - return p.pool.MaxGoroutines() -} - -// WithErrors converts the pool to an ResultErrorPool so the submitted tasks -// can return errors. -func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T] { - p.panicIfInitialized() - return &ResultErrorPool[T]{ - errorPool: *p.pool.WithErrors(), - } -} - -// WithContext converts the pool to a ResultContextPool for tasks that should -// run under the same context, such that they each respect shared cancellation. -// For example, WithCancelOnError can be configured on the returned pool to -// signal that all goroutines should be cancelled upon the first error. -func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] { - p.panicIfInitialized() - return &ResultContextPool[T]{ - contextPool: *p.pool.WithContext(ctx), - } -} - -// WithMaxGoroutines limits the number of goroutines in a pool. -// Defaults to unlimited. Panics if n < 1. -func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T] { - p.panicIfInitialized() - p.pool.WithMaxGoroutines(n) - return p -} - -func (p *ResultPool[T]) panicIfInitialized() { - p.pool.panicIfInitialized() -} - -// resultAggregator is a utility type that lets us safely append from multiple -// goroutines. The zero value is valid and ready to use. -type resultAggregator[T any] struct { - mu sync.Mutex - len int - results []T - errored []int -} - -// nextIndex reserves a slot for a result. The returned value should be passed -// to save() when adding a result to the aggregator. -func (r *resultAggregator[T]) nextIndex() int { - r.mu.Lock() - defer r.mu.Unlock() - - nextIdx := r.len - r.len += 1 - return nextIdx -} - -func (r *resultAggregator[T]) save(i int, res T, errored bool) { - r.mu.Lock() - defer r.mu.Unlock() - - if i >= len(r.results) { - old := r.results - r.results = make([]T, r.len) - copy(r.results, old) - } - - r.results[i] = res - - if errored { - r.errored = append(r.errored, i) - } -} - -// collect returns the set of aggregated results. -func (r *resultAggregator[T]) collect(collectErrored bool) []T { - if !r.mu.TryLock() { - panic("collect should not be called until all goroutines have exited") - } - - if collectErrored || len(r.errored) == 0 { - return r.results - } - - filtered := r.results[:0] - sort.Ints(r.errored) - for i, e := range r.errored { - if i == 0 { - filtered = append(filtered, r.results[:e]...) - } else { - filtered = append(filtered, r.results[r.errored[i-1]+1:e]...) - } - } - return filtered -} diff --git a/vendor/github.com/sourcegraph/conc/waitgroup.go b/vendor/github.com/sourcegraph/conc/waitgroup.go deleted file mode 100644 index 47b1bc1a5..000000000 --- a/vendor/github.com/sourcegraph/conc/waitgroup.go +++ /dev/null @@ -1,52 +0,0 @@ -package conc - -import ( - "sync" - - "github.com/sourcegraph/conc/panics" -) - -// NewWaitGroup creates a new WaitGroup. -func NewWaitGroup() *WaitGroup { - return &WaitGroup{} -} - -// WaitGroup is the primary building block for scoped concurrency. -// Goroutines can be spawned in the WaitGroup with the Go method, -// and calling Wait() will ensure that each of those goroutines exits -// before continuing. Any panics in a child goroutine will be caught -// and propagated to the caller of Wait(). -// -// The zero value of WaitGroup is usable, just like sync.WaitGroup. -// Also like sync.WaitGroup, it must not be copied after first use. -type WaitGroup struct { - wg sync.WaitGroup - pc panics.Catcher -} - -// Go spawns a new goroutine in the WaitGroup. -func (h *WaitGroup) Go(f func()) { - h.wg.Add(1) - go func() { - defer h.wg.Done() - h.pc.Try(f) - }() -} - -// Wait will block until all goroutines spawned with Go exit and will -// propagate any panics spawned in a child goroutine. -func (h *WaitGroup) Wait() { - h.wg.Wait() - - // Propagate a panic if we caught one from a child goroutine. - h.pc.Repanic() -} - -// WaitAndRecover will block until all goroutines spawned with Go exit and -// will return a *panics.Recovered if one of the child goroutines panics. -func (h *WaitGroup) WaitAndRecover() *panics.Recovered { - h.wg.Wait() - - // Return a recovered panic if we caught one from a child goroutine. - return h.pc.Recovered() -} |
