Support offset in composite aggs
Adds support for the `offset` parameter to the `date_histogram` source
of composite aggs. The `offset` parameter is supported by the normal
`date_histogram` aggregation and is useful for folks that need to
measure things from, say, 6am one day to 6am the next day.

This is implemented by creating a new `Rounding` that knows how to
handle offsets and delegates to other rounding implementations. That
implementation doesn't fully implement the `Rounding` contract, namely
`nextRoundingValue`. That method isn't used by composite aggs so I can't
be sure that any implementation that I add will be correct. I propose to
leave it throwing `UnsupportedOperationException` until I need it.
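
As a rough illustration of the delegation described above, the following standalone sketch shifts a value back by the offset, rounds it, and shifts it forward again. It is not the Elasticsearch implementation: `OffsetRoundingSketch`, `roundToDay`, and `roundWithOffset` are hypothetical names, and the delegate is simplified to UTC day rounding with no time zone handling.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class OffsetRoundingSketch {
    private static final long DAY_MILLIS = TimeUnit.DAYS.toMillis(1);

    /** Stand-in for the delegate rounding (assumption): floor to the start of the UTC day. */
    static long roundToDay(long epochMillis) {
        return Math.floorDiv(epochMillis, DAY_MILLIS) * DAY_MILLIS;
    }

    /** Shift back by the offset, round with the delegate, then shift forward again. */
    static long roundWithOffset(long epochMillis, long offsetMillis) {
        return roundToDay(epochMillis - offsetMillis) + offsetMillis;
    }

    public static void main(String[] args) {
        long sixHours = TimeUnit.HOURS.toMillis(6);
        // The two document timestamps used in the composite aggregation docs example.
        long before = Instant.parse("2015-10-01T05:30:00Z").toEpochMilli();
        long after = Instant.parse("2015-10-01T06:30:00Z").toEpochMilli();
        System.out.println(Instant.ofEpochMilli(roundWithOffset(before, sixHours)));
        System.out.println(Instant.ofEpochMilli(roundWithOffset(after, sixHours)));
    }
}
```

With a `+6h` offset the two timestamps land in buckets starting at 06:00 on 30 September and 06:00 on 1 October, which matches the keys in the documentation example added by this commit.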

Closes elastic#48757
nik9000 committed Jan 3, 2020
1 parent 4441a22 commit 3032d62
Showing 8 changed files with 261 additions and 11 deletions.
66 changes: 66 additions & 0 deletions docs/reference/aggregations/bucket/composite-aggregation.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,72 @@ Time zones may either be specified as an ISO 8601 UTC offset (e.g. `+01:00` or
`-08:00`) or as a timezone id, an identifier used in the TZ database like
`America/Los_Angeles`.

*Offset*

include::datehistogram-aggregation.asciidoc[tag=offset-explanation]

[source,console,id=composite-aggregation-datehistogram-offset-example]
----
PUT my_index/_doc/1?refresh
{
"date": "2015-10-01T05:30:00Z"
}
PUT my_index/_doc/2?refresh
{
"date": "2015-10-01T06:30:00Z"
}
GET my_index/_search?size=0
{
"aggs": {
"my_buckets": {
"composite" : {
"sources" : [
{
"date": {
"date_histogram" : {
"field": "date",
"calendar_interval": "day",
"offset": "+6h",
"format": "iso8601"
}
}
}
]
}
}
}
}
----

include::datehistogram-aggregation.asciidoc[tag=offset-result-intro]

[source,console-result]
----
{
...
"aggregations": {
"my_buckets": {
"after_key": { "date": "2015-10-01T06:00:00.000Z" },
"buckets": [
{
"key": { "date": "2015-09-30T06:00:00.000Z" },
"doc_count": 1
},
{
"key": { "date": "2015-10-01T06:00:00.000Z" },
"doc_count": 1
}
]
}
}
}
----
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

include::datehistogram-aggregation.asciidoc[tag=offset-note]

===== Mixing different values source

The `sources` parameter accepts an array of values source.
Original file line number Diff line number Diff line change
Expand Up @@ -461,16 +461,19 @@ the bucket covering that day will only hold data for 23 hours instead of the usu
where you'll have only an 11h bucket on the morning of 27 March when the DST shift
happens.

[[search-aggregations-bucket-datehistogram-offset]]
===== Offset

// tag::offset-explanation[]
Use the `offset` parameter to change the start value of each bucket by the
specified positive (`+`) or negative offset (`-`) duration, such as `1h` for
an hour, or `1d` for a day. See <<time-units>> for more possible time
duration options.

For example, when using an interval of `day`, each bucket runs from midnight
to midnight. Setting the `offset` parameter to `+6h` changes each bucket
to run from 6am to 6am:
// end::offset-explanation[]

[source,console,id=datehistogram-aggregation-offset-example]
-----------------------------
Expand Down Expand Up @@ -498,8 +501,10 @@ GET my_index/_search?size=0
}
-----------------------------

// tag::offset-result-intro[]
Instead of a single bucket starting at midnight, the above request groups the
documents into buckets starting at 6am:
// end::offset-result-intro[]

[source,console-result]
-----------------------------
Expand All @@ -525,8 +530,10 @@ documents into buckets starting at 6am:
-----------------------------
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]

// tag::offset-note[]
NOTE: The start `offset` of each bucket is calculated after `time_zone`
adjustments have been made.
// end::offset-note[]

===== Keyed Response

Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,40 @@ setup:
- match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
- match: { aggregations.test.buckets.0.doc_count: 1 }

---
"Composite aggregation with date_histogram offset":
- skip:
version: " - 7.99.99"
reason: offset introduced in 8.0.0

- do:
search:
rest_total_hits_as_int: true
index: test
body:
aggregations:
test:
composite:
sources: [
{
"date": {
"date_histogram": {
"field": "date",
"calendar_interval": "1d",
"offset": "4h",
"format": "iso8601" # Format makes the comparisons a little more obvious
}
}
}
]

- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.date: "2017-10-19T04:00:00.000Z" }
- match: { aggregations.test.buckets.0.doc_count: 1 }
- match: { aggregations.test.buckets.1.key.date: "2017-10-21T04:00:00.000Z" }
- match: { aggregations.test.buckets.1.doc_count: 1 }

---
"Composite aggregation with after_key in the response":
- do:
77 changes: 71 additions & 6 deletions server/src/main/java/org/elasticsearch/common/Rounding.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.common;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -177,6 +178,16 @@ public static Builder builder(TimeValue interval) {
return new Builder(interval);
}

/**
* Create a rounding that offsets values before passing them into another rounding
* and then un-offsets them after that rounding has done its job.
* @param delegate the other rounding to offset
* @param offset the offset, in milliseconds
*/
public static Rounding offset(Rounding delegate, long offset) {
return new OffsetRounding(delegate, offset);
}

public static class Builder {

private final DateTimeUnit unit;
Expand Down Expand Up @@ -556,19 +567,73 @@ public boolean equals(Object obj) {
}
}

static class OffsetRounding extends Rounding {
static final byte ID = 3;

private final Rounding delegate;
private final long offset;

OffsetRounding(Rounding delegate, long offset) {
this.delegate = delegate;
this.offset = offset;
}

OffsetRounding(StreamInput in) throws IOException {
delegate = Rounding.read(in);
offset = in.readLong();
}

@Override
public void innerWriteTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_8_0_0)) {
throw new IllegalArgumentException("Offset rounding not supported before 8.0.0");
}
delegate.writeTo(out);
out.writeLong(offset);
}

@Override
public byte id() {
return ID;
}

@Override
public long round(long value) {
return delegate.round(value - offset) + offset;
}

@Override
public long nextRoundingValue(long value) {
// This isn't needed by the current users. We'll implement it when we migrate other users to it.
throw new UnsupportedOperationException("not yet supported");
}

@Override
public int hashCode() {
return Objects.hash(delegate, offset);
}

@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
OffsetRounding other = (OffsetRounding) obj;
return delegate.equals(other.delegate) && offset == other.offset;
}
}

public static Rounding read(StreamInput in) throws IOException {
byte id = in.readByte();
switch (id) {
case TimeUnitRounding.ID:
return new TimeUnitRounding(in);
case TimeIntervalRounding.ID:
return new TimeIntervalRounding(in);
case OffsetRounding.ID:
return new OffsetRounding(in);
default:
throw new ElasticsearchException("unknown rounding id [" + id + "]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -31,9 +32,11 @@
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalConsumer;
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalWrapper;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
Expand All @@ -56,6 +59,13 @@ public class DateHistogramValuesSourceBuilder
PARSER = new ObjectParser<>(DateHistogramValuesSourceBuilder.TYPE);
PARSER.declareString(DateHistogramValuesSourceBuilder::format, new ParseField("format"));
DateIntervalWrapper.declareIntervalFields(PARSER);
PARSER.declareField(DateHistogramValuesSourceBuilder::offset, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return p.longValue();
} else {
return DateHistogramAggregationBuilder.parseStringOffset(p.text());
}
}, Histogram.OFFSET_FIELD, ObjectParser.ValueType.LONG);
PARSER.declareField(DateHistogramValuesSourceBuilder::timeZone, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return ZoneId.of(p.text());
Expand All @@ -71,6 +81,7 @@ static DateHistogramValuesSourceBuilder parse(String name, XContentParser parser

private ZoneId timeZone = null;
private DateIntervalWrapper dateHistogramInterval = new DateIntervalWrapper();
private long offset = 0;

public DateHistogramValuesSourceBuilder(String name) {
super(name, ValueType.DATE);
Expand All @@ -80,12 +91,18 @@ protected DateHistogramValuesSourceBuilder(StreamInput in) throws IOException {
super(in);
dateHistogramInterval = new DateIntervalWrapper(in);
timeZone = in.readOptionalZoneId();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
offset = in.readLong();
}
}

@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
dateHistogramInterval.writeTo(out);
out.writeOptionalZoneId(timeZone);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeLong(offset);
}
}

@Override
Expand Down Expand Up @@ -215,9 +232,28 @@ public ZoneId timeZone() {
return timeZone;
}

/**
* Get the offset to use when rounding, which is a number of milliseconds.
*/
public long offset() {
return offset;
}

/**
* Set the offset on this builder, which is a number of milliseconds.
* @return this for chaining
*/
public DateHistogramValuesSourceBuilder offset(long offset) {
this.offset = offset;
return this;
}

@Override
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig<?> config) throws IOException {
Rounding rounding = dateHistogramInterval.createRounding(timeZone());
if (offset != 0) {
rounding = Rounding.offset(rounding, offset);
}
ValuesSource orig = config.toValuesSource(queryShardContext);
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ public DateHistogramAggregationBuilder offset(String offset) {
return offset(parseStringOffset(offset));
}

/**
* Parse the string specification of an offset.
*/
public static long parseStringOffset(String offset) {
if (offset.charAt(0) == '-') {
return -TimeValue
.parseTimeValue(offset.substring(1), null, DateHistogramAggregationBuilder.class.getSimpleName() + ".parseOffset")
10 changes: 10 additions & 0 deletions server/src/test/java/org/elasticsearch/common/RoundingTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ public void testTimeUnitRoundingDST() {
assertThat(tzRounding_chg.round(time("2014-11-02T06:01:01", chg)), isDate(time("2014-11-02T06:00:00", chg), chg));
}

public void testOffsetRounding() {
long twoHours = TimeUnit.HOURS.toMillis(2);
long oneDay = TimeUnit.DAYS.toMillis(1);
Rounding dayOfMonth = Rounding.builder(Rounding.DateTimeUnit.DAY_OF_MONTH).build();
Rounding offsetRounding = Rounding.offset(dayOfMonth, twoHours);
assertThat(offsetRounding.round(0), equalTo(-oneDay + twoHours));

assertThat(offsetRounding.round(twoHours), equalTo(twoHours));
}

/**
* Randomized test on TimeUnitRounding. Test uses random
* {@link DateTimeUnit} and {@link ZoneId} and often (50% of the time)