diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/queue/simple.go | 88 | 
1 files changed, 74 insertions, 14 deletions
diff --git a/internal/queue/simple.go b/internal/queue/simple.go index 9307183f2..0a19cfbf4 100644 --- a/internal/queue/simple.go +++ b/internal/queue/simple.go @@ -24,38 +24,67 @@ import (  	"codeberg.org/gruf/go-list"  ) +// frequency of GC cycles +// per no. unlocks. i.e. +// every 'gcfreq' unlocks. +const gcfreq = 1024 +  // SimpleQueue provides a simple concurrency safe  // queue using generics and a memory pool of list  // elements to reduce overall memory usage.  type SimpleQueue[T any] struct {  	l list.List[T] -	p []*list.Elem[T] +	p elemPool[T]  	w chan struct{}  	m sync.Mutex +	n uint32 // pop counter (safely wraps around)  }  // Push will push given value to the queue.  func (q *SimpleQueue[T]) Push(value T) {  	q.m.Lock() -	elem := q.alloc() + +	// Wrap in element. +	elem := q.p.alloc()  	elem.Value = value + +	// Push new elem to queue.  	q.l.PushElemFront(elem) +  	if q.w != nil { +		// Notify any goroutines +		// blocking on q.Wait(), +		// or on PopCtx(...).  		close(q.w)  		q.w = nil  	} +  	q.m.Unlock()  }  // Pop will attempt to pop value from the queue.  func (q *SimpleQueue[T]) Pop() (value T, ok bool) {  	q.m.Lock() + +	// Check for a tail (i.e. not empty).  	if ok = (q.l.Tail != nil); ok { + +		// Extract value.  		tail := q.l.Tail  		value = tail.Value + +		// Remove tail.  		q.l.Remove(tail) -		q.free(tail) +		q.p.free(tail) + +		// Every 'gcfreq' pops perform +		// a garbage collection to keep +		// us squeaky clean :] +		if q.n++; q.n%gcfreq == 0 { +			q.p.GC() +		}  	} +  	q.m.Unlock()  	return  } @@ -105,7 +134,14 @@ func (q *SimpleQueue[T]) PopCtx(ctx context.Context) (value T, ok bool) {  	// Remove element.  	q.l.Remove(elem) -	q.free(elem) +	q.p.free(elem) + +	// Every 'gcfreq' pops perform +	// a garbage collection to keep +	// us squeaky clean :] +	if q.n++; q.n%gcfreq == 0 { +		q.p.GC() +	}  	// Done with lock.  	q.m.Unlock() @@ -121,21 +157,45 @@ func (q *SimpleQueue[T]) Len() int {  	return l  } -// alloc will allocate new list element (relying on memory pool). -func (q *SimpleQueue[T]) alloc() *list.Elem[T] { -	if len(q.p) > 0 { -		elem := q.p[len(q.p)-1] -		q.p = q.p[:len(q.p)-1] -		return elem +// elemPool is a very simple +// list.Elem[T] memory pool. +type elemPool[T any] struct { +	current []*list.Elem[T] +	victim  []*list.Elem[T] +} + +func (p *elemPool[T]) alloc() *list.Elem[T] { +	// First try the current queue +	if l := len(p.current) - 1; l >= 0 { +		mu := p.current[l] +		p.current = p.current[:l] +		return mu  	} -	return new(list.Elem[T]) + +	// Next try the victim queue. +	if l := len(p.victim) - 1; l >= 0 { +		mu := p.victim[l] +		p.victim = p.victim[:l] +		return mu +	} + +	// Lastly, alloc new. +	mu := new(list.Elem[T]) +	return mu  } -// free will free list element and release to pool. -func (q *SimpleQueue[T]) free(elem *list.Elem[T]) { +// free will release given element to pool. +func (p *elemPool[T]) free(elem *list.Elem[T]) {  	var zero T  	elem.Next = nil  	elem.Prev = nil  	elem.Value = zero -	q.p = append(q.p, elem) +	p.current = append(p.current, elem) +} + +// GC will clear out unused entries from the elemPool. +func (p *elemPool[T]) GC() { +	current := p.current +	p.current = nil +	p.victim = current  }  | 
