Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the composite aggregation for match_all and range queries #28745

Merged
merged 24 commits into from
Mar 26, 2018

Conversation

jimczi
Copy link
Contributor

@jimczi jimczi commented Feb 20, 2018

This change refactors the composite aggregation to add an execution mode that visits documents in the order of the values present in the leading source of the composite definition. This mode does not need to visit all documents since it can early terminate the collection when the leading source value is greater than the lowest value in the queue.
Instead of collecting the documents in the order of their doc_id, this mode uses the inverted lists (or the bkd tree for numerics) to collect documents
in the order of the values present in the leading source.
For instance the following aggregation:

"composite" : {
  "sources" : [
    { "value1": { "terms" : { "field": "timestamp", "order": "asc" } } }
  ],
  "size": 10
}

... can use the field timestamp to collect the documents with the 10 lowest values for the field instead of visiting all documents.
For composite aggregation with more than one source the execution can early terminate as soon as one of the 10 lowest values produces enough composite buckets. For instance if visiting the first two lowest timestamp created 10 composite buckets we can early terminate the collection since it
is guaranteed that the third lowest timestamp cannot create a composite key that compares lower than the one already visited.

This mode can execute iff:

  • The leading source in the composite definition uses an indexed field of type date (works also with date_histogram source), integer, long or keyword.
  • The query is a match_all query or a range query over the field that is used as the leading source in the composite definition.
  • The sort order of the leading source is the natural order (ascending since postings and numerics are sorted in ascending order only).

If these conditions are not met this aggregation visits each document like any other agg.

Closes #28688

This change refactors the composite aggregation to add an execution mode that visits documents in the order of the values
present in the leading source of the composite definition. This mode does not need to visit all documents since it can early terminate
the collection when the leading source value is greater than the lowest value in the queue.
Instead of collecting the documents in the order of their doc_id, this mode uses the inverted lists (or the bkd tree for numerics) to collect documents
in the order of the values present in the leading source.
For instance the following aggregation:

```
"composite" : {
  "sources" : [
    { "value1": { "terms" : { "field": "timestamp", "order": "asc" } } }
  ],
  "size": 10
}
```
... can use the field `timestamp` to collect the documents with the 10 lowest values for the field instead of visiting all documents.
For composite aggregation with more than one source the execution can early terminate as soon as one of the 10 lowest values produces enough
composite buckets. For instance if visiting the first two lowest timestamp created 10 composite buckets we can early terminate the collection since it
is guaranteed that the third lowest timestamp cannot create a composite key that compares lower than the one already visited.

This mode can execute iff:
 * The leading source in the composite definition uses an indexed field of type `date` (works also with `date_histogram` source), `integer`, `long` or `keyword`.
 * The query is a match_all query or a range query over the field that is used as the leading source in the composite definition.
 * The sort order of the leading source is the natural order (ascending since postings and numerics are sorted in ascending order only).

If these conditions are not met this aggregation visits each document like any other agg.
Copy link
Contributor

@colings86 colings86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jimczi I left a few comments but it looks good


@Override
public boolean needsScores() {
if (collector == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this should really never occur? Should we make this an assertion instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++, I replaced it with an assert.

protected boolean processBucket(LeafReaderContext context, LeafBucketCollector sub,
DocIdSetIterator iterator, Comparable<?> leadSourceBucket) throws IOException {
final int[] topCompositeCollected = new int[1];
final boolean[] hasCollected = new boolean[1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do these need to be arrays if they only contain a single element?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the values are set in the anonymous class below, I find it nicer than using Atomic and I saw this pattern in other locations in the codebase.

// of the composite definition and terminates when the leading source value is guaranteed to be
// greater than the lowest composite bucket in the queue.
sortedBucketProducer.processLeaf(context.query(), ctx, sub);
throw new CollectionTerminatedException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it would be clearer for the SortedBucketProducer to throw the exception rather than throwing it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a commit that adds a comment regarding why we throw an exception here.

Copy link
Contributor

@colings86 colings86 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (when the build gets to green 😄)

Copy link
Contributor

@jpountz jpountz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still need to dig more to fully understand how it works but I like the idea. Some comments:

  • As we are adding more abstractions to the implementation to support optimizations, I wish we had dedicated tests for these abstractions too like the sorted docs producer, the queue, etc.
  • Can we also disable this optimization when there is a high ratio of deleted docs? I don't think that would be a problem in general as merges make sure there are no more than 50% of deleted documents, but this doesn't hold anymore for users of security since hidden docs appear as deleted. Maybe also reauires that numDocs / maxDoc > 0.5?

if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (docIdSetIterator it not empty).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we know it is not empty?

private boolean afterValueSet = false;

/**
* Ctr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing text?

@jimczi
Copy link
Contributor Author

jimczi commented Feb 22, 2018

Thanks for looking @jpountz . I've pushed some commits to address your comments, the optimization is disabled when there are more than 50% of deleted documents and I've added more tests for the new abstractions. Can you take another look ?

@jpountz
Copy link
Contributor

jpountz commented Feb 23, 2018

Jim and I discussed moving away from the deferring framework so that it looks a bit less weird eg. due to feeding collect with out-of-order ids.

@jimczi
Copy link
Contributor Author

jimczi commented Mar 12, 2018

I pushed a commit that rewrites the deferring framework, @jpountz can you take another look ?

Copy link
Contributor

@jpountz jpountz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still need to pursue this review (this is a big change!) but at first sight I find it more readable than the previous version!

afterValue = (BytesRef) value;
} else if (value.getClass() == String.class) {
afterValue = new BytesRef((String) value);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to accept both BytesRef and String?

private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollector) {
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's assert that bucket is 0?

return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucket) throws IOException {
currentValue = filterValue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to set it for every doc?

if (dvs.advanceExact(doc)) {
int num = dvs.docValueCount();
for (int i = 0; i < num; i++) {
currentValue = dvs.nextValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this means currentValue will always be the higher value in case of a multi-valued field, is that ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentValue is only valid for the current composite bucket, next.collect() below will fill the other sources's currentValue and the last collector in the chain will check if the final composite bucket should be added in the queue. We don't use currentValue outside of these recursive calls.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks.

Copy link
Contributor

@jpountz jpountz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I had a more thorough review and I like the change in general. There is one or two places where it might make too strong assumptions about the value of the cost but other than that it looks good to me. I'd also like to see more comments to explain how things work. I suggested some javadocs improvements.

}

/**
* The type of this source.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe mention how it's used?

abstract String type();

/**
* Copies the current value in <code>slot</code>.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe say how it's supposed to know about the current value?

abstract int compare(int from, int to);

/**
* Compares the current value with the value in <code>slot</code>.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe say that the current value is the one from the last copyCurrent call?

deferredCollectors.preCollection();
for (Entry entry : entries) {
DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
if (docIdSetIterator == null || docIdSetIterator.cost() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using cost()==0 is a bit unsafe since cost() may be completely off

* @param vs
* @param format
* @param order
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't look useful?

final int[] topCompositeCollected = new int[1];
final boolean[] hasCollected = new boolean[1];
int cost = (int) iterator.cost();
final DocIdSetBuilder.BulkAdder adder = builder != null ? builder.grow(cost) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This optimization is unsafe since cost may be inaccurate. I would be ok if it was only called with postings, but it looks like this method is sometimes called with the result of DocIdSetBuilder.finish which gives approximate costs in the dense case.

@jimczi
Copy link
Contributor Author

jimczi commented Mar 19, 2018

@jpountz I pushed more changes to fix the issue with the cost approximation and added some javadocs. I think it's ready for another round ;)

Copy link
Contributor

@jpountz jpountz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some minor comments. Otherwise LGTM. Since this is a rather large change I'd rather not fold it into 6.3 and wait for 6.4.

// we need to add the matching document in the builder
// so we build a bulk adder from the approximate cost of the iterator
// and rebuild the adder during the collection if needed
int remainingBits = (int) iterator.cost();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do a Math.min(cost, Integer.MAX_VALUE) rather than a blind cast?


@Override
public void grow(int count) {
remaining = count;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to count the number of remaining docs here, the BKD tree does it for you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need it because we build one doc id set per bucket (not per bkd leaf) so if the values are different inside a leaf I need to know the number of remaining docs in that leaf to create the new doc id set builder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for explaining.

// we need to add the matching document in the builder
// so we build a bulk adder from the approximate cost of the iterator
// and rebuild the adder during the collection if needed
int remainingBits = (int) iterator.cost();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do min(Integer.MAX_VALUE, iterator.cost())

@jimczi jimczi removed the v6.3.0 label Mar 22, 2018
@jimczi jimczi added the v6.3.0 label Mar 23, 2018
@jimczi jimczi merged commit 5288235 into elastic:master Mar 26, 2018
@jimczi jimczi deleted the composite_sort_optim branch March 26, 2018 07:51
jimczi added a commit that referenced this pull request Mar 26, 2018
…28745)

This change refactors the composite aggregation to add an execution mode that visits documents in the order of the values
present in the leading source of the composite definition. This mode does not need to visit all documents since it can early terminate
the collection when the leading source value is greater than the lowest value in the queue.
Instead of collecting the documents in the order of their doc_id, this mode uses the inverted lists (or the bkd tree for numerics) to collect documents
in the order of the values present in the leading source.
For instance the following aggregation:

```
"composite" : {
  "sources" : [
    { "value1": { "terms" : { "field": "timestamp", "order": "asc" } } }
  ],
  "size": 10
}
```
... can use the field `timestamp` to collect the documents with the 10 lowest values for the field instead of visiting all documents.
For composite aggregation with more than one source the execution can early terminate as soon as one of the 10 lowest values produces enough
composite buckets. For instance if visiting the first two lowest timestamp created 10 composite buckets we can early terminate the collection since it
is guaranteed that the third lowest timestamp cannot create a composite key that compares lower than the one already visited.

This mode can execute iff:
 * The leading source in the composite definition uses an indexed field of type `date` (works also with `date_histogram` source), `integer`, `long` or `keyword`.
 * The query is a match_all query or a range query over the field that is used as the leading source in the composite definition.
 * The sort order of the leading source is the natural order (ascending since postings and numerics are sorted in ascending order only).

If these conditions are not met this aggregation visits each document like any other agg.
jimczi added a commit that referenced this pull request Mar 26, 2018
`allow_partial_search_results` is not needed for these tests.
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Mar 27, 2018
* master: (40 commits)
  Do not optimize append-only if seen normal op with higher seqno (elastic#28787)
  [test] packaging: gradle tasks for groovy tests (elastic#29046)
  Prune only gc deletes below local checkpoint (elastic#28790)
  remove testUnassignedShardAndEmptyNodesInRoutingTable
  elastic#28745: remove extra option in the composite rest tests
  Fold EngineDiskUtils into Store, for better lock semantics (elastic#29156)
  Add file permissions checks to precommit task
  Remove execute mode bit from source files
  Optimize the composite aggregation for match_all and range queries (elastic#28745)
  [Docs] Add rank_eval size parameter k (elastic#29218)
  [DOCS] Remove ignore_z_value parameter link
  Docs: Update docs/index_.asciidoc (elastic#29172)
  Docs: Link C++ client lib elasticlient (elastic#28949)
  [DOCS] Unregister repository instead of deleting it (elastic#29206)
  Docs: HighLevelRestClient#multiSearch (elastic#29144)
  Add Z value support to geo_shape
  Remove type casts in logging in server component (elastic#28807)
  Change BroadcastResponse from ToXContentFragment to ToXContentObject (elastic#28878)
  REST : Split `RestUpgradeAction` into two actions (elastic#29124)
  Add error file docs to important settings
  ...
martijnvg added a commit that referenced this pull request Mar 28, 2018
* es/master: (22 commits)
  Fix building Javadoc JARs on JDK for client JARs (#29274)
  Require JDK 10 to build Elasticsearch (#29174)
  Decouple NamedXContentRegistry from ElasticsearchException (#29253)
  Docs: Update generating test coverage reports (#29255)
  [TEST] Fix issue with HttpInfo passed invalid parameter
  Remove all dependencies from XContentBuilder (#29225)
  Fix sporadic failure in CompositeValuesCollectorQueueTests
  Propagate ignore_unmapped to inner_hits (#29261)
  TEST: Increase timeout for testPrimaryReplicaResyncFailed
  REST client: hosts marked dead for the first time should not be immediately retried (#29230)
  TEST: Use different translog dir for a new engine
  Make SearchStats implement Writeable (#29258)
  [Docs] Spelling and grammar changes to reindex.asciidoc (#29232)
  Do not optimize append-only if seen normal op with higher seqno (#28787)
  [test] packaging: gradle tasks for groovy tests (#29046)
  Prune only gc deletes below local checkpoint (#28790)
  remove testUnassignedShardAndEmptyNodesInRoutingTable
  #28745: remove extra option in the composite rest tests
  Fold EngineDiskUtils into Store, for better lock semantics (#29156)
  Add file permissions checks to precommit task
  ...
martijnvg added a commit that referenced this pull request Mar 28, 2018
* es/6.x:
  Fix building Javadoc JARs on JDK for client JARs (#29274)
  Require JDK 10 to build Elasticsearch (#29174)
  Decouple NamedXContentRegistry from ElasticsearchException (#29253)
  Docs: Update generating test coverage reports (#29255)
  [TEST] Fix issue with HttpInfo passed invalid parameter
  Remove all dependencies from XContentBuilder (#29225)
  Fix sporadic failure in CompositeValuesCollectorQueueTests
  Propagate ignore_unmapped to inner_hits (#29261)
  TEST: Increase timeout for testPrimaryReplicaResyncFailed
  REST client: hosts marked dead for the first time should not be immediately retried (#29230)
  Make SearchStats implement Writeable (#29258)
  [Docs] Spelling and grammar changes to reindex.asciidoc (#29232)
  [test] packaging: gradle tasks for groovy tests (#29046)
  remove testUnassignedShardAndEmptyNodesInRoutingTable
  Add file permissions checks to precommit task
  Remove execute mode bit from source files
  #28745: remove 7.x option in the composite rest tests.
  Optimize the composite aggregation for match_all and range queries (#28745)
  Clarify deprecation warning for auto_generate_phrase_query (#29204)
@boicehuang
Copy link
Contributor

boicehuang commented Dec 24, 2018

@jimczi @colings86 @jpountz

Hi! It seems that the optimization of index sorting is replaced by the execution mode that visits documents in the order of the values present in the leading source. I wonder why the previous optimization could not be retained. The condition of index sorting is easier to be met and can early terminate aggregation on each segment without the condition of leading source and query.

Grateful for any help!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants