[Transform] Fix Regression: continuous transform can fail for (date) histogram group_by (#60196)

do not create a change collector if the group_by configuration does not support change detection

fixes #60125
Hendrik Muhs authored Jul 27, 2020
1 parent 2078509 commit 6398f73
Showing 2 changed files with 114 additions and 3 deletions.
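
For context on the fix: in continuous mode the transform indexer builds a change collector, which narrows each checkpoint's update query to the buckets that may have changed since the last checkpoint. Histogram (and date histogram) group_by configurations do not support incremental bucket updates, so a collector must not be created for them. Below is a minimal, self-contained sketch of the guard pattern this commit introduces; ChangeCollector, PivotFunction, and IndexerSketch are simplified stand-ins, not the real transform classes:

interface ChangeCollector {
    void clear();
}

interface PivotFunction {
    boolean supportsIncrementalBucketUpdate();
    ChangeCollector buildChangeCollector(String syncField);
}

class IndexerSketch {
    private final PivotFunction function;
    private final boolean continuous;
    private ChangeCollector changeCollector; // stays null for e.g. histogram group_by

    IndexerSketch(PivotFunction function, boolean continuous) {
        this.function = function;
        this.continuous = continuous;
        // The fix: gate collector creation on the function's capability,
        // not only on the transform running in continuous mode.
        if (continuous && function.supportsIncrementalBucketUpdate()) {
            this.changeCollector = function.buildChangeCollector("timestamp");
        }
    }

    void onFinish() {
        // Every consumer must now tolerate a null collector.
        if (changeCollector != null) {
            changeCollector.clear();
        }
    }
}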
@@ -500,6 +500,115 @@ public void testHistogramPivot() throws Exception {
        assertOnePivotValue(transformIndex + "/_search?q=every_2:0.0", 1.0);
    }

    public void testContinuousPivotHistogram() throws Exception {
        String indexName = "continuous_reviews_histogram";
        createReviewsIndex(indexName);
        String transformId = "simple_continuous_pivot";
        String transformIndex = "pivot_reviews_continuous_histogram";
        setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
        final Request createTransformRequest = createRequestWithAuth(
            "PUT",
            getTransformEndpoint() + transformId,
            BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
        );
        String config = "{"
            + " \"source\": {\"index\":\""
            + indexName
            + "\"},"
            + " \"dest\": {\"index\":\""
            + transformIndex
            + "\"},"
            + " \"frequency\": \"1s\","
            + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
            + " \"pivot\": {"
            + "   \"group_by\": {"
            + "     \"every_2\": {"
            + "       \"histogram\": {"
            + "         \"interval\": 2,\"field\":\"stars\""
            + " } } },"
            + "   \"aggregations\": {"
            + "     \"user_dc\": {"
            + "       \"cardinality\": {"
            + "         \"field\": \"user_id\""
            + " } } } }"
            + "}";
        createTransformRequest.setJsonEntity(config);
        Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
        assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

        startAndWaitForContinuousTransform(transformId, transformIndex, null);
        assertTrue(indexExists(transformIndex));

        // we expect 3 documents: there are 5 unique star values and we bucket every 2 starting at 0
        Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
        assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

        Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
        assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));

        searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
        assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));

        searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
        assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));

        final StringBuilder bulk = new StringBuilder();

        long dateStamp = Instant.now().toEpochMilli() - 1_000;

        // add 5 data points with 3 new users: 27, 28, 29
        for (int i = 25; i < 30; i++) {
            bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
            String location = (i + 10) + "," + (i + 15);

            bulk.append("{\"user_id\":\"")
                .append("user_")
                .append(i)
                .append("\",\"business_id\":\"")
                .append("business_")
                .append(i)
                .append("\",\"stars\":")
                .append(3)
                .append(",\"location\":\"")
                .append(location)
                .append("\",\"timestamp\":")
                .append(dateStamp)
                .append("}\n");
        }
        bulk.append("\r\n");

        final Request bulkRequest = new Request("POST", "/_bulk");
        bulkRequest.addParameter("refresh", "true");
        bulkRequest.setJsonEntity(bulk.toString());
        client().performRequest(bulkRequest);

        waitForTransformCheckpoint(transformId, 2);

        stopTransform(transformId, false);
        refreshIndex(transformIndex);

        // assert after changes: still 3 documents
        indexStats = getAsMap(transformIndex + "/_stats");
        assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

        searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
        assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));

        searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
        assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30.0));

        searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
        assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
        assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
    }

    public void testBiggerPivot() throws Exception {
        String transformId = "bigger_pivot";
        String transformIndex = "bigger_pivot_reviews";
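For readability, the pivot configuration that the test above assembles via string concatenation corresponds to this JSON body (index names are the test's own values):

{
  "source": { "index": "continuous_reviews_histogram" },
  "dest": { "index": "pivot_reviews_continuous_histogram" },
  "frequency": "1s",
  "sync": { "time": { "field": "timestamp", "delay": "1s" } },
  "pivot": {
    "group_by": {
      "every_2": { "histogram": { "interval": 2, "field": "stars" } }
    },
    "aggregations": {
      "user_dc": { "cardinality": { "field": "user_id" } }
    }
  }
}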
@@ -345,7 +345,7 @@ protected void initializeFunction() {
        // create the function
        function = FunctionFactory.create(getConfig());

-       if (isContinuous()) {
+       if (isContinuous() && function.supportsIncrementalBucketUpdate()) {
            changeCollector = function.buildChangeCollector(getConfig().getSyncConfig().getField());
        }
    }
@@ -370,7 +370,7 @@ protected void onFinish(ActionListener<Void> listener) {
        // reset the page size, so we do not memorize a low page size forever
        pageSize = function.getInitialPageSize();
        // reset the changed bucket to free memory
-       if (isContinuous()) {
+       if (changeCollector != null) {
            changeCollector.clear();
        }

@@ -749,7 +749,9 @@ private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder)
        BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder)
            .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint));

-       QueryBuilder filter = changeCollector.buildFilterQuery(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound());
+       QueryBuilder filter = changeCollector != null
+           ? changeCollector.buildFilterQuery(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound())
+           : null;

        if (filter != null) {
            filteredQuery.filter(filter);
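
Net effect of the buildUpdateQuery change: when no change collector exists (e.g. a histogram group_by), the update search falls back to the configured source query plus the checkpoint time-range filter; the collector's narrowing filter is applied only when one was built. A minimal sketch of that query shape, assuming Elasticsearch's query builders on the classpath (class and variable names here are illustrative stand-ins, not the indexer's own):

import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;

class UpdateQueryShapeSketch {
    // changesFilter is null when no change collector was created
    // (unsupported group_by) or when the collector produced no filter.
    static QueryBuilder updateQuery(QueryBuilder sourceQuery, QueryBuilder checkpointRange, QueryBuilder changesFilter) {
        BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(sourceQuery).filter(checkpointRange);
        if (changesFilter != null) {
            filteredQuery.filter(changesFilter); // narrow to changed buckets only
        }
        return filteredQuery;
    }

    public static void main(String[] args) {
        // Histogram group_by case: no change collector, hence no extra filter.
        System.out.println(updateQuery(new MatchAllQueryBuilder(), new RangeQueryBuilder("timestamp").gte(0L).lt(1_000L), null));
    }
}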
