From a8b13900a1a87be05402d08ec3314b27c278f053 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Tue, 25 Aug 2015 16:22:44 +0200 Subject: [PATCH] Aggregations Refactor: Refactor Derivative Aggregation --- .../pipeline/derivative/DerivativeParser.java | 34 ++---- .../DerivativePipelineAggregator.java | 100 ++++++++++++++++-- .../pipeline/DerivativeTests.java | 51 +++++++++ 3 files changed, 154 insertions(+), 31 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeTests.java diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativeParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativeParser.java index a12f77e75400a..bd7b634a8c959 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativeParser.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativeParser.java @@ -20,16 +20,11 @@ package org.elasticsearch.search.aggregations.pipeline.derivative; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.rounding.DateTimeUnit; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchParseException; -import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; -import org.elasticsearch.search.aggregations.support.format.ValueFormat; -import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.internal.SearchContext; import java.io.IOException; @@ -54,7 +49,7 @@ public PipelineAggregatorFactory parse(String pipelineAggregatorName, XContentPa String[] bucketsPaths = null; String format = null; String units = null; - GapPolicy gapPolicy = GapPolicy.SKIP; + GapPolicy gapPolicy = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -95,33 +90,22 @@ public PipelineAggregatorFactory parse(String pipelineAggregatorName, XContentPa + "] for derivative aggregation [" + pipelineAggregatorName + "]", parser.getTokenLocation()); } - ValueFormatter formatter = null; + DerivativePipelineAggregator.Factory factory = new DerivativePipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths); if (format != null) { - formatter = ValueFormat.Patternable.Number.format(format).formatter(); - } else { - formatter = ValueFormatter.RAW; + factory.format(format); + } + if (gapPolicy != null) { + factory.gapPolicy(gapPolicy); } - - Long xAxisUnits = null; if (units != null) { - DateTimeUnit dateTimeUnit = HistogramAggregator.DateHistogramFactory.DATE_FIELD_UNITS.get(units); - if (dateTimeUnit != null) { - xAxisUnits = dateTimeUnit.field().getDurationField().getUnitMillis(); - } else { - TimeValue timeValue = TimeValue.parseTimeValue(units, null, getClass().getSimpleName() + ".unit"); - if (timeValue != null) { - xAxisUnits = timeValue.getMillis(); + factory.units(units); } - } - } - - return new DerivativePipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter, gapPolicy, xAxisUnits); + return factory; } - // NORELEASE implement this method when refactoring this aggregation @Override public PipelineAggregatorFactory getFactoryPrototype() { - return null; + return new DerivativePipelineAggregator.Factory(null, null); } } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java index 3da0d93e8e091..b8ff35cdf7272 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/derivative/DerivativePipelineAggregator.java @@ -21,6 +21,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.rounding.DateTimeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -33,6 +36,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.support.format.ValueFormat; import org.elasticsearch.search.aggregations.support.format.ValueFormatter; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import org.joda.time.DateTime; @@ -41,6 +45,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -154,19 +159,46 @@ public void doWriteTo(StreamOutput out) throws IOException { public static class Factory extends PipelineAggregatorFactory { - private final ValueFormatter formatter; - private GapPolicy gapPolicy; - private Long xAxisUnits; + private String format; + private GapPolicy gapPolicy = GapPolicy.SKIP; + private String units; - public Factory(String name, String[] bucketsPaths, ValueFormatter formatter, GapPolicy gapPolicy, Long xAxisUnits) { + public Factory(String name, String[] bucketsPaths) { super(name, TYPE.name(), bucketsPaths); - this.formatter = formatter; + } + + public void format(String format) { + this.format = format; + } + + public void gapPolicy(GapPolicy gapPolicy) { this.gapPolicy = gapPolicy; - this.xAxisUnits = xAxisUnits; + } + + public void units(String units) { + this.units = units; } @Override protected PipelineAggregator createInternal(Map metaData) throws IOException { + ValueFormatter formatter; + if (format != null) { + formatter = ValueFormat.Patternable.Number.format(format).formatter(); + } else { + formatter = ValueFormatter.RAW; + } + Long xAxisUnits = null; + if (units != null) { + DateTimeUnit dateTimeUnit = HistogramAggregator.DateHistogramFactory.DATE_FIELD_UNITS.get(units); + if (dateTimeUnit != null) { + xAxisUnits = dateTimeUnit.field().getDurationField().getUnitMillis(); + } else { + TimeValue timeValue = TimeValue.parseTimeValue(units, null, getClass().getSimpleName() + ".unit"); + if (timeValue != null) { + xAxisUnits = timeValue.getMillis(); + } + } + } return new DerivativePipelineAggregator(name, bucketsPaths, formatter, gapPolicy, xAxisUnits, metaData); } @@ -188,5 +220,61 @@ public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactorie } } + @Override + protected PipelineAggregatorFactory doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException { + Factory factory = new Factory(name, bucketsPaths); + factory.format = in.readOptionalString(); + if (in.readBoolean()) { + factory.gapPolicy = GapPolicy.readFrom(in); + } + factory.units = in.readOptionalString(); + return factory; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalString(format); + boolean hasGapPolicy = gapPolicy != null; + out.writeBoolean(hasGapPolicy); + if (hasGapPolicy) { + gapPolicy.writeTo(out); + } + out.writeOptionalString(units); + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(DerivativeParser.FORMAT.getPreferredName(), format); + } + if (gapPolicy != null) { + builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); + } + if (units != null) { + builder.field(DerivativeParser.UNIT.getPreferredName(), units); + } + return builder; + } + + @Override + protected boolean doEquals(Object obj) { + Factory other = (Factory) obj; + if (!Objects.equals(format, other.format)) { + return false; + } + if (!Objects.equals(gapPolicy, other.gapPolicy)) { + return false; + } + if (!Objects.equals(units, other.units)) { + return false; + } + return true; + } + + @Override + protected int doHashCode() { + return Objects.hash(format, gapPolicy, units); + } + } } diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeTests.java new file mode 100644 index 0000000000000..b2cd1d4be6247 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeTests.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator.Factory; + +public class DerivativeTests extends BasePipelineAggregationTestCase { + + @Override + protected Factory createTestAggregatorFactory() { + String name = randomAsciiOfLengthBetween(3, 20); + String[] bucketsPaths = new String[1]; + bucketsPaths[0] = randomAsciiOfLengthBetween(3, 20); + Factory factory = new Factory(name, bucketsPaths); + if (randomBoolean()) { + factory.format(randomAsciiOfLengthBetween(1, 10)); + } + if (randomBoolean()) { + factory.gapPolicy(randomFrom(GapPolicy.values())); + } + if (randomBoolean()) { + if (randomBoolean()) { + factory.units(String.valueOf(randomInt())); + } else { + factory.units(String.valueOf(randomIntBetween(1, 10) + randomFrom("s", "m", "h", "d", "w", "M", "y"))); + } + } + return factory; + } + +}