
# `conc`: better structured concurrency for Go

`conc` is your toolbelt for structured concurrency in Go, making common tasks
easier and safer.

```sh
go get github.com/sourcegraph/conc
```

# At a glance

- Use [`conc.WaitGroup`](https://pkg.go.dev/github.com/sourcegraph/conc#WaitGroup) if you just want a safer version of `sync.WaitGroup`
- Use [`pool.Pool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool) if you want a concurrency-limited task runner
- Use [`pool.ResultPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultPool) if you want a concurrent task runner that collects task results
- Use [`pool.(Result)?ErrorPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool) if your tasks are fallible
- Use [`pool.(Result)?ContextPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ContextPool) if your tasks should be canceled on failure
- Use [`stream.Stream`](https://pkg.go.dev/github.com/sourcegraph/conc/stream#Stream) if you want to process an ordered stream of tasks in parallel with serial callbacks
- Use [`iter.Map`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#Map) if you want to concurrently map a slice
- Use [`iter.ForEach`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#ForEach) if you want to concurrently iterate over a slice
- Use [`panics.Catcher`](https://pkg.go.dev/github.com/sourcegraph/conc/panics#Catcher) if you want to catch panics in your own goroutines

All pools are created with
[`pool.New()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#New)
or
[`pool.NewWithResults[T]()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#NewWithResults),
then configured with methods:

- [`p.WithMaxGoroutines()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithMaxGoroutines) configures the maximum number of goroutines in the pool
- [`p.WithErrors()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithErrors) configures the pool to run tasks that return errors
- [`p.WithContext(ctx)`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithContext) configures the pool to run tasks that should be canceled on first error
- [`p.WithFirstError()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool.WithFirstError) configures error pools to only keep the first returned error rather than an aggregated error
- [`p.WithCollectErrored()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultContextPool.WithCollectErrored) configures result pools to collect results even when the task errored

# Goals

The main goals of the package are:

1) Make it harder to leak goroutines
2) Handle panics gracefully
3) Make concurrent code easier to read

## Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's
really easy to fire off a `go` statement and fail to properly wait for it to
complete.

`conc` takes the opinionated stance that all concurrency should be scoped.
That is, goroutines should have an owner and that owner should always
ensure that its owned goroutines exit properly.

In `conc`, the owner of a goroutine is always a `conc.WaitGroup`. Goroutines
are spawned in a `WaitGroup` with `(*WaitGroup).Go()`, and
`(*WaitGroup).Wait()` should always be called before the `WaitGroup` goes out
of scope.

In some cases, you might want a spawned goroutine to outlast the scope of the
caller. In that case, you could pass a `WaitGroup` into the spawning function.
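
```go
package main

import "github.com/sourcegraph/conc"

func main() {
	var wg conc.WaitGroup
	defer wg.Wait()
	startTheThing(&wg)
}

// startTheThing spawns a goroutine owned by the caller's WaitGroup, so the
// goroutine may outlive this function but not the caller's scope.
func startTheThing(wg *conc.WaitGroup) {
	wg.Go(func() {
		// ... the thing ...
	})
}
```

For more discussion of why scoped concurrency is useful, check out [this
blog
post](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).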
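
For comparison, the ownership rule can be sketched with only the standard library. The `scope` type and the `sumSquares` caller are hypothetical illustrations, not part of `conc`: `Go` pairs `Add` and `Done` so neither can be forgotten, and the owner calls `Wait` before it returns.

```go
package main

import "sync"

// scope is a hypothetical owner for goroutines: every goroutine it starts
// is tracked, so Wait cannot return while one is still running.
type scope struct {
	wg sync.WaitGroup
}

// Go pairs Add and Done so callers cannot forget either one.
func (s *scope) Go(f func()) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}

// Wait blocks until every goroutine started with Go has returned.
func (s *scope) Wait() {
	s.wg.Wait()
}

// sumSquares is a hypothetical caller that owns its goroutines and
// waits for all of them before reading their results.
func sumSquares(nums []int) int {
	var s scope
	results := make([]int, len(nums))
	for i, n := range nums {
		i, n := i, n // per-iteration copies (required before Go 1.22)
		s.Go(func() { results[i] = n * n })
	}
	s.Wait()
	total := 0
	for _, r := range results {
		total += r
	}
	return total
}
```

Because `sumSquares` calls `Wait` before reading `results`, no goroutine it started can still be running when it returns, and each goroutine writes to its own slice index, so no extra locking is needed.
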

## Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling
panics. A goroutine spawned without a panic handler will crash the whole process
on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the
panic once you catch it? Some options:

1) Ignore it
2) Log it
3) Turn it into an error and return that to the goroutine spawner
4) Propagate the panic to the goroutine spawner

Ignoring panics is a bad idea since panics usually mean there is actually
something wrong and someone should fix it.

Just logging panics isn't great either, because then there is no indication to
the spawner that something bad happened, and it might just continue on as normal
even though your program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to have
an owner that can actually receive the message that something went wrong. This
is generally not true with a goroutine spawned with `go`, but in the `conc`
package, all goroutines have an owner that must collect the spawned goroutine.

Any call to `Wait()` in `conc` will panic if any of the spawned goroutines
panicked. Additionally, `Wait()` decorates the panic value with a stack trace
from the child goroutine so that you don't lose information about what caused
the panic.

Doing this all correctly every time you spawn something with `go` is not
trivial, and it requires a lot of boilerplate that makes the important parts of
the code more difficult to read, so `conc` does this for you.

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
type caughtPanicError struct {
	val   any
	stack []byte
}

func (e *caughtPanicError) Error() string {
	return fmt.Sprintf(
		"panic: %v\n%s",
		e.val,
		string(e.stack),
	)
}

func main() {
	done := make(chan error)
	go func() {
		defer func() {
			if v := recover(); v != nil {
				done <- &caughtPanicError{
					val:   v,
					stack: debug.Stack(),
				}
			} else {
				done <- nil
			}
		}()
		doSomethingThatMightPanic()
	}()
	err := <-done
	if err != nil {
		panic(err)
	}
}
```

</td>
<td>

```go
func main() {
	var wg conc.WaitGroup
	wg.Go(doSomethingThatMightPanic)
	// panics with a nice stacktrace
	wg.Wait()
}
```

</td>
</tr>
</table>

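
The behavior described above, where `Wait()` re-raises a child goroutine's panic together with that goroutine's stack trace, can be sketched with the standard library for many goroutines at once. `panicGroup` and `recoveredPanic` are hypothetical names, not `conc`'s implementation, and keeping only the first recovered panic is an assumption made for brevity.

```go
package main

import (
	"fmt"
	"runtime/debug"
	"sync"
)

// recoveredPanic is a hypothetical container for a panic caught in a child
// goroutine, together with that goroutine's stack trace.
type recoveredPanic struct {
	value any
	stack []byte
}

func (r *recoveredPanic) Error() string {
	return fmt.Sprintf("panic: %v\n%s", r.value, r.stack)
}

// panicGroup is a hypothetical sketch: it recovers panics in its goroutines
// and re-raises the first one from Wait, in the owner's goroutine.
type panicGroup struct {
	wg    sync.WaitGroup
	once  sync.Once
	first *recoveredPanic
}

func (g *panicGroup) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		defer func() {
			if v := recover(); v != nil {
				// debug.Stack runs inside the deferred call, so it captures
				// the stack of the goroutine that panicked.
				rp := &recoveredPanic{value: v, stack: debug.Stack()}
				g.once.Do(func() { g.first = rp })
			}
		}()
		f()
	}()
}

// Wait blocks until all goroutines return, then re-panics with the first
// recovered panic (if any) so the owner cannot miss it.
func (g *panicGroup) Wait() {
	g.wg.Wait()
	if g.first != nil {
		panic(g.first)
	}
}
```

Because the panic is re-raised from `Wait`, it surfaces in the owner's goroutine, where the program either crashes with the child's stack trace attached or a deferred `recover` in the owner can handle it.
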

## Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't
obfuscate what the code is actually doing is more difficult. The `conc` package
attempts to make common operations easier by abstracting away as much
boilerplate complexity as possible.

Want to run a set of concurrent tasks with a bounded set of goroutines? Use
`pool.New()`. Want to process a stream of tasks concurrently while keeping the
results in order? Try `stream.New()`. What about a concurrent map over
a slice? Take a peek at `iter.Map()`.

Browse the examples below for comparisons with doing these by hand.

# Examples

Each of these examples forgoes propagating panics for simplicity. To see
what kind of complexity that would add, check out the "Goal #2" section above.

Spawn a set of goroutines and wait for them to finish:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// crashes on panic!
			doSomething()
		}()
	}
	wg.Wait()
}
```

</td>
<td>

```go
func main() {
	var wg conc.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}
```

</td>
</tr>
</table>


Process each element of a stream in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(stream chan int) {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range stream {
				handle(elem)
			}
		}()
	}
	wg.Wait()
}
```

</td>
<td>

```go
func process(stream chan int) {
	p := pool.New().WithMaxGoroutines(10)
	for elem := range stream {
		elem := elem // copy the loop variable (required before Go 1.22)
		p.Go(func() {
			handle(elem)
		})
	}
	p.Wait()
}
```

</td>
</tr>
</table>


Process each element of a slice in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(values []int) {
	feeder := make(chan int, 8)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for elem := range feeder {
				handle(elem)
			}
		}()
	}

	for _, value := range values {
		feeder <- value
	}
	close(feeder)
	wg.Wait()
}
```

</td>
<td>

```go
func process(values []int) {
	// iter.ForEach passes a pointer to each element.
	iter.ForEach(values, func(v *int) {
		handle(*v)
	})
}
```

</td>
</tr>
</table>


Concurrently map a slice:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func concMap(
	input []int,
	f func(int) int,
) []int {
	res := make([]int, len(input))
	var idx atomic.Int64

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for {
				i := int(idx.Add(1) - 1)
				if i >= len(input) {
					return
				}

				res[i] = f(input[i])
			}
		}()
	}
	wg.Wait()
	return res
}
```

</td>
<td>

```go
func concMap(
	input []int,
	f func(*int) int,
) []int {
	// iter.Map passes a pointer to each element of input to f.
	return iter.Map(input, f)
}
```

</td>
</tr>
</table>


Process an ordered stream concurrently:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func mapStream(
	in chan int,
	out chan int,
	f func(int) int,
) {
	tasks := make(chan func())
	// Buffered so the feeder can queue results ahead of the reader;
	// unbuffered, each task would wait for the previous result to be
	// read and emitted, serializing the work.
	taskResults := make(chan chan int, 10)

	// Worker goroutines
	var workerWg sync.WaitGroup
	for i := 0; i < 10; i++ {
		workerWg.Add(1)
		go func() {
			defer workerWg.Done()
			for task := range tasks {
				task()
			}
		}()
	}

	// Ordered reader goroutine
	var readerWg sync.WaitGroup
	readerWg.Add(1)
	go func() {
		defer readerWg.Done()
		for result := range taskResults {
			item := <-result
			out <- item
		}
	}()

	// Feed the workers with tasks
	for elem := range in {
		elem := elem // copy the loop variable (required before Go 1.22)
		resultCh := make(chan int, 1)
		taskResults <- resultCh
		tasks <- func() {
			resultCh <- f(elem)
		}
	}

	// We've exhausted input.
	// Wait for everything to finish
	close(tasks)
	workerWg.Wait()
	close(taskResults)
	readerWg.Wait()
}
```

</td>
<td>

```go
func mapStream(
	in chan int,
	out chan int,
	f func(int) int,
) {
	s := stream.New().WithMaxGoroutines(10)
	for elem := range in {
		elem := elem // copy the loop variable (required before Go 1.22)
		s.Go(func() stream.Callback {
			res := f(elem)
			return func() { out <- res }
		})
	}
	s.Wait()
}
```

</td>
</tr>
</table>


# Status

This package is currently pre-1.0. There are likely to be minor breaking
changes before a 1.0 release as we stabilize the APIs and tweak defaults.
Please open an issue if you have questions, concerns, or requests that you'd
like addressed before the 1.0 release. Currently, a 1.0 is targeted for
March 2023.