Skip to content

Commit

Permalink
Mitigate date histogram slowdowns with non-fixed timezones. (#30534)
Browse files Browse the repository at this point in the history
Date histograms on non-fixed timezones such as `Europe/Paris` proved much slower
than histograms on fixed timezones in #28727. This change mitigates the issue by
using a fixed time zone instead when shard data doesn't cross a transition so
that all timestamps share the same fixed offset. This should be a common case
with daily indices.

NOTE: Rewriting the aggregation doesn't work since the timezone is then also
used on the coordinating node to create empty buckets, which might be out of the
range of data that exists on the shard.

NOTE: In order to be able to get a shard context in the tests, I reused code
from the base query test case by creating a new parent test case for both
queries and aggregations: `AbstractBuilderTestCase`.

Mitigates #28727
  • Loading branch information
jpountz committed May 16, 2018
1 parent 53ec892 commit a9c8ca8
Show file tree
Hide file tree
Showing 7 changed files with 629 additions and 444 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.rounding.DateTimeUnit;
Expand All @@ -27,8 +31,13 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.AtomicNumericFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Relation;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.BucketOrder;
Expand All @@ -44,6 +53,8 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;
import org.joda.time.DateTimeField;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -351,36 +362,121 @@ public String getType() {
return NAME;
}

/*
* NOTE: this can't be done in rewrite() because the timezone is then also used on the
* coordinating node in order to generate missing buckets, which may cross a transition
* even though data on the shards doesn't.
*/
DateTimeZone rewriteTimeZone(QueryShardContext context) throws IOException {
final DateTimeZone tz = timeZone();
if (field() != null &&
tz != null &&
tz.isFixed() == false &&
field() != null &&
script() == null) {
final MappedFieldType ft = context.fieldMapper(field());
final IndexReader reader = context.getIndexReader();
if (ft != null && reader != null) {
Long anyInstant = null;
final IndexNumericFieldData fieldData = context.getForField(ft);
for (LeafReaderContext ctx : reader.leaves()) {
AtomicNumericFieldData leafFD = ((IndexNumericFieldData) fieldData).load(ctx);
SortedNumericDocValues values = leafFD.getLongValues();
if (values.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
anyInstant = values.nextValue();
break;
}
}

if (anyInstant != null) {
final long prevTransition = tz.previousTransition(anyInstant);
final long nextTransition = tz.nextTransition(anyInstant);

// We need all not only values but also rounded values to be within
// [prevTransition, nextTransition].
final long low;
DateTimeUnit intervalAsUnit = getIntervalAsDateTimeUnit();
if (intervalAsUnit != null) {
final DateTimeField dateTimeField = intervalAsUnit.field(tz);
low = dateTimeField.roundCeiling(prevTransition);
} else {
final TimeValue intervalAsMillis = getIntervalAsTimeValue();
low = Math.addExact(prevTransition, intervalAsMillis.millis());
}
// rounding rounds down, so 'nextTransition' is a good upper bound
final long high = nextTransition;

final DocValueFormat format = ft.docValueFormat(null, null);
final String formattedLow = format.format(low);
final String formattedHigh = format.format(high);
if (ft.isFieldWithinQuery(reader, formattedLow, formattedHigh,
true, false, tz, null, context) == Relation.WITHIN) {
// All values in this reader have the same offset despite daylight saving times.
// This is very common for location-based timezones such as Europe/Paris in
// combination with time-based indices.
return DateTimeZone.forOffsetMillis(tz.getOffset(anyInstant));
}
}
}
}
return tz;
}

@Override
protected ValuesSourceAggregatorFactory<Numeric, ?> innerBuild(SearchContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
Rounding rounding = createRounding();
final DateTimeZone tz = timeZone();
final Rounding rounding = createRounding(tz);
final DateTimeZone rewrittenTimeZone = rewriteTimeZone(context.getQueryShardContext());
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = createRounding(rewrittenTimeZone);
}

ExtendedBounds roundedBounds = null;
if (this.extendedBounds != null) {
// parse any string bounds to longs and round
roundedBounds = this.extendedBounds.parseAndValidate(name, context, config.format()).round(rounding);
}
return new DateHistogramAggregatorFactory(name, config, interval, dateHistogramInterval, offset, order, keyed, minDocCount,
rounding, roundedBounds, context, parent, subFactoriesBuilder, metaData);
return new DateHistogramAggregatorFactory(name, config, offset, order, keyed, minDocCount,
rounding, shardRounding, roundedBounds, context, parent, subFactoriesBuilder, metaData);
}

private Rounding createRounding() {
Rounding.Builder tzRoundingBuilder;
/** Return the interval as a date time unit if applicable. If this returns
* {@code null} then it means that the interval is expressed as a fixed
* {@link TimeValue} and may be accessed via
* {@link #getIntervalAsTimeValue()}. */
private DateTimeUnit getIntervalAsDateTimeUnit() {
if (dateHistogramInterval != null) {
DateTimeUnit dateTimeUnit = DATE_FIELD_UNITS.get(dateHistogramInterval.toString());
if (dateTimeUnit != null) {
tzRoundingBuilder = Rounding.builder(dateTimeUnit);
} else {
// the interval is a time value?
tzRoundingBuilder = Rounding.builder(
TimeValue.parseTimeValue(dateHistogramInterval.toString(), null, getClass().getSimpleName() + ".interval"));
}
return DATE_FIELD_UNITS.get(dateHistogramInterval.toString());
}
return null;
}

/**
* Get the interval as a {@link TimeValue}. Should only be called if
* {@link #getIntervalAsDateTimeUnit()} returned {@code null}.
*/
private TimeValue getIntervalAsTimeValue() {
if (dateHistogramInterval != null) {
return TimeValue.parseTimeValue(dateHistogramInterval.toString(), null, getClass().getSimpleName() + ".interval");
} else {
return TimeValue.timeValueMillis(interval);
}
}

private Rounding createRounding(DateTimeZone timeZone) {
Rounding.Builder tzRoundingBuilder;
DateTimeUnit intervalAsUnit = getIntervalAsDateTimeUnit();
if (intervalAsUnit != null) {
tzRoundingBuilder = Rounding.builder(intervalAsUnit);
} else {
// the interval is an integer time value in millis?
tzRoundingBuilder = Rounding.builder(TimeValue.timeValueMillis(interval));
tzRoundingBuilder = Rounding.builder(getIntervalAsTimeValue());
}
if (timeZone() != null) {
tzRoundingBuilder.timeZone(timeZone());
if (timeZone != null) {
tzRoundingBuilder.timeZone(timeZone);
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class DateHistogramAggregator extends BucketsAggregator {
private final ValuesSource.Numeric valuesSource;
private final DocValueFormat formatter;
private final Rounding rounding;
private final Rounding shardRounding;
private final BucketOrder order;
private final boolean keyed;

Expand All @@ -64,14 +65,15 @@ class DateHistogramAggregator extends BucketsAggregator {
private final LongHash bucketOrds;
private long offset;

DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, long offset, BucketOrder order,
boolean keyed,
DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding shardRounding,
long offset, BucketOrder order, boolean keyed,
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Numeric valuesSource,
DocValueFormat formatter, SearchContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.offset = offset;
this.order = InternalOrder.validate(order, this);;
this.keyed = keyed;
Expand Down Expand Up @@ -105,7 +107,9 @@ public void collect(int doc, long bucket) throws IOException {
long previousRounded = Long.MIN_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long value = values.nextValue();
long rounded = rounding.round(value - offset) + offset;
// We can use shardRounding here, which is sometimes more efficient
// if daylight saving times are involved.
long rounded = shardRounding.round(value - offset) + offset;
assert rounded >= previousRounded;
if (rounded == previousRounded) {
continue;
Expand Down Expand Up @@ -138,6 +142,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator(this));

// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,27 @@
public final class DateHistogramAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource.Numeric, DateHistogramAggregatorFactory> {

private final DateHistogramInterval dateHistogramInterval;
private final long interval;
private final long offset;
private final BucketOrder order;
private final boolean keyed;
private final long minDocCount;
private final ExtendedBounds extendedBounds;
private Rounding rounding;
private final Rounding rounding;
private final Rounding shardRounding;

public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, long interval,
DateHistogramInterval dateHistogramInterval, long offset, BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, ExtendedBounds extendedBounds, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config,
long offset, BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, Rounding shardRounding, ExtendedBounds extendedBounds, SearchContext context,
AggregatorFactory<?> parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, context, parent, subFactoriesBuilder, metaData);
this.interval = interval;
this.dateHistogramInterval = dateHistogramInterval;
this.offset = offset;
this.order = order;
this.keyed = keyed;
this.minDocCount = minDocCount;
this.extendedBounds = extendedBounds;
this.rounding = rounding;
this.shardRounding = shardRounding;
}

public long minDocCount() {
Expand All @@ -77,8 +76,8 @@ protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggrega

private Aggregator createAggregator(ValuesSource.Numeric valuesSource, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new DateHistogramAggregator(name, factories, rounding, offset, order, keyed, minDocCount, extendedBounds, valuesSource,
config.format(), context, parent, pipelineAggregators, metaData);
return new DateHistogramAggregator(name, factories, rounding, shardRounding, offset, order, keyed, minDocCount, extendedBounds,
valuesSource, config.format(), context, parent, pipelineAggregators, metaData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,27 @@
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBoundsTests;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.joda.time.DateTimeZone;
import org.junit.Assume;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -120,4 +133,73 @@ private List<BucketOrder> randomOrder() {
return orders;
}

private static Document documentForDate(String field, long millis) {
Document doc = new Document();
doc.add(new LongPoint(field, millis));
doc.add(new SortedNumericDocValuesField(field, millis));
return doc;
}

public void testRewriteTimeZone() throws IOException {
Assume.assumeTrue(getCurrentTypes().length > 0); // we need mappings
FormatDateTimeFormatter format = Joda.forPattern("strict_date_optional_time");

try (Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig())) {

w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2018-03-11T11:55:00").getMillis()));
w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2017-10-30T18:13:00").getMillis()));

try (IndexReader readerThatDoesntCross = DirectoryReader.open(w)) {

w.addDocument(documentForDate(DATE_FIELD_NAME, format.parser().parseDateTime("2018-03-25T02:44:00").getMillis()));

try (IndexReader readerThatCrosses = DirectoryReader.open(w)) {

QueryShardContext shardContextThatDoesntCross = createShardContext(readerThatDoesntCross);
QueryShardContext shardContextThatCrosses = createShardContext(readerThatCrosses);

DateHistogramAggregationBuilder builder = new DateHistogramAggregationBuilder("my_date_histo");
builder.field(DATE_FIELD_NAME);
builder.dateHistogramInterval(DateHistogramInterval.DAY);

// no timeZone => no rewrite
assertNull(builder.rewriteTimeZone(shardContextThatDoesntCross));
assertNull(builder.rewriteTimeZone(shardContextThatCrosses));

// fixed timeZone => no rewrite
DateTimeZone tz = DateTimeZone.forOffsetHours(1);
builder.timeZone(tz);
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// daylight-saving-times => rewrite if doesn't cross
tz = DateTimeZone.forID("Europe/Paris");
builder.timeZone(tz);
assertEquals(DateTimeZone.forOffsetHours(1), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// Rounded values are no longer all within the same transitions => no rewrite
builder.dateHistogramInterval(DateHistogramInterval.MONTH);
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

builder = new DateHistogramAggregationBuilder("my_date_histo");
builder.field(DATE_FIELD_NAME);
builder.timeZone(tz);

builder.interval(1000L * 60 * 60 * 24); // ~ 1 day
assertEquals(DateTimeZone.forOffsetHours(1), builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));

// Because the interval is large, rounded values are not
// within the same transitions as the values => no rewrite
builder.interval(1000L * 60 * 60 * 24 * 30); // ~ 1 month
assertSame(tz, builder.rewriteTimeZone(shardContextThatDoesntCross));
assertSame(tz, builder.rewriteTimeZone(shardContextThatCrosses));
}
}
}
}

}
Loading

0 comments on commit a9c8ca8

Please sign in to comment.