
Support offset in composite aggs #50609

Merged · 13 commits · Jan 7, 2020
66 changes: 66 additions & 0 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
@@ -287,6 +287,72 @@ Time zones may either be specified as an ISO 8601 UTC offset (e.g. `+01:00` or
`-08:00`) or as a timezone id, an identifier used in the TZ database like
`America/Los_Angeles`.

*Offset*

include::datehistogram-aggregation.asciidoc[tag=offset-explanation]
Member Author:

I figured because the option is the same I should just include the docs from the normal date histogram.

Contributor:

👍


[source,console,id=composite-aggregation-datehistogram-offset-example]
----
PUT my_index/_doc/1?refresh
{
"date": "2015-10-01T05:30:00Z"
}

PUT my_index/_doc/2?refresh
{
"date": "2015-10-01T06:30:00Z"
}

GET my_index/_search?size=0
{
"aggs": {
"my_buckets": {
"composite" : {
"sources" : [
{
"date": {
"date_histogram" : {
"field": "date",
"calendar_interval": "day",
"offset": "+6h",
"format": "iso8601"
}
}
}
]
}
}
}
}
----

include::datehistogram-aggregation.asciidoc[tag=offset-result-intro]

[source,console-result]
----
{
...
"aggregations": {
"my_buckets": {
"after_key": { "date": "2015-10-01T06:00:00.000Z" },
"buckets": [
{
"key": { "date": "2015-09-30T06:00:00.000Z" },
"doc_count": 1
},
{
"key": { "date": "2015-10-01T06:00:00.000Z" },
"doc_count": 1
}
]
}
}
}
----
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

include::datehistogram-aggregation.asciidoc[tag=offset-note]

===== Mixing different value sources

The `sources` parameter accepts an array of value sources.
@@ -461,16 +461,19 @@ the bucket covering that day will only hold data for 23 hours instead of the usual
where you'll have only an 11h bucket on the morning of 27 March when the DST shift
happens.

[[search-aggregations-bucket-datehistogram-offset]]
===== Offset

// tag::offset-explanation[]
Use the `offset` parameter to change the start value of each bucket by the
specified positive (`+`) or negative offset (`-`) duration, such as `1h` for
an hour, or `1d` for a day. See <<time-units>> for more possible time
duration options.

For example, when using an interval of `day`, each bucket runs from midnight
to midnight. Setting the `offset` parameter to `+6h` changes each bucket
to run from 6am to 6am:
// end::offset-explanation[]

[source,console,id=datehistogram-aggregation-offset-example]
-----------------------------
@@ -498,8 +501,10 @@ GET my_index/_search?size=0
}
-----------------------------

// tag::offset-result-intro[]
Instead of a single bucket starting at midnight, the above request groups the
documents into buckets starting at 6am:
// end::offset-result-intro[]

[source,console-result]
-----------------------------
@@ -525,8 +530,10 @@ documents into buckets starting at 6am:
-----------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

// tag::offset-note[]
NOTE: The start `offset` of each bucket is calculated after `time_zone`
adjustments have been made.
// end::offset-note[]
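A sketch of the ordering the note describes, using `java.time`: `bucketStart` is an illustrative helper (not Elasticsearch code) that mirrors the `round(value - offset) + offset` shape with a floor-to-local-midnight rounding, so the `time_zone` adjustment happens inside the rounding and the offset is applied around it.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class TimeZoneOffsetDemo {
    // Shift the value back by the offset, round to local midnight in the
    // zone, then shift forward again: round(value - offset) + offset.
    static Instant bucketStart(Instant value, ZoneId tz, Duration offset) {
        Instant shifted = value.minus(offset);
        ZonedDateTime localMidnight = shifted.atZone(tz).toLocalDate().atStartOfDay(tz);
        return localMidnight.toInstant().plus(offset);
    }

    public static void main(String[] args) {
        ZoneId la = ZoneId.of("America/Los_Angeles");
        Duration sixHours = Duration.ofHours(6);
        Instant doc = Instant.parse("2015-10-01T05:30:00Z");
        // 05:30Z is 22:30 PDT on Sep 30; the day bucket shifted by +6h
        // starts at Sep 30 06:00 PDT, i.e. 13:00Z.
        System.out.println(bucketStart(doc, la, sixHours)); // 2015-09-30T13:00:00Z
    }
}
```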

===== Keyed Response

@@ -367,6 +367,40 @@ setup:
- match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
- match: { aggregations.test.buckets.0.doc_count: 1 }

---
"Composite aggregation with date_histogram offset":
- skip:
version: " - 7.99.99"
reason: offset introduced in 8.0.0

- do:
search:
rest_total_hits_as_int: true
Contributor:

Let's use the new format; this option should be removed soon.

Member Author:

Sure!

index: test
body:
aggregations:
test:
composite:
sources: [
{
"date": {
"date_histogram": {
"field": "date",
"calendar_interval": "1d",
"offset": "4h",
"format": "iso8601" # Format makes the comparisons a little more obvious
}
}
}
]

- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.date: "2017-10-19T04:00:00.000Z" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
- match: { aggregations.test.buckets.1.key.date: "2017-10-21T04:00:00.000Z" }
- match: { aggregations.test.buckets.1.doc_count: 1 }

---
"Composite aggregation with after_key in the response":
- do:
77 changes: 71 additions & 6 deletions server/src/main/java/org/elasticsearch/common/Rounding.java
@@ -19,6 +19,7 @@
package org.elasticsearch.common;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -177,6 +178,16 @@ public static Builder builder(TimeValue interval) {
return new Builder(interval);
}

/**
* Create a rounding that offsets values before passing them into another rounding
* and then un-offsets them after that rounding has done its job.
* @param delegate the other rounding to offset
* @param offset the offset, in milliseconds
*/
public static Rounding offset(Rounding delegate, long offset) {
return new OffsetRounding(delegate, offset);
}

public static class Builder {

private final DateTimeUnit unit;
@@ -556,19 +567,73 @@ public boolean equals(Object obj) {
}
}

static class OffsetRounding extends Rounding {
Contributor:

Is the intention to migrate the existing users of offsets (regular date histo agg, etc) over to this wrapper? E.g. it seems a bit heavyweight to create a whole new rounding, but if the idea is to followup with changes to the existing users of offset (rather than baking the logic into the aggs) it makes sense to me.

Contributor:

Also, we should probably add a javadoc explaining why/when to use this class

Member Author:

Yeah, I do intend to migrate the rest of the uses of offset over to it.

I added javadoc on the public interface to the class which is what the rest of the classes in this file do. Do you think I should duplicate it onto this one? Or add something maybe?

Contributor:

Ah, missed that it was documented on the public method. Since the rest of the class does it that way, fine with me :)

Thanks for the explanation!

static final byte ID = 3;

private final Rounding delegate;
private final long offset;

OffsetRounding(Rounding delegate, long offset) {
this.delegate = delegate;
this.offset = offset;
}

OffsetRounding(StreamInput in) throws IOException {
delegate = Rounding.read(in);
offset = in.readLong();
Contributor:

maybe read/write ZLong instead? I suspect offsets will be small-to-medium'ish sized and either positive or negative, so zlong might be a win?

Member Author:

In other places offsets are written as long and I just copied it. But I'd be fine with zlong.

Member Author:

Pushed the switch to zlong.

}
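A sketch of why zlong helps here: as I understand it, the zlong wire format zig-zag encodes the value before variable-length encoding, so small magnitudes of either sign stay compact. The `encode`/`decode` methods below illustrate the zig-zag mapping itself and are not the Elasticsearch `StreamOutput`/`StreamInput` methods.

```java
public class ZigZagDemo {
    // Zig-zag encoding interleaves negative and positive values
    // (0, -1, 1, -2, 2, ... -> 0, 1, 2, 3, 4, ...), so values near zero
    // of either sign become small numbers that a varint encoding can
    // store in few bytes.
    static long encode(long n) {
        return (n << 1) ^ (n >> 63);
    }

    static long decode(long n) {
        return (n >>> 1) ^ -(n & 1);
    }

    public static void main(String[] args) {
        // A +6h offset and a -6h offset (in millis) both encode to
        // similarly small magnitudes.
        System.out.println(encode(21600000L));                         // 43200000
        System.out.println(encode(-21600000L));                        // 43199999
        System.out.println(decode(encode(-21600000L)) == -21600000L); // true
    }
}
```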

@Override
public void innerWriteTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_8_0_0)) {
Member Author:

We don't actually serialize this Rounding but I believe I've got this bit right so I kept it. I'd like to move the other offset code over to this Rounding which means that we will eventually serialize it. At that point we'll get exhaustive tests for this code.

Member Author:

Modulo the version check which I discuss below.

throw new IllegalArgumentException("Offset rounding not supported before 8.0.0");
Contributor:

should the message be before 7.6.0?

Member Author:

Yeah. I'll fix it.

}
delegate.writeTo(out);
out.writeLong(offset);
}

@Override
public byte id() {
return ID;
}

@Override
public long round(long value) {
return delegate.round(value - offset) + offset;
}

@Override
public long nextRoundingValue(long value) {
// This isn't needed by the current users. We'll implement it when we migrate other users to it.
throw new UnsupportedOperationException("not yet supported");
}

@Override
public int hashCode() {
return Objects.hash(delegate, offset);
}

@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
OffsetRounding other = (OffsetRounding) obj;
return delegate.equals(other.delegate) && offset == other.offset;
}
}
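The shift/round/un-shift round-trip in `round` can be sketched in isolation. This is a minimal illustration, not Elasticsearch code: a plain floor-to-UTC-midnight rounding stands in for the delegate `Rounding`, and `roundToDay`/`offsetRound` are illustrative names.

```java
import java.util.concurrent.TimeUnit;

public class OffsetRoundingDemo {
    // Stand-in for the delegate rounding: floor to UTC midnight.
    static long roundToDay(long millis) {
        long day = TimeUnit.DAYS.toMillis(1);
        return Math.floorDiv(millis, day) * day;
    }

    // Shift down by the offset, round, then shift back up —
    // the same shape as OffsetRounding.round above.
    static long offsetRound(long value, long offset) {
        return roundToDay(value - offset) + offset;
    }

    public static void main(String[] args) {
        long twoHours = TimeUnit.HOURS.toMillis(2);
        long oneDay = TimeUnit.DAYS.toMillis(1);
        // The epoch falls before the 02:00 bucket start, so it lands in the
        // previous day's bucket, which starts at -1d + 2h.
        System.out.println(offsetRound(0, twoHours) == -oneDay + twoHours); // true
        // 02:00 itself is a bucket start and rounds to itself.
        System.out.println(offsetRound(twoHours, twoHours) == twoHours);    // true
    }
}
```

This mirrors the `testOffsetRounding` case added later in the PR: rounding `0` with a two-hour offset yields `-oneDay + twoHours`.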

public static Rounding read(StreamInput in) throws IOException {
byte id = in.readByte();
switch (id) {
case TimeUnitRounding.ID:
return new TimeUnitRounding(in);
Contributor:

🙏 for the cleanup here :)

Member Author:

I'm not sure everyone would consider that cleaner, but I sure do!

case TimeIntervalRounding.ID:
return new TimeIntervalRounding(in);
case OffsetRounding.ID:
return new OffsetRounding(in);
default:
throw new ElasticsearchException("unknown rounding id [" + id + "]");
}
}
}
@@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -31,9 +32,11 @@
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalConsumer;
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalWrapper;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
@@ -56,6 +59,13 @@ public class DateHistogramValuesSourceBuilder
PARSER = new ObjectParser<>(DateHistogramValuesSourceBuilder.TYPE);
PARSER.declareString(DateHistogramValuesSourceBuilder::format, new ParseField("format"));
DateIntervalWrapper.declareIntervalFields(PARSER);
PARSER.declareField(DateHistogramValuesSourceBuilder::offset, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return p.longValue();
} else {
return DateHistogramAggregationBuilder.parseStringOffset(p.text());
}
}, Histogram.OFFSET_FIELD, ObjectParser.ValueType.LONG);
PARSER.declareField(DateHistogramValuesSourceBuilder::timeZone, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return ZoneId.of(p.text());
@@ -71,6 +81,7 @@ static DateHistogramValuesSourceBuilder parse(String name, XContentParser parser

private ZoneId timeZone = null;
private DateIntervalWrapper dateHistogramInterval = new DateIntervalWrapper();
private long offset = 0;

public DateHistogramValuesSourceBuilder(String name) {
super(name, ValueType.DATE);
@@ -80,12 +91,18 @@ protected DateHistogramValuesSourceBuilder(StreamInput in) throws IOException {
super(in);
dateHistogramInterval = new DateIntervalWrapper(in);
timeZone = in.readOptionalZoneId();
if (in.getVersion().after(Version.V_8_0_0)) {
Member Author:

I think this is the way to do this, but it has been a long time. I want to land this change in master and 7.x, but these tests have no chance of passing the bwc tests until I backport it.

Now that I think about it, maybe this version check should be V_7_6_0 actually. Help!

Member Author:

The tests caught an error - this should at least be onOrAfter, but I'll switch it to V_7_6_0 while I'm there.

offset = in.readLong();
}
}

@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
dateHistogramInterval.writeTo(out);
out.writeOptionalZoneId(timeZone);
if (out.getVersion().after(Version.V_8_0_0)) {
out.writeLong(offset);
}
}

@Override
@@ -215,9 +232,28 @@ public ZoneId timeZone() {
return timeZone;
}

/**
* Get the offset to use when rounding, which is a number of milliseconds.
*/
public long offset() {
return offset;
}

/**
* Set the offset on this builder, which is a number of milliseconds.
* @return this for chaining
*/
public DateHistogramValuesSourceBuilder offset(long offset) {
this.offset = offset;
return this;
}

@Override
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig<?> config) throws IOException {
Rounding rounding = dateHistogramInterval.createRounding(timeZone());
if (offset != 0) {
rounding = Rounding.offset(rounding, offset);
}
ValuesSource orig = config.toValuesSource(queryShardContext);
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;
@@ -285,7 +285,10 @@ public DateHistogramAggregationBuilder offset(String offset) {
return offset(parseStringOffset(offset));
}

static long parseStringOffset(String offset) {
/**
* Parse the string specification of an offset.
*/
public static long parseStringOffset(String offset) {
if (offset.charAt(0) == '-') {
return -TimeValue
.parseTimeValue(offset.substring(1), null, DateHistogramAggregationBuilder.class.getSimpleName() + ".parseOffset")
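The parsing that `parseStringOffset` performs can be sketched as follows. This is a simplified, hypothetical re-implementation for illustration only: the real method delegates to `TimeValue.parseTimeValue`, which supports more units and richer error reporting.

```java
import java.util.concurrent.TimeUnit;

public class OffsetParseDemo {
    // Parse an offset like "+6h" or "-4h" into milliseconds.
    // Illustrative only; not the Elasticsearch implementation.
    static long parseOffset(String offset) {
        boolean negative = offset.charAt(0) == '-';
        String spec = (negative || offset.charAt(0) == '+') ? offset.substring(1) : offset;
        long magnitude = Long.parseLong(spec.substring(0, spec.length() - 1));
        long millis;
        switch (spec.charAt(spec.length() - 1)) {
            case 'd': millis = TimeUnit.DAYS.toMillis(magnitude);    break;
            case 'h': millis = TimeUnit.HOURS.toMillis(magnitude);   break;
            case 'm': millis = TimeUnit.MINUTES.toMillis(magnitude); break;
            case 's': millis = TimeUnit.SECONDS.toMillis(magnitude); break;
            default: throw new IllegalArgumentException("unknown unit in [" + offset + "]");
        }
        return negative ? -millis : millis;
    }

    public static void main(String[] args) {
        System.out.println(parseOffset("+6h")); // 21600000
        System.out.println(parseOffset("-4h")); // -14400000
    }
}
```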
10 changes: 10 additions & 0 deletions server/src/test/java/org/elasticsearch/common/RoundingTests.java
@@ -195,6 +195,16 @@ public void testTimeUnitRoundingDST() {
assertThat(tzRounding_chg.round(time("2014-11-02T06:01:01", chg)), isDate(time("2014-11-02T06:00:00", chg), chg));
}

Contributor:

Hmm, do you know if we have any serialization tests for Rounding? I was looking to see if we test that the id() bytes are correct and don't accidentally change (similar to what we do with AbstractWriteableEnumTestCase enum tests)... but couldn't find any serialization tests at all.

If we have them somewhere, let's add a test for the id byte. If not, probably too much to add to this PR but we should file a ticket so we don't forget to add some tests... makes me uneasy that such a widespread class doesn't have serialization tests :)

Member Author:

I believe we end up testing serialization as part of testing things like extended bounds bucket response. Adding a unit test for just this class's serialization makes sense to me though. I figured I'd wait until I used it in a context where we serialized it but since I'm writing the serialization code now I probably ought to write the test now.

Member Author:

I pushed a wire test case.

public void testOffsetRounding() {
Contributor:

Let's add another test that does negative offsets too?

Member Author:

Sure!

Member Author:

I pushed a test with negative offsets.

long twoHours = TimeUnit.HOURS.toMillis(2);
long oneDay = TimeUnit.DAYS.toMillis(1);
Rounding dayOfMonth = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).build();
Rounding offsetRounding = Rounding.offset(dayOfMonth, twoHours);
assertThat(offsetRounding.round(0), equalTo(-oneDay + twoHours));

assertThat(offsetRounding.round(twoHours), equalTo(twoHours));
}

/**
* Randomized test on TimeUnitRounding. Test uses random
* {@link DateTimeUnit} and {@link ZoneId} and often (50% of the time)