blob: 27f7fb9b8ad432abc2f0edd968f8372615c55f02 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
package runners
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
// FuncRunner provides a means of managing long-running functions e.g. main logic loops.
type FuncRunner struct {
// HandOff is the time after which a blocking function will be considered handed off
HandOff time.Duration
// ErrorHandler is the function that errors are passed to when encountered by the
// provided function. This can be used both for logging, and for error filtering
ErrorHandler func(err error) error
svc Service // underlying service to manage start/stop
err error // last-set error
mu sync.Mutex // protects err
}
// Go will attempt to run 'fn' asynchronously. The provided context is used to propagate requested
// cancel if FuncRunner.Stop() is called. Any returned error will be passed to FuncRunner.ErrorHandler
// for filtering/logging/etc. Any blocking functions will be waited on for FuncRunner.HandOff amount of
// time before considering the function as handed off. Returned bool is success state, i.e. returns true
// if function is successfully handed off or returns within hand off time with nil error.
func (r *FuncRunner) Go(fn func(ctx context.Context) error) bool {
done := make(chan struct{})
go func() {
var cancelled bool
has := r.svc.Run(func(ctx context.Context) {
// reset error
r.mu.Lock()
r.err = nil
r.mu.Unlock()
// Run supplied func and set errror if returned
if err := Run(func() error { return fn(ctx) }); err != nil {
r.mu.Lock()
r.err = err
r.mu.Unlock()
}
// signal done
close(done)
// Check if cancelled
select {
case <-ctx.Done():
cancelled = true
default:
cancelled = false
}
})
switch has {
// returned after starting
case true:
r.mu.Lock()
// filter out errors due FuncRunner.Stop() being called
if cancelled && errors.Is(r.err, context.Canceled) {
// filter out errors from FuncRunner.Stop() being called
r.err = nil
} else if r.err != nil && r.ErrorHandler != nil {
// pass any non-nil error to set handler
r.err = r.ErrorHandler(r.err)
}
r.mu.Unlock()
// already running
case false:
close(done)
}
}()
// get valid handoff to use
handoff := r.HandOff
if handoff < 1 {
handoff = time.Second * 5
}
select {
// handed off (long-run successful)
case <-time.After(handoff):
return true
// 'fn' returned, check error
case <-done:
return (r.Err() == nil)
}
}
// Stop will cancel the context supplied to the running function.
func (r *FuncRunner) Stop() bool {
return r.svc.Stop()
}
// Err returns the last-set error value.
func (r *FuncRunner) Err() error {
r.mu.Lock()
err := r.err
r.mu.Unlock()
return err
}
// Run will execute the supplied 'fn' catching any panics. Returns either function-returned error or formatted panic.
func Run(fn func() error) (err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(error); ok {
// wrap and preserve existing error
err = fmt.Errorf("caught panic: %w", e)
} else {
// simply create new error fromt iface
err = fmt.Errorf("caught panic: %v", r)
}
}
}()
// run supplied func
err = fn()
return
}
|