Skip to content

Commit

Permalink
feat: Implement ROUND() UDF (#3404)
Browse files Browse the repository at this point in the history
Implements the ROUND() function.
Please see #3404 for more information.
  • Loading branch information
purplefox authored Sep 26, 2019
1 parent bb5a491 commit f9783a9
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 46 deletions.
9 changes: 8 additions & 1 deletion docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1680,7 +1680,14 @@ Scalar functions
| REPLACE | ``REPLACE(col1, 'foo', 'bar')`` | Replace all instances of a substring in a string |
| | | with a new string. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| ROUND | ``ROUND(col1)`` | Round a value to the nearest BIGINT value. |
| ROUND | ``ROUND(col1)`` or ``ROUND(col1, scale)`` | Round a value to the number of decimal places |
| | | as specified by scale to the right of the decimal |
| | | point. If scale is negative then value is rounded |
| | | to the right of the decimal point. |
| | | Numbers equidistant to the nearest value are |
| | | rounded up (in the positive direction). |
| | | If the number of decimal places is not provided |
| | | it defaults to zero. |
+------------------------+---------------------------------------------------------------------------+---------------------------------------------------+
| SIGN | ``SIGN(col1)`` | The sign of a numeric value as an INTEGER: |
| | | * -1 if the argument is negative |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
import io.confluent.ksql.function.udf.math.CeilKudf;
import io.confluent.ksql.function.udf.math.RandomKudf;
import io.confluent.ksql.function.udf.math.RoundKudf;
import io.confluent.ksql.function.udf.string.ConcatKudf;
import io.confluent.ksql.function.udf.string.IfNullKudf;
import io.confluent.ksql.function.udf.string.LCaseKudf;
Expand Down Expand Up @@ -224,12 +223,6 @@ private void addMathFunctions() {
"CEIL",
CeilKudf.class));

addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_INT64_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_FLOAT64_SCHEMA),
"ROUND",
RoundKudf.class));

addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_FLOAT64_SCHEMA,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ public BigDecimal abs(@UdfParameter final BigDecimal val) {

@UdfSchemaProvider
public SqlType provideSchema(final List<SqlType> params) {
if (params.size() != 1) {
throw new KsqlException("Abs udf accepts one parameter");
}
final SqlType s = params.get(0);
if (s.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for Abs expects a BigDecimal parameter"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udf.math;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udf.UdfSchemaProvider;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.types.SqlDecimal;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.util.KsqlException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

/*
The rounding behaviour implemented here follows that of java.lang.Math.round() - we do that
in order to provide compatibility with the previous ROUND() implementation which used
Math.round(). The BigDecimal HALF_UP rounding behaviour is a bit more sane and would be a better
choice if we were starting from scratch.
It's an implementation of rounding "half up". This means we round to the nearest integer value and
in the case we are equidistant from the two nearest integers we round UP to the nearest integer.
This means:
ROUND(1.1) -> 1
ROUND(1.5) -> 2
ROUND(1.9) -> 2
ROUND(-1.1) -> -1
ROUND(-1.5) -> -1 Note this is not -2! We round up and up is in a positive direction.
ROUND(-1.9) -> -2
Unfortunately there is an inconsistency in the way that java.lang.Math and
BigDecimal work with respect to rounding:
Math.round(1.5) --> 2
Math.round(-1.5) -- -1
But:
new BigDecimal("1.5").setScale(2, RoundingMode.HALF_UP) --> 2
new BigDecimal("-1.5").setScale(2, RoundingMode.HALF_UP) --> -2
There isn't any BigDecimal rounding mode which captures the java.lang.Math behaviour so
we need to use different rounding modes on BigDecimal depending on whether the value
is +ve or -ve to get consistent behaviour.
*/
@UdfDescription(name = "Round", description = Round.DESCRIPTION)
public class Round {

static final String DESCRIPTION =
"Round a value to the number of decimal places as specified by scale to the right of the "
+ "decimal point. If scale is negative then value is rounded to the right of the decimal "
+ "point. Numbers equidistant to the nearest value are rounded up (in the positive"
+ " direction). If the number of decimal places is not provided it defaults to zero.";

@Udf
public Long round(@UdfParameter final long val) {
return val;
}

@Udf
public Long round(@UdfParameter final int val) {
return (long)val;
}

@Udf
public Long round(@UdfParameter final Double val) {
return val == null ? null : Math.round(val);
}

@Udf
public Double round(@UdfParameter final Double val, @UdfParameter final Integer decimalPlaces) {
return val == null
? null
: roundBigDecimal(BigDecimal.valueOf(val), decimalPlaces).doubleValue();
}

@Udf(schemaProvider = "provideDecimalSchema")
public BigDecimal round(@UdfParameter final BigDecimal val) {
return round(val, 0);
}

@Udf(schemaProvider = "provideDecimalSchemaWithDecimalPlaces")
public BigDecimal round(
@UdfParameter final BigDecimal val,
@UdfParameter final Integer decimalPlaces
) {
return val == null ? null : roundBigDecimal(val, decimalPlaces);
}

@UdfSchemaProvider
public SqlType provideDecimalSchemaWithDecimalPlaces(final List<SqlType> params) {
final SqlType s0 = params.get(0);
if (s0.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for round expects a BigDecimal parameter"
+ "type as first parameter.");
}
final SqlType s1 = params.get(1);
if (s1.baseType() != SqlBaseType.INTEGER) {
throw new KsqlException("The schema provider method for round expects an Integer parameter"
+ "type as second parameter.");
}
return s0;
}

@UdfSchemaProvider
public SqlType provideDecimalSchema(final List<SqlType> params) {
final SqlType s0 = params.get(0);
if (s0.baseType() != SqlBaseType.DECIMAL) {
throw new KsqlException("The schema provider method for round expects a BigDecimal parameter"
+ "type as a parameter.");
}
final SqlDecimal param = (SqlDecimal)s0;
return SqlDecimal.of(param.getPrecision() - param.getScale(), 0);
}

private BigDecimal roundBigDecimal(final BigDecimal val, final int decimalPlaces) {
final RoundingMode roundingMode = val.compareTo(BigDecimal.ZERO) > 0
? RoundingMode.HALF_UP : RoundingMode.HALF_DOWN;
return val.setScale(decimalPlaces, roundingMode);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void shouldHaveBuiltInUDFRegistered() {
// String UDF
"LCASE", "UCASE", "CONCAT", "TRIM", "IFNULL", "LEN",
// Math UDF
"CEIL", "ROUND", "RANDOM",
"CEIL", "RANDOM",
// JSON UDF
"EXTRACTJSONFIELD", "ARRAYCONTAINS",
// Struct UDF
Expand Down
Loading

0 comments on commit f9783a9

Please sign in to comment.