Skip to content

Commit

Permalink
KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zer…
Browse files Browse the repository at this point in the history
…o scale. (apache#9320)

The `org.apache.kafka.connect.data.Values#parse` method parses integers that are larger than `Long.MAX_VALUE` as `double` values with `Schema.FLOAT64_SCHEMA`.

That means we are losing precision for these larger integers.

For example:
`SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");`
returns:
`SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}`

Also, this method parses values that can be parsed as `FLOAT32` to `FLOAT64`.

This PR changes the parsing logic to use `FLOAT32`/`FLOAT64` only for numbers with a non-zero scale (`decimal.scale() != 0`, typically numbers that have a fraction part), and to use an arbitrary-precision `org.apache.kafka.connect.data.Decimal` otherwise.
Also, it updates the method to parse numbers that can be represented as `float` to `FLOAT32`.

Added unit tests that cover parsing of `BigInteger`, `Byte`, `Short`, `Integer`, `Long`, `Float`, and `Double` values.

Reviewers: Konstantine Karantasis <[email protected]>
  • Loading branch information
avocader authored and javierfreire committed Oct 8, 2020
1 parent b2d9f66 commit 098d468
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,14 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No
} catch (ArithmeticException e) {
// continue
}
float fValue = decimal.floatValue();
if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY
&& decimal.scale() != 0) {
return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue);
}
double dValue = decimal.doubleValue();
if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) {
if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY
&& decimal.scale() != 0) {
return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
}
Schema schema = Decimal.schema(decimal.scale());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.connect.errors.DataException;
import org.junit.Test;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -726,6 +728,132 @@ public void shouldConvertTimestampValues() {
// NOTE(review): this method has an empty body, so it performs no checks.
// Confirm whether that is intentional or whether assertions are missing.
public void canConsume() {
}

/**
 * Integers just outside the {@code long} range, on either side, must be parsed as a
 * {@link Decimal} with scale zero whose unscaled value equals the original integer.
 */
@Test
public void shouldParseBigIntegerAsDecimalWithZeroScale() {
    BigInteger[] outsideLongRange = {
        BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE),
        BigInteger.valueOf(Long.MIN_VALUE).subtract(BigInteger.ONE)
    };
    for (BigInteger expected : outsideLongRange) {
        SchemaAndValue parsed = Values.parseString(expected.toString());
        assertEquals(Decimal.schema(0), parsed.schema());
        Object actual = parsed.value();
        assertTrue(actual instanceof BigDecimal);
        assertEquals(expected, ((BigDecimal) actual).unscaledValue());
    }
}

/**
 * Both extremes of the {@code byte} range must be parsed with the INT8 schema
 * and come back as a {@link Byte} holding the same value.
 */
@Test
public void shouldParseByteAsInt8() {
    byte[] extremes = {Byte.MAX_VALUE, Byte.MIN_VALUE};
    for (byte expected : extremes) {
        SchemaAndValue parsed = Values.parseString(Byte.toString(expected));
        assertEquals(Schema.INT8_SCHEMA, parsed.schema());
        Object actual = parsed.value();
        assertTrue(actual instanceof Byte);
        assertEquals(expected, ((Byte) actual).byteValue());
    }
}

/**
 * Both extremes of the {@code short} range must be parsed with the INT16 schema
 * and come back as a {@link Short} holding the same value.
 */
@Test
public void shouldParseShortAsInt16() {
    short[] extremes = {Short.MAX_VALUE, Short.MIN_VALUE};
    for (short expected : extremes) {
        SchemaAndValue parsed = Values.parseString(Short.toString(expected));
        assertEquals(Schema.INT16_SCHEMA, parsed.schema());
        Object actual = parsed.value();
        assertTrue(actual instanceof Short);
        assertEquals(expected, ((Short) actual).shortValue());
    }
}

/**
 * Both extremes of the {@code int} range must be parsed with the INT32 schema
 * and come back as an {@link Integer} holding the same value.
 */
@Test
public void shouldParseIntegerAsInt32() {
    int[] extremes = {Integer.MAX_VALUE, Integer.MIN_VALUE};
    for (int expected : extremes) {
        SchemaAndValue parsed = Values.parseString(Integer.toString(expected));
        assertEquals(Schema.INT32_SCHEMA, parsed.schema());
        Object actual = parsed.value();
        assertTrue(actual instanceof Integer);
        assertEquals(expected, ((Integer) actual).intValue());
    }
}

/**
 * Both extremes of the {@code long} range must be parsed with the INT64 schema
 * and come back as a {@link Long} holding the same value.
 */
@Test
public void shouldParseLongAsInt64() {
    long[] extremes = {Long.MAX_VALUE, Long.MIN_VALUE};
    for (long expected : extremes) {
        SchemaAndValue parsed = Values.parseString(Long.toString(expected));
        assertEquals(Schema.INT64_SCHEMA, parsed.schema());
        Object actual = parsed.value();
        assertTrue(actual instanceof Long);
        assertEquals(expected, ((Long) actual).longValue());
    }
}

/**
 * The largest-magnitude finite {@code float} values, positive and negative, must be
 * parsed with the FLOAT32 schema and come back as a {@link Float} with the exact same value.
 */
@Test
public void shouldParseFloatAsFloat32() {
    float[] extremes = {Float.MAX_VALUE, -Float.MAX_VALUE};
    for (float expected : extremes) {
        SchemaAndValue parsed = Values.parseString(Float.toString(expected));
        assertEquals(Schema.FLOAT32_SCHEMA, parsed.schema());
        Object actual = parsed.value();
        assertTrue(actual instanceof Float);
        // Zero delta: the parsed value must match exactly.
        assertEquals(expected, ((Float) actual).floatValue(), 0);
    }
}

/**
 * The largest-magnitude finite {@code double} values, positive and negative, must be
 * parsed with the FLOAT64 schema and come back as a {@link Double} with the exact same value.
 */
@Test
public void shouldParseDoubleAsFloat64() {
    double[] extremes = {Double.MAX_VALUE, -Double.MAX_VALUE};
    for (double expected : extremes) {
        SchemaAndValue parsed = Values.parseString(Double.toString(expected));
        assertEquals(Schema.FLOAT64_SCHEMA, parsed.schema());
        Object actual = parsed.value();
        assertTrue(actual instanceof Double);
        // Zero delta: the parsed value must match exactly.
        assertEquals(expected, ((Double) actual).doubleValue(), 0);
    }
}

/**
 * Convenience overload that delegates to the two-argument {@code assertParsed},
 * passing {@code input} as both arguments.
 *
 * @param input the string forwarded as both arguments of the two-argument overload
 */
protected void assertParsed(String input) {
assertParsed(input, input);
}
Expand Down

0 comments on commit 098d468

Please sign in to comment.