diff --git a/pkg/ts/testmodel/data.go b/pkg/ts/testmodel/data.go new file mode 100644 index 000000000000..2992d0eb7432 --- /dev/null +++ b/pkg/ts/testmodel/data.go @@ -0,0 +1,199 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "math" + "sort" +) + +// DataPoint represents a single point in a time series. It is a timestamp/value +// pair. +type DataPoint struct { + timestamp int64 + value float64 +} + +// dp is a shorthand function for constructing a DataPoint, used for convenience +// in tests. +func dp(timestamp int64, value float64) DataPoint { + return DataPoint{ + timestamp: timestamp, + value: value, + } +} + +// DataSeries represents a series of data points ordered by timestamp. +type DataSeries []DataPoint + +func (data DataSeries) Len() int { return len(data) } +func (data DataSeries) Swap(i, j int) { data[i], data[j] = data[j], data[i] } +func (data DataSeries) Less(i, j int) bool { return data[i].timestamp < data[j].timestamp } + +func normalizeTime(time, resolution int64) int64 { + return time - time%resolution +} + +// timeSlice returns the set of the dataPoints from the supplied series with +// timestamps that fall in the interval [start, end) (not inclusive of end +// timestamp). +func (data DataSeries) timeSlice(start, end int64) DataSeries { + startIdx := sort.Search(len(data), func(i int) bool { + return data[i].timestamp >= start + }) + endIdx := sort.Search(len(data), func(i int) bool { + return end <= data[i].timestamp + }) + + result := data[startIdx:endIdx] + if len(result) == 0 { + return nil + } + return result +} + +// groupByResolution aggregates data points in the given series into time +// buckets based on the provided resolution. +func (data DataSeries) groupByResolution(resolution int64, aggFunc aggFunc) DataSeries { + if len(data) == 0 { + return nil + } + + start := normalizeTime(data[0].timestamp, resolution) + end := normalizeTime(data[len(data)-1].timestamp, resolution) + result := make(DataSeries, 0, (end-start)/resolution+1) + + for len(data) > 0 { + bucketTime := normalizeTime(data[0].timestamp, resolution) + // Grab the index of the first data point which does not belong to the same + // bucket as the start data point. + bucketEndIdx := sort.Search(len(data), func(idx int) bool { + return normalizeTime(data[idx].timestamp, resolution) > bucketTime + }) + // Compute the next point as an aggregate of all underlying points which + // go in the same bucket. + result = append(result, dp(bucketTime, aggFunc(data[:bucketEndIdx]))) + data = data[bucketEndIdx:] + } + + return result +} + +// fillForResolution is used to fill in gaps in the provided data based on the +// provided resolution and fill function; any gaps longer than the resolution +// size will be eligible for fill. This is intended to be called on data sets +// that have been generated using groupByResolution, and may have unexpected +// results otherwise. +func (data DataSeries) fillForResolution(resolution int64, fillFunc fillFunc) DataSeries { + if len(data) < 2 { + return data + } + + result := make(DataSeries, 0, len(data)) + result = append(result, data[0]) + for i := 1; i < len(data); i++ { + if data[i].timestamp-data[i-1].timestamp > resolution { + result = append(result, fillFunc(data[:i], data[i:], resolution)...) + } + result = append(result, data[i]) + } + + return result +} + +// rateOfChange returns the rate of change (over the supplied period) for each +// point in the supplied series, which is defined as: +// (value - valuePrev) / ((time - timePrev) / period) +// The returned series will be shorter than the original series by one, since +// the rate of change for the first datapoint cannot be computed in this +// fashion. +func (data DataSeries) rateOfChange(period int64) DataSeries { + if len(data) < 2 { + return nil + } + + result := make(DataSeries, len(data)-1) + for i := 1; i < len(data); i++ { + result[i-1] = dp( + data[i].timestamp, + (data[i].value-data[i-1].value)/float64((data[i].timestamp-data[i-1].timestamp)/period), + ) + } + return result +} + +// nonNegative replaces any values less than zero with a zero. +func (data DataSeries) nonNegative() DataSeries { + result := make(DataSeries, len(data)) + for i := 0; i < len(data); i++ { + if data[i].value >= 0 { + result[i] = data[i] + } else { + result[i] = dp(data[i].timestamp, 0) + } + } + return result +} + +// groupSeriesByTimestamp returns a single DataSeries by aggregating DataPoints +// with matching timestamps from the supplied set of series. +func groupSeriesByTimestamp(datas []DataSeries, aggFunc aggFunc) DataSeries { + if len(datas) == 0 { + return nil + } + + results := make(DataSeries, 0) + dataPointsToAggregate := make(DataSeries, 0, len(datas)) + for { + // Filter empty data series. + origDatas := datas + datas = datas[:0] + for _, data := range origDatas { + if len(data) > 0 { + datas = append(datas, data) + } + } + if len(datas) == 0 { + break + } + + // Create a slice of datapoints which share the earliest timestamp of any + // datapoint across all collections. If the data series are all perfectly + // aligned (same length and timestamps), then this will just be he first + // data point in each series. + earliestTime := int64(math.MaxInt64) + for _, data := range datas { + if data[0].timestamp < earliestTime { + // New earliest timestamp found, discard any points which were + // previously in the collection. + dataPointsToAggregate = dataPointsToAggregate[:0] + earliestTime = data[0].timestamp + } + if data[0].timestamp == earliestTime { + // Data point matches earliest timestamp, add it to current datapoint + // collection. + dataPointsToAggregate = append(dataPointsToAggregate, data[0]) + } + } + results = append(results, dp(earliestTime, aggFunc(dataPointsToAggregate))) + for i := range datas { + if datas[i][0].timestamp == earliestTime { + datas[i] = datas[i][1:] + } + } + } + + return results +} diff --git a/pkg/ts/testmodel/data_test.go b/pkg/ts/testmodel/data_test.go new file mode 100644 index 000000000000..d02c56a3915d --- /dev/null +++ b/pkg/ts/testmodel/data_test.go @@ -0,0 +1,430 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "reflect" + "testing" +) + +func TestDataSeriesTimeSlice(t *testing.T) { + testData := DataSeries{ + dp(1, 5), + dp(2, 5), + dp(8, 10), + dp(12, 10), + dp(17, 5), + dp(34, 50), + dp(35, 75), + dp(40, 10), + dp(49, 10), + dp(100, 500), + dp(109, 50), + dp(115, 99), + } + + for _, tc := range []struct { + start int64 + end int64 + expected DataSeries + }{ + { + start: 0, + end: 120, + expected: testData, + }, + { + start: 0, + end: 0, + expected: nil, + }, + { + start: 1000, + end: 1001, + expected: nil, + }, + { + start: 1, + end: 2, + expected: testData[:1], + }, + { + start: 15, + end: 49, + expected: DataSeries{ + dp(17, 5), + dp(34, 50), + dp(35, 75), + dp(40, 10), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.timeSlice(tc.start, tc.end) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("time slice got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesGroupByResolution(t *testing.T) { + // Each test uses the same input data. The input data includes several scenarios + // for missing/present data, such as gaps of different sizes and irregular + // recording patterns + testData := DataSeries{ + dp(1, 5), + dp(2, 5), + dp(8, 10), + dp(12, 10), + dp(17, 5), + dp(34, 50), + dp(35, 75), + dp(40, 10), + dp(41, 10), + dp(42, 10), + dp(43, 10), + dp(44, 10), + dp(45, 10), + dp(46, 10), + dp(47, 10), + dp(48, 10), + dp(49, 10), + dp(100, 500), + dp(109, 50), + dp(115, 99), + } + + for _, tc := range []struct { + resolution int64 + aggFunc aggFunc + expected DataSeries + }{ + // Group by 10 second resolution, aggregate add. + { + resolution: 10, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(0, 20), + dp(10, 15), + dp(30, 125), + dp(40, 100), + dp(100, 550), + dp(110, 99), + }, + }, + // Group by 10 second resolution, aggregate last. + { + resolution: 10, + aggFunc: aggFuncLast, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 50), + dp(110, 99), + }, + }, + // Group by 20 second resolution, aggregate last. + { + resolution: 20, + aggFunc: aggFuncLast, + expected: DataSeries{ + dp(0, 5), + dp(20, 75), + dp(40, 10), + dp(100, 99), + }, + }, + // Group by 100 second resolution, aggregate add. + { + resolution: 100, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(0, 260), + dp(100, 649), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.groupByResolution(tc.resolution, tc.aggFunc) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("group by resolution got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesFillByResolution(t *testing.T) { + testData := DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 70), + dp(110, 99), + } + for _, tc := range []struct { + resolution int64 + fillFunc fillFunc + expected DataSeries + }{ + // fill function that does nothing + { + resolution: 10, + fillFunc: func(_ DataSeries, _ DataSeries, _ int64) DataSeries { + return nil + }, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 70), + dp(110, 99), + }, + }, + // fill function that returns a constant value + { + resolution: 10, + fillFunc: func(before DataSeries, _ DataSeries, res int64) DataSeries { + return DataSeries{ + dp(before[len(before)-1].timestamp+res, 777), + } + }, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(20, 777), + dp(30, 75), + dp(40, 10), + dp(50, 777), + dp(100, 70), + dp(110, 99), + }, + }, + // interpolation fill function + { + resolution: 10, + fillFunc: fillFuncLinearInterpolate, + expected: DataSeries{ + dp(0, 10), + dp(10, 5), + dp(20, 40), + dp(30, 75), + dp(40, 10), + dp(50, 20), + dp(60, 30), + dp(70, 40), + dp(80, 50), + dp(90, 60), + dp(100, 70), + dp(110, 99), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.fillForResolution(tc.resolution, tc.fillFunc) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("fill by resolution got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesRateOfChange(t *testing.T) { + testData := DataSeries{ + dp(0, 10), + dp(10, 5), + dp(30, 75), + dp(40, 10), + dp(100, 70), + dp(110, 99), + } + for _, tc := range []struct { + period int64 + expected DataSeries + }{ + { + period: 10, + expected: DataSeries{ + dp(10, -5), + dp(30, 35), + dp(40, -65), + dp(100, 10), + dp(110, 29), + }, + }, + { + period: 1, + expected: DataSeries{ + dp(10, -0.5), + dp(30, 3.5), + dp(40, -6.5), + dp(100, 1), + dp(110, 2.9), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := testData.rateOfChange(tc.period) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("rate of change got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesNonNegative(t *testing.T) { + for _, tc := range []struct { + input DataSeries + expected DataSeries + }{ + { + input: DataSeries{ + dp(10, -5), + dp(30, 35), + dp(40, -65), + dp(100, 10), + dp(110, 29), + }, + expected: DataSeries{ + dp(10, 0), + dp(30, 35), + dp(40, 0), + dp(100, 10), + dp(110, 29), + }, + }, + { + input: DataSeries{ + dp(10, -0.5), + }, + expected: DataSeries{ + dp(10, 0), + }, + }, + { + input: DataSeries{ + dp(10, 0.001), + }, + expected: DataSeries{ + dp(10, 0.001), + }, + }, + { + input: DataSeries{}, + expected: DataSeries{}, + }, + } { + t.Run("", func(t *testing.T) { + results := tc.input.nonNegative() + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("rate of change got %v, wanted %v", a, e) + } + }) + } +} + +func TestDataSeriesGroupByTimestamp(t *testing.T) { + testData1 := DataSeries{ + dp(10, 1), + dp(20, 1), + dp(30, 1), + dp(140, 1), + dp(777, 1), + } + testData2 := DataSeries{ + dp(10, 2), + dp(20, 2), + dp(30, 2), + dp(140, 2), + dp(777, 2), + } + testDataStaggered := DataSeries{ + dp(10, 5), + dp(25, 5), + dp(30, 5), + dp(141, 5), + dp(888, 5), + } + + for _, tc := range []struct { + inputs []DataSeries + aggFunc aggFunc + expected DataSeries + }{ + { + inputs: nil, + aggFunc: aggFuncSum, + expected: nil, + }, + { + inputs: []DataSeries{testData1}, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(10, 1), + dp(20, 1), + dp(30, 1), + dp(140, 1), + dp(777, 1), + }, + }, + { + inputs: []DataSeries{testData1, testData2}, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(10, 3), + dp(20, 3), + dp(30, 3), + dp(140, 3), + dp(777, 3), + }, + }, + { + inputs: []DataSeries{testData1, testData2}, + aggFunc: aggFuncAvg, + expected: DataSeries{ + dp(10, 1.5), + dp(20, 1.5), + dp(30, 1.5), + dp(140, 1.5), + dp(777, 1.5), + }, + }, + { + inputs: []DataSeries{testData1, testData2, testDataStaggered}, + aggFunc: aggFuncSum, + expected: DataSeries{ + dp(10, 8), + dp(20, 3), + dp(25, 5), + dp(30, 8), + dp(140, 3), + dp(141, 5), + dp(777, 3), + dp(888, 5), + }, + }, + } { + t.Run("", func(t *testing.T) { + results := groupSeriesByTimestamp(tc.inputs, tc.aggFunc) + if a, e := results, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("rate of change got %v, wanted %v", a, e) + } + }) + } +} diff --git a/pkg/ts/testmodel/db.go b/pkg/ts/testmodel/db.go new file mode 100644 index 000000000000..529c4c1c9530 --- /dev/null +++ b/pkg/ts/testmodel/db.go @@ -0,0 +1,147 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "fmt" + "sort" + "time" + + "github.com/cockroachdb/cockroach/pkg/ts/tspb" +) + +// ModelDB is a purely in-memory model of CockroachDB's time series database, +// where time series can be stored and queried. +type ModelDB struct { + data map[string]DataSeries + metricNameToDataSources map[string]map[string]struct{} +} + +// NewModelDB instantiates a new ModelDB instance. +func NewModelDB() *ModelDB { + return &ModelDB{ + data: make(map[string]DataSeries), + metricNameToDataSources: make(map[string]map[string]struct{}), + } +} + +// Record stores the given data series for the supplied metric and data source, +// merging it with any previously recorded data for the same series. +func (mdb *ModelDB) Record(metricName, dataSource string, data DataSeries) { + dataSources, ok := mdb.metricNameToDataSources[metricName] + if !ok { + dataSources = make(map[string]struct{}) + mdb.metricNameToDataSources[metricName] = dataSources + } + dataSources[dataSource] = struct{}{} + + seriesName := seriesName(metricName, dataSource) + mdb.data[seriesName] = append(mdb.data[seriesName], data...) + sort.Sort(mdb.data[seriesName]) +} + +// Query retrieves aggregated data from the model database in the same way that +// data is currently queried from CockroachDB's time series database. Each query +// has a named metric, an optional set of sources, and a number of specified +// aggregation options: +// +// + A downsampler function, which is used to group series by resolution +// + An aggregation function, which is used to group multiples series by +// timestamp +// + A derivative option, which transforms the returned series into a rate of +// change. +// +// Each query has a sample duration (determines the length of the group-by-time +// interval), a start and end time, and an 'interpolation limit' which is a +// maximum gap size above which missing data is not filled. When fills are +// performed, linear interpolation is always used. +func (mdb *ModelDB) Query( + name string, + sources []string, + downsample, agg *tspb.TimeSeriesQueryAggregator, + derivative *tspb.TimeSeriesQueryDerivative, + sampleDuration, start, end, interpolationLimit int64, +) DataSeries { + // If explicit sources were not specified, use every source currently + // available for this particular metric. + if len(sources) == 0 { + sourceMap, ok := mdb.metricNameToDataSources[name] + if !ok { + return nil + } + sources = make([]string, 0, len(sourceMap)) + for k := range sourceMap { + sources = append(sources, k) + } + } + + queryData := make([]DataSeries, 0, len(sources)) + for _, source := range sources { + queryData = append(queryData, mdb.getSeriesData(name, source)) + } + + // Process data according to query parameters. + adjustedStart := normalizeTime(start, sampleDuration) - interpolationLimit + adjustedEnd := normalizeTime(end, sampleDuration) + interpolationLimit + for i := range queryData { + data := queryData[i] + + // Slice to relevant period. + data = data.timeSlice(adjustedStart, adjustedEnd) + + // Group by resolution according to the provided sampleDuration. + data = data.groupByResolution(sampleDuration, getAggFunction(*downsample)) + + // Fill in missing data points using linear interpolation. + data = data.fillForResolution( + sampleDuration, + func(before DataSeries, after DataSeries, res int64) DataSeries { + // Do not fill if this gap exceeds the interpolation limit. + start := before[len(before)-1] + end := after[0] + if end.timestamp-start.timestamp > interpolationLimit { + return nil + } + + return fillFuncLinearInterpolate(before, after, res) + }, + ) + + // Convert series to its rate-of-change if specified. + if *derivative != tspb.TimeSeriesQueryDerivative_NONE { + data = data.rateOfChange(time.Second.Nanoseconds()) + if *derivative == tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE { + data = data.nonNegative() + } + } + + queryData[i] = data + } + + return groupSeriesByTimestamp(queryData, getAggFunction(*agg)).timeSlice(start, end) +} + +func (mdb *ModelDB) getSeriesData(metricName, dataSource string) []DataPoint { + seriesName := seriesName(metricName, dataSource) + data, ok := mdb.data[seriesName] + if !ok { + return nil + } + return data +} + +func seriesName(metricName, dataSource string) string { + return fmt.Sprintf("%s$$%s", metricName, dataSource) +} diff --git a/pkg/ts/testmodel/db_test.go b/pkg/ts/testmodel/db_test.go new file mode 100644 index 000000000000..79d6822547eb --- /dev/null +++ b/pkg/ts/testmodel/db_test.go @@ -0,0 +1,200 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "reflect" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ts/tspb" +) + +func TestModelDBQuery(t *testing.T) { + db := NewModelDB() + db.Record("testmetric", "source1", DataSeries{ + dp(0, 0.0), + dp(100, 100.0), + dp(200, 200.0), + dp(400, 400.0), + }) + db.Record("testmetric", "source2", DataSeries{ + dp(0, 0.0), + dp(103, 50.0), + dp(199, 150.0), + dp(205, 400.0), + dp(301, 600.0), + dp(425, 800.0), + }) + db.Record("othermetric", "source1", DataSeries{ + dp(150, 10000), + dp(250, 10000), + dp(600, 5000), + }) + + for _, tc := range []struct { + seriesName string + sources []string + downsampler tspb.TimeSeriesQueryAggregator + aggregator tspb.TimeSeriesQueryAggregator + derivative tspb.TimeSeriesQueryDerivative + sampleDuration int64 + start int64 + end int64 + interpolationLimit int64 + expected DataSeries + }{ + // Basic Query + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 200.0), + dp(200, 600.0), + dp(300, 900.0), + dp(400, 1200.0), + }, + }, + // Different downsampler + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_MAX, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 250.0), + dp(200, 600.0), + dp(300, 900.0), + dp(400, 1200.0), + }, + }, + // Different aggregator + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_MAX, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 100.0), + dp(200, 400.0), + dp(300, 600.0), + dp(400, 800.0), + }, + }, + // Single-source Query + { + seriesName: "testmetric", + sources: []string{"source2"}, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 100.0), + dp(200, 400.0), + dp(300, 600.0), + dp(400, 800.0), + }, + }, + // Limited time. + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 200, + end: 400, + interpolationLimit: 10000, + expected: DataSeries{ + dp(200, 600.0), + dp(300, 900.0), + }, + }, + // No interpolation. + { + seriesName: "testmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 0, + expected: DataSeries{ + dp(0, 0.0), + dp(100, 200.0), + dp(200, 600.0), + dp(300, 600.0), + dp(400, 1200.0), + }, + }, + // No data. + { + seriesName: "wrongmetric", + sources: nil, + downsampler: tspb.TimeSeriesQueryAggregator_AVG, + aggregator: tspb.TimeSeriesQueryAggregator_SUM, + derivative: tspb.TimeSeriesQueryDerivative_NONE, + sampleDuration: 100, + start: 0, + end: 10000, + interpolationLimit: 10000, + expected: nil, + }, + } { + t.Run("", func(t *testing.T) { + result := db.Query( + tc.seriesName, + tc.sources, + tc.downsampler.Enum(), + tc.aggregator.Enum(), + tc.derivative.Enum(), + tc.sampleDuration, + tc.start, + tc.end, + tc.interpolationLimit, + ) + if a, e := result, tc.expected; !reflect.DeepEqual(a, e) { + t.Errorf("query got result %v, wanted %v", a, e) + } + }) + } +} diff --git a/pkg/ts/testmodel/functions.go b/pkg/ts/testmodel/functions.go new file mode 100644 index 000000000000..69a5e7541427 --- /dev/null +++ b/pkg/ts/testmodel/functions.go @@ -0,0 +1,99 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testmodel + +import ( + "fmt" + "math" + + "github.com/cockroachdb/cockroach/pkg/ts/tspb" +) + +type aggFunc func(DataSeries) float64 +type fillFunc func(DataSeries, DataSeries, int64) DataSeries + +func aggFuncSum(data DataSeries) float64 { + total := 0.0 + for _, dp := range data { + total += dp.value + } + return total +} + +func aggFuncLast(data DataSeries) float64 { + return data[len(data)-1].value +} + +func aggFuncAvg(data DataSeries) float64 { + if len(data) == 0 { + return 0.0 + } + return aggFuncSum(data) / float64(len(data)) +} + +func aggFuncMax(data DataSeries) float64 { + max := -math.MaxFloat64 + for _, dp := range data { + if dp.value > max { + max = dp.value + } + } + return max +} + +func aggFuncMin(data DataSeries) float64 { + min := math.MaxFloat64 + for _, dp := range data { + if dp.value < min { + min = dp.value + } + } + return min +} + +// getAggFunction is a convenience method used to process an aggregator option +// from our time series query protobuffer format. +func getAggFunction(agg tspb.TimeSeriesQueryAggregator) aggFunc { + switch agg { + case tspb.TimeSeriesQueryAggregator_AVG: + return aggFuncAvg + case tspb.TimeSeriesQueryAggregator_SUM: + return aggFuncSum + case tspb.TimeSeriesQueryAggregator_MAX: + return aggFuncMax + case tspb.TimeSeriesQueryAggregator_MIN: + return aggFuncMin + } + + // The model should not be called with an invalid aggregator option. + panic(fmt.Sprintf("unknown aggregator option specified: %v", agg)) +} + +func fillFuncLinearInterpolate(before DataSeries, after DataSeries, resolution int64) DataSeries { + start := before[len(before)-1] + end := after[0] + + // compute interpolation step + step := (end.value - start.value) / float64(end.timestamp-start.timestamp) + + result := make(DataSeries, (end.timestamp-start.timestamp)/resolution-1) + for i := range result { + result[i] = dp( + start.timestamp+(resolution*int64(i+1)), + start.value+(step*float64(i+1)*float64(resolution)), + ) + } + return result +}