Skip to content

Commit

Permalink
fixes confluentinc#2583 extend ROUND and FLOOR to include number of d…
Browse files Browse the repository at this point in the history
…ecimal places
  • Loading branch information
ouertani committed Jun 28, 2019
1 parent 79970a0 commit bbde8e8
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,17 +239,29 @@ private void addMathFunctions() {
CeilKudf.class));

addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_FLOAT64_SCHEMA,
Schema.OPTIONAL_INT64_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_FLOAT64_SCHEMA),
"FLOOR",
FloorKudf.NAME,
FloorKudf.class));

addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_FLOAT64_SCHEMA,
ImmutableList.of(Schema.FLOAT64_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA),
FloorKudf.NAME,
FloorKudf.class));

addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_INT64_SCHEMA,
Collections.singletonList(Schema.OPTIONAL_FLOAT64_SCHEMA),
"ROUND",
RoundKudf.NAME,
RoundKudf.class));

addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_FLOAT64_SCHEMA,
ImmutableList.of(Schema.FLOAT64_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA),
RoundKudf.NAME,
RoundKudf.class));

addBuiltInFunction(KsqlFunction.createLegacyBuiltIn(
Schema.OPTIONAL_FLOAT64_SCHEMA,
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@
import io.confluent.ksql.function.udf.Kudf;

public class FloorKudf implements Kudf {
public static final String NAME = "FLOOR";

@Override
public Object evaluate(final Object... args) {
if (args.length != 1) {
throw new KsqlFunctionException("Floor udf should have one input argument.");
if (args.length != 1 && args.length != 2) {
throw new KsqlFunctionException("Floor udf should have one or two input arguments.");
}
return Math.floor((Double) args[0]);
final Double number = (Double) args[0];
if (args.length == 1) {
return (long) Math.floor(number);
}
final Double round = Math.pow(10, (Integer) args[1]);
return Math.floor(number * round) / round;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@
import io.confluent.ksql.function.udf.Kudf;

public class RoundKudf implements Kudf {
public static final String NAME = "ROUND";

@Override
public Object evaluate(final Object... args) {
if (args.length != 1) {
throw new KsqlFunctionException("Len udf should have one input argument.");
if (args.length != 1 && args.length != 2) {
throw new KsqlFunctionException("Round udf should have one or two input arguments.");
}
return Math.round((Double) args[0]);

final Double number = (Double) args[0];

if (args.length == 1) {
return Math.round(number);
}
final Double round = Math.pow(10, (Integer) args[1]);
return Math.round(number * round) / round;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"comments": [
"Tests covering the use of the ROUND(DOUBLE), ROUND(DOUBLE,INT), FLOOR(DOUBLE), FLOOR(DOUBLE,INT) function."
],
"tests": [
{
"name": "calculate ROUND function",
"statements": [
"CREATE STREAM test (v DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT ROUND(v) as R0, ROUND(v, 0) as R00, ROUND(v, 1) as R1, ROUND(v, 2) as R2, ROUND(v, 10) as R10 FROM test;"
],
"inputs": [
{"topic": "test_topic", "value": {"v" : 1.2}},
{"topic": "test_topic", "value": {"v" : 1.7}},
{"topic": "test_topic", "value": {"v" : 1.5}},
{"topic": "test_topic", "value": {"v" : 3}},
{"topic": "test_topic", "value": {"v" : 1.234567}},
{"topic": "test_topic", "value": {"v" : 0}},
{"topic": "test_topic", "value": {"v" : null}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"R0" : 1, "R00" : 1.0, "R1" : 1.2, "R2" : 1.2, "R10" : 1.2}},
{"topic": "OUTPUT", "value": {"R0" : 2, "R00" : 2.0, "R1" : 1.7, "R2" : 1.7, "R10" : 1.7}},
{"topic": "OUTPUT", "value": {"R0" : 2, "R00" : 2.0, "R1" : 1.5, "R2" : 1.5, "R10" : 1.5}},
{"topic": "OUTPUT", "value": {"R0" : 3, "R00" : 3.0, "R1" : 3.0, "R2" : 3.0, "R10" : 3.0}},
{"topic": "OUTPUT", "value": {"R0" : 1, "R00" : 1.0, "R1" : 1.2, "R2" : 1.23, "R10" : 1.234567}},
{"topic": "OUTPUT", "value": {"R0" : 0, "R00" : 0.0, "R1" : 0.0, "R2" : 0.0, "R10" : 0.0}},
{"topic": "OUTPUT", "value": {"R0" : null, "R00" : null, "R1" : null, "R2" : null, "R10" : null}}
]
},
{
"name": "calculate FLOOR function",
"statements": [
"CREATE STREAM test (v DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT FLOOR(v) as F0, FLOOR(v, 0) as F00, FLOOR(v, 1) as F1, FLOOR(v, 2) as F2, FLOOR(v, 10) as F10 FROM test;"
],
"inputs": [
{"topic": "test_topic", "value": {"v" : 1.2}},
{"topic": "test_topic", "value": {"v" : 1.7}},
{"topic": "test_topic", "value": {"v" : 1.5}},
{"topic": "test_topic", "value": {"v" : 3}},
{"topic": "test_topic", "value": {"v" : 1.234567}},
{"topic": "test_topic", "value": {"v" : 0}},
{"topic": "test_topic", "value": {"v" : null}}
],
"outputs": [
{"topic": "OUTPUT", "value": {"F0" : 1, "F00" : 1.0, "F1" : 1.2, "F2" : 1.2, "F10" : 1.2}},
{"topic": "OUTPUT", "value": {"F0" : 1, "F00" : 1.0, "F1" : 1.7, "F2" : 1.7, "F10" : 1.7}},
{"topic": "OUTPUT", "value": {"F0" : 1, "F00" : 1.0, "F1" : 1.5, "F2" : 1.5, "F10" : 1.5}},
{"topic": "OUTPUT", "value": {"F0" : 3, "F00" : 3.0, "F1" : 3.0, "F2" : 3.0, "F10" : 3.0}},
{"topic": "OUTPUT", "value": {"F0" : 1, "F00" : 1.0, "F1" : 1.2, "F2" : 1.23, "F10" : 1.234567}},
{"topic": "OUTPUT", "value": {"F0" : 0, "F00" : 0.0, "F1" : 0.0, "F2" : 0.0, "F10" : 0.0}},
{"topic": "OUTPUT", "value": {"F0" : null, "F00" : null, "F1" : null, "F2" : null, "F10" : null}}
]
}
]
}

0 comments on commit bbde8e8

Please sign in to comment.