diff --git a/README.md b/README.md index c95ef1ae..683d95c1 100644 --- a/README.md +++ b/README.md @@ -151,14 +151,14 @@ for raw records, use projections to help reduce the load on the system and netwo Aggregations allow you to perform some operation on the collected records. They take an optional size to restrict the size of the aggregation (this applies for aggregations high cardinality aggregations and raw records). -The current aggregations supported are: +The current aggregation types that are supported are: | Aggregation | Meaning | | ----------- | ------- | -| COUNT | The resulting output would be a single record that contains the number of records seen that matched the filters. | +| GROUP | The resulting output would be a record containing the result of an operation for each unique group in the specified fields (only supported with no fields at this time, which groups all records) | | LIMIT | The resulting output would be at most the number specified in size. | -We currently only support COUNT if there is no fields being grouped on. In other words, you get a count of the number of records that matched your filters. +We currently only support GROUP operations if there are no fields being grouped on. In other words, you get the results of the operation on all records that matched your filters. The current format for an aggregation is (**note see above for what is supported at the moment**): @@ -183,48 +183,16 @@ The current format for an aggregation is (**note see above for what is supported You can also use LIMIT as an alias for RAW. DISTINCT is also an alias for GROUP. These exist to make some queries read a bit better. -See the [examples section](#examples) below to see how to perform the aggregations supported at the moment, as well as the attributes for them. 
+Currently we support GROUP aggregations on the following operations: -#### Coming Soon - -It is often intractable to perform aggregations on an unbounded stream of data and support arbitrary queries. However, it is possible -if an exact answer is not required as long as the error is quantifiable. There are stochastic algorithms and data structures that let us -support these aggregations. We will be using [Data Sketches](https://datasketches.github.io/) to solve aggregations such as counting -uniques, getting distributions, approximating top k etc. Sketches let us be exact in our computation up to configured thresholds -and approximate after. The error is very controllable and mathematically provable. This lets us address otherwise hard to solve problems in -sublinear space. We will also use Sketches as a way to control high cardinality grouping (group by a natural key column or related) and rely on -the Sketching data structure to drop excess groups. It is up to the user launching Bullet to determine to set Sketch sizes large or -small enough for to satisfy the queries that will be performed on that instance of Bullet. 
- -Using Sketches, we are working on other aggregations including but not limited to: - -| Aggregation | Meaning | +| Operation | Meaning | | -------------- | ------- | | SUM | Computes the sum of the elements in the group | | MIN | Returns the minimum of the elements in the group | | MAX | Returns the maximum of the elements in the group | | AVG | Computes the average of the elements in the group | -| COUNT DISTINCT | Computes the number of distinct elements in the column | -| TOP K | Returns the top K most freqently appearing values in the column | -| DISTRIBUTION | Computes distributions of the elements in the column | - -The following attributes are planned to be supported for the different aggregations: - -Attributes for TOP K: - -```javascript - "attributes": { - "k": 15, - } -``` -Attributes for COUNT DISTINCT: - -```javascript - "attributes": { - "newName": "the name of the resulting count column" - } -``` +The following attributes are supported for GROUP: Attributes for GROUP: ```javascript @@ -258,6 +226,46 @@ Attributes for GROUP: } ``` +See the [examples section](#examples) for a detailed description of how to perform these aggregations. + +#### Coming Soon + +It is often intractable to perform aggregations on an unbounded stream of data and support arbitrary queries. However, it is possible +if an exact answer is not required as long as the error is quantifiable. There are stochastic algorithms and data structures that let us +support these aggregations. We will be using [Data Sketches](https://datasketches.github.io/) to solve aggregations such as counting +uniques, getting distributions, approximating top k etc. Sketches let us be exact in our computation up to configured thresholds +and approximate after. The error is very controllable and mathematically provable. This lets us address otherwise hard to solve problems in +sublinear space. 
We will also use Sketches as a way to control high cardinality grouping (group by a natural key column or related) and rely on +the Sketching data structure to drop excess groups. It is up to the user launching Bullet to set Sketch sizes large or +small enough to satisfy the queries that will be performed on that instance of Bullet. + +Using Sketches, we are working on other aggregations including but not limited to: + +| Aggregation | Meaning | +| -------------- | ------- | +| GROUP | We currently support GROUP with no fields (group all); grouping on specific fields will be supported soon | +| COUNT DISTINCT | Computes the number of distinct elements in the column | +| TOP K | Returns the top K most frequently appearing values in the column | +| DISTRIBUTION | Computes distributions of the elements in the column | + +The following attributes are planned to be supported for the different aggregations: + +Attributes for TOP K: + +```javascript + "attributes": { + "k": 15, + } +``` + +Attributes for COUNT DISTINCT: + +```javascript + "attributes": { + "newName": "the name of the resulting count column" + } +``` + The attributes for the DISTRIBUTION aggregation haven't been decided yet. ### Constraints @@ -584,6 +592,71 @@ A sample result would look like: This result indicates that 363,201 records were counted with demographics.age > 65 during the 20 seconds the query was running. +COUNT is the only GROUP operation for which you can omit a "field". 
A more complicated example with multiple group operations would look like: + +```javascript +{ + "filters":[ + { + "field": "demographics.state", + "operation": "==", + "values": ["california"] + } + ], + "aggregation":{ + "type": "GROUP", + "attributes": { + "operations": [ + { + "type": "COUNT", + "newName": "numCalifornians" + }, + { + "type": "AVG", + "field": "demographics.age", + "newName": "avgAge" + }, + { + "type": "MIN", + "field": "demographics.age", + "newName": "minAge" + }, + { + "type": "MAX", + "field": "demographics.age", + "newName": "maxAge" + } + ] + } + }, + "duration": 20000 +} +``` + +A sample result would look like: + +```javascript +{ + "records": [ + { + "maxAge": 94.0, + "numCalifornians": 188451, + "minAge": 6.0, + "avgAge": 33.71828 + } + ], + "meta": { + "rule_id": 8051040987827161000, + "rule_body": "}", + "rule_finish_time": 1482371927435, + "rule_receive_time": 1482371916625 + } +} +``` + +This result indicates that, among the records observed during the 20 seconds this query ran, there were 188,451 users with demographics.state == "california". Among these users the average age was 33.71828, the max +age observed was 94, and the minimum age observed was 6. + ## Configuration Bullet is configured at run-time using settings defined in a file. Settings not overridden will default to the values in [src/main/resources/bullet_defaults.yaml](src/main/resources/bullet_defaults.yaml). 
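
AVG has to stay correct when partial aggregations are combined, which is why this change tracks an internal count next to the sum (the `COUNT_FIELD` operation in `GroupData`). The sketch below illustrates that idea in isolation. `PartialAvg` and its methods are hypothetical names made up for this example and are not part of Bullet; the input values are borrowed from `testCombiningMetrics` in this change.

```java
// Hypothetical illustration only: PartialAvg is not a class in this change.
final class PartialAvg {
    private Double sum;  // stays null until a non-null value arrives
    private long count;  // counts non-null values only, like the internal COUNT_FIELD

    /** Folds one field value into the running sum and count; nulls are skipped. */
    void consume(Number value) {
        if (value == null) {
            return;
        }
        sum = (sum == null) ? value.doubleValue() : sum + value.doubleValue();
        count++;
    }

    /** Merges another partial result by adding sums and counts, never averages. */
    void combine(PartialAvg other) {
        if (other.sum == null) {
            return;
        }
        sum = (sum == null) ? other.sum : sum + other.sum;
        count += other.count;
    }

    /** Returns sum / count, or null when no non-null value was ever seen. */
    Double result() {
        return sum == null ? null : sum / count;
    }

    /** Convenience factory used by the example in main. */
    static PartialAvg of(Number... values) {
        PartialAvg partial = new PartialAvg();
        for (Number value : values) {
            partial.consume(value);
        }
        return partial;
    }

    public static void main(String[] args) {
        PartialAvg first = PartialAvg.of(3.14, null, -4.88);
        PartialAvg second = PartialAvg.of(12345.67, 2.718, null, 1, -51.44);
        first.combine(second);
        // Six non-null values in total; the average is roughly 2049.368.
        System.out.println(first.result());
    }
}
```

Averaging the two partial averages instead (about -0.87 and 3074.487) would give roughly 1536.81, because the two partials hold different numbers of non-null values. Carrying the count with the sum avoids that error.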
diff --git a/pom.xml b/pom.xml index ba200696..eeb8f59c 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ 1.8 1.0.2 1.0.2 - 0.0.2 + 0.0.3 @@ -145,6 +145,7 @@ maven-clover2-plugin 4.0.5 + ${user.home}/clover.license **/Client.java **/Topology.java diff --git a/src/main/java/com/yahoo/bullet/operations/AggregationOperations.java b/src/main/java/com/yahoo/bullet/operations/AggregationOperations.java index c18f9d04..e6578114 100644 --- a/src/main/java/com/yahoo/bullet/operations/AggregationOperations.java +++ b/src/main/java/com/yahoo/bullet/operations/AggregationOperations.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.BiFunction; public class AggregationOperations { public enum AggregationType { @@ -41,7 +42,9 @@ public enum GroupOperationType { SUM("SUM"), MIN("MIN"), MAX("MAX"), - AVG("AVG"); + AVG("AVG"), + // COUNT_FIELD operation is only used internally in conjunction with AVG and won't be returned. + COUNT_FIELD("COUNT_FIELD"); private String name; @@ -60,6 +63,15 @@ public boolean isMe(String name) { } } + public interface AggregationOperator extends BiFunction<Number, Number, Number> { + } + + // If either argument is null, a NullPointerException will be thrown. + public static final AggregationOperator MIN = (x, y) -> x.doubleValue() < y.doubleValue() ? x : y; + public static final AggregationOperator MAX = (x, y) -> x.doubleValue() > y.doubleValue() ? x : y; + public static final AggregationOperator SUM = (x, y) -> x.doubleValue() + y.doubleValue(); + public static final AggregationOperator COUNT = (x, y) -> x.longValue() + y.longValue(); + /** * Checks to see if a {@link Map} contains items. 
* diff --git a/src/main/java/com/yahoo/bullet/operations/aggregations/GroupAll.java b/src/main/java/com/yahoo/bullet/operations/aggregations/GroupAll.java index 80dfdf05..ecf89764 100644 --- a/src/main/java/com/yahoo/bullet/operations/aggregations/GroupAll.java +++ b/src/main/java/com/yahoo/bullet/operations/aggregations/GroupAll.java @@ -30,12 +30,12 @@ public GroupAll(Aggregation aggregation) { @Override public void consume(BulletRecord data) { - this.data.compute(data); + this.data.consume(data); } @Override public void combine(byte[] serializedAggregation) { - data.merge(serializedAggregation); + data.combine(serializedAggregation); } @Override diff --git a/src/main/java/com/yahoo/bullet/operations/aggregations/GroupData.java b/src/main/java/com/yahoo/bullet/operations/aggregations/GroupData.java index 87030a16..8f4bb1b9 100644 --- a/src/main/java/com/yahoo/bullet/operations/aggregations/GroupData.java +++ b/src/main/java/com/yahoo/bullet/operations/aggregations/GroupData.java @@ -5,6 +5,8 @@ */ package com.yahoo.bullet.operations.aggregations; +import com.yahoo.bullet.operations.AggregationOperations; +import com.yahoo.bullet.operations.AggregationOperations.AggregationOperator; import com.yahoo.bullet.operations.AggregationOperations.GroupOperationType; import com.yahoo.bullet.record.BulletRecord; import lombok.extern.slf4j.Slf4j; @@ -19,6 +21,9 @@ import java.util.Map; import java.util.Set; +import static com.yahoo.bullet.operations.AggregationOperations.GroupOperationType.AVG; +import static com.yahoo.bullet.operations.AggregationOperations.GroupOperationType.COUNT_FIELD; + /** * This class represents the results of a GroupOperations. The result is always a {@link Number}, so * that is what this class stores. It is {@link Serializable} and provides convenience static methods to @@ -41,18 +46,23 @@ public class GroupData implements Serializable { * @param operations the non-null operations that this will compute metrics for. 
*/ public GroupData(Set<GroupOperation> operations) { - // TODO Could do the identity for each operation instead of null // Initialize with nulls. - operations.stream().forEach(o -> metrics.put(o, null)); + for (GroupOperation operation : operations) { + metrics.put(operation, null); + if (operation.getType() == AVG) { + // For AVG we store an additional COUNT_FIELD operation to store the count (the sum is stored in AVG) + metrics.put(new GroupOperation(COUNT_FIELD, operation.getField(), null), null); + } + } } /** - * Computes and adds metrics from the given {@link BulletRecord} for the {@link GroupOperation} defined. + * Consumes the given {@link BulletRecord} and computes group operation metrics. * * @param data The record to compute metrics for. */ - public void compute(BulletRecord data) { - metrics.entrySet().stream().forEach(e -> compute(e, data)); + public void consume(BulletRecord data) { + metrics.entrySet().stream().forEach(e -> consume(e, data)); } /** @@ -61,13 +71,13 @@ public void compute(BulletRecord data) { * * @param serializedGroupData the serialized bytes of a GroupData. */ - public void merge(byte[] serializedGroupData) { + public void combine(byte[] serializedGroupData) { GroupData otherMetric = GroupData.fromBytes(serializedGroupData); if (otherMetric == null) { log.error("Could not create a GroupData. Skipping..."); return; } - merge(otherMetric); + combine(otherMetric); } /** @@ -76,8 +86,8 @@ public void merge(byte[] serializedGroupData) { * * @param otherData The other GroupData to merge. 
*/ - public void merge(GroupData otherData) { - metrics.entrySet().stream().forEach(e -> merge(e, otherData)); + public void combine(GroupData otherData) { + metrics.entrySet().stream().forEach(e -> combine(e, otherData)); } /** @@ -87,50 +97,80 @@ public void merge(GroupData otherData) { */ public BulletRecord getAsBulletRecord() { BulletRecord record = new BulletRecord(); - metrics.entrySet().stream().forEach(e -> update(e, record)); + metrics.entrySet().stream().forEach(e -> addToRecord(e, record)); return record; } - private Number getOrDefault(GroupOperation operation, Number defaultValue) { - // Can't use metrics.getOrDefault since that checks for containsKey - we put explicit nulls into it - Number value = metrics.get(operation); - return value == null ? defaultValue : value; - } - - private void compute(Map.Entry metric, BulletRecord data) { + private void consume(Map.Entry metric, BulletRecord data) { GroupOperation operation = metric.getKey(); + Object value = data.get(operation.getField()); switch (operation.getType()) { + case MIN: + updateMetric(value, metric, AggregationOperations.MIN); + break; + case MAX: + updateMetric(value, metric, AggregationOperations.MAX); + break; case COUNT: - incrementCount(operation); + updateMetric(1L, metric, AggregationOperations.COUNT); break; - default: - // These other cases can't happen yet + case COUNT_FIELD: + updateMetric(value != null ? 
1L : null, metric, AggregationOperations.COUNT); + break; + case SUM: + case AVG: + updateMetric(value, metric, AggregationOperations.SUM); break; } } - private void merge(Map.Entry metric, GroupData otherData) { + private void combine(Map.Entry metric, GroupData otherData) { GroupOperation operation = metric.getKey(); + Number value = otherData.metrics.get(metric.getKey()); switch (operation.getType()) { - case COUNT: - incrementCount(operation, otherData.getCount(operation)); + case MIN: + updateMetric(value, metric, AggregationOperations.MIN); + break; + case MAX: + updateMetric(value, metric, AggregationOperations.MAX); break; - default: - // These other cases can't happen yet + case SUM: + case AVG: + updateMetric(value, metric, AggregationOperations.SUM); + break; + case COUNT: + case COUNT_FIELD: + updateMetric(value, metric, AggregationOperations.COUNT); break; } } - private void update(Map.Entry metric, BulletRecord record) { + private void addToRecord(Map.Entry metric, BulletRecord record) { GroupOperation operation = metric.getKey(); + Number value = metric.getValue(); switch (operation.getType()) { case COUNT: - record.setLong(getResultName(operation), getCount(operation)); + record.setLong(getResultName(operation), value == null ? 0 : value.longValue()); + break; + case AVG: + record.setDouble(getResultName(operation), calculateAvg(value, operation.getField())); break; - default: - // These other cases can't happen yet + case COUNT_FIELD: break; + case MIN: + case MAX: + case SUM: + record.setDouble(getResultName(operation), value == null ? 
null : value.doubleValue()); + break; + } + } + + private Double calculateAvg(Number sum, String field) { + Number count = metrics.get(new GroupOperation(COUNT_FIELD, field, null)); + if (sum == null || count == null) { + return null; } + return sum.doubleValue() / count.longValue(); } /** @@ -192,20 +232,16 @@ public static byte[] toBytes(GroupData metric) { return null; } - /* ======================== OPERATION IMPLEMENTATIONS ============================ */ - - private void incrementCount(GroupOperation operation) { - incrementCount(operation, 1L); - } - - private void incrementCount(GroupOperation operation, Long by) { - Number count = getOrDefault(operation, 0L); - metrics.put(operation, count.longValue() + by); - } - - private Long getCount(GroupOperation operation) { - Number count = getOrDefault(operation, 0L); - return count.longValue(); + /* + * This function accepts an AggregationOperator and applies it to the new and current value for the given + * GroupOperation and updates metrics accordingly. + */ + private void updateMetric(Object object, Map.Entry metric, AggregationOperator operator) { + // Also catches null. + if (object instanceof Number) { + Number current = metric.getValue(); + Number number = (Number) object; + metrics.put(metric.getKey(), current == null ? number : operator.apply(number, current)); + } } - } diff --git a/src/main/java/com/yahoo/bullet/parsing/Aggregation.java b/src/main/java/com/yahoo/bullet/parsing/Aggregation.java index 5af77f89..e86ae02c 100644 --- a/src/main/java/com/yahoo/bullet/parsing/Aggregation.java +++ b/src/main/java/com/yahoo/bullet/parsing/Aggregation.java @@ -52,17 +52,27 @@ public class Aggregation implements Configurable, Validatable { // We only support COUNT for now. 
public static final Set<AggregationType> SUPPORTED_AGGREGATION_TYPES = new HashSet<>(asList(AggregationType.GROUP, AggregationType.RAW)); - public static final List<GroupOperationType> SUPPORTED_GROUP_OPERATIONS = singletonList(GroupOperationType.COUNT); + public static final Set<GroupOperationType> SUPPORTED_GROUP_OPERATIONS = new HashSet<>(asList(GroupOperationType.COUNT, + GroupOperationType.AVG, + GroupOperationType.MAX, + GroupOperationType.MIN, + GroupOperationType.SUM)); public static final String TYPE_NOT_SUPPORTED_ERROR_PREFIX = "Aggregation type not supported"; public static final String TYPE_NOT_SUPPORTED_RESOLUTION = "Current supported aggregation types are: RAW, GROUP"; + public static final String SUPPORTED_GROUP_OPERATIONS_RESOLUTION = + "Currently supported operations are: COUNT, AVG, MIN, MAX, SUM"; + + public static final String GROUP_OPERATION_REQUIRES_FIELD = "Group operation requires a field: "; + public static final String GROUP_OPERATION_REQUIRES_FIELD_RESOLUTION = "Please add field for this operation."; + // Temporary public static final Error GROUP_FIELDS_NOT_SUPPORTED_ERROR = makeError("Group type aggregation cannot have fields", "Do not specify fields when type is GROUP"); // Temporary public static final Error GROUP_ALL_OPERATION_ERROR = makeError("Group all needs to specify an operation to do", - "Currently supported operations are: COUNT"); + SUPPORTED_GROUP_OPERATIONS_RESOLUTION); public static final Integer DEFAULT_SIZE = 1; public static final Integer DEFAULT_MAX_SIZE = 30; @@ -115,6 +125,14 @@ public Optional<List<Error>> validate() { return Optional.of(singletonList(makeError(TYPE_NOT_SUPPORTED_ERROR_PREFIX + typeSuffix, TYPE_NOT_SUPPORTED_RESOLUTION))); } + if (groupOperations != null) { + for (GroupOperation operation : groupOperations) { + if (operation.getField() == null && operation.getType() != GroupOperationType.COUNT) { + return Optional.of(singletonList(makeError(GROUP_OPERATION_REQUIRES_FIELD + operation.getType(), + GROUP_OPERATION_REQUIRES_FIELD_RESOLUTION))); + } + } + } return 
Optional.empty(); } diff --git a/src/test/java/com/yahoo/bullet/drpc/FilterBoltTest.java b/src/test/java/com/yahoo/bullet/drpc/FilterBoltTest.java index adaf3530..fc2c5c13 100644 --- a/src/test/java/com/yahoo/bullet/drpc/FilterBoltTest.java +++ b/src/test/java/com/yahoo/bullet/drpc/FilterBoltTest.java @@ -6,7 +6,6 @@ package com.yahoo.bullet.drpc; import com.yahoo.bullet.operations.AggregationOperations.AggregationType; -import com.yahoo.bullet.operations.AggregationOperations.GroupOperationType; import com.yahoo.bullet.operations.FilterOperations; import com.yahoo.bullet.operations.aggregations.GroupData; import com.yahoo.bullet.operations.aggregations.GroupOperation; @@ -21,13 +20,13 @@ import org.testng.annotations.Test; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.IntStream; import static com.yahoo.bullet.drpc.TupleUtils.makeIDTuple; import static com.yahoo.bullet.drpc.TupleUtils.makeTuple; +import static com.yahoo.bullet.operations.AggregationOperations.GroupOperationType.COUNT; import static com.yahoo.bullet.operations.FilterOperations.FilterType.AND; import static com.yahoo.bullet.operations.FilterOperations.FilterType.EQUALS; import static com.yahoo.bullet.operations.FilterOperations.FilterType.GREATER_THAN; @@ -181,10 +180,10 @@ public void testFiltering() { @Test public void testProjectionAndFiltering() { Tuple rule = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, - makeProjectionFilterRule("map_field.id", Collections.singletonList("123"), - EQUALS, - Pair.of("field", "id"), - Pair.of("map_field.id", "mid"))); + makeProjectionFilterRule("map_field.id", singletonList("123"), + EQUALS, + Pair.of("field", "id"), + Pair.of("map_field.id", "mid"))); bolt.execute(rule); BulletRecord record = RecordBox.get().add("field", "b235gf23b").add("timestamp", 92L) @@ -202,10 +201,10 @@ public void testProjectionAndFiltering() { @Test public void testFilteringUsingProjectedName() { Tuple 
rule = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, - makeProjectionFilterRule("mid", Collections.singletonList("123"), - EQUALS, - Pair.of("field", "id"), - Pair.of("map_field.id", "mid"))); + makeProjectionFilterRule("mid", singletonList("123"), + EQUALS, + Pair.of("field", "id"), + Pair.of("map_field.id", "mid"))); bolt.execute(rule); BulletRecord record = RecordBox.get().add("field", "b235gf23b").add("timestamp", 92L) @@ -223,10 +222,10 @@ public void testFilteringUsingProjectedName() { @Test public void testProjectionNotLosingFilterColumn() { Tuple rule = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, - makeProjectionFilterRule("timestamp", Collections.singletonList("92"), - EQUALS, - Pair.of("field", "id"), - Pair.of("map_field.id", "mid"))); + makeProjectionFilterRule("timestamp", singletonList("92"), + EQUALS, + Pair.of("field", "id"), + Pair.of("map_field.id", "mid"))); bolt.execute(rule); BulletRecord record = RecordBox.get().add("field", "b235gf23b").add("timestamp", 92L) @@ -244,8 +243,8 @@ public void testProjectionNotLosingFilterColumn() { @Test public void testMultiFiltering() { Tuple rule = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, - makeSimpleAggregationFilterRule("field", Collections.singletonList("b235gf23b"), - EQUALS, AggregationType.RAW, 5)); + makeSimpleAggregationFilterRule("field", singletonList("b235gf23b"), + EQUALS, AggregationType.RAW, 5)); bolt.execute(rule); BulletRecord record = RecordBox.get().add("field", "b235gf23b").getRecord(); Tuple matching = makeTuple(TupleType.Type.RECORD_TUPLE, record); @@ -263,8 +262,8 @@ public void testMultiFiltering() { public void testDifferentRuleMatchingSameTuple() { Tuple ruleA = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, makeFieldFilterRule("b235gf23b")); Tuple ruleB = makeIDTuple(TupleType.Type.RULE_TUPLE, 43L, - makeFilterRule("timestamp", Arrays.asList("1", "2", "3", "45"), - EQUALS)); + makeFilterRule("timestamp", Arrays.asList("1", "2", "3", "45"), + EQUALS)); bolt.execute(ruleA); 
bolt.execute(ruleB); @@ -284,8 +283,8 @@ public void testDifferentRuleMatchingSameTuple() { public void testDifferentRuleMatchingDifferentTuple() { Tuple ruleA = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, makeFieldFilterRule("b235gf23b")); Tuple ruleB = makeIDTuple(TupleType.Type.RULE_TUPLE, 43L, - makeFilterRule("timestamp", Arrays.asList("1", "2", "3", "45"), - FilterOperations.FilterType.NOT_EQUALS)); + makeFilterRule("timestamp", Arrays.asList("1", "2", "3", "45"), + FilterOperations.FilterType.NOT_EQUALS)); bolt.execute(ruleA); bolt.execute(ruleB); @@ -388,16 +387,16 @@ public void testRuleNonExpiryAndThenExpiry() { public void testComplexFilterRule() { Tuple rule = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, makeFilterRule(OR, - clause(AND, - clause("field", EQUALS, "abc"), - clause(OR, - clause(AND, - clause("experience", EQUALS, "app", "tv"), - clause("pid", EQUALS, "1", "2")), - clause("mid", GREATER_THAN, "10"))), - clause(AND, - clause("demographic_map.age", GREATER_THAN, "65"), - clause("filter_map.is_fake_event", EQUALS, "true")))); + clause(AND, + clause("field", EQUALS, "abc"), + clause(OR, + clause(AND, + clause("experience", EQUALS, "app", "tv"), + clause("pid", EQUALS, "1", "2")), + clause("mid", GREATER_THAN, "10"))), + clause(AND, + clause("demographic_map.age", GREATER_THAN, "65"), + clause("filter_map.is_fake_event", EQUALS, "true")))); bolt.execute(rule); // first clause is true : field == "abc", experience == "app" or "tv", mid < 10 @@ -457,9 +456,9 @@ public void testGroupAllCount() { bolt = ComponentUtils.prepare(new ExpiringFilterBolt(), collector); Tuple rule = makeIDTuple(TupleType.Type.RULE_TUPLE, 42L, - makeGroupFilterRule("timestamp", Arrays.asList("1", "2"), EQUALS, - AggregationType.GROUP, 1, emptyList(), - singletonList(new GroupOperation(GroupOperationType.COUNT, null, "cnt")))); + makeGroupFilterRule("timestamp", Arrays.asList("1", "2"), EQUALS, + AggregationType.GROUP, 1, emptyList(), + singletonList(new GroupOperation(COUNT, 
null, "cnt")))); bolt.execute(rule); BulletRecord record = RecordBox.get().add("timestamp", "1").getRecord(); diff --git a/src/test/java/com/yahoo/bullet/drpc/JoinBoltTest.java b/src/test/java/com/yahoo/bullet/drpc/JoinBoltTest.java index a381cad1..4aec9d83 100644 --- a/src/test/java/com/yahoo/bullet/drpc/JoinBoltTest.java +++ b/src/test/java/com/yahoo/bullet/drpc/JoinBoltTest.java @@ -86,7 +86,7 @@ private void sendRawByteTuplesTo(IRichBolt bolt, Long id, List data) { private byte[] getGroupDataWithCount(String countField, int count) { GroupData groupData = new GroupData(new HashSet<>(singletonList(new GroupOperation(COUNT, null, countField)))); - IntStream.range(0, count).forEach(i -> groupData.compute(null)); + IntStream.range(0, count).forEach(i -> groupData.consume(RecordBox.get().getRecord())); return GroupData.toBytes(groupData); } diff --git a/src/test/java/com/yahoo/bullet/operations/aggregations/GroupAllTest.java b/src/test/java/com/yahoo/bullet/operations/aggregations/GroupAllTest.java index 9488b218..1ee50c60 100644 --- a/src/test/java/com/yahoo/bullet/operations/aggregations/GroupAllTest.java +++ b/src/test/java/com/yahoo/bullet/operations/aggregations/GroupAllTest.java @@ -37,12 +37,16 @@ public static GroupAll makeGroupAll(Map... groupOperations) { return new GroupAll(aggregation); } - public static GroupAll makeGroupAll(GroupOperation... groupOperations) { + public static GroupAll makeGroupAll(List groupOperations) { Aggregation aggregation = mock(Aggregation.class); - when(aggregation.getGroupOperations()).thenReturn(new HashSet<>(asList(groupOperations))); + when(aggregation.getGroupOperations()).thenReturn(new HashSet<>(groupOperations)); return new GroupAll(aggregation); } + public static GroupAll makeGroupAll(GroupOperation... 
groupOperations) { + return makeGroupAll(asList(groupOperations)); + } + @Test public void testNoRecordCount() { GroupAll groupAll = makeGroupAll(makeGroupOperation(GroupOperationType.COUNT, null, null)); @@ -102,12 +106,23 @@ public void testCountingMoreThanMaximum() { @Test public void testCombiningMetrics() { - GroupAll groupAll = makeGroupAll(new GroupOperation(GroupOperationType.COUNT, null, "myCount")); - BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord(); - IntStream.range(0, 21).forEach(i -> groupAll.consume(someRecord)); + List operations = asList(new GroupOperation(GroupOperationType.COUNT, null, "myCount"), + new GroupOperation(GroupOperationType.MIN, "minField", "myMin"), + new GroupOperation(GroupOperationType.AVG, "groupField", "groupAvg"), + new GroupOperation(GroupOperationType.MIN, "groupField", "groupMin"), + new GroupOperation(GroupOperationType.SUM, "groupField", "groupSum")); + + GroupAll groupAll = makeGroupAll(operations); + groupAll.consume(RecordBox.get().add("minField", -8.8).add("groupField", 3.14).getRecord()); + groupAll.consume(RecordBox.get().add("minField", 0.0).addNull("groupField").getRecord()); + groupAll.consume(RecordBox.get().add("minField", 51.44).add("groupField", -4.88).getRecord()); - GroupAll another = makeGroupAll(makeGroupOperation(GroupOperationType.COUNT, null, null)); - IntStream.range(0, 21).forEach(i -> another.consume(someRecord)); + GroupAll another = makeGroupAll(operations); + another.consume(RecordBox.get().add("minField", -8.8).add("groupField", 12345.67).getRecord()); + another.consume(RecordBox.get().addNull("minField").add("groupField", 2.718).getRecord()); + another.consume(RecordBox.get().add("minField", -51.0).addNull("groupField").getRecord()); + another.consume(RecordBox.get().add("minField", 0).add("groupField", 1).getRecord()); + another.consume(RecordBox.get().add("minField", 44.8).add("groupField", -51.44).getRecord()); byte[] serialized = another.getSerializedAggregation(); 
groupAll.combine(serialized); @@ -115,10 +130,11 @@ public void testCombiningMetrics() { Assert.assertNotNull(groupAll.getSerializedAggregation()); List aggregate = groupAll.getAggregation(); Assert.assertEquals(aggregate.size(), 1); + BulletRecord actual = aggregate.get(0); - // 21 + 21 - BulletRecord expected = RecordBox.get().add("myCount", 42L).getRecord(); - Assert.assertEquals(actual, expected); + BulletRecord expected = RecordBox.get().add("myCount", 8L).add("myMin", -51.0).add("groupAvg", 2049.368) + .add("groupMin", -51.44).add("groupSum", 12296.208).getRecord(); + Assert.assertTrue(actual.equals(expected)); } @Test @@ -138,4 +154,108 @@ public void testCombiningMetricsFail() { BulletRecord expected = RecordBox.get().add(GroupOperationType.COUNT.getName(), 10L).getRecord(); Assert.assertEquals(actual, expected); } + + @Test + public void testMin() { + GroupAll groupAll = makeGroupAll(makeGroupOperation(GroupOperationType.MIN, "someField", "min")); + Assert.assertNotNull(groupAll.getSerializedAggregation()); + + groupAll.consume(RecordBox.get().addNull("someField").getRecord()); + BulletRecord expected = RecordBox.get().addNull("min").getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + groupAll.consume(RecordBox.get().add("someField", -4.8).getRecord()); + groupAll.consume(RecordBox.get().add("someField", -8.8).getRecord()); + groupAll.consume(RecordBox.get().add("someField", 51.44).getRecord()); + expected = RecordBox.get().add("min", -8.8).getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + groupAll.consume(RecordBox.get().addNull("someField").getRecord()); + expected = RecordBox.get().add("min", -8.8).getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + groupAll.consume(RecordBox.get().add("someField", -51.44).getRecord()); + expected = RecordBox.get().add("min", -51.44).getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + 
Assert.assertEquals(groupAll.getAggregation().size(), 1); + } + + @Test + public void testMax() { + GroupAll groupAll = makeGroupAll(makeGroupOperation(GroupOperationType.MAX, "someField", "max")); + Assert.assertNotNull(groupAll.getSerializedAggregation()); + + groupAll.consume(RecordBox.get().addNull("someField").getRecord()); + BulletRecord expected = RecordBox.get().addNull("max").getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + groupAll.consume(RecordBox.get().add("someField", -4.8).getRecord()); + groupAll.consume(RecordBox.get().add("someField", -8.8).getRecord()); + groupAll.consume(RecordBox.get().add("someField", 51.44).getRecord()); + expected = RecordBox.get().add("max", 51.44).getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + groupAll.consume(RecordBox.get().addNull("someField").getRecord()); + expected = RecordBox.get().add("max", 51.44).getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + groupAll.consume(RecordBox.get().add("someField", 88.0).getRecord()); + expected = RecordBox.get().add("max", 88.0).getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + Assert.assertEquals(groupAll.getAggregation().size(), 1); + } + + @Test + public void testSum() { + GroupAll groupAll = makeGroupAll(makeGroupOperation(GroupOperationType.SUM, "someField", "sum")); + Assert.assertNotNull(groupAll.getSerializedAggregation()); + + groupAll.consume(RecordBox.get().addNull("someField").getRecord()); + BulletRecord expected = RecordBox.get().addNull("sum").getRecord(); + Assert.assertEquals(groupAll.getAggregation().get(0), expected); + + groupAll.consume(RecordBox.get().add("someField", -4.8).getRecord()); + groupAll.consume(RecordBox.get().add("someField", -8).getRecord()); + groupAll.consume(RecordBox.get().add("someField", 51.44).getRecord()); + expected = RecordBox.get().add("sum", 38.64).getRecord(); + 
Assert.assertEquals(groupAll.getAggregation().get(0), expected);
+
+        groupAll.consume(RecordBox.get().addNull("someField").getRecord());
+        expected = RecordBox.get().add("sum", 38.64).getRecord();
+        Assert.assertEquals(groupAll.getAggregation().get(0), expected);
+
+        groupAll.consume(RecordBox.get().add("someField", 88.0).getRecord());
+        expected = RecordBox.get().add("sum", 126.64).getRecord();
+        Assert.assertEquals(groupAll.getAggregation().get(0), expected);
+
+        Assert.assertEquals(groupAll.getAggregation().size(), 1);
+    }
+
+    @Test
+    public void testAvg() {
+        GroupAll groupAll = makeGroupAll(makeGroupOperation(GroupOperationType.AVG, "someField", "avg"));
+        Assert.assertNotNull(groupAll.getSerializedAggregation());
+
+        groupAll.consume(RecordBox.get().addNull("someField").getRecord());
+        BulletRecord expected = RecordBox.get().addNull("avg").getRecord();
+        Assert.assertEquals(groupAll.getAggregation().get(0), expected);
+
+        groupAll.consume(RecordBox.get().add("someField", -4.8).getRecord());
+        groupAll.consume(RecordBox.get().add("someField", -8).getRecord());
+        groupAll.consume(RecordBox.get().add("someField", 51.44).getRecord());
+        expected = RecordBox.get().add("avg", 12.88).getRecord();
+        Assert.assertEquals(groupAll.getAggregation().get(0), expected);
+
+        groupAll.consume(RecordBox.get().addNull("someField").getRecord());
+        expected = RecordBox.get().add("avg", 12.88).getRecord();
+        Assert.assertEquals(groupAll.getAggregation().get(0), expected);
+
+        groupAll.consume(RecordBox.get().add("someField", 88.0).getRecord());
+        expected = RecordBox.get().add("avg", 31.66).getRecord();
+        Assert.assertEquals(groupAll.getAggregation().get(0), expected);
+
+        Assert.assertEquals(groupAll.getAggregation().size(), 1);
+    }
 }
diff --git a/src/test/java/com/yahoo/bullet/operations/aggregations/GroupDataTest.java b/src/test/java/com/yahoo/bullet/operations/aggregations/GroupDataTest.java
index 085e8be2..b34f6c63 100644
--- 
a/src/test/java/com/yahoo/bullet/operations/aggregations/GroupDataTest.java
+++ b/src/test/java/com/yahoo/bullet/operations/aggregations/GroupDataTest.java
@@ -16,12 +16,15 @@
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.IntStream;
+import static java.util.Arrays.asList;
+
 public class GroupDataTest {
     private class NotSerializable {
         private int foo;
@@ -57,7 +60,7 @@ private byte[] unmake(GroupData data) {
     }
     public static GroupData make(GroupOperation... operations) {
-        return new GroupData(new HashSet<>(Arrays.asList(operations)));
+        return new GroupData(new HashSet<>(asList(operations)));
     }
     @Test
@@ -70,7 +73,7 @@ public void testManualSerializationFailing() {
     public void testManualSerialization() {
         GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, "foo"));
         BulletRecord sample = RecordBox.get().getRecord();
-        IntStream.range(0, 5).forEach(i -> data.compute(sample));
+        IntStream.range(0, 5).forEach(i -> data.consume(sample));
         byte[] serialized = GroupData.toBytes(data);
         Assert.assertNotNull(serialized);
@@ -89,7 +92,7 @@ public void testManualDeserializationFailing() {
     public void testManualDeserialization() {
         GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, "foo"));
         BulletRecord sample = RecordBox.get().getRecord();
-        IntStream.range(0, 5).forEach(i -> data.compute(sample));
+        IntStream.range(0, 5).forEach(i -> data.consume(sample));
         byte[] serialized = unmake(data);
         GroupData remade = GroupData.fromBytes(serialized);
@@ -118,7 +121,7 @@ public void testNameExtraction() {
     @Test
     public void testNullRecordCount() {
         GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, "count"));
-        data.compute(RecordBox.get().add("foo", "bar").getRecord());
+
data.consume(RecordBox.get().add("foo", "bar").getRecord());
         // We do not expect to send in null records so the count is incremented.
         BulletRecord expected = RecordBox.get().add("count", 1L).getRecord();
@@ -141,7 +144,7 @@ public void testSingleCounting() {
         Assert.assertEquals(data.getAsBulletRecord(), expected);
-        data.compute(RecordBox.get().getRecord());
+        data.consume(RecordBox.get().getRecord());
         expected = RecordBox.get().add("foo", 1L).getRecord();
         Assert.assertEquals(data.getAsBulletRecord(), expected);
     }
@@ -151,7 +154,7 @@ public void testMultiCounting() {
         GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, "count"));
         BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord();
-        IntStream.range(0, 10).forEach(i -> data.compute(someRecord));
+        IntStream.range(0, 10).forEach(i -> data.consume(someRecord));
         BulletRecord expected = RecordBox.get().add("count", 10L).getRecord();
         Assert.assertEquals(data.getAsBulletRecord(), expected);
@@ -162,7 +165,7 @@ public void testCountingMoreThanMaximum() {
         GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, null));
         BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord();
-        IntStream.range(0, 2 * Aggregation.DEFAULT_MAX_SIZE).forEach(i -> data.compute(someRecord));
+        IntStream.range(0, 2 * Aggregation.DEFAULT_MAX_SIZE).forEach(i -> data.consume(someRecord));
         BulletRecord expected = RecordBox.get().add(GroupOperationType.COUNT.getName(), 2L * Aggregation.DEFAULT_MAX_SIZE).getRecord();
@@ -170,31 +173,68 @@ public void testCountingMoreThanMaximum() {
     }
     @Test
-    public void testMergingRawMetric() {
-        GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, "myCount"));
-        BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord();
-        IntStream.range(0, 21).forEach(i -> data.compute(someRecord));
-
-        GroupData another = make(new GroupOperation(GroupOperationType.COUNT, null, "count"));
-        IntStream.range(0, 21).forEach(i -> 
another.compute(someRecord));
+    public void testMergingMetric() {
+        GroupData data = make(new GroupOperation(GroupOperationType.COUNT, "shouldBeIgnored", "myCount"),
+                new GroupOperation(GroupOperationType.MIN, "groupField", "myMin"),
+                new GroupOperation(GroupOperationType.MAX, "groupField", "myMax"),
+                new GroupOperation(GroupOperationType.SUM, "groupField", "mySum"),
+                new GroupOperation(GroupOperationType.AVG, "groupField", "myAvg"));
+        asList(0.0, -8.8, 51.0).stream().map(x -> RecordBox.get().add("groupField", x).getRecord())
+                .forEach(data::consume);
+
+        GroupData another = make(new GroupOperation(GroupOperationType.COUNT, "alsoIgnored", "myCount"),
+                new GroupOperation(GroupOperationType.MIN, "groupField", "myMin"),
+                new GroupOperation(GroupOperationType.MAX, "groupField", "myMax"),
+                new GroupOperation(GroupOperationType.SUM, "groupField", "mySum"),
+                new GroupOperation(GroupOperationType.AVG, "groupField", "myAvg"));
+        asList(1.1, 4.4, -44.0, 12345.67, 3.3).stream().map(x -> RecordBox.get().add("groupField", x).getRecord())
+                .forEach(data::consume);
         byte[] serialized = GroupData.toBytes(another);
-        data.merge(serialized);
+        data.combine(serialized);
-        // 21 + 21
-        BulletRecord expected = RecordBox.get().add("myCount", 42L).getRecord();
-        Assert.assertEquals(data.getAsBulletRecord(), expected);
+        BulletRecord expected = RecordBox.get().add("myCount", 8L).add("myMin", -44.0)
+                .add("myMax", 12345.67).add("mySum", 12352.67)
+                .add("myAvg", 1544.08375).getRecord();
+        Assert.assertTrue(expected.equals(data.getAsBulletRecord()));
     }
+    @Test
+    public void testGroupMultipleFields() {
+        GroupData data = make(new GroupOperation(GroupOperationType.COUNT, "shouldBeIgnored", "myCount"),
+                new GroupOperation(GroupOperationType.MIN, "minField", "myMin"),
+                new GroupOperation(GroupOperationType.MIN, "groupField", "minGroupField"),
+                new GroupOperation(GroupOperationType.MAX, "maxField", "myMax"),
+                new GroupOperation(GroupOperationType.MAX, "groupField", 
"maxGroupField"),
+                new GroupOperation(GroupOperationType.SUM, "groupField", "sumGroupField"));
+        List minColumnValues = asList(0.0, -8.8, 51.0);
+        List maxColumnValues = asList(4.4, 88.51, -8.44);
+        List groupColumnValues = asList(123.45, -884451.8851, 3.14);
+        List records = new ArrayList<>();
+        for (int i = 0; i < minColumnValues.size(); i++) {
+            RecordBox recordBox = RecordBox.get();
+            recordBox.add("minField", minColumnValues.get(i));
+            recordBox.add("maxField", maxColumnValues.get(i));
+            recordBox.add("groupField", groupColumnValues.get(i));
+            records.add(recordBox.getRecord());
+        }
+
+        records.stream().forEach(data::consume);
+
+        BulletRecord expected = RecordBox.get().add("myCount", 3L).add("myMin", -8.8)
+                .add("minGroupField", -884451.8851).add("myMax", 88.51).add("maxGroupField", 123.45)
+                .add("sumGroupField", -884325.2951).getRecord();
+        Assert.assertTrue(expected.equals(data.getAsBulletRecord()));
+    }
     @Test
     public void testMergingRawMetricFail() {
         GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, null));
         BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord();
-        IntStream.range(0, 10).forEach(i -> data.compute(someRecord));
+        IntStream.range(0, 10).forEach(i -> data.consume(someRecord));
         // Not a serialized GroupData
-        data.merge(String.valueOf(242).getBytes());
+        data.combine(String.valueOf(242).getBytes());
         // Unchanged count
         BulletRecord expected = RecordBox.get().add(GroupOperationType.COUNT.getName(), 10L).getRecord();
@@ -202,55 +242,309 @@ public void testMergingRawMetricFail() {
     }
     @Test
-    public void testMergingTwoUnsupportedOperations() {
-        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "foo", "bar"));
+    public void testMergingSupportedAndUnSupportedOperation() {
+        GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, null));
         BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord();
-        IntStream.range(0, 10).forEach(i -> data.compute(someRecord));
+
IntStream.range(0, 10).forEach(i -> data.consume(someRecord));
         GroupData another = make(new GroupOperation(GroupOperationType.AVG, "foo", "bar"));
-        IntStream.range(0, 21).forEach(i -> another.compute(someRecord));
+        IntStream.range(0, 21).forEach(i -> another.consume(someRecord));
         byte[] serialized = GroupData.toBytes(another);
-        data.merge(serialized);
+        // This should combine since we only merge known GroupOperations from the other GroupData
+        data.combine(serialized);
-        // Empty record
-        BulletRecord expected = RecordBox.get().getRecord();
+        // AVG should not have influenced other counts.
+        BulletRecord expected = RecordBox.get().add(GroupOperationType.COUNT.getName(), 10L).getRecord();
         Assert.assertEquals(data.getAsBulletRecord(), expected);
     }
     @Test
-    public void testMergingSupportedAndUnSupportedOperation() {
-        GroupData data = make(new GroupOperation(GroupOperationType.COUNT, null, null));
-        BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord();
-        IntStream.range(0, 10).forEach(i -> data.compute(someRecord));
+    public void testNullRecordMin() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MIN, null, "min"));
+        data.consume(RecordBox.get().add("foo", "bar").getRecord());
-        GroupData another = make(new GroupOperation(GroupOperationType.AVG, "foo", "bar"));
-        IntStream.range(0, 21).forEach(i -> another.compute(someRecord));
-        byte[] serialized = GroupData.toBytes(another);
+        BulletRecord expected = RecordBox.get().addNull("min").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
-        // This should combine since we only merge known GroupOperations from the other GroupData
-        data.merge(serialized);
+    @Test
+    public void testNoRecordMin() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MIN, null, "min"));
-        // AVG should not have influenced other counts.
-        BulletRecord expected = RecordBox.get().add(GroupOperationType.COUNT.getName(), 10L).getRecord();
+        // MIN will return null if no records are observed
+        BulletRecord expected = RecordBox.get().addNull("min").getRecord();
         Assert.assertEquals(data.getAsBulletRecord(), expected);
     }
     @Test
-    public void testMergingUnsupportedAndSupportedOperation() {
-        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "foo", "bar"));
-        BulletRecord someRecord = RecordBox.get().add("foo", 1).getRecord();
-        IntStream.range(0, 10).forEach(i -> data.compute(someRecord));
+    public void testSingleMin() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MIN, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
-        GroupData another = make(new GroupOperation(GroupOperationType.COUNT, null, null));
-        IntStream.range(0, 21).forEach(i -> another.compute(someRecord));
-        byte[] serialized = GroupData.toBytes(another);
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 8.8).getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testMultiMin() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MIN, "someField", "foo"));
+        List numbers = asList(0.0, 8.8, -88.0, 51.0, 4.0, -4.0, 1234567.89, -51.0);
+
+        numbers.stream().map(x -> RecordBox.get().add("someField", x).getRecord()).forEach(data::consume);
+
+        BulletRecord expected = RecordBox.get().add("foo", -88.0).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNonNumericMin() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MIN, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 
"nonNumericValue").getRecord());
+        expected = RecordBox.get().addNull("foo").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
-        // This should not merge the counts into this operation
-        data.merge(serialized);
+        data.consume(RecordBox.get().add("someField", 8.8).getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", "nonNumericValue").getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNullRecordMax() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MAX, null, "max"));
+        data.consume(RecordBox.get().add("foo", "bar").getRecord());
+
+        BulletRecord expected = RecordBox.get().addNull("max").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNoRecordMax() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MAX, null, "max"));
+
+        // MAX will return null if no records are observed
+        BulletRecord expected = RecordBox.get().addNull("max").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testSingleMax() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MAX, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 8.8).getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testMultiMax() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MAX, "someField", "foo"));
+        List numbers = asList(0.0, 8.8, -88.0, 51.0, 4.0, -4.0, 1234567.89, -51.0);
+
+        numbers.stream().map(x -> RecordBox.get().add("someField", 
x).getRecord()).forEach(data::consume);
+
+        BulletRecord expected = RecordBox.get().add("foo", 1234567.89).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNonNumericMax() {
+        GroupData data = make(new GroupOperation(GroupOperationType.MAX, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", "nonNumericValue").getRecord());
+        expected = RecordBox.get().addNull("foo").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 8.8).getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", "nonNumericValue").getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNullRecordSum() {
+        GroupData data = make(new GroupOperation(GroupOperationType.SUM, null, "sum"));
+        data.consume(RecordBox.get().add("foo", "bar").getRecord());
+
+        BulletRecord expected = RecordBox.get().addNull("sum").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNoRecordSum() {
+        GroupData data = make(new GroupOperation(GroupOperationType.SUM, null, "sum"));
+
+        // SUM will return null if no records are observed
+        BulletRecord expected = RecordBox.get().addNull("sum").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testSingleSum() {
+        GroupData data = make(new GroupOperation(GroupOperationType.SUM, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 
8.8).getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testMultiSum() {
+        GroupData data = make(new GroupOperation(GroupOperationType.SUM, "someField", "foo"));
+        List numbers = asList(0.0, 8.8, -88.0, 51.0, 4.0, -4.0, 1234567.89, -51.0);
+
+        numbers.stream().map(x -> RecordBox.get().add("someField", x).getRecord()).forEach(data::consume);
+
+        BulletRecord expected = RecordBox.get().add("foo", 1234488.69).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNonNumericSum() {
+        GroupData data = make(new GroupOperation(GroupOperationType.SUM, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", "nonNumericValue").getRecord());
+        expected = RecordBox.get().addNull("foo").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 8.8).getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", "nonNumericValue").getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 51.4).getRecord());
+        expected = RecordBox.get().add("foo", 60.2).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testMultiSumOfLongs() {
+        GroupData data = make(new GroupOperation(GroupOperationType.SUM, "someField", "foo"));
+        List numbers = asList(0L, 8L, -88L, 51L, 4L);
+
+        numbers.stream().map(x -> RecordBox.get().add("someField", x).getRecord()).forEach(data::consume);
+
+        BulletRecord expected = RecordBox.get().add("foo", -25.0).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNullRecordAvg() {
+        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "someField", "avg"));
+        data.consume(RecordBox.get().add("foo", "bar").getRecord());
+
+        BulletRecord expected = RecordBox.get().addNull("avg").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNoRecordAvg() {
+        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "someField", "avg"));
+
+        // AVG will return null if no records are observed
+        BulletRecord expected = RecordBox.get().addNull("avg").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testSingleAvg() {
+        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 8.8).getRecord());
+        expected = RecordBox.get().add("foo", 8.8).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testMultiAvg() {
+        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "someField", "foo"));
+        List numbers = asList(0.0, 8.8, -88.0, 51.0, 4.0, -4.0, 1234567.89, -51.0);
+
+        numbers.stream().map(x -> RecordBox.get().add("someField", x).getRecord()).forEach(data::consume);
+
+        BulletRecord expected = RecordBox.get().add("foo", 154311.08625).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testNonNumericsCountedAvg() {
+        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", "nonNumericValue").getRecord());
+        expected = 
RecordBox.get().addNull("foo").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 8.6).getRecord());
+        data.consume(RecordBox.get().add("someField", 51.4).getRecord());
+        expected = RecordBox.get().add("foo", 20.0).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", "nonNumericValue").getRecord());
+        expected = RecordBox.get().add("foo", 15.0).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", -4.5).getRecord());
+        expected = RecordBox.get().add("foo", 11.1).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+    }
+
+    @Test
+    public void testIgnoreNullsAvg() {
+        GroupData data = make(new GroupOperation(GroupOperationType.AVG, "someField", "foo"));
+        BulletRecord expected = RecordBox.get().addNull("foo").getRecord();
+
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().addNull("someField").getRecord());
+        expected = RecordBox.get().addNull("foo").getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().add("someField", 8.8).getRecord());
+        data.consume(RecordBox.get().add("someField", 51.4).getRecord());
+        expected = RecordBox.get().add("foo", 30.1).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
+
+        data.consume(RecordBox.get().addNull("someField").getRecord());
+        expected = RecordBox.get().add("foo", 30.1).getRecord();
+        Assert.assertEquals(data.getAsBulletRecord(), expected);
-        // AVG does not exist and COUNT will not be merged.
-        BulletRecord expected = RecordBox.get().getRecord();
+        data.consume(RecordBox.get().add("someField", -4.4).getRecord());
+        expected = RecordBox.get().add("foo", 18.6).getRecord();
         Assert.assertEquals(data.getAsBulletRecord(), expected);
     }
 }
diff --git a/src/test/java/com/yahoo/bullet/parsing/AggregationTest.java b/src/test/java/com/yahoo/bullet/parsing/AggregationTest.java
index de0f2606..8764597c 100644
--- a/src/test/java/com/yahoo/bullet/parsing/AggregationTest.java
+++ b/src/test/java/com/yahoo/bullet/parsing/AggregationTest.java
@@ -18,8 +18,9 @@
 import static com.yahoo.bullet.operations.AggregationOperations.AggregationType.GROUP;
 import static com.yahoo.bullet.operations.AggregationOperations.AggregationType.PERCENTILE;
-import static com.yahoo.bullet.operations.AggregationOperations.GroupOperationType.AVG;
 import static com.yahoo.bullet.operations.AggregationOperations.GroupOperationType.COUNT;
+import static com.yahoo.bullet.operations.AggregationOperations.GroupOperationType.COUNT_FIELD;
+import static com.yahoo.bullet.operations.AggregationOperations.GroupOperationType.SUM;
 import static com.yahoo.bullet.parsing.AggregationUtils.makeAttributes;
 import static com.yahoo.bullet.parsing.AggregationUtils.makeGroupOperation;
 import static java.util.Arrays.asList;
@@ -140,6 +141,29 @@ public void testSuccessfulValidate() {
         Assert.assertFalse(aggregation.validate().isPresent());
     }
+    @Test
+    public void testValidateNoField() {
+        Aggregation aggregation = new Aggregation();
+        aggregation.setType(GROUP);
+        aggregation.setAttributes(makeAttributes(makeGroupOperation(SUM, null, null)));
+        aggregation.configure(new HashMap<>());
+
+        List errors = aggregation.validate().get();
+        Assert.assertEquals(errors.size(), 1);
+        Assert.assertEquals(errors.get(0).getError(), Aggregation.GROUP_OPERATION_REQUIRES_FIELD + SUM);
+    }
+
+    @Test
+    public void testUnsupportedOperation() {
+        Aggregation aggregation = new Aggregation();
+        aggregation.setType(GROUP);
+
aggregation.setAttributes(makeAttributes(makeGroupOperation(COUNT_FIELD, "someField", "myCountField")));
+        aggregation.configure(new HashMap<>());
+
+        Set operations = aggregation.getGroupOperations();
+        Assert.assertEquals(operations.size(), 0);
+    }
+
     @Test
     public void testAttributeOperationMissing() {
         Aggregation aggregation = new Aggregation();
@@ -168,7 +192,7 @@ public void testAttributeOperationsUnknownOperation() {
         Aggregation aggregation = new Aggregation();
         aggregation.setType(GROUP);
         aggregation.setAttributes(makeAttributes(makeGroupOperation(COUNT, null, "bar"),
-                makeGroupOperation(AVG, "foo", "foo_avg")));
+                makeGroupOperation(COUNT_FIELD, "foo", "foo_avg")));
         Assert.assertNull(aggregation.getGroupOperations());
         aggregation.configure(emptyMap());
diff --git a/src/test/java/com/yahoo/bullet/result/RecordBox.java b/src/test/java/com/yahoo/bullet/result/RecordBox.java
index 92ee0827..9427a918 100644
--- a/src/test/java/com/yahoo/bullet/result/RecordBox.java
+++ b/src/test/java/com/yahoo/bullet/result/RecordBox.java
@@ -27,6 +27,11 @@ public static RecordBox get() {
         return new RecordBox();
     }
+    public final RecordBox addNull(String name) {
+        record.setString(name, null);
+        return this;
+    }
+
     public final RecordBox add(String name, Object value) {
         if (value instanceof Boolean) {
             record.setBoolean(name, (Boolean) value);