Tuple sketch SQL support (#13887)
This PR is a follow-up to #13819. It makes the Tuple sketch functionality usable in SQL, both for ingestion with Multi-Stage Queries (MSQ) and for analytic queries against Tuple sketch columns.
frankgrimes97 authored Mar 28, 2023
1 parent c2fe6a4 commit 2f98675
Showing 17 changed files with 1,116 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -33,3 +33,9 @@ integration-tests/gen-scripts/
**/.ipython/
**/.jupyter/
**/.local/

# ignore NetBeans IDE specific files
nbproject
nbactions.xml
nb-configuration.xml

10 changes: 10 additions & 0 deletions docs/querying/sql-aggregations.md
@@ -139,6 +139,16 @@ Load the [DataSketches extension](../development/extensions-core/datasketches-ex
|`DS_QUANTILES_SKETCH(expr, [k])`|Creates a [Quantiles sketch](../development/extensions-core/datasketches-quantiles.md) on the values of `expr`, which can be a regular column or a column containing quantiles sketches. The `k` parameter is described in the Quantiles sketch documentation.<br/><br/>See the [known issue](sql-translation.md#approximations) with this function.|`'0'` (STRING)|


### Tuple sketch functions

Load the [DataSketches extension](../development/extensions-core/datasketches-extension.md) to use the following functions.

|Function|Notes|Default|
|--------|-----|-------|
|`DS_TUPLE_DOUBLES(expr, [nominalEntries])`|Creates a [Tuple sketch](../development/extensions-core/datasketches-tuple.md) on the values of `expr`, where `expr` is a column containing Tuple sketches whose Summary Objects are arrays of double values. The optional `nominalEntries` override parameter is described in the Tuple sketch documentation.|
|`DS_TUPLE_DOUBLES(dimensionColumnExpr, metricColumnExpr, ..., [nominalEntries])`|Creates a [Tuple sketch](../development/extensions-core/datasketches-tuple.md) which contains an array of double values as its Summary Object, based on the dimension value of `dimensionColumnExpr` and the numeric metric values contained in one or more `metricColumnExpr` columns. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).|


### T-Digest sketch functions

Load the T-Digest extension to use the following functions. See the [T-Digest extension](../development/extensions-contrib/tdigestsketch-quantiles.md) for additional details and for more information on these functions.
42 changes: 42 additions & 0 deletions docs/querying/sql-functions.md
@@ -491,6 +491,48 @@ Returns an approximate rank between 0 and 1 of a given value, in which the rank

Creates a Theta sketch on a column containing Theta sketches or a regular column.

## DS_TUPLE_DOUBLES

`DS_TUPLE_DOUBLES(expr, [nominalEntries])`

`DS_TUPLE_DOUBLES(dimensionColumnExpr, metricColumnExpr, ..., [nominalEntries])`

**Function type:** [Aggregation](sql-aggregations.md)

Creates a Tuple sketch which contains an array of double values as the Summary Object. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).

## DS_TUPLE_DOUBLES_INTERSECT

`DS_TUPLE_DOUBLES_INTERSECT(expr, ..., [nominalEntries])`

**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)

Returns an intersection of Tuple sketches which each contain an array of double values as their Summary Objects. The values contained in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).

## DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE

`DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(expr)`

**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)

Computes approximate sums of the values contained within a Tuple sketch which contains an array of double values as the Summary Object.
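
As a rough illustration of what this function returns, here is a minimal sketch in plain Java. It assumes that the estimate for each metric column is the sum over the entries retained by the sketch, scaled up by the sketch's sampling fraction theta; that scaling rule is an assumption and is not stated above. `RetainedSketch` and `MetricsSumEstimateSketch` are hypothetical stand-ins, not the DataSketches or Druid API. A missing sketch yields `null`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a tuple sketch: retained value arrays plus the sampling fraction theta. */
final class RetainedSketch {
  final List<double[]> values; // one array of metric values per retained key
  final double theta;          // fraction of keys retained, in (0, 1]

  RetainedSketch(List<double[]> values, double theta) {
    this.values = values;
    this.theta = theta;
  }
}

final class MetricsSumEstimateSketch {
  /** Returns one estimated sum per metric column, or null when there is no sketch. */
  static double[] estimate(RetainedSketch sketch, int numValues) {
    if (sketch == null) {
      return null;
    }
    double[] sums = new double[numValues];
    for (double[] row : sketch.values) {
      for (int i = 0; i < numValues; i++) {
        sums[i] += row[i];
      }
    }
    // Scale the retained sums up to the full population (assumed scaling rule).
    for (int i = 0; i < numValues; i++) {
      sums[i] /= sketch.theta;
    }
    return sums;
  }

  public static void main(String[] args) {
    RetainedSketch sketch = new RetainedSketch(
        Arrays.asList(new double[]{1.0, 10.0}, new double[]{2.0, 20.0}),
        0.5
    );
    System.out.println(Arrays.toString(estimate(sketch, 2)));
  }
}
```

The result has one element per metric column, which is why the SQL function returns an array of doubles rather than a single number.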

## DS_TUPLE_DOUBLES_NOT

`DS_TUPLE_DOUBLES_NOT(expr, ..., [nominalEntries])`

**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)

Returns a set difference of Tuple sketches which each contain an array of double values as their Summary Objects. The values contained in the Summary Object are preserved as is. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).

## DS_TUPLE_DOUBLES_UNION

`DS_TUPLE_DOUBLES_UNION(expr, ..., [nominalEntries])`

**Function type:** [Scalar, sketch](sql-scalar.md#tuple-sketch-functions)

Returns a union of Tuple sketches which each contain an array of double values as their Summary Objects. The values contained in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).
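
The way the three set operations treat Summary Objects can be sketched in plain Java. This is a deliberately simplified model: it uses exact maps keyed by dimension value, ignores sampling and `nominalEntries` entirely, and the class `TupleSetOpsSketch` is hypothetical rather than part of Druid or DataSketches. It only illustrates the rules stated above: union and intersection sum the value arrays of matching keys, while the set difference keeps the values of the first input as is.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

final class TupleSetOpsSketch {
  /** Element-wise sum of two equally sized summary arrays. */
  static double[] sum(double[] a, double[] b) {
    double[] out = new double[a.length];
    for (int i = 0; i < a.length; i++) {
      out[i] = a[i] + b[i];
    }
    return out;
  }

  /** Keys from either input; summaries are summed when a key appears in both. */
  static Map<String, double[]> union(Map<String, double[]> a, Map<String, double[]> b) {
    Map<String, double[]> out = new LinkedHashMap<>(a);
    b.forEach((key, summary) -> out.merge(key, summary, TupleSetOpsSketch::sum));
    return out;
  }

  /** Only keys present in both inputs; their summaries are summed. */
  static Map<String, double[]> intersect(Map<String, double[]> a, Map<String, double[]> b) {
    Map<String, double[]> out = new LinkedHashMap<>();
    a.forEach((key, summary) -> {
      if (b.containsKey(key)) {
        out.put(key, sum(summary, b.get(key)));
      }
    });
    return out;
  }

  /** Keys of the first input that are absent from the second; summaries are kept unchanged. */
  static Map<String, double[]> not(Map<String, double[]> a, Map<String, double[]> b) {
    Map<String, double[]> out = new LinkedHashMap<>();
    a.forEach((key, summary) -> {
      if (!b.containsKey(key)) {
        out.put(key, summary);
      }
    });
    return out;
  }

  public static void main(String[] args) {
    Map<String, double[]> a = new LinkedHashMap<>();
    a.put("x", new double[]{1.0});
    a.put("y", new double[]{2.0});
    Map<String, double[]> b = new LinkedHashMap<>();
    b.put("y", new double[]{5.0});
    b.put("z", new double[]{7.0});

    union(a, b).forEach((k, v) -> System.out.println("union " + k + " " + Arrays.toString(v)));
    intersect(a, b).forEach((k, v) -> System.out.println("intersect " + k + " " + Arrays.toString(v)));
    not(a, b).forEach((k, v) -> System.out.println("not " + k + " " + Arrays.toString(v)));
  }
}
```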

## EARLIEST

`EARLIEST(expr)`
13 changes: 13 additions & 0 deletions docs/querying/sql-scalar.md
@@ -248,6 +248,19 @@ The [DataSketches extension](../development/extensions-core/datasketches-extensi
|`DS_RANK(expr, value)`|Returns an approximation to the rank of a given value that is the fraction of the distribution less than that value from a quantiles sketch. `expr` must return a quantiles sketch.|
|`DS_QUANTILE_SUMMARY(expr)`|Returns a string summary of a quantiles sketch, useful for debugging. `expr` must return a quantiles sketch.|

### Tuple sketch functions

The following functions operate on [tuple sketches](../development/extensions-core/datasketches-tuple.md).
The [DataSketches extension](../development/extensions-core/datasketches-extension.md) must be loaded to use the following functions.

|Function|Notes|
|--------|-----|
|`DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE(expr)`|Computes approximate sums of the values contained within a [Tuple sketch](../development/extensions-core/datasketches-tuple.md#estimated-metrics-values-for-each-column-of-arrayofdoublessketch) column which contains an array of double values as its Summary Object.|
|`DS_TUPLE_DOUBLES_INTERSECT(expr, ..., [nominalEntries])`|Returns an intersection of tuple sketches, where each input expression must return a tuple sketch which contains an array of double values as its Summary Object. The values contained in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).|
|`DS_TUPLE_DOUBLES_NOT(expr, ..., [nominalEntries])`|Returns a set difference of tuple sketches, where each input expression must return a tuple sketch which contains an array of double values as its Summary Object. The values contained in the Summary Object are preserved as is. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).|
|`DS_TUPLE_DOUBLES_UNION(expr, ..., [nominalEntries])`|Returns a union of tuple sketches, where each input expression must return a tuple sketch which contains an array of double values as its Summary Object. The values contained in the Summary Objects are summed when combined. If the last argument is a numeric literal, Druid treats it as an override for [nominal entries](../development/extensions-core/datasketches-tuple.md).|


## Other scalar functions

|Function|Notes|
Original file line number Diff line number Diff line change
@@ -22,11 +22,18 @@
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchMetricsSumEstimateOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSetIntersectOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSetNotOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSetUnionOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.sql.ArrayOfDoublesSketchSqlAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.sql.guice.SqlBindings;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -58,9 +65,14 @@ public class ArrayOfDoublesSketchModule implements DruidModule
@Override
public void configure(final Binder binder)
{
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH, new ArrayOfDoublesSketchMergeComplexMetricSerde());
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG, new ArrayOfDoublesSketchMergeComplexMetricSerde());
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG, new ArrayOfDoublesSketchBuildComplexMetricSerde());
registerSerde();
SqlBindings.addAggregator(binder, ArrayOfDoublesSketchSqlAggregator.class);

SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchMetricsSumEstimateOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchSetIntersectOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchSetUnionOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, ArrayOfDoublesSketchSetNotOperatorConversion.class);

}

@Override
Expand Down Expand Up @@ -124,4 +136,13 @@ public List<? extends Module> getJacksonModules()
);
}

@VisibleForTesting
public static void registerSerde()
{
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH, new ArrayOfDoublesSketchMergeComplexMetricSerde());
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG, new ArrayOfDoublesSketchMergeComplexMetricSerde());
ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG, new ArrayOfDoublesSketchBuildComplexMetricSerde());
}


}
Original file line number Diff line number Diff line change
@@ -55,6 +55,9 @@ public ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
  public double[] compute(final Map<String, Object> combinedAggregators)
  {
    final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
    if (sketch == null) {
      return null;
    }
    final SummaryStatistics[] stats = new SummaryStatistics[sketch.getNumValues()];
    Arrays.setAll(stats, i -> new SummaryStatistics());
    final ArrayOfDoublesSketchIterator it = sketch.iterator();
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.datasketches.tuple.sql;

import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;

import javax.annotation.Nullable;
import java.util.List;

public class ArrayOfDoublesSketchMetricsSumEstimateOperatorConversion implements SqlOperatorConversion
{
  private static final String FUNCTION_NAME = "DS_TUPLE_DOUBLES_METRICS_SUM_ESTIMATE";
  private static final SqlFunction SQL_FUNCTION = OperatorConversions
      .operatorBuilder(StringUtils.toUpperCase(FUNCTION_NAME))
      .operandTypes(SqlTypeFamily.ANY)
      .returnTypeNullableArrayWithNullableElements(SqlTypeName.DOUBLE)
      .build();


  @Override
  public SqlOperator calciteOperator()
  {
    return SQL_FUNCTION;
  }

  @Override
  public DruidExpression toDruidExpression(
      PlannerContext plannerContext,
      RowSignature rowSignature,
      RexNode rexNode
  )
  {
    return null;
  }

  @Nullable
  @Override
  public PostAggregator toPostAggregator(
      PlannerContext plannerContext,
      RowSignature rowSignature,
      RexNode rexNode,
      PostAggregatorVisitor postAggregatorVisitor
  )
  {
    final List<RexNode> operands = ((RexCall) rexNode).getOperands();
    final PostAggregator firstOperand = OperatorConversions.toPostAggregator(
        plannerContext,
        rowSignature,
        operands.get(0),
        postAggregatorVisitor,
        true
    );

    if (firstOperand == null) {
      return null;
    }

    return new ArrayOfDoublesSketchToMetricsSumEstimatePostAggregator(
        postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
        firstOperand
    );
  }

}
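
The control flow of `toPostAggregator` above can be pictured with a small, dependency-free sketch. All names here (`NameAllocator`, `NamedPostAgg`, `SumEstimateConversionSketch`) are hypothetical stand-ins and not Druid classes. The sketch assumes only what the code above shows: the operand is converted first, an unconvertible operand makes the whole conversion return `null`, and an output name built from a prefix and a counter is taken only after the operand has converted.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the visitor's name generation: prefix plus a running counter. */
final class NameAllocator {
  private final String prefix;
  private int counter;

  NameAllocator(String prefix) {
    this.prefix = prefix;
  }

  String next() {
    return prefix + counter++;
  }
}

/** Hypothetical stand-in for a post-aggregator: an output name and the input field it reads. */
final class NamedPostAgg {
  final String name;
  final String field;

  NamedPostAgg(String name, String field) {
    this.name = name;
    this.field = field;
  }
}

final class SumEstimateConversionSketch {
  /** Returns null when the operand cannot be converted; otherwise allocates a fresh output name. */
  static NamedPostAgg convert(String operand, Function<String, String> toField, NameAllocator names) {
    final String field = toField.apply(operand);
    if (field == null) {
      return null; // no name is consumed for a failed conversion
    }
    return new NamedPostAgg(names.next(), field);
  }

  public static void main(String[] args) {
    NameAllocator names = new NameAllocator("p");
    System.out.println(convert("plain_column", operand -> null, names));
    System.out.println(convert("sketch_agg", operand -> operand, names).name);
  }
}
```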
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.datasketches.tuple.sql;

import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlOperandCountRanges;
import org.apache.datasketches.Util;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchSetOpPostAggregator;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.OperatorConversions;
import org.apache.druid.sql.calcite.expression.PostAggregatorVisitor;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.planner.PlannerContext;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

public abstract class ArrayOfDoublesSketchSetBaseOperatorConversion implements SqlOperatorConversion
{
  public ArrayOfDoublesSketchSetBaseOperatorConversion()
  {
  }

  @Override
  public SqlOperator calciteOperator()
  {
    return makeSqlFunction();
  }

  @Nullable
  @Override
  public DruidExpression toDruidExpression(
      PlannerContext plannerContext,
      RowSignature rowSignature,
      RexNode rexNode
  )
  {
    plannerContext.setPlanningError("%s can only be used on aggregates. " +
                                    "It cannot be used directly on a column or on a scalar expression.", getFunctionName());
    return null;
  }

  @Nullable
  @Override
  public PostAggregator toPostAggregator(
      PlannerContext plannerContext,
      RowSignature rowSignature,
      RexNode rexNode,
      PostAggregatorVisitor postAggregatorVisitor
  )
  {
    final List<RexNode> operands = ((RexCall) rexNode).getOperands();
    final List<PostAggregator> inputPostAggs = new ArrayList<>();
    final int nominalEntries;
    Integer numberOfvalues = null;

    final int metricExpressionEndIndex;
    final int lastArgIndex = operands.size() - 1;
    final RexNode potentialNominalEntriesArg = operands.get(lastArgIndex);

    if (potentialNominalEntriesArg.isA(SqlKind.LITERAL) &&
        RexLiteral.value(potentialNominalEntriesArg) instanceof Number) {

      nominalEntries = ((Number) RexLiteral.value(potentialNominalEntriesArg)).intValue();
      metricExpressionEndIndex = lastArgIndex - 1;
    } else {
      nominalEntries = Util.DEFAULT_NOMINAL_ENTRIES;
      metricExpressionEndIndex = lastArgIndex;
    }

    for (int i = 0; i <= metricExpressionEndIndex; i++) {
      RexNode operand = operands.get(i);
      final PostAggregator convertedPostAgg = OperatorConversions.toPostAggregator(
          plannerContext,
          rowSignature,
          operand,
          postAggregatorVisitor,
          true
      );

      if (convertedPostAgg == null) {
        return null;
      } else {
        inputPostAggs.add(convertedPostAgg);
      }
    }

    return new ArrayOfDoublesSketchSetOpPostAggregator(
        postAggregatorVisitor.getOutputNamePrefix() + postAggregatorVisitor.getAndIncrementCounter(),
        getSetOperationName(),
        nominalEntries,
        numberOfvalues,
        inputPostAggs
    );
  }

  private SqlFunction makeSqlFunction()
  {
    return new SqlFunction(
        getFunctionName(),
        SqlKind.OTHER_FUNCTION,
        ArrayOfDoublesSketchSqlOperators.RETURN_TYPE_INFERENCE,
        null,
        OperandTypes.variadic(SqlOperandCountRanges.from(2)),
        SqlFunctionCategory.USER_DEFINED_FUNCTION
    );
  }

  public abstract String getSetOperationName();

  public String getFunctionName()
  {
    return StringUtils.format("DS_TUPLE_DOUBLES_%s", getSetOperationName());
  }

}
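
The argument handling in this base class can be restated without Calcite as a minimal sketch. The class `SetOpArgsSketch` is hypothetical. Operands are modelled as plain objects, where a `Number` stands for a numeric literal and anything else stands for a sketch expression. The default of 4096 is an assumption about the value of `Util.DEFAULT_NOMINAL_ENTRIES` and is not shown in the code above.

```java
import java.util.Arrays;
import java.util.List;

final class SetOpArgsSketch {
  /** Assumed value of the library default for nominal entries. */
  static final int DEFAULT_NOMINAL_ENTRIES = 4096;

  final List<Object> sketchOperands;
  final int nominalEntries;

  private SetOpArgsSketch(List<Object> sketchOperands, int nominalEntries) {
    this.sketchOperands = sketchOperands;
    this.nominalEntries = nominalEntries;
  }

  /** Splits the operands into sketch inputs and an optional trailing numeric literal. */
  static SetOpArgsSketch parse(List<Object> operands) {
    final int lastArgIndex = operands.size() - 1;
    final Object last = operands.get(lastArgIndex);
    if (last instanceof Number) {
      // A trailing numeric literal is taken as the nominalEntries override.
      return new SetOpArgsSketch(operands.subList(0, lastArgIndex), ((Number) last).intValue());
    }
    return new SetOpArgsSketch(operands, DEFAULT_NOMINAL_ENTRIES);
  }

  /** Mirrors getFunctionName(): the SQL name is derived from the set operation name. */
  static String functionName(String setOperationName) {
    return String.format("DS_TUPLE_DOUBLES_%s", setOperationName);
  }

  public static void main(String[] args) {
    SetOpArgsSketch parsed = parse(Arrays.<Object>asList("sketchA", "sketchB", 16384));
    System.out.println(functionName("UNION") + " over " + parsed.sketchOperands
                       + " with nominalEntries=" + parsed.nominalEntries);
  }
}
```

One consequence of this rule is that a numeric literal can never be passed as the last sketch input, since it would be consumed as the override.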