Skip to content

Commit

Permalink
Merge #25025
Browse files Browse the repository at this point in the history
25025: ts: Factor Query Time Parameters into Struct r=mrtracy a=mrtracy

Moves time parameters of a query (start, end, sample duration) into a
struct. Several utility methods that were previously inlined and
repeated have been added to this structure to make code elsewhere more
understandable.

Release Note: None

Co-authored-by: Matt Tracy <[email protected]>
  • Loading branch information
craig[bot] and Matt Tracy committed Apr 24, 2018
2 parents 6064ff0 + 905a5b3 commit 5d39b64
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 89 deletions.
8 changes: 5 additions & 3 deletions pkg/storage/ts_maintenance_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,11 @@ func TestTimeSeriesMaintenanceQueueServer(t *testing.T) {
context.TODO(),
tspb.Query{Name: seriesName},
ts.Resolution10s,
ts.Resolution10s.SampleDuration(),
0,
now+ts.Resolution10s.SlabDuration(),
ts.QueryTimespan{
SampleDurationNanos: ts.Resolution10s.SampleDuration(),
StartNanos: 0,
EndNanos: now + ts.Resolution10s.SlabDuration(),
},
memContext,
)
return dps, err
Expand Down
13 changes: 6 additions & 7 deletions pkg/ts/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,13 @@ func (tm *testModelRunner) assertQuery(

memContext := tm.makeMemoryContext(interpolationLimit)
defer memContext.Close(context.TODO())
timespan := QueryTimespan{
StartNanos: start,
EndNanos: end,
SampleDurationNanos: sampleDuration,
}
actualDatapoints, actualSources, err := tm.DB.QueryMemoryConstrained(
context.TODO(),
q,
r,
sampleDuration,
start,
end,
memContext,
context.TODO(), q, r, timespan, memContext,
)
if err != nil {
tm.t.Fatal(err)
Expand Down
118 changes: 55 additions & 63 deletions pkg/ts/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,51 +761,49 @@ func (ai aggregatingIterator) makeLeadingEdgeFilter(
// query into multiple "chunks" as necessary according to the provided memory
// budget. Chunks are queried sequentially, ensuring that the memory budget
// applies to only a single chunk at any given time.
//
// In addition to the memory budget, an "expected source count" must be provided
// to this function. This allows the query to better predict how many slabs
// it will encounter when querying a time span.
func (db *DB) QueryMemoryConstrained(
ctx context.Context,
query tspb.Query,
queryResolution Resolution,
sampleDuration, startNanos, endNanos int64,
timespan QueryTimespan,
mem QueryMemoryContext,
) ([]tspb.TimeSeriesDatapoint, []string, error) {
maxTimespan, err := mem.GetMaxTimespan(queryResolution)
maxTimespanWidth, err := mem.GetMaxTimespan(queryResolution)
if err != nil {
return nil, nil, err
}

totalTimespan := endNanos - startNanos
if maxTimespan >= totalTimespan {
timespan.normalize()
if maxTimespanWidth >= timespan.width() {
return db.Query(
ctx,
query,
queryResolution,
sampleDuration,
startNanos,
endNanos,
timespan,
mem,
)
}

allData := make([]tspb.TimeSeriesDatapoint, 0)
allSourcesMap := make(map[string]struct{})
for s, e := startNanos, startNanos+maxTimespan; s < endNanos; s, e = e, e+maxTimespan {
// End span is not inclusive for partial queries.
adjustedEnd := e - queryResolution.SampleDuration()
// Do not exceed the specified endNanos.
if adjustedEnd > endNanos {
adjustedEnd = endNanos
chunkTime := timespan
chunkTime.EndNanos = chunkTime.StartNanos + maxTimespanWidth
for ; chunkTime.StartNanos < timespan.EndNanos; chunkTime.slideForward() {
adjustedChunkTime := chunkTime
if adjustedChunkTime.EndNanos > timespan.EndNanos {
// Final chunk may be a smaller window.
adjustedChunkTime.EndNanos = timespan.EndNanos
} else {
// Before the final chunk, do not include the final sample (which will be
// queried by the subsequent chunk).
adjustedChunkTime.EndNanos -= adjustedChunkTime.SampleDurationNanos
}

data, sources, err := db.Query(
ctx,
query,
queryResolution,
sampleDuration,
s,
adjustedEnd,
adjustedChunkTime,
mem,
)
if err != nil {
Expand Down Expand Up @@ -863,61 +861,57 @@ func (db *DB) Query(
ctx context.Context,
query tspb.Query,
queryResolution Resolution,
sampleDuration, startNanos, endNanos int64,
timespan QueryTimespan,
mem QueryMemoryContext,
) ([]tspb.TimeSeriesDatapoint, []string, error) {
resolutionSampleDuration := queryResolution.SampleDuration()
// Verify that sampleDuration is a multiple of
// queryResolution.SampleDuration().
if sampleDuration < resolutionSampleDuration {
return nil, nil, fmt.Errorf(
"sampleDuration %d was not less that queryResolution.SampleDuration %d",
sampleDuration,
resolutionSampleDuration,
)
}
if sampleDuration%resolutionSampleDuration != 0 {
return nil, nil, fmt.Errorf(
"sampleDuration %d is not a multiple of queryResolution.SampleDuration %d",
sampleDuration,
resolutionSampleDuration,
)
if err := timespan.verifyDiskResolution(queryResolution); err != nil {
return nil, nil, err
}

// Create a local account to track memory usage local to this function.
localAccount := mem.workerMonitor.MakeBoundAccount()
defer localAccount.Close(ctx)

// Disallow queries in the future.
systemTime := timeutil.Now().UnixNano()
if startNanos > systemTime {
if timespan.StartNanos >= systemTime {
return nil, nil, nil
}
if endNanos > systemTime {
endNanos = systemTime
if timespan.EndNanos > systemTime {
timespan.EndNanos = systemTime
}

// Normalize startNanos to a sampleDuration boundary.
startNanos -= startNanos % sampleDuration
// If we are downsampling and the last sample period queried contains the
// system time, move endNanos back by one sample period. This avoids the
// situation where we return a data point which was downsampled from
// incomplete data which will later be complete.
queryResolutionSampleDuration := queryResolution.SampleDuration()
if timespan.SampleDurationNanos > queryResolutionSampleDuration &&
timespan.EndNanos+timespan.SampleDurationNanos > systemTime {
timespan.EndNanos -= timespan.SampleDurationNanos
}

// If query is near the current moment and we are downsampling, normalize
// endNanos to avoid querying an incomplete datapoint.
if sampleDuration > resolutionSampleDuration &&
endNanos > systemTime-resolutionSampleDuration {
endNanos -= endNanos % sampleDuration
// Verify that our time period still makes sense.
timespan.normalize()
if err := timespan.verifyBounds(); err != nil {
return nil, nil, err
}

// Create a local account to track memory usage local to this function.
localAccount := mem.workerMonitor.MakeBoundAccount()
defer localAccount.Close(ctx)

// Actual queried data should include the interpolation limit on either side.
queryTimespan := timespan
queryTimespan.expand(mem.InterpolationLimitNanos)

var rows []client.KeyValue
if len(query.Sources) == 0 {
// Based on the supplied timestamps and resolution, construct start and
// end keys for a scan that will return every key with data relevant to
// the query. Query slightly before and after the actual queried range
// to allow interpolation of points at the start and end of the range.
startKey := MakeDataKey(
query.Name, "" /* source */, queryResolution, startNanos-mem.InterpolationLimitNanos,
query.Name, "" /* source */, queryResolution, queryTimespan.StartNanos,
)
endKey := MakeDataKey(
query.Name, "" /* source */, queryResolution, endNanos+mem.InterpolationLimitNanos,
query.Name, "" /* source */, queryResolution, queryTimespan.EndNanos,
).PrefixEnd()
b := &client.Batch{}
b.Scan(startKey, endKey)
Expand All @@ -931,10 +925,8 @@ func (db *DB) Query(
// Iterate over all key timestamps which may contain data for the given
// sources, based on the given start/end time and the resolution.
kd := queryResolution.SlabDuration()
startKeyNanos := startNanos - mem.InterpolationLimitNanos
startKeyNanos = startKeyNanos - (startKeyNanos % kd)
endKeyNanos := endNanos + mem.InterpolationLimitNanos
for currentTimestamp := startKeyNanos; currentTimestamp <= endKeyNanos; currentTimestamp += kd {
startTimestamp := queryTimespan.StartNanos - queryTimespan.StartNanos%kd
for currentTimestamp := startTimestamp; currentTimestamp <= queryTimespan.EndNanos; currentTimestamp += kd {
for _, source := range query.Sources {
key := MakeDataKey(query.Name, source, queryResolution, currentTimestamp)
b.Get(key)
Expand All @@ -955,7 +947,7 @@ func (db *DB) Query(

// Convert the queried source data into a set of data spans, one for each
// source.
sourceSpans, err := makeDataSpans(ctx, rows, startNanos, &localAccount)
sourceSpans, err := makeDataSpans(ctx, rows, timespan.StartNanos, &localAccount)
if err != nil {
return nil, nil, err
}
Expand All @@ -978,14 +970,14 @@ func (db *DB) Query(
// list of all sources with data present in the query.
sources := make([]string, 0, len(sourceSpans))
iters := make(aggregatingIterator, 0, len(sourceSpans))
maxDistance := int32(mem.InterpolationLimitNanos / sampleDuration)
maxDistance := int32(mem.InterpolationLimitNanos / timespan.SampleDurationNanos)
for name, span := range sourceSpans {
if err := mem.resultAccount.Grow(ctx, int64(len(name))); err != nil {
return nil, nil, err
}
sources = append(sources, name)
iters = append(iters, newInterpolatingIterator(
*span, 0, sampleDuration, maxDistance, extractor, downsampler, query.GetDerivative(),
*span, 0, timespan.SampleDurationNanos, maxDistance, extractor, downsampler, query.GetDerivative(),
))
}

Expand All @@ -1009,7 +1001,7 @@ func (db *DB) Query(

// Filter the result of the aggregation function through a leading edge
// filter.
cutoffNanos := timeutil.Now().UnixNano() - resolutionSampleDuration
cutoffNanos := timeutil.Now().UnixNano() - queryResolutionSampleDuration
valueFn := iters.makeLeadingEdgeFilter(aggFn, cutoffNanos)

// Iterate over all requested offsets, recording a value from the
Expand All @@ -1023,7 +1015,7 @@ func (db *DB) Query(
}

var responseData []tspb.TimeSeriesDatapoint
for iters.isValid() && iters.timestamp() <= endNanos {
for iters.isValid() && iters.timestamp() <= timespan.EndNanos {
if value, valid := valueFn(); valid {
if err := mem.resultAccount.Grow(ctx, sizeOfDataPoint); err != nil {
return nil, nil, err
Expand All @@ -1033,7 +1025,7 @@ func (db *DB) Query(
Value: value,
}
if query.GetDerivative() != tspb.TimeSeriesQueryDerivative_NONE {
response.Value = response.Value / float64(sampleDuration) * float64(time.Second.Nanoseconds())
response.Value = response.Value / float64(timespan.SampleDurationNanos) * float64(time.Second.Nanoseconds())
}
responseData = append(responseData, response)
}
Expand Down
51 changes: 38 additions & 13 deletions pkg/ts/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,8 +841,13 @@ func TestQueryDownsampling(t *testing.T) {
defer memContext.Close(context.TODO())

// Query with sampleDuration that is too small, expect error.
timespan := QueryTimespan{
SampleDurationNanos: 1,
StartNanos: 0,
EndNanos: 10000,
}
_, _, err := tm.DB.Query(
context.TODO(), tspb.Query{}, Resolution10s, 1, 0, 10000, memContext,
context.TODO(), tspb.Query{}, Resolution10s, timespan, memContext,
)
if err == nil {
t.Fatal("expected query to fail with sampleDuration less than resolution allows.")
Expand All @@ -853,14 +858,16 @@ func TestQueryDownsampling(t *testing.T) {
}

// Query with sampleDuration which is not an even multiple of the resolution.

timespan = QueryTimespan{
SampleDurationNanos: Resolution10s.SampleDuration() + 1,
StartNanos: 0,
EndNanos: 10000,
}
_, _, err = tm.DB.Query(
context.TODO(),
tspb.Query{},
Resolution10s,
Resolution10s.SampleDuration()+1,
0,
10000,
timespan,
memContext,
)
if err == nil {
Expand Down Expand Up @@ -1089,13 +1096,16 @@ func TestQueryWorkerMemoryConstraint(t *testing.T) {
memContext.BudgetBytes = 1000
memContext.EstimatedSources = 3
defer memContext.Close(context.TODO())
timespan := QueryTimespan{
SampleDurationNanos: 1,
StartNanos: 0,
EndNanos: 10000,
}
_, _, err := tm.DB.QueryMemoryConstrained(
context.TODO(),
tspb.Query{Name: "test.metric"},
resolution1ns,
1,
0,
10000,
timespan,
memContext,
)
if errorStr := "insufficient"; !testutils.IsError(err, errorStr) {
Expand Down Expand Up @@ -1156,8 +1166,13 @@ func TestQueryWorkerMemoryMonitor(t *testing.T) {

memContext := tm.makeMemoryContext(0)
defer memContext.Close(context.TODO())
timespan := QueryTimespan{
SampleDurationNanos: 1,
StartNanos: 0,
EndNanos: 10000,
}
_, _, err := tm.DB.Query(
context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, 1, 0, 10000, memContext,
context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, timespan, memContext,
)
if errorStr := "memory budget exceeded"; !testutils.IsError(err, errorStr) {
t.Fatalf("bad query got error %q, wanted to match %q", err.Error(), errorStr)
Expand All @@ -1177,8 +1192,13 @@ func TestQueryWorkerMemoryMonitor(t *testing.T) {
)
runtime.ReadMemStats(&memStatsBefore)

timespan = QueryTimespan{
SampleDurationNanos: 1,
StartNanos: 0,
EndNanos: 10000,
}
_, _, err = tm.DB.Query(
context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, 1, 0, 10000, memContext,
context.TODO(), tspb.Query{Name: "test.metric"}, resolution1ns, timespan, memContext,
)
if err != nil {
t.Fatalf("expected no error from query, got %v", err)
Expand All @@ -1202,11 +1222,16 @@ func TestQueryBadRequests(t *testing.T) {

// Query with a downsampler that is invalid, expect error.
downsampler := (tspb.TimeSeriesQueryAggregator)(999)
timespan := QueryTimespan{
SampleDurationNanos: 10,
StartNanos: 0,
EndNanos: 10000,
}
_, _, err := tm.DB.Query(
context.TODO(), tspb.Query{
Name: "metric.test",
Downsampler: downsampler.Enum(),
}, resolution1ns, 10, 0, 10000, memContext,
}, resolution1ns, timespan, memContext,
)
errorStr := "unknown time series downsampler"
if !testutils.IsError(err, errorStr) {
Expand All @@ -1223,7 +1248,7 @@ func TestQueryBadRequests(t *testing.T) {
context.TODO(), tspb.Query{
Name: "metric.test",
SourceAggregator: aggregator.Enum(),
}, resolution1ns, 10, 0, 10000, memContext,
}, resolution1ns, timespan, memContext,
)
errorStr = "unknown time series aggregator"
if !testutils.IsError(err, errorStr) {
Expand All @@ -1239,7 +1264,7 @@ func TestQueryBadRequests(t *testing.T) {
_, _, err = tm.DB.Query(
context.TODO(), tspb.Query{
Derivative: derivative.Enum(),
}, resolution1ns, 10, 0, 10000, memContext,
}, resolution1ns, timespan, memContext,
)
if !testutils.IsError(err, "") {
t.Fatalf("bad query got error %q, wanted no error", err.Error())
Expand Down
Loading

0 comments on commit 5d39b64

Please sign in to comment.