Skip to content

Commit

Permalink
[Transform] Optimize date histogram (#54068)
Browse files Browse the repository at this point in the history
Optimize transforms that group_by on date_histogram by injecting an additional range query. This limits the number of search and index requests and avoids unnecessary updates; only recent buckets get rewritten.

fixes #54254
  • Loading branch information
Hendrik Muhs authored Mar 26, 2020
1 parent 8168895 commit 69fe13c
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,18 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())),
randomBoolean() ? randomZone() : null
);
} else {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))),
randomBoolean() ? randomZone() : null
);
}

if (randomBoolean()) {
dateHistogramGroupSource.setTimeZone(randomZone());
}
return dateHistogramGroupSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable {
*/
boolean isValid();

String getField();

QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint);

QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class TimeSyncConfig implements SyncConfig {
public class TimeSyncConfig implements SyncConfig {

public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60);
private static final String NAME = "data_frame_transform_pivot_sync_time";
Expand All @@ -37,17 +37,18 @@ public class TimeSyncConfig implements SyncConfig {
private static final ConstructingObjectParser<TimeSyncConfig, Void> LENIENT_PARSER = createParser(true);

private static ConstructingObjectParser<TimeSyncConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient,
args -> {
String field = (String) args[0];
TimeValue delay = (TimeValue) args[1];
return new TimeSyncConfig(field, delay);
});
ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient, args -> {
String field = (String) args[0];
TimeValue delay = (TimeValue) args[1];
return new TimeSyncConfig(field, delay);
});
parser.declareString(constructorArg(), TransformField.FIELD);
parser.declareField(optionalConstructorArg(),
parser.declareField(
optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()),
TransformField.DELAY,
ObjectParser.ValueType.STRING);
ObjectParser.ValueType.STRING
);
return parser;
}

Expand All @@ -65,6 +66,7 @@ public TimeSyncConfig(StreamInput in) throws IOException {
this.delay = in.readTimeValue();
}

@Override
public String getField() {
return field;
}
Expand Down Expand Up @@ -105,12 +107,11 @@ public boolean equals(Object other) {

final TimeSyncConfig that = (TimeSyncConfig) other;

return Objects.equals(this.field, that.field)
&& Objects.equals(this.delay, that.delay);
return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay);
}

@Override
public int hashCode(){
public int hashCode() {
return Objects.hash(field, delay);
}

Expand Down Expand Up @@ -139,7 +140,8 @@ public QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint) {

@Override
public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()).lt(newCheckpoint.getTimeUpperBound())
.format("epoch_millis");
return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound())
.lt(newCheckpoint.getTimeUpperBound())
.format("epoch_millis");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

Expand Down Expand Up @@ -105,6 +108,11 @@ public boolean equals(Object other) {
public int hashCode() {
return Objects.hash(interval);
}

@Override
public String toString() {
return interval.toString();
}
}

public static class CalendarInterval implements Interval {
Expand Down Expand Up @@ -169,6 +177,11 @@ public boolean equals(Object other) {
public int hashCode() {
return Objects.hash(interval);
}

@Override
public String toString() {
return interval.toString();
}
}

private Interval readInterval(StreamInput in) throws IOException {
Expand All @@ -195,11 +208,26 @@ private void writeInterval(Interval interval, StreamOutput out) throws IOExcepti
private static final ConstructingObjectParser<DateHistogramGroupSource, Void> LENIENT_PARSER = createParser(true);

private final Interval interval;
private ZoneId timeZone;
private final ZoneId timeZone;
private Rounding rounding;

public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval) {
public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) {
super(field, scriptConfig);
this.interval = interval;
this.timeZone = timeZone;

Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString());
final Rounding.Builder roundingBuilder;
if (timeUnit != null) {
roundingBuilder = new Rounding.Builder(timeUnit);
} else {
roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName()));
}

if (timeZone != null) {
roundingBuilder.timeZone(timeZone);
}
this.rounding = roundingBuilder.build();
}

public DateHistogramGroupSource(StreamInput in) throws IOException {
Expand All @@ -218,6 +246,7 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
ScriptConfig scriptConfig = (ScriptConfig) args[1];
String fixedInterval = (String) args[2];
String calendarInterval = (String) args[3];
ZoneId zoneId = (ZoneId) args[4];

Interval interval = null;

Expand All @@ -231,15 +260,15 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none");
}

return new DateHistogramGroupSource(field, scriptConfig, interval);
return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId);
});

declareValuesSourceFields(parser, lenient);

parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME));
parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME));

parser.declareField(DateHistogramGroupSource::setTimeZone, p -> {
parser.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return ZoneId.of(p.text());
} else {
Expand Down Expand Up @@ -267,8 +296,8 @@ public ZoneId getTimeZone() {
return timeZone;
}

public void setTimeZone(ZoneId timeZone) {
this.timeZone = timeZone;
public Rounding getRounding() {
return rounding;
}

@Override
Expand Down Expand Up @@ -315,9 +344,16 @@ public int hashCode() {
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
// no need for an extra range filter as this is already done by checkpoints
return null;
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) {
return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis");
} else {
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ public int hashCode() {
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
// histograms are simple and cheap, so we skip this optimization
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ public void writeTo(StreamOutput out) throws IOException {

public abstract boolean supportsIncrementalBucketUpdate();

public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets);
public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
);

public String getField() {
return field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
return new TermsQueryBuilder(field, changedBuckets);
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (changedBuckets != null && changedBuckets.isEmpty() == false) {
return new TermsQueryBuilder(field, changedBuckets);
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import java.util.Collections;
import java.util.Map;

import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
import static org.hamcrest.Matchers.equalTo;

public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase<TransformConfigUpdate> {
Expand Down Expand Up @@ -184,6 +184,11 @@ public String getWriteableName() {
return "foo";
}

@Override
public String getField() {
return "foo";
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.AbstractSerializingTestCase;

import java.io.IOException;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAccessor;

import static org.hamcrest.Matchers.equalTo;

public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource> {

Expand All @@ -26,19 +32,20 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))),
randomBoolean() ? randomZone() : null
);
} else {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
new DateHistogramGroupSource.CalendarInterval(
new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y"))
),
randomBoolean() ? randomZone() : null
);
}

if (randomBoolean()) {
dateHistogramGroupSource.setTimeZone(randomZone());
}
return dateHistogramGroupSource;
}

Expand Down Expand Up @@ -70,4 +77,56 @@ protected Reader<DateHistogramGroupSource> instanceReader() {
return DateHistogramGroupSource::new;
}

public void testRoundingDateHistogramFixedInterval() {
    // The field name does not influence rounding; exercise both null and a random name.
    String fieldName = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
    DateHistogramGroupSource groupSource = new DateHistogramGroupSource(
        fieldName,
        null,
        new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")),
        null
    );

    // Spot checks only — exhaustive rounding coverage lives in RoundingTests.
    assertNotNull(groupSource.getRounding());

    long expectedStartOfDay = time("2020-03-26T00:00:00.000Z");
    // Timestamps near the end and near the start of the day both round down to the day boundary.
    assertThat(groupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), equalTo(expectedStartOfDay));
    assertThat(groupSource.getRounding().round(time("2020-03-26T00:00:01.000Z")), equalTo(expectedStartOfDay));
}

public void testRoundingDateHistogramCalendarInterval() {
    // The field name does not influence rounding; exercise both null and a random name.
    String fieldName = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
    DateHistogramGroupSource groupSource = new DateHistogramGroupSource(
        fieldName,
        null,
        new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")),
        null
    );

    // Spot checks only — exhaustive rounding coverage lives in RoundingTests.
    assertNotNull(groupSource.getRounding());

    // With a 1-week calendar interval, every timestamp in the week starting 2020-03-23
    // rounds down to that week's start.
    long expectedStartOfWeek = time("2020-03-23T00:00:00.000Z");
    assertThat(groupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), equalTo(expectedStartOfWeek));
    assertThat(groupSource.getRounding().round(time("2020-03-29T23:59:59.000Z")), equalTo(expectedStartOfWeek));
    assertThat(groupSource.getRounding().round(time("2020-03-23T00:00:01.000Z")), equalTo(expectedStartOfWeek));
}

/** Parses the given timestamp string (treated as UTC) into epoch milliseconds. */
private static long time(String timestamp) {
    DateFormatter formatter = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC);
    TemporalAccessor parsed = formatter.parse(timestamp);
    return DateFormatters.from(parsed).toInstant().toEpochMilli();
}
}
Loading

0 comments on commit 69fe13c

Please sign in to comment.