Skip to content

Commit

Permalink
Aggregations Refactor: Refactor Range Aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
colings86 committed Dec 14, 2015
1 parent 97140f8 commit 97cbdb8
Show file tree
Hide file tree
Showing 17 changed files with 1,095 additions and 407 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,8 @@ static String octetsToCIDR(int[] octets, int networkMask) {
assert octets.length == 4;
return octetsToString(octets) + "/" + networkMask;
}

public static String createCIDR(long ipAddress, int networkMask) {
return octetsToCIDR(longToOctets(ipAddress), networkMask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.search.aggregations.bucket.BucketStreamContext;
import org.elasticsearch.search.aggregations.bucket.BucketStreams;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;

Expand Down Expand Up @@ -225,6 +227,14 @@ public Type type() {
return TYPE;
}

public ValuesSourceType getValueSourceType() {
return ValuesSourceType.NUMERIC;
}

public ValueType getValueType() {
return ValueType.NUMERIC;
}

public R create(String name, List<B> ranges, ValueFormatter formatter, boolean keyed, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
return (R) new InternalRange<>(name, ranges, formatter, keyed, pipelineAggregators, metaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.InPlaceMergeSorter;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand All @@ -31,9 +39,11 @@
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueParser;
Expand All @@ -43,21 +53,38 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
*
*/
public class RangeAggregator extends BucketsAggregator {

public static class Range {
public static final ParseField RANGES_FIELD = new ParseField("ranges");
public static final ParseField KEYED_FIELD = new ParseField("keyed");

public String key;
public double from = Double.NEGATIVE_INFINITY;
String fromAsStr;
public double to = Double.POSITIVE_INFINITY;
String toAsStr;
public static class Range implements Writeable<Range>, ToXContent {

public Range(String key, double from, String fromAsStr, double to, String toAsStr) {
public static final Range PROTOTYPE = new Range(null, -1, null, -1, null);
public static final ParseField KEY_FIELD = new ParseField("key");
public static final ParseField FROM_FIELD = new ParseField("from");
public static final ParseField TO_FIELD = new ParseField("to");

protected String key;
protected double from = Double.NEGATIVE_INFINITY;
protected String fromAsStr;
protected double to = Double.POSITIVE_INFINITY;
protected String toAsStr;

public Range(String key, double from, double to) {
this(key, from, null, to, null);
}

public Range(String key, String from, String to) {
this(key, Double.NEGATIVE_INFINITY, from, Double.POSITIVE_INFINITY, to);
}

protected Range(String key, double from, String fromAsStr, double to, String toAsStr) {
this.key = key;
this.from = from;
this.fromAsStr = fromAsStr;
Expand All @@ -83,6 +110,99 @@ public void process(ValueParser parser, SearchContext context) {
to = parser.parseDouble(toAsStr, context);
}
}

@Override
public Range readFrom(StreamInput in) throws IOException {
String key = in.readOptionalString();
String fromAsStr = in.readOptionalString();
String toAsStr = in.readOptionalString();
double from = in.readDouble();
double to = in.readDouble();
return new Range(key, from, fromAsStr, to, toAsStr);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(key);
out.writeOptionalString(fromAsStr);
out.writeOptionalString(toAsStr);
out.writeDouble(from);
out.writeDouble(to);
}

public Range fromXContent(XContentParser parser, ParseFieldMatcher parseFieldMatcher) throws IOException {

XContentParser.Token token;
String currentFieldName = null;
double from = Double.NEGATIVE_INFINITY;
String fromAsStr = null;
double to = Double.POSITIVE_INFINITY;
String toAsStr = null;
String key = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (parseFieldMatcher.match(currentFieldName, FROM_FIELD)) {
from = parser.doubleValue();
} else if (parseFieldMatcher.match(currentFieldName, TO_FIELD)) {
to = parser.doubleValue();
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (parseFieldMatcher.match(currentFieldName, FROM_FIELD)) {
fromAsStr = parser.text();
} else if (parseFieldMatcher.match(currentFieldName, TO_FIELD)) {
toAsStr = parser.text();
} else if (parseFieldMatcher.match(currentFieldName, KEY_FIELD)) {
key = parser.text();
}
}
}
return new Range(key, from, fromAsStr, to, toAsStr);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (key != null) {
builder.field(KEY_FIELD.getPreferredName(), key);
}
if (Double.isFinite(from)) {
builder.field(FROM_FIELD.getPreferredName(), from);
}
if (Double.isFinite(to)) {
builder.field(TO_FIELD.getPreferredName(), to);
}
if (fromAsStr != null) {
builder.field(FROM_FIELD.getPreferredName(), fromAsStr);
}
if (toAsStr != null) {
builder.field(TO_FIELD.getPreferredName(), toAsStr);
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(key, from, fromAsStr, to, toAsStr);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Range other = (Range) obj;
return Objects.equals(key, other.key)
&& Objects.equals(from, other.from)
&& Objects.equals(fromAsStr, other.fromAsStr)
&& Objects.equals(to, other.to)
&& Objects.equals(toAsStr, other.toAsStr);
}
}

final ValuesSource.Numeric valuesSource;
Expand All @@ -94,7 +214,7 @@ public void process(ValueParser parser, SearchContext context) {
final double[] maxTo;

public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, ValueFormat format,
InternalRange.Factory rangeFactory, List<Range> ranges, boolean keyed, AggregationContext aggregationContext,
InternalRange.Factory rangeFactory, List<? extends Range> ranges, boolean keyed, AggregationContext aggregationContext,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

super(name, factories, aggregationContext, parent, pipelineAggregators, metaData);
Expand Down Expand Up @@ -245,12 +365,13 @@ protected int compare(int i, int j) {

public static class Unmapped extends NonCollectingAggregator {

private final List<RangeAggregator.Range> ranges;
private final List<? extends RangeAggregator.Range> ranges;
private final boolean keyed;
private final InternalRange.Factory factory;
private final ValueFormatter formatter;

public Unmapped(String name, List<RangeAggregator.Range> ranges, boolean keyed, ValueFormat format, AggregationContext context,
public Unmapped(String name, List<? extends RangeAggregator.Range> ranges, boolean keyed, ValueFormat format,
AggregationContext context,
Aggregator parent, InternalRange.Factory factory, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {

Expand Down Expand Up @@ -279,17 +400,27 @@ public InternalAggregation buildEmptyAggregation() {
public static class Factory extends ValuesSourceAggregatorFactory<ValuesSource.Numeric> {

private final InternalRange.Factory rangeFactory;
private final List<Range> ranges;
private final boolean keyed;
private final List<? extends Range> ranges;
private boolean keyed = false;

public Factory(String name, List<? extends Range> ranges) {
this(name, InternalRange.FACTORY, ranges);
}

public Factory(String name, ValuesSourceParser.Input<ValuesSource.Numeric> valueSourceInput, InternalRange.Factory rangeFactory,
List<Range> ranges, boolean keyed) {
super(name, rangeFactory.type(), valueSourceInput);
protected Factory(String name, InternalRange.Factory rangeFactory, List<? extends Range> ranges) {
super(name, rangeFactory.type(), rangeFactory.getValueSourceType(), rangeFactory.getValueType());
this.rangeFactory = rangeFactory;
this.ranges = ranges;
}

public void keyed(boolean keyed) {
this.keyed = keyed;
}

public boolean keyed() {
return keyed;
}

@Override
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
Expand All @@ -301,6 +432,51 @@ protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggrega
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new RangeAggregator(name, factories, valuesSource, config.format(), rangeFactory, ranges, keyed, aggregationContext, parent, pipelineAggregators, metaData);
}

@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(RANGES_FIELD.getPreferredName(), ranges);
builder.field(KEYED_FIELD.getPreferredName(), keyed);
return builder;
}

@Override
protected ValuesSourceAggregatorFactory<Numeric> innerReadFrom(String name, ValuesSourceType valuesSourceType,
ValueType targetValueType, StreamInput in) throws IOException {
Factory factory = createFactoryFromStream(name, in);
factory.keyed = in.readBoolean();
return factory;
}

protected Factory createFactoryFromStream(String name, StreamInput in) throws IOException {
int size = in.readVInt();
List<Range> ranges = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
ranges.add(Range.PROTOTYPE.readFrom(in));
}
return new Factory(name, ranges);
}

@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeVInt(ranges.size());
for (Range range : ranges) {
range.writeTo(out);
}
out.writeBoolean(keyed);
}

@Override
protected int innerHashCode() {
return Objects.hash(ranges, keyed);
}

@Override
protected boolean innerEquals(Object obj) {
Factory other = (Factory) obj;
return Objects.equals(ranges, other.ranges)
&& Objects.equals(keyed, other.keyed);
}
}

}
Loading

0 comments on commit 97cbdb8

Please sign in to comment.