Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix a bug with step iteration allocating #2185

Merged
merged 13 commits into from
Mar 7, 2020
3 changes: 2 additions & 1 deletion src/cmd/services/m3comparator/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (

func main() {
var (
iterPools = pools.BuildIteratorPools()
iterPools = pools.BuildIteratorPools(
pools.BuildIteratorPoolsOptions{})
poolWrapper = pools.NewPoolsWrapper(iterPools)

iOpts = instrument.NewOptions()
Expand Down
10 changes: 7 additions & 3 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
rulesStore := ruleskv.NewStore(rulesKVStore, rulesStoreOpts)

tagEncoderOptions := serialize.NewTagEncoderOptions()
tagDecoderOptions := serialize.NewTagDecoderOptions()
tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{})
tagEncoderPoolOptions := pool.NewObjectPoolOptions().
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
Expand Down Expand Up @@ -808,8 +808,12 @@ func newTestID(t *testing.T, tags map[string]string) id.ID {
data, ok := tagEncoder.Data()
require.True(t, ok)

tagDecoderPool := serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(),
pool.NewObjectPoolOptions().SetSize(1))
size := 1
tagDecoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{
CheckBytesWrapperPoolSize: &size,
}),
pool.NewObjectPoolOptions().SetSize(size))
tagDecoderPool.Init()

tagDecoder := tagDecoderPool.Get()
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/services/m3coordinator/ingest/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ package ingestm3msg

import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/retry"
"github.com/m3db/m3/src/x/sampler"
"github.com/m3db/m3/src/x/serialize"
xsync "github.com/m3db/m3/src/x/sync"
)

Expand Down Expand Up @@ -70,7 +70,7 @@ func (cfg Configuration) newOptions(

workers.Init()
tagDecoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(),
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
pool.NewObjectPoolOptions().
SetInstrumentOptions(instrumentOptions.
SetMetricsScope(instrumentOptions.MetricsScope().
Expand Down
8 changes: 5 additions & 3 deletions src/collector/api/v1/handler/json/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ import (

"github.com/m3db/m3/src/collector/reporter"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -191,10 +191,12 @@ func newTestReportHandler(ctrl *gomock.Controller) testReportHandler {
reporter := reporter.NewMockReporter(ctrl)
poolOpts := pool.NewObjectPoolOptions().SetSize(1)
tagEncoderPool := serialize.NewTagEncoderPool(
serialize.NewTagEncoderOptions(), poolOpts)
serialize.NewTagEncoderOptions(),
poolOpts)
tagEncoderPool.Init()
tagDecoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(), poolOpts)
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
poolOpts)
tagDecoderPool.Init()
instrumentOpts := instrument.NewOptions()

Expand Down
2 changes: 1 addition & 1 deletion src/collector/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func Run(runOpts RunOptions) {
}

tagEncoderOptions := serialize.NewTagEncoderOptions()
tagDecoderOptions := serialize.NewTagDecoderOptions()
tagDecoderOptions := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{})
tagEncoderPoolOptions := pool.NewObjectPoolOptions().
SetInstrumentOptions(instrumentOpts.
SetMetricsScope(instrumentOpts.MetricsScope().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ func initTestFetchTaggedPools() *testFetchTaggedPools {
pools.checkedBytesWrapper = xpool.NewCheckedBytesWrapperPool(opts)
pools.checkedBytesWrapper.Init()

pools.tagDecoder = serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(), opts)
pools.tagDecoder = serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
opts)
pools.tagDecoder.Init()

return pools
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func newOptions() *options {
tagEncoderPoolSize: defaultTagEncoderPoolSize,
tagEncoderOpts: serialize.NewTagEncoderOptions(),
tagDecoderPoolSize: defaultTagDecoderPoolSize,
tagDecoderOpts: serialize.NewTagDecoderOptions(),
tagDecoderOpts: serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
streamBlocksRetrier: defaultStreamBlocksRetrier,
writeOperationPoolSize: defaultWriteOpPoolSize,
writeTaggedOperationPoolSize: defaultWriteTaggedOpPoolSize,
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/client/session_fetch_bulk_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ var (
nsRetentionOpts = retention.NewOptions().
SetBlockSize(blockSize).
SetRetentionPeriod(48 * blockSize)
testTagDecodingPool = serialize.NewTagDecoderPool(serialize.NewTagDecoderOptions(),
testTagDecodingPool = serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
pool.NewObjectPoolOptions().SetSize(1))
testTagEncodingPool = serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(),
testTagEncodingPool = serialize.NewTagEncoderPool(
serialize.NewTagEncoderOptions(),
pool.NewObjectPoolOptions().SetSize(1))
testIDPool = newSessionTestOptions().IdentifierPool()
fooID = ident.StringID("foo")
Expand Down
6 changes: 4 additions & 2 deletions src/dbnode/network/server/tchannelthrift/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ func NewOptions() Options {
})

tagEncoderPool := serialize.NewTagEncoderPool(
serialize.NewTagEncoderOptions(), poolOptions)
serialize.NewTagEncoderOptions(),
poolOptions)
tagEncoderPool.Init()

tagDecoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(), poolOptions)
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
poolOptions)
tagDecoderPool.Init()

bytesWrapperPool := xpool.NewCheckedBytesWrapperPool(poolOptions)
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func NewOptions() Options {
serialize.NewTagEncoderOptions(), pool.NewObjectPoolOptions())
tagEncoderPool.Init()
tagDecoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(), pool.NewObjectPoolOptions())
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
pool.NewObjectPoolOptions())
tagDecoderPool.Init()
fstOptions := fst.NewOptions()

Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func init() {
})
testBytesPool.Init()
testTagDecoderPool = serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(), pool.NewObjectPoolOptions())
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
pool.NewObjectPoolOptions())
testTagDecoderPool.Init()
}

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func Run(runOpts RunOptions) {
scope.SubScope("tag-encoder-pool")))
tagEncoderPool.Init()
tagDecoderPool := serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(),
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
poolOptions(
policy.TagDecoderPool,
scope.SubScope("tag-decoder-pool")))
Expand Down
164 changes: 119 additions & 45 deletions src/query/pools/query_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ package pools
import (
"io"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/x/xpool"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/ident"
Expand All @@ -39,11 +39,32 @@ import (

const (
// TODO: add capabilities to get this from configs
replicas = 1
iteratorPoolSize = 65536
checkedBytesWrapperPoolSize = 65536
defaultIdentifierPoolSize = 8192
defaultBucketCapacity = 256
defaultReplicas = 3
defaultSeriesIteratorPoolSize = 2 << 12 // ~8k
defaultCheckedBytesWrapperPoolSize = 2 << 12 // ~8k
defaultBucketCapacity = 256
defaultPoolableConcurrentQueries = 64
defaultPoolableSeriesPerQuery = 4096
defaultSeriesReplicaReaderPoolSize = defaultPoolableConcurrentQueries * defaultPoolableSeriesPerQuery * defaultReplicas
)

var (
defaultSeriesIteratorsPoolBuckets = []pool.Bucket{
{
Capacity: defaultPoolableSeriesPerQuery,
Count: defaultPoolableConcurrentQueries,
},
}
defaultSeriesIDBytesPoolBuckets = []pool.Bucket{
{
Capacity: 256, // Can pool IDs up to 256 in size with this bucket.
Count: defaultPoolableSeriesPerQuery,
},
{
Capacity: 1024, // Can pool IDs up to 1024 in size with this bucket.
Count: defaultPoolableSeriesPerQuery,
},
}
)

// BuildWorkerPools builds a worker pool
Expand Down Expand Up @@ -116,75 +137,128 @@ func (s sessionPools) TagDecoder() serialize.TagDecoderPool {
return s.tagDecoder
}

func buildBuckets() []pool.Bucket {
return []pool.Bucket{
{Capacity: defaultBucketCapacity, Count: defaultIdentifierPoolSize},
// BuildIteratorPoolsOptions is a set of build iterator pools.
type BuildIteratorPoolsOptions struct {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe better to take in these values as a constructor rather than exposing the internal variables for mutation?

Replicas int
SeriesIteratorPoolSize int
SeriesIteratorsPoolBuckets []pool.Bucket
SeriesIDBytesPoolBuckets []pool.Bucket
CheckedBytesWrapperPoolSize int
}

// ReplicasOrDefault returns the replicas or default.
func (o BuildIteratorPoolsOptions) ReplicasOrDefault() int {
if o.Replicas <= 0 {
return defaultReplicas
}
return o.Replicas
}

// SeriesIteratorPoolSizeOrDefault returns the replicas or default.
func (o BuildIteratorPoolsOptions) SeriesIteratorPoolSizeOrDefault() int {
if o.SeriesIteratorPoolSize <= 0 {
return defaultSeriesIteratorPoolSize
}
return o.SeriesIteratorPoolSize
}

// CheckedBytesWrapperPoolSizeOrDefault returns the checked bytes
// wrapper pool size or default.
func (o BuildIteratorPoolsOptions) CheckedBytesWrapperPoolSizeOrDefault() int {
if o.CheckedBytesWrapperPoolSize <= 0 {
return defaultCheckedBytesWrapperPoolSize
}
return o.CheckedBytesWrapperPoolSize
}

// SeriesIteratorsPoolBucketsOrDefault returns the series iterator pool
// buckets or defaults.
func (o BuildIteratorPoolsOptions) SeriesIteratorsPoolBucketsOrDefault() []pool.Bucket {
if len(o.SeriesIteratorsPoolBuckets) == 0 {
return defaultSeriesIteratorsPoolBuckets
}
return o.SeriesIteratorsPoolBuckets
}

// SeriesIDBytesPoolBucketsOrDefault returns the bytes pool buckets or defaults.
func (o BuildIteratorPoolsOptions) SeriesIDBytesPoolBucketsOrDefault() []pool.Bucket {
if len(o.SeriesIDBytesPoolBuckets) == 0 {
return defaultSeriesIDBytesPoolBuckets
}
return o.SeriesIDBytesPoolBuckets
}

// BuildIteratorPools build iterator pools if they are unavailable from
// m3db (e.g. if running standalone query)
func BuildIteratorPools() encoding.IteratorPools {
func BuildIteratorPools(
opts BuildIteratorPoolsOptions,
) encoding.IteratorPools {
// TODO: add instrumentation options to these pools
pools := sessionPools{}
pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool([]pool.Bucket{
pool.Bucket{
Capacity: replicas,
Count: iteratorPoolSize,
},
})

defaultPerSeriesIteratorsBuckets := opts.SeriesIteratorsPoolBucketsOrDefault()

pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool(defaultPerSeriesIteratorsBuckets)
pools.multiReaderIteratorArray.Init()

size := replicas * iteratorPoolSize
poolOpts := pool.NewObjectPoolOptions().
SetSize(size)
pools.multiReaderIterator = encoding.NewMultiReaderIteratorPool(poolOpts)
encodingOpts := encoding.NewOptions()
readerIterAlloc := func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
intOptimized := m3tsz.DefaultIntOptimizationEnabled
return m3tsz.NewReaderIterator(r, intOptimized, encodingOpts)
}
defaultPerSeriesPoolOpts := pool.NewObjectPoolOptions().
SetSize(opts.SeriesIteratorPoolSizeOrDefault())

pools.multiReaderIterator.Init(readerIterAlloc)
readerIteratorPoolPoolOpts := pool.NewObjectPoolOptions().
SetSize(opts.SeriesIteratorPoolSizeOrDefault() * opts.ReplicasOrDefault())

seriesIteratorPoolOpts := pool.NewObjectPoolOptions().
SetSize(iteratorPoolSize)
pools.seriesIterator = encoding.NewSeriesIteratorPool(seriesIteratorPoolOpts)
readerIteratorPool := encoding.NewReaderIteratorPool(readerIteratorPoolPoolOpts)

encodingOpts := encoding.NewOptions().
SetReaderIteratorPool(readerIteratorPool)

readerIteratorPool.Init(func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encodingOpts)
})

pools.multiReaderIterator = encoding.NewMultiReaderIteratorPool(defaultPerSeriesPoolOpts)
pools.multiReaderIterator.Init(func(r io.Reader, s namespace.SchemaDescr) encoding.ReaderIterator {
iter := readerIteratorPool.Get()
iter.Reset(r, s)
return iter
})

pools.seriesIterator = encoding.NewSeriesIteratorPool(defaultPerSeriesPoolOpts)
pools.seriesIterator.Init()

pools.seriesIterators = encoding.NewMutableSeriesIteratorsPool(buildBuckets())
pools.seriesIterators = encoding.NewMutableSeriesIteratorsPool(defaultPerSeriesIteratorsBuckets)
pools.seriesIterators.Init()

wrapperPoolOpts := pool.NewObjectPoolOptions().
SetSize(checkedBytesWrapperPoolSize)
SetSize(opts.CheckedBytesWrapperPoolSizeOrDefault())
pools.checkedBytesWrapper = xpool.NewCheckedBytesWrapperPool(wrapperPoolOpts)
pools.checkedBytesWrapper.Init()

pools.tagEncoder = serialize.NewTagEncoderPool(
serialize.NewTagEncoderOptions(),
pool.NewObjectPoolOptions(),
)
defaultPerSeriesPoolOpts)
pools.tagEncoder.Init()

pools.tagDecoder = serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(),
pool.NewObjectPoolOptions(),
)
tagDecoderCheckBytesWrapperPoolSize := 0
tagDecoderOpts := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{
// We pass in a preallocated pool so use a zero sized pool in options init.
CheckBytesWrapperPoolSize: &tagDecoderCheckBytesWrapperPoolSize,
})
tagDecoderOpts = tagDecoderOpts.SetCheckedBytesWrapperPool(pools.checkedBytesWrapper)

pools.tagDecoder = serialize.NewTagDecoderPool(tagDecoderOpts, defaultPerSeriesPoolOpts)
pools.tagDecoder.Init()

bytesPool := pool.NewCheckedBytesPool(buildBuckets(), nil,
func(sizes []pool.Bucket) pool.BytesPool {
bytesPool := pool.NewCheckedBytesPool(opts.SeriesIDBytesPoolBucketsOrDefault(),
nil, func(sizes []pool.Bucket) pool.BytesPool {
return pool.NewBytesPool(sizes, nil)
})
bytesPool.Init()

idPoolOpts := pool.NewObjectPoolOptions().
SetSize(defaultIdentifierPoolSize)

pools.id = ident.NewPool(bytesPool, ident.PoolOptions{
IDPoolOptions: idPoolOpts,
TagsPoolOptions: idPoolOpts,
TagsIteratorPoolOptions: idPoolOpts,
IDPoolOptions: defaultPerSeriesPoolOpts,
TagsPoolOptions: defaultPerSeriesPoolOpts,
TagsIteratorPoolOptions: defaultPerSeriesPoolOpts,
})

return pools
Expand Down
Loading