diff --git a/Makefile b/Makefile index d0cf323e9b..a6a5df5992 100644 --- a/Makefile +++ b/Makefile @@ -332,11 +332,11 @@ define SUBDIR_RULES # bundle. # ifeq ($(SUBDIR), kube) -# Builds the single kube bundle from individual manifest files. +# Builds the single kube bundle from individual manifest files. # all-gen-kube: install-tools # @echo "--- Generating kube bundle" # @./kube/scripts/build_bundle.sh -# find kube -name '*.yaml' -print0 | PATH=$(combined_bin_paths):$(PATH) xargs -0 kubeval -v=1.12.0 +# find kube -name '*.yaml' -print0 | PATH=$(combined_bin_paths):$(PATH) xargs -0 kubeval -v=1.12.0 # else diff --git a/src/cmd/services/m3comparator/main/main.go b/src/cmd/services/m3comparator/main/main.go index 85c73e9c1a..5d190114d3 100644 --- a/src/cmd/services/m3comparator/main/main.go +++ b/src/cmd/services/m3comparator/main/main.go @@ -36,24 +36,39 @@ import ( "go.uber.org/zap" ) -func main() { - var ( - iterPools = pools.BuildIteratorPools( - pools.BuildIteratorPoolsOptions{}) - poolWrapper = pools.NewPoolsWrapper(iterPools) +var ( + iterPools = pools.BuildIteratorPools( + pools.BuildIteratorPoolsOptions{}) + poolWrapper = pools.NewPoolsWrapper(iterPools) - iOpts = instrument.NewOptions() - logger = iOpts.Logger() - tagOptions = models.NewTagOptions() + iOpts = instrument.NewOptions() + logger = iOpts.Logger() + tagOptions = models.NewTagOptions() - encoderPoolOpts = pool.NewObjectPoolOptions() - encoderPool = encoding.NewEncoderPool(encoderPoolOpts) - ) + encoderPoolOpts = pool.NewObjectPoolOptions() + encoderPool = encoding.NewEncoderPool(encoderPoolOpts) + + checkedBytesPool pool.CheckedBytesPool + encodingOpts encoding.Options +) + +func init() { + buckets := []pool.Bucket{{Capacity: 10, Count: 10}} + newBackingBytesPool := func(s []pool.Bucket) pool.BytesPool { + return pool.NewBytesPool(s, nil) + } + + checkedBytesPool = pool.NewCheckedBytesPool(buckets, nil, newBackingBytesPool) + checkedBytesPool.Init() + + encodingOpts = encoding.NewOptions().SetEncoderPool(encoderPool).SetBytesPool(checkedBytesPool) encoderPool.Init(func() encoding.Encoder { - return m3tsz.NewEncoder(time.Time{}, nil, true, encoding.NewOptions()) + return m3tsz.NewEncoder(time.Time{}, nil, true, encodingOpts) }) +} +func main() { opts := iteratorOptions{ encoderPool: encoderPool, iteratorPools: iterPools, diff --git a/src/cmd/services/m3comparator/main/parser/parser.go b/src/cmd/services/m3comparator/main/parser/parser.go index b4a0a4601d..db18416aa9 100644 --- a/src/cmd/services/m3comparator/main/parser/parser.go +++ b/src/cmd/services/m3comparator/main/parser/parser.go @@ -21,8 +21,10 @@ package parser import ( + "encoding/json" "fmt" "sort" + "strconv" "strings" "time" ) @@ -45,10 +47,35 @@ type Datapoints []Datapoint // Datapoint is a JSON serializeable datapoint for the series. type Datapoint struct { - Value float64 `json:"val"` + Value Value `json:"val"` Timestamp time.Time `json:"ts"` } +// Value is a JSON serizlizable float64 that allows NaNs. +type Value float64 + +// MarshalJSON returns state as the JSON encoding of a Value. +func (v Value) MarshalJSON() ([]byte, error) { + return json.Marshal(fmt.Sprintf("%g", float64(v))) +} + +// UnmarshalJSON unmarshals JSON-encoded data into a Value. +func (v *Value) UnmarshalJSON(data []byte) error { + var str string + err := json.Unmarshal(data, &str) + if err != nil { + return err + } + + f, err := strconv.ParseFloat(str, 64) + if err != nil { + return err + } + + *v = Value(f) + return nil +} + // IDOrGenID gets the ID for this result. func (r *Series) IDOrGenID() string { if len(r.id) == 0 { diff --git a/src/cmd/services/m3comparator/main/parser/parser_test.go b/src/cmd/services/m3comparator/main/parser/parser_test.go new file mode 100644 index 0000000000..5459fddc24 --- /dev/null +++ b/src/cmd/services/m3comparator/main/parser/parser_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The abovale copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package parser + +import ( + "encoding/json" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestElectionStateJSONMarshal(t *testing.T) { + for _, input := range []struct { + val Value + str string + }{ + {val: 1231243.123123, str: `"1.231243123123e+06"`}, + {val: 0.000000001, str: `"1e-09"`}, + {val: Value(math.NaN()), str: `"NaN"`}, + {val: Value(math.Inf(1)), str: `"+Inf"`}, + {val: Value(math.Inf(-1)), str: `"-Inf"`}, + } { + b, err := json.Marshal(input.val) + require.NoError(t, err) + assert.Equal(t, input.str, string(b)) + + var val Value + json.Unmarshal([]byte(input.str), &val) + if math.IsNaN(float64(input.val)) { + assert.True(t, math.IsNaN(float64(val))) + } else if math.IsInf(float64(input.val), 1) { + assert.True(t, math.IsInf(float64(val), 1)) + } else if math.IsInf(float64(input.val), -1) { + assert.True(t, math.IsInf(float64(val), -1)) + } else { + assert.Equal(t, input.val, val) + } + } +} diff --git a/src/cmd/services/m3comparator/main/querier.go b/src/cmd/services/m3comparator/main/querier.go index a85ec81338..e98866bae6 100644 --- a/src/cmd/services/m3comparator/main/querier.go +++ b/src/cmd/services/m3comparator/main/querier.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3" + xtime "github.com/m3db/m3/src/x/time" ) var _ m3.Querier = (*querier)(nil) @@ -62,9 +63,11 @@ func generateSeriesBlock( numPoints := int(blockSize / resolution) dps := make(seriesBlock, 0, numPoints) for i := 0; i < numPoints; i++ { + stamp := start.Add(resolution * time.Duration(i)) dp := ts.Datapoint{ - Timestamp: start.Add(resolution * time.Duration(i)), - Value: rand.Float64(), + Timestamp: stamp, + TimestampNanos: xtime.ToUnixNano(stamp), + Value: rand.Float64(), } dps = append(dps, dp) @@ -116,13 +119,17 @@ func (q *querier) FetchCompressed( name := q.opts.tagOptions.MetricName() for _, matcher := range query.TagMatchers { if bytes.Equal(name, matcher.Name) { - iters = q.handler.getSeriesIterators(string(matcher.Value)) + iters, err = q.handler.getSeriesIterators(string(matcher.Value)) + if err != nil { + return m3.SeriesFetchResult{}, noop, err + } + break } } if iters == nil || iters.Len() == 0 { - iters, err = q.genIters(query) + iters, err = q.generateRandomIters(query) if err != nil { return m3.SeriesFetchResult{}, noop, err } @@ -139,7 +146,7 @@ func (q *querier) FetchCompressed( }, cleanup, nil } -func (q *querier) genIters( +func (q *querier) generateRandomIters( query *storage.FetchQuery, ) (encoding.SeriesIterators, error) { var ( diff --git a/src/cmd/services/m3comparator/main/series_load_handler.go b/src/cmd/services/m3comparator/main/series_load_handler.go index 12bdf27bf0..62be51d954 100644 --- a/src/cmd/services/m3comparator/main/series_load_handler.go +++ b/src/cmd/services/m3comparator/main/series_load_handler.go @@ -76,32 +76,37 @@ func (l *seriesLoadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (l *seriesLoadHandler) getSeriesIterators( - name string) encoding.SeriesIterators { + name string) (encoding.SeriesIterators, error) { l.RLock() defer l.RUnlock() logger := l.iterOpts.iOpts.Logger() seriesMap, found := l.nameIDSeriesMap[name] if !found || len(seriesMap.series) == 0 { - return nil + return nil, nil } iters := make([]encoding.SeriesIterator, 0, len(seriesMap.series)) for _, series := range seriesMap.series { encoder := l.iterOpts.encoderPool.Get() dps := series.Datapoints - encoder.Reset(time.Now(), len(dps), nil) + startTime := time.Time{} + if len(dps) > 0 { + startTime = dps[0].Timestamp.Truncate(time.Hour) + } + + encoder.Reset(startTime, len(dps), nil) for _, dp := range dps { err := encoder.Encode(ts.Datapoint{ Timestamp: dp.Timestamp, - Value: dp.Value, + Value: float64(dp.Value), TimestampNanos: xtime.ToUnixNano(dp.Timestamp), - }, xtime.Second, nil) + }, xtime.Nanosecond, nil) if err != nil { encoder.Close() logger.Error("error encoding datapoints", zap.Error(err)) - return nil + return nil, err } } @@ -138,7 +143,7 @@ func (l *seriesLoadHandler) getSeriesIterators( return encoding.NewSeriesIterators( iters, l.iterOpts.iteratorPools.MutableSeriesIterators(), - ) + ), nil } func calculateSeriesRange(seriesList []parser.Series) (time.Time, time.Time) { diff --git a/src/cmd/services/m3comparator/main/series_load_handler_test.go b/src/cmd/services/m3comparator/main/series_load_handler_test.go new file mode 100644 index 0000000000..6b92e20f9d --- /dev/null +++ b/src/cmd/services/m3comparator/main/series_load_handler_test.go @@ -0,0 +1,198 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/m3db/m3/src/cmd/services/m3comparator/main/parser" + "github.com/m3db/m3/src/dbnode/encoding" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// NB: this is regression test data that used to cause issues. +const seriesStr = ` +[ + { + "start": "2020-03-30T11:39:45Z", + "end": "2020-03-30T11:58:00Z", + "tags": {"__name__": "series_name","abc": "def","tag_a": "foo"}, + "datapoints": [ + { "val": "7076", "ts": "2020-03-30T11:39:51.288Z" }, + { "val": "7076", "ts": "2020-03-30T11:39:57.478Z" }, + { "val": "7076", "ts": "2020-03-30T11:40:07.478Z" }, + { "val": "7076", "ts": "2020-03-30T11:40:18.886Z" }, + { "val": "7076", "ts": "2020-03-30T11:40:31.135Z" }, + { "val": "7077", "ts": "2020-03-30T11:40:40.047Z" }, + { "val": "7077", "ts": "2020-03-30T11:40:54.893Z" }, + { "val": "7077", "ts": "2020-03-30T11:40:57.478Z" }, + { "val": "7077", "ts": "2020-03-30T11:41:07.478Z" }, + { "val": "7077", "ts": "2020-03-30T11:41:17.478Z" }, + { "val": "7077", "ts": "2020-03-30T11:41:29.323Z" }, + { "val": "7078", "ts": "2020-03-30T11:41:43.873Z" }, + { "val": "7078", "ts": "2020-03-30T11:41:54.375Z" }, + { "val": "7078", "ts": "2020-03-30T11:41:58.053Z" }, + { "val": "7078", "ts": "2020-03-30T11:42:09.250Z" }, + { "val": "7078", "ts": "2020-03-30T11:42:20.793Z" }, + { "val": "7078", "ts": "2020-03-30T11:42:34.915Z" }, + { "val": "7079", "ts": "2020-03-30T11:42:43.467Z" }, + { "val": "7079", "ts": "2020-03-30T11:42:50.364Z" }, + { "val": "7079", "ts": "2020-03-30T11:43:02.376Z" }, + { "val": "7079", "ts": "2020-03-30T11:43:07.478Z" }, + { "val": "7079", "ts": "2020-03-30T11:43:20.807Z" }, + { "val": "7079", "ts": "2020-03-30T11:43:29.432Z" }, + { "val": "7079", "ts": "2020-03-30T11:43:37.478Z" }, + { "val": "7080", "ts": "2020-03-30T11:43:47.478Z" }, + { "val": "7080", "ts": "2020-03-30T11:44:01.078Z" }, + { "val": "7080", "ts": "2020-03-30T11:44:07.478Z" }, + { "val": "7080", "ts": "2020-03-30T11:44:17.478Z" }, + { "val": "7080", "ts": "2020-03-30T11:44:28.444Z" }, + { "val": "7080", "ts": "2020-03-30T11:44:37.478Z" }, + { "val": "7081", "ts": "2020-03-30T11:44:49.607Z" }, + { "val": "7081", "ts": "2020-03-30T11:45:02.758Z" }, + { "val": "7081", "ts": "2020-03-30T11:45:16.740Z" }, + { "val": "7081", "ts": "2020-03-30T11:45:27.813Z" }, + { "val": "7081", "ts": "2020-03-30T11:45:38.141Z" }, + { "val": "7082", "ts": "2020-03-30T11:45:53.850Z" }, + { "val": "7082", "ts": "2020-03-30T11:46:00.954Z" }, + { "val": "7082", "ts": "2020-03-30T11:46:08.814Z" }, + { "val": "7082", "ts": "2020-03-30T11:46:17.478Z" }, + { "val": "7082", "ts": "2020-03-30T11:46:27.478Z" }, + { "val": "7082", "ts": "2020-03-30T11:46:38.152Z" }, + { "val": "7083", "ts": "2020-03-30T11:46:48.192Z" }, + { "val": "7084", "ts": "2020-03-30T11:47:40.871Z" }, + { "val": "7084", "ts": "2020-03-30T11:47:49.966Z" }, + { "val": "7084", "ts": "2020-03-30T11:47:57.478Z" }, + { "val": "7084", "ts": "2020-03-30T11:48:07.478Z" }, + { "val": "7084", "ts": "2020-03-30T11:48:23.279Z" }, + { "val": "7084", "ts": "2020-03-30T11:48:29.018Z" }, + { "val": "7084", "ts": "2020-03-30T11:48:37.478Z" }, + { "val": "7085", "ts": "2020-03-30T11:48:47.478Z" }, + { "val": "7085", "ts": "2020-03-30T11:48:57.478Z" }, + { "val": "7085", "ts": "2020-03-30T11:49:07.478Z" }, + { "val": "7085", "ts": "2020-03-30T11:49:17.478Z" }, + { "val": "7085", "ts": "2020-03-30T11:49:27.478Z" }, + { "val": "7085", "ts": "2020-03-30T11:49:37.478Z" }, + { "val": "7086", "ts": "2020-03-30T11:49:47.478Z" }, + { "val": "7086", "ts": "2020-03-30T11:49:57.850Z" }, + { "val": "7086", "ts": "2020-03-30T11:50:07.478Z" }, + { "val": "7086", "ts": "2020-03-30T11:50:20.887Z" }, + { "val": "7087", "ts": "2020-03-30T11:51:12.729Z" }, + { "val": "7087", "ts": "2020-03-30T11:51:19.914Z" }, + { "val": "7087", "ts": "2020-03-30T11:51:27.478Z" }, + { "val": "7087", "ts": "2020-03-30T11:51:37.478Z" }, + { "val": "7088", "ts": "2020-03-30T11:51:47.478Z" }, + { "val": "7088", "ts": "2020-03-30T11:51:57.478Z" }, + { "val": "7088", "ts": "2020-03-30T11:52:07.478Z" }, + { "val": "7088", "ts": "2020-03-30T11:52:17.478Z" }, + { "val": "7088", "ts": "2020-03-30T11:52:29.869Z" }, + { "val": "7088", "ts": "2020-03-30T11:52:38.976Z" }, + { "val": "7089", "ts": "2020-03-30T11:52:47.478Z" }, + { "val": "7089", "ts": "2020-03-30T11:52:57.478Z" }, + { "val": "7089", "ts": "2020-03-30T11:53:07.478Z" }, + { "val": "7089", "ts": "2020-03-30T11:53:17.906Z" }, + { "val": "7089", "ts": "2020-03-30T11:53:27.478Z" }, + { "val": "7090", "ts": "2020-03-30T11:54:17.478Z" }, + { "val": "7090", "ts": "2020-03-30T11:54:27.478Z" }, + { "val": "7090", "ts": "2020-03-30T11:54:37.478Z" }, + { "val": "7091", "ts": "2020-03-30T11:54:51.214Z" }, + { "val": "7091", "ts": "2020-03-30T11:54:58.985Z" }, + { "val": "7091", "ts": "2020-03-30T11:55:08.548Z" }, + { "val": "7091", "ts": "2020-03-30T11:55:19.762Z" }, + { "val": "7091", "ts": "2020-03-30T11:55:27.478Z" }, + { "val": "7091", "ts": "2020-03-30T11:55:39.009Z" }, + { "val": "7092", "ts": "2020-03-30T11:55:47.478Z" }, + { "val": "7092", "ts": "2020-03-30T11:56:01.507Z" }, + { "val": "7092", "ts": "2020-03-30T11:56:12.995Z" }, + { "val": "7092", "ts": "2020-03-30T11:56:24.892Z" }, + { "val": "7092", "ts": "2020-03-30T11:56:38.410Z" }, + { "val": "7093", "ts": "2020-03-30T11:56:47.478Z" }, + { "val": "7093", "ts": "2020-03-30T11:56:58.786Z" }, + { "val": "7093", "ts": "2020-03-30T11:57:07.478Z" }, + { "val": "7093", "ts": "2020-03-30T11:57:17.478Z" }, + { "val": "7093", "ts": "2020-03-30T11:57:31.283Z" }, + { "val": "7093", "ts": "2020-03-30T11:57:39.113Z" }, + { "val": "7094", "ts": "2020-03-30T11:57:48.864Z" }, + { "val": "7094", "ts": "2020-03-30T11:57:57.478Z" } + ] + } +]` + +func TestIngestSeries(t *testing.T) { + opts := iteratorOptions{ + encoderPool: encoderPool, + iteratorPools: iterPools, + tagOptions: tagOptions, + iOpts: iOpts, + } + + req, err := http.NewRequest(http.MethodPost, "", strings.NewReader(seriesStr)) + require.NoError(t, err) + + recorder := httptest.NewRecorder() + + handler := newSeriesLoadHandler(opts) + handler.ServeHTTP(recorder, req) + + assert.Equal(t, http.StatusOK, recorder.Code) + + iters, err := handler.getSeriesIterators("series_name") + require.NoError(t, err) + + expectedList := make([]parser.Series, 0, 10) + err = json.Unmarshal([]byte(seriesStr), &expectedList) + require.NoError(t, err) + require.Equal(t, 1, len(expectedList)) + expected := expectedList[0] + + require.Equal(t, 1, len(iters.Iters())) + it := iters.Iters()[0] + j := 0 + for it.Next() { + c, _, _ := it.Current() + ts := c.Timestamp.UTC() + ex := expected.Datapoints[j] + assert.Equal(t, ex.Timestamp, ts) + assert.Equal(t, float64(ex.Value), c.Value) + j++ + } + + assert.NoError(t, it.Err()) + assert.Equal(t, expected.Tags, readTags(it)) + assert.Equal(t, j, len(expected.Datapoints)) +} + +func readTags(it encoding.SeriesIterator) parser.Tags { + tagIter := it.Tags() + tags := make(parser.Tags, tagIter.Len()) + for tagIter.Next() { + tag := tagIter.Current() + tags[tag.Name.String()] = tag.Value.String() + } + + return tags +} diff --git a/src/dbnode/encoding/m3tsz/encoder.go b/src/dbnode/encoding/m3tsz/encoder.go index 302c43be16..e825222cc8 100644 --- a/src/dbnode/encoding/m3tsz/encoder.go +++ b/src/dbnode/encoding/m3tsz/encoder.go @@ -302,7 +302,10 @@ func (enc *encoder) LastEncoded() (ts.Datapoint, error) { return ts.Datapoint{}, errNoEncodedDatapoints } - result := ts.Datapoint{Timestamp: enc.tsEncoderState.PrevTime} + result := ts.Datapoint{ + Timestamp: enc.tsEncoderState.PrevTime, + TimestampNanos: xtime.ToUnixNano(enc.tsEncoderState.PrevTime), + } if enc.isFloat { result.Value = math.Float64frombits(enc.floatEnc.PrevFloatBits) } else { diff --git a/src/dbnode/persist/fs/commitlog/reader.go b/src/dbnode/persist/fs/commitlog/reader.go index ac5ea3fed2..65976dfdf3 100644 --- a/src/dbnode/persist/fs/commitlog/reader.go +++ b/src/dbnode/persist/fs/commitlog/reader.go @@ -170,8 +170,9 @@ func (r *reader) Read() (LogEntry, error) { result := LogEntry{ Series: metadata, Datapoint: ts.Datapoint{ - Timestamp: time.Unix(0, entry.Timestamp), - Value: entry.Value, + Timestamp: time.Unix(0, entry.Timestamp), + TimestampNanos: xtime.UnixNano(entry.Timestamp), + Value: entry.Value, }, Unit: xtime.Unit(entry.Unit), Metadata: LogEntryMetadata{ diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 185c2d9ed7..2848dbfd6a 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -593,7 +593,12 @@ func (d *db) Write( return nil } - dp := ts.Datapoint{Timestamp: timestamp, Value: value} + dp := ts.Datapoint{ + Timestamp: timestamp, + TimestampNanos: xtime.ToUnixNano(timestamp), + Value: value, + } + return d.commitLog.Write(ctx, series, dp, unit, annotation) } @@ -622,7 +627,12 @@ func (d *db) WriteTagged( return nil } - dp := ts.Datapoint{Timestamp: timestamp, Value: value} + dp := ts.Datapoint{ + Timestamp: timestamp, + TimestampNanos: xtime.ToUnixNano(timestamp), + Value: value, + } + return d.commitLog.Write(ctx, series, dp, unit, annotation) } diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index 44be94a0aa..6bfa593f28 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -1128,8 +1128,9 @@ func (b *BufferBucket) write( schema namespace.SchemaDescr, ) (bool, error) { datapoint := ts.Datapoint{ - Timestamp: timestamp, - Value: value, + Timestamp: timestamp, + TimestampNanos: xtime.ToUnixNano(timestamp), + Value: value, } // Find the correct encoder to write to diff --git a/src/dbnode/ts/write_batch.go b/src/dbnode/ts/write_batch.go index 987987fec0..fb3a8231e0 100644 --- a/src/dbnode/ts/write_batch.go +++ b/src/dbnode/ts/write_batch.go @@ -205,8 +205,9 @@ func newBatchWriterWrite( Namespace: namespace, }, Datapoint: Datapoint{ - Timestamp: timestamp, - Value: value, + Timestamp: timestamp, + TimestampNanos: xtime.ToUnixNano(timestamp), + Value: value, }, Unit: unit, Annotation: annotation, diff --git a/src/query/api/v1/handler/prometheus/native/read_common.go b/src/query/api/v1/handler/prometheus/native/read_common.go index 84da0a2b08..8a20f318fd 100644 --- a/src/query/api/v1/handler/prometheus/native/read_common.go +++ b/src/query/api/v1/handler/prometheus/native/read_common.go @@ -22,10 +22,8 @@ package native import ( "context" - "fmt" "math" "net/http" - "sort" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" @@ -79,219 +77,61 @@ func read( return emptyResult, err } - result, err := engine.ExecuteExpr(ctx, parser, opts, fetchOpts, params) + bl, err := engine.ExecuteExpr(ctx, parser, opts, fetchOpts, params) if err != nil { return emptyResult, err } - // Block slices are sorted by start time. - // TODO: Pooling - sortedBlockList := make([]blockWithMeta, 0, initialBlockAlloc) - resultChan := result.ResultChan() - defer func() { - for range resultChan { - // NB: drain result channel in case of early termination. - } - }() - - var ( - numSteps int - numSeries int - - firstElement bool - ) - - meta := block.NewResultMetadata() - // TODO(nikunj): Stream blocks to client - for blkResult := range resultChan { - if err := blkResult.Err; err != nil { - return emptyResult, err - } - - b := blkResult.Block - if !firstElement { - firstElement = true - firstStepIter, err := b.StepIter() - if err != nil { - return emptyResult, err - } - - numSteps = firstStepIter.StepCount() - numSeries = len(firstStepIter.SeriesMeta()) - meta = b.Meta().ResultMetadata - } - - // Insert blocks sorted by start time. - insertResult, err := insertSortedBlock(b, sortedBlockList, - numSteps, numSeries) - if err != nil { - return emptyResult, err - } - - sortedBlockList = insertResult.blocks - meta = meta.CombineMetadata(insertResult.meta) - } - - // Ensure that the blocks are closed. Can't do this above since - // sortedBlockList might change. - defer func() { - for _, b := range sortedBlockList { - // FIXME: this will double close blocks that have gone through the - // function pipeline. - b.block.Close() - } - }() - - series, err := sortedBlocksToSeriesList(sortedBlockList) + resultMeta := bl.Meta().ResultMetadata + it, err := bl.StepIter() if err != nil { return emptyResult, err } - series = prometheus.FilterSeriesByOptions(series, fetchOpts) - return readResult{ - series: series, - meta: meta, - }, nil -} - -func sortedBlocksToSeriesList(blockList []blockWithMeta) ([]*ts.Series, error) { - if len(blockList) == 0 { - return emptySeriesList, nil - } - - var ( - firstBlock = blockList[0].block - meta = firstBlock.Meta() - bounds = meta.Bounds - commonTags = meta.Tags.Tags - ) - - firstIter, err := firstBlock.StepIter() - if err != nil { - return nil, err - } - - var ( - seriesMeta = firstIter.SeriesMeta() - numSeries = len(seriesMeta) - seriesList = make([]*ts.Series, 0, numSeries) - iters = make([]block.StepIter, 0, len(blockList)) - ) - - // To create individual series, we iterate over seriesIterators for each - // block in the block list. For each iterator, the nth current() will - // be combined to give the nth series. - for _, b := range blockList { - it, err := b.block.StepIter() - if err != nil { - return nil, err - } - - iters = append(iters, it) - } - - numValues := 0 - for _, block := range blockList { - b, err := block.block.StepIter() - if err != nil { - return nil, err - } - - numValues += b.StepCount() - } + seriesMeta := it.SeriesMeta() + numSeries := len(seriesMeta) + bounds := bl.Meta().Bounds // Initialize data slices. - data := make([]ts.FixedResolutionMutableValues, numSeries) - for i := range data { - data[i] = ts.NewFixedStepValues(bounds.StepSize, numValues, - math.NaN(), bounds.Start) + data := make([]ts.FixedResolutionMutableValues, 0, numSeries) + for i := 0; i < numSeries; i++ { + data = append(data, ts.NewFixedStepValues(bounds.StepSize, bounds.Steps(), + math.NaN(), bounds.Start)) } stepIndex := 0 - for _, it := range iters { - for it.Next() { - step := it.Current() - for seriesIndex, v := range step.Values() { - // NB: iteration moves by time step across a block, so each value in the - // step iterator corresponds to a different series; transform it to - // series-based iteration using mutable series values. - mutableValuesForSeries := data[seriesIndex] - mutableValuesForSeries.SetValueAt(stepIndex, v) - } - - stepIndex++ + for it.Next() { + step := it.Current() + for seriesIndex, v := range step.Values() { + mutableValuesForSeries := data[seriesIndex] + mutableValuesForSeries.SetValueAt(stepIndex, v) } - if err := it.Err(); err != nil { - return nil, err - } + stepIndex++ } + if err := it.Err(); err != nil { + return emptyResult, err + } + + seriesList := make([]*ts.Series, 0, len(data)) for i, values := range data { var ( meta = seriesMeta[i] - tags = meta.Tags.AddTags(commonTags) + tags = meta.Tags.AddTags(bl.Meta().Tags.Tags) series = ts.NewSeries(meta.Name, values, tags) ) seriesList = append(seriesList, series) } - return seriesList, nil -} - -type insertBlockResult struct { - blocks []blockWithMeta - meta block.ResultMetadata -} - -func insertSortedBlock( - b block.Block, - blockList []blockWithMeta, - stepCount, - seriesCount int, -) (insertBlockResult, error) { - it, err := b.StepIter() - emptyResult := insertBlockResult{meta: b.Meta().ResultMetadata} - if err != nil { + if err := bl.Close(); err != nil { return emptyResult, err } - meta := b.Meta() - if len(blockList) == 0 { - blockList = append(blockList, blockWithMeta{ - block: b, - meta: meta, - }) - - return insertBlockResult{ - blocks: blockList, - meta: b.Meta().ResultMetadata, - }, nil - } - - blockSeriesCount := len(it.SeriesMeta()) - if seriesCount != blockSeriesCount { - return emptyResult, fmt.Errorf( - "mismatch in number of series for the block, wanted: %d, found: %d", - seriesCount, blockSeriesCount) - } - - // Binary search to keep the start times sorted - index := sort.Search(len(blockList), func(i int) bool { - return blockList[i].meta.Bounds.Start.After(meta.Bounds.Start) - }) - - // Append here ensures enough size in the slice - blockList = append(blockList, blockWithMeta{}) - copy(blockList[index+1:], blockList[index:]) - blockList[index] = blockWithMeta{ - block: b, - meta: meta, - } - - return insertBlockResult{ - meta: b.Meta().ResultMetadata, - blocks: blockList, + seriesList = prometheus.FilterSeriesByOptions(seriesList, fetchOpts) + return readResult{ + series: seriesList, + meta: resultMeta, }, nil } diff --git a/src/query/block/column.go b/src/query/block/column.go index d4d3bab9a9..86f406a566 100644 --- a/src/query/block/column.go +++ b/src/query/block/column.go @@ -237,6 +237,10 @@ func (cb ColumnBlockBuilder) PopulateColumns(size int) { for i := range cb.block.columns { cb.block.columns[i] = column{Values: cols[size*i : size*(i+1)]} } + + // NB: initialize a clean series meta list with given cap and len, + // as row operations are done by arbitrary index. + cb.block.seriesMeta = make([]SeriesMeta, size) } // SetRow sets a given block row to the given values and metadata. diff --git a/src/query/block/column_test.go b/src/query/block/column_test.go index d2f86341af..ee1c9392bc 100644 --- a/src/query/block/column_test.go +++ b/src/query/block/column_test.go @@ -22,20 +22,26 @@ package block import ( "context" + "fmt" "testing" + "time" "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/uber-go/tally" ) -func TestColumnBuilderInfoTypes(t *testing.T) { - ctx := models.NewQueryContext(context.Background(), +func makeTestQueryContext() *models.QueryContext { + return models.NewQueryContext(context.Background(), tally.NoopScope, cost.NoopChainedEnforcer(), models.QueryContextOptions{}) +} +func TestColumnBuilderInfoTypes(t *testing.T) { + ctx := makeTestQueryContext() builder := NewColumnBlockBuilder(ctx, Metadata{}, []SeriesMeta{}) block := builder.Build() assert.Equal(t, BlockDecompressed, block.Info().blockType) @@ -43,3 +49,60 @@ func TestColumnBuilderInfoTypes(t *testing.T) { block = builder.BuildAsType(BlockScalar) assert.Equal(t, BlockScalar, block.Info().blockType) } + +func TestSetRow(t *testing.T) { + buildMeta := func(i int) SeriesMeta { + name := fmt.Sprint(i) + + return SeriesMeta{ + Name: []byte(name), + Tags: models.MustMakeTags("name", name), + } + } + + size := 10 + metas := make([]SeriesMeta, size) + for i := range metas { + metas[i] = buildMeta(i) + } + + ctx := makeTestQueryContext() + builder := NewColumnBlockBuilder(ctx, Metadata{ + Bounds: models.Bounds{StepSize: time.Minute, Duration: time.Minute}, + }, nil) + + require.NoError(t, builder.AddCols(1)) + builder.PopulateColumns(size) + // NB: set the row metas backwards. + j := 0 + for i := size - 1; i >= 0; i-- { + err := builder.SetRow(j, []float64{float64(i)}, metas[i]) + require.NoError(t, err) + j++ + } + + bl := builder.Build() + it, err := bl.StepIter() + require.NoError(t, err) + + actualMetas := it.SeriesMeta() + for i, m := range actualMetas { + ex := fmt.Sprint(size - 1 - i) + assert.Equal(t, ex, string(m.Name)) + require.Equal(t, 1, m.Tags.Len()) + tag, found := m.Tags.Get([]byte("name")) + require.True(t, found) + assert.Equal(t, ex, string(tag)) + } + + assert.True(t, it.Next()) + exVals := make([]float64, size) + for i := range exVals { + exVals[i] = float64(size - 1 - i) + } + + vals := it.Current().Values() + assert.Equal(t, exVals, vals) + assert.False(t, it.Next()) + assert.NoError(t, it.Err()) +} diff --git a/src/query/executor/engine.go b/src/query/executor/engine.go index 10ca4f1cbe..584d141388 100644 --- a/src/query/executor/engine.go +++ b/src/query/executor/engine.go @@ -24,6 +24,7 @@ import ( "context" "time" + "github.com/m3db/m3/src/query/block" qcost "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" @@ -43,12 +44,6 @@ type QueryOptions struct { QueryContextOptions models.QueryContextOptions } -// Query is the result after execution. -type Query struct { - Err error - Result Result -} - // NewEngine returns a new instance of QueryExecutor. func NewEngine( engineOpts EngineOptions, @@ -124,7 +119,7 @@ func (e *engine) ExecuteExpr( opts *QueryOptions, fetchOpts *storage.FetchOptions, params models.RequestParams, -) (Result, error) { +) (block.Block, error) { perQueryEnforcer := e.opts.GlobalEnforcer().Child(qcost.QueryLevel) defer perQueryEnforcer.Close() req := newRequest(e, params, fetchOpts, e.opts.InstrumentOptions()) @@ -147,20 +142,16 @@ func (e *engine) ExecuteExpr( sp, ctx := opentracing.StartSpanFromContext(ctx, "executing") defer sp.Finish() - result := state.resultNode scope := e.opts.InstrumentOptions().MetricsScope() queryCtx := models.NewQueryContext(ctx, scope, perQueryEnforcer, opts.QueryContextOptions) - go func() { - if err := state.Execute(queryCtx); err != nil { - result.abort(err) - } else { - result.done() - } - }() + if err := state.Execute(queryCtx); err != nil { + state.sink.closeWithError(err) + return nil, err + } - return result, nil + return state.sink.getValue() } func (e *engine) Options() EngineOptions { diff --git a/src/query/executor/engine_test.go b/src/query/executor/engine_test.go index a779660f2a..8906c613c1 100644 --- a/src/query/executor/engine_test.go +++ b/src/query/executor/engine_test.go @@ -27,14 +27,15 @@ import ( "time" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/cost" qcost "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/query/storage/mock" "github.com/m3db/m3/src/query/test/m3" "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -56,8 +57,8 @@ func newEngine( return NewEngine(engineOpts) } -func TestEngine_Execute(t *testing.T) { - ctrl := gomock.NewController(t) +func TestExecute(t *testing.T) { + ctrl := xtest.NewController(t) store, session := m3.NewStorageAndSession(t, ctrl) session.EXPECT().FetchTagged(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, client.FetchResponseMetadata{Exhaustive: false}, fmt.Errorf("dummy")) @@ -70,8 +71,8 @@ func TestEngine_Execute(t *testing.T) { assert.NotNil(t, err) } -func TestEngine_ExecuteExpr(t *testing.T) { - ctrl := gomock.NewController(t) +func TestExecuteExpr(t *testing.T) { + ctrl := xtest.NewController(t) defer ctrl.Finish() mockEnforcer := cost.NewMockChainedEnforcer(ctrl) @@ -84,7 +85,12 @@ func TestEngine_ExecuteExpr(t *testing.T) { models.NewTagOptions(), promql.NewParseOptions()) require.NoError(t, err) - engine := newEngine(mock.NewMockStorage(), defaultLookbackDuration, + store := storage.NewMockStorage(ctrl) + store.EXPECT().FetchBlocks(gomock.Any(), gomock.Any(), gomock.Any()). + Return(block.Result{ + Blocks: []block.Block{block.NewMockBlock(ctrl)}, + }, nil) + engine := newEngine(store, defaultLookbackDuration, mockParent, instrument.NewOptions()) _, err = engine.ExecuteExpr(context.TODO(), parser, &QueryOptions{}, storage.NewFetchOptions(), models.RequestParams{ diff --git a/src/query/executor/result.go b/src/query/executor/result.go index af65be0a23..053b785639 100644 --- a/src/query/executor/result.go +++ b/src/query/executor/result.go @@ -24,79 +24,74 @@ import ( "sync" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/executor/transform" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" - "github.com/pkg/errors" ) -const ( - // TODO: Get from config - channelSize = 100 -) +type sink interface { + transform.OpNode + closeWithError(err error) + getValue() (block.Block, error) +} -var ( - errAborted = errors.New("the query has been aborted") -) +// resultNode collects final blocks. +type resultNode struct { + sync.RWMutex + wg sync.WaitGroup -// Result provides the execution results -type Result interface { - abort(err error) - done() - ResultChan() chan ResultChan + err error + completed bool + block block.Block } -// ResultNode is used to provide the results to the caller from the query execution -type ResultNode struct { - mu sync.Mutex - resultChan chan ResultChan - aborted bool +func newResultNode() sink { + node := &resultNode{} + node.wg.Add(1) + return node } -// ResultChan has the result from a block -type ResultChan struct { - Block block.Block - Err error +func (r *resultNode) closeWithLock() { + r.wg.Done() + r.completed = true } -func newResultNode() *ResultNode { - blocks := make(chan ResultChan, channelSize) - return &ResultNode{resultChan: blocks} -} +// Process sets the incoming block and releases the wait group. +func (r *resultNode) Process(_ *models.QueryContext, + _ parser.NodeID, block block.Block) error { + r.Lock() + defer r.Unlock() -// Process the block -func (r *ResultNode) Process(queryCtx *models.QueryContext, ID parser.NodeID, block block.Block) error { - if r.aborted { - return errAborted + if r.err != nil { + return r.err } - r.resultChan <- ResultChan{ - Block: block, + if r.block != nil { + r.err = errors.New("resultNode block already set") + return r.err } + r.block = block + r.closeWithLock() return nil } -// ResultChan return a channel to stream back resultChan to the client -func (r *ResultNode) ResultChan() chan ResultChan { - return r.resultChan -} - -// TODO: Signal error downstream -func (r *ResultNode) abort(err error) { - r.mu.Lock() - defer r.mu.Unlock() - if r.aborted { +func (r *resultNode) closeWithError(err error) { + r.Lock() + defer r.Unlock() + if r.completed { return } - r.aborted = true - r.resultChan <- ResultChan{ - Err: err, - } - close(r.resultChan) + r.err = err + r.closeWithLock() } -func (r *ResultNode) done() { - close(r.resultChan) +func (r *resultNode) getValue() (block.Block, error) { + r.wg.Wait() + r.RLock() + bl, err := r.block, r.err + r.RUnlock() + return bl, err } diff --git a/src/query/executor/state.go b/src/query/executor/state.go index 79f9d4c044..ba37ed2a69 100644 --- a/src/query/executor/state.go +++ b/src/query/executor/state.go @@ -37,10 +37,10 @@ import ( // ExecutionState represents the execution hierarchy. type ExecutionState struct { - plan plan.PhysicalPlan - sources []parser.Source - resultNode Result - storage storage.Storage + plan plan.PhysicalPlan + sources []parser.Source + sink sink + storage storage.Storage } // CreateSource creates a source node. @@ -126,9 +126,9 @@ func GenerateExecutionState( return nil, errors.New("empty sources for the execution state") } - rNode := newResultNode() - state.resultNode = rNode - controller.AddTransform(rNode) + sink := newResultNode() + state.sink = sink + controller.AddTransform(sink) return state, nil } @@ -181,12 +181,12 @@ func (s *ExecutionState) createNode( // Execute the sources in parallel and return the first error. func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error { - requests := make([]execution.Request, len(s.sources)) - for idx, source := range s.sources { - requests[idx] = sourceRequest{ + requests := make([]execution.Request, 0, len(s.sources)) + for _, source := range s.sources { + requests = append(requests, sourceRequest{ source: source, queryCtx: queryCtx, - } + }) } return execution.ExecuteParallel(queryCtx.Ctx, requests) @@ -194,8 +194,7 @@ func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error { // String representation of the state. func (s *ExecutionState) String() string { - return fmt.Sprintf("plan: %s\nsources: %s\nresult: %s", - s.plan, s.sources, s.resultNode) + return fmt.Sprintf("plan: %s\nsources: %s\n", s.plan, s.sources) } type sourceRequest struct { diff --git a/src/query/executor/transform/exec_test.go b/src/query/executor/transform/exec_test.go index 58926fdaa3..d9e1335396 100644 --- a/src/query/executor/transform/exec_test.go +++ b/src/query/executor/transform/exec_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/test" + xtest "github.com/m3db/m3/src/x/test" "github.com/golang/mock/gomock" "github.com/opentracing/opentracing-go" @@ -51,7 +52,7 @@ func TestProcessSimpleBlock(t *testing.T) { } setup := func(t *testing.T) (*testContext, func()) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) controller := &Controller{ ID: parser.NodeID("foo"), diff --git a/src/query/executor/types.go b/src/query/executor/types.go index d3de67d952..c5194c2f84 100644 --- a/src/query/executor/types.go +++ b/src/query/executor/types.go @@ -24,6 +24,7 @@ import ( "context" "time" + "github.com/m3db/m3/src/query/block" qcost "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" @@ -50,7 +51,7 @@ type Engine interface { opts *QueryOptions, fetchOpts *storage.FetchOptions, params models.RequestParams, - ) (Result, error) + ) (block.Block, error) // Options returns the currently configured options. Options() EngineOptions diff --git a/src/query/executor/types_mock.go b/src/query/executor/types_mock.go index d2be11a12a..4edb8181dd 100644 --- a/src/query/executor/types_mock.go +++ b/src/query/executor/types_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/query/executor (interfaces: Engine) -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -28,6 +28,7 @@ import ( "context" "reflect" + "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser" "github.com/m3db/m3/src/query/storage" @@ -73,10 +74,10 @@ func (mr *MockEngineMockRecorder) Close() *gomock.Call { } // ExecuteExpr mocks base method -func (m *MockEngine) ExecuteExpr(arg0 context.Context, arg1 parser.Parser, arg2 *QueryOptions, arg3 *storage.FetchOptions, arg4 models.RequestParams) (Result, error) { +func (m *MockEngine) ExecuteExpr(arg0 context.Context, arg1 parser.Parser, arg2 *QueryOptions, arg3 *storage.FetchOptions, arg4 models.RequestParams) (block.Block, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ExecuteExpr", arg0, arg1, arg2, arg3, arg4) - ret0, _ := ret[0].(Result) + ret0, _ := ret[0].(block.Block) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/src/query/functions/temporal/aggregation.go b/src/query/functions/temporal/aggregation.go index e6fbbd725d..7a108db6f6 100644 --- a/src/query/functions/temporal/aggregation.go +++ b/src/query/functions/temporal/aggregation.go @@ -76,12 +76,10 @@ type aggProcessor struct { func (a aggProcessor) initialize( _ time.Duration, - controller *transform.Controller, opts transform.Options, ) processor { return &aggNode{ - controller: controller, - aggFunc: a.aggFunc, + aggFunc: a.aggFunc, } } @@ -137,9 +135,8 @@ func NewAggOp(args []interface{}, optype string) (transform.Params, error) { } type aggNode struct { - controller *transform.Controller - values []float64 - aggFunc func([]float64) float64 + values []float64 + aggFunc func([]float64) float64 } func (a *aggNode) process(datapoints ts.Datapoints, _ iterationBounds) float64 { diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index 5a3176d831..915744372c 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -50,11 +50,7 @@ type iterationBounds struct { // makeProcessor is a way to create a transform. type makeProcessor interface { // initialize initializes the processor. - initialize( - duration time.Duration, - controller *transform.Controller, - opts transform.Options, - ) processor + initialize(duration time.Duration, opts transform.Options) processor } // processor is implemented by the underlying transforms. @@ -97,7 +93,7 @@ func (o baseOp) Node( return &baseNode{ controller: controller, op: o, - processor: o.processorFn.initialize(o.duration, controller, opts), + makeProcessor: o.processorFn, transformOpts: opts, } } @@ -109,7 +105,7 @@ type baseNode struct { // https://github.com/m3db/m3/issues/1430 controller controller op baseOp - processor processor + makeProcessor makeProcessor transformOpts transform.Options } @@ -127,47 +123,27 @@ func (c *baseNode) Process( return fmt.Errorf("bound duration cannot be 0, bounds: %v", bounds) } - seriesIter, err := b.SeriesIter() - if err != nil { - return err - } - - // rename series to exclude their __name__ tag as part of function processing. - resultSeriesMeta := make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta())) - for _, m := range seriesIter.SeriesMeta() { - tags := m.Tags.WithoutName() - resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{ - Name: tags.ID(), - Tags: tags, - }) - } - - builder, err := c.controller.BlockBuilder(queryCtx, meta, resultSeriesMeta) - if err != nil { - return err - } - - steps := bounds.Steps() - if err := builder.AddCols(steps); err != nil { - return err - } - m := blockMeta{ end: xtime.ToUnixNano(bounds.Start), - seriesMeta: resultSeriesMeta, + queryCtx: queryCtx, aggDuration: xtime.UnixNano(c.op.duration), stepSize: xtime.UnixNano(bounds.StepSize), - steps: steps, + steps: bounds.Steps(), } concurrency := runtime.NumCPU() + var builder block.Builder batches, err := b.MultiSeriesIter(concurrency) if err != nil { // NB: If the unconsolidated block does not support multi series iteration, // fallback to processing series one by one. - singleProcess(ctx, seriesIter, builder, m, c.processor) + builder, err = c.singleProcess(ctx, b, m) } else { - batchProcess(ctx, batches, builder, m, c.processor) + builder, err = c.batchProcess(ctx, b, batches, m) + } + + if err != nil { + return err } // NB: safe to close the block here. @@ -184,33 +160,47 @@ type blockMeta struct { end xtime.UnixNano aggDuration xtime.UnixNano stepSize xtime.UnixNano + queryCtx *models.QueryContext steps int - seriesMeta []block.SeriesMeta } -func batchProcess( +func (c *baseNode) batchProcess( ctx context.Context, + b block.Block, iterBatches []block.SeriesIterBatch, - builder block.Builder, m blockMeta, - p processor, -) error { +) (block.Builder, error) { var ( - metas = m.seriesMeta - mu sync.Mutex wg sync.WaitGroup multiErr xerrors.MultiError idx int ) - builder.PopulateColumns(len(metas)) + meta := b.Meta() + builder, err := c.controller.BlockBuilder(m.queryCtx, meta, nil) + if err != nil { + return nil, err + } + + err = builder.AddCols(m.steps) + if err != nil { + return nil, err + } + + numSeries := 0 + for _, b := range iterBatches { + numSeries += b.Size + } + + builder.PopulateColumns(numSeries) for _, batch := range iterBatches { wg.Add(1) // capture loop variables loopIndex := idx batch := batch idx = idx + batch.Size + p := c.makeProcessor.initialize(c.op.duration, c.transformOpts) go func() { err := parallelProcess(ctx, loopIndex, batch.Iter, builder, m, p, &mu) if err != nil { @@ -225,7 +215,7 @@ func batchProcess( } wg.Wait() - return multiErr.FinalError() + return builder, multiErr.FinalError() } func parallelProcess( @@ -248,14 +238,20 @@ func parallelProcess( // Simulate as if we did all the decoding up front so we can visualize // how much decoding takes relative to the entire processing of the function. - _, sp, _ := xcontext.StartSampledTraceSpan(ctx, tracepoint.TemporalDecodeParallel, opentracing.StartTime(start)) + _, sp, _ := xcontext.StartSampledTraceSpan(ctx, + tracepoint.TemporalDecodeParallel, opentracing.StartTime(start)) sp.FinishWithOptions(opentracing.FinishOptions{ FinishTime: start.Add(decodeDuration), }) }() values := make([]float64, 0, blockMeta.steps) - for iter.Next() { + metas := iter.SeriesMeta() + for i := 0; iter.Next(); i++ { + if i >= len(metas) { + return fmt.Errorf("invalid series meta index: %d, max %d", i, len(metas)) + } + var ( newVal float64 init = 0 @@ -266,12 +262,17 @@ func parallelProcess( series = iter.Current() datapoints = series.Datapoints() stats = series.Stats() + seriesMeta = metas[i] ) if stats.Enabled { decodeDuration += stats.DecodeDuration } + // rename series to exclude their __name__ tag as + // part of function processing. + seriesMeta.Tags = seriesMeta.Tags.WithoutName() + seriesMeta.Name = seriesMeta.Tags.ID() values = values[:0] for i := 0; i < blockMeta.steps; i++ { iterBounds := iterationBounds{ @@ -293,9 +294,10 @@ func parallelProcess( } mu.Lock() + // NB: this sets the values internally, so no need to worry about keeping // a reference to underlying `values`. - err := builder.SetRow(idx, values, blockMeta.seriesMeta[idx]) + err := builder.SetRow(idx, values, seriesMeta) mu.Unlock() idx++ if err != nil { @@ -306,29 +308,56 @@ func parallelProcess( return iter.Err() } -func singleProcess( +func (c *baseNode) singleProcess( ctx context.Context, - seriesIter block.SeriesIter, - builder block.Builder, + b block.Block, m blockMeta, - p processor, -) error { +) (block.Builder, error) { var ( start = time.Now() decodeDuration time.Duration ) + defer func() { if decodeDuration == 0 { return // Do not record this span if instrumentation is not turned on. } // Simulate as if we did all the decoding up front so we can visualize // how much decoding takes relative to the entire processing of the function. - _, sp, _ := xcontext.StartSampledTraceSpan(ctx, tracepoint.TemporalDecodeSingle, opentracing.StartTime(start)) + _, sp, _ := xcontext.StartSampledTraceSpan(ctx, + tracepoint.TemporalDecodeSingle, opentracing.StartTime(start)) sp.FinishWithOptions(opentracing.FinishOptions{ FinishTime: start.Add(decodeDuration), }) }() + seriesIter, err := b.SeriesIter() + if err != nil { + return nil, err + } + + // rename series to exclude their __name__ tag as part of function processing. + resultSeriesMeta := make([]block.SeriesMeta, 0, len(seriesIter.SeriesMeta())) + for _, m := range seriesIter.SeriesMeta() { + tags := m.Tags.WithoutName() + resultSeriesMeta = append(resultSeriesMeta, block.SeriesMeta{ + Name: tags.ID(), + Tags: tags, + }) + } + + meta := b.Meta() + builder, err := c.controller.BlockBuilder(m.queryCtx, meta, resultSeriesMeta) + if err != nil { + return nil, err + } + + err = builder.AddCols(m.steps) + if err != nil { + return nil, err + } + + p := c.makeProcessor.initialize(c.op.duration, c.transformOpts) for seriesIter.Next() { var ( newVal float64 @@ -361,7 +390,7 @@ func singleProcess( } if err := builder.AppendValue(i, newVal); err != nil { - return err + return nil, err } start += step @@ -369,7 +398,7 @@ func singleProcess( } } - return seriesIter.Err() + return builder, seriesIter.Err() } // getIndices returns the index of the points on the left and the right of the diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index 489a2ecfa3..6e38d0af97 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -21,6 +21,7 @@ package temporal import ( + "fmt" "math" "testing" "time" @@ -33,8 +34,10 @@ import ( "github.com/m3db/m3/src/query/test/executor" "github.com/m3db/m3/src/query/test/transformtest" "github.com/m3db/m3/src/query/ts" + xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -52,66 +55,73 @@ type opGenerator func(t *testing.T, tc testCase) transform.Params func testTemporalFunc(t *testing.T, opGen opGenerator, tests []testCase) { for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(tt.vals, nil) - boundStart := bounds.Start - - seriesMetas := []block.SeriesMeta{ - { - Name: []byte("s1"), - Tags: models.EmptyTags().AddTags([]models.Tag{{ - Name: []byte("t1"), - Value: []byte("v1"), - }}).SetName([]byte("foobar")), - }, - { - Name: []byte("s2"), - Tags: models.EmptyTags().AddTags([]models.Tag{{ - Name: []byte("t1"), - Value: []byte("v2"), - }}).SetName([]byte("foobar")), - }, + for _, runBatched := range []bool{true, false} { + name := tt.name + "_unbatched" + if runBatched { + name = tt.name + "_batched" } + t.Run(name, func(t *testing.T) { + values, bounds := test.GenerateValuesAndBounds(tt.vals, nil) + boundStart := bounds.Start + + seriesMetas := []block.SeriesMeta{ + { + Name: []byte("s1"), + Tags: models.EmptyTags().AddTags([]models.Tag{{ + Name: []byte("t1"), + Value: []byte("v1"), + }}).SetName([]byte("foobar")), + }, + { + Name: []byte("s2"), + Tags: models.EmptyTags().AddTags([]models.Tag{{ + Name: []byte("t1"), + Value: []byte("v2"), + }}).SetName([]byte("foobar")), + }, + } - bl := test.NewUnconsolidatedBlockFromDatapointsWithMeta(models.Bounds{ - Start: bounds.Start.Add(-2 * bounds.Duration), - Duration: bounds.Duration * 2, - StepSize: bounds.StepSize, - }, seriesMetas, values) - - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := opGen(t, tt) - node := baseOp.Node(c, transformtest.Options(t, transform.OptionsParams{ - TimeSpec: transform.TimeSpec{ - Start: boundStart.Add(-2 * bounds.Duration), - End: bounds.End(), - Step: time.Second, - }, - })) - - err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) - require.NoError(t, err) - - test.EqualsWithNansWithDelta(t, tt.expected, sink.Values, 0.0001) - // Name should be dropped from series tags. - expectedSeriesMetas := []block.SeriesMeta{ - block.SeriesMeta{ + bl := test.NewUnconsolidatedBlockFromDatapointsWithMeta(models.Bounds{ + Start: bounds.Start.Add(-2 * bounds.Duration), + Duration: bounds.Duration * 2, + StepSize: bounds.StepSize, + }, seriesMetas, values, runBatched) + + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + baseOp := opGen(t, tt) + node := baseOp.Node(c, transformtest.Options(t, transform.OptionsParams{ + TimeSpec: transform.TimeSpec{ + Start: boundStart.Add(-2 * bounds.Duration), + End: bounds.End(), + Step: time.Second, + }, + })) + + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) + require.NoError(t, err) + + test.EqualsWithNansWithDelta(t, tt.expected, sink.Values, 0.0001) + metaOne := block.SeriesMeta{ Name: []byte("t1=v1,"), Tags: models.EmptyTags().AddTags([]models.Tag{{ Name: []byte("t1"), Value: []byte("v1"), }}), - }, - block.SeriesMeta{ + } + + metaTwo := block.SeriesMeta{ Name: []byte("t1=v2,"), Tags: models.EmptyTags().AddTags([]models.Tag{{ Name: []byte("t1"), Value: []byte("v2"), - }})}, - } + }})} - assert.Equal(t, expectedSeriesMetas, sink.Metas) - }) + // NB: name should be dropped from series tags, and the name + // should be the updated ID. + expectedSeriesMetas := []block.SeriesMeta{metaOne, metaTwo} + require.Equal(t, expectedSeriesMetas, sink.Metas) + }) + } } } @@ -143,3 +153,151 @@ func TestGetIndicesError(t *testing.T) { require.Equal(t, 10, r) require.False(t, ok) } + +var _ block.SeriesIter = (*dummySeriesIter)(nil) + +type dummySeriesIter struct { + metas []block.SeriesMeta + vals []float64 + idx int +} + +func (it *dummySeriesIter) SeriesMeta() []block.SeriesMeta { + return it.metas +} + +func (it *dummySeriesIter) SeriesCount() int { + return len(it.metas) +} + +func (it *dummySeriesIter) Current() block.UnconsolidatedSeries { + return block.NewUnconsolidatedSeries( + ts.Datapoints{ts.Datapoint{Value: it.vals[it.idx]}}, + it.metas[it.idx], + block.UnconsolidatedSeriesStats{}, + ) +} + +func (it *dummySeriesIter) Next() bool { + if it.idx >= len(it.metas)-1 { + return false + } + + it.idx++ + return true +} + +func (it *dummySeriesIter) Err() error { + return nil +} + +func (it *dummySeriesIter) Close() { + //no-op +} + +func TestParallelProcess(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + tagName := "tag" + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + aggProcess := aggProcessor{ + aggFunc: func(fs []float64) float64 { + require.Equal(t, 1, len(fs)) + return fs[0] + }, + } + + node := baseNode{ + controller: c, + op: baseOp{duration: time.Minute}, + makeProcessor: aggProcess, + transformOpts: transform.Options{}, + } + + stepSize := time.Minute + bl := block.NewMockBlock(ctrl) + bl.EXPECT().Meta().Return(block.Metadata{ + Bounds: models.Bounds{ + StepSize: stepSize, + Duration: stepSize, + }}).AnyTimes() + + numSeries := 10 + seriesMetas := make([]block.SeriesMeta, 0, numSeries) + vals := make([]float64, 0, numSeries) + for i := 0; i < numSeries; i++ { + number := fmt.Sprint(i) + name := []byte(fmt.Sprintf("%d_should_not_appear_after_func_applied", i)) + meta := block.SeriesMeta{ + Name: []byte(number), + Tags: models.MustMakeTags(tagName, number).SetName(name), + } + + seriesMetas = append(seriesMetas, meta) + vals = append(vals, float64(i)) + } + + fullIter := &dummySeriesIter{ + idx: -1, + vals: vals, + metas: seriesMetas, + } + + bl.EXPECT().SeriesIter().Return(fullIter, nil).MaxTimes(1) + + numBatches := 3 + blockMetas := make([][]block.SeriesMeta, 0, numBatches) + blockVals := make([][]float64, 0, numBatches) + for i := 0; i < numBatches; i++ { + l := numSeries/numBatches + 1 + blockMetas = append(blockMetas, make([]block.SeriesMeta, 0, l)) + blockVals = append(blockVals, make([]float64, 0, l)) + } + + for i, meta := range seriesMetas { + idx := i % numBatches + blockMetas[idx] = append(blockMetas[idx], meta) + blockVals[idx] = append(blockVals[idx], float64(i)) + } + + batches := make([]block.SeriesIterBatch, 0, numBatches) + for i := 0; i < numBatches; i++ { + iter := &dummySeriesIter{ + idx: -1, + vals: blockVals[i], + metas: blockMetas[i], + } + + batches = append(batches, block.SeriesIterBatch{ + Iter: iter, + Size: len(blockVals[i]), + }) + } + + bl.EXPECT().MultiSeriesIter(gomock.Any()).Return(batches, nil).MaxTimes(1) + bl.EXPECT().Close().Times(1) + + err := node.Process(models.NoopQueryContext(), parser.NodeID(0), bl) + require.NoError(t, err) + + expected := []float64{ + 0, 3, 6, 9, + 1, 4, 7, + 2, 5, 8, + } + + for i, v := range sink.Values { + assert.Equal(t, expected[i], v[0]) + } + + for i, m := range sink.Metas { + expected := fmt.Sprint(expected[i]) + expectedName := fmt.Sprintf("tag=%s,", expected) + assert.Equal(t, expectedName, string(m.Name)) + require.Equal(t, 1, m.Tags.Len()) + tag, found := m.Tags.Get([]byte(tagName)) + require.True(t, found) + assert.Equal(t, expected, string(tag)) + } +} diff --git a/src/query/functions/temporal/functions.go b/src/query/functions/temporal/functions.go index c58e197e56..12ddde4669 100644 --- a/src/query/functions/temporal/functions.go +++ b/src/query/functions/temporal/functions.go @@ -49,11 +49,9 @@ type functionProcessor struct { func (f functionProcessor) initialize( _ time.Duration, - controller *transform.Controller, - opts transform.Options, + _ transform.Options, ) processor { - return &functionNode{ - controller: controller, + return &functionNode{ comparisonFunc: f.compFunc, } } @@ -84,8 +82,7 @@ func NewFunctionOp(args []interface{}, optype string) (transform.Params, error) return newBaseOp(duration, optype, f) } -type functionNode struct { - controller *transform.Controller +type functionNode struct { comparisonFunc comparisonFunc } diff --git a/src/query/functions/temporal/linear_regression.go b/src/query/functions/temporal/linear_regression.go index 29f857f6b1..2af1317c75 100644 --- a/src/query/functions/temporal/linear_regression.go +++ b/src/query/functions/temporal/linear_regression.go @@ -49,11 +49,9 @@ type linearRegressionProcessor struct { func (l linearRegressionProcessor) initialize( _ time.Duration, - controller *transform.Controller, opts transform.Options, ) processor { return &linearRegressionNode{ - controller: controller, timeSpec: opts.TimeSpec(), fn: l.fn, isDeriv: l.isDeriv, @@ -121,7 +119,6 @@ func NewLinearRegressionOp( } type linearRegressionNode struct { - controller *transform.Controller timeSpec transform.TimeSpec fn linearRegFn isDeriv bool diff --git a/src/query/functions/temporal/rate.go b/src/query/functions/temporal/rate.go index ac4b3f7bd0..13ae045274 100644 --- a/src/query/functions/temporal/rate.go +++ b/src/query/functions/temporal/rate.go @@ -56,11 +56,9 @@ type rateProcessor struct { func (r rateProcessor) initialize( duration time.Duration, - controller *transform.Controller, - opts transform.Options, + _ transform.Options, ) processor { return &rateNode{ - controller: controller, isRate: r.isRate, isCounter: r.isCounter, rateFn: r.rateFn, @@ -121,7 +119,6 @@ type rateFn func( ) float64 type rateNode struct { - controller *transform.Controller isRate, isCounter bool duration time.Duration rateFn rateFn diff --git a/src/query/test/block.go b/src/query/test/block.go index bd960480be..47d5f3f238 100644 --- a/src/query/test/block.go +++ b/src/query/test/block.go @@ -35,12 +35,15 @@ type multiSeriesBlock struct { lookbackDuration time.Duration meta block.Metadata seriesList ts.SeriesList + query *storage.FetchQuery + enableBatched bool } func newMultiSeriesBlock( fetchResult *storage.FetchResult, query *storage.FetchQuery, lookbackDuration time.Duration, + enableBatched bool, ) block.Block { meta := block.Metadata{ Bounds: models.Bounds{ @@ -56,6 +59,8 @@ func newMultiSeriesBlock( seriesList: fetchResult.SeriesList, meta: meta, lookbackDuration: lookbackDuration, + enableBatched: enableBatched, + query: query, } } @@ -79,7 +84,39 @@ func (m multiSeriesBlock) SeriesIter() (block.SeriesIter, error) { func (m multiSeriesBlock) MultiSeriesIter( concurrency int, ) ([]block.SeriesIterBatch, error) { - return nil, errors.New("batched iterator is not supported by test block") + if !m.enableBatched { + return nil, + errors.New("batched iterator is not supported by this test block") + } + + batches := make([]ts.SeriesList, 0, concurrency) + for i := 0; i < concurrency; i++ { + batches = append(batches, make(ts.SeriesList, 0, 10)) + } + + // round-robin series. + for i, seriesList := range m.seriesList { + batches[i%concurrency] = append(batches[i%concurrency], seriesList) + } + + seriesIterBatches := make([]block.SeriesIterBatch, 0, concurrency) + for _, batch := range batches { + insideBlock := newMultiSeriesBlock(&storage.FetchResult{ + SeriesList: batch, + Metadata: m.meta.ResultMetadata, + }, m.query, m.lookbackDuration, false) + it, err := insideBlock.SeriesIter() + if err != nil { + return nil, err + } + + seriesIterBatches = append(seriesIterBatches, block.SeriesIterBatch{ + Iter: it, + Size: len(batch), + }) + } + + return seriesIterBatches, nil } func (m multiSeriesBlock) SeriesMeta() []block.SeriesMeta { diff --git a/src/query/test/builder.go b/src/query/test/builder.go index 066589ccb5..e28d938e62 100644 --- a/src/query/test/builder.go +++ b/src/query/test/builder.go @@ -54,6 +54,7 @@ func NewUnconsolidatedBlockFromDatapointsWithMeta( bounds models.Bounds, meta []block.SeriesMeta, seriesValues [][]float64, + enableBatched bool, ) block.Block { seriesList := make(ts.SeriesList, len(seriesValues)) for i, values := range seriesValues { @@ -70,21 +71,7 @@ func NewUnconsolidatedBlockFromDatapointsWithMeta( Start: bounds.Start, End: bounds.End(), Interval: bounds.StepSize, - }, time.Minute) -} - -// NewUnconsolidatedBlockFromDatapoints creates a new unconsolidated block -// using the provided values. -func NewUnconsolidatedBlockFromDatapoints( - bounds models.Bounds, - seriesValues [][]float64, -) block.Block { - meta := NewSeriesMeta("dummy", len(seriesValues)) - return NewUnconsolidatedBlockFromDatapointsWithMeta( - bounds, - meta, - seriesValues, - ) + }, time.Minute, enableBatched) } func seriesValuesToDatapoints(