diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index befb0231d4..b1f3556677 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -135,51 +135,83 @@ func TimestampToTime(timestampMS int64) time.Time { // TimeToTimestamp converts a time.Time to prometheus timestamp func TimeToTimestamp(timestamp time.Time) int64 { + // Significantly faster than time.Truncate() return timestamp.UnixNano() / int64(time.Millisecond) } -// FetchResultToPromResult converts fetch results from M3 to Prometheus result +// FetchResultToPromResult converts fetch results from M3 to Prometheus result. +// TODO(rartoul): We should pool all of these intermediary datastructures, or +// at least the []*prompb.Sample (as thats the most heavily allocated object) +// since we have full control over the lifecycle. func FetchResultToPromResult(result *FetchResult) *prompb.QueryResult { - timeseries := make([]*prompb.TimeSeries, 0) - + // Perform bulk allocation upfront then convert to pointers afterwards + // to reduce total number of allocations. See BenchmarkFetchResultToPromResult + // if modifying. + timeseries := make([]prompb.TimeSeries, 0, len(result.SeriesList)) for _, series := range result.SeriesList { promTs := SeriesToPromTS(series) timeseries = append(timeseries, promTs) } + timeSeriesPointers := make([]*prompb.TimeSeries, 0, len(result.SeriesList)) + for i := range timeseries { + timeSeriesPointers = append(timeSeriesPointers, ×eries[i]) + } + return &prompb.QueryResult{ - Timeseries: timeseries, + Timeseries: timeSeriesPointers, } } -// SeriesToPromTS converts a series to prometheus timeseries -func SeriesToPromTS(series *ts.Series) *prompb.TimeSeries { +// SeriesToPromTS converts a series to prometheus timeseries. +func SeriesToPromTS(series *ts.Series) prompb.TimeSeries { labels := TagsToPromLabels(series.Tags) samples := SeriesToPromSamples(series) - return &prompb.TimeSeries{Labels: labels, Samples: samples} + return prompb.TimeSeries{Labels: labels, Samples: samples} } -// TagsToPromLabels converts tags to prometheus labels +// TagsToPromLabels converts tags to prometheus labels. func TagsToPromLabels(tags models.Tags) []*prompb.Label { - labels := make([]*prompb.Label, 0, len(tags)) + // Perform bulk allocation upfront then convert to pointers afterwards + // to reduce total number of allocations. See BenchmarkFetchResultToPromResult + // if modifying. + labels := make([]prompb.Label, 0, len(tags)) for _, t := range tags { - labels = append(labels, &prompb.Label{Name: t.Name, Value: t.Value}) + labels = append(labels, prompb.Label{Name: t.Name, Value: t.Value}) + } + + labelsPointers := make([]*prompb.Label, 0, len(tags)) + for i := range labels { + labelsPointers = append(labelsPointers, &labels[i]) } - return labels + return labelsPointers } -// SeriesToPromSamples series datapoints to prometheus samples +// SeriesToPromSamples series datapoints to prometheus samples.SeriesToPromSamples. func SeriesToPromSamples(series *ts.Series) []*prompb.Sample { - samples := make([]*prompb.Sample, series.Len()) - for i := 0; i < series.Len(); i++ { - samples[i] = &prompb.Sample{ - Timestamp: series.Values().DatapointAt(i).Timestamp.UnixNano() / int64(time.Millisecond), - Value: series.Values().ValueAt(i), - } + var ( + seriesLen = series.Len() + values = series.Values() + datapoints = values.Datapoints() + // Perform bulk allocation upfront then convert to pointers afterwards + // to reduce total number of allocations. See BenchmarkFetchResultToPromResult + // if modifying. + samples = make([]prompb.Sample, 0, seriesLen) + ) + for _, dp := range datapoints { + samples = append(samples, prompb.Sample{ + Timestamp: TimeToTimestamp(dp.Timestamp), + Value: dp.Value, + }) + } + + samplesPointers := make([]*prompb.Sample, 0, len(samples)) + for i := range samples { + samplesPointers = append(samplesPointers, &samples[i]) } - return samples + return samplesPointers } func iteratorToTsSeries( diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index 6f06f18bf7..8c6fe5ea2d 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/test/seriesiter" + "github.com/m3db/m3/src/query/ts" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/pool" xsync "github.com/m3db/m3x/sync" @@ -229,3 +230,46 @@ func TestPromReadQueryToM3(t *testing.T) { }) } } + +var ( + benchResult *prompb.QueryResult +) + +// BenchmarkFetchResultToPromResult-8 100 10563444 ns/op 25368543 B/op 4443 allocs/op +func BenchmarkFetchResultToPromResult(b *testing.B) { + var ( + numSeries = 1000 + numDatapointsPerSeries = 1000 + numTagsPerSeries = 10 + fr = &FetchResult{ + SeriesList: make(ts.SeriesList, 0, numSeries), + } + ) + + for i := 0; i < numSeries; i++ { + values := make(ts.Datapoints, 0, numDatapointsPerSeries) + for i := 0; i < numDatapointsPerSeries; i++ { + values = append(values, ts.Datapoint{ + Timestamp: time.Time{}, + Value: float64(i), + }) + } + + tags := make(models.Tags, 0, numTagsPerSeries) + for i := 0; i < numTagsPerSeries; i++ { + tags = append(tags, models.Tag{ + Name: fmt.Sprintf("name-%d", i), + Value: fmt.Sprintf("value-%d", i), + }) + } + + series := ts.NewSeries( + fmt.Sprintf("series-%d", i), values, tags) + + fr.SeriesList = append(fr.SeriesList, series) + } + + for i := 0; i < b.N; i++ { + benchResult = FetchResultToPromResult(fr) + } +} diff --git a/src/query/ts/values.go b/src/query/ts/values.go index 27a5995551..6e12f4671b 100644 --- a/src/query/ts/values.go +++ b/src/query/ts/values.go @@ -46,6 +46,9 @@ type Values interface { // DatapointAt returns the datapoint at the nth element DatapointAt(n int) Datapoint + // Datapoints returns all the datapoints + Datapoints() []Datapoint + // AlignToBounds returns values aligned to the start time and duration AlignToBounds(bounds models.Bounds) []Datapoints } @@ -68,6 +71,9 @@ func (d Datapoints) ValueAt(n int) float64 { return d[n].Value } // DatapointAt returns the value at the nth element. func (d Datapoints) DatapointAt(n int) Datapoint { return d[n] } +// Datapoints returns all the datapoints. +func (d Datapoints) Datapoints() []Datapoint { return d } + // Values returns the values representation. func (d Datapoints) Values() []float64 { values := make([]float64, len(d)) @@ -137,6 +143,13 @@ func (b *fixedResolutionValues) DatapointAt(point int) Datapoint { Value: b.ValueAt(point), } } +func (b *fixedResolutionValues) Datapoints() []Datapoint { + datapoints := make([]Datapoint, 0, len(b.values)) + for i := range b.values { + datapoints = append(datapoints, b.DatapointAt(i)) + } + return datapoints +} // AlignToBounds returns values aligned to given bounds. // TODO: Consider bounds as well