From 39cf584a98c96d59475837dbd5344aaf1dc66008 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 2 Mar 2020 11:04:45 -0500 Subject: [PATCH 1/8] [query] fix a bug with step iteration allocating --- .../m3db/consolidators/step_consolidator.go | 5 +- .../ts/m3db/encoded_step_iterator_test.go | 130 ++++++++++-------- 2 files changed, 74 insertions(+), 61 deletions(-) diff --git a/src/query/ts/m3db/consolidators/step_consolidator.go b/src/query/ts/m3db/consolidators/step_consolidator.go index 37eea4601c..ea10b1e5bf 100644 --- a/src/query/ts/m3db/consolidators/step_consolidator.go +++ b/src/query/ts/m3db/consolidators/step_consolidator.go @@ -63,7 +63,7 @@ func NewStepLookbackConsolidator( startTime time.Time, fn ConsolidationFunc, ) *StepLookbackConsolidator { - datapoints := make([]ts.Datapoint, 0, initLength) + datapoints := make([]ts.Datapoint, 0, initLength*BufferSteps) buffer := make([]float64, BufferSteps) return &StepLookbackConsolidator{ lookbackDuration: lookbackDuration, @@ -108,6 +108,7 @@ func (c *StepLookbackConsolidator) ConsolidateAndMoveToNext() float64 { } val := c.unconsumed[0] - c.unconsumed = c.unconsumed[1:] + copy(c.unconsumed, c.unconsumed[1:]) + c.unconsumed = c.unconsumed[:len(c.unconsumed)-1] return val } diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index caeb696375..b0f7e0bb6c 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -24,6 +24,8 @@ import ( "fmt" "io" "os" + "runtime" + "sync" "testing" "time" @@ -373,6 +375,7 @@ const ( stepSequential iterType = iota stepParallel seriesSequential + seriesBatch ) func (t iterType) name(name string) string { @@ -384,6 +387,8 @@ func (t iterType) name(name string) string { n = "sequential" case seriesSequential: n = "series" + case seriesBatch: + n = "series_batch" default: panic(fmt.Sprint("bad iter type", t)) } @@ -391,7 +396,9 @@ func (t iterType) name(name string) string { return fmt.Sprintf("%s_%s", n, name) } -func benchmarkNextIteration(b *testing.B, iterations int, t iterType) { +type reset func() + +func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { var ( seriesCount = 100 replicasCount = 3 @@ -402,7 +409,6 @@ func benchmarkNextIteration(b *testing.B, iterations int, t iterType) { iters = make([]encoding.SeriesIterator, seriesCount) itersReset = make([]func(), seriesCount) collectors = make([]consolidators.StepCollector, seriesCount) - peeks = make([]peekValue, seriesCount) encodingOpts = encoding.NewOptions() namespaceID = ident.StringID("namespace") @@ -456,7 +462,6 @@ func benchmarkNextIteration(b *testing.B, iterations int, t iterType) { } seriesID := ident.StringID(fmt.Sprintf("foo.%d", i)) - tags, err := ident.NewTagStringsIterator("foo", "bar", "baz", "qux") require.NoError(b, err) @@ -515,83 +520,89 @@ func benchmarkNextIteration(b *testing.B, iterations int, t iterType) { profilesTaken[key] = profilesTaken[key] + 1 } - if t == seriesSequential { - sm := make([]block.SeriesMeta, seriesCount) - for i := range iters { - sm[i] = block.SeriesMeta{} - } + opts := NewOptions() + if usePools { + poolOpts := xsync.NewPooledWorkerPoolOptions() + readWorkerPools, err := xsync.NewPooledWorkerPool(1024, poolOpts) + require.NoError(b, err) + readWorkerPools.Init() + opts = opts.SetReadWorkerPool(readWorkerPools) + } - it := encodedSeriesIter{ - idx: -1, - meta: block.Metadata{ - Bounds: models.Bounds{ - Start: start, - StepSize: stepSize, - Duration: window, - }, - 
}, + for _, reset := range itersReset { + reset() + } + + block, err := NewEncodedBlock(iters, models.Bounds{ + Start: start, + StepSize: stepSize, + Duration: window, + }, false, block.NewResultMetadata(), NewOptions()) - seriesIters: iters, - seriesMeta: sm, - lookbackDuration: time.Minute * 5, + require.NoError(b, err) + return block, func() { + for _, reset := range itersReset { + reset() } + } +} + +func benchmarkNextIteration(b *testing.B, iterations int, t iterType) { + bl, reset := setupBlock(b, iterations, t) + if t == seriesSequential { + it, err := bl.SeriesIter() + require.NoError(b, err) b.ResetTimer() for i := 0; i < b.N; i++ { - it.idx = -1 - // Reset all the underlying compressed series iterators. - for _, reset := range itersReset { - reset() - } - + reset() for it.Next() { } + require.NoError(b, it.Err()) } return } - it := &encodedStepIterWithCollector{ - stepTime: start, - blockEnd: end, - meta: block.Metadata{ - Bounds: models.Bounds{ - Start: start, - StepSize: stepSize, - Duration: window, - }, - }, + if t == seriesBatch { + batches, err := bl.MultiSeriesIter(runtime.NumCPU()) + require.NoError(b, err) - seriesCollectors: collectors, - seriesPeek: peeks, - seriesIters: iters, - } + var wg sync.WaitGroup + b.ResetTimer() + for i := 0; i < b.N; i++ { + reset() - if usePools { - opts := xsync.NewPooledWorkerPoolOptions() - readWorkerPools, err := xsync.NewPooledWorkerPool(1024, opts) - require.NoError(b, err) - readWorkerPools.Init() - it.workerPool = readWorkerPools - } + for _, batch := range batches { + it := batch.Iter + wg.Add(1) + go func() { + for it.Next() { + } - b.ResetTimer() - for i := 0; i < b.N; i++ { - it.stepTime = start - it.bufferTime = time.Time{} - it.finished = false - for i := range it.seriesPeek { - it.seriesPeek[i] = peekValue{} - } + wg.Done() + }() + } - // Reset all the underlying compressed series iterators. 
- for _, reset := range itersReset { - reset() + wg.Wait() + for _, batch := range batches { + require.NoError(b, batch.Iter.Err()) + } } + return + } + + it, err := bl.StepIter() + require.NoError(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + reset() for it.Next() { } + require.NoError(b, it.Err()) } } @@ -638,9 +649,10 @@ func BenchmarkNextIteration(b *testing.B) { stepSequential, stepParallel, seriesSequential, + seriesBatch, } - for _, s := range []int{10, 100, 200, 500, 1000, 2000} { + for _, s := range []int{2000} { for _, t := range iterTypes { name := t.name(fmt.Sprintf("%d", s)) b.Run(name, func(b *testing.B) { From 8cd22b242d00e4d17efcae299f588c067e118c47 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 2 Mar 2020 11:06:26 -0500 Subject: [PATCH 2/8] Re-add multiple ranges --- src/query/ts/m3db/encoded_step_iterator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index b0f7e0bb6c..23b596998d 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -652,7 +652,7 @@ func BenchmarkNextIteration(b *testing.B) { seriesBatch, } - for _, s := range []int{2000} { + for _, s := range []int{10, 100, 200, 500, 1000, 2000} { for _, t := range iterTypes { name := t.name(fmt.Sprintf("%d", s)) b.Run(name, func(b *testing.B) { From 193beb0b156e7525a7f21b7c74c26db43f930659 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 2 Mar 2020 11:08:55 -0500 Subject: [PATCH 3/8] Update results --- .../ts/m3db/encoded_step_iterator_test.go | 63 +++++++++++-------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index 23b596998d..6333b0af5b 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -617,33 +617,42 @@ var ( profilesTaken = make(map[profileTakenKey]int) ) -// $ go test -v -run none -bench BenchmarkNextIteration -// goos: darwin -// goarch: amd64 -// pkg: github.com/m3db/m3/src/query/ts/m3db -// BenchmarkNextIteration/sequential_10-12 1776 642349 ns/op -// BenchmarkNextIteration/parallel_10-12 2538 466186 ns/op -// BenchmarkNextIteration/series_10-12 1915 601583 ns/op - -// BenchmarkNextIteration/sequential_100-12 621 1945963 ns/op -// BenchmarkNextIteration/parallel_100-12 1118 1042822 ns/op -// BenchmarkNextIteration/series_100-12 834 1451031 ns/op - -// BenchmarkNextIteration/sequential_200-12 398 3002165 ns/op -// BenchmarkNextIteration/parallel_200-12 699 1613085 ns/op -// BenchmarkNextIteration/series_200-12 614 1969783 ns/op - -// BenchmarkNextIteration/sequential_500-12 214 5522765 ns/op -// BenchmarkNextIteration/parallel_500-12 382 2904843 ns/op -// BenchmarkNextIteration/series_500-12 400 2996965 ns/op - -// BenchmarkNextIteration/sequential_1000-12 129 9050684 ns/op -// BenchmarkNextIteration/parallel_1000-12 238 4775567 ns/op -// BenchmarkNextIteration/series_1000-12 289 4176052 ns/op - -// BenchmarkNextIteration/sequential_2000-12 64 16190003 ns/op -// BenchmarkNextIteration/parallel_2000-12 136 8238382 ns/op -// BenchmarkNextIteration/series_2000-12 207 5744589 ns/op +/* + $ go test -v -run none -bench BenchmarkNextIteration + goos: darwin + goarch: amd64 + pkg: github.com/m3db/m3/src/query/ts/m3db + + BenchmarkNextIteration/sequential_10-12 4112 282491 ns/op + BenchmarkNextIteration/parallel_10-12 4214 249335 ns/op + 
BenchmarkNextIteration/series_10-12 4515 248946 ns/op + BenchmarkNextIteration/series_batch_10-12 4434 269776 ns/op + + BenchmarkNextIteration/sequential_100-12 4069 267836 ns/op + BenchmarkNextIteration/parallel_100-12 4126 283069 ns/op + BenchmarkNextIteration/series_100-12 4146 266928 ns/op + BenchmarkNextIteration/series_batch_100-12 4399 255991 ns/op + + BenchmarkNextIteration/sequential_200-12 4267 245249 ns/op + BenchmarkNextIteration/parallel_200-12 4233 239597 ns/op + BenchmarkNextIteration/series_200-12 4365 245924 ns/op + BenchmarkNextIteration/series_batch_200-12 4485 235055 ns/op + + BenchmarkNextIteration/sequential_500-12 5108 230085 ns/op + BenchmarkNextIteration/parallel_500-12 4802 230694 ns/op + BenchmarkNextIteration/series_500-12 4831 229797 ns/op + BenchmarkNextIteration/series_batch_500-12 4880 246588 ns/op + + BenchmarkNextIteration/sequential_1000-12 3807 265449 ns/op + BenchmarkNextIteration/parallel_1000-12 5062 254942 ns/op + BenchmarkNextIteration/series_1000-12 4423 236796 ns/op + BenchmarkNextIteration/series_batch_1000-12 4772 251977 ns/op + + BenchmarkNextIteration/sequential_2000-12 4916 243593 ns/op + BenchmarkNextIteration/parallel_2000-12 4743 253677 ns/op + BenchmarkNextIteration/series_2000-12 4078 256375 ns/op + BenchmarkNextIteration/series_batch_2000-12 4465 242323 ns/op +*/ func BenchmarkNextIteration(b *testing.B) { iterTypes := []iterType{ stepSequential, From 1f767adafe2c1423ce4bea9b7e34d7f81b04ce59 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 3 Mar 2020 09:56:49 -0500 Subject: [PATCH 4/8] Fix pools and tests --- src/query/pools/query_pools.go | 168 +++++++++++++----- src/query/pools/query_pools_test.go | 50 ++++++ src/query/test/mock_pools.go | 4 +- .../m3db/consolidators/series_consolidator.go | 20 ++- .../ts/m3db/consolidators/step_accumulator.go | 16 +- .../m3db/consolidators/step_consolidator.go | 33 +++- src/query/ts/m3db/consolidators/types.go | 2 +- .../ts/m3db/encoded_step_iterator_test.go | 46 +++-- src/query/ts/m3db/options.go | 14 +- src/x/serialize/decoder_options.go | 39 +++- 10 files changed, 313 insertions(+), 79 deletions(-) create mode 100644 src/query/pools/query_pools_test.go diff --git a/src/query/pools/query_pools.go b/src/query/pools/query_pools.go index caef363686..e30f9cb9d6 100644 --- a/src/query/pools/query_pools.go +++ b/src/query/pools/query_pools.go @@ -23,9 +23,9 @@ package pools import ( "io" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/x/xpool" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/ident" @@ -39,11 +39,32 @@ import ( const ( // TODO: add capabilities to get this from configs - replicas = 1 - iteratorPoolSize = 65536 - checkedBytesWrapperPoolSize = 65536 - defaultIdentifierPoolSize = 8192 - defaultBucketCapacity = 256 + defaultReplicas = 3 + defaultSeriesIteratorPoolSize = 2 << 12 // ~8k + defaultCheckedBytesWrapperPoolSize = 2 << 12 // ~8k + defaultBucketCapacity = 256 + defaultPoolableConcurrentQueries = 64 + defaultPoolableSeriesPerQuery = 4096 + defaultSeriesReplicaReaderPoolSize = defaultPoolableConcurrentQueries * defaultPoolableSeriesPerQuery * defaultReplicas +) + +var ( + defaultSeriesIteratorsPoolBuckets = []pool.Bucket{ + { + Capacity: defaultPoolableSeriesPerQuery, + Count: defaultPoolableConcurrentQueries, + }, + } + defaultSeriesIDBytesPoolBuckets = []pool.Bucket{ + { + Capacity: 256, // Can pool 
IDs up to 256 in size with this bucket. + Count: defaultPoolableSeriesPerQuery, + }, + { + Capacity: 1024, // Can pool IDs up to 1024 in size with this bucket. + Count: defaultPoolableSeriesPerQuery, + }, + } ) // BuildWorkerPools builds a worker pool @@ -116,75 +137,132 @@ func (s sessionPools) TagDecoder() serialize.TagDecoderPool { return s.tagDecoder } -func buildBuckets() []pool.Bucket { - return []pool.Bucket{ - {Capacity: defaultBucketCapacity, Count: defaultIdentifierPoolSize}, +// iteratorPoolSize = 1000 +// checkedBytesWrapperPoolSize = 128 +// defaultIdentifierPoolSize = 128 + +// BuildIteratorPoolsOptions is a set of build iterator pools. +type BuildIteratorPoolsOptions struct { + Replicas int + SeriesIteratorPoolSize int + SeriesIteratorsPoolBuckets []pool.Bucket + SeriesIDBytesPoolBuckets []pool.Bucket + CheckedBytesWrapperPoolSize int +} + +// ReplicasOrDefault returns the replicas or default. +func (o BuildIteratorPoolsOptions) ReplicasOrDefault() int { + if o.Replicas <= 0 { + return defaultReplicas + } + return o.Replicas +} + +// SeriesIteratorPoolSizeOrDefault returns the replicas or default. +func (o BuildIteratorPoolsOptions) SeriesIteratorPoolSizeOrDefault() int { + if o.SeriesIteratorPoolSize <= 0 { + return defaultSeriesIteratorPoolSize + } + return o.SeriesIteratorPoolSize +} + +// CheckedBytesWrapperPoolSizeOrDefault returns the checked bytes +// wrapper pool size or default. +func (o BuildIteratorPoolsOptions) CheckedBytesWrapperPoolSizeOrDefault() int { + if o.CheckedBytesWrapperPoolSize <= 0 { + return defaultCheckedBytesWrapperPoolSize + } + return o.CheckedBytesWrapperPoolSize +} + +// SeriesIteratorsPoolBucketsOrDefault returns the series iterator pool +// buckets or defaults. +func (o BuildIteratorPoolsOptions) SeriesIteratorsPoolBucketsOrDefault() []pool.Bucket { + if len(o.SeriesIteratorsPoolBuckets) == 0 { + return defaultSeriesIteratorsPoolBuckets + } + return o.SeriesIteratorsPoolBuckets +} + +// SeriesIDBytesPoolBucketsOrDefault returns the bytes pool buckets or defaults. +func (o BuildIteratorPoolsOptions) SeriesIDBytesPoolBucketsOrDefault() []pool.Bucket { + if len(o.SeriesIDBytesPoolBuckets) == 0 { + return defaultSeriesIDBytesPoolBuckets } + return o.SeriesIDBytesPoolBuckets } // BuildIteratorPools build iterator pools if they are unavailable from // m3db (e.g. if running standalone query) -func BuildIteratorPools() encoding.IteratorPools { +func BuildIteratorPools( + opts BuildIteratorPoolsOptions, +) encoding.IteratorPools { // TODO: add instrumentation options to these pools pools := sessionPools{} - pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool([]pool.Bucket{ - pool.Bucket{ - Capacity: replicas, - Count: iteratorPoolSize, - }, - }) + + defaultPerSeriesIteratorsBuckets := opts.SeriesIteratorsPoolBucketsOrDefault() + + pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool(defaultPerSeriesIteratorsBuckets) pools.multiReaderIteratorArray.Init() - size := replicas * iteratorPoolSize - poolOpts := pool.NewObjectPoolOptions(). - SetSize(size) - pools.multiReaderIterator = encoding.NewMultiReaderIteratorPool(poolOpts) - encodingOpts := encoding.NewOptions() - readerIterAlloc := func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator { - intOptimized := m3tsz.DefaultIntOptimizationEnabled - return m3tsz.NewReaderIterator(r, intOptimized, encodingOpts) - } + defaultPerSeriesPoolOpts := pool.NewObjectPoolOptions(). 
+ SetSize(opts.SeriesIteratorPoolSizeOrDefault()) - pools.multiReaderIterator.Init(readerIterAlloc) + readerIteratorPoolPoolOpts := pool.NewObjectPoolOptions(). + SetSize(opts.SeriesIteratorPoolSizeOrDefault() * opts.ReplicasOrDefault()) - seriesIteratorPoolOpts := pool.NewObjectPoolOptions(). - SetSize(iteratorPoolSize) - pools.seriesIterator = encoding.NewSeriesIteratorPool(seriesIteratorPoolOpts) + readerIteratorPool := encoding.NewReaderIteratorPool(readerIteratorPoolPoolOpts) + + encodingOpts := encoding.NewOptions(). + SetReaderIteratorPool(readerIteratorPool) + + readerIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator { + return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) + }) + + pools.multiReaderIterator = encoding.NewMultiReaderIteratorPool(defaultPerSeriesPoolOpts) + pools.multiReaderIterator.Init(func(r io.Reader, s namespace.SchemaDescr) encoding.ReaderIterator { + iter := readerIteratorPool.Get() + iter.Reset(r, s) + return iter + }) + + pools.seriesIterator = encoding.NewSeriesIteratorPool(defaultPerSeriesPoolOpts) pools.seriesIterator.Init() - pools.seriesIterators = encoding.NewMutableSeriesIteratorsPool(buildBuckets()) + pools.seriesIterators = encoding.NewMutableSeriesIteratorsPool(defaultPerSeriesIteratorsBuckets) pools.seriesIterators.Init() wrapperPoolOpts := pool.NewObjectPoolOptions(). - SetSize(checkedBytesWrapperPoolSize) + SetSize(opts.CheckedBytesWrapperPoolSizeOrDefault()) pools.checkedBytesWrapper = xpool.NewCheckedBytesWrapperPool(wrapperPoolOpts) pools.checkedBytesWrapper.Init() pools.tagEncoder = serialize.NewTagEncoderPool( serialize.NewTagEncoderOptions(), - pool.NewObjectPoolOptions(), - ) + defaultPerSeriesPoolOpts) pools.tagEncoder.Init() - pools.tagDecoder = serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(), - pool.NewObjectPoolOptions(), - ) + tagDecoderCheckBytesWrapperPoolSize := 0 + tagDecoderOpts := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{ + // We pass in a preallocated pool so use a zero sized pool in options init. + CheckBytesWrapperPoolSize: &tagDecoderCheckBytesWrapperPoolSize, + }) + tagDecoderOpts = tagDecoderOpts.SetCheckedBytesWrapperPool(pools.checkedBytesWrapper) + + pools.tagDecoder = serialize.NewTagDecoderPool(tagDecoderOpts, defaultPerSeriesPoolOpts) pools.tagDecoder.Init() - bytesPool := pool.NewCheckedBytesPool(buildBuckets(), nil, - func(sizes []pool.Bucket) pool.BytesPool { + bytesPool := pool.NewCheckedBytesPool(opts.SeriesIDBytesPoolBucketsOrDefault(), + nil, func(sizes []pool.Bucket) pool.BytesPool { return pool.NewBytesPool(sizes, nil) }) bytesPool.Init() - idPoolOpts := pool.NewObjectPoolOptions(). - SetSize(defaultIdentifierPoolSize) - pools.id = ident.NewPool(bytesPool, ident.PoolOptions{ - IDPoolOptions: idPoolOpts, - TagsPoolOptions: idPoolOpts, - TagsIteratorPoolOptions: idPoolOpts, + IDPoolOptions: defaultPerSeriesPoolOpts, + TagsPoolOptions: defaultPerSeriesPoolOpts, + TagsIteratorPoolOptions: defaultPerSeriesPoolOpts, }) return pools diff --git a/src/query/pools/query_pools_test.go b/src/query/pools/query_pools_test.go new file mode 100644 index 0000000000..1a0244aee5 --- /dev/null +++ b/src/query/pools/query_pools_test.go @@ -0,0 +1,50 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package pools + +import ( + "runtime" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBuildIteratorPoolsHasSaneDefaults(t *testing.T) { + var stats runtime.MemStats + runtime.ReadMemStats(&stats) + + // TotalAlloc increases as heap objects are allocated, but + // unlike Alloc and HeapAlloc, it does not decrease when + // objects are freed. + totalAllocBefore := stats.TotalAlloc + + BuildIteratorPools(BuildIteratorPoolsOptions{}) + + runtime.ReadMemStats(&stats) + + allocated := int(stats.TotalAlloc - totalAllocBefore) + t.Logf("allocated %v bytes", allocated) + + upperLimit := 64 * 1024 * 1024 // 64mb + require.True(t, allocated < upperLimit, + "allocated more than "+strconv.Itoa(upperLimit)+" bytes") +} diff --git a/src/query/test/mock_pools.go b/src/query/test/mock_pools.go index 727b607f79..b543d321a1 100644 --- a/src/query/test/mock_pools.go +++ b/src/query/test/mock_pools.go @@ -107,7 +107,9 @@ func (ip *MockIteratorPool) ID() ident.Pool { // TagDecoder exposes the session's tag decoder pool func (ip *MockIteratorPool) TagDecoder() serialize.TagDecoderPool { ip.DecodePoolUsed = true - decoderPool := serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(), poolOpts) + decoderPool := serialize.NewTagDecoderPool( + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + poolOpts) decoderPool.Init() return decoderPool } diff --git a/src/query/ts/m3db/consolidators/series_consolidator.go b/src/query/ts/m3db/consolidators/series_consolidator.go index 9afb710db2..eae912d684 100644 --- a/src/query/ts/m3db/consolidators/series_consolidator.go +++ b/src/query/ts/m3db/consolidators/series_consolidator.go @@ -37,6 +37,7 @@ type SeriesLookbackConsolidator struct { earliestLookback time.Time consolidated float64 datapoints []ts.Datapoint + datapointsBuffer []ts.Datapoint fn ConsolidationFunc } @@ -47,12 +48,14 @@ func NewSeriesLookbackConsolidator( startTime time.Time, fn ConsolidationFunc, ) *SeriesLookbackConsolidator { + datapoints := make([]ts.Datapoint, 0, initLength) return &SeriesLookbackConsolidator{ lookbackDuration: lookbackDuration, stepSize: stepSize, earliestLookback: startTime.Add(-1 * lookbackDuration), consolidated: math.NaN(), - datapoints: make([]ts.Datapoint, 0, initLength), + datapoints: datapoints, + datapointsBuffer: datapoints, fn: fn, } } @@ -76,6 +79,19 @@ func (c *SeriesLookbackConsolidator) ConsolidateAndMoveToNext() float64 { c.earliestLookback 
= c.earliestLookback.Add(c.stepSize) c.consolidated = c.fn(c.datapoints) c.datapoints = removeStale(c.earliestLookback, c.datapoints) + + // Remove any datapoints not relevant to the next step now. + datapointsRelevant := removeStale(c.earliestLookback, c.datapoints) + if len(datapointsRelevant) > 0 { + // Move them back to the start of the slice to reuse the slice + // as best as possible. + c.datapoints = c.datapointsBuffer[:len(datapointsRelevant)] + copy(c.datapoints, datapointsRelevant) + } else { + // No relevant datapoints, repoint to the start of the buffer. + c.datapoints = c.datapointsBuffer[:0] + } + return c.consolidated } @@ -91,5 +107,5 @@ func (c *SeriesLookbackConsolidator) Reset( startTime time.Time, ) { c.earliestLookback = startTime.Add(-1 * c.lookbackDuration) - c.datapoints = c.datapoints[:0] + c.datapoints = c.datapointsBuffer[:0] } diff --git a/src/query/ts/m3db/consolidators/step_accumulator.go b/src/query/ts/m3db/consolidators/step_accumulator.go index a61da25924..5a56738bfd 100644 --- a/src/query/ts/m3db/consolidators/step_accumulator.go +++ b/src/query/ts/m3db/consolidators/step_accumulator.go @@ -36,6 +36,7 @@ type StepLookbackAccumulator struct { stepSize time.Duration earliestLookback time.Time datapoints []xts.Datapoint + datapointsBuffer []xts.Datapoint buffer [][]xts.Datapoint unconsumed [][]xts.Datapoint } @@ -56,6 +57,7 @@ func NewStepLookbackAccumulator( stepSize: stepSize, earliestLookback: startTime.Add(-1 * lookbackDuration), datapoints: datapoints, + datapointsBuffer: datapoints, buffer: buffer, unconsumed: buffer[:0], } @@ -87,7 +89,7 @@ func (c *StepLookbackAccumulator) BufferStep() { accumulated := make([]xts.Datapoint, len(c.datapoints)) copy(accumulated, c.datapoints) - c.datapoints = c.datapoints[:0] + c.datapoints = c.datapointsBuffer[:0] c.unconsumed = append(c.unconsumed, accumulated) } @@ -104,6 +106,16 @@ func (c *StepLookbackAccumulator) AccumulateAndMoveToNext() []xts.Datapoint { } val := c.unconsumed[0] - c.unconsumed = c.unconsumed[1:] + remaining := c.unconsumed[1:] + + if len(remaining) > 0 { + // Move any unconsumed values to the front of unconsumed. + c.unconsumed = c.buffer[:len(remaining)] + copy(c.unconsumed, remaining) + } else { + // Otherwise just repoint to the start of the buffer. 
+ c.unconsumed = c.buffer[:0] + } + return val } diff --git a/src/query/ts/m3db/consolidators/step_consolidator.go b/src/query/ts/m3db/consolidators/step_consolidator.go index ea10b1e5bf..0c910a8c53 100644 --- a/src/query/ts/m3db/consolidators/step_consolidator.go +++ b/src/query/ts/m3db/consolidators/step_consolidator.go @@ -48,6 +48,7 @@ type StepLookbackConsolidator struct { stepSize time.Duration earliestLookback time.Time datapoints []ts.Datapoint + datapointsBuffer []ts.Datapoint buffer []float64 unconsumed []float64 fn ConsolidationFunc @@ -63,13 +64,14 @@ func NewStepLookbackConsolidator( startTime time.Time, fn ConsolidationFunc, ) *StepLookbackConsolidator { - datapoints := make([]ts.Datapoint, 0, initLength*BufferSteps) + datapoints := make([]ts.Datapoint, 0, initLength) buffer := make([]float64, BufferSteps) return &StepLookbackConsolidator{ lookbackDuration: lookbackDuration, stepSize: stepSize, earliestLookback: startTime.Add(-1 * lookbackDuration), datapoints: datapoints, + datapointsBuffer: datapoints, buffer: buffer, unconsumed: buffer[:0], fn: fn, @@ -91,7 +93,20 @@ func (c *StepLookbackConsolidator) AddPoint(dp ts.Datapoint) { func (c *StepLookbackConsolidator) BufferStep() { c.earliestLookback = c.earliestLookback.Add(c.stepSize) val := c.fn(c.datapoints) - c.datapoints = removeStale(c.earliestLookback, c.datapoints) + + // Remove any datapoints not relevant to the next step now. + datapointsRelevant := removeStale(c.earliestLookback, c.datapoints) + if len(datapointsRelevant) > 0 { + // Move them back to the start of the slice to reuse the slice + // as best as possible. + c.datapoints = c.datapointsBuffer[:len(datapointsRelevant)] + copy(c.datapoints, datapointsRelevant) + } else { + // No relevant datapoints, repoint to the start of the buffer. + c.datapoints = c.datapointsBuffer[:0] + } + + // Blindly append to unconsumed. c.unconsumed = append(c.unconsumed, val) } @@ -107,8 +122,18 @@ func (c *StepLookbackConsolidator) ConsolidateAndMoveToNext() float64 { return c.fn(nil) } + // Consume value. val := c.unconsumed[0] - copy(c.unconsumed, c.unconsumed[1:]) - c.unconsumed = c.unconsumed[:len(c.unconsumed)-1] + remaining := c.unconsumed[1:] + + if len(remaining) > 0 { + // Move any unconsumed values to the front of unconsumed. + c.unconsumed = c.buffer[:len(remaining)] + copy(c.unconsumed, remaining) + } else { + // Otherwise just repoint to the start of the buffer. + c.unconsumed = c.buffer[:0] + } + return val } diff --git a/src/query/ts/m3db/consolidators/types.go b/src/query/ts/m3db/consolidators/types.go index 54979668e3..e0d8f39807 100644 --- a/src/query/ts/m3db/consolidators/types.go +++ b/src/query/ts/m3db/consolidators/types.go @@ -58,7 +58,7 @@ func TakeLast(values []ts.Datapoint) float64 { return math.NaN() } -const initLength = 10 +const initLength = BufferSteps // Set NaN to a variable makes tests easier. 
var nan = math.NaN() diff --git a/src/query/ts/m3db/encoded_step_iterator_test.go b/src/query/ts/m3db/encoded_step_iterator_test.go index 6333b0af5b..c81e5e14c7 100644 --- a/src/query/ts/m3db/encoded_step_iterator_test.go +++ b/src/query/ts/m3db/encoded_step_iterator_test.go @@ -36,10 +36,11 @@ import ( "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/models" + "github.com/m3db/m3/src/query/pools" "github.com/m3db/m3/src/query/test" - "github.com/m3db/m3/src/query/ts/m3db/consolidators" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/pool" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" "github.com/pkg/profile" @@ -144,7 +145,7 @@ var consolidatedStepIteratorTests = []struct { func testConsolidatedStepIteratorMinuteLookback(t *testing.T, withPools bool) { for _, tt := range consolidatedStepIteratorTests { - opts := NewOptions(). + opts := newTestOptions(). SetLookbackDuration(1 * time.Minute). SetSplitSeriesByBlock(false) require.NoError(t, opts.Validate()) @@ -293,7 +294,7 @@ var consolidatedStepIteratorTestsSplitByBlock = []struct { func testConsolidatedStepIteratorSplitByBlock(t *testing.T, withPools bool) { for _, tt := range consolidatedStepIteratorTestsSplitByBlock { - opts := NewOptions(). + opts := newTestOptions(). SetLookbackDuration(0). SetSplitSeriesByBlock(true) require.NoError(t, opts.Validate()) @@ -330,7 +331,7 @@ func TestConsolidatedStepIteratorSplitByBlockSequential(t *testing.T) { } func benchmarkSingleBlock(b *testing.B, withPools bool) { - opts := NewOptions(). + opts := newTestOptions(). SetLookbackDuration(1 * time.Minute). SetSplitSeriesByBlock(false) require.NoError(b, opts.Validate()) @@ -398,9 +399,33 @@ func (t iterType) name(name string) string { type reset func() +// newTestOptions provides options with very small/non-existent pools +// so that memory profiles don't get cluttered with pooled allocated objects. 
+func newTestOptions() Options { + poolOpts := pool.NewObjectPoolOptions().SetSize(1) + bytesPool := pool.NewCheckedBytesPool(nil, poolOpts, + func(s []pool.Bucket) pool.BytesPool { + return pool.NewBytesPool(s, poolOpts) + }) + bytesPool.Init() + + iteratorPools := pools.BuildIteratorPools(pools.BuildIteratorPoolsOptions{ + Replicas: 1, + SeriesIteratorPoolSize: 1, + SeriesIteratorsPoolBuckets: []pool.Bucket{ + {Capacity: 1, Count: 1}, + }, + SeriesIDBytesPoolBuckets: []pool.Bucket{ + {Capacity: 1, Count: 1}, + }, + CheckedBytesWrapperPoolSize: 1, + }) + return newOptions(bytesPool, iteratorPools) +} + func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { var ( - seriesCount = 100 + seriesCount = 1000 replicasCount = 3 start = time.Now() stepSize = time.Second * 10 @@ -408,19 +433,12 @@ func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { end = start.Add(window) iters = make([]encoding.SeriesIterator, seriesCount) itersReset = make([]func(), seriesCount) - collectors = make([]consolidators.StepCollector, seriesCount) encodingOpts = encoding.NewOptions() namespaceID = ident.StringID("namespace") ) for i := 0; i < seriesCount; i++ { - collectors[i] = consolidators.NewStepLookbackConsolidator( - stepSize, - stepSize, - start, - consolidators.TakeLast) - encoder := m3tsz.NewEncoder(start, checked.NewBytes(nil, nil), m3tsz.DefaultIntOptimizationEnabled, encodingOpts) @@ -520,7 +538,7 @@ func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { profilesTaken[key] = profilesTaken[key] + 1 } - opts := NewOptions() + opts := newTestOptions() if usePools { poolOpts := xsync.NewPooledWorkerPoolOptions() readWorkerPools, err := xsync.NewPooledWorkerPool(1024, poolOpts) @@ -537,7 +555,7 @@ func setupBlock(b *testing.B, iterations int, t iterType) (block.Block, reset) { Start: start, StepSize: stepSize, Duration: window, - }, false, block.NewResultMetadata(), NewOptions()) + }, false, block.NewResultMetadata(), opts) require.NoError(b, err) return block, func() { diff --git a/src/query/ts/m3db/options.go b/src/query/ts/m3db/options.go index 6755890c26..fbe7cd00dd 100644 --- a/src/query/ts/m3db/options.go +++ b/src/query/ts/m3db/options.go @@ -79,18 +79,20 @@ func NewOptions() Options { }) bytesPool.Init() - opts := pool.NewObjectPoolOptions().SetSize(1024) - batchPool := pool.NewObjectPool(opts) - batchPool.Init(func() interface{} { - return nextDetails{} - }) + iteratorPools := pools.BuildIteratorPools(pools.BuildIteratorPoolsOptions{}) + return newOptions(bytesPool, iteratorPools) +} +func newOptions( + bytesPool pool.CheckedBytesPool, + iteratorPools encoding.IteratorPools, +) Options { return &encodedBlockOptions{ lookbackDuration: defaultLookbackDuration, consolidationFn: defaultConsolidationFn, tagOptions: models.NewTagOptions(), iterAlloc: defaultIterAlloc, - pools: pools.BuildIteratorPools(), + pools: iteratorPools, checkedPools: bytesPool, batchingFn: defaultIteratorBatchingFn, instrumented: defaultInstrumented, diff --git a/src/x/serialize/decoder_options.go b/src/x/serialize/decoder_options.go index bfaf0e1392..d626cb2030 100644 --- a/src/x/serialize/decoder_options.go +++ b/src/x/serialize/decoder_options.go @@ -39,13 +39,44 @@ type decodeOpts struct { limits TagSerializationLimits } +// TagDecoderOptionsConfig allows for defaults to be set at initialization. 
+type TagDecoderOptionsConfig struct { + CheckBytesWrapperPoolSize *int `yaml:"checkBytesWrapperPoolSize"` + CheckBytesWrapperPoolLowWatermark *float64 `yaml:"checkBytesWrapperPoolLowWatermark"` + CheckBytesWrapperPoolHighWatermark *float64 `yaml:"checkBytesWrapperPoolHighWatermark"` +} + +// CheckBytesWrapperPoolSizeOrDefault returns config value or default. +func (c TagDecoderOptionsConfig) CheckBytesWrapperPoolSizeOrDefault() int { + if c.CheckBytesWrapperPoolSize == nil { + return defaultCheckBytesWrapperPoolSize + } + return *c.CheckBytesWrapperPoolSize +} + +// CheckBytesWrapperPoolLowWatermarkOrDefault returns config value or default. +func (c TagDecoderOptionsConfig) CheckBytesWrapperPoolLowWatermarkOrDefault() float64 { + if c.CheckBytesWrapperPoolLowWatermark == nil { + return defaultCheckBytesWrapperPoolLowWatermark + } + return *c.CheckBytesWrapperPoolLowWatermark +} + +// CheckBytesWrapperPoolHighWatermarkOrDefault returns config value or default. +func (c TagDecoderOptionsConfig) CheckBytesWrapperPoolHighWatermarkOrDefault() float64 { + if c.CheckBytesWrapperPoolHighWatermark == nil { + return defaultCheckBytesWrapperPoolHighWatermark + } + return *c.CheckBytesWrapperPoolHighWatermark +} + // NewTagDecoderOptions returns a new TagDecoderOptions. -func NewTagDecoderOptions() TagDecoderOptions { +func NewTagDecoderOptions(cfg TagDecoderOptionsConfig) TagDecoderOptions { pool := xpool.NewCheckedBytesWrapperPool( pool.NewObjectPoolOptions(). - SetSize(defaultCheckBytesWrapperPoolSize). - SetRefillLowWatermark(defaultCheckBytesWrapperPoolLowWatermark). - SetRefillHighWatermark(defaultCheckBytesWrapperPoolHighWatermark)) + SetSize(cfg.CheckBytesWrapperPoolSizeOrDefault()). + SetRefillLowWatermark(cfg.CheckBytesWrapperPoolLowWatermarkOrDefault()). + SetRefillHighWatermark(cfg.CheckBytesWrapperPoolHighWatermarkOrDefault())) pool.Init() return &decodeOpts{ wrapperPool: pool, From 2c2dcb21b336b6a01ada7ee9dbe0ab9a945593eb Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 6 Mar 2020 14:11:46 -0500 Subject: [PATCH 5/8] Fix build --- .../m3coordinator/downsample/downsampler_test.go | 10 +++++++--- src/cmd/services/m3coordinator/ingest/m3msg/config.go | 4 ++-- src/collector/api/v1/handler/json/report_test.go | 8 +++++--- src/collector/server/server.go | 2 +- .../fetch_tagged_results_accumulator_misc_test.go | 4 +++- src/dbnode/client/options.go | 2 +- src/dbnode/client/session_fetch_bulk_blocks_test.go | 6 ++++-- src/dbnode/network/server/tchannelthrift/options.go | 6 ++++-- src/dbnode/persist/fs/options.go | 3 ++- src/dbnode/persist/fs/read_test.go | 3 ++- src/dbnode/server/server.go | 2 +- src/query/server/query.go | 8 +++++--- 12 files changed, 37 insertions(+), 21 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index c6d41f2849..c70ce6345d 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -741,7 +741,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl rulesStore := ruleskv.NewStore(rulesKVStore, rulesStoreOpts) tagEncoderOptions := serialize.NewTagEncoderOptions() - tagDecoderOptions := serialize.NewTagDecoderOptions() + tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}) tagEncoderPoolOptions := pool.NewObjectPoolOptions(). SetInstrumentOptions(instrumentOpts. SetMetricsScope(instrumentOpts.MetricsScope(). 
@@ -808,8 +808,12 @@ func newTestID(t *testing.T, tags map[string]string) id.ID { data, ok := tagEncoder.Data() require.True(t, ok) - tagDecoderPool := serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(), - pool.NewObjectPoolOptions().SetSize(1)) + size := 1 + tagDecoderPool := serialize.NewTagDecoderPool( + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{ + CheckBytesWrapperPoolSize: &size, + }), + pool.NewObjectPoolOptions().SetSize(size)) tagDecoderPool.Init() tagDecoder := tagDecoderPool.Get() diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/config.go b/src/cmd/services/m3coordinator/ingest/m3msg/config.go index 67c06ad46f..239c6d980b 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/config.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/config.go @@ -22,11 +22,11 @@ package ingestm3msg import ( "github.com/m3db/m3/src/query/storage" - "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" "github.com/m3db/m3/src/x/retry" "github.com/m3db/m3/src/x/sampler" + "github.com/m3db/m3/src/x/serialize" xsync "github.com/m3db/m3/src/x/sync" ) @@ -70,7 +70,7 @@ func (cfg Configuration) newOptions( workers.Init() tagDecoderPool := serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(), + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), pool.NewObjectPoolOptions(). SetInstrumentOptions(instrumentOptions. SetMetricsScope(instrumentOptions.MetricsScope(). diff --git a/src/collector/api/v1/handler/json/report_test.go b/src/collector/api/v1/handler/json/report_test.go index 98680265ac..db9a16c607 100644 --- a/src/collector/api/v1/handler/json/report_test.go +++ b/src/collector/api/v1/handler/json/report_test.go @@ -30,9 +30,9 @@ import ( "github.com/m3db/m3/src/collector/reporter" "github.com/m3db/m3/src/metrics/metric/id" - "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -191,10 +191,12 @@ func newTestReportHandler(ctrl *gomock.Controller) testReportHandler { reporter := reporter.NewMockReporter(ctrl) poolOpts := pool.NewObjectPoolOptions().SetSize(1) tagEncoderPool := serialize.NewTagEncoderPool( - serialize.NewTagEncoderOptions(), poolOpts) + serialize.NewTagEncoderOptions(), + poolOpts) tagEncoderPool.Init() tagDecoderPool := serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(), poolOpts) + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + poolOpts) tagDecoderPool.Init() instrumentOpts := instrument.NewOptions() diff --git a/src/collector/server/server.go b/src/collector/server/server.go index 819564e51c..217f20a8d8 100644 --- a/src/collector/server/server.go +++ b/src/collector/server/server.go @@ -92,7 +92,7 @@ func Run(runOpts RunOptions) { } tagEncoderOptions := serialize.NewTagEncoderOptions() - tagDecoderOptions := serialize.NewTagDecoderOptions() + tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}) tagEncoderPoolOptions := pool.NewObjectPoolOptions(). SetInstrumentOptions(instrumentOpts. SetMetricsScope(instrumentOpts.MetricsScope(). 
diff --git a/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go b/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go index 2eecde11b6..109e224268 100644 --- a/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go +++ b/src/dbnode/client/fetch_tagged_results_accumulator_misc_test.go @@ -291,7 +291,9 @@ func initTestFetchTaggedPools() *testFetchTaggedPools { pools.checkedBytesWrapper = xpool.NewCheckedBytesWrapperPool(opts) pools.checkedBytesWrapper.Init() - pools.tagDecoder = serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(), opts) + pools.tagDecoder = serialize.NewTagDecoderPool( + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + opts) pools.tagDecoder.Init() return pools diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go index 6957f6f89d..3b61f2d38f 100644 --- a/src/dbnode/client/options.go +++ b/src/dbnode/client/options.go @@ -341,7 +341,7 @@ func newOptions() *options { tagEncoderPoolSize: defaultTagEncoderPoolSize, tagEncoderOpts: serialize.NewTagEncoderOptions(), tagDecoderPoolSize: defaultTagDecoderPoolSize, - tagDecoderOpts: serialize.NewTagDecoderOptions(), + tagDecoderOpts: serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), streamBlocksRetrier: defaultStreamBlocksRetrier, writeOperationPoolSize: defaultWriteOpPoolSize, writeTaggedOperationPoolSize: defaultWriteTaggedOpPoolSize, diff --git a/src/dbnode/client/session_fetch_bulk_blocks_test.go b/src/dbnode/client/session_fetch_bulk_blocks_test.go index 1171f93f37..50e3cfcee6 100644 --- a/src/dbnode/client/session_fetch_bulk_blocks_test.go +++ b/src/dbnode/client/session_fetch_bulk_blocks_test.go @@ -63,9 +63,11 @@ var ( nsRetentionOpts = retention.NewOptions(). SetBlockSize(blockSize). 
SetRetentionPeriod(48 * blockSize) - testTagDecodingPool = serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(), + testTagDecodingPool = serialize.NewTagDecoderPool( + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), pool.NewObjectPoolOptions().SetSize(1)) - testTagEncodingPool = serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), + testTagEncodingPool = serialize.NewTagEncoderPool( + serialize.NewTagEncoderOptions(), pool.NewObjectPoolOptions().SetSize(1)) testIDPool = newSessionTestOptions().IdentifierPool() fooID = ident.StringID("foo") diff --git a/src/dbnode/network/server/tchannelthrift/options.go b/src/dbnode/network/server/tchannelthrift/options.go index 837d1ef6fd..9e2d7dbf21 100644 --- a/src/dbnode/network/server/tchannelthrift/options.go +++ b/src/dbnode/network/server/tchannelthrift/options.go @@ -62,11 +62,13 @@ func NewOptions() Options { }) tagEncoderPool := serialize.NewTagEncoderPool( - serialize.NewTagEncoderOptions(), poolOptions) + serialize.NewTagEncoderOptions(), + poolOptions) tagEncoderPool.Init() tagDecoderPool := serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(), poolOptions) + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + poolOptions) tagDecoderPool.Init() bytesWrapperPool := xpool.NewCheckedBytesWrapperPool(poolOptions) diff --git a/src/dbnode/persist/fs/options.go b/src/dbnode/persist/fs/options.go index 7db96c212b..deec323002 100644 --- a/src/dbnode/persist/fs/options.go +++ b/src/dbnode/persist/fs/options.go @@ -108,7 +108,8 @@ func NewOptions() Options { serialize.NewTagEncoderOptions(), pool.NewObjectPoolOptions()) tagEncoderPool.Init() tagDecoderPool := serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(), pool.NewObjectPoolOptions()) + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + pool.NewObjectPoolOptions()) tagDecoderPool.Init() fstOptions := fst.NewOptions() diff --git a/src/dbnode/persist/fs/read_test.go b/src/dbnode/persist/fs/read_test.go index 573064d4d9..073d7f4187 100644 --- a/src/dbnode/persist/fs/read_test.go +++ b/src/dbnode/persist/fs/read_test.go @@ -90,7 +90,8 @@ func init() { }) testBytesPool.Init() testTagDecoderPool = serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(), pool.NewObjectPoolOptions()) + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), + pool.NewObjectPoolOptions()) testTagDecoderPool.Init() } diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 3a072f4bf1..e95beda334 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -429,7 +429,7 @@ func Run(runOpts RunOptions) { scope.SubScope("tag-encoder-pool"))) tagEncoderPool.Init() tagDecoderPool := serialize.NewTagDecoderPool( - serialize.NewTagDecoderOptions(), + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}), poolOptions( policy.TagDecoderPool, scope.SubScope("tag-decoder-pool"))) diff --git a/src/query/server/query.go b/src/query/server/query.go index 534d63f0f1..405af62382 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -253,7 +253,8 @@ func Run(runOpts RunOptions) { if cfg.Backend == config.GRPCStorageType { // For grpc backend, we need to setup only the grpc client and a storage // accompanying that client. 
- poolWrapper := pools.NewPoolsWrapper(pools.BuildIteratorPools()) + poolWrapper := pools.NewPoolsWrapper( + pools.BuildIteratorPools(pools.BuildIteratorPoolsOptions{})) remoteOpts := config.RemoteOptionsFromConfig(cfg.RPC) remotes, enabled, err := remoteClient(poolWrapper, remoteOpts, tsdbOpts, instrumentOptions) @@ -549,7 +550,7 @@ func newDownsampler( } tagEncoderOptions := serialize.NewTagEncoderOptions() - tagDecoderOptions := serialize.NewTagDecoderOptions() + tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}) tagEncoderPoolOptions := pool.NewObjectPoolOptions(). SetInstrumentOptions(instrumentOpts. SetMetricsScope(instrumentOpts.MetricsScope(). @@ -633,7 +634,8 @@ func initClusters( return nil, nil, errors.Wrap(err, "unable to connect to clusters") } - poolWrapper = pools.NewPoolsWrapper(pools.BuildIteratorPools()) + poolWrapper = pools.NewPoolsWrapper( + pools.BuildIteratorPools(pools.BuildIteratorPoolsOptions{})) } else { localCfg := cfg.Local if localCfg == nil { From 7a84398faf5087c1001745ba142cc772b18ca246 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 6 Mar 2020 14:13:52 -0500 Subject: [PATCH 6/8] Remove defaults --- src/query/pools/query_pools.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/query/pools/query_pools.go b/src/query/pools/query_pools.go index e30f9cb9d6..65c5c7ad99 100644 --- a/src/query/pools/query_pools.go +++ b/src/query/pools/query_pools.go @@ -137,10 +137,6 @@ func (s sessionPools) TagDecoder() serialize.TagDecoderPool { return s.tagDecoder } -// iteratorPoolSize = 1000 -// checkedBytesWrapperPoolSize = 128 -// defaultIdentifierPoolSize = 128 - // BuildIteratorPoolsOptions is a set of build iterator pools. type BuildIteratorPoolsOptions struct { Replicas int From eef365feffdbe9ec957a0306f814e660ca892f47 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 6 Mar 2020 21:04:11 -0500 Subject: [PATCH 7/8] Fix build --- src/cmd/services/m3comparator/main/main.go | 3 ++- src/query/remote/server_test.go | 3 ++- src/x/serialize/decoder_test.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/cmd/services/m3comparator/main/main.go b/src/cmd/services/m3comparator/main/main.go index f78a026c04..67558f8b2a 100644 --- a/src/cmd/services/m3comparator/main/main.go +++ b/src/cmd/services/m3comparator/main/main.go @@ -37,7 +37,8 @@ import ( func main() { var ( - iterPools = pools.BuildIteratorPools() + iterPools = pools.BuildIteratorPools( + pools.BuildIteratorPoolsOptions{}) poolWrapper = pools.NewPoolsWrapper(iterPools) iOpts = instrument.NewOptions() diff --git a/src/query/remote/server_test.go b/src/query/remote/server_test.go index 8f4c149fc9..df441f8bf9 100644 --- a/src/query/remote/server_test.go +++ b/src/query/remote/server_test.go @@ -52,7 +52,8 @@ import ( var ( errRead = errors.New("read error") - poolsWrapper = pools.NewPoolsWrapper(pools.BuildIteratorPools()) + poolsWrapper = pools.NewPoolsWrapper( + pools.BuildIteratorPools(pools.BuildIteratorPoolsOptions{})) ) type mockStorageOptions struct { diff --git a/src/x/serialize/decoder_test.go b/src/x/serialize/decoder_test.go index c580161e5a..ad2638a0ec 100644 --- a/src/x/serialize/decoder_test.go +++ b/src/x/serialize/decoder_test.go @@ -32,7 +32,7 @@ import ( ) var ( - testDecodeOpts = NewTagDecoderOptions() + testDecodeOpts = NewTagDecoderOptions(TagDecoderOptionsConfig{}) ) func TestEmptyDecode(t *testing.T) { From d01f453b64e00f87c65d375623938b2c817d3781 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 6 Mar 
2020 21:34:09 -0500 Subject: [PATCH 8/8] Fix not reusing same buffer --- src/query/ts/m3db/consolidators/series_consolidator.go | 8 +++----- src/query/ts/m3db/consolidators/step_accumulator.go | 4 +--- src/query/ts/m3db/consolidators/step_consolidator.go | 6 ++---- 3 files changed, 6 insertions(+), 12 deletions(-) diff --git a/src/query/ts/m3db/consolidators/series_consolidator.go b/src/query/ts/m3db/consolidators/series_consolidator.go index eae912d684..145ab1db6c 100644 --- a/src/query/ts/m3db/consolidators/series_consolidator.go +++ b/src/query/ts/m3db/consolidators/series_consolidator.go @@ -37,7 +37,6 @@ type SeriesLookbackConsolidator struct { earliestLookback time.Time consolidated float64 datapoints []ts.Datapoint - datapointsBuffer []ts.Datapoint fn ConsolidationFunc } @@ -55,7 +54,6 @@ func NewSeriesLookbackConsolidator( earliestLookback: startTime.Add(-1 * lookbackDuration), consolidated: math.NaN(), datapoints: datapoints, - datapointsBuffer: datapoints, fn: fn, } } @@ -85,11 +83,11 @@ func (c *SeriesLookbackConsolidator) ConsolidateAndMoveToNext() float64 { if len(datapointsRelevant) > 0 { // Move them back to the start of the slice to reuse the slice // as best as possible. - c.datapoints = c.datapointsBuffer[:len(datapointsRelevant)] + c.datapoints = c.datapoints[:len(datapointsRelevant)] copy(c.datapoints, datapointsRelevant) } else { // No relevant datapoints, repoint to the start of the buffer. - c.datapoints = c.datapointsBuffer[:0] + c.datapoints = c.datapoints[:0] } return c.consolidated @@ -107,5 +105,5 @@ func (c *SeriesLookbackConsolidator) Reset( startTime time.Time, ) { c.earliestLookback = startTime.Add(-1 * c.lookbackDuration) - c.datapoints = c.datapointsBuffer[:0] + c.datapoints = c.datapoints[:0] } diff --git a/src/query/ts/m3db/consolidators/step_accumulator.go b/src/query/ts/m3db/consolidators/step_accumulator.go index 5a56738bfd..eb6e052ce8 100644 --- a/src/query/ts/m3db/consolidators/step_accumulator.go +++ b/src/query/ts/m3db/consolidators/step_accumulator.go @@ -36,7 +36,6 @@ type StepLookbackAccumulator struct { stepSize time.Duration earliestLookback time.Time datapoints []xts.Datapoint - datapointsBuffer []xts.Datapoint buffer [][]xts.Datapoint unconsumed [][]xts.Datapoint } @@ -57,7 +56,6 @@ func NewStepLookbackAccumulator( stepSize: stepSize, earliestLookback: startTime.Add(-1 * lookbackDuration), datapoints: datapoints, - datapointsBuffer: datapoints, buffer: buffer, unconsumed: buffer[:0], } @@ -89,7 +87,7 @@ func (c *StepLookbackAccumulator) BufferStep() { accumulated := make([]xts.Datapoint, len(c.datapoints)) copy(accumulated, c.datapoints) - c.datapoints = c.datapointsBuffer[:0] + c.datapoints = c.datapoints[:0] c.unconsumed = append(c.unconsumed, accumulated) } diff --git a/src/query/ts/m3db/consolidators/step_consolidator.go b/src/query/ts/m3db/consolidators/step_consolidator.go index 0c910a8c53..6d53c3dabb 100644 --- a/src/query/ts/m3db/consolidators/step_consolidator.go +++ b/src/query/ts/m3db/consolidators/step_consolidator.go @@ -48,7 +48,6 @@ type StepLookbackConsolidator struct { stepSize time.Duration earliestLookback time.Time datapoints []ts.Datapoint - datapointsBuffer []ts.Datapoint buffer []float64 unconsumed []float64 fn ConsolidationFunc @@ -71,7 +70,6 @@ func NewStepLookbackConsolidator( stepSize: stepSize, earliestLookback: startTime.Add(-1 * lookbackDuration), datapoints: datapoints, - datapointsBuffer: datapoints, buffer: buffer, unconsumed: buffer[:0], fn: fn, @@ -99,11 +97,11 @@ func (c *StepLookbackConsolidator) 
BufferStep() { if len(datapointsRelevant) > 0 { // Move them back to the start of the slice to reuse the slice // as best as possible. - c.datapoints = c.datapointsBuffer[:len(datapointsRelevant)] + c.datapoints = c.datapoints[:len(datapointsRelevant)] copy(c.datapoints, datapointsRelevant) } else { // No relevant datapoints, repoint to the start of the buffer. - c.datapoints = c.datapointsBuffer[:0] + c.datapoints = c.datapoints[:0] } // Blindly append to unconsumed.
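
A note on the core fix that patches 1, 4, and 8 converge on. Consuming a queue with c.unconsumed = c.unconsumed[1:] walks the slice header away from the start of its fixed backing buffer, so the append in BufferStep eventually runs out of remaining capacity and allocates a fresh array. Copying the survivors back to index zero keeps the slice anchored at the buffer start, so a fixed-size buffer can be reused indefinitely. Below is a minimal standalone sketch of that pattern, not code from this series; popFront and the capacity of 4 are invented for illustration.

package main

import "fmt"

// popFront consumes the first element of a fixed-capacity FIFO slice.
// Reslicing with q = q[1:] would advance the slice past the start of
// its backing array, so a later append would eventually exceed the
// remaining capacity and allocate. Shifting the survivors left keeps
// the slice anchored at the buffer's start with its full capacity.
func popFront(q []float64) (float64, []float64) {
	val := q[0]
	n := copy(q, q[1:]) // shifts the len(q)-1 survivors left
	return val, q[:n]
}

func main() {
	q := make([]float64, 0, 4) // fixed buffer, analogous to BufferSteps
	for step := 0; step < 10; step++ {
		q = append(q, float64(step)) // never outgrows the original array
		var v float64
		v, q = popFront(q)
		fmt.Println(v, cap(q)) // capacity stays 4 on every step
	}
}

This is also why patch 8 can drop the separate datapointsBuffer field: once the slice is always shifted back to index zero, c.datapoints[:n] and a saved reference to the original buffer point at the same place.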
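
The seriesBatch benchmark case added in patch 1 drains each batch returned by MultiSeriesIter on its own goroutine and joins them with a sync.WaitGroup, rebinding batch.Iter to a local variable before launching the goroutine so each closure sees its own iterator. A self-contained sketch of that fan-out shape, with an invented intIter standing in for the real block iterators:

package main

import (
	"fmt"
	"sync"
)

// intIter is a toy iterator standing in for the block series iterators.
type intIter struct {
	vals []int
	i    int
}

func (it *intIter) Next() bool {
	it.i++
	return it.i <= len(it.vals)
}

// drainAll walks every batch concurrently, one goroutine per batch,
// and returns once all of them are exhausted.
func drainAll(batches []*intIter) {
	var wg sync.WaitGroup
	for _, batch := range batches {
		it := batch // rebind per iteration so each goroutine gets its own
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it.Next() {
			}
		}()
	}
	wg.Wait()
}

func main() {
	batches := []*intIter{{vals: []int{1, 2}}, {vals: []int{3, 4, 5}}}
	drainAll(batches)
	fmt.Println("drained", len(batches), "batches")
}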
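
Patch 4 also moves both pool constructors to a zero-value-friendly options style: BuildIteratorPools(BuildIteratorPoolsOptions{}) yields defaults, with every field read through an ...OrDefault accessor, while TagDecoderOptionsConfig uses pointer fields so that an explicit zero (as in the "preallocated pool, use a zero sized pool" case above) is distinguishable from unset. A compact sketch of the two conventions side by side; PoolOptions, Workers, PoolSize, and their default values are invented for illustration:

package main

import "fmt"

const (
	defaultWorkers  = 8
	defaultPoolSize = 1024
)

// PoolOptions mixes the two styles used in this series.
type PoolOptions struct {
	// Workers treats <= 0 as unset, like BuildIteratorPoolsOptions.Replicas.
	Workers int
	// PoolSize is a pointer, like TagDecoderOptionsConfig's fields, so an
	// explicit zero ("no pooling") is distinguishable from unset (nil).
	PoolSize *int
}

func (o PoolOptions) WorkersOrDefault() int {
	if o.Workers <= 0 {
		return defaultWorkers
	}
	return o.Workers
}

func (o PoolOptions) PoolSizeOrDefault() int {
	if o.PoolSize == nil {
		return defaultPoolSize
	}
	return *o.PoolSize
}

func main() {
	fmt.Println(PoolOptions{}.WorkersOrDefault()) // 8: zero struct means defaults
	zero := 0
	fmt.Println(PoolOptions{PoolSize: &zero}.PoolSizeOrDefault()) // 0, not 1024
}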
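
Finally, the new query_pools_test.go keeps the pool defaults honest by measuring the bytes allocated during construction: it diffs runtime.MemStats.TotalAlloc, which only ever grows, around the call under test and asserts the delta stays under a budget. A generic sketch of that technique using only the standard library; the 64 MiB budget mirrors the test in patch 4, and the make([]byte, ...) workload is a stand-in for the real BuildIteratorPools call:

package pools_test

import (
	"runtime"
	"testing"
)

// sink keeps the test allocation reachable so the compiler
// cannot optimize it away.
var sink []byte

// allocatedDuring reports how many bytes fn allocated. TotalAlloc is
// monotonic (it does not shrink when objects are freed), so the delta
// counts everything fn allocated even if the GC ran in between.
func allocatedDuring(fn func()) uint64 {
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)
	fn()
	runtime.ReadMemStats(&after)
	return after.TotalAlloc - before.TotalAlloc
}

func TestConstructionStaysUnderBudget(t *testing.T) {
	const budget = 64 << 20 // 64 MiB, the same bound used above
	allocated := allocatedDuring(func() {
		sink = make([]byte, 1<<20) // stand-in for the constructor under test
	})
	t.Logf("allocated %d bytes", allocated)
	if allocated >= budget {
		t.Fatalf("allocated %d bytes; budget is %d", allocated, budget)
	}
}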