Skip to content

Commit

Permalink
Rename document pool -> metadata pool; create new document pool
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Jan 5, 2021
1 parent dbbee60 commit 8318fe5
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 70 deletions.
4 changes: 2 additions & 2 deletions src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ func (bsc BootstrapConfiguration) New(
adminClient client.AdminClient,
) (bootstrap.ProcessProvider, error) {
idxOpts := opts.IndexOptions()
compactor, err := compaction.NewCompactor(idxOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(idxOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
idxOpts.SegmentBuilderOptions(),
idxOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ func newCompactor(
}

func newCompactorWithErr(opts index.Options) (*compaction.Compactor, error) {
return compaction.NewCompactor(opts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
return compaction.NewCompactor(opts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
opts.SegmentBuilderOptions(),
opts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
12 changes: 6 additions & 6 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,12 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
seriesCachePolicy = ropts.SeriesCachePolicy()
timesWithErrors []time.Time
nsCtx = namespace.NewContextFrom(ns)
docsPool = s.opts.IndexOptions().DocumentArrayPool()
batch = docsPool.Get()
metadataPool = s.opts.IndexOptions().MetadataArrayPool()
batch = metadataPool.Get()
totalEntries int
totalFulfilledRanges = result.NewShardTimeRanges()
)
defer docsPool.Put(batch)
defer metadataPool.Put(batch)

requestedRanges := timeWindowReaders.Ranges
remainingRanges := requestedRanges.Copy()
Expand Down Expand Up @@ -740,7 +740,7 @@ func (s *fileSystemSource) readNextEntryAndMaybeIndex(

batch = append(batch, d)

if len(batch) >= index.DocumentArrayPoolCapacity {
if len(batch) >= index.MetadataArrayPoolCapacity {
return builder.FlushBatch(batch)
}

Expand Down Expand Up @@ -857,8 +857,8 @@ func (s *fileSystemSource) read(
builder := result.NewIndexBuilder(segBuilder)

indexOpts := s.opts.IndexOptions()
compactor, err := compaction.NewCompactor(indexOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(indexOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
indexOpts.SegmentBuilderOptions(),
indexOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ var (

func newTestOptions(t require.TestingT, filePathPrefix string) Options {
idxOpts := index.NewOptions()
compactor, err := compaction.NewCompactor(idxOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(idxOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
idxOpts.SegmentBuilderOptions(),
idxOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func TestNewPeersBootstrapper(t *testing.T) {
defer ctrl.Finish()

idxOpts := index.NewOptions()
compactor, err := compaction.NewCompactor(idxOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(idxOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
idxOpts.SegmentBuilderOptions(),
idxOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
12 changes: 6 additions & 6 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,8 @@ func (s *peersSource) readIndex(
builder := result.NewIndexBuilder(segBuilder)

indexOpts := s.opts.IndexOptions()
compactor, err := compaction.NewCompactor(indexOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(indexOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
indexOpts.SegmentBuilderOptions(),
indexOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down Expand Up @@ -831,13 +831,13 @@ func (s *peersSource) processReaders(
resultLock *sync.Mutex,
) (result.ShardTimeRanges, []time.Time) {
var (
docsPool = s.opts.IndexOptions().DocumentArrayPool()
batch = docsPool.Get()
metadataPool = s.opts.IndexOptions().MetadataArrayPool()
batch = metadataPool.Get()
timesWithErrors []time.Time
totalEntries int
)
defer func() {
docsPool.Put(batch)
metadataPool.Put(batch)
// Return readers to pool.
for _, shardReaders := range timeWindowReaders.Readers {
for _, r := range shardReaders.Readers {
Expand Down Expand Up @@ -1031,7 +1031,7 @@ func (s *peersSource) readNextEntryAndMaybeIndex(

batch = append(batch, d)

if len(batch) >= index.DocumentArrayPoolCapacity {
if len(batch) >= index.MetadataArrayPoolCapacity {
return builder.FlushBatch(batch)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ type namespaceOption func(namespace.Options) namespace.Options

func newTestDefaultOpts(t *testing.T, ctrl *gomock.Controller) Options {
idxOpts := index.NewOptions()
compactor, err := compaction.NewCompactor(idxOpts.DocumentArrayPool(),
index.DocumentArrayPoolCapacity,
compactor, err := compaction.NewCompactor(idxOpts.MetadataArrayPool(),
index.MetadataArrayPoolCapacity,
idxOpts.SegmentBuilderOptions(),
idxOpts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
12 changes: 6 additions & 6 deletions src/dbnode/storage/index/compaction/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Compactor struct {

opts CompactorOptions
writer fst.Writer
docsPool doc.DocumentArrayPool
metadataPool doc.MetadataArrayPool
docsMaxBatch int
fstOpts fst.Options
builder segment.SegmentsBuilder
Expand All @@ -71,7 +71,7 @@ type CompactorOptions struct {
// NewCompactor returns a new compactor which reuses buffers
// to avoid allocating intermediate buffers when compacting.
func NewCompactor(
docsPool doc.DocumentArrayPool,
metadataPool doc.MetadataArrayPool,
docsMaxBatch int,
builderOpts builder.Options,
fstOpts fst.Options,
Expand All @@ -88,7 +88,7 @@ func NewCompactor(
return &Compactor{
opts: opts,
writer: writer,
docsPool: docsPool,
metadataPool: metadataPool,
docsMaxBatch: docsMaxBatch,
builder: builder.NewBuilderFromSegments(builderOpts),
fstOpts: fstOpts,
Expand Down Expand Up @@ -147,9 +147,9 @@ func (c *Compactor) CompactUsingBuilder(
}

// Need to combine segments first
batch := c.docsPool.Get()
batch := c.metadataPool.Get()
defer func() {
c.docsPool.Put(batch)
c.metadataPool.Put(batch)
}()

// flushBatch is declared to reuse the same code from the
Expand Down Expand Up @@ -374,7 +374,7 @@ func (c *Compactor) Close() error {
c.closed = true

c.writer = nil
c.docsPool = nil
c.metadataPool = nil
c.fstOpts = nil
c.builder = nil
c.buff = nil
Expand Down
16 changes: 8 additions & 8 deletions src/dbnode/storage/index/compaction/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ var (
},
}

testDocsMaxBatch = 8
testDocsPool = doc.NewDocumentArrayPool(doc.DocumentArrayPoolOpts{
testMetadataMaxBatch = 8
testMetadataPool = doc.NewMetadataArrayPool(doc.MetadataArrayPoolOpts{
Options: pool.NewObjectPoolOptions().SetSize(1),
Capacity: testDocsMaxBatch,
Capacity: testMetadataMaxBatch,
})
)

func init() {
testDocsPool.Init()
testMetadataPool.Init()
}

func TestCompactorSingleMutableSegment(t *testing.T) {
Expand All @@ -90,7 +90,7 @@ func TestCompactorSingleMutableSegment(t *testing.T) {
_, err = seg.Insert(testDocuments[1])
require.NoError(t, err)

compactor, err := NewCompactor(testDocsPool, testDocsMaxBatch,
compactor, err := NewCompactor(testMetadataPool, testMetadataMaxBatch,
testBuilderSegmentOptions, testFSTSegmentOptions, CompactorOptions{})
require.NoError(t, err)

Expand All @@ -114,7 +114,7 @@ func TestCompactorSingleMutableSegmentWithMmapDocsData(t *testing.T) {
_, err = seg.Insert(testDocuments[1])
require.NoError(t, err)

compactor, err := NewCompactor(testDocsPool, testDocsMaxBatch,
compactor, err := NewCompactor(testMetadataPool, testMetadataMaxBatch,
testBuilderSegmentOptions, testFSTSegmentOptions, CompactorOptions{
MmapDocsData: true,
})
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestCompactorManySegments(t *testing.T) {
_, err = seg2.Insert(testDocuments[1])
require.NoError(t, err)

compactor, err := NewCompactor(testDocsPool, testDocsMaxBatch,
compactor, err := NewCompactor(testMetadataPool, testMetadataMaxBatch,
testBuilderSegmentOptions, testFSTSegmentOptions, CompactorOptions{})
require.NoError(t, err)

Expand Down Expand Up @@ -174,7 +174,7 @@ func TestCompactorCompactDuplicateIDsNoError(t *testing.T) {
_, err = seg2.Insert(testDocuments[1])
require.NoError(t, err)

compactor, err := NewCompactor(testDocsPool, testDocsMaxBatch,
compactor, err := NewCompactor(testMetadataPool, testMetadataMaxBatch,
testBuilderSegmentOptions, testFSTSegmentOptions, CompactorOptions{})
require.NoError(t, err)

Expand Down
12 changes: 6 additions & 6 deletions src/dbnode/storage/index/mutable_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,8 +781,8 @@ func (m *mutableSegmentsCompact) allocLazyBuilderAndCompactorsWithLock(
opts Options,
) error {
var (
err error
docsPool = opts.DocumentArrayPool()
err error
metadataPool = opts.MetadataArrayPool()
)
if m.segmentBuilder == nil {
builderOpts := opts.SegmentBuilderOptions().
Expand All @@ -795,8 +795,8 @@ func (m *mutableSegmentsCompact) allocLazyBuilderAndCompactorsWithLock(
}

if m.foregroundCompactor == nil {
m.foregroundCompactor, err = compaction.NewCompactor(docsPool,
DocumentArrayPoolCapacity,
m.foregroundCompactor, err = compaction.NewCompactor(metadataPool,
MetadataArrayPoolCapacity,
opts.SegmentBuilderOptions(),
opts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand All @@ -814,8 +814,8 @@ func (m *mutableSegmentsCompact) allocLazyBuilderAndCompactorsWithLock(
}

if m.backgroundCompactor == nil {
m.backgroundCompactor, err = compaction.NewCompactor(docsPool,
DocumentArrayPoolCapacity,
m.backgroundCompactor, err = compaction.NewCompactor(metadataPool,
MetadataArrayPoolCapacity,
opts.SegmentBuilderOptions(),
opts.FSTSegmentOptions(),
compaction.CompactorOptions{
Expand Down
44 changes: 38 additions & 6 deletions src/dbnode/storage/index/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,26 @@ const (
// defaultIndexInsertMode sets the default indexing mode to synchronous.
defaultIndexInsertMode = InsertSync

// documentArrayPool size in general: 256*256*sizeof(doc.Metadata)
// = 256 * 256 * 16
// = 1mb (but with Go's heap probably 2mb)
// metadataArrayPool size in general: 256*256*sizeof(doc.Metadata)
// = 256 * 256 * 48
// =~ 3mb
// TODO(r): Make this configurable in a followup change.
metadataArrayPoolSize = 256
// MetadataArrayPoolCapacity is the capacity of the metadata array pool.
MetadataArrayPoolCapacity = 256
metadataArrayPoolMaxCapacity = 256 // Do not allow grows, since we know the size

// documentArrayPool size in general: 256*256*sizeof(doc.Document)
// = 256 * 256 * 80
// =~ 5mb
documentArrayPoolSize = 256
// DocumentArrayPoolCapacity is the capacity of the document array pool.
// DocumentArrayPoolCapacity is the capacity of the encoded document array pool.
DocumentArrayPoolCapacity = 256
documentArrayPoolMaxCapacity = 256 // Do not allow grows, since we know the size

// aggregateResultsEntryArrayPool size in general: 256*256*sizeof(doc.Field)
// = 256 * 256 * 16
// = 1mb (but with Go's heap probably 2mb)
// = 256 * 256 * 48
// =~ 3mb
// TODO(prateek): Make this configurable in a followup change.
aggregateResultsEntryArrayPoolSize = 256
aggregateResultsEntryArrayPoolCapacity = 256
Expand All @@ -65,6 +73,7 @@ var (
errOptionsAggResultsPoolUnspecified = errors.New("aggregate results pool is unset")
errOptionsAggValuesPoolUnspecified = errors.New("aggregate values pool is unset")
errOptionsDocPoolUnspecified = errors.New("docs array pool is unset")
errOptionsDocContainerPoolUnspecified = errors.New("doc container array pool is unset")
errOptionsAggResultsEntryPoolUnspecified = errors.New("aggregate results entry array pool is unset")
errIDGenerationDisabled = errors.New("id generation is disabled")
errPostingsListCacheUnspecified = errors.New("postings list cache is unset")
Expand Down Expand Up @@ -118,6 +127,7 @@ type opts struct {
aggResultsPool AggregateResultsPool
aggValuesPool AggregateValuesPool
docArrayPool doc.DocumentArrayPool
metadataArrayPool doc.MetadataArrayPool
aggResultsEntryArrayPool AggregateResultsEntryArrayPool
foregroundCompactionPlannerOpts compaction.PlannerOptions
backgroundCompactionPlannerOpts compaction.PlannerOptions
Expand Down Expand Up @@ -150,6 +160,14 @@ func NewOptions() Options {
})
docArrayPool.Init()

metadataArrayPool := doc.NewMetadataArrayPool(doc.MetadataArrayPoolOpts{
Options: pool.NewObjectPoolOptions().
SetSize(metadataArrayPoolSize),
Capacity: MetadataArrayPoolCapacity,
MaxCapacity: metadataArrayPoolMaxCapacity,
})
metadataArrayPool.Init()

aggResultsEntryArrayPool := NewAggregateResultsEntryArrayPool(AggregateResultsEntryArrayPoolOpts{
Options: pool.NewObjectPoolOptions().
SetSize(aggregateResultsEntryArrayPoolSize),
Expand All @@ -172,6 +190,7 @@ func NewOptions() Options {
aggResultsPool: aggResultsPool,
aggValuesPool: aggValuesPool,
docArrayPool: docArrayPool,
metadataArrayPool: metadataArrayPool,
aggResultsEntryArrayPool: aggResultsEntryArrayPool,
foregroundCompactionPlannerOpts: defaultForegroundCompactionOpts,
backgroundCompactionPlannerOpts: defaultBackgroundCompactionOpts,
Expand Down Expand Up @@ -206,6 +225,9 @@ func (o *opts) Validate() error {
if o.docArrayPool == nil {
return errOptionsDocPoolUnspecified
}
if o.metadataArrayPool == nil {
return errOptionsDocContainerPoolUnspecified
}
if o.aggResultsEntryArrayPool == nil {
return errOptionsAggResultsEntryPoolUnspecified
}
Expand Down Expand Up @@ -339,6 +361,16 @@ func (o *opts) DocumentArrayPool() doc.DocumentArrayPool {
return o.docArrayPool
}

func (o *opts) SetMetadataArrayPool(value doc.MetadataArrayPool) Options {
opts := *o
opts.metadataArrayPool = value
return &opts
}

func (o *opts) MetadataArrayPool() doc.MetadataArrayPool {
return o.metadataArrayPool
}

func (o *opts) SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options {
opts := *o
opts.aggResultsEntryArrayPool = value
Expand Down
6 changes: 6 additions & 0 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,12 @@ type Options interface {
// DocumentArrayPool returns the document array pool.
DocumentArrayPool() doc.DocumentArrayPool

// SetMetadataArrayPool sets the document container array pool.
SetMetadataArrayPool(value doc.MetadataArrayPool) Options

// MetadataArrayPool returns the document container array pool.
MetadataArrayPool() doc.MetadataArrayPool

// SetAggregateResultsEntryArrayPool sets the aggregate results entry array pool.
SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options

Expand Down
Loading

0 comments on commit 8318fe5

Please sign in to comment.