Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rollup] Return empty response when aggs are missing #32796

Merged
merged 4 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,41 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis
? (InternalAggregations)liveResponse.getAggregations()
: InternalAggregations.EMPTY;

rolledResponses.forEach(r -> {
if (r == null || r.getAggregations() == null || r.getAggregations().asList().size() == 0) {
throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
int missingRollupAggs = rolledResponses.stream().mapToInt(searchResponse -> {
if (searchResponse == null
|| searchResponse.getAggregations() == null
|| searchResponse.getAggregations().asList().size() == 0) {
return 1;
}
});
return 0;
}).sum();

// We had no rollup aggs, so there is nothing to process
if (missingRollupAggs == rolledResponses.size()) {
// If we had a live response (even if it was empty) just return that
if (liveResponse != null) {
return liveResponse;
} else {
// Otherwise we just had rollup, so build an empty response
InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(),
InternalAggregations.EMPTY, null, null,
rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
rolledResponses.stream().anyMatch(SearchResponse::isTimedOut),
rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum());

int totalShards = rolledResponses.stream().mapToInt(SearchResponse::getTotalShards).sum();
int sucessfulShards = rolledResponses.stream().mapToInt(SearchResponse::getSuccessfulShards).sum();
int skippedShards = rolledResponses.stream().mapToInt(SearchResponse::getSkippedShards).sum();
long took = rolledResponses.stream().mapToLong(r -> r.getTook().getMillis()).sum() ;

// Shard failures are ignored atm, so returning an empty array is fine
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should preserve the shard failures in the response, let's do that in a follow up ? We should also sum the number of shards (rollup + normal) in the response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, right now we just fail the search request if there are any failures, which is why we don't bother to send any back right now.

But I agree it'd be better to send back the failures. The above was done for simplicity at the time :)

We should also sum the number of shards (rollup + normal) in the response.

Hm actually, I should tweak this whole section. I didn't sum up the shards because we return the live response if it exists. But it should be a combined sum regardless of if live/rollup exist or not. I'll tweak it a little to make this section better.

return new SearchResponse(combinedInternal, null, totalShards, sucessfulShards, skippedShards,
took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters());
}
} else if (missingRollupAggs > 0 && missingRollupAggs != rolledResponses.size()) {
// We were missing some but not all the aggs, unclear how to handle this. Bail.
throw new RuntimeException("Expected to find aggregations in rollup response, but none found.");
}

// The combination process returns a tree that is identical to the non-rolled
// which means we can use aggregation's reduce method to combine, just as if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ static MultiSearchRequest createMSearchRequest(SearchRequest request, NamedWrite
rolledSearchSource.size(0);
AggregatorFactories.Builder sourceAgg = request.source().aggregations();

// If there are no aggs in the request, our translation won't create any msearch.
// So just add an dummy request to the msearch and return. This is a bit silly
// but maintains how the regular search API behaves
if (sourceAgg == null || sourceAgg.count() == 0) {

// Note: we can't apply any query rewriting or filtering on the query because there
// are no validated caps, so we have no idea what job is intended here. The only thing
// this affects is doc count, since hits and aggs will both be empty it doesn't really matter.
msearch.add(new SearchRequest(context.getRollupIndices(), request.source()).types(request.types()));
return msearch;
}

// Find our list of "best" job caps
Set<RollupJobCaps> validatedCaps = new HashSet<>();
sourceAgg.getAggregatorFactories()
Expand Down Expand Up @@ -248,11 +260,6 @@ static void validateSearchRequest(SearchRequest request) {
if (request.source().explain() != null && request.source().explain()) {
throw new IllegalArgumentException("Rollup search does not support explaining.");
}

// Rollup is only useful if aggregations are set, throw an exception otherwise
if (request.source().aggregations() == null) {
throw new IllegalArgumentException("Rollup requires at least one aggregation to be set.");
}
}

static QueryBuilder rewriteQuery(QueryBuilder builder, Set<RollupJobCaps> jobCaps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,11 @@ public void testRolledMissingAggs() {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);

Exception e = expectThrows(RuntimeException.class,
() -> RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("Expected to find aggregations in rollup response, but none found."));
SearchResponse response = RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertNotNull(response);
Aggregations responseAggs = response.getAggregations();
assertThat(responseAggs.asList().size(), equalTo(0));
}

public void testMissingRolledIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,22 @@ public void testExplain() {
assertThat(e.getMessage(), equalTo("Rollup search does not support explaining."));
}

public void testNoAgg() {
String[] normalIndices = new String[]{randomAlphaOfLength(10)};
public void testNoRollupAgg() {
String[] normalIndices = new String[]{};
String[] rollupIndices = new String[]{randomAlphaOfLength(10)};
TransportRollupSearchAction.RollupSearchContext ctx
= new TransportRollupSearchAction.RollupSearchContext(normalIndices, rollupIndices, Collections.emptySet());
SearchSourceBuilder source = new SearchSourceBuilder();
source.query(new MatchAllQueryBuilder());
source.size(0);
SearchRequest request = new SearchRequest(normalIndices, source);
SearchRequest request = new SearchRequest(rollupIndices, source);
NamedWriteableRegistry registry = mock(NamedWriteableRegistry.class);
Exception e = expectThrows(IllegalArgumentException.class,
() -> TransportRollupSearchAction.createMSearchRequest(request, registry, ctx));
assertThat(e.getMessage(), equalTo("Rollup requires at least one aggregation to be set."));
MultiSearchRequest msearch = TransportRollupSearchAction.createMSearchRequest(request, registry, ctx);
assertThat(msearch.requests().size(), equalTo(1));
assertThat(msearch.requests().get(0), equalTo(request));
}


public void testNoLiveNoRollup() {
String[] normalIndices = new String[0];
String[] rollupIndices = new String[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,20 @@ setup:
- match: { aggregations.histo.buckets.3.key_as_string: "2017-01-01T08:00:00.000Z" }
- match: { aggregations.histo.buckets.3.doc_count: 20 }

---
"Empty aggregation":

- do:
xpack.rollup.rollup_search:
index: "foo_rollup"
body:
size: 0
aggs: {}

- length: { hits.hits: 0 }
- match: { hits.total: 0 }
- is_false: aggregations


---
"Search with Metric":
Expand Down