diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index d4085f8b28335..7ea9eb4a0da25 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -943,8 +943,14 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No } catch (ArithmeticException e) { // continue } + float fValue = decimal.floatValue(); + if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY + && decimal.scale() != 0) { + return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue); + } double dValue = decimal.doubleValue(); - if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) { + if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY + && decimal.scale() != 0) { return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue); } Schema schema = Decimal.schema(decimal.scale()); diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index c437e46c25956..63c05eaa9b1e3 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -21,6 +21,8 @@ import org.apache.kafka.connect.errors.DataException; import org.junit.Test; +import java.math.BigDecimal; +import java.math.BigInteger; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -726,6 +728,132 @@ public void shouldConvertTimestampValues() { public void canConsume() { } + @Test + public void shouldParseBigIntegerAsDecimalWithZeroScale() { + BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1")); + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Decimal.schema(0), schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof BigDecimal); + assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue()); + value = BigInteger.valueOf(Long.MIN_VALUE).subtract(new BigInteger("1")); + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Decimal.schema(0), schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof BigDecimal); + assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue()); + } + + @Test + public void shouldParseByteAsInt8() { + Byte value = Byte.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Byte); + assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue()); + value = Byte.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Byte); + assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue()); + } + + @Test + public void shouldParseShortAsInt16() { + Short value = Short.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Short); + assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue()); + value = Short.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Short); + assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue()); + } + + @Test + public void shouldParseIntegerAsInt32() { + Integer value = Integer.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Integer); + assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue()); + value = Integer.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Integer); + assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue()); + } + + @Test + public void shouldParseLongAsInt64() { + Long value = Long.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Long); + assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue()); + value = Long.MIN_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Long); + assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue()); + } + + @Test + public void shouldParseFloatAsFloat32() { + Float value = Float.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Float); + assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0); + value = -Float.MAX_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Float); + assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0); + } + + @Test + public void shouldParseDoubleAsFloat64() { + Double value = Double.MAX_VALUE; + SchemaAndValue schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Double); + assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0); + value = -Double.MAX_VALUE; + schemaAndValue = Values.parseString( + String.valueOf(value) + ); + assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema()); + assertTrue(schemaAndValue.value() instanceof Double); + assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(), 0); + } + protected void assertParsed(String input) { assertParsed(input, input); }