Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Transform optimize date histogram #54068

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,18 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue())),
randomBoolean() ? randomZone() : null
);
} else {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w"))),
randomBoolean() ? randomZone() : null
);
}

if (randomBoolean()) {
dateHistogramGroupSource.setTimeZone(randomZone());
}
return dateHistogramGroupSource;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface SyncConfig extends ToXContentObject, NamedWriteable {
*/
boolean isValid();

String getField();

QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint);

QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class TimeSyncConfig implements SyncConfig {
public class TimeSyncConfig implements SyncConfig {

public static final TimeValue DEFAULT_DELAY = TimeValue.timeValueSeconds(60);
private static final String NAME = "data_frame_transform_pivot_sync_time";
Expand All @@ -37,17 +37,18 @@ public class TimeSyncConfig implements SyncConfig {
private static final ConstructingObjectParser<TimeSyncConfig, Void> LENIENT_PARSER = createParser(true);

private static ConstructingObjectParser<TimeSyncConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient,
args -> {
String field = (String) args[0];
TimeValue delay = (TimeValue) args[1];
return new TimeSyncConfig(field, delay);
});
ConstructingObjectParser<TimeSyncConfig, Void> parser = new ConstructingObjectParser<>(NAME, lenient, args -> {
String field = (String) args[0];
TimeValue delay = (TimeValue) args[1];
return new TimeSyncConfig(field, delay);
});
parser.declareString(constructorArg(), TransformField.FIELD);
parser.declareField(optionalConstructorArg(),
parser.declareField(
optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), DEFAULT_DELAY, TransformField.DELAY.getPreferredName()),
TransformField.DELAY,
ObjectParser.ValueType.STRING);
ObjectParser.ValueType.STRING
);
return parser;
}

Expand All @@ -65,6 +66,7 @@ public TimeSyncConfig(StreamInput in) throws IOException {
this.delay = in.readTimeValue();
}

@Override
public String getField() {
return field;
}
Expand Down Expand Up @@ -105,12 +107,11 @@ public boolean equals(Object other) {

final TimeSyncConfig that = (TimeSyncConfig) other;

return Objects.equals(this.field, that.field)
&& Objects.equals(this.delay, that.delay);
return Objects.equals(this.field, that.field) && Objects.equals(this.delay, that.delay);
}

@Override
public int hashCode(){
public int hashCode() {
return Objects.hash(field, delay);
}

Expand Down Expand Up @@ -139,7 +140,8 @@ public QueryBuilder getRangeQuery(TransformCheckpoint newCheckpoint) {

@Override
public QueryBuilder getRangeQuery(TransformCheckpoint oldCheckpoint, TransformCheckpoint newCheckpoint) {
return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound()).lt(newCheckpoint.getTimeUpperBound())
.format("epoch_millis");
return new RangeQueryBuilder(field).gte(oldCheckpoint.getTimeUpperBound())
.lt(newCheckpoint.getTimeUpperBound())
.format("epoch_millis");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

Expand Down Expand Up @@ -105,6 +108,11 @@ public boolean equals(Object other) {
public int hashCode() {
return Objects.hash(interval);
}

/**
 * Returns the string form of the underlying fixed interval (e.g. "90m").
 * NOTE(review): this string is also used as the lookup key when resolving the
 * rounding unit in the DateHistogramGroupSource constructor.
 */
@Override
public String toString() {
    return interval.toString();
}
}

public static class CalendarInterval implements Interval {
Expand Down Expand Up @@ -169,6 +177,11 @@ public boolean equals(Object other) {
public int hashCode() {
return Objects.hash(interval);
}

/**
 * Returns the string form of the underlying calendar interval (e.g. "1w").
 * NOTE(review): this string is also used as the lookup key when resolving the
 * rounding unit in the DateHistogramGroupSource constructor.
 */
@Override
public String toString() {
    return interval.toString();
}
}

private Interval readInterval(StreamInput in) throws IOException {
Expand All @@ -195,11 +208,26 @@ private void writeInterval(Interval interval, StreamOutput out) throws IOExcepti
private static final ConstructingObjectParser<DateHistogramGroupSource, Void> LENIENT_PARSER = createParser(true);

private final Interval interval;
private ZoneId timeZone;
private final ZoneId timeZone;
private Rounding rounding;

public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval) {
/**
 * Creates a date_histogram group source and eagerly builds the {@link Rounding}
 * used to round timestamps onto bucket boundaries.
 *
 * @param field        the field to group on (tests pass {@code null} when a script is used)
 * @param scriptConfig optional script configuration, forwarded to the parent
 * @param interval     the fixed or calendar interval of the date histogram
 * @param timeZone     optional time zone for the rounding; when {@code null} no explicit
 *                     zone is set on the rounding builder
 */
public DateHistogramGroupSource(String field, ScriptConfig scriptConfig, Interval interval, ZoneId timeZone) {
    super(field, scriptConfig);
    this.interval = interval;
    this.timeZone = timeZone;

    // Calendar expressions ("1d", "1w", ...) resolve via DATE_FIELD_UNITS; anything
    // that is not a known calendar unit is parsed as a fixed time value (e.g. "90m").
    Rounding.DateTimeUnit timeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval.toString());
    final Rounding.Builder roundingBuilder;
    if (timeUnit != null) {
        roundingBuilder = new Rounding.Builder(timeUnit);
    } else {
        roundingBuilder = new Rounding.Builder(TimeValue.parseTimeValue(interval.toString(), interval.getName()));
    }

    if (timeZone != null) {
        roundingBuilder.timeZone(timeZone);
    }
    this.rounding = roundingBuilder.build();
}

public DateHistogramGroupSource(StreamInput in) throws IOException {
Expand All @@ -218,6 +246,7 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
ScriptConfig scriptConfig = (ScriptConfig) args[1];
String fixedInterval = (String) args[2];
String calendarInterval = (String) args[3];
ZoneId zoneId = (ZoneId) args[4];

Interval interval = null;

Expand All @@ -231,15 +260,15 @@ private static ConstructingObjectParser<DateHistogramGroupSource, Void> createPa
throw new IllegalArgumentException("You must specify either fixed_interval or calendar_interval, found none");
}

return new DateHistogramGroupSource(field, scriptConfig, interval);
return new DateHistogramGroupSource(field, scriptConfig, interval, zoneId);
});

declareValuesSourceFields(parser, lenient);

parser.declareString(optionalConstructorArg(), new ParseField(FixedInterval.NAME));
parser.declareString(optionalConstructorArg(), new ParseField(CalendarInterval.NAME));

parser.declareField(DateHistogramGroupSource::setTimeZone, p -> {
parser.declareField(optionalConstructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return ZoneId.of(p.text());
} else {
Expand Down Expand Up @@ -267,8 +296,8 @@ public ZoneId getTimeZone() {
return timeZone;
}

public void setTimeZone(ZoneId timeZone) {
this.timeZone = timeZone;
public Rounding getRounding() {
return rounding;
}

@Override
Expand Down Expand Up @@ -315,9 +344,16 @@ public int hashCode() {
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
// no need for an extra range filter as this is already done by checkpoints
return null;
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
) {
if (synchronizationField != null && synchronizationField.equals(field) && synchronizationTimestamp > 0) {
return new RangeQueryBuilder(field).gte(rounding.round(synchronizationTimestamp)).format("epoch_millis");
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved
} else {
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ public int hashCode() {
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
/**
 * Incremental-bucket-update filtering is deliberately not used for numeric
 * histograms: recomputing the affected buckets is cheap, so no narrowing
 * query is produced.
 *
 * @return always {@code null} (no additional filter)
 */
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
    Set<String> changedBuckets,
    String synchronizationField,
    long synchronizationTimestamp
) {
    // histograms are simple and cheap, so we skip this optimization
    return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ public void writeTo(StreamOutput out) throws IOException {

public abstract boolean supportsIncrementalBucketUpdate();

public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets);
public abstract QueryBuilder getIncrementalBucketUpdateFilterQuery(
Set<String> changedBuckets,
String synchronizationField,
long synchronizationTimestamp
);

public String getField() {
return field;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,15 @@ public static TermsGroupSource fromXContent(final XContentParser parser, boolean
}

@Override
public QueryBuilder getIncrementalBucketUpdateFilterQuery(Set<String> changedBuckets) {
return new TermsQueryBuilder(field, changedBuckets);
/**
 * Builds a terms filter restricted to the bucket values that changed since the
 * last checkpoint. When no changed buckets are known, returns {@code null} so
 * that no extra filter is applied.
 */
public QueryBuilder getIncrementalBucketUpdateFilterQuery(
    Set<String> changedBuckets,
    String synchronizationField,
    long synchronizationTimestamp
) {
    // Guard clause: nothing to narrow down without a non-empty set of changed buckets.
    if (changedBuckets == null || changedBuckets.isEmpty()) {
        return null;
    }
    return new TermsQueryBuilder(field, changedBuckets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import java.util.Collections;
import java.util.Map;

import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
import static org.hamcrest.Matchers.equalTo;

public class TransformConfigUpdateTests extends AbstractSerializingTransformTestCase<TransformConfigUpdate> {
Expand Down Expand Up @@ -184,6 +184,11 @@ public String getWriteableName() {
return "foo";
}

@Override
public String getField() {
return "foo";
}

@Override
public void writeTo(StreamOutput out) throws IOException {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.AbstractSerializingTestCase;

import java.io.IOException;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAccessor;

import static org.hamcrest.Matchers.equalTo;

public class DateHistogramGroupSourceTests extends AbstractSerializingTestCase<DateHistogramGroupSource> {

Expand All @@ -26,19 +32,20 @@ public static DateHistogramGroupSource randomDateHistogramGroupSource() {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomPositiveTimeValue()))
new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(randomTimeValue(1, 100, "d", "h", "ms", "s", "m"))),
randomBoolean() ? randomZone() : null
);
} else {
dateHistogramGroupSource = new DateHistogramGroupSource(
field,
scriptConfig,
new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w")))
new DateHistogramGroupSource.CalendarInterval(
new DateHistogramInterval(randomTimeValue(1, 1, "m", "h", "d", "w", "M", "q", "y"))
),
randomBoolean() ? randomZone() : null
);
}

if (randomBoolean()) {
dateHistogramGroupSource.setTimeZone(randomZone());
}
return dateHistogramGroupSource;
}

Expand Down Expand Up @@ -70,4 +77,56 @@ protected Reader<DateHistogramGroupSource> instanceReader() {
return DateHistogramGroupSource::new;
}

public void testRoundingDateHistogramFixedInterval() {
    String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
    DateHistogramGroupSource groupSource = new DateHistogramGroupSource(
        field,
        null,
        new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval("1d")),
        null
    );

    // not meant to be complete rounding tests, see {@link RoundingTests} for more
    assertNotNull(groupSource.getRounding());

    // both timestamps fall inside the same UTC day and must round down to its start
    long startOfDay = time("2020-03-26T00:00:00.000Z");
    assertThat(groupSource.getRounding().round(time("2020-03-26T23:59:59.000Z")), equalTo(startOfDay));
    assertThat(groupSource.getRounding().round(time("2020-03-26T00:00:01.000Z")), equalTo(startOfDay));
}

public void testRoundingDateHistogramCalendarInterval() {
    String field = randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20);
    DateHistogramGroupSource groupSource = new DateHistogramGroupSource(
        field,
        null,
        new DateHistogramGroupSource.CalendarInterval(new DateHistogramInterval("1w")),
        null
    );

    // not meant to be complete rounding tests, see {@link RoundingTests} for more
    assertNotNull(groupSource.getRounding());

    // all three timestamps lie in the week starting 2020-03-23 and must round to it
    long startOfWeek = time("2020-03-23T00:00:00.000Z");
    for (String timestamp : new String[] {
        "2020-03-26T23:59:59.000Z",
        "2020-03-29T23:59:59.000Z",
        "2020-03-23T00:00:01.000Z" }) {
        assertThat(groupSource.getRounding().round(time(timestamp)), equalTo(startOfWeek));
    }
}

/** Parses an ISO-8601 "date_optional_time" timestamp in UTC into epoch milliseconds. */
private static long time(String time) {
    DateFormatter formatter = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC);
    return DateFormatters.from(formatter.parse(time)).toInstant().toEpochMilli();
}
}
Loading