Commit
chore: follow-ups for numeric JSON decimal support and documentation (#…
agavra authored Nov 7, 2019
1 parent 04e206f commit a1b5059
Showing 12 changed files with 97 additions and 40 deletions.
23 changes: 23 additions & 0 deletions docs/developer-guide/serialization.rst
@@ -373,6 +373,29 @@ Field Name Case Sensitivity
The format is case-insensitive when matching a KSQL field name with an Avro record's field name.
The first case-insensitive match is used.
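
The "first case-insensitive match" rule can be sketched in plain Java. This is an illustrative sketch only; ``firstMatch`` is a hypothetical helper, not KSQL code:

```java
import java.util.List;
import java.util.Optional;

public class CaseInsensitiveMatch {

  // Returns the first Avro field name that matches the KSQL field name,
  // ignoring case -- a sketch of the matching rule described above.
  public static Optional<String> firstMatch(final String ksqlField, final List<String> avroFields) {
    return avroFields.stream()
        .filter(f -> f.equalsIgnoreCase(ksqlField))
        .findFirst();
  }

  public static void main(final String[] args) {
    // Both candidates match; the first one wins.
    System.out.println(firstMatch("userid", List.of("USERID", "UserId"))); // Optional[USERID]
  }
}
```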

Decimal Serialization
---------------------

KSQL accepts decimals that are serialized either as JSON numbers or as the text
representation of the base-10 equivalent. For example, KSQL can read data in both of the
formats below:

.. code:: json

    {
        "value": 1.12345678912345
    }

.. code:: json

    {
        "value": "1.12345678912345"
    }
Decimals with specified precision and scale are serialized as JSON floating-point
numbers. For example:

.. code:: json

    {
        "value": 1.12345678912345
    }

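
The coercion between the two accepted input forms can be sketched in plain Java. This is an illustrative sketch, not KSQL's actual deserializer; the ``toDecimal`` helper is hypothetical:

```java
import java.math.BigDecimal;

public class DecimalCoercionSketch {

  // Coerce a decoded JSON value -- either a Number or the base-10 text form --
  // into a BigDecimal, mirroring the two accepted serialization formats above.
  public static BigDecimal toDecimal(final Object jsonValue) {
    if (jsonValue instanceof String) {
      return new BigDecimal((String) jsonValue);   // e.g. "1.12345678912345"
    }
    if (jsonValue instanceof Number) {
      return new BigDecimal(jsonValue.toString()); // e.g. 1.12345678912345
    }
    throw new IllegalArgumentException("cannot coerce to DECIMAL: " + jsonValue);
  }

  public static void main(final String[] args) {
    System.out.println(toDecimal("1.25")); // 1.25
    System.out.println(toDecimal(1.25));   // 1.25
  }
}
```
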
.. _kafka_format:

-----
16 changes: 16 additions & 0 deletions docs/developer-guide/syntax-reference.rst
@@ -306,6 +306,22 @@ defines a struct with three fields, with the supplied name and type.
Access the fields of a struct by using the ``->`` operator. For example, ``SOME_STRUCT->ID``
retrieves the value of the struct's ``ID`` field. For more information, see :ref:`operators`.

Decimal
-------

``DECIMAL(Precision, Scale)``

KSQL supports fields that are numeric data types with fixed precision and scale:

- **Precision** is the maximum total number of decimal digits to be stored, including digits to
  the left and right of the decimal point. The precision must be greater than 1; there is no
  default precision.
- **Scale** is the number of decimal digits to the right of the decimal point. This number must
  be greater than 0 and less than or equal to the value for precision.

Mathematical operations between ``DOUBLE`` and ``DECIMAL`` automatically convert the decimal
to a double value. Converting from the decimal data type to any floating-point type
(``DOUBLE``) may cause a loss of precision.
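
Because KSQL's ``DECIMAL`` maps to Java's ``BigDecimal``, the precision and scale rules above, and the precision-loss warning, can be checked directly with the standard library. A small illustrative snippet:

```java
import java.math.BigDecimal;

public class PrecisionScaleDemo {
  public static void main(final String[] args) {
    // A DECIMAL(5, 2) value such as 123.45: precision 5 (total digits),
    // scale 2 (digits to the right of the decimal point).
    final BigDecimal value = new BigDecimal("123.45");
    System.out.println(value.precision()); // 5
    System.out.println(value.scale());     // 2

    // Converting to a floating-point type can lose precision, as noted above.
    final BigDecimal exact = new BigDecimal("0.12345678901234567890123456789");
    final BigDecimal roundTripped = new BigDecimal(Double.toString(exact.doubleValue()));
    System.out.println(exact.equals(roundTripped)); // false
  }
}
```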

===============
KSQL statements
13 changes: 10 additions & 3 deletions docs/developer-guide/udf.rst
@@ -115,8 +115,9 @@ Dynamic return type
~~~~~~~~~~~~~~~~~~~

UDFs support dynamic return types that are resolved at runtime. This is useful if you want to
implement a UDF with a non-deterministic return type such as ``DECIMAL`` or ``STRUCT``. For
example, a UDF that returns ``BigDecimal`` (which maps to the SQL ``DECIMAL`` type) may vary
the precision and scale of the output based on the input schema.

To use this functionality, you need to specify a method with signature
``public SqlType <your-method-name>(final List<SqlType> params)`` and annotate it with ``@SchemaProvider``.
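
To see why a single fixed return schema will not work for ``BigDecimal``, note that the precision and scale of a result depend on the operands. A small standard-library illustration (not a complete UDF):

```java
import java.math.BigDecimal;

public class DecimalWidthDemo {
  public static void main(final String[] args) {
    final BigDecimal a = new BigDecimal("12.345"); // precision 5, scale 3
    final BigDecimal b = new BigDecimal("1000.5"); // precision 5, scale 1

    // The result's shape depends on the inputs, which is why a UDF returning
    // BigDecimal must compute its DECIMAL schema at runtime via a schema provider.
    System.out.println(a.add(b));              // 1012.845 (scale 3)
    System.out.println(a.multiply(b).scale()); // 4 (scales add under multiplication)
  }
}
```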
@@ -879,9 +880,15 @@ The types supported by UDFs/UDAFs/UDTFs are currently limited to:
+--------------+------------------+
| Map          | MAP              |
+--------------+------------------+
| Struct       | STRUCT\ :sup:`1` |
+--------------+------------------+
| BigDecimal   | DECIMAL\ :sup:`1`|
+--------------+------------------+

1. Using ``Struct`` or ``BigDecimal`` in UDFs requires specifying the schema using ``paramSchema``,
``returnSchema``, ``aggregateSchema``, or a schema provider.


.. _deploying-udf:

=========
@@ -17,7 +17,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Primitives;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -54,14 +54,22 @@ public static boolean matches(final Object actualValue, final JsonNode expectedV
private static boolean compareStruct(final Object actualValue, final JsonNode expectedValue) {
final ObjectNode expected = (ObjectNode) expectedValue;
final Function<String, Object> getter;
final int numFields;

if (actualValue instanceof Struct) {
getter = ((Struct) actualValue)::get;
numFields = ((Struct) actualValue).schema().fields().size();
} else if (actualValue instanceof Map) {
getter = ((Map<?, ?>) actualValue)::get;
numFields = ((Map<?, ?>) actualValue).size();
} else {
return false;
}

if (numFields != expected.size()) {
return false;
}

final Iterator<Entry<String, JsonNode>> fields = expected.fields();
while (fields.hasNext()) {
final Entry<String, JsonNode> field = fields.next();
@@ -74,22 +82,27 @@ private static boolean compareStruct(final Object actualValue, final JsonNode ex

private static boolean compareArray(final Object actualValue, final JsonNode expectedValue) {
final ArrayNode expected = (ArrayNode) expectedValue;
if (!(actualValue instanceof List)) {
return false;
}

final List<?> actual = (List<?>) actualValue;
if (actual.size() != expected.size()) {
return false;
}

final Iterator<JsonNode> elements = expected.elements();

int i = 0;
while (elements.hasNext()) {
final JsonNode el = elements.next();
if (!comparator(el).test(actual.get(i), el)) {
return false;
}
i++;
}

return true;
}

private static boolean compareNumber(final Object actualValue, final JsonNode expectedValue) {
@@ -110,7 +123,6 @@ private static boolean compareNumber(final Object actualValue, final JsonNode ex
return false;
}

expected.isBigDecimal();
try {
return expected.decimalValue()
.setScale(((BigDecimal) actualValue).scale(), RoundingMode.UNNECESSARY)
@@ -37,7 +37,7 @@ public class Record {
private final Object value;
private final Optional<Long> timestamp;
private final WindowData window;
private Optional<JsonNode> jsonValue;

public Record(
final Topic topic,
@@ -61,7 +61,7 @@ public Record(
this.topic = topic;
this.key = key;
this.value = value;
this.jsonValue = Optional.ofNullable(jsonValue);
this.timestamp = Objects.requireNonNull(timestamp, "timestamp");
this.window = window;
}
@@ -120,7 +120,7 @@ public Topic topic() {
return topic;
}

public Optional<JsonNode> getJsonValue() {
return jsonValue;
}
}
@@ -47,6 +47,7 @@
import io.confluent.ksql.test.tools.stubs.StubKafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
@@ -485,7 +486,9 @@ private static void validateCreatedMessage(
final long actualTimestamp = actualProducerRecord.timestamp();

final Object expectedKey = expectedRecord.key();
final JsonNode expectedValue = expectedRecord.getJsonValue()
.orElseThrow(() -> new KsqlServerException(
"could not get expected value from test record: " + expectedRecord));
final long expectedTimestamp = expectedRecord.timestamp().orElse(actualTimestamp);

final AssertionError error = new AssertionError(
@@ -96,16 +96,14 @@ private static List<Record> generateInputRecords(
return list;
}

private static List<Record> getOutputRecords(final Topic topic, final List<Record> inputRecords) {
return inputRecords.stream()
.map(
r -> new Record(
topic,
"test-key",
r.value(),
r.getJsonValue().orElse(null),
Optional.of(0L),
null
))
@@ -171,7 +169,7 @@ Stream<TestCase> buildTests(final Path testPath) {
);

final List<Record> inputRecords = generateInputRecords(srcTopic, schema);
final List<Record> outputRecords = getOutputRecords(OUTPUT_TOPIC, inputRecords);

final String csasStatement = schema.getFields()
.stream()
@@ -39,6 +39,7 @@
import io.confluent.ksql.test.tools.Topic;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.RetryUtil;
import java.io.Closeable;
import java.net.URL;
@@ -327,7 +328,9 @@ private static void compareKeyValueTimestamp(
final Object actualValue = actual.value();

final Object expectedKey = expected.key();
final JsonNode expectedValue = expected.getJsonValue()
.orElseThrow(() -> new KsqlServerException(
"could not get expected value from test record: " + expected));
final long expectedTimestamp = expected.timestamp().orElse(actualTimestamp);

final AssertionError error = new AssertionError(
@@ -50,8 +50,8 @@
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [0, 1, 2, 3]}},
{"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [-2, -1, 0, 1]}},
{"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [4, 3]}},
{"topic": "OUTPUT", "key": 1, "value": {"KSQL_COL_0": [4, 3, 2, 1, 0]}}
]
},
{
5 changes: 0 additions & 5 deletions ksql-serde/pom.xml
@@ -39,11 +39,6 @@
<artifactId>commons-csv</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
@@ -42,7 +42,6 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.connect.data.Field;
@@ -225,9 +224,10 @@ private static Struct enforceFieldTypesForStruct(final JsonValueContext context)
// during parsing. any ksql fields that are case insensitive, therefore, will be matched
// in this case insensitive field map without modification but the quoted fields will not
// (unless they were all uppercase to start off with, which is expected to match)
JsonNode fieldValue = jsonFields.get(ksqlField.name());
if (fieldValue == null) {
fieldValue = upperCasedFields.get(ksqlField.name());
}

final Object coerced = enforceFieldType(
context.deserializer,
