diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md index a25482b8bc30..ff73b5c65e20 100644 --- a/docs/querying/aggregations.md +++ b/docs/querying/aggregations.md @@ -134,8 +134,6 @@ Computes and returns arithmetic mean of a column values as 64 bit float value. T ### First / Last aggregator -(Double/Float/Long) First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries. - Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled up value, and not the last value within the raw ingested data. #### `doubleFirst` aggregator diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json index ab4674999b5d..b2953d97ebea 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json @@ -1,6 +1,6 @@ [ { - "description": "groupby, stringFirst/stringLast rollup aggs, all", + "description": "groupby, stringFirst/stringLast/doubleFirst/doubleLast/longFirst/longLast/floatFirst/floatLast rollup aggs, all", "query":{ "queryType" : "groupBy", "dataSource": "%%DATASOURCE%%", @@ -26,6 +26,36 @@ "type":"stringLast", "name":"latest_user", "fieldName":"last_user" + }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "double_first_delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "double_last_delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "long_first_delta" + }, + { + "type": "longFirst", + "name": "long_last_delta", + "fieldName": "long_last_delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "float_first_delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "float_last_delta" } ] }, @@ -35,7 +65,13 @@ "event" : { "continent":"Asia", "earliest_user":"masterYi", - "latest_user":"stringer" + "latest_user":"stringer", + "double_first_delta": 111.0, + "double_last_delta": -9.0, + "long_first_delta": 111, + "long_last_delta": -9, + "float_first_delta": 111.0, + "float_last_delta": -9.0 } } ] } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json index 43264a8c6751..66379d0d0fa4 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json @@ -23,6 +23,36 @@ "name": "delta", "fieldName": "delta" }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "delta" + }, + { + "type": "longLast", + "name": "long_last_delta", + "fieldName": "delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "delta" + }, { "type": "stringFirst", "name": "first_user", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json index 
9daae62c8d42..348aff886455 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json @@ -56,6 +56,36 @@ "type": "stringLast", "name": "last_user", "fieldName": "last_user" + }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "double_first_delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "double_last_delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "long_first_delta" + }, + { + "type": "longLast", + "name": "long_last_delta", + "fieldName": "long_last_delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "float_first_delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "float_last_delta" } ] } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json index 127461dd117c..b0f9959a476b 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json @@ -19,6 +19,36 @@ "name": "delta", "fieldName": "delta" }, + { + "type": "doubleFirst", + "name": "double_first_delta", + "fieldName": "double_first_delta" + }, + { + "type": "doubleLast", + "name": "double_last_delta", + "fieldName": "double_last_delta" + }, + { + "type": "longFirst", + "name": "long_first_delta", + "fieldName": "long_first_delta" + }, + { + "type": "longLast", + "name": "long_last_delta", + "fieldName": "long_last_delta" + }, + { + "type": "floatFirst", + "name": "float_first_delta", + "fieldName": "float_first_delta" + }, + { + "type": "floatLast", + "name": "float_last_delta", + "fieldName": "float_last_delta" + }, { "type": "stringFirst", "name": "first_user", @@ -62,4 +92,4 @@ "type": "index" } } -} +} \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java index 795ea5b6d31c..0d3e2dea8efc 100644 --- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java @@ -38,6 +38,9 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleSerde; +import org.apache.druid.query.aggregation.SerializablePairLongFloatSerde; +import org.apache.druid.query.aggregation.SerializablePairLongLongSerde; import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory; import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory; @@ -80,7 +83,10 @@ public AggregatorsModule() ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde()); ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde()); - ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde()); + ComplexMetrics.registerSerde(new SerializablePairLongStringSerde()); + ComplexMetrics.registerSerde(new SerializablePairLongDoubleSerde()); + 
ComplexMetrics.registerSerde(new SerializablePairLongFloatSerde()); + ComplexMetrics.registerSerde(new SerializablePairLongLongSerde()); setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class); setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java new file mode 100644 index 000000000000..2598d8c5bf65 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AbstractSerializableLongObjectPairSerde.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.segment.GenericColumnSerializer; +import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.data.GenericIndexed; +import org.apache.druid.segment.serde.ComplexColumnPartSupplier; +import org.apache.druid.segment.serde.ComplexMetricExtractor; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import java.nio.ByteBuffer; + +/** + * The class serializes/deserializes a Pair object for double/float/longFirst and double/float/longLast aggregators + */ +public abstract class AbstractSerializableLongObjectPairSerde> + extends ComplexMetricSerde +{ + private final Class pairClassObject; + + public AbstractSerializableLongObjectPairSerde(Class pairClassObject) + { + this.pairClassObject = pairClassObject; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return pairClassObject; + } + + @Override + public Object extractValue(InputRow inputRow, String metricName) + { + return inputRow.getRaw(metricName); + } + }; + } + + @Override + public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) + { + final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); + columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column)); + } + + @Override + public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) + { + return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy()); + } +} diff --git 
a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDouble.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDouble.java new file mode 100644 index 000000000000..199a214ac77a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDouble.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.collections.SerializablePair; + +import javax.annotation.Nullable; + +public class SerializablePairLongDouble extends SerializablePair +{ + @JsonCreator + public SerializablePairLongDouble(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") @Nullable Double rhs) + { + super(lhs, rhs); + } +} + + diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleSerde.java new file mode 100644 index 000000000000..270c5ecd140d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongDoubleSerde.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +/** + * The class serializes a Long-Double pair (SerializablePair). + * The serialization structure is: Long:Double + *

+ * The class is used on first/last Double aggregators to store the time and the first/last Double. + * (Long:timestamp, Double:value) + */ +public class SerializablePairLongDoubleSerde extends AbstractSerializableLongObjectPairSerde +{ + public static final String TYPE_NAME = "serializablePairLongDouble"; + + private static class ObjectStrategyImpl implements ObjectStrategy + { + /** + * Since SerializablePairLongDouble is subclass of SerializablePair, + * it's safe to declare the generic type of comparator as SerializablePair. + */ + private final Comparator> pairComparator = SerializablePair.createNullHandlingComparator( + Double::compare, + true + ); + + @Override + public int compare(@Nullable SerializablePairLongDouble o1, @Nullable SerializablePairLongDouble o2) + { + return pairComparator.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongDouble.class; + } + + @Override + public SerializablePairLongDouble fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + long lhs = readOnlyBuffer.getLong(); + boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE; + if (isNotNull) { + return new SerializablePairLongDouble(lhs, readOnlyBuffer.getDouble()); + } else { + return new SerializablePairLongDouble(lhs, null); + } + } + + @Override + public byte[] toBytes(@Nullable SerializablePairLongDouble longObjectPair) + { + if (longObjectPair == null) { + return new byte[]{}; + } + + ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Double.BYTES); + bbuf.putLong(longObjectPair.lhs); + if (longObjectPair.rhs == null) { + bbuf.put(NullHandling.IS_NULL_BYTE); + } else { + bbuf.put(NullHandling.IS_NOT_NULL_BYTE); + bbuf.putDouble(longObjectPair.rhs); + } + return bbuf.array(); + } + } + + private static final ObjectStrategy OBJECT_STRATEGY = new ObjectStrategyImpl(); + + public SerializablePairLongDoubleSerde() + { + super(SerializablePairLongDouble.class); + } + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return OBJECT_STRATEGY; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloat.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloat.java new file mode 100644 index 000000000000..6ecc0f9533e9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloat.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.collections.SerializablePair; + +import javax.annotation.Nullable; + +public class SerializablePairLongFloat extends SerializablePair +{ + @JsonCreator + public SerializablePairLongFloat(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") @Nullable Float rhs) + { + super(lhs, rhs); + } +} + + diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatSerde.java new file mode 100644 index 000000000000..db4ae83246cf --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongFloatSerde.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +/** + * The class serializes a Long-Float pair (SerializablePair). + * The serialization structure is: Long:Float + *

+ * The class is used on first/last Float aggregators to store the time and the first/last Float. + * (Long:timestamp, Float:value) + */ +public class SerializablePairLongFloatSerde extends AbstractSerializableLongObjectPairSerde +{ + public static final String TYPE_NAME = "serializablePairLongFloat"; + + private static class ObjectStrategyImpl implements ObjectStrategy + { + /** + * Since SerializablePairLongFloat is subclass of SerializablePair, + * it's safe to declare the generic type of comparator as SerializablePair. + */ + private final Comparator> pairComparator = SerializablePair.createNullHandlingComparator( + Float::compare, + true + ); + + @Override + public int compare(@Nullable SerializablePairLongFloat o1, @Nullable SerializablePairLongFloat o2) + { + return pairComparator.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongFloat.class; + } + + @Override + public SerializablePairLongFloat fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + long lhs = readOnlyBuffer.getLong(); + boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE; + if (isNotNull) { + return new SerializablePairLongFloat(lhs, readOnlyBuffer.getFloat()); + } else { + return new SerializablePairLongFloat(lhs, null); + } + } + + @Override + public byte[] toBytes(@Nullable SerializablePairLongFloat longObjectPair) + { + if (longObjectPair == null) { + return new byte[]{}; + } + + ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Float.BYTES); + bbuf.putLong(longObjectPair.lhs); + if (longObjectPair.rhs == null) { + bbuf.put(NullHandling.IS_NULL_BYTE); + } else { + bbuf.put(NullHandling.IS_NOT_NULL_BYTE); + bbuf.putFloat(longObjectPair.rhs); + } + return bbuf.array(); + } + } + + private static final ObjectStrategy OBJECT_STRATEGY = new ObjectStrategyImpl(); + + public SerializablePairLongFloatSerde() + { + super(SerializablePairLongFloat.class); + } + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return OBJECT_STRATEGY; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLong.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLong.java new file mode 100644 index 000000000000..82f9f191b1f6 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLong.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.collections.SerializablePair; + +import javax.annotation.Nullable; + +public class SerializablePairLongLong extends SerializablePair +{ + @JsonCreator + public SerializablePairLongLong(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") @Nullable Long rhs) + { + super(lhs, rhs); + } +} + + diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongSerde.java new file mode 100644 index 000000000000..76208f10dd23 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongLongSerde.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation; + +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Comparator; + +/** + * The class serializes a Long-Long pair (SerializablePair). + * The serialization structure is: Long:Long + *

+ * The class is used on first/last Long aggregators to store the time and the first/last Long. + * (Long:timestamp, Long:value) + */ +public class SerializablePairLongLongSerde extends AbstractSerializableLongObjectPairSerde +{ + public static final String TYPE_NAME = "serializablePairLongLong"; + + private static class ObjectStrategyImpl implements ObjectStrategy + { + /** + * Since SerializablePairLongLong is subclass of SerializablePair, + * it's safe to declare the generic type of comparator as SerializablePair. + */ + private final Comparator> pairComparator = SerializablePair.createNullHandlingComparator( + Long::compare, + true + ); + + @Override + public int compare(@Nullable SerializablePairLongLong o1, @Nullable SerializablePairLongLong o2) + { + return pairComparator.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongLong.class; + } + + @Override + public SerializablePairLongLong fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + long lhs = readOnlyBuffer.getLong(); + boolean isNotNull = readOnlyBuffer.get() == NullHandling.IS_NOT_NULL_BYTE; + if (isNotNull) { + return new SerializablePairLongLong(lhs, readOnlyBuffer.getLong()); + } else { + return new SerializablePairLongLong(lhs, null); + } + } + + @Override + public byte[] toBytes(@Nullable SerializablePairLongLong longObjectPair) + { + if (longObjectPair == null) { + return new byte[]{}; + } + + ByteBuffer bbuf = ByteBuffer.allocate(Long.BYTES + Byte.BYTES + Long.BYTES); + bbuf.putLong(longObjectPair.lhs); + if (longObjectPair.rhs == null) { + bbuf.put(NullHandling.IS_NULL_BYTE); + } else { + bbuf.put(NullHandling.IS_NOT_NULL_BYTE); + bbuf.putLong(longObjectPair.rhs); + } + return bbuf.array(); + } + } + + private static final ObjectStrategy OBJECT_STRATEGY = new ObjectStrategyImpl(); + + public SerializablePairLongLongSerde() + { + super(SerializablePairLongLong.class); + } + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return OBJECT_STRATEGY; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java index ab40ec7a1899..41a08ff43d03 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java @@ -45,7 +45,7 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde { - private static final String TYPE_NAME = "serializablePairLongString"; + public static final String TYPE_NAME = "serializablePairLongString"; @Override public String getTypeName() diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java index 8b4a89d0d72e..a0148c48f496 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregator.java @@ -19,30 +19,38 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import 
org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class DoubleFirstAggregator extends NumericFirstAggregator +public class DoubleFirstAggregator extends NumericFirstAggregator { double firstValue; - public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, BaseDoubleColumnValueSelector valueSelector) + public DoubleFirstAggregator(BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); firstValue = 0; } @Override - void setCurrentValue() + void setFirstValue(ColumnValueSelector valueSelector) { firstValue = valueSelector.getDouble(); } + @Override + void setFirstValue(Number firstValue) + { + this.firstValue = firstValue.doubleValue(); + } + @Override public Object get() { - return new SerializablePair<>(firstTime, rhsNull ? null : firstValue); + return new SerializablePairLongDouble(firstTime, rhsNull ? null : firstValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index 15463cf1d938..f7f00599d168 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -21,17 +21,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleSerde; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -47,11 +47,13 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("doubleFirst") public class DoubleFirstAggregatorFactory extends AggregatorFactory { private static final Aggregator NIL_AGGREGATOR = new DoubleFirstAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -63,7 +65,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleFirstBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -97,29 +100,39 @@ public DoubleFirstAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector 
valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; - } else { - return new DoubleFirstAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new DoubleFirstAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) + ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if (valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; - } else { - return new DoubleFirstBufferAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new DoubleFirstBufferAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) + ); } @Override @@ -150,74 +163,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("DoubleFirstAggregatorFactory is not supported during ingestion for rollup"); + return new GenericFirstAggregateCombiner(SerializablePairLongDouble.class); } @Override public AggregatorFactory getCombiningFactory() { - return new DoubleFirstAggregatorFactory(name, name) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleFirstAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs < firstTime) { - firstTime = pair.lhs; - if (pair.rhs != null) { - firstValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleFirstBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putDouble(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = (SerializablePair) selector.getObject(); - long firstTime = buf.getLong(position); - if (pair.lhs < firstTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new DoubleFirstAggregatorFactory(name, name); } @Override @@ -273,11 +225,19 @@ public byte[] getCacheKey() .array(); } + @Override + public String getComplexTypeName() + { + return 
SerializablePairLongDoubleSerde.TYPE_NAME; + } + + /** + * actual type is {@link SerializablePair} + */ @Override public ValueType getType() { - // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return storeDoubleAsFloat ? ValueType.FLOAT : ValueType.DOUBLE; + return ValueType.COMPLEX; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java index dabade475369..9b56104b7192 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstBufferAggregator.java @@ -19,20 +19,21 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class DoubleFirstBufferAggregator extends NumericFirstBufferAggregator +public class DoubleFirstBufferAggregator extends NumericFirstBufferAggregator { public DoubleFirstBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseDoubleColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +43,25 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putDouble(position, valueSelector.getDouble()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putDouble(position, value.doubleValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); + return new SerializablePairLongDouble( + buf.getLong(position), + rhsNull ? 
null : buf.getDouble(position + VALUE_OFFSET) + ); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java index 2c0f62934ffe..d02bfdb922cd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregator.java @@ -19,33 +19,40 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class FloatFirstAggregator extends NumericFirstAggregator +public class FloatFirstAggregator extends NumericFirstAggregator { float firstValue; public FloatFirstAggregator( BaseLongColumnValueSelector timeSelector, - BaseFloatColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); firstValue = 0; } @Override - void setCurrentValue() + void setFirstValue(ColumnValueSelector valueSelector) { firstValue = valueSelector.getFloat(); } + @Override + void setFirstValue(Number firstValue) + { + this.firstValue = firstValue.floatValue(); + } + @Override public Object get() { - return new SerializablePair<>(firstTime, rhsNull ? null : firstValue); + return new SerializablePairLongFloat(firstTime, rhsNull ? null : firstValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index ab9211f1277c..d7fe64225d5f 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -21,17 +21,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; +import org.apache.druid.query.aggregation.SerializablePairLongFloatSerde; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -47,11 +47,13 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("floatFirst") public class FloatFirstAggregatorFactory extends AggregatorFactory { private static final Aggregator NIL_AGGREGATOR = 
new FloatFirstAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -63,7 +65,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new FloatFirstBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -95,29 +98,39 @@ public FloatFirstAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if (valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; - } else { - return new FloatFirstAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new FloatFirstAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) + ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if (valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; - } else { - return new FloatFirstBufferAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new FloatFirstBufferAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) + ); } @Override @@ -148,73 +161,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("FloatFirstAggregatorFactory is not supported during ingestion for rollup"); + return new GenericFirstAggregateCombiner(SerializablePairLongFloat.class); } @Override public AggregatorFactory getCombiningFactory() { - - return new FloatFirstAggregatorFactory(name, name) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatFirstAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs < firstTime) { - firstTime = pair.lhs; - if (pair.rhs != null) { - firstValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatFirstBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putFloat(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - 
SerializablePair pair = selector.getObject(); - long firstTime = buf.getLong(position); - if (pair.lhs < firstTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new FloatFirstAggregatorFactory(name, name); } @Override @@ -270,11 +223,19 @@ public byte[] getCacheKey() .array(); } + @Override + public String getComplexTypeName() + { + return SerializablePairLongFloatSerde.TYPE_NAME; + } + + /** + * actual type is {@link SerializablePair} + */ @Override public ValueType getType() { - // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ValueType.FLOAT; + return ValueType.COMPLEX; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java index cf7d272b0085..e4cdfbf11a88 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstBufferAggregator.java @@ -19,20 +19,21 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class FloatFirstBufferAggregator extends NumericFirstBufferAggregator +public class FloatFirstBufferAggregator extends NumericFirstBufferAggregator { public FloatFirstBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseFloatColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +43,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putFloat(position, valueSelector.getFloat()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putFloat(position, value.floatValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); + return new SerializablePairLongFloat(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/GenericFirstAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/GenericFirstAggregateCombiner.java new file mode 100644 index 000000000000..caba5ac003bb --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/GenericFirstAggregateCombiner.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import com.google.common.primitives.Longs; +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class GenericFirstAggregateCombiner> + extends ObjectAggregateCombiner +{ + private final Class pairClass; + private T firstValue; + + public GenericFirstAggregateCombiner(Class pairClass) + { + this.pairClass = pairClass; + } + + @Override + public void reset(ColumnValueSelector selector) + { + firstValue = (T) selector.getObject(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + T newValue = (T) selector.getObject(); + + if (Longs.compare(firstValue.lhs, newValue.lhs) > 0) { + firstValue = newValue; + } + } + + @Nullable + @Override + public T getObject() + { + return firstValue; + } + + @Override + public Class classOfObject() + { + return this.pairClass; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java index 8cda544521ea..53559674393c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregator.java @@ -19,29 +19,38 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class LongFirstAggregator extends NumericFirstAggregator +public class LongFirstAggregator extends NumericFirstAggregator { long firstValue; - public LongFirstAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongFirstAggregator(BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); firstValue = 0; } @Override - void setCurrentValue() + void setFirstValue(ColumnValueSelector valueSelector) { firstValue = valueSelector.getLong(); } + @Override + void setFirstValue(Number firstValue) + { + this.firstValue = firstValue.longValue(); + } + @Override public Object get() { - return new SerializablePair<>(firstTime, rhsNull ? null : firstValue); + return new SerializablePairLongLong(firstTime, rhsNull ? 
null : firstValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 697663c38574..30a083e30ec7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -21,17 +21,17 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongLong; +import org.apache.druid.query.aggregation.SerializablePairLongLongSerde; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -46,11 +46,13 @@ import java.util.List; import java.util.Map; +@JsonTypeName("longFirst") public class LongFirstAggregatorFactory extends AggregatorFactory { private static final Aggregator NIL_AGGREGATOR = new LongFirstAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -62,7 +64,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new LongFirstBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -94,29 +97,39 @@ public LongFirstAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; - } else { - return new LongFirstAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new LongFirstAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongLong.class + ) + ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if (valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; - } else { - return new LongFirstBufferAggregator( - 
metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new LongFirstBufferAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongLong.class + ) + ); } @Override @@ -147,72 +160,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("LongFirstAggregatorFactory is not supported during ingestion for rollup"); + return new GenericFirstAggregateCombiner(SerializablePairLongLong.class); } @Override public AggregatorFactory getCombiningFactory() { - return new LongFirstAggregatorFactory(name, name) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongFirstAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs < firstTime) { - firstTime = pair.lhs; - if (pair.rhs != null) { - firstValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongFirstBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putLong(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long firstTime = buf.getLong(position); - if (pair.lhs < firstTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new LongFirstAggregatorFactory(name, name); } @Override @@ -268,11 +222,19 @@ public byte[] getCacheKey() .array(); } + @Override + public String getComplexTypeName() + { + return SerializablePairLongLongSerde.TYPE_NAME; + } + + /** + * actual type is {@link SerializablePair} + */ @Override public ValueType getType() { - // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ValueType.LONG; + return ValueType.COMPLEX; } @Override @@ -319,4 +281,5 @@ public String toString() ", fieldName='" + fieldName + '\'' + '}'; } + } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java index 582cda160153..ba41f1da70a6 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstBufferAggregator.java @@ -19,16 +19,21 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import 
java.nio.ByteBuffer; -public class LongFirstBufferAggregator extends NumericFirstBufferAggregator +public class LongFirstBufferAggregator extends NumericFirstBufferAggregator { - public LongFirstBufferAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongFirstBufferAggregator( + BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck + ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -38,16 +43,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putLong(position, valueSelector.getLong()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putLong(position, value.longValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); + return new SerializablePairLongLong(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java index e7b60b9e5c81..3d4d90090b06 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java @@ -19,48 +19,86 @@ package org.apache.druid.query.aggregation.first; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; /** * Base type for on heap 'first' aggregator for primitive numeric column selectors */ -public abstract class NumericFirstAggregator implements Aggregator +public abstract class NumericFirstAggregator implements Aggregator { - private final boolean useDefault = NullHandling.replaceWithDefault(); + private static final boolean USE_DEFAULT = NullHandling.replaceWithDefault(); private final BaseLongColumnValueSelector timeSelector; - - final TSelector valueSelector; + private final ColumnValueSelector valueSelector; + private final boolean needsFoldCheck; long firstTime; boolean rhsNull; - public NumericFirstAggregator(BaseLongColumnValueSelector timeSelector, TSelector valueSelector) + public NumericFirstAggregator( + BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck + ) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; + this.needsFoldCheck = needsFoldCheck; firstTime = Long.MAX_VALUE; - rhsNull = !useDefault; + rhsNull = !USE_DEFAULT; } /** * Store the current primitive typed 'first' value */ - abstract void setCurrentValue(); + abstract void setFirstValue(ColumnValueSelector valueSelector); + + /** + * Store a non-null first value + */ + abstract void setFirstValue(Number firstValue); @Override public void aggregate() { + if (needsFoldCheck) { + + // Need to read this first (before time), just in case it's a 
SerializablePairLongString (we don't know; it's + // detected at query time). + final Object object = valueSelector.getObject(); + + if (object instanceof SerializablePair) { + + // cast to Pair to support reindex from type such as doubleFirst into longFirst + final SerializablePair pair = (SerializablePair) object; + if (pair.lhs < firstTime) { + firstTime = pair.lhs; + + // rhs might be NULL under SQL-compatibility mode + if (pair.rhs == null) { + rhsNull = true; + } else { + rhsNull = false; + setFirstValue(pair.rhs); + } + } + + return; + } + } + long time = timeSelector.getLong(); if (time < firstTime) { firstTime = time; - if (useDefault || !valueSelector.isNull()) { - setCurrentValue(); + if (USE_DEFAULT || !valueSelector.isNull()) { + setFirstValue(valueSelector); rhsNull = false; } else { + setFirstValue(0); rhsNull = true; } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java index ebb0a87169a0..ae878e5de90b 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java @@ -19,18 +19,19 @@ package org.apache.druid.query.aggregation.first; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; /** * Base type for buffer based 'first' aggregator for primitive numeric column selectors */ -public abstract class NumericFirstBufferAggregator +public abstract class NumericFirstBufferAggregator implements BufferAggregator { static final int NULL_OFFSET = Long.BYTES; @@ -38,13 +39,18 @@ public abstract class NumericFirstBufferAggregator to support reindex such as doubleFirst into longFirst + final SerializablePair pair = (SerializablePair) object; + if (pair.lhs < firstTime) { + if (pair.rhs == null) { + // rhs might be NULL under SQL-compatibility mode + updateTimeWithNull(buf, position, pair.lhs); + } else { + updateTimeWithValue(buf, position, pair.lhs, pair.rhs); + } + } + return; + } + } + long time = timeSelector.getLong(); - long firstTime = buf.getLong(position); if (time < firstTime) { if (useDefault || !valueSelector.isNull()) { - updateTimeWithValue(buf, position, time); + updateTimeWithValue(buf, position, time, valueSelector); } else { updateTimeWithNull(buf, position, time); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index 32a35433da1c..ded2970478ae 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -158,7 +158,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), valueSelector, maxStringBytes, - 
StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } @@ -174,7 +174,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), valueSelector, maxStringBytes, - StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 910cb9433581..ad8ba7fe993f 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -36,11 +36,16 @@ public class StringFirstLastUtils private static final int NULL_VALUE = -1; /** - * Returns whether a given value selector *might* contain SerializablePairLongString objects. + * Returns whether a given value selector *might* contain objects with given type + * + * @param pairClass should be one of the following {@link SerializablePairLongString } {@link org.apache.druid.query.aggregation.SerializablePairLongLong} + * {@link org.apache.druid.query.aggregation.SerializablePairLongDouble} + * {@link org.apache.druid.query.aggregation.SerializablePairLongFloat} */ public static boolean selectorNeedsFoldCheck( final BaseObjectColumnValueSelector valueSelector, - @Nullable final ColumnCapabilities valueSelectorCapabilities + @Nullable final ColumnCapabilities valueSelectorCapabilities, + Class pairClass ) { if (valueSelectorCapabilities != null && valueSelectorCapabilities.getType() != ValueType.COMPLEX) { @@ -55,8 +60,8 @@ public static boolean selectorNeedsFoldCheck( // Check if the selector class could possibly be a SerializablePairLongString (either a superclass or subclass). 
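    // With the pairClass parameter the same check now applies to any of the pair types listed in the
    // javadoc above (SerializablePairLongString, SerializablePairLongLong, SerializablePairLongDouble,
    // SerializablePairLongFloat), not only the string pair.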
final Class clazz = valueSelector.classOfObject(); - return clazz.isAssignableFrom(SerializablePairLongString.class) - || SerializablePairLongString.class.isAssignableFrom(clazz); + return clazz.isAssignableFrom(pairClass) + || pairClass.isAssignableFrom(clazz); } @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java index 3f6a1506bad6..e19479b21e70 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregator.java @@ -19,30 +19,38 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class DoubleLastAggregator extends NumericLastAggregator +public class DoubleLastAggregator extends NumericLastAggregator { double lastValue; - public DoubleLastAggregator(BaseLongColumnValueSelector timeSelector, BaseDoubleColumnValueSelector valueSelector) + public DoubleLastAggregator(BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); lastValue = 0; } @Override - void setCurrentValue() + void setLastValue(ColumnValueSelector valueSelector) { lastValue = valueSelector.getDouble(); } + @Override + void setLastValue(Number lastValue) + { + this.lastValue = lastValue.doubleValue(); + } + @Override public Object get() { - return new SerializablePair<>(lastTime, rhsNull ? null : lastValue); + return new SerializablePairLongDouble(lastTime, rhsNull ? 
null : lastValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 11a7e8d4b961..759966be5c29 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -21,19 +21,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; +import org.apache.druid.query.aggregation.SerializablePairLongDoubleSerde; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -49,11 +50,13 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("doubleLast") public class DoubleLastAggregatorFactory extends AggregatorFactory { private static final Aggregator NIL_AGGREGATOR = new DoubleLastAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -65,7 +68,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new DoubleLastBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -95,29 +99,39 @@ public DoubleLastAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if (valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; - } else { - return new DoubleLastAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new DoubleLastAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) + ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if 
(valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; - } else { - return new DoubleLastBufferAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new DoubleLastBufferAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongDouble.class + ) + ); } @Override @@ -148,74 +162,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("DoubleLastAggregatorFactory is not supported during ingestion for rollup"); + return new GenericLastAggregateCombiner(SerializablePairLongDouble.class); } @Override public AggregatorFactory getCombiningFactory() { - return new DoubleLastAggregatorFactory(name, name) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleLastAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs >= lastTime) { - lastTime = pair.lhs; - if (pair.rhs != null) { - lastValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = - metricFactory.makeColumnValueSelector(name); - return new DoubleLastBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putDouble(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long lastTime = buf.getLong(position); - if (pair.lhs >= lastTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new DoubleLastAggregatorFactory(name, name); } @Override @@ -271,11 +224,19 @@ public byte[] getCacheKey() .array(); } + @Override + public String getComplexTypeName() + { + return SerializablePairLongDoubleSerde.TYPE_NAME; + } + + /** + * actual type is {@link SerializablePair} + */ @Override public ValueType getType() { - // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return storeDoubleAsFloat ? 
ValueType.FLOAT : ValueType.DOUBLE; + return ValueType.COMPLEX; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java index 8acddce53a83..74c46d69aaad 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastBufferAggregator.java @@ -19,20 +19,21 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseDoubleColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongDouble; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class DoubleLastBufferAggregator extends NumericLastBufferAggregator +public class DoubleLastBufferAggregator extends NumericLastBufferAggregator { public DoubleLastBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseDoubleColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +43,25 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putDouble(position, valueSelector.getDouble()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putDouble(position, value.doubleValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getDouble(position + VALUE_OFFSET)); + return new SerializablePairLongDouble( + buf.getLong(position), + rhsNull ? 
null : buf.getDouble(position + VALUE_OFFSET) + ); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java index 1381ccb18b7a..b874cf1ac79e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregator.java @@ -19,30 +19,38 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class FloatLastAggregator extends NumericLastAggregator +public class FloatLastAggregator extends NumericLastAggregator { float lastValue; - public FloatLastAggregator(BaseLongColumnValueSelector timeSelector, BaseFloatColumnValueSelector valueSelector) + public FloatLastAggregator(BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); lastValue = 0; } @Override - void setCurrentValue() + void setLastValue(ColumnValueSelector valueSelector) { lastValue = valueSelector.getFloat(); } + @Override + void setLastValue(Number lastValue) + { + this.lastValue = lastValue.floatValue(); + } + @Override public Object get() { - return new SerializablePair<>(lastTime, rhsNull ? null : lastValue); + return new SerializablePairLongFloat(lastTime, rhsNull ? null : lastValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java index 01d7808c830b..3fcba9087363 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -21,19 +21,20 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; +import org.apache.druid.query.aggregation.SerializablePairLongFloatSerde; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import 
org.apache.druid.segment.NilColumnValueSelector; @@ -49,11 +50,13 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("floatLast") public class FloatLastAggregatorFactory extends AggregatorFactory { private static final Aggregator NIL_AGGREGATOR = new FloatLastAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -65,7 +68,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new FloatLastBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -93,29 +97,39 @@ public FloatLastAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; - } else { - return new FloatLastAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new FloatLastAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) + ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseFloatColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if (valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; - } else { - return new FloatLastBufferAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new FloatLastBufferAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongFloat.class + ) + ); } @Override @@ -146,72 +160,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("FloatLastAggregatorFactory is not supported during ingestion for rollup"); + return new GenericLastAggregateCombiner(SerializablePairLongFloat.class); } @Override public AggregatorFactory getCombiningFactory() { - return new FloatLastAggregatorFactory(name, name) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatLastAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs >= lastTime) { - lastTime = pair.lhs; - if (pair.rhs != null) { - lastValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new FloatLastBufferAggregator(null, null) - { - @Override - public void 
putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putFloat(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long lastTime = buf.getLong(position); - if (pair.lhs >= lastTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new FloatLastAggregatorFactory(name, name); } @Override @@ -268,11 +223,19 @@ public byte[] getCacheKey() .array(); } + @Override + public String getComplexTypeName() + { + return SerializablePairLongFloatSerde.TYPE_NAME; + } + + /** + * actual type is {@link SerializablePair} + */ @Override public ValueType getType() { - // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ValueType.FLOAT; + return ValueType.COMPLEX; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java index 95ad6fe5c5ee..4cf443707af7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastBufferAggregator.java @@ -19,20 +19,21 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; -import org.apache.druid.segment.BaseFloatColumnValueSelector; +import org.apache.druid.query.aggregation.SerializablePairLongFloat; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class FloatLastBufferAggregator extends NumericLastBufferAggregator +public class FloatLastBufferAggregator extends NumericLastBufferAggregator { public FloatLastBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseFloatColumnValueSelector valueSelector + ColumnValueSelector valueSelector, + boolean needsFoldCheck ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -42,16 +43,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putFloat(position, valueSelector.getFloat()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putFloat(position, value.floatValue()); + } + @Override public Object get(ByteBuffer buf, int position) { final boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getFloat(position + VALUE_OFFSET)); + return new SerializablePairLongFloat(buf.getLong(position), rhsNull ? 
null : buf.getFloat(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/GenericLastAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/GenericLastAggregateCombiner.java new file mode 100644 index 000000000000..308b37308111 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/GenericLastAggregateCombiner.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.last; + +import com.google.common.primitives.Longs; +import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; +import org.apache.druid.segment.ColumnValueSelector; + +import javax.annotation.Nullable; + +public class GenericLastAggregateCombiner> + extends ObjectAggregateCombiner +{ + private T lastValue; + + private final Class pairClass; + + public GenericLastAggregateCombiner(Class pairClass) + { + this.pairClass = pairClass; + } + + @Override + public void reset(ColumnValueSelector selector) + { + lastValue = (T) selector.getObject(); + } + + @Override + public void fold(ColumnValueSelector selector) + { + T newValue = (T) selector.getObject(); + + if (Longs.compare(lastValue.lhs, newValue.lhs) <= 0) { + lastValue = newValue; + } + } + + @Nullable + @Override + public T getObject() + { + return lastValue; + } + + @Override + public Class classOfObject() + { + return this.pairClass; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java index 59a159d2d875..b9aaa358c606 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregator.java @@ -19,29 +19,38 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; -public class LongLastAggregator extends NumericLastAggregator +public class LongLastAggregator extends NumericLastAggregator { long lastValue; - public LongLastAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongLastAggregator(BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); lastValue = 0; } @Override - void 
setCurrentValue() + void setLastValue(ColumnValueSelector valueSelector) { lastValue = valueSelector.getLong(); } + @Override + void setLastValue(Number lastValue) + { + this.lastValue = lastValue.longValue(); + } + @Override public Object get() { - return new SerializablePair<>(lastTime, rhsNull ? null : lastValue); + return new SerializablePairLongLong(lastTime, rhsNull ? null : lastValue); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java index 5a8964dc3da2..b7afae707389 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -21,18 +21,19 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.collections.SerializablePair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.SerializablePairLongLong; +import org.apache.druid.query.aggregation.SerializablePairLongLongSerde; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; @@ -48,11 +49,13 @@ import java.util.Map; import java.util.Objects; +@JsonTypeName("longLast") public class LongLastAggregatorFactory extends AggregatorFactory { private static final Aggregator NIL_AGGREGATOR = new LongLastAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -64,7 +67,8 @@ public void aggregate() private static final BufferAggregator NIL_BUFFER_AGGREGATOR = new LongLastBufferAggregator( NilColumnValueSelector.instance(), - NilColumnValueSelector.instance() + NilColumnValueSelector.instance(), + false ) { @Override @@ -92,29 +96,39 @@ public LongLastAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); + final ColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); if (valueSelector instanceof NilColumnValueSelector) { return NIL_AGGREGATOR; - } else { - return new LongLastAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new LongLastAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + valueSelector, + StringFirstLastUtils.selectorNeedsFoldCheck( + valueSelector, + metricFactory.getColumnCapabilities(fieldName), + 
SerializablePairLongLong.class + ) + ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - final BaseLongColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); - if (valueSelector instanceof NilColumnValueSelector) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + if (selector instanceof NilColumnValueSelector) { return NIL_BUFFER_AGGREGATOR; - } else { - return new LongLastBufferAggregator( - metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - valueSelector - ); } + + return new LongLastBufferAggregator( + metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), + selector, + StringFirstLastUtils.selectorNeedsFoldCheck( + selector, + metricFactory.getColumnCapabilities(fieldName), + SerializablePairLongLong.class + ) + ); } @Override @@ -145,72 +159,13 @@ public Object combine(@Nullable Object lhs, @Nullable Object rhs) @Override public AggregateCombiner makeAggregateCombiner() { - throw new UOE("LongLastAggregatorFactory is not supported during ingestion for rollup"); + return new GenericLastAggregateCombiner(SerializablePairLongLong.class); } @Override public AggregatorFactory getCombiningFactory() { - return new LongLastAggregatorFactory(name, name) - { - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongLastAggregator(null, null) - { - @Override - public void aggregate() - { - SerializablePair pair = selector.getObject(); - if (pair.lhs >= lastTime) { - lastTime = pair.lhs; - if (pair.rhs != null) { - lastValue = pair.rhs; - rhsNull = false; - } else { - rhsNull = true; - } - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) - { - final ColumnValueSelector> selector = metricFactory.makeColumnValueSelector(name); - return new LongLastBufferAggregator(null, null) - { - @Override - public void putValue(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - buf.putLong(position, pair.rhs); - } - - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePair pair = selector.getObject(); - long lastTime = buf.getLong(position); - if (pair.lhs >= lastTime) { - if (pair.rhs != null) { - updateTimeWithValue(buf, position, pair.lhs); - } else { - updateTimeWithNull(buf, position, pair.lhs); - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - }; + return new LongLastAggregatorFactory(name, name); } @Override @@ -266,11 +221,19 @@ public byte[] getCacheKey() .array(); } + @Override + public String getComplexTypeName() + { + return SerializablePairLongLongSerde.TYPE_NAME; + } + + /** + * actual type is {@link SerializablePair} + */ @Override public ValueType getType() { - // if we don't pretend to be a primitive, group by v1 gets sad and doesn't work because no complex type serde - return ValueType.LONG; + return ValueType.COMPLEX; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java index 981ba3e2f665..80553f4fcc0a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java +++ 
b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastBufferAggregator.java @@ -19,16 +19,21 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.collections.SerializablePair; +import org.apache.druid.query.aggregation.SerializablePairLongLong; import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; -public class LongLastBufferAggregator extends NumericLastBufferAggregator +public class LongLastBufferAggregator extends NumericLastBufferAggregator { - public LongLastBufferAggregator(BaseLongColumnValueSelector timeSelector, BaseLongColumnValueSelector valueSelector) + public LongLastBufferAggregator( + BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck + ) { - super(timeSelector, valueSelector); + super(timeSelector, valueSelector, needsFoldCheck); } @Override @@ -38,16 +43,22 @@ void initValue(ByteBuffer buf, int position) } @Override - void putValue(ByteBuffer buf, int position) + void putValue(ByteBuffer buf, int position, ColumnValueSelector valueSelector) { buf.putLong(position, valueSelector.getLong()); } + @Override + void putValue(ByteBuffer buf, int position, Number value) + { + buf.putLong(position, value.longValue()); + } + @Override public Object get(ByteBuffer buf, int position) { boolean rhsNull = isValueNull(buf, position); - return new SerializablePair<>(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); + return new SerializablePairLongLong(buf.getLong(position), rhsNull ? null : buf.getLong(position + VALUE_OFFSET)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java index 6506f976aeff..9f25eea71cab 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java @@ -19,44 +19,76 @@ package org.apache.druid.query.aggregation.last; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; /** * Base type for on heap 'last' aggregator for primitive numeric column selectors.. - * + *

* This could probably share a base class with {@link org.apache.druid.query.aggregation.first.NumericFirstAggregator} */ -public abstract class NumericLastAggregator implements Aggregator +public abstract class NumericLastAggregator implements Aggregator { - private final boolean useDefault = NullHandling.replaceWithDefault(); + private static final boolean USE_DEFAULT = NullHandling.replaceWithDefault(); private final BaseLongColumnValueSelector timeSelector; + private final boolean needsFoldCheck; + private final ColumnValueSelector valueSelector; - final TSelector valueSelector; long lastTime; boolean rhsNull; - public NumericLastAggregator(BaseLongColumnValueSelector timeSelector, TSelector valueSelector) + public NumericLastAggregator( + BaseLongColumnValueSelector timeSelector, + ColumnValueSelector valueSelector, + boolean needsFoldCheck + ) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; + this.needsFoldCheck = needsFoldCheck; lastTime = Long.MIN_VALUE; - rhsNull = !useDefault; + rhsNull = !USE_DEFAULT; } @Override public void aggregate() { + if (needsFoldCheck) { + // Need to read this first (before time), just in case it's a SerializablePair (we don't know; it's + // detected at query time). + final Object object = valueSelector.getObject(); + + if (object instanceof SerializablePair) { + + // cast to Pair to support reindex from type such as doubleFirst into another type(longFirst) + final SerializablePair pair = (SerializablePair) object; + if (pair.lhs >= lastTime) { + lastTime = pair.lhs; + + // rhs might be NULL under SQL-compatibility mode + if (pair.rhs == null) { + rhsNull = true; + } else { + rhsNull = false; + setLastValue(pair.rhs); + } + } + return; + } + } + long time = timeSelector.getLong(); if (time >= lastTime) { lastTime = time; - if (useDefault || !valueSelector.isNull()) { - setCurrentValue(); + if (USE_DEFAULT || !valueSelector.isNull()) { + setLastValue(valueSelector); rhsNull = false; } else { + setLastValue(0); rhsNull = true; } } @@ -71,5 +103,7 @@ public void close() /** * Store the current primitive typed 'first' value */ - abstract void setCurrentValue(); + abstract void setLastValue(ColumnValueSelector valueSelector); + + abstract void setLastValue(Number lastValue); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java index 7c90aadafb5f..f416255da230 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java @@ -19,11 +19,12 @@ package org.apache.druid.query.aggregation.last; +import org.apache.druid.collections.SerializablePair; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnValueSelector; import java.nio.ByteBuffer; @@ -33,7 +34,7 @@ * This could probably share a base type with * {@link org.apache.druid.query.aggregation.first.NumericFirstBufferAggregator} ... 
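 * Buffer layout per aggregation slot: the last timestamp (a long) at the start of the slot, a null marker at
 * {@code NULL_OFFSET}, and the numeric value at {@code VALUE_OFFSET}, written through the typed {@code putValue} overloads.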
*/ -public abstract class NumericLastBufferAggregator +public abstract class NumericLastBufferAggregator implements BufferAggregator { static final int NULL_OFFSET = Long.BYTES; @@ -41,13 +42,16 @@ public abstract class NumericLastBufferAggregator to support reindex such as doubleLast into longLast + final SerializablePair pair = (SerializablePair) object; + if (pair.lhs >= lastTime) { + if (pair.rhs == null) { + // rhs might be NULL under SQL-compatibility mode + updateTimeWithNull(buf, position, pair.lhs); + } else { + updateTimeWithValue(buf, position, pair.lhs, pair.rhs); + } + } + return; + } + } + long time = timeSelector.getLong(); - long lastTime = buf.getLong(position); + if (time >= lastTime) { if (useDefault || !valueSelector.isNull()) { - updateTimeWithValue(buf, position, time); + updateTimeWithValue(buf, position, time, valueSelector); } else { updateTimeWithNull(buf, position, time); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index 29543518f66c..6c8576e2b468 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -115,7 +115,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), valueSelector, maxStringBytes, - StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } @@ -131,7 +131,7 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), valueSelector, maxStringBytes, - StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName), SerializablePairLongString.class) ); } } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java index 4f40ff6bd83d..d79e1a44ce8c 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java @@ -37,6 +37,14 @@ public static ComplexMetricSerde getSerdeForType(String type) return COMPLEX_SERIALIZERS.get(type); } + /** + * Register a serde by its type name + */ + public static void registerSerde(ComplexMetricSerde serde) + { + registerSerde(serde.getTypeName(), serde); + } + /** * Register a serde name -> ComplexMetricSerde mapping. 
* diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java index 3aa9aa3ffeb8..3136aa04cb22 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java @@ -209,21 +209,21 @@ public void testResultArraySignature() .add("longSum", ValueType.LONG) .add("longMin", ValueType.LONG) .add("longMax", ValueType.LONG) - .add("longFirst", ValueType.LONG) - .add("longLast", ValueType.LONG) + .add("longFirst", null) + .add("longLast", null) .add("longAny", ValueType.LONG) .add("doubleSum", ValueType.DOUBLE) .add("doubleMin", ValueType.DOUBLE) .add("doubleMax", ValueType.DOUBLE) - .add("doubleFirst", ValueType.DOUBLE) - .add("doubleLast", ValueType.DOUBLE) + .add("doubleFirst", null) + .add("doubleLast", null) .add("doubleAny", ValueType.DOUBLE) .add("doubleMean", null) .add("floatSum", ValueType.FLOAT) .add("floatMin", ValueType.FLOAT) .add("floatMax", ValueType.FLOAT) - .add("floatFirst", ValueType.FLOAT) - .add("floatLast", ValueType.FLOAT) + .add("floatFirst", null) + .add("floatLast", null) .add("floatAny", ValueType.FLOAT) .add("stringFirst", null) .add("stringLast", null) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java index c85b6b11c779..14f8d871a0b5 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java @@ -22,6 +22,7 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; @@ -29,7 +30,9 @@ import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -67,14 +70,16 @@ public void setup() objectSelector = new TestObjectColumnSelector<>(pairs); colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); - EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); - EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); - EasyMock.replay(colSelectorFactory); + EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector).atLeastOnce(); + EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector).atLeastOnce(); } @Test public void testDoubleFirstAggregator() { + 
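    // factorize()/factorizeBuffered() now consult the column's capabilities (via
    // StringFirstLastUtils.selectorNeedsFoldCheck) to decide whether a fold check is needed,
    // so each test stubs getColumnCapabilities for its column and calls replay() itself
    // instead of relying on setup().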
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType(ValueType.DOUBLE)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = doubleFirstAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -93,6 +98,9 @@ public void testDoubleFirstAggregator() @Test public void testDoubleFirstBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType(ValueType.DOUBLE)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = doubleFirstAggFactory.factorizeBuffered( colSelectorFactory); @@ -148,6 +156,9 @@ public void testComparatorWithNulls() @Test public void testDoubleFirstCombiningAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -167,6 +178,9 @@ public void testDoubleFirstCombiningAggregator() @Test public void testDoubleFirstCombiningBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = combiningAggFactory.factorizeBuffered( colSelectorFactory); @@ -196,6 +210,40 @@ public void testSerde() throws Exception Assert.assertEquals(doubleFirstAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); } + @Test + public void testDoubleFirstAggregateCombiner() + { + AggregateCombiner doubleFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + + SerializablePair[] inputPairs = { + new SerializablePair<>(5L, 134.3d), + new SerializablePair<>(4L, 1232.212d), + new SerializablePair<>(3L, 18d), + new SerializablePair<>(6L, 233.5232d) + }; + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(inputPairs); + doubleFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[0], doubleFirstAggregateCombiner.getObject()); + + // inputPairs[1] has lower time value, it should be the first + columnSelector.increment(); + doubleFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[1], doubleFirstAggregateCombiner.getObject()); + + // inputPairs[2] has lower time value, it should be the first + columnSelector.increment(); + doubleFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], doubleFirstAggregateCombiner.getObject()); + + // inputPairs[3] has the max time value, it should NOT be the first + columnSelector.increment(); + doubleFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], doubleFirstAggregateCombiner.getObject()); + + doubleFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[3], doubleFirstAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java index b9b37f83f8f7..fbe3d6c230ed 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java @@ -22,6 +22,7 @@ import org.apache.druid.collections.SerializablePair; 
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; @@ -29,7 +30,9 @@ import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -51,10 +54,13 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest private float[] floats = {1.1f, 2.7f, 3.5f, 1.3f}; private long[] times = {12, 10, 5344, 7899999}; private SerializablePair[] pairs = { + new SerializablePair<>(1567899920L, null), new SerializablePair<>(1467225096L, 134.3f), new SerializablePair<>(23163L, 1232.212f), new SerializablePair<>(742L, 18f), - new SerializablePair<>(111111L, 233.5232f) + new SerializablePair<>(111111L, 233.5232f), + new SerializablePair<>(500L, null), + new SerializablePair<>(500L, 400.f) }; @Before @@ -69,12 +75,18 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector).atLeastOnce(); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector).atLeastOnce(); - EasyMock.replay(colSelectorFactory); } + /** + * test aggregator on value selector column + */ @Test public void testDoubleFirstAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.FLOAT)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = floatFirstAggregatorFactory.factorize(colSelectorFactory); aggregate(agg); @@ -90,9 +102,16 @@ public void testDoubleFirstAggregator() Assert.assertEquals(floats[1], agg.getFloat(), 0.0001); } + /** + * test aggregator on value selector column + */ @Test public void testDoubleFirstBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.FLOAT)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = floatFirstAggregatorFactory.factorizeBuffered( colSelectorFactory); @@ -132,48 +151,117 @@ public void testComparatorWithNulls() Assert.assertEquals(-1, comparator.compare(pair2, pair1)); } + /** + * test aggregator on an object selector column + */ @Test public void testDoubleFirstCombiningAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); + // + // aggregate first 5 events, pair[3] is supposed to be the first + // aggregate(agg); aggregate(agg); aggregate(agg); aggregate(agg); + agg.aggregate(); + objectSelector.increment(); Pair result = (Pair) agg.get(); - Pair expected = (Pair) pairs[2]; + Pair expected = (Pair) pairs[3]; Assert.assertEquals(expected.lhs, result.lhs); Assert.assertEquals(expected.rhs, 
result.rhs, 0.0001); Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); + + // + // aggregator once more, pair[5] will be the first + // + agg.aggregate(); + objectSelector.increment(); + result = (Pair) agg.get(); + expected = (Pair) pairs[5]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertNull(result.rhs); + + // + // aggregate once more, pair[6] has the same timestamp as pair[5], but it won't be the first + // + agg.aggregate(); + objectSelector.increment(); + result = (Pair) agg.get(); + expected = (Pair) pairs[5]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertNull(result.rhs); } + /** + * test aggregator on an object column + */ @Test public void testDoubleFirstCombiningBufferAggregator() { - BufferAggregator agg = combiningAggFactory.factorizeBuffered( - colSelectorFactory); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + + BufferAggregator agg = combiningAggFactory.factorizeBuffered(colSelectorFactory); ByteBuffer buffer = ByteBuffer.wrap(new byte[floatFirstAggregatorFactory.getMaxIntermediateSizeWithNulls()]); agg.init(buffer, 0); + // + // aggregate first 5 events, pair[3] is the first + // + aggregate(agg, buffer, 0); aggregate(agg, buffer, 0); aggregate(agg, buffer, 0); aggregate(agg, buffer, 0); aggregate(agg, buffer, 0); Pair result = (Pair) agg.get(buffer, 0); - Pair expected = (Pair) pairs[2]; + Pair expected = (Pair) pairs[3]; Assert.assertEquals(expected.lhs, result.lhs); Assert.assertEquals(expected.rhs, result.rhs, 0.0001); Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); - } + // + // aggregate once more, pair[5] is the first + // + agg.aggregate(buffer, 0); + objectSelector.increment(); + result = (Pair) agg.get(buffer, 0); + expected = (Pair) pairs[5]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertNull(result.rhs); + + // + // aggregate once more, pair[6] has the same timestamp as pair[5], but it won't be the first + // + agg.aggregate(buffer, 0); + objectSelector.increment(); + result = (Pair) agg.get(buffer, 0); + expected = (Pair) pairs[5]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertNull(result.rhs); + } @Test public void testSerde() throws Exception @@ -183,6 +271,49 @@ public void testSerde() throws Exception Assert.assertEquals(floatFirstAggregatorFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); } + @Test + public void testFloatFirstAggregateCombiner() + { + AggregateCombiner floatFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + + SerializablePair[] inputPairs = { + new SerializablePair<>(6L, null), + new SerializablePair<>(6L, 134.3f), + new SerializablePair<>(5L, 1232.212f), + new SerializablePair<>(4L, 18f), + new SerializablePair<>(7L, 233.5232f), + new SerializablePair<>(0L, null) + }; + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(inputPairs); + floatFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[0], floatFirstAggregateCombiner.getObject()); + + // inputPairs[1].first > inputPair[0].first, it 
should NOT be the first + columnSelector.increment(); + floatFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[0], floatFirstAggregateCombiner.getObject()); + + // inputPairs[2].first < inputPair[0].first, it should be the first + columnSelector.increment(); + floatFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], floatFirstAggregateCombiner.getObject()); + + // inputPairs[3].first is the lowest, it should be the first + columnSelector.increment(); + floatFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[3], floatFirstAggregateCombiner.getObject()); + + // inputPairs[4] is not the lowest, it should NOT be the first + columnSelector.increment(); + floatFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[3], floatFirstAggregateCombiner.getObject()); + + columnSelector.increment(); + floatFirstAggregateCombiner.fold(columnSelector); + + Assert.assertEquals(inputPairs[5], floatFirstAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java index 7fe925656e2b..07e5dff0a47e 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java @@ -22,13 +22,16 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -68,12 +71,15 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); - EasyMock.replay(colSelectorFactory); } @Test public void testLongFirstAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.LONG)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = longFirstAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -92,6 +98,10 @@ public void testLongFirstAggregator() @Test public void testLongFirstBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.LONG)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = longFirstAggFactory.factorizeBuffered( colSelectorFactory); @@ -134,6 +144,10 @@ public void testComparatorWithNulls() @Test 
public void testLongFirstCombiningAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -153,6 +167,9 @@ public void testLongFirstCombiningAggregator() @Test public void testLongFirstCombiningBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType(ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = combiningAggFactory.factorizeBuffered( colSelectorFactory); @@ -173,7 +190,6 @@ public void testLongFirstCombiningBufferAggregator() Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); } - @Test public void testSerde() throws Exception { @@ -182,6 +198,40 @@ public void testSerde() throws Exception Assert.assertEquals(longFirstAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class)); } + @Test + public void testLongFirstAggregateCombiner() + { + AggregateCombiner longFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + + SerializablePair[] inputPairs = { + new SerializablePair<>(5L, 134L), + new SerializablePair<>(4L, 1232L), + new SerializablePair<>(3L, 18L), + new SerializablePair<>(6L, 233L) + }; + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(inputPairs); + longFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[0], longFirstAggregateCombiner.getObject()); + + // inputPairs[1] has lower time value, it should be the first + columnSelector.increment(); + longFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[1], longFirstAggregateCombiner.getObject()); + + // inputPairs[2] has lower time value, it should be the first + columnSelector.increment(); + longFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], longFirstAggregateCombiner.getObject()); + + // inputPairs[3] has the max time value, it should NOT be the first + columnSelector.increment(); + longFirstAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], longFirstAggregateCombiner.getObject()); + + longFirstAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[3], longFirstAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java index 847194f12760..62fb84823bc9 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java @@ -22,6 +22,7 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; @@ -29,7 +30,9 @@ import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import 
org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -69,12 +72,15 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); - EasyMock.replay(colSelectorFactory); } @Test public void testDoubleLastAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.DOUBLE)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = doubleLastAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -93,6 +99,10 @@ public void testDoubleLastAggregator() @Test public void testDoubleLastBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.DOUBLE)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = doubleLastAggFactory.factorizeBuffered( colSelectorFactory); @@ -135,6 +145,10 @@ public void testComparatorWithNulls() @Test public void testDoubleLastCombiningAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -154,6 +168,10 @@ public void testDoubleLastCombiningAggregator() @Test public void testDoubleLastCombiningBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = combiningAggFactory.factorizeBuffered( colSelectorFactory); @@ -183,6 +201,40 @@ public void testSerde() throws Exception Assert.assertEquals(doubleLastAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); } + @Test + public void testDoubleLastAggregateCombiner() + { + AggregateCombiner doubleLastAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + + SerializablePair[] inputPairs = { + new SerializablePair<>(3L, 18d), + new SerializablePair<>(5L, 134.3d), + new SerializablePair<>(6L, 1232.212d), + new SerializablePair<>(1L, 233.5232d) + }; + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(inputPairs); + doubleLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[0], doubleLastAggregateCombiner.getObject()); + + // inputPairs[1] has larger time value, it should be the last + columnSelector.increment(); + doubleLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[1], doubleLastAggregateCombiner.getObject()); + + // inputPairs[2] has larger time value, it should be the last + columnSelector.increment(); + doubleLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], doubleLastAggregateCombiner.getObject()); + + // inputPairs[3] has the min time value, it should NOT be the first + columnSelector.increment(); + doubleLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], doubleLastAggregateCombiner.getObject()); + + 
doubleLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[3], doubleLastAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java index 86b6a998b8e3..8f349afa340d 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java @@ -22,6 +22,7 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; @@ -29,7 +30,9 @@ import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -51,10 +54,13 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest private float[] floats = {1.1897f, 0.001f, 86.23f, 166.228f}; private long[] times = {8224, 6879, 2436, 7888}; private SerializablePair[] pairs = { + new SerializablePair<>(111L, null), new SerializablePair<>(52782L, 134.3f), new SerializablePair<>(65492L, 1232.212f), new SerializablePair<>(69134L, 18.1233f), - new SerializablePair<>(11111L, 233.5232f) + new SerializablePair<>(11111L, 233.5232f), + new SerializablePair<>(99999L, 99999.f), + new SerializablePair<>(99999L, null) }; @Before @@ -69,12 +75,14 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); - EasyMock.replay(colSelectorFactory); } @Test public void testDoubleLastAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.FLOAT)); + EasyMock.replay(colSelectorFactory); Aggregator agg = floatLastAggregatorFactory.factorize(colSelectorFactory); aggregate(agg); @@ -88,11 +96,16 @@ public void testDoubleLastAggregator() Assert.assertEquals(floats[0], result.rhs, 0.0001); Assert.assertEquals((long) floats[0], agg.getLong()); Assert.assertEquals(floats[0], agg.getFloat(), 0.0001); + + } @Test public void testDoubleLastBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.FLOAT)); + EasyMock.replay(colSelectorFactory); BufferAggregator agg = floatLastAggregatorFactory.factorizeBuffered( colSelectorFactory); @@ -135,45 +148,102 @@ public void testComparatorWithNulls() @Test public void testDoubleLastCombiningAggregator() { + 
EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); - aggregate(agg); - aggregate(agg); - aggregate(agg); - aggregate(agg); + agg.aggregate(); + objectSelector.increment(); + agg.aggregate(); + objectSelector.increment(); + agg.aggregate(); + objectSelector.increment(); + agg.aggregate(); + objectSelector.increment(); + agg.aggregate(); + objectSelector.increment(); Pair result = (Pair) agg.get(); - Pair expected = (Pair) pairs[2]; + // pair[3] has the largest timestamp in first 5 events + Pair expected = (Pair) pairs[3]; + + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs, 0.0001); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); + Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); + // aggregate once more, the last will change to pair[5] event + agg.aggregate(); + objectSelector.increment(); + result = (Pair) agg.get(); + expected = (Pair) pairs[5]; Assert.assertEquals(expected.lhs, result.lhs); Assert.assertEquals(expected.rhs, result.rhs, 0.0001); Assert.assertEquals(expected.rhs.longValue(), agg.getLong()); Assert.assertEquals(expected.rhs, agg.getFloat(), 0.0001); + + // aggregate once more, now the last event has the same timestamp as the last-1 event, it will be the last + agg.aggregate(); + objectSelector.increment(); + result = (Pair) agg.get(); + expected = (Pair) pairs[6]; + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(result.rhs, null); } @Test public void testDoubleLastCombiningBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = combiningAggFactory.factorizeBuffered( colSelectorFactory); ByteBuffer buffer = ByteBuffer.wrap(new byte[floatLastAggregatorFactory.getMaxIntermediateSizeWithNulls()]); agg.init(buffer, 0); - aggregate(agg, buffer, 0); - aggregate(agg, buffer, 0); - aggregate(agg, buffer, 0); - aggregate(agg, buffer, 0); + // aggregate first 5 events, pair[3] is the last + agg.aggregate(buffer, 0); + objectSelector.increment(); + agg.aggregate(buffer, 0); + objectSelector.increment(); + agg.aggregate(buffer, 0); + objectSelector.increment(); + agg.aggregate(buffer, 0); + objectSelector.increment(); + agg.aggregate(buffer, 0); + objectSelector.increment(); Pair result = (Pair) agg.get(buffer, 0); - Pair expected = (Pair) pairs[2]; + Pair expected = (Pair) pairs[3]; Assert.assertEquals(expected.lhs, result.lhs); Assert.assertEquals(expected.rhs, result.rhs, 0.0001); Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); - } + // aggregate once more, pair[5] is the last + agg.aggregate(buffer, 0); + objectSelector.increment(); + result = (Pair) agg.get(buffer, 0); + expected = (Pair) pairs[5]; + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(expected.rhs, result.rhs, 0.0001); + Assert.assertEquals(expected.rhs.longValue(), agg.getLong(buffer, 0)); + Assert.assertEquals(expected.rhs, agg.getFloat(buffer, 0), 0.0001); + + // aggregate once more, pair[6] has the same timestamp with pair[5], it will be the last + agg.aggregate(buffer, 0); + objectSelector.increment(); + result = (Pair) agg.get(buffer, 0); + expected = 
(Pair) pairs[5]; + Assert.assertEquals(expected.lhs, result.lhs); + Assert.assertEquals(result.rhs, null); + } @Test public void testSerde() throws Exception @@ -183,6 +253,41 @@ public void testSerde() throws Exception Assert.assertEquals(floatLastAggregatorFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class)); } + @Test + public void testDoubleLastAggregateCombiner() + { + AggregateCombiner floatLastAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + + SerializablePair[] inputPairs = { + new SerializablePair<>(3L, 18f), + new SerializablePair<>(5L, 134.3f), + new SerializablePair<>(6L, 1232.212f), + new SerializablePair<>(1L, 233.5232f) + }; + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(inputPairs); + floatLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[0], floatLastAggregateCombiner.getObject()); + + // inputPairs[1] has larger time value, it should be the last + columnSelector.increment(); + floatLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[1], floatLastAggregateCombiner.getObject()); + + // inputPairs[2] has larger time value, it should be the last + columnSelector.increment(); + floatLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], floatLastAggregateCombiner.getObject()); + + // inputPairs[3] has the min time value, it should NOT be the first + columnSelector.increment(); + floatLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], floatLastAggregateCombiner.getObject()); + + floatLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[3], floatLastAggregateCombiner.getObject()); + } + + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java index a9b2fadcc90e..80ddbaef0d26 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java @@ -22,13 +22,16 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.testing.InitializedNullHandlingTest; import org.easymock.EasyMock; import org.junit.Assert; @@ -68,12 +71,15 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); - EasyMock.replay(colSelectorFactory); } @Test public void testLongLastAggregator() { + 
EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.LONG)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = longLastAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -92,6 +98,10 @@ public void testLongLastAggregator() @Test public void testLongLastBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.LONG)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = longLastAggFactory.factorizeBuffered( colSelectorFactory); @@ -134,6 +144,10 @@ public void testComparatorWithNulls() @Test public void testLongLastCombiningAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); aggregate(agg); @@ -153,6 +167,10 @@ public void testLongLastCombiningAggregator() @Test public void testLongLastCombiningBufferAggregator() { + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(new ColumnCapabilitiesImpl().setType( + ValueType.COMPLEX)); + EasyMock.replay(colSelectorFactory); + BufferAggregator agg = combiningAggFactory.factorizeBuffered( colSelectorFactory); @@ -182,6 +200,40 @@ public void testSerde() throws Exception Assert.assertEquals(longLastAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class)); } + @Test + public void testLongLastAggregateCombiner() + { + AggregateCombiner longLastAggregateCombiner = combiningAggFactory.makeAggregateCombiner(); + + SerializablePair[] inputPairs = { + new SerializablePair<>(3L, 18L), + new SerializablePair<>(5L, 134L), + new SerializablePair<>(6L, 1232L), + new SerializablePair<>(1L, 2332L) + }; + TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(inputPairs); + longLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[0], longLastAggregateCombiner.getObject()); + + // inputPairs[1] has larger time value, it should be the last + columnSelector.increment(); + longLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[1], longLastAggregateCombiner.getObject()); + + // inputPairs[2] has larger time value, it should be the last + columnSelector.increment(); + longLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], longLastAggregateCombiner.getObject()); + + // inputPairs[3] has the min time value, it should NOT be the first + columnSelector.increment(); + longLastAggregateCombiner.fold(columnSelector); + Assert.assertEquals(inputPairs[2], longLastAggregateCombiner.getObject()); + + longLastAggregateCombiner.reset(columnSelector); + Assert.assertEquals(inputPairs[3], longLastAggregateCombiner.getObject()); + } + private void aggregate( Aggregator agg ) diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java index 3cd0e8620ced..1d20d6225415 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java @@ -20,9 +20,18 @@ package org.apache.druid.segment; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.common.config.NullHandling; import 
org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.SerializablePairLongLong; +import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; +import org.apache.druid.query.aggregation.last.DoubleLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory; +import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory; import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory; import org.apache.druid.segment.data.IncrementalIndexTest; import org.apache.druid.segment.incremental.IncrementalIndex; @@ -38,7 +47,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,6 +60,26 @@ public class IndexMergerRollupTest extends InitializedNullHandlingTest @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + private final List> strEventsList = Arrays.asList( + ImmutableMap.of("d", "d1", "m", "m1"), + ImmutableMap.of("d", "d1", "m", "m2") + ); + + private final List> doubleEventsList = Arrays.asList( + ImmutableMap.of("d", "d1", "m", 1.0d), + ImmutableMap.of("d", "d1", "m", 2.0d) + ); + + private final List> floatEventsList = Arrays.asList( + ImmutableMap.of("d", "d1", "m", 1.0f), + ImmutableMap.of("d", "d1", "m", 2.0f) + ); + + private final List> longEventsList = Arrays.asList( + ImmutableMap.of("d", "d1", "m", 1L), + ImmutableMap.of("d", "d1", "m", 2L) + ); + @Before public void setUp() { @@ -61,61 +89,157 @@ public void setUp() indexSpec = new IndexSpec(); } - private void testStringFirstLastRollup( - AggregatorFactory[] aggregatorFactories + private QueryableIndex testFirstLastRollup( + final List> eventsList, + final List dimensions, + final AggregatorFactory... 
aggregatorFactories ) throws Exception { - List> eventsList = Arrays.asList( - new HashMap() - { - { - put("d", "d1"); - put("m", "m1"); - } - }, - new HashMap() - { - { - put("d", "d1"); - put("m", "m2"); - } - } - ); - final File tempDir = temporaryFolder.newFolder(); List indexes = new ArrayList<>(); + + // set timestamp of all events to the same so that rollup will be applied to these events Instant time = Instant.now(); for (Map events : eventsList) { IncrementalIndex toPersist = IncrementalIndexTest.createIndex(aggregatorFactories); - toPersist.add(new MapBasedInputRow(time.toEpochMilli(), ImmutableList.of("d"), events)); + toPersist.add(new MapBasedInputRow(time.toEpochMilli(), dimensions, events)); indexes.add(indexIO.loadIndex(indexMerger.persist(toPersist, tempDir, indexSpec, null))); } File indexFile = indexMerger .mergeQueryableIndex(indexes, true, aggregatorFactories, tempDir, indexSpec, null, -1); - try (QueryableIndex mergedIndex = indexIO.loadIndex(indexFile)) { - Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); - } + return indexIO.loadIndex(indexFile); } @Test public void testStringFirstRollup() throws Exception { - AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + try (QueryableIndex mergedIndex = testFirstLastRollup( + strEventsList, + ImmutableList.of("d"), new StringFirstAggregatorFactory("m", "m", 1024) - }; - testStringFirstLastRollup(aggregatorFactories); + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } } @Test public void testStringLastRollup() throws Exception { - AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + try (QueryableIndex mergedIndex = testFirstLastRollup( + strEventsList, + ImmutableList.of("d"), new StringLastAggregatorFactory("m", "m", 1024) - }; - testStringFirstLastRollup(aggregatorFactories); + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void testDoubleFirstRollup() throws Exception + { + try (QueryableIndex mergedIndex = testFirstLastRollup( + doubleEventsList, + ImmutableList.of("d"), + new DoubleFirstAggregatorFactory("m", "m") + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void testDoubleLastRollup() throws Exception + { + try (QueryableIndex mergedIndex = testFirstLastRollup( + doubleEventsList, + ImmutableList.of("d"), + new DoubleLastAggregatorFactory("m", "m") + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void testFloatFirstRollup() throws Exception + { + try (QueryableIndex mergedIndex = testFirstLastRollup( + floatEventsList, + ImmutableList.of("d"), + new FloatFirstAggregatorFactory("m", "m") + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void testFloatLastRollup() throws Exception + { + try (QueryableIndex mergedIndex = testFirstLastRollup( + floatEventsList, + ImmutableList.of("d"), + new FloatLastAggregatorFactory("m", "m") + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void testLongFirstRollup() throws Exception + { + try (QueryableIndex mergedIndex = testFirstLastRollup( + longEventsList, + ImmutableList.of("d"), + new LongFirstAggregatorFactory("m", "m") + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void 
testLongLastRollup() throws Exception + { + try (QueryableIndex mergedIndex = testFirstLastRollup( + longEventsList, + ImmutableList.of("d"), + new LongLastAggregatorFactory("m", "m") + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + } + } + + @Test + public void testLongFirstRollupWithNull() throws Exception + { + final List> longEventsList = Arrays.asList( + // m == null + ImmutableMap.of("d", "d1"), + + ImmutableMap.of("d", "d1", "m", 1L), + ImmutableMap.of("d", "d1", "m", 2L) + ); + + try (QueryableIndex mergedIndex = testFirstLastRollup( + longEventsList, + ImmutableList.of("d"), + new LongFirstAggregatorFactory("m", "m") + )) { + Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); + + Object o = mergedIndex.getColumnHolder("m") + .getColumn() + .makeColumnValueSelector(new SimpleAscendingOffset(0)) + .getObject(); + Assert.assertEquals(o.getClass(), SerializablePairLongLong.class); + + // since input events have the same timestamp, longFirst aggregator should return the first event + if (NullHandling.replaceWithDefault()) { + Assert.assertEquals(0L, ((SerializablePairLongLong) o).rhs.longValue()); + } else { + Assert.assertEquals(null, ((SerializablePairLongLong) o).rhs); + } + } } } diff --git a/web-console/src/druid-models/metric-spec.tsx b/web-console/src/druid-models/metric-spec.tsx index e72d114cff35..fd1282fe7702 100644 --- a/web-console/src/druid-models/metric-spec.tsx +++ b/web-console/src/druid-models/metric-spec.tsx @@ -67,10 +67,14 @@ export const METRIC_SPEC_FIELDS: Field[] = [ group: 'max', suggestions: ['longMax', 'doubleMax', 'floatMax'], }, - // Do not show first and last aggregators as they can not be used in ingestion specs and this definition is only used in the data loader. - // Ref: https://druid.apache.org/docs/latest/querying/aggregations.html#first--last-aggregator - // Should the first / last aggregators become usable at ingestion time, reverse the changes made in: - // https://github.com/apache/druid/pull/10794 + { + group: 'first', + suggestions: ['longFirst', 'doubleFirst', 'floatFirst'], + }, + { + group: 'last', + suggestions: ['longLast', 'doubleLast', 'floatLast'], + }, 'thetaSketch', { group: 'HLLSketch',
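
A note on the repeated `getColumnCapabilities` stubs in the aggregation tests above: each test now registers the input column's capabilities before `EasyMock.replay`, returning a numeric `ValueType` for the raw column ("nilly") and `ValueType.COMPLEX` for the pre-aggregated pair column ("billy"). That suggests the first/last factories choose between a numeric selector and an object (pair) selector based on those capabilities. The sketch below only illustrates that decision as the mocks imply it; it is not the factory code from this patch, and `isPairColumn` is a hypothetical helper name.

import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;

final class FirstLastCapabilityCheck
{
  private FirstLastCapabilityCheck() {}

  // True when the column already holds serialized (timestamp, value) pairs,
  // e.g. a column produced by an earlier first/last rollup, so it should be read
  // through an object selector rather than a float/double/long value selector.
  static boolean isPairColumn(ColumnSelectorFactory factory, String fieldName)
  {
    final ColumnCapabilities capabilities = factory.getColumnCapabilities(fieldName);
    return capabilities != null && capabilities.getType() == ValueType.COMPLEX;
  }
}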
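
The new equal-timestamp cases (pairs[5] and pairs[6] in the float first/last tests) pin down the tie-breaking rule: a later row with the same timestamp does not displace the current "first", but it does displace the current "last". A minimal, self-contained sketch of that rule, using made-up field names rather than Druid's aggregator internals:

final class FirstLastTieBreak
{
  private long firstTime = Long.MAX_VALUE;
  private Float firstValue;
  private long lastTime = Long.MIN_VALUE;
  private Float lastValue;

  void offer(long time, Float value)
  {
    if (time < firstTime) {   // strict comparison: a tie keeps the existing "first"
      firstTime = time;
      firstValue = value;
    }
    if (time >= lastTime) {   // inclusive comparison: a tie replaces the existing "last"
      lastTime = time;
      lastValue = value;
    }
  }
}

Fed the float-first pairs above, (500L, null) arrives before (500L, 400f) and remains the first; fed the float-last pairs, (99999L, null) arrives after (99999L, 99999f) and becomes the last, matching the assertions in the tests.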
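
The `test*AggregateCombiner` tests all follow the same pattern: `reset` seeds the combiner from the selector's current row, and each `fold` keeps whichever pair wins on timestamp. A compact driver in the same style, assuming the combining factory is obtained with `getCombiningFactory()` (the exact wiring of `combiningAggFactory` sits outside these hunks, so this is a sketch rather than a copy of the test setup):

// Walks a selector of (timestamp, value) pairs and folds them into a longLast combiner.
AggregateCombiner combiner = new LongLastAggregatorFactory("m", "m")
    .getCombiningFactory()
    .makeAggregateCombiner();

SerializablePair[] rows = {
    new SerializablePair<>(3L, 18L),
    new SerializablePair<>(5L, 134L),
    new SerializablePair<>(1L, 2332L)
};
TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(rows);

combiner.reset(columnSelector);          // seed with rows[0]
for (int i = 1; i < rows.length; i++) {
  columnSelector.increment();
  combiner.fold(columnSelector);         // keep the pair with the larger timestamp
}
// combiner.getObject() is now rows[1], the (5L, 134L) pair with the largest timestamp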