Diffstat (limited to 'vendor/github.com/ReneKroon/ttlcache/cache.go')
-rw-r--r-- | vendor/github.com/ReneKroon/ttlcache/cache.go | 307 |
1 file changed, 307 insertions, 0 deletions
diff --git a/vendor/github.com/ReneKroon/ttlcache/cache.go b/vendor/github.com/ReneKroon/ttlcache/cache.go
new file mode 100644
index 000000000..f772d0c7c
--- /dev/null
+++ b/vendor/github.com/ReneKroon/ttlcache/cache.go
@@ -0,0 +1,307 @@
+package ttlcache
+
+import (
+	"sync"
+	"time"
+)
+
+// CheckExpireCallback is used as a callback for an external check on item expiration
+type checkExpireCallback func(key string, value interface{}) bool
+
+// ExpireCallback is used as a callback on item expiration or when notifying of an item new to the cache
+type expireCallback func(key string, value interface{})
+
+// Cache is a synchronized map of items that can auto-expire once stale
+type Cache struct {
+	mutex                  sync.Mutex
+	ttl                    time.Duration
+	items                  map[string]*item
+	expireCallback         expireCallback
+	checkExpireCallback    checkExpireCallback
+	newItemCallback        expireCallback
+	priorityQueue          *priorityQueue
+	expirationNotification chan bool
+	expirationTime         time.Time
+	skipTTLExtension       bool
+	shutdownSignal         chan (chan struct{})
+	isShutDown             bool
+}
+
+func (cache *Cache) getItem(key string) (*item, bool, bool) {
+	item, exists := cache.items[key]
+	if !exists || item.expired() {
+		return nil, false, false
+	}
+
+	if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
+		if cache.ttl > 0 && item.ttl == 0 {
+			item.ttl = cache.ttl
+		}
+
+		if !cache.skipTTLExtension {
+			item.touch()
+		}
+		cache.priorityQueue.update(item)
+	}
+
+	expirationNotification := false
+	if cache.expirationTime.After(time.Now().Add(item.ttl)) {
+		expirationNotification = true
+	}
+	return item, exists, expirationNotification
+}
+
+func (cache *Cache) startExpirationProcessing() {
+	timer := time.NewTimer(time.Hour)
+	for {
+		var sleepTime time.Duration
+		cache.mutex.Lock()
+		if cache.priorityQueue.Len() > 0 {
+			sleepTime = time.Until(cache.priorityQueue.items[0].expireAt)
+			if sleepTime < 0 && cache.priorityQueue.items[0].expireAt.IsZero() {
+				sleepTime = time.Hour
+			} else if sleepTime < 0 {
+				sleepTime = time.Microsecond
+			}
+			if cache.ttl > 0 {
+				sleepTime = min(sleepTime, cache.ttl)
+			}
+
+		} else if cache.ttl > 0 {
+			sleepTime = cache.ttl
+		} else {
+			sleepTime = time.Hour
+		}
+
+		cache.expirationTime = time.Now().Add(sleepTime)
+		cache.mutex.Unlock()
+
+		timer.Reset(sleepTime)
+		select {
+		case shutdownFeedback := <-cache.shutdownSignal:
+			timer.Stop()
+			cache.mutex.Lock()
+			if cache.priorityQueue.Len() > 0 {
+				cache.evictjob()
+			}
+			cache.mutex.Unlock()
+			shutdownFeedback <- struct{}{}
+			return
+		case <-timer.C:
+			timer.Stop()
+			cache.mutex.Lock()
+			if cache.priorityQueue.Len() == 0 {
+				cache.mutex.Unlock()
+				continue
+			}
+
+			cache.cleanjob()
+			cache.mutex.Unlock()
+
+		case <-cache.expirationNotification:
+			timer.Stop()
+			continue
+		}
+	}
+}
+
+func (cache *Cache) evictjob() {
+	// index will only be advanced if the current entry will not be evicted
+	i := 0
+	for item := cache.priorityQueue.items[i]; ; item = cache.priorityQueue.items[i] {
+
+		cache.priorityQueue.remove(item)
+		delete(cache.items, item.key)
+		if cache.expireCallback != nil {
+			go cache.expireCallback(item.key, item.data)
+		}
+		if cache.priorityQueue.Len() == 0 {
+			return
+		}
+	}
+}
+
+func (cache *Cache) cleanjob() {
+	// index will only be advanced if the current entry will not be evicted
+	i := 0
+	for item := cache.priorityQueue.items[i]; item.expired(); item = cache.priorityQueue.items[i] {
+
+		if cache.checkExpireCallback != nil {
+			if !cache.checkExpireCallback(item.key, item.data) {
+				item.touch()
+				cache.priorityQueue.update(item)
+				i++
+				if i == cache.priorityQueue.Len() {
+					break
+				}
+				continue
+			}
+		}
+
+		cache.priorityQueue.remove(item)
+		delete(cache.items, item.key)
+		if cache.expireCallback != nil {
+			go cache.expireCallback(item.key, item.data)
+		}
+		if cache.priorityQueue.Len() == 0 {
+			return
+		}
+	}
+}
+
+// Close calls Purge, and then stops the goroutine that does ttl checking, for a clean shutdown.
+// The cache is no longer cleaning up after the first call to Close; repeated calls are safe though.
+func (cache *Cache) Close() {
+
+	cache.mutex.Lock()
+	if !cache.isShutDown {
+		cache.isShutDown = true
+		cache.mutex.Unlock()
+		feedback := make(chan struct{})
+		cache.shutdownSignal <- feedback
+		<-feedback
+		close(cache.shutdownSignal)
+	} else {
+		cache.mutex.Unlock()
+	}
+	cache.Purge()
+}
+
+// Set is a thread-safe way to add new items to the map
+func (cache *Cache) Set(key string, data interface{}) {
+	cache.SetWithTTL(key, data, ItemExpireWithGlobalTTL)
+}
+
+// SetWithTTL is a thread-safe way to add new items to the map with individual ttl
+func (cache *Cache) SetWithTTL(key string, data interface{}, ttl time.Duration) {
+	cache.mutex.Lock()
+	item, exists, _ := cache.getItem(key)
+
+	if exists {
+		item.data = data
+		item.ttl = ttl
+	} else {
+		item = newItem(key, data, ttl)
+		cache.items[key] = item
+	}
+
+	if item.ttl >= 0 && (item.ttl > 0 || cache.ttl > 0) {
+		if cache.ttl > 0 && item.ttl == 0 {
+			item.ttl = cache.ttl
+		}
+		item.touch()
+	}
+
+	if exists {
+		cache.priorityQueue.update(item)
+	} else {
+		cache.priorityQueue.push(item)
+	}
+
+	cache.mutex.Unlock()
+	if !exists && cache.newItemCallback != nil {
+		cache.newItemCallback(key, data)
+	}
+	cache.expirationNotification <- true
+}
+
+// Get is a thread-safe way to lookup items
+// Every lookup also touches the item, hence extending its life
+func (cache *Cache) Get(key string) (interface{}, bool) {
+	cache.mutex.Lock()
+	item, exists, triggerExpirationNotification := cache.getItem(key)
+
+	var dataToReturn interface{}
+	if exists {
+		dataToReturn = item.data
+	}
+	cache.mutex.Unlock()
+	if triggerExpirationNotification {
+		cache.expirationNotification <- true
+	}
+	return dataToReturn, exists
+}
+
+func (cache *Cache) Remove(key string) bool {
+	cache.mutex.Lock()
+	object, exists := cache.items[key]
+	if !exists {
+		cache.mutex.Unlock()
+		return false
+	}
+	delete(cache.items, object.key)
+	cache.priorityQueue.remove(object)
+	cache.mutex.Unlock()
+
+	return true
+}
+
+// Count returns the number of items in the cache
+func (cache *Cache) Count() int {
+	cache.mutex.Lock()
+	length := len(cache.items)
+	cache.mutex.Unlock()
+	return length
+}
+
+func (cache *Cache) SetTTL(ttl time.Duration) {
+	cache.mutex.Lock()
+	cache.ttl = ttl
+	cache.mutex.Unlock()
+	cache.expirationNotification <- true
+}
+
+// SetExpirationCallback sets a callback that will be called when an item expires
+func (cache *Cache) SetExpirationCallback(callback expireCallback) {
+	cache.expireCallback = callback
+}
+
+// SetCheckExpirationCallback sets a callback that will be called when an item is about to expire
+// in order to allow external code to decide whether the item expires or remains for another TTL cycle
+func (cache *Cache) SetCheckExpirationCallback(callback checkExpireCallback) {
+	cache.checkExpireCallback = callback
+}
+
+// SetNewItemCallback sets a callback that will be called when a new item is added to the cache
+func (cache *Cache) SetNewItemCallback(callback expireCallback) {
+	cache.newItemCallback = callback
+}
+
+// SkipTtlExtensionOnHit allows the user to change the cache behaviour. When this flag is set to true it will
+// no longer extend TTL of items when they are retrieved using Get, or when their expiration condition is evaluated
+// using SetCheckExpirationCallback.
+func (cache *Cache) SkipTtlExtensionOnHit(value bool) {
+	cache.skipTTLExtension = value
+}
+
+// Purge will remove all entries
+func (cache *Cache) Purge() {
+	cache.mutex.Lock()
+	cache.items = make(map[string]*item)
+	cache.priorityQueue = newPriorityQueue()
+	cache.mutex.Unlock()
+}
+
+// NewCache is a helper to create an instance of the Cache struct
+func NewCache() *Cache {
+
+	shutdownChan := make(chan chan struct{})
+
+	cache := &Cache{
+		items:                  make(map[string]*item),
+		priorityQueue:          newPriorityQueue(),
+		expirationNotification: make(chan bool),
+		expirationTime:         time.Now(),
+		shutdownSignal:         shutdownChan,
+		isShutDown:             false,
+	}
+	go cache.startExpirationProcessing()
+	return cache
+}
+
+func min(duration time.Duration, second time.Duration) time.Duration {
+	if duration < second {
+		return duration
+	}
+	return second
+}
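
For context, the sketch below shows one way the API added in this file can be driven from application code. It is an illustrative example only, not part of the commit: it assumes the rest of the vendored package (the item and priorityQueue types, and the ItemExpireWithGlobalTTL constant referenced in Set above) is present as vendored, and the keys and values are made up.

package main

import (
	"fmt"
	"time"

	"github.com/ReneKroon/ttlcache"
)

func main() {
	cache := ttlcache.NewCache()
	defer cache.Close() // stops the expiration goroutine and purges all entries

	// Items set without an individual TTL fall back to this global TTL.
	cache.SetTTL(2 * time.Second)

	// Invoked asynchronously (in a goroutine) whenever an item expires.
	cache.SetExpirationCallback(func(key string, value interface{}) {
		fmt.Println("expired:", key)
	})

	cache.Set("session", "token")                      // uses the global TTL
	cache.SetWithTTL("temp", 42, 500*time.Millisecond) // individual TTL

	// Get extends the item's TTL unless SkipTtlExtensionOnHit(true) was called.
	if v, ok := cache.Get("session"); ok {
		fmt.Println("got:", v)
	}
	fmt.Println("items in cache:", cache.Count())
}

As the cleanjob implementation above shows, SetCheckExpirationCallback can additionally be used to veto individual expirations: returning false from that callback touches the item and keeps it for another TTL cycle.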