diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index 13b56259ce51f..c8982620d62ee 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -500,6 +500,115 @@ public void testHistogramPivot() throws Exception { assertOnePivotValue(transformIndex + "/_search?q=every_2:0.0", 1.0); } + public void testContinuousPivotHistogram() throws Exception { + String indexName = "continuous_reviews_histogram"; + createReviewsIndex(indexName); + String transformId = "simple_continuous_pivot"; + String transformIndex = "pivot_reviews_continuous_histogram"; + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex); + final Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS + ); + String config = "{" + + " \"source\": {\"index\":\"" + + indexName + + "\"}," + + " \"dest\": {\"index\":\"" + + transformIndex + + "\"}," + + " \"frequency\": \"1s\"," + + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"every_2\": {" + + " \"histogram\": {" + + " \"interval\": 2,\"field\":\"stars\"" + + " } } }," + + " \"aggregations\": {" + + " \"user_dc\": {" + + " \"cardinality\": {" + + " \"field\": \"user_id\"" + + " } } } }" + + "}"; + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForContinuousTransform(transformId, transformIndex, null); + assertTrue(indexExists(transformIndex)); + + // we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0 + Map indexStats = getAsMap(transformIndex + "/_stats"); + assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + + Map searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0)); + + searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0)); + + searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0)); + + final StringBuilder bulk = new StringBuilder(); + + long dateStamp = Instant.now().toEpochMilli() - 1_000; + + // add 5 data points with 3 new users: 27, 28, 29 + for (int i = 25; i < 30; i++) { + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n"); + String location = (i + 10) + "," + (i + 15); + + bulk.append("{\"user_id\":\"") + .append("user_") + .append(i) + .append("\",\"business_id\":\"") + .append("business_") + .append(i) + .append("\",\"stars\":") + .append(3) + .append(",\"location\":\"") + .append(location) + .append("\",\"timestamp\":") + .append(dateStamp) + .append("}\n"); + } + bulk.append("\r\n"); + + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + client().performRequest(bulkRequest); + + waitForTransformCheckpoint(transformId, 2); + + stopTransform(transformId, false); + refreshIndex(transformIndex); + + // assert after changes + // still 3 documents + indexStats = getAsMap(transformIndex + "/_stats"); + assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + + searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0)); + + searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30.0)); + + searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0"); + assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); + assertThat(((List) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0)); + + } + public void testBiggerPivot() throws Exception { String transformId = "bigger_pivot"; String transformIndex = "bigger_pivot_reviews"; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 645581cad45ca..6456564a8e152 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -345,7 +345,7 @@ protected void initializeFunction() { // create the function function = FunctionFactory.create(getConfig()); - if (isContinuous()) { + if (isContinuous() && function.supportsIncrementalBucketUpdate()) { changeCollector = function.buildChangeCollector(getConfig().getSyncConfig().getField()); } } @@ -370,7 +370,7 @@ protected void onFinish(ActionListener listener) { // reset the page size, so we do not memorize a low page size forever pageSize = function.getInitialPageSize(); // reset the changed bucket to free memory - if (isContinuous()) { + if (changeCollector != null) { changeCollector.clear(); } @@ -749,7 +749,9 @@ private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder) .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); - QueryBuilder filter = changeCollector.buildFilterQuery(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound()); + QueryBuilder filter = changeCollector != null + ? changeCollector.buildFilterQuery(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound()) + : null; if (filter != null) { filteredQuery.filter(filter);