add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures less ambiguous (#12145)

* add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures unambiguous

* switcheroo

* EARLIEST_BY/LATEST_BY use timestamp instead of numeric types, update docs

* revert unintended change

* fix docs

* fix docs better
clintropolis authored Jan 12, 2022
1 parent fae7380 commit f2ce769
Showing 13 changed files with 268 additions and 63 deletions.
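
For context, a hedged sketch of the ambiguity this commit removes. Under the old overloads, the second argument of `EARLIEST`/`LATEST` meant either a time column (numeric `expr`) or `maxBytesPerString` (string `expr`), so resolution depended on the type of the first argument rather than on the call shape; the new `*_BY` functions name the time column explicitly. The table and column names below are illustrative, not taken from this commit.

```sql
-- Old overloads: a numeric literal in the second position could mean either a
-- time column (numeric signature) or maxBytesPerString (string signature),
-- depending on the type of "metric_col".
SELECT LATEST("metric_col", 1024) FROM "events";

-- New, unambiguous form added by this commit: the time column is explicit.
SELECT LATEST_BY("metric_col", "__time") FROM "events";
```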
12 changes: 6 additions & 6 deletions docs/querying/sql.md
@@ -364,14 +364,14 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and
|`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`EARLIEST(expr, timeColumn)`|Returns the earliest value of `expr`, which must be numeric. Earliest value is defined as the value first encountered with the minimum overall value of time column of all values being aggregated.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column. If the earliest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`EARLIEST(expr, maxBytesPerString, timeColumn)`|Like `EARLIEST(expr, timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`LATEST(expr, timeColumn)`|Returns the latest value of `expr`, which must be numeric. Latest value is defined as the value last encountered with the maximum overall value of time column of all values being aggregated.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, which must be numeric. The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`. If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like `EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`LATEST(expr, maxBytesPerString, timeColumn)`|Like `LATEST(expr, timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`LATEST_BY(expr, timestampExpr)`|Returns the latest value of `expr`, which must be numeric. The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`. If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`LATEST_BY(expr, timestampExpr, maxBytesPerString)`|Like `LATEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`ANY_VALUE(expr)`|Returns any value of `expr`, including null. `expr` must be numeric. This aggregator can simplify and improve performance by returning the first encountered value (including null).|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
|`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
|`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy dimension is included in a row, when using `GROUPING SETS`. Refer to [additional documentation](aggregations.md#grouping-aggregator) on how to infer this number.|N/A|
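
A usage sketch of the new `EARLIEST_BY`/`LATEST_BY` signatures documented above, assuming the quickstart `wikipedia` datasource with `__time`, `channel`, `delta` (numeric), and `comment` (string) columns:

```sql
SELECT
  "channel",
  -- numeric expr: earliest value of "delta" by an explicit timestamp column
  EARLIEST_BY("delta", "__time") AS first_delta,
  -- string expr: latest value of "comment", capped at 128 bytes of aggregation space
  LATEST_BY("comment", "__time", 128) AS last_comment
FROM "wikipedia"
GROUP BY "channel"
```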
@@ -54,6 +54,9 @@ public NumericFirstAggregator(BaseLongColumnValueSelector timeSelector, TSelecto
@Override
public void aggregate()
{
if (timeSelector.isNull()) {
return;
}
long time = timeSelector.getLong();
if (time < firstTime) {
firstTime = time;
@@ -86,6 +86,9 @@ public void init(ByteBuffer buf, int position)
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (timeSelector.isNull()) {
return;
}
long time = timeSelector.getLong();
long firstTime = buf.getLong(position);
if (time < firstTime) {
@@ -56,6 +56,9 @@ public StringFirstAggregator(
@Override
public void aggregate()
{
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
@@ -63,6 +63,9 @@ public void init(ByteBuffer buf, int position)
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
@@ -50,6 +50,9 @@ public NumericLastAggregator(BaseLongColumnValueSelector timeSelector, TSelector
@Override
public void aggregate()
{
if (timeSelector.isNull()) {
return;
}
long time = timeSelector.getLong();
if (time >= lastTime) {
lastTime = time;
@@ -89,6 +89,9 @@ public void init(ByteBuffer buf, int position)
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (timeSelector.isNull()) {
return;
}
long time = timeSelector.getLong();
long lastTime = buf.getLong(position);
if (time >= lastTime) {
@@ -57,6 +57,9 @@ public StringLastAggregator(
@Override
public void aggregate()
{
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
@@ -64,6 +64,9 @@ public void init(ByteBuffer buf, int position)
@Override
public void aggregate(ByteBuffer buf, int position)
{
if (timeSelector.isNull()) {
return;
}
if (needsFoldCheck) {
// Less efficient code path when folding is a possibility (we must read the value selector first just in case
// it's a foldable object).
@@ -204,38 +204,20 @@ public Aggregation toDruidAggregation(
theAggFactory = aggregatorType.createAggregatorFactory(aggregatorName, fieldName, null, outputType, -1);
break;
case 2:
if (!outputType.isNumeric()) { // translates (expr, maxBytesPerString) signature
theAggFactory = aggregatorType.createAggregatorFactory(
aggregatorName,
fieldName,
null,
outputType,
RexLiteral.intValue(rexNodes.get(1))
);
} else { // translates (expr, timeColumn) signature
theAggFactory = aggregatorType.createAggregatorFactory(
aggregatorName,
fieldName,
getColumnName(plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1)),
outputType,
-1
);
}
break;
case 3:
theAggFactory = aggregatorType.createAggregatorFactory(
aggregatorName,
fieldName,
getColumnName(plannerContext, virtualColumnRegistry, args.get(2), rexNodes.get(2)),
null,
outputType,
RexLiteral.intValue(rexNodes.get(1))
);
break;
default:
throw new IAE(
"aggregation[%s], Invalid number of arguments[%,d] to Earliest/Latest/Any operator",
"aggregation[%s], Invalid number of arguments[%,d] to [%s] operator",
aggregatorName,
args.size()
args.size(),
aggregatorType.name()
);
}

@@ -245,7 +227,7 @@
);
}

private String getColumnName(
static String getColumnName(
PlannerContext plannerContext,
VirtualColumnRegistry virtualColumnRegistry,
DruidExpression arg,
@@ -307,20 +289,9 @@ private static class EarliestLatestSqlAggFunction extends SqlAggFunction
"'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n",
OperandTypes.ANY,
OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
),
OperandTypes.sequence(
"'" + aggregatorType.name() + "(expr, timeColumn)'\n",
OperandTypes.ANY,
OperandTypes.NUMERIC
),
OperandTypes.sequence(
"'" + aggregatorType.name() + "(expr, maxBytesPerString, timeColumn)'\n",
OperandTypes.ANY,
OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL),
OperandTypes.NUMERIC
)
),
SqlFunctionCategory.STRING,
SqlFunctionCategory.USER_DEFINED_FUNCTION,
false,
false,
Optionality.FORBIDDEN
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.aggregation.builtin;

import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.util.Optionality;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class EarliestLatestBySqlAggregator implements SqlAggregator
{
public static final SqlAggregator EARLIEST_BY = new EarliestLatestBySqlAggregator(EarliestLatestAnySqlAggregator.AggregatorType.EARLIEST);
public static final SqlAggregator LATEST_BY = new EarliestLatestBySqlAggregator(EarliestLatestAnySqlAggregator.AggregatorType.LATEST);

private final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType;
private final SqlAggFunction function;

private EarliestLatestBySqlAggregator(final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType)
{
this.aggregatorType = aggregatorType;
this.function = new EarliestByLatestBySqlAggFunction(aggregatorType);
}

@Override
public SqlAggFunction calciteFunction()
{
return function;
}

@Nullable
@Override
public Aggregation toDruidAggregation(
final PlannerContext plannerContext,
final RowSignature rowSignature,
final VirtualColumnRegistry virtualColumnRegistry,
final RexBuilder rexBuilder,
final String name,
final AggregateCall aggregateCall,
final Project project,
final List<Aggregation> existingAggregations,
final boolean finalizeAggregations
)
{
final List<RexNode> rexNodes = aggregateCall
.getArgList()
.stream()
.map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
.collect(Collectors.toList());

final List<DruidExpression> args = Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes);

if (args == null) {
return null;
}

final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
final ColumnType outputType = Calcites.getColumnTypeForRelDataType(aggregateCall.getType());
if (outputType == null) {
throw new ISE(
"Cannot translate output sqlTypeName[%s] to Druid type for aggregator[%s]",
aggregateCall.getType().getSqlTypeName(),
aggregateCall.getName()
);
}

final String fieldName = EarliestLatestAnySqlAggregator.getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));

final AggregatorFactory theAggFactory;
switch (args.size()) {
case 2:
theAggFactory = aggregatorType.createAggregatorFactory(
aggregatorName,
fieldName,
EarliestLatestAnySqlAggregator.getColumnName(plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1)),
outputType,
-1
);
break;
case 3:
theAggFactory = aggregatorType.createAggregatorFactory(
aggregatorName,
fieldName,
EarliestLatestAnySqlAggregator.getColumnName(plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1)),
outputType,
RexLiteral.intValue(rexNodes.get(2))
);
break;
default:
throw new IAE(
"aggregation[%s], Invalid number of arguments[%,d] to [%s] operator",
aggregatorName,
args.size(),
aggregatorType.name()
);
}

return Aggregation.create(
Collections.singletonList(theAggFactory),
finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, aggregatorName) : null
);
}

private static class EarliestByLatestBySqlAggFunction extends SqlAggFunction
{
private static final SqlReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
new EarliestLatestAnySqlAggregator.EarliestLatestReturnTypeInference(0);

EarliestByLatestBySqlAggFunction(EarliestLatestAnySqlAggregator.AggregatorType aggregatorType)
{
super(
StringUtils.format("%s_BY", aggregatorType.name()),
null,
SqlKind.OTHER_FUNCTION,
EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE,
InferTypes.RETURN_TYPE,
OperandTypes.or(
OperandTypes.sequence(
"'" + aggregatorType.name() + "(expr, timeColumn)'\n",
OperandTypes.ANY,
OperandTypes.family(SqlTypeFamily.TIMESTAMP)
),
OperandTypes.sequence(
"'" + aggregatorType.name() + "(expr, timeColumn, maxBytesPerString)'\n",
OperandTypes.ANY,
OperandTypes.family(SqlTypeFamily.TIMESTAMP),
OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
)
),
SqlFunctionCategory.USER_DEFINED_FUNCTION,
false,
false,
Optionality.FORBIDDEN
);
}
}
}