summaryrefslogtreecommitdiff
path: root/vendor/codeberg.org/gruf/go-mutexes/map.go
blob: 73f8f18215892ef8827950ba4a0f835dcb7435ce (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
package mutexes

import (
	"runtime"
	"sync"
	"sync/atomic"
)

const (
	// possible lock types.
	lockTypeRead  = uint8(1) << 0
	lockTypeWrite = uint8(1) << 1
	lockTypeMap   = uint8(1) << 2

	// possible mutexmap states.
	stateUnlockd = uint8(0)
	stateRLocked = uint8(1)
	stateLocked  = uint8(2)
	stateInUse   = uint8(3)

	// default values.
	defaultWake = 1024
)

// acquireState attempts to acquire required map state for lockType.
func acquireState(state uint8, lt uint8) (uint8, bool) {
	switch state {
	// Unlocked state
	// (all allowed)
	case stateUnlockd:

	// Keys locked, no state lock.
	// (don't allow map locks)
	case stateInUse:
		if lt&lockTypeMap != 0 {
			return 0, false
		}

	// Read locked
	// (only allow read locks)
	case stateRLocked:
		if lt&lockTypeRead == 0 {
			return 0, false
		}

	// Write locked
	// (none allowed)
	case stateLocked:
		return 0, false

	// shouldn't reach here
	default:
		panic("unexpected state")
	}

	switch {
	// If unlocked and not a map
	// lock request, set in use
	case lt&lockTypeMap == 0:
		if state == stateUnlockd {
			state = stateInUse
		}

	// Set read lock state
	case lt&lockTypeRead != 0:
		state = stateRLocked

	// Set write lock state
	case lt&lockTypeWrite != 0:
		state = stateLocked

	default:
		panic("unexpected lock type")
	}

	return state, true
}

// MutexMap is a structure that allows read / write locking key, performing
// as you'd expect a map[string]*sync.RWMutex to perform. The differences
// being that the entire map can itself be read / write locked, it uses memory
// pooling for the mutex (not quite) structures, and it is self-evicting. The
// core configurations of maximum no. open locks and wake modulus* are user
// definable.
//
// * The wake modulus is the number that the current number of open locks is
// modulused against to determine how often to notify sleeping goroutines.
// These are goroutines that are attempting to lock a key / whole map and are
// awaiting a permissible state (.e.g no key write locks allowed when the
// map is read locked).
type MutexMap struct {
	queue *sync.WaitGroup
	qucnt int32

	mumap map[string]*rwmutex
	mpool pool
	evict []*rwmutex

	count int32
	maxmu int32
	wake  int32

	mapmu sync.Mutex
	state uint8
}

// NewMap returns a new MutexMap instance with provided max no. open mutexes.
func NewMap(max, wake int32) MutexMap {
	// Determine wake mod.
	if wake < 1 {
		wake = defaultWake
	}

	// Determine max no. mutexes
	if max < 1 {
		procs := runtime.GOMAXPROCS(0)
		max = wake * int32(procs)
	}

	return MutexMap{
		queue: &sync.WaitGroup{},
		mumap: make(map[string]*rwmutex, max),
		maxmu: max,
		wake:  wake,
	}
}

// SET sets the MutexMap max open locks and wake modulus, returns current values.
// For values less than zero defaults are set, and zero is non-op.
func (mm *MutexMap) SET(max, wake int32) (int32, int32) {
	mm.mapmu.Lock()

	switch {
	// Set default wake
	case wake < 0:
		mm.wake = defaultWake

	// Set supplied wake
	case wake > 0:
		mm.wake = wake
	}

	switch {
	// Set default max
	case max < 0:
		procs := runtime.GOMAXPROCS(0)
		mm.maxmu = wake * int32(procs)

	// Set supplied max
	case max > 0:
		mm.maxmu = max
	}

	// Fetch values
	max = mm.maxmu
	wake = mm.wake

	mm.mapmu.Unlock()
	return max, wake
}

// spinLock will wait (using a mutex to sleep thread) until conditional returns true.
func (mm *MutexMap) spinLock(cond func() bool) {
	for {
		// Acquire map lock
		mm.mapmu.Lock()

		if cond() {
			return
		}

		// Current queue ptr
		queue := mm.queue

		// Queue ourselves
		queue.Add(1)
		mm.qucnt++

		// Unlock map
		mm.mapmu.Unlock()

		// Wait on notify
		mm.queue.Wait()
	}
}

// lock will acquire a lock of given type on the 'mutex' at key.
func (mm *MutexMap) lock(key string, lt uint8) func() {
	var ok bool
	var mu *rwmutex

	// Spin lock until returns true
	mm.spinLock(func() bool {
		// Check not overloaded
		if !(mm.count < mm.maxmu) {
			return false
		}

		// Attempt to acquire usable map state
		state, ok := acquireState(mm.state, lt)
		if !ok {
			return false
		}

		// Update state
		mm.state = state

		// Ensure mutex at key
		// is in lockable state
		mu, ok = mm.mumap[key]
		return !ok || mu.CanLock(lt)
	})

	// Incr count
	mm.count++

	if !ok {
		// No mutex found for key

		// Alloc mu from pool
		mu = mm.mpool.Acquire()
		mm.mumap[key] = mu

		// Set our key
		mu.key = key

		// Queue for eviction
		mm.evict = append(mm.evict, mu)
	}

	// Lock mutex
	mu.Lock(lt)

	// Unlock map
	mm.mapmu.Unlock()

	return func() {
		mm.mapmu.Lock()
		mu.Unlock()
		mm.cleanup()
	}
}

// lockMap will lock the whole map under given lock type.
func (mm *MutexMap) lockMap(lt uint8) {
	// Spin lock until returns true
	mm.spinLock(func() bool {
		// Attempt to acquire usable map state
		state, ok := acquireState(mm.state, lt)
		if !ok {
			return false
		}

		// Update state
		mm.state = state

		return true
	})

	// Incr count
	mm.count++

	// State acquired, unlock
	mm.mapmu.Unlock()
}

// cleanup is performed as the final stage of unlocking a locked key / map state, finally unlocks map.
func (mm *MutexMap) cleanup() {
	// Decr count
	mm.count--

	// Calculate current wake modulus
	wakemod := mm.count % mm.wake

	if mm.count != 0 && wakemod != 0 {
		// Fast path => no cleanup.
		// Unlock, return early
		mm.mapmu.Unlock()
		return
	}

	go func() {
		if wakemod == 0 {
			// Release queued goroutines
			mm.queue.Add(-int(mm.qucnt))

			// Allocate new queue and reset
			mm.queue = &sync.WaitGroup{}
			mm.qucnt = 0
		}

		if mm.count == 0 {
			// Perform evictions
			for _, mu := range mm.evict {
				key := mu.key
				mu.key = ""
				delete(mm.mumap, key)
				mm.mpool.Release(mu)
			}

			// Reset map state
			mm.evict = mm.evict[:0]
			mm.state = stateUnlockd
			mm.mpool.GC()
		}

		// Unlock map
		mm.mapmu.Unlock()
	}()
}

// RLockMap acquires a read lock over the entire map, returning a lock state for acquiring key read locks.
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
func (mm *MutexMap) RLockMap() *LockState {
	mm.lockMap(lockTypeRead | lockTypeMap)
	return &LockState{
		mmap: mm,
		ltyp: lockTypeRead,
	}
}

// LockMap acquires a write lock over the entire map, returning a lock state for acquiring key read/write locks.
// Please note that the 'unlock()' function will block until all keys locked from this state are unlocked.
func (mm *MutexMap) LockMap() *LockState {
	mm.lockMap(lockTypeWrite | lockTypeMap)
	return &LockState{
		mmap: mm,
		ltyp: lockTypeWrite,
	}
}

// RLock acquires a mutex read lock for supplied key, returning an RUnlock function.
func (mm *MutexMap) RLock(key string) (runlock func()) {
	return mm.lock(key, lockTypeRead)
}

// Lock acquires a mutex write lock for supplied key, returning an Unlock function.
func (mm *MutexMap) Lock(key string) (unlock func()) {
	return mm.lock(key, lockTypeWrite)
}

// LockState represents a window to a locked MutexMap.
type LockState struct {
	wait sync.WaitGroup
	mmap *MutexMap
	done uint32
	ltyp uint8
}

// Lock: see MutexMap.Lock() definition. Will panic if map only read locked.
func (st *LockState) Lock(key string) (unlock func()) {
	return st.lock(key, lockTypeWrite)
}

// RLock: see MutexMap.RLock() definition.
func (st *LockState) RLock(key string) (runlock func()) {
	return st.lock(key, lockTypeRead)
}

// lock: see MutexMap.lock() definition.
func (st *LockState) lock(key string, lt uint8) func() {
	st.wait.Add(1) // track lock

	if atomic.LoadUint32(&st.done) == 1 {
		panic("called (r)lock on unlocked state")
	} else if lt&lockTypeWrite != 0 &&
		st.ltyp&lockTypeWrite == 0 {
		panic("called lock on rlocked map")
	}

	var ok bool
	var mu *rwmutex

	// Spin lock until returns true
	st.mmap.spinLock(func() bool {
		// Check not overloaded
		if !(st.mmap.count < st.mmap.maxmu) {
			return false
		}

		// Ensure mutex at key
		// is in lockable state
		mu, ok = st.mmap.mumap[key]
		return !ok || mu.CanLock(lt)
	})

	// Incr count
	st.mmap.count++

	if !ok {
		// No mutex found for key

		// Alloc mu from pool
		mu = st.mmap.mpool.Acquire()
		st.mmap.mumap[key] = mu

		// Set our key
		mu.key = key

		// Queue for eviction
		st.mmap.evict = append(st.mmap.evict, mu)
	}

	// Lock mutex
	mu.Lock(lt)

	// Unlock map
	st.mmap.mapmu.Unlock()

	return func() {
		st.mmap.mapmu.Lock()
		mu.Unlock()
		st.mmap.cleanup()
		st.wait.Add(-1)
	}
}

// UnlockMap will close this state and release the currently locked map.
func (st *LockState) UnlockMap() {
	if !atomic.CompareAndSwapUint32(&st.done, 0, 1) {
		panic("called unlockmap on expired state")
	}
	st.wait.Wait()
	st.mmap.mapmu.Lock()
	st.mmap.cleanup()
}

// rwmutex is a very simple *representation* of a read-write
// mutex, though not one in implementation. it works by
// tracking the lock state for a given map key, which is
// protected by the map's mutex.
type rwmutex struct {
	rcnt int32  // read lock count
	lock uint8  // lock type
	key  string // map key
}

func (mu *rwmutex) CanLock(lt uint8) bool {
	return mu.lock == 0 ||
		(mu.lock&lockTypeRead != 0 && lt&lockTypeRead != 0)
}

func (mu *rwmutex) Lock(lt uint8) {
	// Set lock type
	mu.lock = lt

	if lt&lockTypeRead != 0 {
		// RLock, increment
		mu.rcnt++
	}
}

func (mu *rwmutex) Unlock() {
	if mu.rcnt > 0 {
		// RUnlock
		mu.rcnt--
	}

	if mu.rcnt == 0 {
		// Total unlock
		mu.lock = 0
	}
}