
Adds the ability to specify a format on composite date_histogram source (#28310)

This commit adds the ability to specify a date format on the `date_histogram` composite source.
If the format is defined, the key for the source is returned as a formatted date.

Closes #27923
jimczi authored Jan 23, 2018
1 parent d31e964 commit 19cfc25
Showing 15 changed files with 401 additions and 76 deletions.
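
The same option can also be set from the Java side. The snippet below is a minimal, illustrative sketch only: it assumes the `format(String)` setter that this change introduces on `DateHistogramValuesSourceBuilder`, together with the pre-existing `CompositeAggregationBuilder` and `SearchSourceBuilder`; the class and field names are hypothetical.

[source,java]
--------------------------------------------------
import java.util.Collections;

import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CompositeDateFormatExample {
    public static void main(String[] args) {
        // Composite source named "date" over the "timestamp" field, bucketed per day.
        DateHistogramValuesSourceBuilder dateSource = new DateHistogramValuesSourceBuilder("date")
            .field("timestamp")
            .dateHistogramInterval(DateHistogramInterval.days(1))
            .format("yyyy-MM-dd"); // keys come back as "2017-10-20" instead of epoch millis

        CompositeAggregationBuilder composite =
            new CompositeAggregationBuilder("my_buckets", Collections.singletonList(dateSource));

        // Attach the aggregation to a search request body and print its JSON rendering.
        SearchSourceBuilder searchSource = new SearchSourceBuilder().size(0).aggregation(composite);
        System.out.println(searchSource);
    }
}
--------------------------------------------------

With the format set, each bucket's `date` key is returned as the formatted string, and that same string form is what gets passed back in `after` when paging.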
@@ -225,7 +225,41 @@ Note that fractional time values are not supported, but you can address this by
time unit (e.g., `1.5h` could instead be specified as `90m`).

[float]
===== Time Zone
====== Format

Internally, a date is represented as a 64-bit number: a timestamp in milliseconds since the epoch.
These timestamps are returned as the bucket keys. To return a formatted date string instead, specify the date
pattern with the `format` parameter:

[source,js]
--------------------------------------------------
GET /_search
{
    "aggs" : {
        "my_buckets": {
            "composite" : {
                "sources" : [
                    {
                        "date": {
                            "date_histogram" : {
                                "field": "timestamp",
                                "interval": "1d",
                                "format": "yyyy-MM-dd" <1>
                            }
                        }
                    }
                ]
            }
        }
    }
}
--------------------------------------------------
// CONSOLE

<1> Supports expressive date <<date-format-pattern,format pattern>>

[float]
====== Time Zone

Date-times are stored in Elasticsearch in UTC. By default, all bucketing and
rounding is also done in UTC. The `time_zone` parameter can be used to indicate
@@ -7,6 +7,8 @@ setup:
        mappings:
          doc:
            properties:
              date:
                type: date
              keyword:
                type: keyword
              long:
@@ -40,6 +42,20 @@ setup:
        id: 4
        body: { "keyword": "bar", "long": [1000, 0] }

  - do:
      index:
        index: test
        type: doc
        id: 5
        body: { "date": "2017-10-20T03:08:45" }

  - do:
      index:
        index: test
        type: doc
        id: 6
        body: { "date": "2017-10-21T07:00:00" }

  - do:
      indices.refresh:
        index: [test]
@@ -66,7 +82,7 @@
}
]

- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.kw: "bar" }
- match: { aggregations.test.buckets.0.doc_count: 3 }
@@ -104,7 +120,7 @@
}
]

- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 5 }
- match: { aggregations.test.buckets.0.key.long: 0}
- match: { aggregations.test.buckets.0.key.kw: "bar" }
@@ -154,7 +170,7 @@
]
after: { "long": 20, "kw": "foo" }

- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 2 }
- match: { aggregations.test.buckets.0.key.long: 100 }
- match: { aggregations.test.buckets.0.key.kw: "bar" }
@@ -188,7 +204,7 @@
]
after: { "kw": "delta" }

- match: {hits.total: 4}
- match: {hits.total: 6}
- length: { aggregations.test.buckets: 1 }
- match: { aggregations.test.buckets.0.key.kw: "foo" }
- match: { aggregations.test.buckets.0.doc_count: 2 }
@@ -220,3 +236,62 @@
}
}
]

---
"Composite aggregation with format":
  - skip:
      version: " - 6.99.99"
      reason: this uses a new option (format) added in 7.0.0

  - do:
      search:
        index: test
        body:
          aggregations:
            test:
              composite:
                sources: [
                  {
                    "date": {
                      "date_histogram": {
                        "field": "date",
                        "interval": "1d",
                        "format": "yyyy-MM-dd"
                      }
                    }
                  }
                ]

  - match: {hits.total: 6}
  - length: { aggregations.test.buckets: 2 }
  - match: { aggregations.test.buckets.0.key.date: "2017-10-20" }
  - match: { aggregations.test.buckets.0.doc_count: 1 }
  - match: { aggregations.test.buckets.1.key.date: "2017-10-21" }
  - match: { aggregations.test.buckets.1.doc_count: 1 }

  - do:
      search:
        index: test
        body:
          aggregations:
            test:
              composite:
                after: {
                  date: "2017-10-20"
                }
                sources: [
                  {
                    "date": {
                      "date_histogram": {
                        "field": "date",
                        "interval": "1d",
                        "format": "yyyy-MM-dd"
                      }
                    }
                  }
                ]

  - match: {hits.total: 6}
  - length: { aggregations.test.buckets: 1 }
  - match: { aggregations.test.buckets.0.key.date: "2017-10-21" }
  - match: { aggregations.test.buckets.0.doc_count: 1 }
@@ -147,17 +147,15 @@ protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<
Sort sort = indexSortConfig.buildIndexSort(shardContext::fieldMapper, shardContext::getForField);
System.arraycopy(sort.getSort(), 0, sortFields, 0, sortFields.length);
}
List<String> sourceNames = new ArrayList<>();
for (int i = 0; i < configs.length; i++) {
configs[i] = sources.get(i).build(context, i, configs.length, sortFields[i]);
sourceNames.add(sources.get(i).name());
if (configs[i].valuesSource().needsScores()) {
throw new IllegalArgumentException("[sources] cannot access _score");
}
}
final CompositeKey afterKey;
if (after != null) {
if (after.size() != sources.size()) {
if (after.size() != configs.length) {
throw new IllegalArgumentException("[after] has " + after.size() +
" value(s) but [sources] has " + sources.size());
}
@@ -179,7 +177,7 @@ protected AggregatorFactory<?> doBuild(SearchContext context, AggregatorFactory<
} else {
afterKey = null;
}
return new CompositeAggregationFactory(name, context, parent, subfactoriesBuilder, metaData, size, configs, sourceNames, afterKey);
return new CompositeAggregationFactory(name, context, parent, subfactoriesBuilder, metaData, size, configs, afterKey);
}


@@ -32,24 +32,21 @@
class CompositeAggregationFactory extends AggregatorFactory<CompositeAggregationFactory> {
private final int size;
private final CompositeValuesSourceConfig[] sources;
private final List<String> sourceNames;
private final CompositeKey afterKey;

CompositeAggregationFactory(String name, SearchContext context, AggregatorFactory<?> parent,
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sources,
List<String> sourceNames, CompositeKey afterKey) throws IOException {
int size, CompositeValuesSourceConfig[] sources, CompositeKey afterKey) throws IOException {
super(name, context, parent, subFactoriesBuilder, metaData);
this.size = size;
this.sources = sources;
this.sourceNames = sourceNames;
this.afterKey = afterKey;
}

@Override
protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new CompositeAggregator(name, factories, context, parent, pipelineAggregators, metaData,
size, sources, sourceNames, afterKey);
size, sources, afterKey);
}
}
@@ -27,6 +27,7 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -43,11 +44,13 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CompositeAggregator extends BucketsAggregator {
private final int size;
private final CompositeValuesSourceConfig[] sources;
private final List<String> sourceNames;
private final List<DocValueFormat> formats;
private final boolean canEarlyTerminate;

private final TreeMap<Integer, Integer> keys;
@@ -59,12 +62,12 @@ final class CompositeAggregator extends BucketsAggregator {

CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
int size, CompositeValuesSourceConfig[] sources, List<String> sourceNames,
CompositeKey rawAfterKey) throws IOException {
int size, CompositeValuesSourceConfig[] sources, CompositeKey rawAfterKey) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
this.size = size;
this.sources = sources;
this.sourceNames = sourceNames;
this.sourceNames = Arrays.stream(sources).map(CompositeValuesSourceConfig::name).collect(Collectors.toList());
this.formats = Arrays.stream(sources).map(CompositeValuesSourceConfig::format).collect(Collectors.toList());
// we use slot 0 to fill the current document (size+1).
this.array = new CompositeValuesComparator(context.searcher().getIndexReader(), sources, size+1);
if (rawAfterKey != null) {
@@ -131,15 +134,17 @@ public InternalAggregation buildAggregation(long zeroBucket) throws IOException
CompositeKey key = array.toCompositeKey(slot);
InternalAggregations aggs = bucketAggregations(slot);
int docCount = bucketDocCount(slot);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, key, reverseMuls, docCount, aggs);
buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs);
}
return new InternalComposite(name, size, sourceNames, Arrays.asList(buckets), reverseMuls, pipelineAggregators(), metaData());
return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), reverseMuls,
pipelineAggregators(), metaData());
}

@Override
public InternalAggregation buildEmptyAggregation() {
final int[] reverseMuls = getReverseMuls();
return new InternalComposite(name, size, sourceNames, Collections.emptyList(), reverseMuls, pipelineAggregators(), metaData());
return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), reverseMuls,
pipelineAggregators(), metaData());
}

@Override
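
To summarize the change above: `CompositeAggregator` now derives both the source names and their `DocValueFormat`s from the `CompositeValuesSourceConfig`s and threads the formats into `InternalComposite`, so the format only affects how a bucket key is rendered in the response, not how documents are collected or sorted. Below is a small, standalone sketch (the class name is illustrative, and it assumes the Joda-based `DocValueFormat.DateTime` used by Elasticsearch at the time of this change) of what such a format does to a daily bucket key:

[source,java]
--------------------------------------------------
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.search.DocValueFormat;
import org.joda.time.DateTimeZone;

public class FormatBucketKeyExample {
    public static void main(String[] args) {
        // A "format": "yyyy-MM-dd" option resolves to a date DocValueFormat.
        DocValueFormat format = new DocValueFormat.DateTime(Joda.forPattern("yyyy-MM-dd"), DateTimeZone.UTC);

        // Internally the bucket key stays a rounded epoch-millisecond value...
        long bucketKey = 1508457600000L; // 2017-10-20T00:00:00Z
        // ...and is only turned into a string when the response is built.
        System.out.println(format.format(bucketKey)); // expected to print: 2017-10-20
    }
}
--------------------------------------------------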
@@ -56,7 +56,7 @@ final class CompositeValuesComparator {
if (vs.isFloatingPoint()) {
arrays[i] = CompositeValuesSource.wrapDouble(vs, size, reverseMul);
} else {
arrays[i] = CompositeValuesSource.wrapLong(vs, size, reverseMul);
arrays[i] = CompositeValuesSource.wrapLong(vs, sources[i].format(), size, reverseMul);
}
}
}
@@ -23,8 +23,10 @@
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;

@@ -96,8 +98,9 @@ interface Collector {
/**
* Creates a {@link CompositeValuesSource} that generates long values.
*/
static CompositeValuesSource<ValuesSource.Numeric, Long> wrapLong(ValuesSource.Numeric vs, int size, int reverseMul) {
return new LongValuesSource(vs, size, reverseMul);
static CompositeValuesSource<ValuesSource.Numeric, Long> wrapLong(ValuesSource.Numeric vs, DocValueFormat format,
int size, int reverseMul) {
return new LongValuesSource(vs, format, size, reverseMul);
}

/**
@@ -273,9 +276,12 @@ Collector getLeafCollector(LeafReaderContext context, Collector next) throws IOE
*/
private static class LongValuesSource extends CompositeValuesSource<ValuesSource.Numeric, Long> {
private final long[] values;
// handles "format" for date histogram source
private final DocValueFormat format;

LongValuesSource(ValuesSource.Numeric vs, int size, int reverseMul) {
LongValuesSource(ValuesSource.Numeric vs, DocValueFormat format, int size, int reverseMul) {
super(vs, size, reverseMul);
this.format = format;
this.values = new long[size];
}

@@ -304,7 +310,11 @@ void setTop(Comparable<?> value) {
if (value instanceof Number) {
topValue = ((Number) value).longValue();
} else {
topValue = Long.parseLong(value.toString());
// for date histogram source with "format", the after value is formatted
// as a string so we need to retrieve the original value in milliseconds.
topValue = format.parseLong(value.toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
}

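
The `setTop` change above is the inverse direction: when a request pages with `after` and a `format` is set, the supplied key is a formatted string, so plain `Long.parseLong` is no longer enough and the value has to go back through `DocValueFormat.parseLong`. A hedged sketch of that round trip, again assuming the era's Joda-based `DocValueFormat.DateTime` and an illustrative class name:

[source,java]
--------------------------------------------------
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.search.DocValueFormat;
import org.joda.time.DateTimeZone;

public class ParseAfterKeyExample {
    public static void main(String[] args) {
        DocValueFormat format = new DocValueFormat.DateTime(Joda.forPattern("yyyy-MM-dd"), DateTimeZone.UTC);

        // An "after" key supplied as a formatted date string is parsed back to epoch millis.
        // The LongSupplier is only consulted for date math around now(), which is rejected here.
        long topValue = format.parseLong("2017-10-20", false, () -> {
            throw new IllegalArgumentException("now() is not supported in [after] key");
        });
        System.out.println(topValue); // expected to print: 1508457600000
    }
}
--------------------------------------------------

Round-tripping through the same `DocValueFormat` keeps paging consistent: the formatted key returned in a response can be fed straight back as the `after` value, as the new REST test above does.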
