1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
package runners
import (
"context"
"sync"
)
// Service provides a means of tracking a single long-running service, provided protected state
// changes and preventing multiple instances running. Also providing service state information.
type Service struct {
state uint32 // 0=stopped, 1=running, 2=stopping
mutex sync.Mutex // mutex protects overall state changes
wait sync.Mutex // wait is used as a single-entity wait-group, only ever locked within 'mutex'
ctx chan struct{} // ctx is the current context for running function (or nil if not running)
}
// Run will run the supplied function until completion, using given context to propagate cancel.
// Immediately returns false if the Service is already running, and true after completed run.
func (svc *Service) Run(fn func(context.Context)) bool {
// Attempt to start the svc
ctx, ok := svc.doStart()
if !ok {
return false
}
defer func() {
// unlock single wait
svc.wait.Unlock()
// ensure stopped
_ = svc.Stop()
}()
// Run with context.
fn(CancelCtx(ctx))
return true
}
// GoRun will run the supplied function until completion in a goroutine, using given context to
// propagate cancel. Immediately returns boolean indicating success, or that service is already running.
func (svc *Service) GoRun(fn func(context.Context)) bool {
// Attempt to start the svc
ctx, ok := svc.doStart()
if !ok {
return false
}
go func() {
defer func() {
// unlock single wait
svc.wait.Unlock()
// ensure stopped
_ = svc.Stop()
}()
// Run with context.
fn(CancelCtx(ctx))
}()
return true
}
// RunWait is functionally the same as .Run(), but blocks until the first instance of .Run() returns.
func (svc *Service) RunWait(fn func(context.Context)) bool {
// Attempt to start the svc
ctx, ok := svc.doStart()
if !ok {
<-ctx // block
return false
}
defer func() {
// unlock single wait
svc.wait.Unlock()
// ensure stopped
_ = svc.Stop()
}()
// Run with context.
fn(CancelCtx(ctx))
return true
}
// Stop will attempt to stop the service, cancelling the running function's context. Immediately
// returns false if not running, and true only after Service is fully stopped.
func (svc *Service) Stop() bool {
// Attempt to stop the svc
ctx, ok := svc.doStop()
if !ok {
return false
}
defer func() {
// Get svc lock
svc.mutex.Lock()
// Wait until stopped
svc.wait.Lock()
svc.wait.Unlock()
// Reset the svc
svc.ctx = nil
svc.state = 0
svc.mutex.Unlock()
}()
// Cancel ctx
close(ctx)
return true
}
// While allows you to execute given function guaranteed within current
// service state. Please note that this will hold the underlying service
// state change mutex open while executing the function.
func (svc *Service) While(fn func()) {
// Protect state change
svc.mutex.Lock()
defer svc.mutex.Unlock()
// Run
fn()
}
// doStart will safely set Service state to started, returning a ptr to this context insance.
func (svc *Service) doStart() (chan struct{}, bool) {
// Protect startup
svc.mutex.Lock()
if svc.ctx == nil {
// this will only have been allocated
// if svc.Done() was already called.
svc.ctx = make(chan struct{})
}
// Take our own ptr
ctx := svc.ctx
if svc.state != 0 {
// State was not stopped.
svc.mutex.Unlock()
return ctx, false
}
// Set started.
svc.state = 1
// Start waiter.
svc.wait.Lock()
// Unlock and return
svc.mutex.Unlock()
return ctx, true
}
// doStop will safely set Service state to stopping, returning a ptr to this cancelfunc instance.
func (svc *Service) doStop() (chan struct{}, bool) {
// Protect stop
svc.mutex.Lock()
if svc.state != 1 /* not started */ {
svc.mutex.Unlock()
return nil, false
}
// state stopping
svc.state = 2
// Take our own ptr
// and unlock state
ctx := svc.ctx
svc.mutex.Unlock()
return ctx, true
}
// Running returns if Service is running (i.e. state NOT stopped / stopping).
func (svc *Service) Running() bool {
svc.mutex.Lock()
state := svc.state
svc.mutex.Unlock()
return (state == 1)
}
// Done returns a channel that's closed when Service.Stop() is called. It is
// the same channel provided to the currently running service function.
func (svc *Service) Done() <-chan struct{} {
var done <-chan struct{}
svc.mutex.Lock()
switch svc.state {
// stopped
case 0:
if svc.ctx == nil {
// here we create a new context so that the
// returned 'done' channel here will still
// be valid for when Service is next started.
svc.ctx = make(chan struct{})
}
done = svc.ctx
// started
case 1:
done = svc.ctx
// stopping
case 2:
done = svc.ctx
}
svc.mutex.Unlock()
return done
}
|