Skip to content

Commit

Permalink
Aggregations Refactor: Refactor Cardinality Aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
colings86 committed Nov 24, 2015
1 parent 510848f commit 3037870
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,61 @@

package org.elasticsearch.search.aggregations.metrics.cardinality;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource> {
public final class CardinalityAggregatorFactory<VS extends ValuesSource> extends ValuesSourceAggregatorFactory.LeafOnly<VS> {

private final long precisionThreshold;
public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold");

CardinalityAggregatorFactory(String name, ValuesSourceParser.Input<ValuesSource> input, long precisionThreshold) {
super(name, InternalCardinality.TYPE, input);
private Long precisionThreshold = null;

public CardinalityAggregatorFactory(String name, ValuesSourceType valuesSourceType, ValueType valueType) {
super(name, InternalCardinality.TYPE, valuesSourceType, valueType);
}

/**
* Set a precision threshold. Higher values improve accuracy but also
* increase memory usage.
*/
public void precisionThreshold(long precisionThreshold) {
this.precisionThreshold = precisionThreshold;
}

/**
* Get the precision threshold. Higher values improve accuracy but also
* increase memory usage. Will return <code>null</code> if the
* precisionThreshold has not been set yet.
*/
public Long precisionThreshold() {
return precisionThreshold;
}

/**
* @deprecated no replacement - values will always be rehashed
*/
@Deprecated
public void rehash(boolean rehash) {
// Deprecated all values are already rehashed so do nothing
}

private int precision(Aggregator parent) {
return precisionThreshold < 0 ? defaultPrecision(parent) : HyperLogLogPlusPlus.precisionFromThreshold(precisionThreshold);
return precisionThreshold == null ? defaultPrecision(parent) : HyperLogLogPlusPlus.precisionFromThreshold(precisionThreshold);
}

@Override
Expand All @@ -51,12 +83,50 @@ protected Aggregator createUnmapped(AggregationContext context, Aggregator paren
}

@Override
protected Aggregator doCreateInternal(ValuesSource valuesSource, AggregationContext context, Aggregator parent,
protected Aggregator doCreateInternal(VS valuesSource, AggregationContext context, Aggregator parent,
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new CardinalityAggregator(name, valuesSource, precision(parent), config.formatter(), context, parent, pipelineAggregators,
metaData);
}

@Override
protected ValuesSourceAggregatorFactory<VS> innerReadFrom(String name, ValuesSourceType valuesSourceType,
ValueType targetValueType, StreamInput in) throws IOException {
CardinalityAggregatorFactory<VS> factory = new CardinalityAggregatorFactory<>(name, valuesSourceType, targetValueType);
if (in.readBoolean()) {
factory.precisionThreshold = in.readLong();
}
return factory;
}

@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
boolean hasPrecisionThreshold = precisionThreshold != null;
out.writeBoolean(hasPrecisionThreshold);
if (hasPrecisionThreshold) {
out.writeLong(precisionThreshold);
}
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
if (precisionThreshold != null) {
builder.field(PRECISION_THRESHOLD_FIELD.getPreferredName(), precisionThreshold);
}
return builder;
}

@Override
protected int innerHashCode() {
return Objects.hash(precisionThreshold);
}

@Override
protected boolean innerEquals(Object obj) {
CardinalityAggregatorFactory<ValuesSource> other = (CardinalityAggregatorFactory<ValuesSource>) obj;
return Objects.equals(precisionThreshold, other.precisionThreshold);
}

/*
* If one of the parent aggregators is a MULTI_BUCKET one, we might want to lower the precision
* because otherwise it might be memory-intensive. On the other hand, for top-level aggregators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,66 +20,62 @@
package org.elasticsearch.search.aggregations.metrics.cardinality;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AbstractValuesSourceParser.AnyValuesSourceParser;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

import java.io.IOException;
import java.util.Map;


public class CardinalityParser implements Aggregator.Parser {
public class CardinalityParser extends AnyValuesSourceParser {

private static final ParseField PRECISION_THRESHOLD = new ParseField("precision_threshold");
private static final ParseField REHASH = new ParseField("rehash").withAllDeprecated("no replacement - values will always be rehashed");

public CardinalityParser() {
super(true, false);
}

@Override
public String type() {
return InternalCardinality.TYPE.name();
}

@Override
public AggregatorFactory parse(String name, XContentParser parser, SearchContext context) throws IOException {

ValuesSourceParser<ValuesSource> vsParser = ValuesSourceParser.any(name, InternalCardinality.TYPE, context).formattable(false)
.build();

long precisionThreshold = -1;
protected ValuesSourceAggregatorFactory<ValuesSource> createFactory(String aggregationName, ValuesSourceType valuesSourceType,
ValueType targetValueType, Map<ParseField, Object> otherOptions) {
CardinalityAggregatorFactory<ValuesSource> factory = new CardinalityAggregatorFactory<>(aggregationName, valuesSourceType,
targetValueType);
Long precisionThreshold = (Long) otherOptions.get(CardinalityAggregatorFactory.PRECISION_THRESHOLD_FIELD);
if (precisionThreshold != null) {
factory.precisionThreshold(precisionThreshold);
}
return factory;
}

XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (vsParser.token(currentFieldName, token, parser)) {
continue;
} else if (token.isValue()) {
if (context.parseFieldMatcher().match(currentFieldName, REHASH)) {
// ignore
} else if (context.parseFieldMatcher().match(currentFieldName, PRECISION_THRESHOLD)) {
precisionThreshold = parser.longValue();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + name + "]: [" + currentFieldName
+ "].", parser.getTokenLocation());
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + name + "].", parser.getTokenLocation());
@Override
protected boolean token(String aggregationName, String currentFieldName, Token token, XContentParser parser,
ParseFieldMatcher parseFieldMatcher, Map<ParseField, Object> otherOptions) throws IOException {
if (token.isValue()) {
if (parseFieldMatcher.match(currentFieldName, CardinalityAggregatorFactory.PRECISION_THRESHOLD_FIELD)) {
otherOptions.put(CardinalityAggregatorFactory.PRECISION_THRESHOLD_FIELD, parser.longValue());
return true;
} else if (parseFieldMatcher.match(currentFieldName, REHASH)) {
// ignore
return true;
}
}

ValuesSourceParser.Input<ValuesSource> input = vsParser.input();

return new CardinalityAggregatorFactory(name, input, precisionThreshold);

return false;
}

// NORELEASE implement this method when refactoring this aggregation
@Override
public AggregatorFactory[] getFactoryPrototypes() {
return null;
return new AggregatorFactory[] { new CardinalityAggregatorFactory<ValuesSource>(null, null, null) };
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.metrics.cardinality;

import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;

public class CardinalityTests extends BaseAggregationTestCase<CardinalityAggregatorFactory<? extends ValuesSource>> {

@Override
protected final CardinalityAggregatorFactory<? extends ValuesSource> createTestAggregatorFactory() {
CardinalityAggregatorFactory<ValuesSource> factory = new CardinalityAggregatorFactory<ValuesSource>("foo", ValuesSourceType.ANY,
null);
String field = randomNumericField();
int randomFieldBranch = randomInt(3);
switch (randomFieldBranch) {
case 0:
factory.field(field);
break;
case 1:
factory.field(field);
factory.script(new Script("_value + 1"));
break;
case 2:
factory.script(new Script("doc[" + field + "] + 1"));
break;
}
if (randomBoolean()) {
factory.missing("MISSING");
}
return factory;
}

}

0 comments on commit 3037870

Please sign in to comment.