[dbnode] Aggregate() using only FSTs where possible
prateek committed Apr 11, 2019
1 parent 0a395df commit 501c20d
Showing 17 changed files with 1,239 additions and 59 deletions.
17 changes: 16 additions & 1 deletion src/dbnode/generated-source-files.mk
@@ -172,7 +172,9 @@ genny-map-storage-index-aggregation-results: genny-map-storage-index-aggregate-v

# generation rule for all generated arraypools
.PHONY: genny-arraypool-all
genny-arraypool-all: genny-arraypool-node-segments
genny-arraypool-all: \
genny-arraypool-node-segments \
genny-arraypool-aggregate-results-entry \

# arraypool generation rule for ./network/server/tchannelthrift/node/segmentsArrayPool
.PHONY: genny-arraypool-node-segments
@@ -186,6 +188,19 @@ genny-arraypool-node-segments:
rename_type_middle=Segments \
rename_constructor=newSegmentsArrayPool

# arraypool generation rule for ./storage/index/AggregateResultsEntryArrayPool
.PHONY: genny-arraypool-aggregate-results-entry
genny-arraypool-aggregate-results-entry:
cd $(m3x_package_path) && make genny-arraypool \
pkg=index \
elem_type=AggregateResultsEntry \
target_package=$(m3db_package)/src/dbnode/storage/index \
out_file=aggregate_results_entry_arraypool_gen.go \
rename_type_prefix=AggregateResultsEntry \
rename_type_middle=AggregateResultsEntry \
rename_constructor=NewAggregateResultsEntryArrayPool \
rename_gen_types=true \

# generation rule for all generated leakcheckpools
.PHONY: genny-leakcheckpool-all
genny-leakcheckpool-all: \
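For context, here is a short sketch of how the pool generated by the new rule above would typically be used. This is not part of the commit: the `AggregateResultsEntryArrayPool` type name and its Init/Get/Put API are assumed from the standard m3x genny arraypool template, and the helper function, field, and term values are purely illustrative. The entry shape (Field plus Terms) is inferred from AddFields in aggregate_results.go further down.

```go
package index

import "github.com/m3db/m3/src/x/ident"

// exampleEntryPoolUsage is a hypothetical sketch (not part of this commit)
// showing the expected lifecycle of the generated arraypool.
func exampleEntryPoolUsage(pool AggregateResultsEntryArrayPool) {
	pool.Init()

	entries := pool.Get() // pooled []AggregateResultsEntry, length 0
	entries = append(entries, AggregateResultsEntry{
		Field: ident.StringID("city"),
		Terms: []ident.ID{ident.StringID("nyc"), ident.StringID("sfo")},
	})

	// ... hand entries to AggregateResults.AddFields, which assumes ownership
	// of the ident.IDs (see aggregate_results.go below) ...

	pool.Put(entries) // return the backing slice to the pool for reuse
}
```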
147 changes: 108 additions & 39 deletions src/dbnode/storage/index.go
@@ -41,16 +41,17 @@ import (
"github.com/m3db/m3/src/dbnode/storage/index/convert"
"github.com/m3db/m3/src/dbnode/storage/namespace"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/idx"
m3ninxindex "github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/index/segment/builder"
"github.com/m3db/m3/src/x/resource"
xclose "github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xlog "github.com/m3db/m3/src/x/log"
"github.com/m3db/m3/src/x/resource"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

@@ -167,6 +168,23 @@ type newNamespaceIndexOpts struct {
newBlockFn newBlockFn
}

// execBlockQueryFn executes a query against the given block whilst tracking state.
type execBlockQueryFn func(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
)

// asyncQueryExecState tracks the async execution errors and results for a query.
type asyncQueryExecState struct {
sync.Mutex
multiErr xerrors.MultiError
exhaustive bool
}
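
The tail of query() that consumes this shared state is collapsed later in this diff, so the following is a simplified sketch of how the fan-out resolves once every per-block function has reported. waitAndFinalize is a hypothetical helper, not code from the commit, and it assumes the usual m3x xerrors.MultiError API (FinalError).

```go
// waitAndFinalize is a hypothetical helper sketching how query() turns the
// shared asyncQueryExecState into its return values after the per-block
// goroutines finish.
func waitAndFinalize(wg *sync.WaitGroup, state *asyncQueryExecState) (bool, error) {
	// Block until every execBlockQueryFn invocation has reported into state.
	wg.Wait()

	state.Lock()
	defer state.Unlock()

	// Any per-block error fails the whole query.
	if err := state.multiErr.FinalError(); err != nil {
		return false, err
	}
	// exhaustive stays true only if no block stopped early due to limits.
	return state.exhaustive, nil
}
```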

// newNamespaceIndex returns a new namespaceIndex for the provided namespace.
func newNamespaceIndex(
nsMD namespace.Metadata,
@@ -265,6 +283,7 @@ func newNamespaceIndexWithOptions(
queryWorkersPool: newIndexOpts.opts.QueryIDsWorkerPool(),
metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts),
}

if runtimeOptsMgr != nil {
idx.runtimeOptsListener = runtimeOptsMgr.RegisterListener(idx)
}
@@ -868,7 +887,7 @@ func (i *nsIndex) Query(
results.Reset(i.nsMetadata.ID(), index.QueryResultsOptions{
SizeLimit: opts.Limit,
})
exhaustive, err := i.query(ctx, query, results, opts)
exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn)
if err != nil {
return index.QueryResult{}, err
}
@@ -890,7 +909,12 @@ func (i *nsIndex) AggregateQuery(
TermFilter: opts.TermFilter,
Type: opts.Type,
})
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions)
// use appropriate fn to query underlying blocks.
fn := i.execBlockQueryFn
if query.Equal(idx.NewAllQuery()) {
fn = i.execBlockAggregateQueryFn
}
exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn)
if err != nil {
return index.AggregateQueryResult{}, err
}
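
This is the heart of the change named in the commit title: a match-all query carries no document filter, so each block can serve the aggregation straight from its FST term dictionaries via Block.Aggregate instead of executing a document query and folding the results. The helper below is hypothetical and merely restates the routing decision above for emphasis; it is not part of the commit.

```go
// blockQueryFnForAggregate is a hypothetical restatement (not in this commit)
// of the function-selection logic used by AggregateQuery above.
func (i *nsIndex) blockQueryFnForAggregate(query index.Query) execBlockQueryFn {
	if query.Equal(idx.NewAllQuery()) {
		return i.execBlockAggregateQueryFn // FST-only fast path
	}
	return i.execBlockQueryFn // general path: run the query, then aggregate
}
```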
@@ -905,6 +929,7 @@ func (i *nsIndex) query(
query index.Query,
results index.BaseResults,
opts index.QueryOptions,
execBlockFn execBlockQueryFn,
) (bool, error) {
// Capture start before needing to acquire lock.
start := i.nowFn()
@@ -940,50 +965,19 @@ func (i *nsIndex) query(
}

var (
deadline = start.Add(timeout)
wg sync.WaitGroup

// State contains concurrent mutable state for async execution below.
state = struct {
sync.Mutex
multiErr xerrors.MultiError
exhaustive bool
}{
state = asyncQueryExecState{
exhaustive: true,
}
deadline = start.Add(timeout)
wg sync.WaitGroup
)

// Create a cancellable lifetime and cancel it at end of this method so that
// no child async task modifies the result after this method returns.
cancellable := resource.NewCancellableLifetime()
defer cancellable.Cancel()

execBlockQuery := func(block index.Block) {
blockExhaustive, err := block.Query(cancellable, query, opts, results)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()

if err != nil {
state.multiErr = state.multiErr.Add(err)
return
}

if blockExhaustive {
return
}

// If block had more data but we stopped early, need to notify caller.
state.exhaustive = false
}

for _, block := range blocks {
// Capture block for async query execution below.
block := block
@@ -1009,7 +1003,7 @@ func (i *nsIndex) query(
// No timeout, just wait blockingly for a worker.
wg.Add(1)
i.queryWorkersPool.Go(func() {
execBlockQuery(block)
execBlockFn(cancellable, block, query, opts, &state, results)
wg.Done()
})
continue
@@ -1020,7 +1014,7 @@ func (i *nsIndex) query(
if timeLeft := deadline.Sub(i.nowFn()); timeLeft > 0 {
wg.Add(1)
timedOut := !i.queryWorkersPool.GoWithTimeout(func() {
execBlockQuery(block)
execBlockFn(cancellable, block, query, opts, &state, results)
wg.Done()
}, timeLeft)

@@ -1085,6 +1079,81 @@ func (i *nsIndex) query(
return exhaustive, nil
}

func (i *nsIndex) execBlockQueryFn(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
) {
blockExhaustive, err := block.Query(cancellable, query, opts, results)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()

if err != nil {
state.multiErr = state.multiErr.Add(err)
return
}

if blockExhaustive {
return
}

// If block had more data but we stopped early, need to notify caller.
state.exhaustive = false
}

func (i *nsIndex) execBlockAggregateQueryFn(
cancellable *resource.CancellableLifetime,
block index.Block,
query index.Query,
opts index.QueryOptions,
state *asyncQueryExecState,
results index.BaseResults,
) {
aggResults, ok := results.(index.AggregateResults)
if !ok { // should never happen
state.Lock()
state.multiErr = state.multiErr.Add(
fmt.Errorf("unknown results type [%T] received during aggregation", results))
state.Unlock()
return
}

blockExhaustive, err := block.Aggregate(cancellable, opts, aggResults)
if err == index.ErrUnableToQueryBlockClosed {
// NB(r): Because we query this block outside of the results lock, it's
// possible this block may get closed if it slides out of retention, in
// that case those results are no longer considered valid and outside of
// retention regardless, so this is a non-issue.
err = nil
}

state.Lock()
defer state.Unlock()

if err != nil {
state.multiErr = state.multiErr.Add(err)
return
}

if blockExhaustive {
return
}

// If block had more data but we stopped early, need to notify caller.
state.exhaustive = false
}

func (i *nsIndex) timeoutForQueryWithRLock(
ctx context.Context,
) time.Duration {
44 changes: 44 additions & 0 deletions src/dbnode/storage/index/aggregate_results.go
@@ -104,6 +104,50 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, error) {
return size, err
}

func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions {
return r.aggregateOpts
}

func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, error) {
r.Lock()
for _, entry := range batch {
f := entry.Field
aggValues, ok := r.resultsMap.Get(f)
if !ok {
aggValues = r.valuesPool.Get()
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
r.resultsMap.set(f, aggValues, _AggregateResultsMapKeyOptions{
copyKey: false,
finalizeKey: true,
})
} else {
// because we already have an entry for this field, we release the ident back to
// the underlying pool.
f.Finalize()
}
valuesMap := aggValues.Map()
for _, t := range entry.Terms {
_, ok := valuesMap.Get(t)
if !ok {
// we can avoid the copy because we assume ownership of the passed ident.ID,
// but still need to finalize it.
valuesMap.set(t, struct{}{}, _AggregateValuesMapKeyOptions{
copyKey: false,
finalizeKey: true,
})
} else {
// because we already have an entry for this term, we release the ident back to
// the underlying pool.
t.Finalize()
}
}
}
size := r.resultsMap.Len()
r.Unlock()
return size, nil
}
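
A hypothetical caller sketch for AddFields, assuming the method is exposed on the AggregateResults interface: the batch's ident.IDs are owned by the results after the call (new keys are stored without copying and finalized later, duplicates are finalized immediately), so the caller must not reuse or finalize them. The field and term values below are illustrative.

```go
func exampleAddFields(results AggregateResults) (int, error) {
	batch := []AggregateResultsEntry{
		{
			Field: ident.StringID("city"),
			Terms: []ident.ID{ident.StringID("nyc"), ident.StringID("sfo")},
		},
	}
	// Ownership of every ident.ID in batch transfers to results here.
	// The returned size is the total number of distinct fields seen so far.
	return results.AddFields(batch)
}
```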

func (r *aggregatedResults) addDocumentsBatchWithLock(
batch []doc.Document,
) error {