diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java index 3e119b79afb97..6464cea4cf024 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java @@ -74,19 +74,18 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())) + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())), + randomBoolean() ? randomZone() : null ); } else { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))) + new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))), + randomBoolean() ? randomZone() : null ); } - if (randomBoolean()) { - dateHistogramGroupSource.setTimeZone(randomZone()); - } return dateHistogramGroupSource; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java index 44452426d40c3..342c53e4bcbc6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java @@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable { */ boolean isValid(); + String getField(); + QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint); QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java index d659a1f3905a0..a54507dc66178 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java @@ -25,7 +25,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; -public class TimeSyncConfig implements SyncConfig { +public class TimeSyncConfig implements SyncConfig { public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60); private static final String NAME = "data_frame_transform_pivot_sync_time"; @@ -37,17 +37,18 @@ public class TimeSyncConfig implements SyncConfig { private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private static ConstructingObjectParser createParser(boolean lenient) { - ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, - args -> { - String field = (String) args[0]; - TimeValue delay = (TimeValue) args[1]; - return new TimeSyncConfig(field, delay); - }); + ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, args -> { + String field = (String) args[0]; + TimeValue delay = (TimeValue) args[1]; + return new TimeSyncConfig(field, delay); + }); parser.declareString(constructorArg(), TransformField.FIELD); - parser.declareField(optionalConstructorArg(), + parser.declareField( + optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()), TransformField.DELAY, - ObjectParser.ValueType.STRING); + ObjectParser.ValueType.STRING + ); return parser; } @@ -65,6 +66,7 @@ public TimeSyncConfig(StreamInput in) throws IOException { this.delay = in.readTimeValue(); } + @Override public String getField() { return field; } @@ -105,12 +107,11 @@ public boolean equals(Object other) { final TimeSyncConfig that = (TimeSyncConfig) other; - return Objects.equals(this.field, that.field) - && Objects.equals(this.delay, that.delay); + return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay); } @Override - public int hashCode(){ + public int hashCode() { return Objects.hash(field, delay); } @@ -139,7 +140,8 @@ public QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint) { @Override public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) { - return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()).lt(newCheckpoint.getTimeUpperBound()) - .format("epoch_millis"); + return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()) + .lt(newCheckpoint.getTimeUpperBound()) + .format("epoch_millis"); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java index 6509546a2da5a..f61497e31d3e6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java @@ -7,15 +7,18 @@ import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Rounding; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; @@ -105,6 +108,11 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(interval); } + + @Override + public String toString() { + return interval.toString(); + } } public static class CalendarInterval implements Interval { @@ -169,6 +177,11 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(interval); } + + @Override + public String toString() { + return interval.toString(); + } } private Interval readInterval(StreamInput in) throws IOException { @@ -195,11 +208,26 @@ private void writeInterval(Interval interval, StreamOutput out) throws IOExcepti private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private final Interval interval; - private ZoneId timeZone; + private final ZoneId timeZone; + private Rounding rounding; - public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval) { + public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) { super(field, scriptConfig); this.interval = interval; + this.timeZone = timeZone; + + Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString()); + final Rounding.Builder roundingBuilder; + if (timeUnit != null) { + roundingBuilder = new Rounding.Builder(timeUnit); + } else { + roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName())); + } + + if (timeZone != null) { + roundingBuilder.timeZone(timeZone); + } + this.rounding = roundingBuilder.build(); } public DateHistogramGroupSource(StreamInput in) throws IOException { @@ -218,6 +246,7 @@ private static ConstructingObjectParser createPa ScriptConfig scriptConfig = (ScriptConfig) args[1]; String fixedInterval = (String) args[2]; String calendarInterval = (String) args[3]; + ZoneId zoneId = (ZoneId) args[4]; Interval interval = null; @@ -231,7 +260,7 @@ private static ConstructingObjectParser createPa throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none"); } - return new DateHistogramGroupSource(field, scriptConfig, interval); + return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId); }); declareValuesSourceFields(parser, lenient); @@ -239,7 +268,7 @@ private static ConstructingObjectParser createPa parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME)); parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME)); - parser.declareField(DateHistogramGroupSource::setTimeZone, p -> { + parser.declareField(optionalConstructorArg(), p -> { if (p.currentToken() == XContentParser.Token.VALUE_STRING) { return ZoneId.of(p.text()); } else { @@ -267,8 +296,8 @@ public ZoneId getTimeZone() { return timeZone; } - public void setTimeZone(ZoneId timeZone) { - this.timeZone = timeZone; + public Rounding getRounding() { + return rounding; } @Override @@ -315,9 +344,16 @@ public int hashCode() { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { - // no need for an extra range filter as this is already done by checkpoints - return null; + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { + if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) { + return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis"); + } else { + return null; + } } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java index 0eba51cb51293..4f62faa7cb09b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java @@ -101,7 +101,11 @@ public int hashCode() { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { // histograms are simple and cheap, so we skip this optimization return null; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java index bd215126119d5..569bf08d8209a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java @@ -116,7 +116,11 @@ public void writeTo(StreamOutput out) throws IOException { public abstract boolean supportsIncrementalBucketUpdate(); - public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets); + public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ); public String getField() { return field; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java index e0d7c8a4f9c57..17a173836cb95 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java @@ -54,8 +54,15 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { - return new TermsQueryBuilder(field, changedBuckets); + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { + if (changedBuckets != null && changedBuckets.isEmpty() == false) { + return new TermsQueryBuilder(field, changedBuckets); + } + return null; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 59a7d1861d329..56886bf5bd972 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -21,9 +21,9 @@ import java.util.Collections; import java.util.Map; -import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; +import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig; import static org.hamcrest.Matchers.equalTo; public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase { @@ -184,6 +184,11 @@ public String getWriteableName() { return "foo"; } + @Override + public String getField() { + return "foo"; + } + @Override public void writeTo(StreamOutput out) throws IOException {} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java index 6ce9a9373c750..265f2a9446828 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java @@ -10,11 +10,17 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.time.ZoneOffset; +import java.time.temporal.TemporalAccessor; + +import static org.hamcrest.Matchers.equalTo; public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase { @@ -26,19 +32,20 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())) + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))), + randomBoolean() ? randomZone() : null ); } else { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))) + new DateHistogramGroupSource.CalendarInterval( + new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y")) + ), + randomBoolean() ? randomZone() : null ); } - if (randomBoolean()) { - dateHistogramGroupSource.setTimeZone(randomZone()); - } return dateHistogramGroupSource; } @@ -70,4 +77,56 @@ protected Reader instanceReader() { return DateHistogramGroupSource::new; } + public void testRoundingDateHistogramFixedInterval() { + String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); + DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource( + field, + null, + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")), + null + ); + + // not meant to be complete rounding tests, see {@link RoundingTests} for more + assertNotNull(dateHistogramGroupSource.getRounding()); + + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), + equalTo(time("2020-03-26T00:00:00.000Z")) + ); + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-26T00:00:01.000Z")), + equalTo(time("2020-03-26T00:00:00.000Z")) + ); + } + + public void testRoundingDateHistogramCalendarInterval() { + String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); + DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource( + field, + null, + new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")), + null + ); + + // not meant to be complete rounding tests, see {@link RoundingTests} for more + assertNotNull(dateHistogramGroupSource.getRounding()); + + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), + equalTo(time("2020-03-23T00:00:00.000Z")) + ); + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-29T23:59:59.000Z")), + equalTo(time("2020-03-23T00:00:00.000Z")) + ); + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-23T00:00:01.000Z")), + equalTo(time("2020-03-23T00:00:00.000Z")) + ); + } + + private static long time(String time) { + TemporalAccessor accessor = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC).parse(time); + return DateFormatters.from(accessor).toInstant().toEpochMilli(); + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 60afee087fb0c..80b9c9ca1ddba 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -64,15 +64,11 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer> changedBuckets; + private volatile Map> changedBuckets = Collections.emptyMap(); private volatile Map changedBucketsAfterKey; private volatile long lastCheckpointCleanup = 0L; @@ -146,7 +142,7 @@ public TransformIndexer( this.context = ExceptionsHelper.requireNonNull(context, "context"); // give runState a default - this.runState = RunState.FULL_RUN; + this.runState = RunState.APPLY_BUCKET_RESULTS; } public int getPageSize() { @@ -342,7 +338,7 @@ protected void onFinish(ActionListener listener) { // reset the page size, so we do not memorize a low page size forever pageSize = pivot.getInitialPageSize(); // reset the changed bucket to free memory - changedBuckets = null; + changedBuckets = Collections.emptyMap(); long checkpoint = context.getAndIncrementCheckpoint(); lastCheckpoint = getNextCheckpoint(); @@ -414,11 +410,9 @@ protected IterationResult doProcess(SearchResponse sea final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); switch (runState) { - case FULL_RUN: + case APPLY_BUCKET_RESULTS: return processBuckets(agg); - case PARTIAL_RUN_APPLY_CHANGES: - return processPartialBucketUpdates(agg); - case PARTIAL_RUN_IDENTIFY_CHANGES: + case IDENTIFY_CHANGES: return processChangedBuckets(agg); default: @@ -582,7 +576,19 @@ private void sourceHasChanged(ActionListener hasChangedListener) { private IterationResult processBuckets(final CompositeAggregation agg) { // we reached the end if (agg.getBuckets().isEmpty()) { - return new IterationResult<>(Collections.emptyList(), null, true); + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || pivot.supportsIncrementalBucketUpdate() == false) { + return new IterationResult<>(Collections.emptyList(), null, true); + } + + // cleanup changed Buckets + changedBuckets = Collections.emptyMap(); + + // reset the runState to fetch changed buckets + runState = RunState.IDENTIFY_CHANGES; + + // advance the cursor for changed bucket detection + return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); + } long docsBeforeProcess = getStats().getNumDocuments(); @@ -608,21 +614,6 @@ private IterationResult processBuckets(final Composite return result; } - private IterationResult processPartialBucketUpdates(final CompositeAggregation agg) { - // we reached the end - if (agg.getBuckets().isEmpty()) { - // cleanup changed Buckets - changedBuckets = null; - - // reset the runState to fetch changed buckets - runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES; - // advance the cursor for changed bucket detection - return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); - } - - return processBuckets(agg); - } - private IterationResult processChangedBuckets(final CompositeAggregation agg) { // initialize the map of changed buckets, the map might be empty if source do not require/implement // changed bucket detection @@ -631,7 +622,7 @@ private IterationResult processChangedBuckets(final Co // reached the end? if (agg.getBuckets().isEmpty()) { // reset everything and return the end marker - changedBuckets = null; + changedBuckets = Collections.emptyMap(); changedBucketsAfterKey = null; return new IterationResult<>(Collections.emptyList(), null, true); } @@ -644,7 +635,7 @@ private IterationResult processChangedBuckets(final Co changedBucketsAfterKey = agg.afterKey(); // reset the runState to fetch the partial updates next - runState = RunState.PARTIAL_RUN_APPLY_CHANGES; + runState = RunState.APPLY_BUCKET_RESULTS; return new IterationResult<>(Collections.emptyList(), getPosition(), false); } @@ -721,15 +712,12 @@ protected SearchRequest buildSearchRequest() { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0); switch (runState) { - case FULL_RUN: - buildFullRunQuery(sourceBuilder); + case APPLY_BUCKET_RESULTS: + buildUpdateQuery(sourceBuilder); break; - case PARTIAL_RUN_IDENTIFY_CHANGES: + case IDENTIFY_CHANGES: buildChangedBucketsQuery(sourceBuilder); break; - case PARTIAL_RUN_APPLY_CHANGES: - buildPartialUpdateQuery(sourceBuilder); - break; default: // Any other state is a bug, should not happen logger.warn("Encountered unexpected run state [" + runState + "]"); @@ -740,26 +728,6 @@ protected SearchRequest buildSearchRequest() { return searchRequest; } - private SearchSourceBuilder buildFullRunQuery(SearchSourceBuilder sourceBuilder) { - TransformIndexerPosition position = getPosition(); - - sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize)); - TransformConfig config = getConfig(); - - QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery(); - if (isContinuous()) { - BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder) - .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); - sourceBuilder.query(filteredQuery); - } else { - sourceBuilder.query(pivotQueryBuilder); - } - - logger.trace("running full run query: {}", sourceBuilder); - - return sourceBuilder; - } - private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceBuilder) { assert isContinuous(); @@ -781,9 +749,7 @@ private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceB return sourceBuilder; } - private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBuilder) { - assert isContinuous(); - + private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) { TransformIndexerPosition position = getPosition(); sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize)); @@ -791,18 +757,28 @@ private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBu QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery(); + // if its either the 1st run or not continuous, do not apply extra filters + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { + sourceBuilder.query(pivotQueryBuilder); + logger.trace("running query: {}", sourceBuilder); + + return sourceBuilder; + } + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder) .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); - if (changedBuckets != null && changedBuckets.isEmpty() == false) { - QueryBuilder pivotFilter = pivot.filterBuckets(changedBuckets); - if (pivotFilter != null) { - filteredQuery.filter(pivotFilter); - } + QueryBuilder pivotFilter = pivot.filterBuckets( + changedBuckets, + config.getSyncConfig().getField(), + lastCheckpoint.getTimeUpperBound() + ); + if (pivotFilter != null) { + filteredQuery.filter(pivotFilter); } sourceBuilder.query(filteredQuery); - logger.trace("running partial update query: {}", sourceBuilder); + logger.trace("running query: {}", sourceBuilder); return sourceBuilder; } @@ -903,16 +879,16 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { private RunState determineRunStateAtStart() { // either 1st run or not a continuous transform if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { - return RunState.FULL_RUN; + return RunState.APPLY_BUCKET_RESULTS; } - // if incremental update is not supported, do a full run + // if incremental update is not supported, do a normal run if (pivot.supportsIncrementalBucketUpdate() == false) { - return RunState.FULL_RUN; + return RunState.APPLY_BUCKET_RESULTS; } // continuous mode: we need to get the changed buckets first - return RunState.PARTIAL_RUN_IDENTIFY_CHANGES; + return RunState.IDENTIFY_CHANGES; } /** diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index 47259ee8c8142..16bd7d9f6a935 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; @@ -174,7 +175,6 @@ public Stream> extractResults( Map fieldTypeMap, TransformIndexerStats transformIndexerStats ) { - GroupConfig groups = config.getGroupConfig(); Collection aggregationBuilders = config.getAggregationConfig().getAggregatorFactories(); Collection pipelineAggregationBuilders = config.getAggregationConfig().getPipelineAggregatorFactories(); @@ -189,44 +189,31 @@ public Stream> extractResults( ); } - public QueryBuilder filterBuckets(Map> changedBuckets) { - - if (changedBuckets == null || changedBuckets.isEmpty()) { - return null; - } + public QueryBuilder filterBuckets( + Map> changedBuckets, + String synchronizationField, + long lastSynchronizationCheckpoint + ) { + assert changedBuckets != null; if (config.getGroupConfig().getGroups().size() == 1) { Entry entry = config.getGroupConfig().getGroups().entrySet().iterator().next(); - // it should not be possible to get into this code path - assert (entry.getValue().supportsIncrementalBucketUpdate()); - - logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField()); - if (changedBuckets.containsKey(entry.getKey())) { - return entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey())); - } else { - // should never happen - throw new RuntimeException("Could not find bucket value for key " + entry.getKey()); - } + logger.trace(() -> new ParameterizedMessage("filter by bucket: {}/{}", entry.getKey(), entry.getValue().getField())); + Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); + return entry.getValue() + .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); } // else: more than 1 group by, need to nest it BoolQueryBuilder filteredQuery = new BoolQueryBuilder(); for (Entry entry : config.getGroupConfig().getGroups().entrySet()) { - if (entry.getValue().supportsIncrementalBucketUpdate() == false) { - continue; - } - - if (changedBuckets.containsKey(entry.getKey())) { - QueryBuilder sourceQueryFilter = entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey())); - // the source might not define an filter optimization - if (sourceQueryFilter != null) { - filteredQuery.filter(sourceQueryFilter); - } - } else { - // should never happen - throw new RuntimeException("Could not find bucket value for key " + entry.getKey()); + Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); + QueryBuilder sourceQueryFilter = entry.getValue() + .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); + // the source might not define a filter optimization + if (sourceQueryFilter != null) { + filteredQuery.filter(sourceQueryFilter); } - } return filteredQuery;