From cd0f11305ad8f7aeb8c25faf9d781445f6fdc54f Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Thu, 23 May 2024 13:23:18 -0700 Subject: [PATCH] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions (#15469) Signed-off-by: Greg Harris Reviewers: Mickael Maison --- checkstyle/suppressions.xml | 8 +- .../org/apache/kafka/connect/data/Values.java | 1161 ++++++++++------- .../apache/kafka/connect/data/ValuesTest.java | 248 +++- gradle/spotbugs-exclude.xml | 7 + .../kafka/jmh/connect/ValuesBenchmark.java | 297 +++++ 5 files changed, 1195 insertions(+), 526 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ValuesBenchmark.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index a8bc6351db3fa..a317fa558c10f 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -99,7 +99,7 @@ files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/> + files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/> @@ -143,7 +143,7 @@ + files="(DistributedHerder|DistributedConfig|KafkaConfigBackingStore|IncrementalCooperativeAssignor).java"/> + files="(JsonConverter|ConnectHeaders).java"/> + files="(KafkaConfigBackingStore|ConnectMetricsRegistry).java"/> diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index 7b78c64af0ca7..e144b7c69b6c7 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ 
b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -22,25 +22,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; import java.nio.ByteBuffer; import java.text.CharacterIterator; import java.text.DateFormat; -import java.text.ParseException; +import java.text.ParsePosition; import java.text.SimpleDateFormat; import java.text.StringCharacterIterator; import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; import java.util.Calendar; -import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.TimeZone; import java.util.regex.Pattern; @@ -61,42 +60,16 @@ */ public class Values { - private static final Logger LOG = LoggerFactory.getLogger(Values.class); - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null); - private static final SchemaAndValue TRUE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.TRUE); - private static final SchemaAndValue FALSE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.FALSE); private static final Schema ARRAY_SELECTOR_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build(); private static final Schema MAP_SELECTOR_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build(); private static final Schema STRUCT_SELECTOR_SCHEMA = SchemaBuilder.struct().build(); - private static final String TRUE_LITERAL = Boolean.TRUE.toString(); - private static final String FALSE_LITERAL = Boolean.FALSE.toString(); private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; private static final String NULL_VALUE = "null"; static final String 
ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd"; static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'"; static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN; - private static final Set TEMPORAL_LOGICAL_TYPE_NAMES = - Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList(Time.LOGICAL_NAME, - Timestamp.LOGICAL_NAME, - Date.LOGICAL_NAME - ) - ) - ); - - private static final String QUOTE_DELIMITER = "\""; - private static final String COMMA_DELIMITER = ","; - private static final String ENTRY_DELIMITER = ":"; - private static final String ARRAY_BEGIN_DELIMITER = "["; - private static final String ARRAY_END_DELIMITER = "]"; - private static final String MAP_BEGIN_DELIMITER = "{"; - private static final String MAP_END_DELIMITER = "}"; - private static final int ISO_8601_DATE_LENGTH = ISO_8601_DATE_FORMAT_PATTERN.length(); - private static final int ISO_8601_TIME_LENGTH = ISO_8601_TIME_FORMAT_PATTERN.length() - 2; // subtract single quotes - private static final int ISO_8601_TIMESTAMP_LENGTH = ISO_8601_TIMESTAMP_FORMAT_PATTERN.length() - 4; // subtract single quotes private static final Pattern TWO_BACKSLASHES = Pattern.compile("\\\\"); @@ -112,7 +85,17 @@ public class Values { * @throws DataException if the value could not be converted to a boolean */ public static Boolean convertToBoolean(Schema schema, Object value) throws DataException { - return (Boolean) convertTo(Schema.OPTIONAL_BOOLEAN_SCHEMA, schema, value); + if (value == null) { + return null; + } else if (value instanceof Boolean) { + return (Boolean) value; + } else if (value instanceof String) { + SchemaAndValue parsed = parseString(value.toString()); + if (parsed.value() instanceof Boolean) { + return (Boolean) parsed.value(); + } + } + return asLong(value, schema, null) == 0L ? 
Boolean.FALSE : Boolean.TRUE; } /** @@ -125,7 +108,12 @@ public static Boolean convertToBoolean(Schema schema, Object value) throws DataE * @throws DataException if the value could not be converted to a byte */ public static Byte convertToByte(Schema schema, Object value) throws DataException { - return (Byte) convertTo(Schema.OPTIONAL_INT8_SCHEMA, schema, value); + if (value == null) { + return null; + } else if (value instanceof Byte) { + return (Byte) value; + } + return (byte) asLong(value, schema, null); } /** @@ -138,7 +126,12 @@ public static Byte convertToByte(Schema schema, Object value) throws DataExcepti * @throws DataException if the value could not be converted to a short */ public static Short convertToShort(Schema schema, Object value) throws DataException { - return (Short) convertTo(Schema.OPTIONAL_INT16_SCHEMA, schema, value); + if (value == null) { + return null; + } else if (value instanceof Short) { + return (Short) value; + } + return (short) asLong(value, schema, null); } /** @@ -151,7 +144,12 @@ public static Short convertToShort(Schema schema, Object value) throws DataExcep * @throws DataException if the value could not be converted to an integer */ public static Integer convertToInteger(Schema schema, Object value) throws DataException { - return (Integer) convertTo(Schema.OPTIONAL_INT32_SCHEMA, schema, value); + if (value == null) { + return null; + } else if (value instanceof Integer) { + return (Integer) value; + } + return (int) asLong(value, schema, null); } /** @@ -164,7 +162,12 @@ public static Integer convertToInteger(Schema schema, Object value) throws DataE * @throws DataException if the value could not be converted to a long */ public static Long convertToLong(Schema schema, Object value) throws DataException { - return (Long) convertTo(Schema.OPTIONAL_INT64_SCHEMA, schema, value); + if (value == null) { + return null; + } else if (value instanceof Long) { + return (Long) value; + } + return asLong(value, schema, null); } /** @@ 
-177,7 +180,12 @@ public static Long convertToLong(Schema schema, Object value) throws DataExcepti * @throws DataException if the value could not be converted to a float */ public static Float convertToFloat(Schema schema, Object value) throws DataException { - return (Float) convertTo(Schema.OPTIONAL_FLOAT32_SCHEMA, schema, value); + if (value == null) { + return null; + } else if (value instanceof Float) { + return (Float) value; + } + return (float) asDouble(value, schema, null); } /** @@ -190,7 +198,12 @@ public static Float convertToFloat(Schema schema, Object value) throws DataExcep * @throws DataException if the value could not be converted to a double */ public static Double convertToDouble(Schema schema, Object value) throws DataException { - return (Double) convertTo(Schema.OPTIONAL_FLOAT64_SCHEMA, schema, value); + if (value == null) { + return null; + } else if (value instanceof Double) { + return (Double) value; + } + return asDouble(value, schema, null); } /** @@ -202,7 +215,12 @@ public static Double convertToDouble(Schema schema, Object value) throws DataExc * @return the representation as a string, or null if the supplied value was null */ public static String convertToString(Schema schema, Object value) { - return (String) convertTo(Schema.OPTIONAL_STRING_SCHEMA, schema, value); + if (value == null) { + return null; + } + StringBuilder sb = new StringBuilder(); + append(sb, value, false); + return sb.toString(); } /** @@ -219,7 +237,7 @@ public static String convertToString(Schema schema, Object value) { * @throws DataException if the value cannot be converted to a list value */ public static List convertToList(Schema schema, Object value) { - return (List) convertTo(ARRAY_SELECTOR_SCHEMA, schema, value); + return convertToArray(ARRAY_SELECTOR_SCHEMA, value); } /** @@ -235,7 +253,7 @@ public static List convertToList(Schema schema, Object value) { * @throws DataException if the value cannot be converted to a map value */ public static Map 
convertToMap(Schema schema, Object value) { - return (Map) convertTo(MAP_SELECTOR_SCHEMA, schema, value); + return convertToMapInternal(MAP_SELECTOR_SCHEMA, value); } /** @@ -250,7 +268,7 @@ public static List convertToList(Schema schema, Object value) { * @throws DataException if the value is not a struct */ public static Struct convertToStruct(Schema schema, Object value) { - return (Struct) convertTo(STRUCT_SELECTOR_SCHEMA, schema, value); + return convertToStructInternal(STRUCT_SELECTOR_SCHEMA, value); } /** @@ -263,7 +281,10 @@ public static Struct convertToStruct(Schema schema, Object value) { * @throws DataException if the value cannot be converted to a time value */ public static java.util.Date convertToTime(Schema schema, Object value) { - return (java.util.Date) convertTo(Time.SCHEMA, schema, value); + if (value == null) { + throw new DataException("Unable to convert a null value to a schema that requires a value"); + } + return convertToTime(Time.SCHEMA, schema, value); } /** @@ -276,7 +297,10 @@ public static java.util.Date convertToTime(Schema schema, Object value) { * @throws DataException if the value cannot be converted to a date value */ public static java.util.Date convertToDate(Schema schema, Object value) { - return (java.util.Date) convertTo(Date.SCHEMA, schema, value); + if (value == null) { + throw new DataException("Unable to convert a null value to a schema that requires a value"); + } + return convertToDate(Date.SCHEMA, schema, value); } /** @@ -289,7 +313,10 @@ public static java.util.Date convertToDate(Schema schema, Object value) { * @throws DataException if the value cannot be converted to a timestamp value */ public static java.util.Date convertToTimestamp(Schema schema, Object value) { - return (java.util.Date) convertTo(Timestamp.SCHEMA, schema, value); + if (value == null) { + throw new DataException("Unable to convert a null value to a schema that requires a value"); + } + return convertToTimestamp(Timestamp.SCHEMA, schema, 
value); } /** @@ -302,7 +329,10 @@ public static java.util.Date convertToTimestamp(Schema schema, Object value) { * @throws DataException if the value cannot be converted to a decimal value */ public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) { - return (BigDecimal) convertTo(Decimal.schema(scale), schema, value); + if (value == null) { + throw new DataException("Unable to convert a null value to a schema that requires a value"); + } + return convertToDecimal(Decimal.schema(scale), value); } /** @@ -314,65 +344,59 @@ public static BigDecimal convertToDecimal(Schema schema, Object value, int scale public static Schema inferSchema(Object value) { if (value instanceof String) { return Schema.STRING_SCHEMA; - } - if (value instanceof Boolean) { + } else if (value instanceof Boolean) { return Schema.BOOLEAN_SCHEMA; - } - if (value instanceof Byte) { + } else if (value instanceof Byte) { return Schema.INT8_SCHEMA; - } - if (value instanceof Short) { + } else if (value instanceof Short) { return Schema.INT16_SCHEMA; - } - if (value instanceof Integer) { + } else if (value instanceof Integer) { return Schema.INT32_SCHEMA; - } - if (value instanceof Long) { + } else if (value instanceof Long) { return Schema.INT64_SCHEMA; - } - if (value instanceof Float) { + } else if (value instanceof Float) { return Schema.FLOAT32_SCHEMA; - } - if (value instanceof Double) { + } else if (value instanceof Double) { return Schema.FLOAT64_SCHEMA; - } - if (value instanceof byte[] || value instanceof ByteBuffer) { + } else if (value instanceof byte[] || value instanceof ByteBuffer) { return Schema.BYTES_SCHEMA; + } else if (value instanceof List) { + return inferListSchema((List) value); + } else if (value instanceof Map) { + return inferMapSchema((Map) value); + } else if (value instanceof Struct) { + return ((Struct) value).schema(); } - if (value instanceof List) { - List list = (List) value; - if (list.isEmpty()) { + return null; + } + + private static 
Schema inferListSchema(List list) { + if (list.isEmpty()) { + return null; + } + SchemaDetector detector = new SchemaDetector(); + for (Object element : list) { + if (!detector.canDetect(element)) { return null; } - SchemaDetector detector = new SchemaDetector(); - for (Object element : list) { - if (!detector.canDetect(element)) { - return null; - } - } - return SchemaBuilder.array(detector.schema()).build(); } - if (value instanceof Map) { - Map map = (Map) value; - if (map.isEmpty()) { + return SchemaBuilder.array(detector.schema()).build(); + } + + private static Schema inferMapSchema(Map map) { + if (map.isEmpty()) { + return null; + } + SchemaDetector keyDetector = new SchemaDetector(); + SchemaDetector valueDetector = new SchemaDetector(); + for (Map.Entry entry : map.entrySet()) { + if (!keyDetector.canDetect(entry.getKey()) || !valueDetector.canDetect(entry.getValue())) { return null; } - SchemaDetector keyDetector = new SchemaDetector(); - SchemaDetector valueDetector = new SchemaDetector(); - for (Map.Entry entry : map.entrySet()) { - if (!keyDetector.canDetect(entry.getKey()) || !valueDetector.canDetect(entry.getValue())) { - return null; - } - } - return SchemaBuilder.map(keyDetector.schema(), valueDetector.schema()).build(); } - if (value instanceof Struct) { - return ((Struct) value).schema(); - } - return null; + return SchemaBuilder.map(keyDetector.schema(), valueDetector.schema()).build(); } - /** * Parse the specified string representation of a value into its schema and value. 
* @@ -387,8 +411,8 @@ public static SchemaAndValue parseString(String value) { if (value.isEmpty()) { return new SchemaAndValue(Schema.STRING_SCHEMA, value); } - Parser parser = new Parser(value); - return parse(parser, false); + ValueParser parser = new ValueParser(new Parser(value)); + return parser.parse(false); } /** @@ -396,7 +420,7 @@ public static SchemaAndValue parseString(String value) { * * @param toSchema the schema for the desired type; may not be null * @param fromSchema the schema for the supplied value; may be null if not known - * @return the converted value; never null + * @return the converted value; null if the passed-in schema was optional, and the input value was null. * @throws DataException if the value could not be converted to the desired type */ protected static Object convertTo(Schema toSchema, Schema fromSchema, Object value) throws DataException { @@ -408,184 +432,213 @@ protected static Object convertTo(Schema toSchema, Schema fromSchema, Object val } switch (toSchema.type()) { case BYTES: - if (Decimal.LOGICAL_NAME.equals(toSchema.name())) { - if (value instanceof ByteBuffer) { - value = Utils.toArray((ByteBuffer) value); - } - if (value instanceof byte[]) { - return Decimal.toLogical(toSchema, (byte[]) value); - } - if (value instanceof BigDecimal) { - return value; - } - if (value instanceof Number) { - // Not already a decimal, so treat it as a double ... 
- double converted = ((Number) value).doubleValue(); - return BigDecimal.valueOf(converted); - } - if (value instanceof String) { - return new BigDecimal(value.toString()); - } - } - if (value instanceof ByteBuffer) { - return Utils.toArray((ByteBuffer) value); - } - if (value instanceof byte[]) { - return value; - } - if (value instanceof BigDecimal) { - return Decimal.fromLogical(toSchema, (BigDecimal) value); - } - break; + return convertMaybeLogicalBytes(toSchema, value); case STRING: - StringBuilder sb = new StringBuilder(); - append(sb, value, false); - return sb.toString(); + return convertToString(fromSchema, value); case BOOLEAN: - if (value instanceof Boolean) { - return value; - } - if (value instanceof String) { - SchemaAndValue parsed = parseString(value.toString()); - if (parsed.value() instanceof Boolean) { - return parsed.value(); - } - } - return asLong(value, fromSchema, null) == 0L ? Boolean.FALSE : Boolean.TRUE; + return convertToBoolean(fromSchema, value); case INT8: - if (value instanceof Byte) { - return value; - } - return (byte) asLong(value, fromSchema, null); + return convertToByte(fromSchema, value); case INT16: - if (value instanceof Short) { - return value; - } - return (short) asLong(value, fromSchema, null); + return convertToShort(fromSchema, value); case INT32: - if (Date.LOGICAL_NAME.equals(toSchema.name())) { - if (value instanceof String) { - SchemaAndValue parsed = parseString(value.toString()); - value = parsed.value(); - } - if (value instanceof java.util.Date) { - if (fromSchema != null) { - String fromSchemaName = fromSchema.name(); - if (Date.LOGICAL_NAME.equals(fromSchemaName)) { - return value; - } - if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) { - // Just get the number of days from this timestamp - long millis = ((java.util.Date) value).getTime(); - int days = (int) (millis / MILLIS_PER_DAY); // truncates - return Date.toLogical(toSchema, days); - } - } else { - // There is no fromSchema, so no conversion is 
needed - return value; - } - } - long numeric = asLong(value, fromSchema, null); - return Date.toLogical(toSchema, (int) numeric); - } - if (Time.LOGICAL_NAME.equals(toSchema.name())) { - if (value instanceof String) { - SchemaAndValue parsed = parseString(value.toString()); - value = parsed.value(); - } - if (value instanceof java.util.Date) { - if (fromSchema != null) { - String fromSchemaName = fromSchema.name(); - if (Time.LOGICAL_NAME.equals(fromSchemaName)) { - return value; - } - if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) { - // Just get the time portion of this timestamp - Calendar calendar = Calendar.getInstance(UTC); - calendar.setTime((java.util.Date) value); - calendar.set(Calendar.YEAR, 1970); - calendar.set(Calendar.MONTH, 0); // Months are zero-based - calendar.set(Calendar.DAY_OF_MONTH, 1); - return Time.toLogical(toSchema, (int) calendar.getTimeInMillis()); - } - } else { - // There is no fromSchema, so no conversion is needed - return value; - } - } - long numeric = asLong(value, fromSchema, null); - return Time.toLogical(toSchema, (int) numeric); - } - if (value instanceof Integer) { - return value; - } - return (int) asLong(value, fromSchema, null); + return convertMaybeLogicalInteger(toSchema, fromSchema, value); case INT64: - if (Timestamp.LOGICAL_NAME.equals(toSchema.name())) { - if (value instanceof String) { - SchemaAndValue parsed = parseString(value.toString()); - value = parsed.value(); - } - if (value instanceof java.util.Date) { - java.util.Date date = (java.util.Date) value; - if (fromSchema != null) { - String fromSchemaName = fromSchema.name(); - if (Date.LOGICAL_NAME.equals(fromSchemaName)) { - int days = Date.fromLogical(fromSchema, date); - long millis = days * MILLIS_PER_DAY; - return Timestamp.toLogical(toSchema, millis); - } - if (Time.LOGICAL_NAME.equals(fromSchemaName)) { - long millis = Time.fromLogical(fromSchema, date); - return Timestamp.toLogical(toSchema, millis); - } - if 
(Timestamp.LOGICAL_NAME.equals(fromSchemaName)) { - return value; - } - } else { - // There is no fromSchema, so no conversion is needed - return value; - } - } - long numeric = asLong(value, fromSchema, null); - return Timestamp.toLogical(toSchema, numeric); - } - if (value instanceof Long) { - return value; - } - return asLong(value, fromSchema, null); + return convertMaybeLogicalLong(toSchema, fromSchema, value); case FLOAT32: - if (value instanceof Float) { - return value; - } - return (float) asDouble(value, fromSchema, null); + return convertToFloat(fromSchema, value); case FLOAT64: - if (value instanceof Double) { - return value; - } - return asDouble(value, fromSchema, null); + return convertToDouble(fromSchema, value); case ARRAY: - if (value instanceof String) { - SchemaAndValue schemaAndValue = parseString(value.toString()); - value = schemaAndValue.value(); + return convertToArray(toSchema, value); + case MAP: + return convertToMapInternal(toSchema, value); + case STRUCT: + return convertToStructInternal(toSchema, value); + } + throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema); + } + + private static Serializable convertMaybeLogicalBytes(Schema toSchema, Object value) { + if (Decimal.LOGICAL_NAME.equals(toSchema.name())) { + return convertToDecimal(toSchema, value); + } + return convertToBytes(toSchema, value); + } + + private static BigDecimal convertToDecimal(Schema toSchema, Object value) { + if (value instanceof ByteBuffer) { + value = Utils.toArray((ByteBuffer) value); + } + if (value instanceof byte[]) { + return Decimal.toLogical(toSchema, (byte[]) value); + } + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + if (value instanceof Number) { + // Not already a decimal, so treat it as a double ... 
+ double converted = ((Number) value).doubleValue(); + return BigDecimal.valueOf(converted); + } + if (value instanceof String) { + return new BigDecimal(value.toString()); + } + throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema); + } + + private static byte[] convertToBytes(Schema toSchema, Object value) { + if (value instanceof ByteBuffer) { + return Utils.toArray((ByteBuffer) value); + } + if (value instanceof byte[]) { + return (byte[]) value; + } + if (value instanceof BigDecimal) { + return Decimal.fromLogical(toSchema, (BigDecimal) value); + } + throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema); + } + + private static Serializable convertMaybeLogicalInteger(Schema toSchema, Schema fromSchema, Object value) { + if (Date.LOGICAL_NAME.equals(toSchema.name())) { + return convertToDate(toSchema, fromSchema, value); + } + if (Time.LOGICAL_NAME.equals(toSchema.name())) { + return convertToTime(toSchema, fromSchema, value); + } + return convertToInteger(fromSchema, value); + } + + private static java.util.Date convertToDate(Schema toSchema, Schema fromSchema, Object value) { + if (value == null) { + return null; + } else if (value instanceof String) { + SchemaAndValue parsed = parseString(value.toString()); + value = parsed.value(); + } + if (value instanceof java.util.Date) { + if (fromSchema != null) { + String fromSchemaName = fromSchema.name(); + if (Date.LOGICAL_NAME.equals(fromSchemaName)) { + return (java.util.Date) value; } - if (value instanceof List) { - return value; + if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) { + // Just get the number of days from this timestamp + long millis = ((java.util.Date) value).getTime(); + int days = (int) (millis / MILLIS_PER_DAY); // truncates + return Date.toLogical(toSchema, days); } - break; - case MAP: - if (value instanceof String) { - SchemaAndValue schemaAndValue = parseString(value.toString()); - value = 
schemaAndValue.value(); + } else { + // There is no fromSchema, so no conversion is needed + return (java.util.Date) value; + } + } + long numeric = asLong(value, fromSchema, null); + return Date.toLogical(toSchema, (int) numeric); + } + + private static java.util.Date convertToTime(Schema toSchema, Schema fromSchema, Object value) { + if (value == null) { + return null; + } else if (value instanceof String) { + SchemaAndValue parsed = parseString(value.toString()); + value = parsed.value(); + } + if (value instanceof java.util.Date) { + if (fromSchema != null) { + String fromSchemaName = fromSchema.name(); + if (Time.LOGICAL_NAME.equals(fromSchemaName)) { + return (java.util.Date) value; } - if (value instanceof Map) { - return value; + if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) { + // Just get the time portion of this timestamp + Calendar calendar = Calendar.getInstance(UTC); + calendar.setTime((java.util.Date) value); + calendar.set(Calendar.YEAR, 1970); + calendar.set(Calendar.MONTH, 0); // Months are zero-based + calendar.set(Calendar.DAY_OF_MONTH, 1); + return Time.toLogical(toSchema, (int) calendar.getTimeInMillis()); } - break; - case STRUCT: - if (value instanceof Struct) { - return value; + } else { + // There is no fromSchema, so no conversion is needed + return (java.util.Date) value; + } + } + long numeric = asLong(value, fromSchema, null); + return Time.toLogical(toSchema, (int) numeric); + } + + private static Serializable convertMaybeLogicalLong(Schema toSchema, Schema fromSchema, Object value) { + if (Timestamp.LOGICAL_NAME.equals(toSchema.name())) { + return convertToTimestamp(toSchema, fromSchema, value); + } + return convertToLong(fromSchema, value); + } + + private static java.util.Date convertToTimestamp(Schema toSchema, Schema fromSchema, Object value) { + if (value == null) { + return null; + } else if (value instanceof String) { + SchemaAndValue parsed = parseString(value.toString()); + value = parsed.value(); + } + if (value 
instanceof java.util.Date) { + java.util.Date date = (java.util.Date) value; + if (fromSchema != null) { + String fromSchemaName = fromSchema.name(); + if (Date.LOGICAL_NAME.equals(fromSchemaName)) { + int days = Date.fromLogical(fromSchema, date); + long millis = days * MILLIS_PER_DAY; + return Timestamp.toLogical(toSchema, millis); } + if (Time.LOGICAL_NAME.equals(fromSchemaName)) { + long millis = Time.fromLogical(fromSchema, date); + return Timestamp.toLogical(toSchema, millis); + } + if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) { + return date; + } + } else { + // There is no fromSchema, so no conversion is needed + return date; + } + } + long numeric = asLong(value, fromSchema, null); + return Timestamp.toLogical(toSchema, numeric); + } + + private static List convertToArray(Schema toSchema, Object value) { + if (value == null) { + throw new DataException("Unable to convert a null value to a schema that requires a value"); + } else if (value instanceof String) { + SchemaAndValue schemaAndValue = parseString(value.toString()); + value = schemaAndValue.value(); + } + if (value instanceof List) { + return (List) value; + } + throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema); + } + + private static Map convertToMapInternal(Schema toSchema, Object value) { + if (value == null) { + throw new DataException("Unable to convert a null value to a schema that requires a value"); + } else if (value instanceof String) { + SchemaAndValue schemaAndValue = parseString(value.toString()); + value = schemaAndValue.value(); + } + if (value instanceof Map) { + return (Map) value; + } + throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema); + } + + private static Struct convertToStructInternal(Schema toSchema, Object value) { + if (value == null) { + throw new DataException("Unable to convert a null value to a schema that requires a value"); + } else if (value instanceof Struct) 
{ + return (Struct) value; } throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema); } @@ -744,174 +797,212 @@ public static DateFormat dateFormatFor(java.util.Date value) { return new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN); } - protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embedded, String tokenLiteral) { - int startPosition = parser.mark(); - // If the next token is what we expect, then either... - if (parser.canConsume(tokenLiteral)) { - // ...we're reading an embedded value, in which case the next token will be handled appropriately - // by the caller if it's something like an end delimiter for a map or array, or a comma to - // separate multiple embedded values... - // ...or it's being parsed as part of a top-level string, in which case, any other tokens should - // cause use to stop parsing this single-token literal as such and instead just treat it like - // a string. For example, the top-level string "true}" will be tokenized as the tokens "true" and - // "}", but should ultimately be parsed as just the string "true}" instead of the boolean true. 
- if (embedded || !parser.hasNext()) { - return true; - } + private static class ValueParser { + + private static final Logger log = LoggerFactory.getLogger(ValueParser.class); + private static final SchemaAndValue TRUE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.TRUE); + private static final SchemaAndValue FALSE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.FALSE); + private static final String TRUE_LITERAL = Boolean.TRUE.toString(); + private static final String FALSE_LITERAL = Boolean.FALSE.toString(); + private static final BigInteger LONG_MIN = BigInteger.valueOf(Long.MIN_VALUE); + private static final BigInteger LONG_MAX = BigInteger.valueOf(Long.MAX_VALUE); + private static final String QUOTE_DELIMITER = "\""; + private static final String COMMA_DELIMITER = ","; + private static final String ENTRY_DELIMITER = ":"; + private static final String ARRAY_BEGIN_DELIMITER = "["; + private static final String ARRAY_END_DELIMITER = "]"; + private static final String MAP_BEGIN_DELIMITER = "{"; + private static final String MAP_END_DELIMITER = "}"; + private static final int ISO_8601_DATE_LENGTH = ISO_8601_DATE_FORMAT_PATTERN.length(); + private static final int ISO_8601_TIME_LENGTH = ISO_8601_TIME_FORMAT_PATTERN.length() - 2; // subtract single quotes + private static final int ISO_8601_TIMESTAMP_LENGTH = ISO_8601_TIMESTAMP_FORMAT_PATTERN.length() - 4; // subtract single quotes + + private final Parser parser; + + private ValueParser(Parser parser) { + this.parser = parser; } - parser.rewindTo(startPosition); - return false; - } - protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException { - if (!parser.hasNext()) { - return null; - } - if (embedded) { - if (parser.canConsume(QUOTE_DELIMITER)) { - StringBuilder sb = new StringBuilder(); - while (parser.hasNext()) { - if (parser.canConsume(QUOTE_DELIMITER)) { - break; - } - sb.append(parser.next()); - } - String content = 
sb.toString(); - // We can parse string literals as temporal logical types, but all others - // are treated as strings - SchemaAndValue parsed = parseString(content); - if (parsed != null && TEMPORAL_LOGICAL_TYPE_NAMES.contains(parsed.schema().name())) { - return parsed; + private boolean canParseSingleTokenLiteral(boolean embedded, String tokenLiteral) { + int startPosition = parser.mark(); + // If the next token is what we expect, then either... + if (parser.canConsume(tokenLiteral)) { + // ...we're reading an embedded value, in which case the next token will be handled appropriately + // by the caller if it's something like an end delimiter for a map or array, or a comma to + // separate multiple embedded values... + // ...or it's being parsed as part of a top-level string, in which case, any other tokens should + // cause use to stop parsing this single-token literal as such and instead just treat it like + // a string. For example, the top-level string "true}" will be tokenized as the tokens "true" and + // "}", but should ultimately be parsed as just the string "true}" instead of the boolean true. 
+ if (embedded || !parser.hasNext()) { + return true; } - return new SchemaAndValue(Schema.STRING_SCHEMA, content); } + parser.rewindTo(startPosition); + return false; } - if (canParseSingleTokenLiteral(parser, embedded, NULL_VALUE)) { - return null; - } - if (canParseSingleTokenLiteral(parser, embedded, TRUE_LITERAL)) { - return TRUE_SCHEMA_AND_VALUE; - } - if (canParseSingleTokenLiteral(parser, embedded, FALSE_LITERAL)) { - return FALSE_SCHEMA_AND_VALUE; + public SchemaAndValue parse(boolean embedded) throws NoSuchElementException { + if (!parser.hasNext()) { + return null; + } else if (embedded && parser.canConsume(QUOTE_DELIMITER)) { + return parseQuotedString(); + } else if (canParseSingleTokenLiteral(embedded, NULL_VALUE)) { + return null; + } else if (canParseSingleTokenLiteral(embedded, TRUE_LITERAL)) { + return TRUE_SCHEMA_AND_VALUE; + } else if (canParseSingleTokenLiteral(embedded, FALSE_LITERAL)) { + return FALSE_SCHEMA_AND_VALUE; + } + + int startPosition = parser.mark(); + + try { + if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) { + return parseArray(); + } else if (parser.canConsume(MAP_BEGIN_DELIMITER)) { + return parseMap(); + } + } catch (DataException e) { + log.trace("Unable to parse the value as a map or an array; reverting to string", e); + parser.rewindTo(startPosition); + } + + String token = parser.next(); + if (Utils.isBlank(token)) { + return new SchemaAndValue(Schema.STRING_SCHEMA, token); + } else { + return parseNextToken(embedded, token.trim()); + } } - int startPosition = parser.mark(); + private SchemaAndValue parseNextToken(boolean embedded, String token) { + char firstChar = token.charAt(0); + boolean firstCharIsDigit = Character.isDigit(firstChar); - try { - if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) { - List result = new ArrayList<>(); - boolean compatible = true; - Schema elementSchema = null; - while (parser.hasNext()) { - if (parser.canConsume(ARRAY_END_DELIMITER)) { - Schema listSchema; - if (elementSchema != null && 
compatible) { - listSchema = SchemaBuilder.array(elementSchema).schema(); - result = alignListEntriesWithSchema(listSchema, result); - } else { - // Every value is null - listSchema = SchemaBuilder.arrayOfNull().build(); - } - return new SchemaAndValue(listSchema, result); - } + // Temporal types are more restrictive, so try them first + if (firstCharIsDigit) { + SchemaAndValue temporal = parseMultipleTokensAsTemporal(token); + if (temporal != null) { + return temporal; + } + } + if (firstCharIsDigit || firstChar == '+' || firstChar == '-') { + try { + return parseAsNumber(token); + } catch (NumberFormatException e) { + // can't parse as a number + } + } + if (embedded) { + throw new DataException("Failed to parse embedded value"); + } + // At this point, the only thing this non-embedded value can be is a string. + return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original()); + } - if (parser.canConsume(COMMA_DELIMITER)) { - throw new DataException("Unable to parse an empty array element: " + parser.original()); - } - SchemaAndValue element = parse(parser, true); - elementSchema = commonSchemaFor(elementSchema, element); - if (elementSchema == null && element != null && element.schema() != null) { - compatible = false; - } - result.add(element != null ? 
element.value() : null); + private SchemaAndValue parseQuotedString() { + StringBuilder sb = new StringBuilder(); + while (parser.hasNext()) { + if (parser.canConsume(QUOTE_DELIMITER)) { + break; + } + sb.append(parser.next()); + } + String content = sb.toString(); + // We can parse string literals as temporal logical types, but all others + // are treated as strings + SchemaAndValue parsed = parseAsTemporal(content); + if (parsed != null) { + return parsed; + } + return new SchemaAndValue(Schema.STRING_SCHEMA, content); + } - int currentPosition = parser.mark(); - if (parser.canConsume(ARRAY_END_DELIMITER)) { - parser.rewindTo(currentPosition); - } else if (!parser.canConsume(COMMA_DELIMITER)) { - throw new DataException("Array elements missing '" + COMMA_DELIMITER + "' delimiter"); + private SchemaAndValue parseArray() { + List result = new ArrayList<>(); + SchemaMerger elementSchema = new SchemaMerger(); + while (parser.hasNext()) { + if (parser.canConsume(ARRAY_END_DELIMITER)) { + Schema listSchema; + if (elementSchema.hasCommonSchema()) { + listSchema = SchemaBuilder.array(elementSchema.schema()).schema(); + result = alignListEntriesWithSchema(listSchema, result); + } else { + // Every value is null + listSchema = SchemaBuilder.arrayOfNull().build(); } + return new SchemaAndValue(listSchema, result); } - // Missing either a comma or an end delimiter - if (COMMA_DELIMITER.equals(parser.previous())) { - throw new DataException("Array is missing element after ',': " + parser.original()); + if (parser.canConsume(COMMA_DELIMITER)) { + throw new DataException("Unable to parse an empty array element: " + parser.original()); + } + SchemaAndValue element = parse(true); + elementSchema.merge(element); + result.add(element != null ? 
element.value() : null); + + int currentPosition = parser.mark(); + if (parser.canConsume(ARRAY_END_DELIMITER)) { + parser.rewindTo(currentPosition); + } else if (!parser.canConsume(COMMA_DELIMITER)) { + throw new DataException("Array elements missing '" + COMMA_DELIMITER + "' delimiter"); } - throw new DataException("Array is missing terminating ']': " + parser.original()); } - if (parser.canConsume(MAP_BEGIN_DELIMITER)) { - Map result = new LinkedHashMap<>(); - boolean keyCompatible = true; - Schema keySchema = null; - boolean valueCompatible = true; - Schema valueSchema = null; - while (parser.hasNext()) { - if (parser.canConsume(MAP_END_DELIMITER)) { - Schema mapSchema; - if (keySchema != null && valueSchema != null && keyCompatible && valueCompatible) { - mapSchema = SchemaBuilder.map(keySchema, valueSchema).build(); - result = alignMapKeysAndValuesWithSchema(mapSchema, result); - } else if (keySchema != null && keyCompatible) { - mapSchema = SchemaBuilder.mapWithNullValues(keySchema); - result = alignMapKeysWithSchema(mapSchema, result); - } else { - mapSchema = SchemaBuilder.mapOfNull().build(); - } - return new SchemaAndValue(mapSchema, result); - } + // Missing either a comma or an end delimiter + if (COMMA_DELIMITER.equals(parser.previous())) { + throw new DataException("Array is missing element after ',': " + parser.original()); + } + throw new DataException("Array is missing terminating ']': " + parser.original()); + } - if (parser.canConsume(COMMA_DELIMITER)) { - throw new DataException("Unable to parse a map entry with no key or value: " + parser.original()); - } - SchemaAndValue key = parse(parser, true); - if (key == null || key.value() == null) { - throw new DataException("Map entry may not have a null key: " + parser.original()); + private SchemaAndValue parseMap() { + Map result = new LinkedHashMap<>(); + SchemaMerger keySchema = new SchemaMerger(); + SchemaMerger valueSchema = new SchemaMerger(); + while (parser.hasNext()) { + if 
(parser.canConsume(MAP_END_DELIMITER)) { + Schema mapSchema; + if (keySchema.hasCommonSchema() && valueSchema.hasCommonSchema()) { + mapSchema = SchemaBuilder.map(keySchema.schema(), valueSchema.schema()).build(); + result = alignMapKeysAndValuesWithSchema(mapSchema, result); + } else if (keySchema.hasCommonSchema()) { + mapSchema = SchemaBuilder.mapWithNullValues(keySchema.schema()); + result = alignMapKeysWithSchema(mapSchema, result); + } else { + mapSchema = SchemaBuilder.mapOfNull().build(); } + return new SchemaAndValue(mapSchema, result); + } - if (!parser.canConsume(ENTRY_DELIMITER)) { - throw new DataException("Map entry is missing '" + ENTRY_DELIMITER - + "' at " + parser.position() - + " in " + parser.original()); - } - SchemaAndValue value = parse(parser, true); - Object entryValue = value != null ? value.value() : null; - result.put(key.value(), entryValue); - - parser.canConsume(COMMA_DELIMITER); - keySchema = commonSchemaFor(keySchema, key); - if (keySchema == null && key.schema() != null) { - keyCompatible = false; - } - valueSchema = commonSchemaFor(valueSchema, value); - if (valueSchema == null && value != null && value.schema() != null) { - valueCompatible = false; - } + if (parser.canConsume(COMMA_DELIMITER)) { + throw new DataException("Unable to parse a map entry with no key or value: " + parser.original()); } - // Missing either a comma or an end delimiter - if (COMMA_DELIMITER.equals(parser.previous())) { - throw new DataException("Map is missing element after ',': " + parser.original()); + SchemaAndValue key = parse(true); + if (key == null || key.value() == null) { + throw new DataException("Map entry may not have a null key: " + parser.original()); + } else if (!parser.canConsume(ENTRY_DELIMITER)) { + throw new DataException("Map entry is missing '" + ENTRY_DELIMITER + + "' at " + parser.position() + + " in " + parser.original()); } - throw new DataException("Map is missing terminating '}': " + parser.original()); - } - } catch 
(DataException e) { - LOG.trace("Unable to parse the value as a map or an array; reverting to string", e); - parser.rewindTo(startPosition); - } + SchemaAndValue value = parse(true); + Object entryValue = value != null ? value.value() : null; + result.put(key.value(), entryValue); - String token = parser.next(); - if (Utils.isBlank(token)) { - return new SchemaAndValue(Schema.STRING_SCHEMA, token); + parser.canConsume(COMMA_DELIMITER); + keySchema.merge(key); + valueSchema.merge(value); + } + // Missing either a comma or an end delimiter + if (COMMA_DELIMITER.equals(parser.previous())) { + throw new DataException("Map is missing element after ',': " + parser.original()); + } + throw new DataException("Map is missing terminating '}': " + parser.original()); } - token = token.trim(); - char firstChar = token.charAt(0); - boolean firstCharIsDigit = Character.isDigit(firstChar); - - // Temporal types are more restrictive, so try them first - if (firstCharIsDigit) { + private SchemaAndValue parseMultipleTokensAsTemporal(String token) { // The time and timestamp literals may be split into 5 tokens since an unescaped colon // is a delimiter. Check these first since the first of these tokens is a simple numeric int position = parser.mark(); @@ -925,143 +1016,146 @@ protected static SchemaAndValue parse(Parser parser, boolean embedded) throws No } // No match was found using the 5 tokens, so rewind and see if the current token has a date, time, or timestamp parser.rewindTo(position); - SchemaAndValue temporal = parseAsTemporal(token); - if (temporal != null) { - return temporal; - } + return parseAsTemporal(token); } - if (firstCharIsDigit || firstChar == '+' || firstChar == '-') { - try { - // Try to parse as a number ... 
- BigDecimal decimal = new BigDecimal(token); - try { - return new SchemaAndValue(Schema.INT8_SCHEMA, decimal.byteValueExact()); - } catch (ArithmeticException e) { - // continue - } - try { - return new SchemaAndValue(Schema.INT16_SCHEMA, decimal.shortValueExact()); - } catch (ArithmeticException e) { - // continue - } - try { - return new SchemaAndValue(Schema.INT32_SCHEMA, decimal.intValueExact()); - } catch (ArithmeticException e) { - // continue - } - try { - return new SchemaAndValue(Schema.INT64_SCHEMA, decimal.longValueExact()); - } catch (ArithmeticException e) { - // continue - } - float fValue = decimal.floatValue(); - if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY + + private static SchemaAndValue parseAsNumber(String token) { + // Try to parse as a number ... + BigDecimal decimal = new BigDecimal(token); + SchemaAndValue exactDecimal = parseAsExactDecimal(decimal); + float fValue = decimal.floatValue(); + double dValue = decimal.doubleValue(); + if (exactDecimal != null) { + return exactDecimal; + } else if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY && decimal.scale() != 0) { - return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue); - } - double dValue = decimal.doubleValue(); - if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY + return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue); + } else if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY && decimal.scale() != 0) { - return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue); - } + return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue); + } else { Schema schema = Decimal.schema(decimal.scale()); return new SchemaAndValue(schema, decimal); - } catch (NumberFormatException e) { - // can't parse as a number } } - if (embedded) { - throw new DataException("Failed to parse embedded value"); - } - // At this point, the only thing this non-embedded value can be is a string. 
- return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original()); - } - private static SchemaAndValue parseAsTemporal(String token) { - if (token == null) { + private static SchemaAndValue parseAsExactDecimal(BigDecimal decimal) { + BigDecimal ceil = decimal.setScale(0, RoundingMode.CEILING); + BigDecimal floor = decimal.setScale(0, RoundingMode.FLOOR); + if (ceil.equals(floor)) { + BigInteger num = ceil.toBigIntegerExact(); + if (ceil.precision() >= 19 && (num.compareTo(LONG_MIN) < 0 || num.compareTo(LONG_MAX) > 0)) { + return null; + } + long integral = num.longValue(); + byte int8 = (byte) integral; + short int16 = (short) integral; + int int32 = (int) integral; + if (int8 == integral) { + return new SchemaAndValue(Schema.INT8_SCHEMA, int8); + } else if (int16 == integral) { + return new SchemaAndValue(Schema.INT16_SCHEMA, int16); + } else if (int32 == integral) { + return new SchemaAndValue(Schema.INT32_SCHEMA, int32); + } else { + return new SchemaAndValue(Schema.INT64_SCHEMA, integral); + } + } return null; } - // If the colons were escaped, we'll see the escape chars and need to remove them - token = token.replace("\\:", ":"); - int tokenLength = token.length(); - if (tokenLength == ISO_8601_TIME_LENGTH) { - try { - return new SchemaAndValue(Time.SCHEMA, new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token)); - } catch (ParseException e) { - // not a valid date + + private static SchemaAndValue parseAsTemporal(String token) { + if (token == null) { + return null; } - } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) { - try { - return new SchemaAndValue(Timestamp.SCHEMA, new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token)); - } catch (ParseException e) { - // not a valid date + // If the colons were escaped, we'll see the escape chars and need to remove them + token = token.replace("\\:", ":"); + int tokenLength = token.length(); + if (tokenLength == ISO_8601_TIME_LENGTH) { + return parseAsTemporalType(token, Time.SCHEMA, 
ISO_8601_TIME_FORMAT_PATTERN); + } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) { + return parseAsTemporalType(token, Timestamp.SCHEMA, ISO_8601_TIMESTAMP_FORMAT_PATTERN); + } else if (tokenLength == ISO_8601_DATE_LENGTH) { + return parseAsTemporalType(token, Date.SCHEMA, ISO_8601_DATE_FORMAT_PATTERN); + } else { + return null; } - } else if (tokenLength == ISO_8601_DATE_LENGTH) { - try { - return new SchemaAndValue(Date.SCHEMA, new SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token)); - } catch (ParseException e) { - // not a valid date + } + + private static SchemaAndValue parseAsTemporalType(String token, Schema schema, String pattern) { + ParsePosition pos = new ParsePosition(0); + java.util.Date result = new SimpleDateFormat(pattern).parse(token, pos); + if (pos.getIndex() != 0) { + return new SchemaAndValue(schema, result); } + return null; } - return null; } - protected static Schema commonSchemaFor(Schema previous, SchemaAndValue latest) { - if (latest == null) { - return previous; + /** + * Utility for merging various optional primitive numeric schemas into a common schema. + * If a non-numeric type appears (including logical numeric types), no common schema will be inferred. + * This class is not thread-safe and should only be accessed by one thread. + */ + private static class SchemaMerger { + /** + * Schema which applies to all of the values passed to {@link #merge(SchemaAndValue)} + * Null if no non-null schemas have been seen, or if the values seen do not have a common schema + */ + private Schema common = null; + /** + * Flag to determine the meaning of the null sentinel in {@link #common} + * If true, null means "any optional type", as no non-null values have appeared. + * If false, null means "no common type", as one or more non-null values had mutually exclusive schemas. 
+ */ + private boolean compatible = true; + + protected void merge(SchemaAndValue latest) { + if (latest != null && latest.schema() != null && compatible) { + if (common == null) { + // This null means any type is valid, so choose the new schema. + common = latest.schema(); + } else { + // There is a previous type restriction, so merge the new schema into the old one. + common = mergeSchemas(common, latest.schema()); + // If there isn't a common schema any longer, then give up on finding further compatible schemas. + compatible = common != null; + } + } } - if (previous == null) { - return latest.schema(); + + protected boolean hasCommonSchema() { + return common != null; } - Schema newSchema = latest.schema(); + + protected Schema schema() { + return common; + } + } + + /** + * Merge two schemas to a common schema which can represent values from both input schemas. + * @param previous One Schema, non-null + * @param newSchema Another schema, non-null + * @return A schema that is a superset of both input schemas, or null if no common schema is found. 
+ */ + private static Schema mergeSchemas(Schema previous, Schema newSchema) { Type previousType = previous.type(); Type newType = newSchema.type(); if (previousType != newType) { switch (previous.type()) { case INT8: - if (newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64 || newType == Type.FLOAT32 || newType == - Type.FLOAT64) { - return newSchema; - } - break; + return commonSchemaForInt8(newSchema, newType); case INT16: - if (newType == Type.INT8) { - return previous; - } - if (newType == Type.INT32 || newType == Type.INT64 || newType == Type.FLOAT32 || newType == Type.FLOAT64) { - return newSchema; - } - break; + return commonSchemaForInt16(previous, newSchema, newType); case INT32: - if (newType == Type.INT8 || newType == Type.INT16) { - return previous; - } - if (newType == Type.INT64 || newType == Type.FLOAT32 || newType == Type.FLOAT64) { - return newSchema; - } - break; + return commonSchemaForInt32(previous, newSchema, newType); case INT64: - if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32) { - return previous; - } - if (newType == Type.FLOAT32 || newType == Type.FLOAT64) { - return newSchema; - } - break; + return commonSchemaForInt64(previous, newSchema, newType); case FLOAT32: - if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64) { - return previous; - } - if (newType == Type.FLOAT64) { - return newSchema; - } - break; + return commonSchemaForFloat32(previous, newSchema, newType); case FLOAT64: - if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64 || newType == - Type.FLOAT32) { - return previous; - } - break; + return commonSchemaForFloat64(previous, newType); } return null; } @@ -1075,6 +1169,59 @@ protected static Schema commonSchemaFor(Schema previous, SchemaAndValue latest) return previous; } + private static Schema commonSchemaForInt8(Schema newSchema, Type newType) { + if (newType == Type.INT16 || 
newType == Type.INT32 || newType == Type.INT64 + || newType == Type.FLOAT32 || newType == Type.FLOAT64) { + return newSchema; + } + return null; + } + + private static Schema commonSchemaForInt16(Schema previous, Schema newSchema, Type newType) { + if (newType == Type.INT8) { + return previous; + } else if (newType == Type.INT32 || newType == Type.INT64 + || newType == Type.FLOAT32 || newType == Type.FLOAT64) { + return newSchema; + } + return null; + } + + private static Schema commonSchemaForInt32(Schema previous, Schema newSchema, Type newType) { + if (newType == Type.INT8 || newType == Type.INT16) { + return previous; + } else if (newType == Type.INT64 || newType == Type.FLOAT32 || newType == Type.FLOAT64) { + return newSchema; + } + return null; + } + + private static Schema commonSchemaForInt64(Schema previous, Schema newSchema, Type newType) { + if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32) { + return previous; + } else if (newType == Type.FLOAT32 || newType == Type.FLOAT64) { + return newSchema; + } + return null; + } + + private static Schema commonSchemaForFloat32(Schema previous, Schema newSchema, Type newType) { + if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64) { + return previous; + } else if (newType == Type.FLOAT64) { + return newSchema; + } + return null; + } + + private static Schema commonSchemaForFloat64(Schema previous, Type newType) { + if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64 + || newType == Type.FLOAT32) { + return previous; + } + return null; + } + protected static List alignListEntriesWithSchema(Schema schema, List input) { Schema valueSchema = schema.valueSchema(); List result = new ArrayList<>(); diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index 3aad588fc220b..df3c2ade5dd83 100644 --- 
a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -81,6 +81,20 @@ public class ValuesTest { INT_LIST.add(-987654321); } + @Test + public void shouldParseNullString() { + SchemaAndValue schemaAndValue = Values.parseString(null); + assertNull(schemaAndValue.schema()); + assertNull(schemaAndValue.value()); + } + + @Test + public void shouldParseEmptyString() { + SchemaAndValue schemaAndValue = Values.parseString(""); + assertEquals(Schema.STRING_SCHEMA, schemaAndValue.schema()); + assertEquals("", schemaAndValue.value()); + } + @Test @Timeout(5) public void shouldNotEncounterInfiniteLoop() { @@ -246,6 +260,20 @@ public void shouldEscapeStringsWithEmbeddedQuotesAndBackslashes() { @Test public void shouldConvertNullValue() { + assertRoundTrip(Schema.INT8_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.OPTIONAL_INT8_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.INT16_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.OPTIONAL_INT16_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.OPTIONAL_INT32_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.INT64_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.OPTIONAL_INT64_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.FLOAT32_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.FLOAT64_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.OPTIONAL_FLOAT64_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA, null); + assertRoundTrip(Schema.OPTIONAL_BOOLEAN_SCHEMA, Schema.STRING_SCHEMA, null); assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, null); assertRoundTrip(Schema.OPTIONAL_STRING_SCHEMA, Schema.STRING_SCHEMA, 
null); } @@ -253,14 +281,22 @@ public void shouldConvertNullValue() { @Test public void shouldConvertBooleanValues() { assertRoundTrip(Schema.BOOLEAN_SCHEMA, Schema.BOOLEAN_SCHEMA, Boolean.FALSE); + assertShortCircuit(Schema.BOOLEAN_SCHEMA, Boolean.FALSE); SchemaAndValue resultFalse = roundTrip(Schema.BOOLEAN_SCHEMA, "false"); assertEquals(Schema.BOOLEAN_SCHEMA, resultFalse.schema()); assertEquals(Boolean.FALSE, resultFalse.value()); + resultFalse = roundTrip(Schema.BOOLEAN_SCHEMA, "0"); + assertEquals(Schema.BOOLEAN_SCHEMA, resultFalse.schema()); + assertEquals(Boolean.FALSE, resultFalse.value()); assertRoundTrip(Schema.BOOLEAN_SCHEMA, Schema.BOOLEAN_SCHEMA, Boolean.TRUE); + assertShortCircuit(Schema.BOOLEAN_SCHEMA, Boolean.TRUE); SchemaAndValue resultTrue = roundTrip(Schema.BOOLEAN_SCHEMA, "true"); assertEquals(Schema.BOOLEAN_SCHEMA, resultTrue.schema()); assertEquals(Boolean.TRUE, resultTrue.value()); + resultTrue = roundTrip(Schema.BOOLEAN_SCHEMA, "1"); + assertEquals(Schema.BOOLEAN_SCHEMA, resultTrue.schema()); + assertEquals(Boolean.TRUE, resultTrue.value()); } @Test @@ -268,6 +304,38 @@ public void shouldFailToParseInvalidBooleanValueString() { assertThrows(DataException.class, () -> Values.convertToBoolean(Schema.STRING_SCHEMA, "\"green\"")); } + @Test + public void shouldConvertInt8() { + assertRoundTrip(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA, (byte) 0); + assertRoundTrip(Schema.INT8_SCHEMA, Schema.INT8_SCHEMA, (byte) 1); + } + + @Test + public void shouldConvertInt64() { + assertRoundTrip(Schema.INT64_SCHEMA, Schema.INT64_SCHEMA, (long) 1); + assertShortCircuit(Schema.INT64_SCHEMA, (long) 1); + } + + @Test + public void shouldConvertFloat32() { + assertRoundTrip(Schema.FLOAT32_SCHEMA, Schema.FLOAT32_SCHEMA, (float) 1); + assertShortCircuit(Schema.FLOAT32_SCHEMA, (float) 1); + } + + @Test + public void shouldConvertFloat64() { + assertRoundTrip(Schema.FLOAT64_SCHEMA, Schema.FLOAT64_SCHEMA, (double) 1); + assertShortCircuit(Schema.FLOAT64_SCHEMA, (double) 
1); + } + + @Test + public void shouldConvertEmptyStruct() { + Struct struct = new Struct(SchemaBuilder.struct().build()); + assertThrows(DataException.class, () -> Values.convertToStruct(struct.schema(), null)); + assertThrows(DataException.class, () -> Values.convertToStruct(struct.schema(), "")); + Values.convertToStruct(struct.schema(), struct); + } + @Test public void shouldConvertSimpleString() { assertRoundTrip(Schema.STRING_SCHEMA, "simple"); @@ -361,7 +429,27 @@ public void shouldConvertStringOfListWithOnlyNumericElementTypesIntoListOfLarges assertEquals(3, list.size()); assertEquals(1, ((Number) list.get(0)).intValue()); assertEquals(2, ((Number) list.get(1)).intValue()); - assertEquals(thirdValue, ((Number) list.get(2)).intValue()); + assertEquals(thirdValue, list.get(2)); + } + + @Test + public void shouldConvertIntegralTypesToFloat() { + float thirdValue = Float.MAX_VALUE; + List list = Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, " + thirdValue + "]"); + assertEquals(3, list.size()); + assertEquals(1, ((Number) list.get(0)).intValue()); + assertEquals(2, ((Number) list.get(1)).intValue()); + assertEquals(thirdValue, list.get(2)); + } + + @Test + public void shouldConvertIntegralTypesToDouble() { + double thirdValue = Double.MAX_VALUE; + List list = Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, " + thirdValue + "]"); + assertEquals(3, list.size()); + assertEquals(1, ((Number) list.get(0)).intValue()); + assertEquals(2, ((Number) list.get(1)).intValue()); + assertEquals(thirdValue, list.get(2)); } /** @@ -434,6 +522,34 @@ private void assertParseStringMapWithNoSchema(Map expected, Stri assertEquals(expected, list); } + @Test + public void shouldParseNestedArray() { + SchemaAndValue schemaAndValue = Values.parseString("[[]]"); + assertEquals(Type.ARRAY, schemaAndValue.schema().type()); + assertEquals(Type.ARRAY, schemaAndValue.schema().valueSchema().type()); + } + + @Test + public void shouldParseArrayContainingMap() { + SchemaAndValue 
schemaAndValue = Values.parseString("[{}]"); + assertEquals(Type.ARRAY, schemaAndValue.schema().type()); + assertEquals(Type.MAP, schemaAndValue.schema().valueSchema().type()); + } + + @Test + public void shouldParseNestedMap() { + SchemaAndValue schemaAndValue = Values.parseString("{\"a\":{}}"); + assertEquals(Type.MAP, schemaAndValue.schema().type()); + assertEquals(Type.MAP, schemaAndValue.schema().valueSchema().type()); + } + + @Test + public void shouldParseMapContainingArray() { + SchemaAndValue schemaAndValue = Values.parseString("{\"a\":[]}"); + assertEquals(Type.MAP, schemaAndValue.schema().type()); + assertEquals(Type.ARRAY, schemaAndValue.schema().valueSchema().type()); + } + /** * We can't infer or successfully parse into a different type, so this returns the same string. */ @@ -445,6 +561,22 @@ public void shouldParseStringListWithExtraDelimitersAndReturnString() { assertEquals(str, result.value()); } + @Test + public void shouldParseStringListWithNullLastAsString() { + String str = "[1, null]"; + SchemaAndValue result = Values.parseString(str); + assertEquals(Type.STRING, result.schema().type()); + assertEquals(str, result.value()); + } + + @Test + public void shouldParseStringListWithNullFirstAsString() { + String str = "[null, 1]"; + SchemaAndValue result = Values.parseString(str); + assertEquals(Type.STRING, result.schema().type()); + assertEquals(str, result.value()); + } + @Test public void shouldParseTimestampStringAsTimestamp() throws Exception { String str = "2019-08-23T14:34:54.346Z"; @@ -585,6 +717,13 @@ public void shouldParseTimeStringAsTimeInMap() throws Exception { assertEquals(Collections.singletonMap(keyStr, expected), result.value()); } + @Test + public void shouldFailToConvertNullTime() { + assertThrows(DataException.class, () -> Values.convertToTime(null, null)); + assertThrows(DataException.class, () -> Values.convertToDate(null, null)); + assertThrows(DataException.class, () -> Values.convertToTimestamp(null, null)); + } + /** * 
This is technically invalid JSON, and we don't want to simply ignore the blank elements. */ @@ -802,6 +941,51 @@ public void shouldConvertDecimalValues() { assertEquals(value, Values.convertToDecimal(null, buffer, 1)); } + @Test + public void shouldFailToConvertNullToDecimal() { + assertThrows(DataException.class, () -> Values.convertToDecimal(null, null, 1)); + } + + @Test + public void shouldInferByteSchema() { + byte[] bytes = new byte[1]; + Schema byteSchema = Values.inferSchema(bytes); + assertEquals(Schema.BYTES_SCHEMA, byteSchema); + Schema byteBufferSchema = Values.inferSchema(ByteBuffer.wrap(bytes)); + assertEquals(Schema.BYTES_SCHEMA, byteBufferSchema); + } + + @Test + public void shouldInferStructSchema() { + Struct struct = new Struct(SchemaBuilder.struct().build()); + Schema structSchema = Values.inferSchema(struct); + assertEquals(struct.schema(), structSchema); + } + + @Test + public void shouldInferNoSchemaForEmptyList() { + Schema listSchema = Values.inferSchema(Collections.emptyList()); + assertNull(listSchema); + } + + @Test + public void shouldInferNoSchemaForListContainingObject() { + Schema listSchema = Values.inferSchema(Collections.singletonList(new Object())); + assertNull(listSchema); + } + + @Test + public void shouldInferNoSchemaForEmptyMap() { + Schema listSchema = Values.inferSchema(Collections.emptyMap()); + assertNull(listSchema); + } + + @Test + public void shouldInferNoSchemaForMapContainingObject() { + Schema listSchema = Values.inferSchema(Collections.singletonMap(new Object(), new Object())); + assertNull(listSchema); + } + /** * Test parsing distinct number-like types (strings containing numbers, and logical Decimals) in the same list * The parser does not convert Numbers to Decimals, or Strings containing numbers to Numbers automatically. 
@@ -819,6 +1003,17 @@ public void shouldNotConvertArrayValuesToDecimal() { assertEquals(expected, schemaAndValue.value()); } + @Test + public void shouldParseArrayOfOnlyDecimals() { + List decimals = Arrays.asList(BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE), + BigDecimal.valueOf(Long.MIN_VALUE).subtract(BigDecimal.ONE)); + SchemaAndValue schemaAndValue = Values.parseString(decimals.toString()); + Schema schema = schemaAndValue.schema(); + assertEquals(Type.ARRAY, schema.type()); + assertEquals(Decimal.schema(0), schema.valueSchema()); + assertEquals(decimals, schemaAndValue.value()); + } + @Test public void canConsume() { } @@ -949,6 +1144,16 @@ public void shouldParseDoubleAsFloat64() { assertEquals(value, (Double) schemaAndValue.value(), 0); } + @Test + public void shouldParseFractionalPartsAsIntegerWhenNoFractionalPart() { + assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 1), Values.parseString("1.0")); + assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 1.1f), Values.parseString("1.1")); + assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 300), Values.parseString("300.0")); + assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 300.01f), Values.parseString("300.01")); + assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 66000), Values.parseString("66000.0")); + assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 66000.0008f), Values.parseString("66000.0008")); + } + protected void assertParsed(String input) { assertParsed(input, input); } @@ -1011,47 +1216,48 @@ protected SchemaAndValue roundTrip(Schema desiredSchema, SchemaAndValue input) { desiredSchema = Values.inferSchema(input); assertNotNull(desiredSchema); } + return convertTo(desiredSchema, serialized); + } + + protected SchemaAndValue convertTo(Schema desiredSchema, Object value) { Object newValue = null; - Schema newSchema = null; switch (desiredSchema.type()) { case STRING: - newValue = Values.convertToString(Schema.STRING_SCHEMA, serialized); + newValue = 
Values.convertToString(Schema.STRING_SCHEMA, value); break; case INT8: - newValue = Values.convertToByte(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToByte(Schema.STRING_SCHEMA, value); break; case INT16: - newValue = Values.convertToShort(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToShort(Schema.STRING_SCHEMA, value); break; case INT32: - newValue = Values.convertToInteger(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToInteger(Schema.STRING_SCHEMA, value); break; case INT64: - newValue = Values.convertToLong(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToLong(Schema.STRING_SCHEMA, value); break; case FLOAT32: - newValue = Values.convertToFloat(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToFloat(Schema.STRING_SCHEMA, value); break; case FLOAT64: - newValue = Values.convertToDouble(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToDouble(Schema.STRING_SCHEMA, value); break; case BOOLEAN: - newValue = Values.convertToBoolean(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToBoolean(Schema.STRING_SCHEMA, value); break; case ARRAY: - newValue = Values.convertToList(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToList(Schema.STRING_SCHEMA, value); break; case MAP: - newValue = Values.convertToMap(Schema.STRING_SCHEMA, serialized); + newValue = Values.convertToMap(Schema.STRING_SCHEMA, value); break; case STRUCT: - newValue = Values.convertToStruct(Schema.STRING_SCHEMA, serialized); - break; case BYTES: fail("unexpected schema type"); break; } - newSchema = Values.inferSchema(newValue); + Schema newSchema = Values.inferSchema(newValue); return new SchemaAndValue(newSchema, newValue); } @@ -1075,4 +1281,16 @@ protected void assertRoundTrip(Schema schema, Schema currentSchema, Object value assertEquals(result, result2); } } + + protected void assertShortCircuit(Schema schema, Object value) { + SchemaAndValue result = convertTo(schema, value); 
+ + if (value == null) { + assertNull(result.schema()); + assertNull(result.value()); + } else { + assertEquals(value, result.value()); + assertEquals(schema, result.schema()); + } + } } diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 7c4b96f077d4c..cfa942ed053fd 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -298,6 +298,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + + + diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ValuesBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ValuesBenchmark.java new file mode 100644 index 0000000000000..ddcdb5e2418ff --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ValuesBenchmark.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.connect; + +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.data.Values; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * This benchmark tests the performance of the {@link Values} data handling class. 
+ */ +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 7) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ValuesBenchmark { + + private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(); + private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct() + .field("field", Schema.INT32_SCHEMA) + .build(); + private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct() + .field("first", Schema.INT32_SCHEMA) + .field("second", Schema.STRING_SCHEMA) + .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) + .field("map", MAP_INT_STRING_SCHEMA) + .field("nested", FLAT_STRUCT_SCHEMA) + .build(); + private static final SchemaAndValue[] TEST_VALUES = { + SchemaAndValue.NULL, + new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), + new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), + new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), + new SchemaAndValue(Schema.OPTIONAL_INT8_SCHEMA, null), + new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 0), + new SchemaAndValue(Schema.INT8_SCHEMA, Byte.MAX_VALUE), + new SchemaAndValue(Schema.OPTIONAL_INT16_SCHEMA, null), + new SchemaAndValue(Schema.INT16_SCHEMA, (short) 0), + new SchemaAndValue(Schema.INT16_SCHEMA, Short.MAX_VALUE), + new SchemaAndValue(Schema.OPTIONAL_INT32_SCHEMA, null), + new SchemaAndValue(Schema.INT32_SCHEMA, 0), + new SchemaAndValue(Schema.INT32_SCHEMA, Integer.MAX_VALUE), + new SchemaAndValue(Schema.OPTIONAL_INT64_SCHEMA, null), + new SchemaAndValue(Schema.INT64_SCHEMA, (long) 0), + new SchemaAndValue(Schema.INT64_SCHEMA, Long.MAX_VALUE), + new SchemaAndValue(Schema.OPTIONAL_FLOAT32_SCHEMA, null), + new SchemaAndValue(Schema.FLOAT32_SCHEMA, (float) 0), + new SchemaAndValue(Schema.FLOAT32_SCHEMA, 0.1f), + new SchemaAndValue(Schema.FLOAT32_SCHEMA, 1.1f), + new SchemaAndValue(Schema.FLOAT32_SCHEMA, Float.MAX_VALUE), + new
SchemaAndValue(Schema.OPTIONAL_FLOAT64_SCHEMA, null), + new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) 0), + new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) 0.1f), + new SchemaAndValue(Schema.FLOAT64_SCHEMA, (double) 1.1f), + new SchemaAndValue(Schema.FLOAT64_SCHEMA, Double.MAX_VALUE), + new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, null), + new SchemaAndValue(Date.SCHEMA, "2019-08-23"), + new SchemaAndValue(Time.SCHEMA, "14:34:54.346Z"), + new SchemaAndValue(Timestamp.SCHEMA, "2019-08-23T14:34:54.346Z"), + new SchemaAndValue(Schema.STRING_SCHEMA, ""), + new SchemaAndValue(Schema.STRING_SCHEMA, "a-random-string"), + new SchemaAndValue(Schema.STRING_SCHEMA, "[]"), + new SchemaAndValue(Schema.STRING_SCHEMA, "[1, 2, 3]"), + new SchemaAndValue(Schema.STRING_SCHEMA, "{}"), + new SchemaAndValue(Schema.STRING_SCHEMA, "{\"1\": 2, \"3\": 4}"), + new SchemaAndValue(SchemaBuilder.array(Schema.INT16_SCHEMA), new short[]{1, 2, 3}), + new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.BOOLEAN_SCHEMA), + Collections.singletonMap("key", true)), + new SchemaAndValue(STRUCT_SCHEMA, new Struct(STRUCT_SCHEMA) + .put("first", 1) + .put("second", "foo") + .put("array", Arrays.asList(1, 2, 3)) + .put("map", Collections.singletonMap(1, "value")) + .put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12))), + }; + + private SchemaAndValue[] convertToBooleanCases; + private SchemaAndValue[] convertToByteCases; + private SchemaAndValue[] convertToDateCases; + private SchemaAndValue[] convertToDecimalCases; + private SchemaAndValue[] convertToDoubleCases; + private SchemaAndValue[] convertToFloatCases; + private SchemaAndValue[] convertToShortCases; + private SchemaAndValue[] convertToListCases; + private SchemaAndValue[] convertToMapCases; + private SchemaAndValue[] convertToLongCases; + private SchemaAndValue[] convertToIntegerCases; + private SchemaAndValue[] convertToStructCases; + private SchemaAndValue[] convertToTimeCases; + private SchemaAndValue[] 
convertToTimestampCases; + private SchemaAndValue[] convertToStringCases; + private String[] parseStringCases; + + private SchemaAndValue[] successfulCases(BiFunction fn) { + List successful = new ArrayList<>(); + for (SchemaAndValue testCase : TEST_VALUES) { + try { + fn.apply(testCase.schema(), testCase.value()); + successful.add(testCase); + } catch (Throwable ignored) { + } + } + return successful.toArray(new SchemaAndValue[]{}); + } + + private String[] casesToString(Function fn) { + List successful = new ArrayList<>(); + for (SchemaAndValue testCase : TEST_VALUES) { + String v = String.valueOf(testCase.value()); + try { + fn.apply(v); + successful.add(v); + } catch (Throwable ignored) { + } + } + return successful.toArray(new String[]{}); + } + + @Setup + public void setup() { + convertToBooleanCases = successfulCases(Values::convertToBoolean); + convertToByteCases = successfulCases(Values::convertToByte); + convertToDateCases = successfulCases(Values::convertToDate); + convertToDecimalCases = successfulCases((schema, object) -> Values.convertToDecimal(schema, object, 1)); + convertToDoubleCases = successfulCases(Values::convertToDouble); + convertToFloatCases = successfulCases(Values::convertToFloat); + convertToShortCases = successfulCases(Values::convertToShort); + convertToListCases = successfulCases(Values::convertToList); + convertToMapCases = successfulCases(Values::convertToMap); + convertToLongCases = successfulCases(Values::convertToLong); + convertToIntegerCases = successfulCases(Values::convertToInteger); + convertToStructCases = successfulCases(Values::convertToStruct); + convertToTimeCases = successfulCases(Values::convertToTime); + convertToTimestampCases = successfulCases(Values::convertToTimestamp); + convertToStringCases = successfulCases(Values::convertToString); + parseStringCases = casesToString(Values::parseString); + } + + @Benchmark + public void testConvertToBoolean(Blackhole blackhole) { + for (SchemaAndValue testCase : 
convertToBooleanCases) { + blackhole.consume(Values.convertToBoolean(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToByte(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToByteCases) { + blackhole.consume(Values.convertToByte(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToDate(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToDateCases) { + blackhole.consume(Values.convertToDate(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToDecimal(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToDecimalCases) { + blackhole.consume(Values.convertToDecimal(testCase.schema(), testCase.value(), 1)); + } + } + + @Benchmark + public void testConvertToDouble(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToDoubleCases) { + blackhole.consume(Values.convertToDouble(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToFloat(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToFloatCases) { + blackhole.consume(Values.convertToFloat(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToShort(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToShortCases) { + blackhole.consume(Values.convertToShort(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToList(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToListCases) { + blackhole.consume(Values.convertToList(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToMap(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToMapCases) { + blackhole.consume(Values.convertToMap(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToLong(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToLongCases) { + 
blackhole.consume(Values.convertToLong(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToInteger(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToIntegerCases) { + blackhole.consume(Values.convertToInteger(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToStruct(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToStructCases) { + blackhole.consume(Values.convertToStruct(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToTime(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToTimeCases) { + blackhole.consume(Values.convertToTime(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToTimestamp(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToTimestampCases) { + blackhole.consume(Values.convertToTimestamp(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testConvertToString(Blackhole blackhole) { + for (SchemaAndValue testCase : convertToStringCases) { + blackhole.consume(Values.convertToString(testCase.schema(), testCase.value())); + } + } + + @Benchmark + public void testInferSchema(Blackhole blackhole) { + for (SchemaAndValue testCase : TEST_VALUES) { + blackhole.consume(Values.inferSchema(testCase.value())); + } + } + + @Benchmark + public void testParseString(Blackhole blackhole) { + for (String testCase : parseStringCases) { + blackhole.consume(Values.parseString(testCase)); + } + } +}