1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
|
package pools
import (
"runtime"
"sync"
"sync/atomic"
"unsafe"
)
type Pool struct {
// New is used to instantiate new items
New func() interface{}
// Evict is called on evicted items during pool .Clean()
Evict func(interface{})
local unsafe.Pointer // ptr to []_ppool
localSz int64 // count of all elems in local
victim unsafe.Pointer // ptr to []_ppool
victimSz int64 // count of all elems in victim
mutex sync.Mutex // mutex protects new cleanups, and new allocations of local
}
// Get attempts to fetch an item from the pool, failing that allocates with supplied .New() function
func (p *Pool) Get() interface{} {
// Get local pool for proc
// (also pins proc)
pool, pid := p.pin()
if v := pool.getPrivate(); v != nil {
// local _ppool private elem acquired
runtime_procUnpin()
atomic.AddInt64(&p.localSz, -1)
return v
}
if v := pool.get(); v != nil {
// local _ppool queue elem acquired
runtime_procUnpin()
atomic.AddInt64(&p.localSz, -1)
return v
}
// Unpin before attempting slow
runtime_procUnpin()
if v := p.getSlow(pid); v != nil {
// note size decrementing
// is handled within p.getSlow()
// as we don't know if it came
// from the local or victim pools
return v
}
// Alloc new
return p.New()
}
// Put places supplied item in the proc local pool
func (p *Pool) Put(v interface{}) {
// Don't store nil
if v == nil {
return
}
// Get proc local pool
// (also pins proc)
pool, _ := p.pin()
// first try private, then queue
if !pool.setPrivate(v) {
pool.put(v)
}
runtime_procUnpin()
// Increment local pool size
atomic.AddInt64(&p.localSz, 1)
}
// Clean will drop the current victim pools, move the current local pools to its
// place and reset the local pools ptr in order to be regenerated
func (p *Pool) Clean() {
p.mutex.Lock()
// victim becomes local, local becomes nil
localPtr := atomic.SwapPointer(&p.local, nil)
victimPtr := atomic.SwapPointer(&p.victim, localPtr)
localSz := atomic.SwapInt64(&p.localSz, 0)
atomic.StoreInt64(&p.victimSz, localSz)
var victim []ppool
if victimPtr != nil {
victim = *(*[]ppool)(victimPtr)
}
// drain each of the vict _ppool items
for i := 0; i < len(victim); i++ {
ppool := &victim[i]
ppool.evict(p.Evict)
}
p.mutex.Unlock()
}
// LocalSize returns the total number of elements in all the proc-local pools
func (p *Pool) LocalSize() int64 {
return atomic.LoadInt64(&p.localSz)
}
// VictimSize returns the total number of elements in all the victim (old proc-local) pools
func (p *Pool) VictimSize() int64 {
return atomic.LoadInt64(&p.victimSz)
}
// getSlow is the slow path for fetching an element, attempting to steal from other proc's
// local pools, and failing that, from the aging-out victim pools. pid is still passed so
// not all procs start iterating from the same index
func (p *Pool) getSlow(pid int) interface{} {
// get local pools
local := p.localPools()
// Try to steal from other proc locals
for i := 0; i < len(local); i++ {
pool := &local[(pid+i+1)%len(local)]
if v := pool.get(); v != nil {
atomic.AddInt64(&p.localSz, -1)
return v
}
}
// get victim pools
victim := p.victimPools()
// Attempt to steal from victim pools
for i := 0; i < len(victim); i++ {
pool := &victim[(pid+i+1)%len(victim)]
if v := pool.get(); v != nil {
atomic.AddInt64(&p.victimSz, -1)
return v
}
}
// Set victim pools to nil (none found)
atomic.StorePointer(&p.victim, nil)
return nil
}
// localPools safely loads slice of local _ppools
func (p *Pool) localPools() []ppool {
local := atomic.LoadPointer(&p.local)
if local == nil {
return nil
}
return *(*[]ppool)(local)
}
// victimPools safely loads slice of victim _ppools
func (p *Pool) victimPools() []ppool {
victim := atomic.LoadPointer(&p.victim)
if victim == nil {
return nil
}
return *(*[]ppool)(victim)
}
// pin will get fetch pin proc to PID, fetch proc-local _ppool and current PID we're pinned to
func (p *Pool) pin() (*ppool, int) {
for {
// get local pools
local := p.localPools()
if len(local) > 0 {
// local already initialized
// pin to current proc
pid := runtime_procPin()
// check for pid local pool
if pid < len(local) {
return &local[pid], pid
}
// unpin from proc
runtime_procUnpin()
} else {
// local not yet initialized
// Check functions are set
if p.New == nil {
panic("new func must not be nil")
}
if p.Evict == nil {
panic("evict func must not be nil")
}
}
// allocate local
p.allocLocal()
}
}
// allocLocal allocates a new local pool slice, with the old length passed to check
// if pool was previously nil, or whether a change in GOMAXPROCS occurred
func (p *Pool) allocLocal() {
// get pool lock
p.mutex.Lock()
// Calculate new size to use
size := runtime.GOMAXPROCS(0)
local := p.localPools()
if len(local) != size {
// GOMAXPROCS changed, reallocate
pools := make([]ppool, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&pools))
// Evict old local elements
for i := 0; i < len(local); i++ {
pool := &local[i]
pool.evict(p.Evict)
}
}
// Unlock pool
p.mutex.Unlock()
}
// _ppool is a proc local pool
type _ppool struct {
// root is the root element of the _ppool queue,
// and protects concurrent access to the queue
root unsafe.Pointer
// private is a proc private member accessible
// only to the pid this _ppool is assigned to,
// except during evict (hence the unsafe pointer)
private unsafe.Pointer
}
// ppool wraps _ppool with pad.
type ppool struct {
_ppool
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(_ppool{})%128]byte
}
// getPrivate gets the proc private member
func (pp *_ppool) getPrivate() interface{} {
ptr := atomic.SwapPointer(&pp.private, nil)
if ptr == nil {
return nil
}
return *(*interface{})(ptr)
}
// setPrivate sets the proc private member (only if unset)
func (pp *_ppool) setPrivate(v interface{}) bool {
return atomic.CompareAndSwapPointer(&pp.private, nil, unsafe.Pointer(&v))
}
// get fetches an element from the queue
func (pp *_ppool) get() interface{} {
for {
// Attempt to load root elem
root := atomic.LoadPointer(&pp.root)
if root == nil {
return nil
}
// Attempt to consume root elem
if root == inUsePtr ||
!atomic.CompareAndSwapPointer(&pp.root, root, inUsePtr) {
continue
}
// Root becomes next in chain
e := (*elem)(root)
v := e.value
// Place new root back in the chain
atomic.StorePointer(&pp.root, unsafe.Pointer(e.next))
putElem(e)
return v
}
}
// put places an element in the queue
func (pp *_ppool) put(v interface{}) {
// Prepare next elem
e := getElem()
e.value = v
for {
// Attempt to load root elem
root := atomic.LoadPointer(&pp.root)
if root == inUsePtr {
continue
}
// Set the next elem value (might be nil)
e.next = (*elem)(root)
// Attempt to store this new value at root
if atomic.CompareAndSwapPointer(&pp.root, root, unsafe.Pointer(e)) {
break
}
}
}
// hook evicts all entries from pool, calling hook on each
func (pp *_ppool) evict(hook func(interface{})) {
if v := pp.getPrivate(); v != nil {
hook(v)
}
for {
v := pp.get()
if v == nil {
break
}
hook(v)
}
}
// inUsePtr is a ptr used to indicate _ppool is in use
var inUsePtr = unsafe.Pointer(&elem{
next: nil,
value: "in_use",
})
// elem defines an element in the _ppool queue
type elem struct {
next *elem
value interface{}
}
// elemPool is a simple pool of unused elements
var elemPool = struct {
root unsafe.Pointer
}{}
// getElem fetches a new elem from pool, or creates new
func getElem() *elem {
// Attempt to load root elem
root := atomic.LoadPointer(&elemPool.root)
if root == nil {
return &elem{}
}
// Attempt to consume root elem
if root == inUsePtr ||
!atomic.CompareAndSwapPointer(&elemPool.root, root, inUsePtr) {
return &elem{}
}
// Root becomes next in chain
e := (*elem)(root)
atomic.StorePointer(&elemPool.root, unsafe.Pointer(e.next))
e.next = nil
return e
}
// putElem will place element in the pool
func putElem(e *elem) {
e.value = nil
// Attempt to load root elem
root := atomic.LoadPointer(&elemPool.root)
if root == inUsePtr {
return // drop
}
// Set the next elem value (might be nil)
e.next = (*elem)(root)
// Attempt to store this new value at root
atomic.CompareAndSwapPointer(&elemPool.root, root, unsafe.Pointer(e))
}
//go:linkname runtime_procPin sync.runtime_procPin
func runtime_procPin() int
//go:linkname runtime_procUnpin sync.runtime_procUnpin
func runtime_procUnpin()
|