diff options
Diffstat (limited to 'vendor/golang.org/x/net/internal')
| -rw-r--r-- | vendor/golang.org/x/net/internal/timeseries/timeseries.go | 525 | 
1 files changed, 525 insertions, 0 deletions
diff --git a/vendor/golang.org/x/net/internal/timeseries/timeseries.go b/vendor/golang.org/x/net/internal/timeseries/timeseries.go new file mode 100644 index 000000000..dc5225b6d --- /dev/null +++ b/vendor/golang.org/x/net/internal/timeseries/timeseries.go @@ -0,0 +1,525 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package timeseries implements a time series structure for stats collection. +package timeseries // import "golang.org/x/net/internal/timeseries" + +import ( +	"fmt" +	"log" +	"time" +) + +const ( +	timeSeriesNumBuckets       = 64 +	minuteHourSeriesNumBuckets = 60 +) + +var timeSeriesResolutions = []time.Duration{ +	1 * time.Second, +	10 * time.Second, +	1 * time.Minute, +	10 * time.Minute, +	1 * time.Hour, +	6 * time.Hour, +	24 * time.Hour,          // 1 day +	7 * 24 * time.Hour,      // 1 week +	4 * 7 * 24 * time.Hour,  // 4 weeks +	16 * 7 * 24 * time.Hour, // 16 weeks +} + +var minuteHourSeriesResolutions = []time.Duration{ +	1 * time.Second, +	1 * time.Minute, +} + +// An Observable is a kind of data that can be aggregated in a time series. +type Observable interface { +	Multiply(ratio float64)    // Multiplies the data in self by a given ratio +	Add(other Observable)      // Adds the data from a different observation to self +	Clear()                    // Clears the observation so it can be reused. +	CopyFrom(other Observable) // Copies the contents of a given observation to self +} + +// Float attaches the methods of Observable to a float64. +type Float float64 + +// NewFloat returns a Float. +func NewFloat() Observable { +	f := Float(0) +	return &f +} + +// String returns the float as a string. +func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) } + +// Value returns the float's value. +func (f *Float) Value() float64 { return float64(*f) } + +func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) } + +func (f *Float) Add(other Observable) { +	o := other.(*Float) +	*f += *o +} + +func (f *Float) Clear() { *f = 0 } + +func (f *Float) CopyFrom(other Observable) { +	o := other.(*Float) +	*f = *o +} + +// A Clock tells the current time. +type Clock interface { +	Time() time.Time +} + +type defaultClock int + +var defaultClockInstance defaultClock + +func (defaultClock) Time() time.Time { return time.Now() } + +// Information kept per level. Each level consists of a circular list of +// observations. The start of the level may be derived from end and the +// len(buckets) * sizeInMillis. +type tsLevel struct { +	oldest   int               // index to oldest bucketed Observable +	newest   int               // index to newest bucketed Observable +	end      time.Time         // end timestamp for this level +	size     time.Duration     // duration of the bucketed Observable +	buckets  []Observable      // collections of observations +	provider func() Observable // used for creating new Observable +} + +func (l *tsLevel) Clear() { +	l.oldest = 0 +	l.newest = len(l.buckets) - 1 +	l.end = time.Time{} +	for i := range l.buckets { +		if l.buckets[i] != nil { +			l.buckets[i].Clear() +			l.buckets[i] = nil +		} +	} +} + +func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) { +	l.size = size +	l.provider = f +	l.buckets = make([]Observable, numBuckets) +} + +// Keeps a sequence of levels. Each level is responsible for storing data at +// a given resolution. For example, the first level stores data at a one +// minute resolution while the second level stores data at a one hour +// resolution. + +// Each level is represented by a sequence of buckets. Each bucket spans an +// interval equal to the resolution of the level. New observations are added +// to the last bucket. +type timeSeries struct { +	provider    func() Observable // make more Observable +	numBuckets  int               // number of buckets in each level +	levels      []*tsLevel        // levels of bucketed Observable +	lastAdd     time.Time         // time of last Observable tracked +	total       Observable        // convenient aggregation of all Observable +	clock       Clock             // Clock for getting current time +	pending     Observable        // observations not yet bucketed +	pendingTime time.Time         // what time are we keeping in pending +	dirty       bool              // if there are pending observations +} + +// init initializes a level according to the supplied criteria. +func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) { +	ts.provider = f +	ts.numBuckets = numBuckets +	ts.clock = clock +	ts.levels = make([]*tsLevel, len(resolutions)) + +	for i := range resolutions { +		if i > 0 && resolutions[i-1] >= resolutions[i] { +			log.Print("timeseries: resolutions must be monotonically increasing") +			break +		} +		newLevel := new(tsLevel) +		newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider) +		ts.levels[i] = newLevel +	} + +	ts.Clear() +} + +// Clear removes all observations from the time series. +func (ts *timeSeries) Clear() { +	ts.lastAdd = time.Time{} +	ts.total = ts.resetObservation(ts.total) +	ts.pending = ts.resetObservation(ts.pending) +	ts.pendingTime = time.Time{} +	ts.dirty = false + +	for i := range ts.levels { +		ts.levels[i].Clear() +	} +} + +// Add records an observation at the current time. +func (ts *timeSeries) Add(observation Observable) { +	ts.AddWithTime(observation, ts.clock.Time()) +} + +// AddWithTime records an observation at the specified time. +func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) { + +	smallBucketDuration := ts.levels[0].size + +	if t.After(ts.lastAdd) { +		ts.lastAdd = t +	} + +	if t.After(ts.pendingTime) { +		ts.advance(t) +		ts.mergePendingUpdates() +		ts.pendingTime = ts.levels[0].end +		ts.pending.CopyFrom(observation) +		ts.dirty = true +	} else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) { +		// The observation is close enough to go into the pending bucket. +		// This compensates for clock skewing and small scheduling delays +		// by letting the update stay in the fast path. +		ts.pending.Add(observation) +		ts.dirty = true +	} else { +		ts.mergeValue(observation, t) +	} +} + +// mergeValue inserts the observation at the specified time in the past into all levels. +func (ts *timeSeries) mergeValue(observation Observable, t time.Time) { +	for _, level := range ts.levels { +		index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size) +		if 0 <= index && index < ts.numBuckets { +			bucketNumber := (level.oldest + index) % ts.numBuckets +			if level.buckets[bucketNumber] == nil { +				level.buckets[bucketNumber] = level.provider() +			} +			level.buckets[bucketNumber].Add(observation) +		} +	} +	ts.total.Add(observation) +} + +// mergePendingUpdates applies the pending updates into all levels. +func (ts *timeSeries) mergePendingUpdates() { +	if ts.dirty { +		ts.mergeValue(ts.pending, ts.pendingTime) +		ts.pending = ts.resetObservation(ts.pending) +		ts.dirty = false +	} +} + +// advance cycles the buckets at each level until the latest bucket in +// each level can hold the time specified. +func (ts *timeSeries) advance(t time.Time) { +	if !t.After(ts.levels[0].end) { +		return +	} +	for i := 0; i < len(ts.levels); i++ { +		level := ts.levels[i] +		if !level.end.Before(t) { +			break +		} + +		// If the time is sufficiently far, just clear the level and advance +		// directly. +		if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) { +			for _, b := range level.buckets { +				ts.resetObservation(b) +			} +			level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds()) +		} + +		for t.After(level.end) { +			level.end = level.end.Add(level.size) +			level.newest = level.oldest +			level.oldest = (level.oldest + 1) % ts.numBuckets +			ts.resetObservation(level.buckets[level.newest]) +		} + +		t = level.end +	} +} + +// Latest returns the sum of the num latest buckets from the level. +func (ts *timeSeries) Latest(level, num int) Observable { +	now := ts.clock.Time() +	if ts.levels[0].end.Before(now) { +		ts.advance(now) +	} + +	ts.mergePendingUpdates() + +	result := ts.provider() +	l := ts.levels[level] +	index := l.newest + +	for i := 0; i < num; i++ { +		if l.buckets[index] != nil { +			result.Add(l.buckets[index]) +		} +		if index == 0 { +			index = ts.numBuckets +		} +		index-- +	} + +	return result +} + +// LatestBuckets returns a copy of the num latest buckets from level. +func (ts *timeSeries) LatestBuckets(level, num int) []Observable { +	if level < 0 || level > len(ts.levels) { +		log.Print("timeseries: bad level argument: ", level) +		return nil +	} +	if num < 0 || num >= ts.numBuckets { +		log.Print("timeseries: bad num argument: ", num) +		return nil +	} + +	results := make([]Observable, num) +	now := ts.clock.Time() +	if ts.levels[0].end.Before(now) { +		ts.advance(now) +	} + +	ts.mergePendingUpdates() + +	l := ts.levels[level] +	index := l.newest + +	for i := 0; i < num; i++ { +		result := ts.provider() +		results[i] = result +		if l.buckets[index] != nil { +			result.CopyFrom(l.buckets[index]) +		} + +		if index == 0 { +			index = ts.numBuckets +		} +		index -= 1 +	} +	return results +} + +// ScaleBy updates observations by scaling by factor. +func (ts *timeSeries) ScaleBy(factor float64) { +	for _, l := range ts.levels { +		for i := 0; i < ts.numBuckets; i++ { +			l.buckets[i].Multiply(factor) +		} +	} + +	ts.total.Multiply(factor) +	ts.pending.Multiply(factor) +} + +// Range returns the sum of observations added over the specified time range. +// If start or finish times don't fall on bucket boundaries of the same +// level, then return values are approximate answers. +func (ts *timeSeries) Range(start, finish time.Time) Observable { +	return ts.ComputeRange(start, finish, 1)[0] +} + +// Recent returns the sum of observations from the last delta. +func (ts *timeSeries) Recent(delta time.Duration) Observable { +	now := ts.clock.Time() +	return ts.Range(now.Add(-delta), now) +} + +// Total returns the total of all observations. +func (ts *timeSeries) Total() Observable { +	ts.mergePendingUpdates() +	return ts.total +} + +// ComputeRange computes a specified number of values into a slice using +// the observations recorded over the specified time period. The return +// values are approximate if the start or finish times don't fall on the +// bucket boundaries at the same level or if the number of buckets spanning +// the range is not an integral multiple of num. +func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable { +	if start.After(finish) { +		log.Printf("timeseries: start > finish, %v>%v", start, finish) +		return nil +	} + +	if num < 0 { +		log.Printf("timeseries: num < 0, %v", num) +		return nil +	} + +	results := make([]Observable, num) + +	for _, l := range ts.levels { +		if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) { +			ts.extract(l, start, finish, num, results) +			return results +		} +	} + +	// Failed to find a level that covers the desired range. So just +	// extract from the last level, even if it doesn't cover the entire +	// desired range. +	ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results) + +	return results +} + +// RecentList returns the specified number of values in slice over the most +// recent time period of the specified range. +func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable { +	if delta < 0 { +		return nil +	} +	now := ts.clock.Time() +	return ts.ComputeRange(now.Add(-delta), now, num) +} + +// extract returns a slice of specified number of observations from a given +// level over a given range. +func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) { +	ts.mergePendingUpdates() + +	srcInterval := l.size +	dstInterval := finish.Sub(start) / time.Duration(num) +	dstStart := start +	srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets)) + +	srcIndex := 0 + +	// Where should scanning start? +	if dstStart.After(srcStart) { +		advance := int(dstStart.Sub(srcStart) / srcInterval) +		srcIndex += advance +		srcStart = srcStart.Add(time.Duration(advance) * srcInterval) +	} + +	// The i'th value is computed as show below. +	// interval = (finish/start)/num +	// i'th value = sum of observation in range +	//   [ start + i       * interval, +	//     start + (i + 1) * interval ) +	for i := 0; i < num; i++ { +		results[i] = ts.resetObservation(results[i]) +		dstEnd := dstStart.Add(dstInterval) +		for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) { +			srcEnd := srcStart.Add(srcInterval) +			if srcEnd.After(ts.lastAdd) { +				srcEnd = ts.lastAdd +			} + +			if !srcEnd.Before(dstStart) { +				srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets] +				if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) { +					// dst completely contains src. +					if srcValue != nil { +						results[i].Add(srcValue) +					} +				} else { +					// dst partially overlaps src. +					overlapStart := maxTime(srcStart, dstStart) +					overlapEnd := minTime(srcEnd, dstEnd) +					base := srcEnd.Sub(srcStart) +					fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds() + +					used := ts.provider() +					if srcValue != nil { +						used.CopyFrom(srcValue) +					} +					used.Multiply(fraction) +					results[i].Add(used) +				} + +				if srcEnd.After(dstEnd) { +					break +				} +			} +			srcIndex++ +			srcStart = srcStart.Add(srcInterval) +		} +		dstStart = dstStart.Add(dstInterval) +	} +} + +// resetObservation clears the content so the struct may be reused. +func (ts *timeSeries) resetObservation(observation Observable) Observable { +	if observation == nil { +		observation = ts.provider() +	} else { +		observation.Clear() +	} +	return observation +} + +// TimeSeries tracks data at granularities from 1 second to 16 weeks. +type TimeSeries struct { +	timeSeries +} + +// NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable. +func NewTimeSeries(f func() Observable) *TimeSeries { +	return NewTimeSeriesWithClock(f, defaultClockInstance) +} + +// NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for +// assigning timestamps. +func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries { +	ts := new(TimeSeries) +	ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock) +	return ts +} + +// MinuteHourSeries tracks data at granularities of 1 minute and 1 hour. +type MinuteHourSeries struct { +	timeSeries +} + +// NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable. +func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries { +	return NewMinuteHourSeriesWithClock(f, defaultClockInstance) +} + +// NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for +// assigning timestamps. +func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries { +	ts := new(MinuteHourSeries) +	ts.timeSeries.init(minuteHourSeriesResolutions, f, +		minuteHourSeriesNumBuckets, clock) +	return ts +} + +func (ts *MinuteHourSeries) Minute() Observable { +	return ts.timeSeries.Latest(0, 60) +} + +func (ts *MinuteHourSeries) Hour() Observable { +	return ts.timeSeries.Latest(1, 60) +} + +func minTime(a, b time.Time) time.Time { +	if a.Before(b) { +		return a +	} +	return b +} + +func maxTime(a, b time.Time) time.Time { +	if a.After(b) { +		return a +	} +	return b +}  | 
