diff --git a/pkg/storage/ts_maintenance_queue_test.go b/pkg/storage/ts_maintenance_queue_test.go index ae01075da317..bf0cfa8091af 100644 --- a/pkg/storage/ts_maintenance_queue_test.go +++ b/pkg/storage/ts_maintenance_queue_test.go @@ -308,9 +308,11 @@ func TestTimeSeriesMaintenanceQueueServer(t *testing.T) { context.TODO(), tspb.Query{Name: seriesName}, ts.Resolution10s, - ts.Resolution10s.SampleDuration(), - 0, - now+ts.Resolution10s.SlabDuration(), + ts.QueryTimespan{ + SampleDurationNanos: ts.Resolution10s.SampleDuration(), + StartNanos: 0, + EndNanos: now + ts.Resolution10s.SlabDuration(), + }, memContext, ) return dps, err diff --git a/pkg/ts/db_test.go b/pkg/ts/db_test.go index b8b2d55d2bdd..6cd78e107c43 100644 --- a/pkg/ts/db_test.go +++ b/pkg/ts/db_test.go @@ -313,14 +313,13 @@ func (tm *testModelRunner) assertQuery( memContext := tm.makeMemoryContext(interpolationLimit) defer memContext.Close(context.TODO()) + timespan := QueryTimespan{ + StartNanos: start, + EndNanos: end, + SampleDurationNanos: sampleDuration, + } actualDatapoints, actualSources, err := tm.DB.QueryMemoryConstrained( - context.TODO(), - q, - r, - sampleDuration, - start, - end, - memContext, + context.TODO(), q, r, timespan, memContext, ) if err != nil { tm.t.Fatal(err) diff --git a/pkg/ts/query.go b/pkg/ts/query.go index e312c43878b4..4a311b4d1665 100644 --- a/pkg/ts/query.go +++ b/pkg/ts/query.go @@ -761,51 +761,49 @@ func (ai aggregatingIterator) makeLeadingEdgeFilter( // query into multiple "chunks" as necessary according to the provided memory // budget. Chunks are queried sequentially, ensuring that the memory budget // applies to only a single chunk at any given time. -// -// In addition to the memory budget, an "expected source count" must be provided -// to this function. This allows the query to better predict how many slabs -// it will encounter when querying a time span. func (db *DB) QueryMemoryConstrained( ctx context.Context, query tspb.Query, queryResolution Resolution, - sampleDuration, startNanos, endNanos int64, + timespan QueryTimespan, mem QueryMemoryContext, ) ([]tspb.TimeSeriesDatapoint, []string, error) { - maxTimespan, err := mem.GetMaxTimespan(queryResolution) + maxTimespanWidth, err := mem.GetMaxTimespan(queryResolution) if err != nil { return nil, nil, err } - totalTimespan := endNanos - startNanos - if maxTimespan >= totalTimespan { + timespan.normalize() + if maxTimespanWidth >= timespan.width() { return db.Query( ctx, query, queryResolution, - sampleDuration, - startNanos, - endNanos, + timespan, mem, ) } allData := make([]tspb.TimeSeriesDatapoint, 0) allSourcesMap := make(map[string]struct{}) - for s, e := startNanos, startNanos+maxTimespan; s < endNanos; s, e = e, e+maxTimespan { - // End span is not inclusive for partial queries. - adjustedEnd := e - queryResolution.SampleDuration() - // Do not exceed the specified endNanos. - if adjustedEnd > endNanos { - adjustedEnd = endNanos + chunkTime := timespan + chunkTime.EndNanos = chunkTime.StartNanos + maxTimespanWidth + for ; chunkTime.StartNanos < timespan.EndNanos; chunkTime.slideForward() { + adjustedChunkTime := chunkTime + if adjustedChunkTime.EndNanos > timespan.EndNanos { + // Final chunk may be a smaller window. + adjustedChunkTime.EndNanos = timespan.EndNanos + } else { + // Before the final chunk, do not include the final sample (which will be + // queried by the subsequent chunk). + adjustedChunkTime.EndNanos -= adjustedChunkTime.SampleDurationNanos } + data, sources, err := db.Query( ctx, query, queryResolution, - sampleDuration, - s, - adjustedEnd, + adjustedChunkTime, mem, ) if err != nil { @@ -863,50 +861,46 @@ func (db *DB) Query( ctx context.Context, query tspb.Query, queryResolution Resolution, - sampleDuration, startNanos, endNanos int64, + timespan QueryTimespan, mem QueryMemoryContext, ) ([]tspb.TimeSeriesDatapoint, []string, error) { - resolutionSampleDuration := queryResolution.SampleDuration() - // Verify that sampleDuration is a multiple of - // queryResolution.SampleDuration(). - if sampleDuration < resolutionSampleDuration { - return nil, nil, fmt.Errorf( - "sampleDuration %d was not less that queryResolution.SampleDuration %d", - sampleDuration, - resolutionSampleDuration, - ) - } - if sampleDuration%resolutionSampleDuration != 0 { - return nil, nil, fmt.Errorf( - "sampleDuration %d is not a multiple of queryResolution.SampleDuration %d", - sampleDuration, - resolutionSampleDuration, - ) + if err := timespan.verifyDiskResolution(queryResolution); err != nil { + return nil, nil, err } - // Create a local account to track memory usage local to this function. - localAccount := mem.workerMonitor.MakeBoundAccount() - defer localAccount.Close(ctx) - // Disallow queries in the future. systemTime := timeutil.Now().UnixNano() - if startNanos > systemTime { + if timespan.StartNanos >= systemTime { return nil, nil, nil } - if endNanos > systemTime { - endNanos = systemTime + if timespan.EndNanos > systemTime { + timespan.EndNanos = systemTime } - // Normalize startNanos to a sampleDuration boundary. - startNanos -= startNanos % sampleDuration + // If we are downsampling and the last sample period queried contains the + // system time, move endNanos back by one sample period. This avoids the + // situation where we return a data point which was downsampled from + // incomplete data which will later be complete. + queryResolutionSampleDuration := queryResolution.SampleDuration() + if timespan.SampleDurationNanos > queryResolutionSampleDuration && + timespan.EndNanos+timespan.SampleDurationNanos > systemTime { + timespan.EndNanos -= timespan.SampleDurationNanos + } - // If query is near the current moment and we are downsampling, normalize - // endNanos to avoid querying an incomplete datapoint. - if sampleDuration > resolutionSampleDuration && - endNanos > systemTime-resolutionSampleDuration { - endNanos -= endNanos % sampleDuration + // Verify that our time period still makes sense. + timespan.normalize() + if err := timespan.verifyBounds(); err != nil { + return nil, nil, err } + // Create a local account to track memory usage local to this function. + localAccount := mem.workerMonitor.MakeBoundAccount() + defer localAccount.Close(ctx) + + // Actual queried data should include the interpolation limit on either side. + queryTimespan := timespan + queryTimespan.expand(mem.InterpolationLimitNanos) + var rows []client.KeyValue if len(query.Sources) == 0 { // Based on the supplied timestamps and resolution, construct start and @@ -914,10 +908,10 @@ func (db *DB) Query( // the query. Query slightly before and after the actual queried range // to allow interpolation of points at the start and end of the range. startKey := MakeDataKey( - query.Name, "" /* source */, queryResolution, startNanos-mem.InterpolationLimitNanos, + query.Name, "" /* source */, queryResolution, queryTimespan.StartNanos, ) endKey := MakeDataKey( - query.Name, "" /* source */, queryResolution, endNanos+mem.InterpolationLimitNanos, + query.Name, "" /* source */, queryResolution, queryTimespan.EndNanos, ).PrefixEnd() b := &client.Batch{} b.Scan(startKey, endKey) @@ -931,10 +925,8 @@ func (db *DB) Query( // Iterate over all key timestamps which may contain data for the given // sources, based on the given start/end time and the resolution. kd := queryResolution.SlabDuration() - startKeyNanos := startNanos - mem.InterpolationLimitNanos - startKeyNanos = startKeyNanos - (startKeyNanos % kd) - endKeyNanos := endNanos + mem.InterpolationLimitNanos - for currentTimestamp := startKeyNanos; currentTimestamp <= endKeyNanos; currentTimestamp += kd { + startTimestamp := queryTimespan.StartNanos - queryTimespan.StartNanos%kd + for currentTimestamp := startTimestamp; currentTimestamp <= queryTimespan.EndNanos; currentTimestamp += kd { for _, source := range query.Sources { key := MakeDataKey(query.Name, source, queryResolution, currentTimestamp) b.Get(key) @@ -955,7 +947,7 @@ func (db *DB) Query( // Convert the queried source data into a set of data spans, one for each // source. - sourceSpans, err := makeDataSpans(ctx, rows, startNanos, &localAccount) + sourceSpans, err := makeDataSpans(ctx, rows, timespan.StartNanos, &localAccount) if err != nil { return nil, nil, err } @@ -978,14 +970,14 @@ func (db *DB) Query( // list of all sources with data present in the query. sources := make([]string, 0, len(sourceSpans)) iters := make(aggregatingIterator, 0, len(sourceSpans)) - maxDistance := int32(mem.InterpolationLimitNanos / sampleDuration) + maxDistance := int32(mem.InterpolationLimitNanos / timespan.SampleDurationNanos) for name, span := range sourceSpans { if err := mem.resultAccount.Grow(ctx, int64(len(name))); err != nil { return nil, nil, err } sources = append(sources, name) iters = append(iters, newInterpolatingIterator( - *span, 0, sampleDuration, maxDistance, extractor, downsampler, query.GetDerivative(), + *span, 0, timespan.SampleDurationNanos, maxDistance, extractor, downsampler, query.GetDerivative(), )) } @@ -1009,7 +1001,7 @@ func (db *DB) Query( // Filter the result of the aggregation function through a leading edge // filter. - cutoffNanos := timeutil.Now().UnixNano() - resolutionSampleDuration + cutoffNanos := timeutil.Now().UnixNano() - queryResolutionSampleDuration valueFn := iters.makeLeadingEdgeFilter(aggFn, cutoffNanos) // Iterate over all requested offsets, recording a value from the @@ -1023,7 +1015,7 @@ func (db *DB) Query( } var responseData []tspb.TimeSeriesDatapoint - for iters.isValid() && iters.timestamp() <= endNanos { + for iters.isValid() && iters.timestamp() <= timespan.EndNanos { if value, valid := valueFn(); valid { if err := mem.resultAccount.Grow(ctx, sizeOfDataPoint); err != nil { return nil, nil, err @@ -1033,7 +1025,7 @@ func (db *DB) Query( Value: value, } if query.GetDerivative() != tspb.TimeSeriesQueryDerivative_NONE { - response.Value = response.Value / float64(sampleDuration) * float64(time.Second.Nanoseconds()) + response.Value = response.Value / float64(timespan.SampleDurationNanos) * float64(time.Second.Nanoseconds()) } responseData = append(responseData, response) } diff --git a/pkg/ts/query_test.go b/pkg/ts/query_test.go index 4222ab685905..44844da80a82 100644 --- a/pkg/ts/query_test.go +++ b/pkg/ts/query_test.go @@ -841,8 +841,13 @@ func TestQueryDownsampling(t *testing.T) { defer memContext.Close(context.TODO()) // Query with sampleDuration that is too small, expect error. + timespan := QueryTimespan{ + SampleDurationNanos: 1, + StartNanos: 0, + EndNanos: 10000, + } _, _, err := tm.DB.Query( - context.TODO(), tspb.Query{}, Resolution10s, 1, 0, 10000, memContext, + context.TODO(), tspb.Query{}, Resolution10s, timespan, memContext, ) if err == nil { t.Fatal("expected query to fail with sampleDuration less than resolution allows.") @@ -853,14 +858,16 @@ func TestQueryDownsampling(t *testing.T) { } // Query with sampleDuration which is not an even multiple of the resolution. - + timespan = QueryTimespan{ + SampleDurationNanos: Resolution10s.SampleDuration() + 1, + StartNanos: 0, + EndNanos: 10000, + } _, _, err = tm.DB.Query( context.TODO(), tspb.Query{}, Resolution10s, - Resolution10s.SampleDuration()+1, - 0, - 10000, + timespan, memContext, ) if err == nil { @@ -1089,13 +1096,16 @@ func TestQueryWorkerMemoryConstraint(t *testing.T) { memContext.BudgetBytes = 1000 memContext.EstimatedSources = 3 defer memContext.Close(context.TODO()) + timespan := QueryTimespan{ + SampleDurationNanos: 1, + StartNanos: 0, + EndNanos: 10000, + } _, _, err := tm.DB.QueryMemoryConstrained( context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, - 1, - 0, - 10000, + timespan, memContext, ) if errorStr := "insufficient"; !testutils.IsError(err, errorStr) { @@ -1156,8 +1166,13 @@ func TestQueryWorkerMemoryMonitor(t *testing.T) { memContext := tm.makeMemoryContext(0) defer memContext.Close(context.TODO()) + timespan := QueryTimespan{ + SampleDurationNanos: 1, + StartNanos: 0, + EndNanos: 10000, + } _, _, err := tm.DB.Query( - context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, 1, 0, 10000, memContext, + context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, timespan, memContext, ) if errorStr := "memory budget exceeded"; !testutils.IsError(err, errorStr) { t.Fatalf("bad query got error %q, wanted to match %q", err.Error(), errorStr) @@ -1177,8 +1192,13 @@ func TestQueryWorkerMemoryMonitor(t *testing.T) { ) runtime.ReadMemStats(&memStatsBefore) + timespan = QueryTimespan{ + SampleDurationNanos: 1, + StartNanos: 0, + EndNanos: 10000, + } _, _, err = tm.DB.Query( - context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, 1, 0, 10000, memContext, + context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, timespan, memContext, ) if err != nil { t.Fatalf("expected no error from query, got %v", err) @@ -1202,11 +1222,16 @@ func TestQueryBadRequests(t *testing.T) { // Query with a downsampler that is invalid, expect error. downsampler := (tspb.TimeSeriesQueryAggregator)(999) + timespan := QueryTimespan{ + SampleDurationNanos: 10, + StartNanos: 0, + EndNanos: 10000, + } _, _, err := tm.DB.Query( context.TODO(), tspb.Query{ Name: "metric.test", Downsampler: downsampler.Enum(), - }, resolution1ns, 10, 0, 10000, memContext, + }, resolution1ns, timespan, memContext, ) errorStr := "unknown time series downsampler" if !testutils.IsError(err, errorStr) { @@ -1223,7 +1248,7 @@ func TestQueryBadRequests(t *testing.T) { context.TODO(), tspb.Query{ Name: "metric.test", SourceAggregator: aggregator.Enum(), - }, resolution1ns, 10, 0, 10000, memContext, + }, resolution1ns, timespan, memContext, ) errorStr = "unknown time series aggregator" if !testutils.IsError(err, errorStr) { @@ -1239,7 +1264,7 @@ func TestQueryBadRequests(t *testing.T) { _, _, err = tm.DB.Query( context.TODO(), tspb.Query{ Derivative: derivative.Enum(), - }, resolution1ns, 10, 0, 10000, memContext, + }, resolution1ns, timespan, memContext, ) if !testutils.IsError(err, "") { t.Fatalf("bad query got error %q, wanted no error", err.Error()) diff --git a/pkg/ts/server.go b/pkg/ts/server.go index dc558dc3bf4b..2bb419ed5367 100644 --- a/pkg/ts/server.go +++ b/pkg/ts/server.go @@ -212,6 +212,12 @@ func (s *Server) Query( } }() + timespan := QueryTimespan{ + StartNanos: request.StartNanos, + EndNanos: request.EndNanos, + SampleDurationNanos: sampleNanos, + } + // Start a task which is itself responsible for starting per-query worker // tasks. This is needed because RunLimitedAsyncTask can block; in the // case where a single request has more queries than the semaphore limit, @@ -253,9 +259,7 @@ func (s *Server) Query( ctx, query, Resolution10s, - sampleNanos, - request.StartNanos, - request.EndNanos, + timespan, memContexts[queryIdx], ) if err == nil { diff --git a/pkg/ts/timespan.go b/pkg/ts/timespan.go new file mode 100644 index 000000000000..df1af2777a42 --- /dev/null +++ b/pkg/ts/timespan.go @@ -0,0 +1,88 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package ts + +import "fmt" + +// QueryTimespan describes the time range information for a query - the start +// and end bounds of the query, along with the requested duration of individual +// samples to be returned. Methods of this structure are mutating. +type QueryTimespan struct { + StartNanos int64 + EndNanos int64 + SampleDurationNanos int64 +} + +// width returns the width of the timespan: the distance between its start and +// and end bounds. +func (qt *QueryTimespan) width() int64 { + return qt.EndNanos - qt.StartNanos +} + +// slideForward modifies the timespan so that it has the same width, but +// startNanos is moved to endNanos - in effect, sliding the timespan forward to +// the next "window" with the same width. +func (qt *QueryTimespan) slideForward() { + w := qt.width() + qt.StartNanos += w + qt.EndNanos += w +} + +// expand modifies the timespan so that its width is expanded *on each side* +// by the supplied size; the resulting width will be (2 * size) larger than the +// original width. +func (qt *QueryTimespan) expand(size int64) { + qt.StartNanos -= size + qt.EndNanos += size +} + +// normalize modifies startNanos and endNanos so that they are exact multiples +// of the sampleDuration. Values are modified by subtraction. +func (qt *QueryTimespan) normalize() { + qt.StartNanos -= qt.StartNanos % qt.SampleDurationNanos + qt.EndNanos -= qt.EndNanos % qt.SampleDurationNanos +} + +// verifyBounds returns an error if the bounds of this QueryTimespan are +// incorrect; currently, this only occurs if the width is negative. +func (qt *QueryTimespan) verifyBounds() error { + if qt.StartNanos > qt.EndNanos { + return fmt.Errorf("startNanos %d was later than endNanos %d", qt.StartNanos, qt.EndNanos) + } + return nil +} + +// verifyDiskResolution returns an error if this timespan is not suitable for +// querying the supplied disk resolution. +func (qt *QueryTimespan) verifyDiskResolution(diskResolution Resolution) error { + resolutionSampleDuration := diskResolution.SampleDuration() + // Verify that sampleDuration is a multiple of + // diskResolution.SampleDuration(). + if qt.SampleDurationNanos < resolutionSampleDuration { + return fmt.Errorf( + "sampleDuration %d was not less that queryResolution.SampleDuration %d", + qt.SampleDurationNanos, + resolutionSampleDuration, + ) + } + if qt.SampleDurationNanos%resolutionSampleDuration != 0 { + return fmt.Errorf( + "sampleDuration %d is not a multiple of queryResolution.SampleDuration %d", + qt.SampleDurationNanos, + resolutionSampleDuration, + ) + } + return nil +}