package pools

import (
	"runtime"
	"sync"
	"sync/atomic"
	"unsafe"
)

type Pool struct {
	// New is used to instantiate new items
	New func() interface{}

	// Evict is called on evicted items during pool .Clean()
	Evict func(interface{})

	local    unsafe.Pointer // ptr to []_ppool
	localSz  int64          // count of all elems in local
	victim   unsafe.Pointer // ptr to []_ppool
	victimSz int64          // count of all elems in victim
	mutex    sync.Mutex     // mutex protects cleanups and new allocations of local
}

// Get attempts to fetch an item from the pool, falling back to allocating
// a new one with the supplied .New() function
func (p *Pool) Get() interface{} {
	// Get local pool for proc
	// (also pins proc)
	pool, pid := p.pin()

	if v := pool.getPrivate(); v != nil {
		// local _ppool private elem acquired
		runtime_procUnpin()
		atomic.AddInt64(&p.localSz, -1)
		return v
	}

	if v := pool.get(); v != nil {
		// local _ppool queue elem acquired
		runtime_procUnpin()
		atomic.AddInt64(&p.localSz, -1)
		return v
	}

	// Unpin before attempting slow
	runtime_procUnpin()
	if v := p.getSlow(pid); v != nil {
		// note size decrementing
		// is handled within p.getSlow()
		// as we don't know if it came
		// from the local or victim pools
		return v
	}

	// Alloc new
	return p.New()
}

// Put places the supplied item in the proc-local pool
func (p *Pool) Put(v interface{}) {
	// Don't store nil
	if v == nil {
		return
	}

	// Get proc local pool
	// (also pins proc)
	pool, _ := p.pin()

	// first try private, then queue
	if !pool.setPrivate(v) {
		pool.put(v)
	}
	runtime_procUnpin()

	// Increment local pool size
	atomic.AddInt64(&p.localSz, 1)
}
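
// What follows is an illustrative usage sketch added for documentation; it is
// not part of the original library. It assumes a byte-buffer pool (the buffer
// type and 512 cap are arbitrary) and shows the expected lifecycle: construct
// a Pool with both New and Evict set (pin panics if either is nil), Get an
// item, use it, then reset and Put it back.
func _exampleGetPut() {
	p := Pool{
		// New allocates a fresh item when none is pooled
		New: func() interface{} { return make([]byte, 0, 512) },

		// Evict observes items dropped during Clean()
		Evict: func(interface{}) {},
	}

	// Fetch a buffer (pooled, or freshly allocated)
	buf := p.Get().([]byte)

	// ... use the buffer ...
	buf = append(buf, "example"...)

	// Reset before returning it to the pool
	p.Put(buf[:0])
}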

// Clean drops the current victim pools, moves the current local pools into
// their place, and resets the local pools ptr so they are regenerated on next use
func (p *Pool) Clean() {
	p.mutex.Lock()

	// local becomes nil, the old local becomes the
	// new victim, and the old victim is drained below
	localPtr := atomic.SwapPointer(&p.local, nil)
	victimPtr := atomic.SwapPointer(&p.victim, localPtr)
	localSz := atomic.SwapInt64(&p.localSz, 0)
	atomic.StoreInt64(&p.victimSz, localSz)

	var victim []ppool
	if victimPtr != nil {
		victim = *(*[]ppool)(victimPtr)
	}

	// drain each of the old victim _ppool items
	for i := 0; i < len(victim); i++ {
		pp := &victim[i]
		pp.evict(p.Evict)
	}

	p.mutex.Unlock()
}

// LocalSize returns the total number of elements in all the proc-local pools
func (p *Pool) LocalSize() int64 {
	return atomic.LoadInt64(&p.localSz)
}

// VictimSize returns the total number of elements in all the victim (old proc-local) pools
func (p *Pool) VictimSize() int64 {
	return atomic.LoadInt64(&p.victimSz)
}
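
// Another hedged sketch (not original code) showing Clean's two-step ageing,
// similar to sync.Pool's victim cache: the first Clean moves local items into
// the victim pools (draining any previous victims via Evict), and a second
// Clean with no intervening activity evicts those too.
func _exampleClean() {
	p := Pool{
		New:   func() interface{} { return new(int) },
		Evict: func(interface{}) {},
	}

	p.Put(new(int))
	_ = p.LocalSize() // ~1: counted in the local pools

	p.Clean()          // local -> victim
	_ = p.VictimSize() // ~1: now counted in the victim pools

	p.Clean() // victims drained, Evict called per element
}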

// getSlow is the slow path for fetching an element, attempting to steal from
// other procs' local pools, and failing that, from the aging-out victim pools.
// pid is passed so that not all procs start iterating from the same index
func (p *Pool) getSlow(pid int) interface{} {
	// get local pools
	local := p.localPools()

	// Try to steal from other proc locals
	for i := 0; i < len(local); i++ {
		pool := &local[(pid+i+1)%len(local)]
		if v := pool.get(); v != nil {
			atomic.AddInt64(&p.localSz, -1)
			return v
		}
	}

	// get victim pools
	victim := p.victimPools()

	// Attempt to steal from victim pools
	for i := 0; i < len(victim); i++ {
		pool := &victim[(pid+i+1)%len(victim)]
		if v := pool.get(); v != nil {
			atomic.AddInt64(&p.victimSz, -1)
			return v
		}
	}

	// No victim elems found: drop the victim pools so
	// they can be collected, and zero the victim size
	// counter which would otherwise go stale
	atomic.StorePointer(&p.victim, nil)
	atomic.StoreInt64(&p.victimSz, 0)

	return nil
}

// localPools safely loads slice of local _ppools
func (p *Pool) localPools() []ppool {
	local := atomic.LoadPointer(&p.local)
	if local == nil {
		return nil
	}
	return *(*[]ppool)(local)
}

// victimPools safely loads slice of victim _ppools
func (p *Pool) victimPools() []ppool {
	victim := atomic.LoadPointer(&p.victim)
	if victim == nil {
		return nil
	}
	return *(*[]ppool)(victim)
}

// pin pins the calling goroutine to its current proc, returning the proc-local
// _ppool and the PID we are pinned to (callers must runtime_procUnpin when done)
func (p *Pool) pin() (*ppool, int) {
	for {
		// get local pools
		local := p.localPools()

		if len(local) > 0 {
			// local already initialized

			// pin to current proc
			pid := runtime_procPin()

			// check for pid local pool
			if pid < len(local) {
				return &local[pid], pid
			}

			// unpin from proc
			runtime_procUnpin()
		} else {
			// local not yet initialized

			// Check functions are set
			if p.New == nil {
				panic("new func must not be nil")
			}
			if p.Evict == nil {
				panic("evict func must not be nil")
			}
		}

		// allocate local
		p.allocLocal()
	}
}
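
// Hedged sketch (not original code): pin requires both callbacks to be set
// before first use, so a zero-value Pool panics on Get/Put. The recover here
// just keeps the example self-contained.
func _examplePanics() {
	defer func() { _ = recover() }() // "new func must not be nil"
	var p Pool
	_ = p.Get()
}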

// allocLocal (re)allocates the local pool slice whenever its length no longer
// matches GOMAXPROCS (including the initial nil case), evicting elements from
// any pools being replaced
func (p *Pool) allocLocal() {
	// get pool lock
	p.mutex.Lock()

	// Calculate new size to use
	size := runtime.GOMAXPROCS(0)

	local := p.localPools()
	if len(local) != size {
		// first use, or GOMAXPROCS changed: reallocate
		pools := make([]ppool, size)
		atomic.StorePointer(&p.local, unsafe.Pointer(&pools))

		// Evict old local elements
		for i := 0; i < len(local); i++ {
			pool := &local[i]
			pool.evict(p.Evict)
		}
	}

	// Unlock pool
	p.mutex.Unlock()
}

// _ppool is a proc local pool
type _ppool struct {
	// root is the root element of the _ppool queue,
	// and protects concurrent access to the queue
	root unsafe.Pointer

	// private is a proc private member accessible
	// only to the pid this _ppool is assigned to,
	// except during evict (hence the unsafe pointer)
	private unsafe.Pointer
}

// ppool wraps _ppool with pad.
type ppool struct {
	_ppool

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0.
	pad [128 - unsafe.Sizeof(_ppool{})%128]byte
}
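
// Compile-time sanity check added as an illustrative sketch (not original
// code): this constant only compiles if the padded ppool size is an exact
// multiple of 128 bytes, i.e. adjacent slots in a []ppool never share a
// cache line on platforms where 128 is a multiple of the line size.
const _ = 0 - unsafe.Sizeof(ppool{})%128 // underflows (build error) if nonzero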

// getPrivate takes and clears the proc private member
func (pp *_ppool) getPrivate() interface{} {
	ptr := atomic.SwapPointer(&pp.private, nil)
	if ptr == nil {
		return nil
	}
	return *(*interface{})(ptr)
}

// setPrivate sets the proc private member (only if unset)
func (pp *_ppool) setPrivate(v interface{}) bool {
	return atomic.CompareAndSwapPointer(&pp.private, nil, unsafe.Pointer(&v))
}

// get pops an element from the queue: a lock-free linked list (LIFO) whose
// root is briefly claimed via the inUsePtr sentinel, so concurrent operations
// spin rather than race on the same node
func (pp *_ppool) get() interface{} {
	for {
		// Attempt to load root elem
		root := atomic.LoadPointer(&pp.root)
		if root == nil {
			return nil
		}

		// Attempt to consume root elem
		if root == inUsePtr ||
			!atomic.CompareAndSwapPointer(&pp.root, root, inUsePtr) {
			continue
		}

		// Root becomes next in chain
		e := (*elem)(root)
		v := e.value

		// Place new root back in the chain
		atomic.StorePointer(&pp.root, unsafe.Pointer(e.next))
		putElem(e)

		return v
	}
}

// put pushes an element onto the queue, spinning while
// the root is claimed by a concurrent operation
func (pp *_ppool) put(v interface{}) {
	// Prepare next elem
	e := getElem()
	e.value = v

	for {
		// Attempt to load root elem
		root := atomic.LoadPointer(&pp.root)
		if root == inUsePtr {
			continue
		}

		// Set the next elem value (might be nil)
		e.next = (*elem)(root)

		// Attempt to store this new value at root
		if atomic.CompareAndSwapPointer(&pp.root, root, unsafe.Pointer(e)) {
			break
		}
	}
}
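
// A hedged single-goroutine sketch (not original code) of the queue's LIFO
// behaviour: put pushes onto the root, so get pops the most recent item first.
func _exampleQueueOrder() {
	var pp _ppool
	pp.put(1)
	pp.put(2)
	_ = pp.get() // 2: last in, first out
	_ = pp.get() // 1
	_ = pp.get() // nil: queue empty
}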

// evict drains all entries from the pool, calling hook on each
func (pp *_ppool) evict(hook func(interface{})) {
	if v := pp.getPrivate(); v != nil {
		hook(v)
	}
	for {
		v := pp.get()
		if v == nil {
			break
		}
		hook(v)
	}
}

// inUsePtr is a sentinel ptr used to mark the root of a queue as claimed (in use)
var inUsePtr = unsafe.Pointer(&elem{
	next:  nil,
	value: "in_use",
})

// elem defines an element in the _ppool queue
type elem struct {
	next  *elem
	value interface{}
}

// elemPool is a simple global pool of spare queue
// elements, reusing the same lock-free root scheme
var elemPool = struct {
	root unsafe.Pointer
}{}

// getElem fetches an elem from the pool, or allocates a new
// one (on contention it prefers allocation over spinning)
func getElem() *elem {
	// Attempt to load root elem
	root := atomic.LoadPointer(&elemPool.root)
	if root == nil {
		return &elem{}
	}

	// Attempt to consume root elem
	if root == inUsePtr ||
		!atomic.CompareAndSwapPointer(&elemPool.root, root, inUsePtr) {
		return &elem{}
	}

	// Root becomes next in chain
	e := (*elem)(root)
	atomic.StorePointer(&elemPool.root, unsafe.Pointer(e.next))
	e.next = nil

	return e
}

// putElem will place element back in the pool,
// dropping it for GC if the root is contended
func putElem(e *elem) {
	e.value = nil

	// Attempt to load root elem
	root := atomic.LoadPointer(&elemPool.root)
	if root == inUsePtr {
		return // drop
	}

	// Set the next elem value (might be nil)
	e.next = (*elem)(root)

	// Attempt to store this new value at root
	// (if this fails the elem is simply dropped)
	atomic.CompareAndSwapPointer(&elemPool.root, root, unsafe.Pointer(e))
}

//go:linkname runtime_procPin sync.runtime_procPin
func runtime_procPin() int

//go:linkname runtime_procUnpin sync.runtime_procUnpin
func runtime_procUnpin()