From 098d468640edabab91976d1fbe5315ae9ac1b073 Mon Sep 17 00:00:00 2001 From: Alex Diachenko Date: Mon, 5 Oct 2020 17:24:44 -0700 Subject: [PATCH] KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (#9320) The `org.apache.kafka.connect.data.Values#parse` method parses integers, which are larger than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`. That means we are losing precision for these larger integers. For example: `SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");` returns: `SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}` Also, this method parses values that can be parsed as `FLOAT32` to `FLOAT64`. This PR changes parsing logic, to use `FLOAT32`/`FLOAT64` for numbers that don't have fraction part(`decimal.scale()!=0`) only, and use an arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise. Also, it updates the method to parse numbers, that can be represented as `float` to `FLOAT64`. Added unit tests, that cover parsing `BigInteger`, `Byte`, `Short`, `Integer`, `Long`, `Float`, `Double` types. Reviewers: Konstantine Karantasis --- .../org/apache/kafka/connect/data/Values.java | 8 +- .../apache/kafka/connect/data/ValuesTest.java | 128 ++++++++++++++++++ 2 files changed, 135 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index d4085f8b28335..7ea9eb4a0da25 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -943,8 +943,14 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No } catch (ArithmeticException e) { // continue } + float fValue = decimal.floatValue(); + if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY + && decimal.scale() != 0) { + return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue); + } double dValue = decimal.doubleValue(); - if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) { + if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY + && decimal.scale() != 0) { return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue); } Schema schema = Decimal.schema(decimal.scale()); diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index c437e46c25956..63c05eaa9b1e3 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -21,6 +21,8 @@ import org.apache.kafka.connect.errors.DataException; import org.junit.Test; +import java.math.BigDecimal; +import java.math.BigInteger; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -726,6 +728,132 @@ public void shouldConvertTimestampValues() { public void canConsume() { } + @Test + public void shouldParseBigIntegerAsDecimalWithZeroScale() { + BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1")); + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Decimal.schema(0), schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof BigDecimal); + assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue()); + value = BigInteger.valueOf(Long.MIN_VALUE).subtract(new BigInteger("1")); + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Decimal.schema(0), schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof BigDecimal); + assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue()); + } + + @Test + public void shouldParseByteAsInt8() { + Byte value = Byte.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Byte); + assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue()); + value = Byte.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Byte); + assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue()); + } + + @Test + public void shouldParseShortAsInt16() { + Short value = Short.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Short); + assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue()); + value = Short.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Short); + assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue()); + } + + @Test + public void shouldParseIntegerAsInt32() { + Integer value = Integer.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Integer); + assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue()); + value = Integer.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Integer); + assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue()); + } + + @Test + public void shouldParseLongAsInt64() { + Long value = Long.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Long); + assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue()); + value = Long.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Long); + assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue()); + } + + @Test + public void shouldParseFloatAsFloat32() { + Float value = Float.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Float); + assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0); + value = -Float.MAX_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Float); + assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0); + } + + @Test + public void shouldParseDoubleAsFloat64() { + Double value = Double.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Double); + assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0); + value = -Double.MAX_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Double); + assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0); + } + protected void assertParsed(String input) { assertParsed(input, input); }