From 9690e8098f3a92af61840b0dfd2e219e812fe72e Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 24 Mar 2020 10:31:34 +0100 Subject: [PATCH 1/9] make date histogram group source immutable by moving timezone into the constructor --- .../hlrc/DateHistogramGroupSourceTests.java | 9 ++-- .../pivot/DateHistogramGroupSource.java | 42 ++++++++++++++++--- .../pivot/DateHistogramGroupSourceTests.java | 9 ++-- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java index 3e119b79afb97..6464cea4cf024 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/pivot/hlrc/DateHistogramGroupSourceTests.java @@ -74,19 +74,18 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())) + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())), + randomBoolean() ? randomZone() : null ); } else { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))) + new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))), + randomBoolean() ? randomZone() : null ); } - if (randomBoolean()) { - dateHistogramGroupSource.setTimeZone(randomZone()); - } return dateHistogramGroupSource; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java index 6509546a2da5a..8c66b623e3d86 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java @@ -7,15 +7,18 @@ import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Rounding; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; @@ -105,6 +108,11 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(interval); } + + @Override + public String toString() { + return interval.toString(); + } } public static class CalendarInterval implements Interval { @@ -169,6 +177,11 @@ public boolean equals(Object other) { public int hashCode() { return Objects.hash(interval); } + + @Override + public String toString() { + return interval.toString(); + } } private Interval readInterval(StreamInput in) throws IOException { @@ -195,11 +208,27 @@ private void writeInterval(Interval interval, StreamOutput out) throws IOExcepti private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private final Interval interval; - private ZoneId timeZone; + private final ZoneId timeZone; + private Rounding rounding; - public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval) { + public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) { super(field, scriptConfig); this.interval = interval; + this.timeZone = timeZone; + + Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString()); + final Rounding.Builder roundingBuilder; + if (timeUnit != null) { + roundingBuilder = new Rounding.Builder(timeUnit); + + } else { + roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), "createRounding")); + } + + if (timeZone != null) { + roundingBuilder.timeZone(timeZone); + } + this.rounding = roundingBuilder.build(); } public DateHistogramGroupSource(StreamInput in) throws IOException { @@ -218,6 +247,7 @@ private static ConstructingObjectParser createPa ScriptConfig scriptConfig = (ScriptConfig) args[1]; String fixedInterval = (String) args[2]; String calendarInterval = (String) args[3]; + ZoneId zoneId = (ZoneId) args[4]; Interval interval = null; @@ -231,7 +261,7 @@ private static ConstructingObjectParser createPa throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none"); } - return new DateHistogramGroupSource(field, scriptConfig, interval); + return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId); }); declareValuesSourceFields(parser, lenient); @@ -239,7 +269,7 @@ private static ConstructingObjectParser createPa parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME)); parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME)); - parser.declareField(DateHistogramGroupSource::setTimeZone, p -> { + parser.declareField(optionalConstructorArg(), p -> { if (p.currentToken() == XContentParser.Token.VALUE_STRING) { return ZoneId.of(p.text()); } else { @@ -267,8 +297,8 @@ public ZoneId getTimeZone() { return timeZone; } - public void setTimeZone(ZoneId timeZone) { - this.timeZone = timeZone; + public Rounding getRounding() { + return rounding; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java index 6ce9a9373c750..1cd8dea1b4133 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java @@ -26,19 +26,18 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())) + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 1, "d", "h", "ms", "s", "m"))), + randomBoolean() ? randomZone() : null ); } else { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))) + new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))), + randomBoolean() ? randomZone() : null ); } - if (randomBoolean()) { - dateHistogramGroupSource.setTimeZone(randomZone()); - } return dateHistogramGroupSource; } From 99fca93ab41b5e89baee244495b97c8e0619f56e Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 24 Mar 2020 10:39:43 +0100 Subject: [PATCH 2/9] optimize date_histogram continuous transforms by limiting updates to recent buckets according to sync configuration --- .../core/transform/transforms/SyncConfig.java | 2 + .../transform/transforms/TimeSyncConfig.java | 30 ++--- .../pivot/DateHistogramGroupSource.java | 10 +- .../pivot/HistogramGroupSource.java | 2 +- .../transforms/pivot/SingleGroupSource.java | 2 +- .../transforms/pivot/TermsGroupSource.java | 7 +- .../TransformConfigUpdateTests.java | 7 +- .../transforms/TransformIndexer.java | 107 ++++++++---------- .../transform/transforms/pivot/Pivot.java | 47 ++++---- 9 files changed, 108 insertions(+), 106 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java index 44452426d40c3..342c53e4bcbc6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SyncConfig.java @@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable { */ boolean isValid(); + String getField(); + QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint); QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java index d659a1f3905a0..a54507dc66178 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TimeSyncConfig.java @@ -25,7 +25,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; -public class TimeSyncConfig implements SyncConfig { +public class TimeSyncConfig implements SyncConfig { public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60); private static final String NAME = "data_frame_transform_pivot_sync_time"; @@ -37,17 +37,18 @@ public class TimeSyncConfig implements SyncConfig { private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private static ConstructingObjectParser createParser(boolean lenient) { - ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, - args -> { - String field = (String) args[0]; - TimeValue delay = (TimeValue) args[1]; - return new TimeSyncConfig(field, delay); - }); + ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, args -> { + String field = (String) args[0]; + TimeValue delay = (TimeValue) args[1]; + return new TimeSyncConfig(field, delay); + }); parser.declareString(constructorArg(), TransformField.FIELD); - parser.declareField(optionalConstructorArg(), + parser.declareField( + optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()), TransformField.DELAY, - ObjectParser.ValueType.STRING); + ObjectParser.ValueType.STRING + ); return parser; } @@ -65,6 +66,7 @@ public TimeSyncConfig(StreamInput in) throws IOException { this.delay = in.readTimeValue(); } + @Override public String getField() { return field; } @@ -105,12 +107,11 @@ public boolean equals(Object other) { final TimeSyncConfig that = (TimeSyncConfig) other; - return Objects.equals(this.field, that.field) - && Objects.equals(this.delay, that.delay); + return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay); } @Override - public int hashCode(){ + public int hashCode() { return Objects.hash(field, delay); } @@ -139,7 +140,8 @@ public QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint) { @Override public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) { - return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()).lt(newCheckpoint.getTimeUpperBound()) - .format("epoch_millis"); + return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()) + .lt(newCheckpoint.getTimeUpperBound()) + .format("epoch_millis"); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java index 8c66b623e3d86..7e50d97685487 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java @@ -345,9 +345,13 @@ public int hashCode() { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { - // no need for an extra range filter as this is already done by checkpoints - return null; + public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp) { + if (synchronizationTimestamp > 0) { + + return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis"); + } else { + return null; + } } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java index 0eba51cb51293..25549002848fa 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java @@ -101,7 +101,7 @@ public int hashCode() { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { + public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp) { // histograms are simple and cheap, so we skip this optimization return null; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java index bd215126119d5..1ff2a7d6a9ef2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java @@ -116,7 +116,7 @@ public void writeTo(StreamOutput out) throws IOException { public abstract boolean supportsIncrementalBucketUpdate(); - public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets); + public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp); public String getField() { return field; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java index e0d7c8a4f9c57..5a93163e2cf8a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java @@ -54,8 +54,11 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets) { - return new TermsQueryBuilder(field, changedBuckets); + public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp) { + if (changedBuckets != null && changedBuckets.isEmpty() == false) { + return new TermsQueryBuilder(field, changedBuckets); + } + return null; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 59a7d1861d329..56886bf5bd972 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -21,9 +21,9 @@ import java.util.Collections; import java.util.Map; -import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; +import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig; import static org.hamcrest.Matchers.equalTo; public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase { @@ -184,6 +184,11 @@ public String getWriteableName() { return "foo"; } + @Override + public String getField() { + return "foo"; + } + @Override public void writeTo(StreamOutput out) throws IOException {} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 60afee087fb0c..1312f14248792 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -64,15 +64,11 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer listener) { // reset the page size, so we do not memorize a low page size forever pageSize = pivot.getInitialPageSize(); // reset the changed bucket to free memory - changedBuckets = null; + changedBuckets = Collections.emptyMap(); long checkpoint = context.getAndIncrementCheckpoint(); lastCheckpoint = getNextCheckpoint(); @@ -414,11 +410,9 @@ protected IterationResult doProcess(SearchResponse sea final CompositeAggregation agg = aggregations.get(COMPOSITE_AGGREGATION_NAME); switch (runState) { - case FULL_RUN: + case APPLY_BUCKET_RESULTS: return processBuckets(agg); - case PARTIAL_RUN_APPLY_CHANGES: - return processPartialBucketUpdates(agg); - case PARTIAL_RUN_IDENTIFY_CHANGES: + case IDENTIFY_CHANGES: return processChangedBuckets(agg); default: @@ -582,7 +576,19 @@ private void sourceHasChanged(ActionListener hasChangedListener) { private IterationResult processBuckets(final CompositeAggregation agg) { // we reached the end if (agg.getBuckets().isEmpty()) { - return new IterationResult<>(Collections.emptyList(), null, true); + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { + return new IterationResult<>(Collections.emptyList(), null, true); + } + + // cleanup changed Buckets + changedBuckets = Collections.emptyMap(); + + // reset the runState to fetch changed buckets + runState = determineRunStateAtStart(); + + // advance the cursor for changed bucket detection + return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); + } long docsBeforeProcess = getStats().getNumDocuments(); @@ -612,10 +618,12 @@ private IterationResult processPartialBucketUpdates(fi // we reached the end if (agg.getBuckets().isEmpty()) { // cleanup changed Buckets - changedBuckets = null; + changedBuckets = Collections.emptyMap(); // reset the runState to fetch changed buckets - runState = RunState.PARTIAL_RUN_IDENTIFY_CHANGES; + runState = determineRunStateAtStart(); + + // RunState.PARTIAL_RUN_IDENTIFY_CHANGES; // advance the cursor for changed bucket detection return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); } @@ -631,7 +639,7 @@ private IterationResult processChangedBuckets(final Co // reached the end? if (agg.getBuckets().isEmpty()) { // reset everything and return the end marker - changedBuckets = null; + changedBuckets = Collections.emptyMap(); changedBucketsAfterKey = null; return new IterationResult<>(Collections.emptyList(), null, true); } @@ -644,7 +652,7 @@ private IterationResult processChangedBuckets(final Co changedBucketsAfterKey = agg.afterKey(); // reset the runState to fetch the partial updates next - runState = RunState.PARTIAL_RUN_APPLY_CHANGES; + runState = RunState.APPLY_BUCKET_RESULTS; return new IterationResult<>(Collections.emptyList(), getPosition(), false); } @@ -721,15 +729,12 @@ protected SearchRequest buildSearchRequest() { SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0); switch (runState) { - case FULL_RUN: - buildFullRunQuery(sourceBuilder); + case APPLY_BUCKET_RESULTS: + buildUpdateQuery(sourceBuilder); break; - case PARTIAL_RUN_IDENTIFY_CHANGES: + case IDENTIFY_CHANGES: buildChangedBucketsQuery(sourceBuilder); break; - case PARTIAL_RUN_APPLY_CHANGES: - buildPartialUpdateQuery(sourceBuilder); - break; default: // Any other state is a bug, should not happen logger.warn("Encountered unexpected run state [" + runState + "]"); @@ -740,26 +745,6 @@ protected SearchRequest buildSearchRequest() { return searchRequest; } - private SearchSourceBuilder buildFullRunQuery(SearchSourceBuilder sourceBuilder) { - TransformIndexerPosition position = getPosition(); - - sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize)); - TransformConfig config = getConfig(); - - QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery(); - if (isContinuous()) { - BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder) - .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); - sourceBuilder.query(filteredQuery); - } else { - sourceBuilder.query(pivotQueryBuilder); - } - - logger.trace("running full run query: {}", sourceBuilder); - - return sourceBuilder; - } - private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceBuilder) { assert isContinuous(); @@ -781,9 +766,7 @@ private SearchSourceBuilder buildChangedBucketsQuery(SearchSourceBuilder sourceB return sourceBuilder; } - private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBuilder) { - assert isContinuous(); - + private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) { TransformIndexerPosition position = getPosition(); sourceBuilder.aggregation(pivot.buildAggregation(position != null ? position.getIndexerPosition() : null, pageSize)); @@ -791,18 +774,28 @@ private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBu QueryBuilder pivotQueryBuilder = config.getSource().getQueryConfig().getQuery(); + // if its either the 1st run or not continuous, do not apply extra filters + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { + sourceBuilder.query(pivotQueryBuilder); + logger.trace("running query: {}", sourceBuilder); + + return sourceBuilder; + } + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(pivotQueryBuilder) .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); - if (changedBuckets != null && changedBuckets.isEmpty() == false) { - QueryBuilder pivotFilter = pivot.filterBuckets(changedBuckets); - if (pivotFilter != null) { - filteredQuery.filter(pivotFilter); - } + QueryBuilder pivotFilter = pivot.filterBuckets( + changedBuckets, + config.getSyncConfig().getField(), + lastCheckpoint.getTimeUpperBound() + ); + if (pivotFilter != null) { + filteredQuery.filter(pivotFilter); } sourceBuilder.query(filteredQuery); - logger.trace("running partial update query: {}", sourceBuilder); + logger.trace("running query: {}", sourceBuilder); return sourceBuilder; } @@ -903,16 +896,16 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { private RunState determineRunStateAtStart() { // either 1st run or not a continuous transform if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { - return RunState.FULL_RUN; + return RunState.APPLY_BUCKET_RESULTS; } // if incremental update is not supported, do a full run if (pivot.supportsIncrementalBucketUpdate() == false) { - return RunState.FULL_RUN; + return RunState.APPLY_BUCKET_RESULTS; } // continuous mode: we need to get the changed buckets first - return RunState.PARTIAL_RUN_IDENTIFY_CHANGES; + return RunState.IDENTIFY_CHANGES; } /** diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index 47259ee8c8142..6863c31cfb7f5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -189,44 +189,37 @@ public Stream> extractResults( ); } - public QueryBuilder filterBuckets(Map> changedBuckets) { - - if (changedBuckets == null || changedBuckets.isEmpty()) { - return null; - } + public QueryBuilder filterBuckets( + Map> changedBuckets, + String synchronizationField, + long lastSynchronizationCheckpoint + ) { if (config.getGroupConfig().getGroups().size() == 1) { Entry entry = config.getGroupConfig().getGroups().entrySet().iterator().next(); - // it should not be possible to get into this code path - assert (entry.getValue().supportsIncrementalBucketUpdate()); logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField()); - if (changedBuckets.containsKey(entry.getKey())) { - return entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey())); - } else { - // should never happen - throw new RuntimeException("Could not find bucket value for key " + entry.getKey()); - } + Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); + + // important: the fields must match to apply this optimization + long synchronizationTimestamp = entry.getKey().equals(synchronizationField) ? lastSynchronizationCheckpoint : 0; + + return entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationTimestamp); } // else: more than 1 group by, need to nest it BoolQueryBuilder filteredQuery = new BoolQueryBuilder(); for (Entry entry : config.getGroupConfig().getGroups().entrySet()) { - if (entry.getValue().supportsIncrementalBucketUpdate() == false) { - continue; - } - - if (changedBuckets.containsKey(entry.getKey())) { - QueryBuilder sourceQueryFilter = entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBuckets.get(entry.getKey())); - // the source might not define an filter optimization - if (sourceQueryFilter != null) { - filteredQuery.filter(sourceQueryFilter); - } - } else { - // should never happen - throw new RuntimeException("Could not find bucket value for key " + entry.getKey()); + Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); + // important: the fields must match to apply this optimization + long synchronizationTimestamp = entry.getKey().equals(synchronizationField) ? lastSynchronizationCheckpoint : 0; + + QueryBuilder sourceQueryFilter = entry.getValue() + .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationTimestamp); + // the source might not define a filter optimization + if (sourceQueryFilter != null) { + filteredQuery.filter(sourceQueryFilter); } - } return filteredQuery; From d7ebaf50d6d5f104090bc3a0e1a4a6e07cd56b38 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 24 Mar 2020 11:44:56 +0100 Subject: [PATCH 3/9] remove dead code --- .../transforms/TransformIndexer.java | 23 +++---------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 1312f14248792..b9525e9e65114 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -231,7 +231,7 @@ protected void onStart(long now, ActionListener listener) { pageSize = pivot.getInitialPageSize(); } - runState = determineRunStateAtStart(); + runState = determineRunState(); listener.onResponse(true); } catch (Exception e) { listener.onFailure(e); @@ -584,7 +584,7 @@ private IterationResult processBuckets(final Composite changedBuckets = Collections.emptyMap(); // reset the runState to fetch changed buckets - runState = determineRunStateAtStart(); + runState = determineRunState(); // advance the cursor for changed bucket detection return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); @@ -614,23 +614,6 @@ private IterationResult processBuckets(final Composite return result; } - private IterationResult processPartialBucketUpdates(final CompositeAggregation agg) { - // we reached the end - if (agg.getBuckets().isEmpty()) { - // cleanup changed Buckets - changedBuckets = Collections.emptyMap(); - - // reset the runState to fetch changed buckets - runState = determineRunStateAtStart(); - - // RunState.PARTIAL_RUN_IDENTIFY_CHANGES; - // advance the cursor for changed bucket detection - return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); - } - - return processBuckets(agg); - } - private IterationResult processChangedBuckets(final CompositeAggregation agg) { // initialize the map of changed buckets, the map might be empty if source do not require/implement // changed bucket detection @@ -893,7 +876,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { return true; } - private RunState determineRunStateAtStart() { + private RunState determineRunState() { // either 1st run or not a continuous transform if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { return RunState.APPLY_BUCKET_RESULTS; From 5eddcdea8a7d82ac41ecb156b467d44a014fada0 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 24 Mar 2020 22:02:24 +0100 Subject: [PATCH 4/9] move synchronizationField into group source --- .../pivot/DateHistogramGroupSource.java | 12 +++++++----- .../transforms/pivot/HistogramGroupSource.java | 6 +++++- .../transforms/pivot/SingleGroupSource.java | 6 +++++- .../transforms/pivot/TermsGroupSource.java | 6 +++++- .../transform/transforms/TransformIndexer.java | 8 ++++---- .../xpack/transform/transforms/pivot/Pivot.java | 15 +++------------ 6 files changed, 29 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java index 7e50d97685487..46d7727791eb9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java @@ -220,9 +220,8 @@ public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interva final Rounding.Builder roundingBuilder; if (timeUnit != null) { roundingBuilder = new Rounding.Builder(timeUnit); - } else { - roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), "createRounding")); + roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName())); } if (timeZone != null) { @@ -345,9 +344,12 @@ public int hashCode() { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp) { - if (synchronizationTimestamp > 0) { - + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { + if (synchronizationField != null && field != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) { return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis"); } else { return null; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java index 25549002848fa..4f62faa7cb09b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java @@ -101,7 +101,11 @@ public int hashCode() { } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp) { + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { // histograms are simple and cheap, so we skip this optimization return null; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java index 1ff2a7d6a9ef2..569bf08d8209a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java @@ -116,7 +116,11 @@ public void writeTo(StreamOutput out) throws IOException { public abstract boolean supportsIncrementalBucketUpdate(); - public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp); + public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ); public String getField() { return field; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java index 5a93163e2cf8a..17a173836cb95 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java @@ -54,7 +54,11 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean } @Override - public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set changedBuckets, long synchronizationTimestamp) { + public QueryBuilder getIncrementalBucketUpdateFilterQuery( + Set changedBuckets, + String synchronizationField, + long synchronizationTimestamp + ) { if (changedBuckets != null && changedBuckets.isEmpty() == false) { return new TermsQueryBuilder(field, changedBuckets); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index b9525e9e65114..80de3b8d2a622 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -108,7 +108,7 @@ private enum RunState { private volatile RunState runState; // hold information for continuous mode (partial updates) - private volatile Map> changedBuckets; + private volatile Map> changedBuckets = Collections.emptyMap(); private volatile Map changedBucketsAfterKey; private volatile long lastCheckpointCleanup = 0L; @@ -576,7 +576,7 @@ private void sourceHasChanged(ActionListener hasChangedListener) { private IterationResult processBuckets(final CompositeAggregation agg) { // we reached the end if (agg.getBuckets().isEmpty()) { - if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { + if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false || pivot.supportsIncrementalBucketUpdate() == false) { return new IterationResult<>(Collections.emptyList(), null, true); } @@ -584,7 +584,7 @@ private IterationResult processBuckets(final Composite changedBuckets = Collections.emptyMap(); // reset the runState to fetch changed buckets - runState = determineRunState(); + runState = RunState.IDENTIFY_CHANGES; // advance the cursor for changed bucket detection return new IterationResult<>(Collections.emptyList(), new TransformIndexerPosition(null, changedBucketsAfterKey), false); @@ -882,7 +882,7 @@ private RunState determineRunState() { return RunState.APPLY_BUCKET_RESULTS; } - // if incremental update is not supported, do a full run + // if incremental update is not supported, do a normal run if (pivot.supportsIncrementalBucketUpdate() == false) { return RunState.APPLY_BUCKET_RESULTS; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index 6863c31cfb7f5..1e174dbdbad58 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -174,7 +174,6 @@ public Stream> extractResults( Map fieldTypeMap, TransformIndexerStats transformIndexerStats ) { - GroupConfig groups = config.getGroupConfig(); Collection aggregationBuilders = config.getAggregationConfig().getAggregatorFactories(); Collection pipelineAggregationBuilders = config.getAggregationConfig().getPipelineAggregatorFactories(); @@ -194,28 +193,20 @@ public QueryBuilder filterBuckets( String synchronizationField, long lastSynchronizationCheckpoint ) { - if (config.getGroupConfig().getGroups().size() == 1) { Entry entry = config.getGroupConfig().getGroups().entrySet().iterator().next(); - logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField()); Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); - - // important: the fields must match to apply this optimization - long synchronizationTimestamp = entry.getKey().equals(synchronizationField) ? lastSynchronizationCheckpoint : 0; - - return entry.getValue().getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationTimestamp); + return entry.getValue() + .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); } // else: more than 1 group by, need to nest it BoolQueryBuilder filteredQuery = new BoolQueryBuilder(); for (Entry entry : config.getGroupConfig().getGroups().entrySet()) { Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); - // important: the fields must match to apply this optimization - long synchronizationTimestamp = entry.getKey().equals(synchronizationField) ? lastSynchronizationCheckpoint : 0; - QueryBuilder sourceQueryFilter = entry.getValue() - .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationTimestamp); + .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); // the source might not define a filter optimization if (sourceQueryFilter != null) { filteredQuery.filter(sourceQueryFilter); From e10111b724490c3d3f7391a5ac8b895f50eaf294 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 26 Mar 2020 09:07:20 +0100 Subject: [PATCH 5/9] add some more date histogram rounding test cases --- .../pivot/DateHistogramGroupSourceTests.java | 64 ++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java index 1cd8dea1b4133..265f2a9446828 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSourceTests.java @@ -10,11 +10,17 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; +import java.time.ZoneOffset; +import java.time.temporal.TemporalAccessor; + +import static org.hamcrest.Matchers.equalTo; public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase { @@ -26,14 +32,16 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 1, "d", "h", "ms", "s", "m"))), + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))), randomBoolean() ? randomZone() : null ); } else { dateHistogramGroupSource = new DateHistogramGroupSource( field, scriptConfig, - new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))), + new DateHistogramGroupSource.CalendarInterval( + new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y")) + ), randomBoolean() ? randomZone() : null ); } @@ -69,4 +77,56 @@ protected Reader instanceReader() { return DateHistogramGroupSource::new; } + public void testRoundingDateHistogramFixedInterval() { + String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); + DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource( + field, + null, + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")), + null + ); + + // not meant to be complete rounding tests, see {@link RoundingTests} for more + assertNotNull(dateHistogramGroupSource.getRounding()); + + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), + equalTo(time("2020-03-26T00:00:00.000Z")) + ); + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-26T00:00:01.000Z")), + equalTo(time("2020-03-26T00:00:00.000Z")) + ); + } + + public void testRoundingDateHistogramCalendarInterval() { + String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20); + DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource( + field, + null, + new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")), + null + ); + + // not meant to be complete rounding tests, see {@link RoundingTests} for more + assertNotNull(dateHistogramGroupSource.getRounding()); + + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), + equalTo(time("2020-03-23T00:00:00.000Z")) + ); + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-29T23:59:59.000Z")), + equalTo(time("2020-03-23T00:00:00.000Z")) + ); + assertThat( + dateHistogramGroupSource.getRounding().round(time("2020-03-23T00:00:01.000Z")), + equalTo(time("2020-03-23T00:00:00.000Z")) + ); + } + + private static long time(String time) { + TemporalAccessor accessor = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC).parse(time); + return DateFormatters.from(accessor).toInstant().toEpochMilli(); + } } From 80d94133ed508acde7634f710f0a03b38f4e168d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 26 Mar 2020 09:22:09 +0100 Subject: [PATCH 6/9] revert methode rename --- .../xpack/transform/transforms/TransformIndexer.java | 4 ++-- .../elasticsearch/xpack/transform/transforms/pivot/Pivot.java | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 80de3b8d2a622..80b9c9ca1ddba 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -231,7 +231,7 @@ protected void onStart(long now, ActionListener listener) { pageSize = pivot.getInitialPageSize(); } - runState = determineRunState(); + runState = determineRunStateAtStart(); listener.onResponse(true); } catch (Exception e) { listener.onFailure(e); @@ -876,7 +876,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { return true; } - private RunState determineRunState() { + private RunState determineRunStateAtStart() { // either 1st run or not a continuous transform if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { return RunState.APPLY_BUCKET_RESULTS; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index 1e174dbdbad58..6c8b11e3b9c88 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -193,6 +193,8 @@ public QueryBuilder filterBuckets( String synchronizationField, long lastSynchronizationCheckpoint ) { + assert changedBuckets != null; + if (config.getGroupConfig().getGroups().size() == 1) { Entry entry = config.getGroupConfig().getGroups().entrySet().iterator().next(); logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField()); From e4389bf1a6d09ac5fb5d3bb3a61b33fbe52d5f74 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 26 Mar 2020 14:11:44 +0100 Subject: [PATCH 7/9] Update x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java Co-Authored-By: Benjamin Trent --- .../elasticsearch/xpack/transform/transforms/pivot/Pivot.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index 6c8b11e3b9c88..d11059e97e2b8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -197,7 +197,7 @@ public QueryBuilder filterBuckets( if (config.getGroupConfig().getGroups().size() == 1) { Entry entry = config.getGroupConfig().getGroups().entrySet().iterator().next(); - logger.trace("filter by bucket: " + entry.getKey() + "/" + entry.getValue().getField()); + logger.trace(() -> new ParameterizedMessage("filter by bucket: {}/{}", entry.getKey(), entry.getValue().getField())); Set changedBucketsByGroup = changedBuckets.get(entry.getKey()); return entry.getValue() .getIncrementalBucketUpdateFilterQuery(changedBucketsByGroup, synchronizationField, lastSynchronizationCheckpoint); From f7cd216a0982afe3fa9cb0dd07343506b625ef9a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 26 Mar 2020 14:11:57 +0100 Subject: [PATCH 8/9] Update x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java Co-Authored-By: Benjamin Trent --- .../transform/transforms/pivot/DateHistogramGroupSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java index 46d7727791eb9..f61497e31d3e6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java @@ -349,7 +349,7 @@ public QueryBuilder getIncrementalBucketUpdateFilterQuery( String synchronizationField, long synchronizationTimestamp ) { - if (synchronizationField != null && field != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) { + if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) { return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis"); } else { return null; From a3595e6c0155143a1dcd3ceca14c0a054e280a33 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 26 Mar 2020 14:43:32 +0100 Subject: [PATCH 9/9] fix import --- .../elasticsearch/xpack/transform/transforms/pivot/Pivot.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java index d11059e97e2b8..16bd7d9f6a935 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction;