Revert "Begin moving date_histogram to offset rounding (backport of e…
Browse files Browse the repository at this point in the history
…lastic#50873) (elastic#50978)"

This reverts commit 9a3d4db. It was
subtly broken in ways we didn't have tests for.
nik9000 committed Jan 20, 2020
1 parent 79cf089 commit 5dfb6fb
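
For context, the offset handling covered by this revert is applied around the rounding in the aggregators themselves rather than inside Rounding, as in the offsetAwareRounding helper in DateRangeHistogramAggregator below. A minimal sketch of that idea, reusing the names from the diff:

    // Shift the value by the offset, round it, then shift the result back.
    // For a day rounding with a two hour offset this puts bucket boundaries
    // at 02:00 UTC, consistent with the assertions in RoundingTests.testOffsetRounding below.
    private long offsetAwareRounding(Rounding rounding, long value, long offset) {
        return rounding.round(value - offset) + offset;
    }

The reverted commit had instead folded the offset into Rounding itself via offset() and withoutOffset(); the deletions in Rounding.java below undo that.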
Showing 10 changed files with 33 additions and 77 deletions.
46 changes: 2 additions & 44 deletions server/src/main/java/org/elasticsearch/common/Rounding.java
@@ -164,19 +164,6 @@ public void writeTo(StreamOutput out) throws IOException {
*/
public abstract long nextRoundingValue(long value);

/**
* How "offset" this rounding is from the traditional "start" of the period.
* @deprecated We're in the process of abstracting offset *into* Rounding
* so keep any usage to migratory shims
*/
@Deprecated
public abstract long offset();

/**
* Strip the {@code offset} from these bounds.
*/
public abstract Rounding withoutOffset();

@Override
public abstract boolean equals(Object obj);

@@ -438,16 +425,6 @@ public long nextRoundingValue(long utcMillis) {
}
}

@Override
public long offset() {
return 0;
}

@Override
public Rounding withoutOffset() {
return this;
}

@Override
public int hashCode() {
return Objects.hash(unit, timeZone);
@@ -579,16 +556,6 @@ public long nextRoundingValue(long time) {
.toInstant().toEpochMilli();
}

@Override
public long offset() {
return 0;
}

@Override
public Rounding withoutOffset() {
return this;
}

@Override
public int hashCode() {
return Objects.hash(interval, timeZone);
@@ -650,17 +617,8 @@ public long round(long value) {

@Override
public long nextRoundingValue(long value) {
return delegate.nextRoundingValue(value - offset) + offset;
}

@Override
public long offset() {
return offset;
}

@Override
public Rounding withoutOffset() {
return delegate;
// This isn't needed by the current users. We'll implement it when we migrate other users to it.
throw new UnsupportedOperationException("not yet supported");
}

@Override
@@ -500,21 +500,21 @@ protected ValuesSourceAggregatorFactory<ValuesSource> innerBuild(QueryShardConte
Builder subFactoriesBuilder) throws IOException {
final ZoneId tz = timeZone();
// TODO use offset here rather than explicitly in the aggregation
final Rounding rounding = dateHistogramInterval.createRounding(tz, offset);
final Rounding rounding = dateHistogramInterval.createRounding(tz, 0);
final ZoneId rewrittenTimeZone = rewriteTimeZone(queryShardContext);
final Rounding shardRounding;
if (tz == rewrittenTimeZone) {
shardRounding = rounding;
} else {
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, offset);
shardRounding = dateHistogramInterval.createRounding(rewrittenTimeZone, 0);
}

ExtendedBounds roundedBounds = null;
if (this.extendedBounds != null) {
// parse any string bounds to longs and round
roundedBounds = this.extendedBounds.parseAndValidate(name, queryShardContext, config.format()).round(rounding);
}
return new DateHistogramAggregatorFactory(name, config, order, keyed, minDocCount,
return new DateHistogramAggregatorFactory(name, config, offset, order, keyed, minDocCount,
rounding, shardRounding, roundedBounds, queryShardContext, parent, subFactoriesBuilder, metaData);
}

@@ -64,16 +64,18 @@ class DateHistogramAggregator extends BucketsAggregator {
private final ExtendedBounds extendedBounds;

private final LongHash bucketOrds;
private long offset;

DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding shardRounding,
BucketOrder order, boolean keyed,
long offset, BucketOrder order, boolean keyed,
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Numeric valuesSource,
DocValueFormat formatter, SearchContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.offset = offset;
this.order = InternalOrder.validate(order, this);
this.keyed = keyed;
this.minDocCount = minDocCount;
@@ -111,7 +113,7 @@ public void collect(int doc, long bucket) throws IOException {
long value = values.nextValue();
// We can use shardRounding here, which is sometimes more efficient
// if daylight saving times are involved.
long rounded = shardRounding.round(value);
long rounded = shardRounding.round(value - offset) + offset;
assert rounded >= previousRounded;
if (rounded == previousRounded) {
continue;
@@ -148,7 +150,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter, keyed,
return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
}

@@ -157,8 +159,8 @@ public InternalAggregation buildEmptyAggregation() {
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, pipelineAggregators(), metaData());
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
}

@Override
@@ -39,6 +39,7 @@
public final class DateHistogramAggregatorFactory
extends ValuesSourceAggregatorFactory<ValuesSource> {

private final long offset;
private final BucketOrder order;
private final boolean keyed;
private final long minDocCount;
@@ -47,11 +48,12 @@ public final class DateHistogramAggregatorFactory
private final Rounding shardRounding;

public DateHistogramAggregatorFactory(String name, ValuesSourceConfig<ValuesSource> config,
BucketOrder order, boolean keyed, long minDocCount,
long offset, BucketOrder order, boolean keyed, long minDocCount,
Rounding rounding, Rounding shardRounding, ExtendedBounds extendedBounds, QueryShardContext queryShardContext,
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder,
Map<String, Object> metaData) throws IOException {
super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
this.offset = offset;
this.order = order;
this.keyed = keyed;
this.minDocCount = minDocCount;
@@ -102,7 +104,7 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource,
private Aggregator createAggregator(ValuesSource.Numeric valuesSource, SearchContext searchContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new DateHistogramAggregator(name, factories, rounding, shardRounding, order, keyed, minDocCount, extendedBounds,
return new DateHistogramAggregator(name, factories, rounding, shardRounding, offset, order, keyed, minDocCount, extendedBounds,
valuesSource, config.format(), searchContext, parent, pipelineAggregators, metaData);
}

@@ -111,7 +113,7 @@ private Aggregator createRangeAggregator(ValuesSource.Range valuesSource,
Aggregator parent,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new DateRangeHistogramAggregator(name, factories, rounding, shardRounding, order, keyed, minDocCount, extendedBounds,
return new DateRangeHistogramAggregator(name, factories, rounding, shardRounding, offset, order, keyed, minDocCount, extendedBounds,
valuesSource, config.format(), searchContext, parent, pipelineAggregators, metaData);
}

@@ -67,9 +67,10 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
private final ExtendedBounds extendedBounds;

private final LongHash bucketOrds;
private long offset;

DateRangeHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding shardRounding,
BucketOrder order, boolean keyed,
long offset, BucketOrder order, boolean keyed,
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource.Range valuesSource,
DocValueFormat formatter, SearchContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
@@ -78,6 +79,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
this.rounding = rounding;
this.shardRounding = shardRounding;
this.offset = offset;
this.order = InternalOrder.validate(order, this);
this.keyed = keyed;
this.minDocCount = minDocCount;
@@ -124,8 +126,8 @@ public void collect(int doc, long bucket) throws IOException {
// The encoding should ensure that this assert is always true.
assert from >= previousFrom : "Start of range not >= previous start";
final Long to = (Long) range.getTo();
final long startKey = shardRounding.round(from);
final long endKey = shardRounding.round(to);
final long startKey = offsetAwareRounding(shardRounding, from, offset);
final long endKey = offsetAwareRounding(shardRounding, to, offset);
for (long key = startKey > previousKey ? startKey : previousKey; key <= endKey;
key = shardRounding.nextRoundingValue(key)) {
if (key == previousKey) {
@@ -151,6 +153,10 @@ public void collect(int doc, long bucket) throws IOException {
};
}

private long offsetAwareRounding(Rounding rounding, long value, long offset) {
return rounding.round(value - offset) + offset;
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
assert owningBucketOrdinal == 0;
@@ -169,7 +175,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, buckets, order, minDocCount, rounding.offset(), emptyBucketInfo, formatter, keyed,
return new InternalDateHistogram(name, buckets, order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
}

@@ -178,8 +184,8 @@ public InternalAggregation buildEmptyAggregation() {
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding, buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, rounding.offset(), emptyBucketInfo, formatter,
keyed, pipelineAggregators(), metaData());
return new InternalDateHistogram(name, Collections.emptyList(), order, minDocCount, offset, emptyBucketInfo, formatter, keyed,
pipelineAggregators(), metaData());
}

@Override
@@ -166,11 +166,7 @@ ExtendedBounds parseAndValidate(String aggName, QueryShardContext queryShardCont
}

ExtendedBounds round(Rounding rounding) {
// Extended bounds shouldn't be effected by the offset
Rounding effectiveRounding = rounding.withoutOffset();
return new ExtendedBounds(
min != null ? effectiveRounding.round(min) : null,
max != null ? effectiveRounding.round(max) : null);
return new ExtendedBounds(min != null ? rounding.round(min) : null, max != null ? rounding.round(max) : null);
}

@Override
@@ -497,7 +497,7 @@ public Number getKey(MultiBucketsAggregation.Bucket bucket) {

@Override
public Number nextKey(Number key) {
return emptyBucketInfo.rounding.nextRoundingValue(key.longValue());
return emptyBucketInfo.rounding.nextRoundingValue(key.longValue() - offset) + offset;
}

@Override
@@ -201,18 +201,10 @@ public void testOffsetRounding() {
Rounding rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(twoHours).build();
assertThat(rounding.round(0), equalTo(-oneDay + twoHours));
assertThat(rounding.round(twoHours), equalTo(twoHours));
assertThat(rounding.nextRoundingValue(-oneDay), equalTo(-oneDay + twoHours));
assertThat(rounding.nextRoundingValue(0), equalTo(twoHours));
assertThat(rounding.withoutOffset().round(0), equalTo(0L));
assertThat(rounding.withoutOffset().nextRoundingValue(0), equalTo(oneDay));

rounding = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).offset(-twoHours).build();
assertThat(rounding.round(0), equalTo(-twoHours));
assertThat(rounding.round(oneDay - twoHours), equalTo(oneDay - twoHours));
assertThat(rounding.nextRoundingValue(-oneDay), equalTo(-twoHours));
assertThat(rounding.nextRoundingValue(0), equalTo(oneDay - twoHours));
assertThat(rounding.withoutOffset().round(0), equalTo(0L));
assertThat(rounding.withoutOffset().nextRoundingValue(0), equalTo(oneDay));
}

/**
@@ -164,7 +164,7 @@ static AggregatorFactory getRandomSequentiallyOrderedParentAgg() throws IOExcept
new AggregatorFactories.Builder(), Collections.emptyMap());
break;
case 1:
factory = new DateHistogramAggregatorFactory("name", mock(ValuesSourceConfig.class),
factory = new DateHistogramAggregatorFactory("name", mock(ValuesSourceConfig.class), 0L,
mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
mock(ExtendedBounds.class), mock(QueryShardContext.class), mock(AggregatorFactory.class),
new AggregatorFactories.Builder(), Collections.emptyMap());
@@ -131,7 +131,7 @@ public void testParentValidations() throws IOException {
// Date Histogram
aggBuilders.clear();
aggBuilders.add(new CumulativeCardinalityPipelineAggregationBuilder("cumulative_card", "sum"));
parent = new DateHistogramAggregatorFactory("name", valuesSourceConfig,
parent = new DateHistogramAggregatorFactory("name", valuesSourceConfig, 0L,
mock(InternalOrder.class), false, 0L, mock(Rounding.class), mock(Rounding.class),
mock(ExtendedBounds.class), mock(QueryShardContext.class), mock(AggregatorFactory.class),
new AggregatorFactories.Builder(), Collections.emptyMap());