[dbnode] Aggregate() using only FSTs where possible #1545

Merged: 1 commit, merged on Apr 23, 2019
17 changes: 16 additions & 1 deletion src/dbnode/generated-source-files.mk
@@ -172,7 +172,9 @@ genny-map-storage-index-aggregation-results: genny-map-storage-index-aggregate-v

# generation rule for all generated arraypools
.PHONY: genny-arraypool-all
genny-arraypool-all: genny-arraypool-node-segments
genny-arraypool-all: \
genny-arraypool-node-segments \
genny-arraypool-aggregate-results-entry \

# arraypool generation rule for ./network/server/tchannelthrift/node/segmentsArrayPool
.PHONY: genny-arraypool-node-segments
@@ -186,6 +188,19 @@ genny-arraypool-node-segments:
rename_type_middle=Segments \
rename_constructor=newSegmentsArrayPool

# arraypool generation rule for ./storage/index/AggregateResultsEntryArrayPool
.PHONY: genny-arraypool-aggregate-results-entry
genny-arraypool-aggregate-results-entry:
cd $(m3x_package_path) && make genny-arraypool \
pkg=index \
elem_type=AggregateResultsEntry \
target_package=$(m3db_package)/src/dbnode/storage/index \
out_file=aggregate_results_entry_arraypool_gen.go \
rename_type_prefix=AggregateResultsEntry \
rename_type_middle=AggregateResultsEntry \
rename_constructor=NewAggregateResultsEntryArrayPool \
rename_gen_types=true \

# generation rule for all generated leakcheckpools
.PHONY: genny-leakcheckpool-all
genny-leakcheckpool-all: \
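The new genny-arraypool-aggregate-results-entry rule above drives genny to emit an array pool for AggregateResultsEntry slices, so the aggregation path can reuse entry slices across queries instead of allocating per call. Below is a minimal, illustrative sketch of the array-pool pattern such generated code provides; the type and method names are simplified stand-ins, not the generated m3x API.

// Illustrative sketch only: the real pool is generated by genny from the
// m3x arraypool template; these names and signatures are assumptions.
package main

import "fmt"

// AggregateResultsEntry stands in for index.AggregateResultsEntry.
type AggregateResultsEntry struct {
	Field string
	Terms []string
}

// entryArrayPool recycles []AggregateResultsEntry slices between queries.
type entryArrayPool struct {
	capacity int
	pool     chan []AggregateResultsEntry
}

func newEntryArrayPool(size, capacity int) *entryArrayPool {
	return &entryArrayPool{
		capacity: capacity,
		pool:     make(chan []AggregateResultsEntry, size),
	}
}

// Get returns a zero-length slice, reusing pooled backing storage if any.
func (p *entryArrayPool) Get() []AggregateResultsEntry {
	select {
	case s := <-p.pool:
		return s[:0]
	default:
		return make([]AggregateResultsEntry, 0, p.capacity)
	}
}

// Put hands a slice back to the pool; the caller must not use it afterwards.
func (p *entryArrayPool) Put(s []AggregateResultsEntry) {
	select {
	case p.pool <- s:
	default: // pool is full; let the GC reclaim the slice
	}
}

func main() {
	pool := newEntryArrayPool(4, 16)
	entries := pool.Get()
	entries = append(entries, AggregateResultsEntry{Field: "city", Terms: []string{"nyc"}})
	fmt.Println(len(entries), cap(entries)) // 1 16
	pool.Put(entries)
}

Get hands back a zero-length slice with retained capacity and Put recycles it; the generated pool presumably layers sizing and configuration options on top of the same idea.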
134 changes: 96 additions & 38 deletions src/dbnode/storage/index.go
@@ -41,6 +41,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/namespace"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/index/segment/builder"
@@ -72,6 +73,10 @@ const (
nsIndexReportStatsInterval = 10 * time.Second
)

var (
allQuery = idx.NewAllQuery()
)

// nolint: maligned
type nsIndex struct {
state nsIndexState
@@ -167,6 +172,23 @@ type newNamespaceIndexOpts struct {
newBlockFn newBlockFn
}

// execBlockQueryFn executes a query against the given block whilst tracking state.
type execBlockQueryFn func(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
)

// asyncQueryExecState tracks the async execution errors and results for a query.
type asyncQueryExecState struct {
sync.Mutex
multiErr xerrors.MultiError
exhaustive bool
}

// newNamespaceIndex returns a new namespaceIndex for the provided namespace.
func newNamespaceIndex(
nsMD namespace.Metadata,
@@ -265,6 +287,7 @@ func newNamespaceIndexWithOptions(
queryWorkersPool: newIndexOpts.opts.QueryIDsWorkerPool(),
metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts),
}

if runtimeOptsMgr != nil {
idx.runtimeOptsListener = runtimeOptsMgr.RegisterListener(idx)
}
@@ -869,7 +892,7 @@ func (i *nsIndex) Query(
SizeLimit: opts.Limit,
})
ctx.RegisterFinalizer(results)
exhaustive, err := i.query(ctx, query, results, opts)
exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn)
if err != nil {
return index.QueryResult{}, err
}
@@ -892,7 +915,12 @@ func (i *nsIndex) AggregateQuery(
Type: opts.Type,
})
ctx.RegisterFinalizer(results)
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions)
// use appropriate fn to query underlying blocks.
fn := i.execBlockQueryFn
if query.Equal(allQuery) {
fn = i.execBlockAggregateQueryFn
}
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn)
if err != nil {
return index.AggregateQueryResult{}, err
}
@@ -907,6 +935,7 @@ func (i *nsIndex) query(
query index.Query,
results index.BaseResults,
opts index.QueryOptions,
execBlockFn execBlockQueryFn,
) (bool, error) {
// Capture start before needing to acquire lock.
start := i.nowFn()
@@ -942,50 +971,19 @@ func (i *nsIndex) query(
}

var (
deadline = start.Add(timeout)
wg sync.WaitGroup

// State contains concurrent mutable state for async execution below.
state = struct {
sync.Mutex
multiErr xerrors.MultiError
exhaustive bool
}{
state = asyncQueryExecState{
exhaustive: true,
}
deadline = start.Add(timeout)
wg sync.WaitGroup
)

// Create a cancellable lifetime and cancel it at end of this method so that
// no child async task modifies the result after this method returns.
cancellable := resource.NewCancellableLifetime()
defer cancellable.Cancel()

execBlockQuery := func(block index.Block) {
blockExhaustive, err := block.Query(cancellable, query, opts, results)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()

if err != nil {
state.multiErr = state.multiErr.Add(err)
return
}

if blockExhaustive {
return
}

// If block had more data but we stopped early, need to notify caller.
state.exhaustive = false
}

for _, block := range blocks {
// Capture block for async query execution below.
block := block
@@ -1011,7 +1009,7 @@ func (i *nsIndex) query(
// No timeout, just wait blockingly for a worker.
wg.Add(1)
i.queryWorkersPool.Go(func() {
execBlockQuery(block)
execBlockFn(cancellable, block, query, opts, &state, results)
wg.Done()
})
continue
@@ -1022,7 +1020,7 @@ func (i *nsIndex) query(
if timeLeft := deadline.Sub(i.nowFn()); timeLeft > 0 {
wg.Add(1)
timedOut := !i.queryWorkersPool.GoWithTimeout(func() {
execBlockQuery(block)
execBlockFn(cancellable, block, query, opts, &state, results)
wg.Done()
}, timeLeft)

@@ -1087,6 +1085,66 @@ func (i *nsIndex) query(
return exhaustive, nil
}

func (i *nsIndex) execBlockQueryFn(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
) {
blockExhaustive, err := block.Query(cancellable, query, opts, results)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()

if err != nil {
state.multiErr = state.multiErr.Add(err)
}
state.exhaustive = state.exhaustive && blockExhaustive
}

func (i *nsIndex) execBlockAggregateQueryFn(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
) {
aggResults, ok := results.(index.AggregateResults)
if !ok { // should never happen
state.Lock()
state.multiErr = state.multiErr.Add(
fmt.Errorf("unknown results type [%T] received during aggregation", results))
state.Unlock()
return
}

blockExhaustive, err := block.Aggregate(cancellable, opts, aggResults)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()
if err != nil {
state.multiErr = state.multiErr.Add(err)
}
state.exhaustive = state.exhaustive && blockExhaustive
}

func (i *nsIndex) timeoutForQueryWithRLock(
ctx context.Context,
) time.Duration {
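The substance of the index.go changes above: AggregateQuery now checks whether the incoming query equals the match-all query and, if so, runs each block through execBlockAggregateQueryFn, which calls block.Aggregate() to read fields and terms straight from the block's FST segments rather than executing a full postings query; both paths share the same worker-pool and timeout machinery via the execBlockQueryFn indirection. A condensed, self-contained sketch of that dispatch follows; the block interface and query type are simplified stand-ins for m3's index.Block and index.Query, and they omit the cancellable lifetime, locking, and error accumulation shown in the diff.

// Condensed sketch of the query/aggregate dispatch; all types here are
// simplified stand-ins, not the real m3 index package.
package main

import "fmt"

// block is a stand-in for index.Block: it can run a filtered query or
// aggregate its fields/terms straight from FST segments.
type block interface {
	Query(q query) (exhaustive bool, err error)
	Aggregate() (exhaustive bool, err error)
}

type query struct{ matchAll bool }

// execBlockFn mirrors the execBlockQueryFn indirection in the diff above.
type execBlockFn func(b block, q query) (bool, error)

func execBlockQuery(b block, q query) (bool, error)     { return b.Query(q) }
func execBlockAggregate(b block, q query) (bool, error) { return b.Aggregate() }

// aggregateQuery picks the cheaper FST-only path when the query matches all.
func aggregateQuery(blocks []block, q query) (bool, error) {
	fn := execBlockFn(execBlockQuery)
	if q.matchAll {
		fn = execBlockAggregate
	}
	exhaustive := true
	for _, b := range blocks {
		blockExhaustive, err := fn(b, q)
		if err != nil {
			return false, err
		}
		exhaustive = exhaustive && blockExhaustive
	}
	return exhaustive, nil
}

type fakeBlock struct{}

func (fakeBlock) Query(query) (bool, error) { return true, nil }
func (fakeBlock) Aggregate() (bool, error)  { return true, nil }

func main() {
	exhaustive, err := aggregateQuery([]block{fakeBlock{}}, query{matchAll: true})
	fmt.Println(exhaustive, err) // true <nil>
}

Keeping the per-block loop agnostic to which execution function runs is what lets the diff swap in the aggregate path without duplicating the timeout and worker-pool handling.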
43 changes: 43 additions & 0 deletions src/dbnode/storage/index/aggregate_results.go
@@ -104,6 +104,49 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, error) {
return size, err
}

func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions {
return r.aggregateOpts
}

func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) int {
r.Lock()
for _, entry := range batch {
f := entry.Field
aggValues, ok := r.resultsMap.Get(f)
if !ok {
aggValues = r.valuesPool.Get()
Review thread on r.valuesPool.Get():

Reviewer (Contributor):
How do these get reset? I don't see it after any Get() calls, nor in the generated code on Put. Are we just trusting that they've been properly reset here?

@prateek (Collaborator, Author), Apr 18, 2019:
The way it's supposed to work: we register a Finalizer on the incoming context to ensure the Finalize() method on the object is called, which in turn calls Reset(nil, ...), which does the actual releasing.

That said, the current code doesn't look to be registering a Finalizer on the context in either Query/Aggregate. Will put up another PR for this.

@prateek (Collaborator, Author):
#1567 as the follow-up.

Reviewer (Contributor):
Is it cheap to do a reset after pulling out of the pool? I generally prefer that pattern over trusting that every Put does a proper reset, but I trust you understand this lifecycle better than I do.

@prateek (Collaborator, Author):
Yeah, Reset is O(items in the map), so it should be free given the Put is doing the right thing. We don't need to do it though; we make sure to clean up before the Put.

I prefer this approach (assume the object returned from the pool is valid) because it puts the cleanup penalty on the last callsite to use the object, as opposed to the new callsite receiving the object. Seems like a "fairer" way to tax users.

(A minimal sketch of this finalize-on-context lifecycle appears after the end of this diff.)
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
r.resultsMap.SetUnsafe(f, aggValues, AggregateResultsMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
})
} else {
// because we already have an entry for this field, we release the ident back to
// the underlying pool.
f.Finalize()
}
valuesMap := aggValues.Map()
for _, t := range entry.Terms {
if !valuesMap.Contains(t) {
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
valuesMap.SetUnsafe(t, struct{}{}, AggregateValuesMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: false,
})
} else {
// because we already have an entry for this term, we release the ident back to
// the underlying pool.
t.Finalize()
}
}
}
size := r.resultsMap.Len()
r.Unlock()
return size
}

func (r *aggregatedResults) addDocumentsBatchWithLock(
batch []doc.Document,
) error {
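The review thread above describes the intended lifecycle for pooled results: a finalizer registered on the query's context calls Finalize() on the results object once the request completes, Finalize() calls Reset(...), and Reset releases the pooled idents and returns the object to its pool (with #1567 as the follow-up that actually registers the finalizer in Query/Aggregate). Below is a minimal sketch of that finalize-on-context pattern; the context, pool, and results types are simplified assumptions, not m3's resource or index APIs.

// Minimal sketch of the finalize-on-context lifecycle discussed above.
// context, pool, and pooledResults are simplified assumptions.
package main

import "fmt"

// finalizer is anything that can release resources when the request ends.
type finalizer interface{ Finalize() }

// context collects finalizers and runs them when the request completes.
type context struct{ finalizers []finalizer }

func (c *context) RegisterFinalizer(f finalizer) { c.finalizers = append(c.finalizers, f) }

func (c *context) Close() {
	for _, f := range c.finalizers {
		f.Finalize()
	}
}

// pool is a trivial stand-in for the results pool.
type pool struct{ free []*pooledResults }

func (p *pool) Get() *pooledResults {
	if n := len(p.free); n > 0 {
		r := p.free[n-1]
		p.free = p.free[:n-1]
		return r
	}
	return &pooledResults{values: map[string]struct{}{}}
}

func (p *pool) Put(r *pooledResults) { p.free = append(p.free, r) }

// pooledResults owns pooled values; Finalize resets them and returns itself.
type pooledResults struct {
	pool   *pool
	values map[string]struct{}
}

// Reset is O(items in the map): it clears entries so the next Get sees a
// clean object, matching the "clean up before the Put" convention above.
func (r *pooledResults) Reset() {
	for k := range r.values {
		delete(r.values, k)
	}
}

func (r *pooledResults) Finalize() {
	r.Reset()
	r.pool.Put(r)
}

func main() {
	p := &pool{}
	ctx := &context{}

	results := p.Get()
	results.pool = p
	ctx.RegisterFinalizer(results) // ensures release when the query's context closes
	results.values["field"] = struct{}{}

	ctx.Close()              // Finalize -> Reset -> Put back into the pool
	fmt.Println(len(p.free)) // 1
}

In this sketch the reset happens inside Finalize, before the Put, which matches the convention described in the thread: the last callsite to use the object pays for cleanup, and Get can assume the object it receives is already clean.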