diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a8bc6351db3fa..a317fa558c10f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -99,7 +99,7 @@
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
+ files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
@@ -143,7 +143,7 @@
+ files="(DistributedHerder|DistributedConfig|KafkaConfigBackingStore|IncrementalCooperativeAssignor).java"/>
+ files="(JsonConverter|ConnectHeaders).java"/>
+ files="(KafkaConfigBackingStore|ConnectMetricsRegistry).java"/>
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 7b78c64af0ca7..e144b7c69b6c7 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -22,25 +22,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.CharacterIterator;
import java.text.DateFormat;
-import java.text.ParseException;
+import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.text.StringCharacterIterator;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;
@@ -61,42 +60,16 @@
*/
public class Values {
- private static final Logger LOG = LoggerFactory.getLogger(Values.class);
-
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
- private static final SchemaAndValue TRUE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.TRUE);
- private static final SchemaAndValue FALSE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.FALSE);
private static final Schema ARRAY_SELECTOR_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
private static final Schema MAP_SELECTOR_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build();
private static final Schema STRUCT_SELECTOR_SCHEMA = SchemaBuilder.struct().build();
- private static final String TRUE_LITERAL = Boolean.TRUE.toString();
- private static final String FALSE_LITERAL = Boolean.FALSE.toString();
private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
private static final String NULL_VALUE = "null";
static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'";
static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
- private static final Set TEMPORAL_LOGICAL_TYPE_NAMES =
- Collections.unmodifiableSet(
- new HashSet<>(
- Arrays.asList(Time.LOGICAL_NAME,
- Timestamp.LOGICAL_NAME,
- Date.LOGICAL_NAME
- )
- )
- );
-
- private static final String QUOTE_DELIMITER = "\"";
- private static final String COMMA_DELIMITER = ",";
- private static final String ENTRY_DELIMITER = ":";
- private static final String ARRAY_BEGIN_DELIMITER = "[";
- private static final String ARRAY_END_DELIMITER = "]";
- private static final String MAP_BEGIN_DELIMITER = "{";
- private static final String MAP_END_DELIMITER = "}";
- private static final int ISO_8601_DATE_LENGTH = ISO_8601_DATE_FORMAT_PATTERN.length();
- private static final int ISO_8601_TIME_LENGTH = ISO_8601_TIME_FORMAT_PATTERN.length() - 2; // subtract single quotes
- private static final int ISO_8601_TIMESTAMP_LENGTH = ISO_8601_TIMESTAMP_FORMAT_PATTERN.length() - 4; // subtract single quotes
private static final Pattern TWO_BACKSLASHES = Pattern.compile("\\\\");
@@ -112,7 +85,17 @@ public class Values {
* @throws DataException if the value could not be converted to a boolean
*/
public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {
- return (Boolean) convertTo(Schema.OPTIONAL_BOOLEAN_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ } else if (value instanceof Boolean) {
+ return (Boolean) value;
+ } else if (value instanceof String) {
+ SchemaAndValue parsed = parseString(value.toString());
+ if (parsed.value() instanceof Boolean) {
+ return (Boolean) parsed.value();
+ }
+ }
+ return asLong(value, schema, null) == 0L ? Boolean.FALSE : Boolean.TRUE;
}
/**
@@ -125,7 +108,12 @@ public static Boolean convertToBoolean(Schema schema, Object value) throws DataE
* @throws DataException if the value could not be converted to a byte
*/
public static Byte convertToByte(Schema schema, Object value) throws DataException {
- return (Byte) convertTo(Schema.OPTIONAL_INT8_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ } else if (value instanceof Byte) {
+ return (Byte) value;
+ }
+ return (byte) asLong(value, schema, null);
}
/**
@@ -138,7 +126,12 @@ public static Byte convertToByte(Schema schema, Object value) throws DataExcepti
* @throws DataException if the value could not be converted to a short
*/
public static Short convertToShort(Schema schema, Object value) throws DataException {
- return (Short) convertTo(Schema.OPTIONAL_INT16_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ } else if (value instanceof Short) {
+ return (Short) value;
+ }
+ return (short) asLong(value, schema, null);
}
/**
@@ -151,7 +144,12 @@ public static Short convertToShort(Schema schema, Object value) throws DataExcep
* @throws DataException if the value could not be converted to an integer
*/
public static Integer convertToInteger(Schema schema, Object value) throws DataException {
- return (Integer) convertTo(Schema.OPTIONAL_INT32_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ } else if (value instanceof Integer) {
+ return (Integer) value;
+ }
+ return (int) asLong(value, schema, null);
}
/**
@@ -164,7 +162,12 @@ public static Integer convertToInteger(Schema schema, Object value) throws DataE
* @throws DataException if the value could not be converted to a long
*/
public static Long convertToLong(Schema schema, Object value) throws DataException {
- return (Long) convertTo(Schema.OPTIONAL_INT64_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ } else if (value instanceof Long) {
+ return (Long) value;
+ }
+ return asLong(value, schema, null);
}
/**
@@ -177,7 +180,12 @@ public static Long convertToLong(Schema schema, Object value) throws DataExcepti
* @throws DataException if the value could not be converted to a float
*/
public static Float convertToFloat(Schema schema, Object value) throws DataException {
- return (Float) convertTo(Schema.OPTIONAL_FLOAT32_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ } else if (value instanceof Float) {
+ return (Float) value;
+ }
+ return (float) asDouble(value, schema, null);
}
/**
@@ -190,7 +198,12 @@ public static Float convertToFloat(Schema schema, Object value) throws DataExcep
* @throws DataException if the value could not be converted to a double
*/
public static Double convertToDouble(Schema schema, Object value) throws DataException {
- return (Double) convertTo(Schema.OPTIONAL_FLOAT64_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ } else if (value instanceof Double) {
+ return (Double) value;
+ }
+ return asDouble(value, schema, null);
}
/**
@@ -202,7 +215,12 @@ public static Double convertToDouble(Schema schema, Object value) throws DataExc
* @return the representation as a string, or null if the supplied value was null
*/
public static String convertToString(Schema schema, Object value) {
- return (String) convertTo(Schema.OPTIONAL_STRING_SCHEMA, schema, value);
+ if (value == null) {
+ return null;
+ }
+ StringBuilder sb = new StringBuilder();
+ append(sb, value, false);
+ return sb.toString();
}
/**
@@ -219,7 +237,7 @@ public static String convertToString(Schema schema, Object value) {
* @throws DataException if the value cannot be converted to a list value
*/
public static List> convertToList(Schema schema, Object value) {
- return (List>) convertTo(ARRAY_SELECTOR_SCHEMA, schema, value);
+ return convertToArray(ARRAY_SELECTOR_SCHEMA, value);
}
/**
@@ -235,7 +253,7 @@ public static List> convertToList(Schema schema, Object value) {
* @throws DataException if the value cannot be converted to a map value
*/
public static Map, ?> convertToMap(Schema schema, Object value) {
- return (Map, ?>) convertTo(MAP_SELECTOR_SCHEMA, schema, value);
+ return convertToMapInternal(MAP_SELECTOR_SCHEMA, value);
}
/**
@@ -250,7 +268,7 @@ public static List> convertToList(Schema schema, Object value) {
* @throws DataException if the value is not a struct
*/
public static Struct convertToStruct(Schema schema, Object value) {
- return (Struct) convertTo(STRUCT_SELECTOR_SCHEMA, schema, value);
+ return convertToStructInternal(STRUCT_SELECTOR_SCHEMA, value);
}
/**
@@ -263,7 +281,10 @@ public static Struct convertToStruct(Schema schema, Object value) {
* @throws DataException if the value cannot be converted to a time value
*/
public static java.util.Date convertToTime(Schema schema, Object value) {
- return (java.util.Date) convertTo(Time.SCHEMA, schema, value);
+ if (value == null) {
+ throw new DataException("Unable to convert a null value to a schema that requires a value");
+ }
+ return convertToTime(Time.SCHEMA, schema, value);
}
/**
@@ -276,7 +297,10 @@ public static java.util.Date convertToTime(Schema schema, Object value) {
* @throws DataException if the value cannot be converted to a date value
*/
public static java.util.Date convertToDate(Schema schema, Object value) {
- return (java.util.Date) convertTo(Date.SCHEMA, schema, value);
+ if (value == null) {
+ throw new DataException("Unable to convert a null value to a schema that requires a value");
+ }
+ return convertToDate(Date.SCHEMA, schema, value);
}
/**
@@ -289,7 +313,10 @@ public static java.util.Date convertToDate(Schema schema, Object value) {
* @throws DataException if the value cannot be converted to a timestamp value
*/
public static java.util.Date convertToTimestamp(Schema schema, Object value) {
- return (java.util.Date) convertTo(Timestamp.SCHEMA, schema, value);
+ if (value == null) {
+ throw new DataException("Unable to convert a null value to a schema that requires a value");
+ }
+ return convertToTimestamp(Timestamp.SCHEMA, schema, value);
}
/**
@@ -302,7 +329,10 @@ public static java.util.Date convertToTimestamp(Schema schema, Object value) {
* @throws DataException if the value cannot be converted to a decimal value
*/
public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) {
- return (BigDecimal) convertTo(Decimal.schema(scale), schema, value);
+ if (value == null) {
+ throw new DataException("Unable to convert a null value to a schema that requires a value");
+ }
+ return convertToDecimal(Decimal.schema(scale), value);
}
/**
@@ -314,65 +344,59 @@ public static BigDecimal convertToDecimal(Schema schema, Object value, int scale
public static Schema inferSchema(Object value) {
if (value instanceof String) {
return Schema.STRING_SCHEMA;
- }
- if (value instanceof Boolean) {
+ } else if (value instanceof Boolean) {
return Schema.BOOLEAN_SCHEMA;
- }
- if (value instanceof Byte) {
+ } else if (value instanceof Byte) {
return Schema.INT8_SCHEMA;
- }
- if (value instanceof Short) {
+ } else if (value instanceof Short) {
return Schema.INT16_SCHEMA;
- }
- if (value instanceof Integer) {
+ } else if (value instanceof Integer) {
return Schema.INT32_SCHEMA;
- }
- if (value instanceof Long) {
+ } else if (value instanceof Long) {
return Schema.INT64_SCHEMA;
- }
- if (value instanceof Float) {
+ } else if (value instanceof Float) {
return Schema.FLOAT32_SCHEMA;
- }
- if (value instanceof Double) {
+ } else if (value instanceof Double) {
return Schema.FLOAT64_SCHEMA;
- }
- if (value instanceof byte[] || value instanceof ByteBuffer) {
+ } else if (value instanceof byte[] || value instanceof ByteBuffer) {
return Schema.BYTES_SCHEMA;
+ } else if (value instanceof List) {
+ return inferListSchema((List>) value);
+ } else if (value instanceof Map) {
+ return inferMapSchema((Map, ?>) value);
+ } else if (value instanceof Struct) {
+ return ((Struct) value).schema();
}
- if (value instanceof List) {
- List> list = (List>) value;
- if (list.isEmpty()) {
+ return null;
+ }
+
+ private static Schema inferListSchema(List> list) {
+ if (list.isEmpty()) {
+ return null;
+ }
+ SchemaDetector detector = new SchemaDetector();
+ for (Object element : list) {
+ if (!detector.canDetect(element)) {
return null;
}
- SchemaDetector detector = new SchemaDetector();
- for (Object element : list) {
- if (!detector.canDetect(element)) {
- return null;
- }
- }
- return SchemaBuilder.array(detector.schema()).build();
}
- if (value instanceof Map) {
- Map, ?> map = (Map, ?>) value;
- if (map.isEmpty()) {
+ return SchemaBuilder.array(detector.schema()).build();
+ }
+
+ private static Schema inferMapSchema(Map, ?> map) {
+ if (map.isEmpty()) {
+ return null;
+ }
+ SchemaDetector keyDetector = new SchemaDetector();
+ SchemaDetector valueDetector = new SchemaDetector();
+ for (Map.Entry, ?> entry : map.entrySet()) {
+ if (!keyDetector.canDetect(entry.getKey()) || !valueDetector.canDetect(entry.getValue())) {
return null;
}
- SchemaDetector keyDetector = new SchemaDetector();
- SchemaDetector valueDetector = new SchemaDetector();
- for (Map.Entry, ?> entry : map.entrySet()) {
- if (!keyDetector.canDetect(entry.getKey()) || !valueDetector.canDetect(entry.getValue())) {
- return null;
- }
- }
- return SchemaBuilder.map(keyDetector.schema(), valueDetector.schema()).build();
}
- if (value instanceof Struct) {
- return ((Struct) value).schema();
- }
- return null;
+ return SchemaBuilder.map(keyDetector.schema(), valueDetector.schema()).build();
}
-
/**
* Parse the specified string representation of a value into its schema and value.
*
@@ -387,8 +411,8 @@ public static SchemaAndValue parseString(String value) {
if (value.isEmpty()) {
return new SchemaAndValue(Schema.STRING_SCHEMA, value);
}
- Parser parser = new Parser(value);
- return parse(parser, false);
+ ValueParser parser = new ValueParser(new Parser(value));
+ return parser.parse(false);
}
/**
@@ -396,7 +420,7 @@ public static SchemaAndValue parseString(String value) {
*
* @param toSchema the schema for the desired type; may not be null
* @param fromSchema the schema for the supplied value; may be null if not known
- * @return the converted value; never null
+ * @return the converted value; null if the passed-in schema was optional, and the input value was null.
* @throws DataException if the value could not be converted to the desired type
*/
protected static Object convertTo(Schema toSchema, Schema fromSchema, Object value) throws DataException {
@@ -408,184 +432,213 @@ protected static Object convertTo(Schema toSchema, Schema fromSchema, Object val
}
switch (toSchema.type()) {
case BYTES:
- if (Decimal.LOGICAL_NAME.equals(toSchema.name())) {
- if (value instanceof ByteBuffer) {
- value = Utils.toArray((ByteBuffer) value);
- }
- if (value instanceof byte[]) {
- return Decimal.toLogical(toSchema, (byte[]) value);
- }
- if (value instanceof BigDecimal) {
- return value;
- }
- if (value instanceof Number) {
- // Not already a decimal, so treat it as a double ...
- double converted = ((Number) value).doubleValue();
- return BigDecimal.valueOf(converted);
- }
- if (value instanceof String) {
- return new BigDecimal(value.toString());
- }
- }
- if (value instanceof ByteBuffer) {
- return Utils.toArray((ByteBuffer) value);
- }
- if (value instanceof byte[]) {
- return value;
- }
- if (value instanceof BigDecimal) {
- return Decimal.fromLogical(toSchema, (BigDecimal) value);
- }
- break;
+ return convertMaybeLogicalBytes(toSchema, value);
case STRING:
- StringBuilder sb = new StringBuilder();
- append(sb, value, false);
- return sb.toString();
+ return convertToString(fromSchema, value);
case BOOLEAN:
- if (value instanceof Boolean) {
- return value;
- }
- if (value instanceof String) {
- SchemaAndValue parsed = parseString(value.toString());
- if (parsed.value() instanceof Boolean) {
- return parsed.value();
- }
- }
- return asLong(value, fromSchema, null) == 0L ? Boolean.FALSE : Boolean.TRUE;
+ return convertToBoolean(fromSchema, value);
case INT8:
- if (value instanceof Byte) {
- return value;
- }
- return (byte) asLong(value, fromSchema, null);
+ return convertToByte(fromSchema, value);
case INT16:
- if (value instanceof Short) {
- return value;
- }
- return (short) asLong(value, fromSchema, null);
+ return convertToShort(fromSchema, value);
case INT32:
- if (Date.LOGICAL_NAME.equals(toSchema.name())) {
- if (value instanceof String) {
- SchemaAndValue parsed = parseString(value.toString());
- value = parsed.value();
- }
- if (value instanceof java.util.Date) {
- if (fromSchema != null) {
- String fromSchemaName = fromSchema.name();
- if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
- return value;
- }
- if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
- // Just get the number of days from this timestamp
- long millis = ((java.util.Date) value).getTime();
- int days = (int) (millis / MILLIS_PER_DAY); // truncates
- return Date.toLogical(toSchema, days);
- }
- } else {
- // There is no fromSchema, so no conversion is needed
- return value;
- }
- }
- long numeric = asLong(value, fromSchema, null);
- return Date.toLogical(toSchema, (int) numeric);
- }
- if (Time.LOGICAL_NAME.equals(toSchema.name())) {
- if (value instanceof String) {
- SchemaAndValue parsed = parseString(value.toString());
- value = parsed.value();
- }
- if (value instanceof java.util.Date) {
- if (fromSchema != null) {
- String fromSchemaName = fromSchema.name();
- if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
- return value;
- }
- if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
- // Just get the time portion of this timestamp
- Calendar calendar = Calendar.getInstance(UTC);
- calendar.setTime((java.util.Date) value);
- calendar.set(Calendar.YEAR, 1970);
- calendar.set(Calendar.MONTH, 0); // Months are zero-based
- calendar.set(Calendar.DAY_OF_MONTH, 1);
- return Time.toLogical(toSchema, (int) calendar.getTimeInMillis());
- }
- } else {
- // There is no fromSchema, so no conversion is needed
- return value;
- }
- }
- long numeric = asLong(value, fromSchema, null);
- return Time.toLogical(toSchema, (int) numeric);
- }
- if (value instanceof Integer) {
- return value;
- }
- return (int) asLong(value, fromSchema, null);
+ return convertMaybeLogicalInteger(toSchema, fromSchema, value);
case INT64:
- if (Timestamp.LOGICAL_NAME.equals(toSchema.name())) {
- if (value instanceof String) {
- SchemaAndValue parsed = parseString(value.toString());
- value = parsed.value();
- }
- if (value instanceof java.util.Date) {
- java.util.Date date = (java.util.Date) value;
- if (fromSchema != null) {
- String fromSchemaName = fromSchema.name();
- if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
- int days = Date.fromLogical(fromSchema, date);
- long millis = days * MILLIS_PER_DAY;
- return Timestamp.toLogical(toSchema, millis);
- }
- if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
- long millis = Time.fromLogical(fromSchema, date);
- return Timestamp.toLogical(toSchema, millis);
- }
- if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
- return value;
- }
- } else {
- // There is no fromSchema, so no conversion is needed
- return value;
- }
- }
- long numeric = asLong(value, fromSchema, null);
- return Timestamp.toLogical(toSchema, numeric);
- }
- if (value instanceof Long) {
- return value;
- }
- return asLong(value, fromSchema, null);
+ return convertMaybeLogicalLong(toSchema, fromSchema, value);
case FLOAT32:
- if (value instanceof Float) {
- return value;
- }
- return (float) asDouble(value, fromSchema, null);
+ return convertToFloat(fromSchema, value);
case FLOAT64:
- if (value instanceof Double) {
- return value;
- }
- return asDouble(value, fromSchema, null);
+ return convertToDouble(fromSchema, value);
case ARRAY:
- if (value instanceof String) {
- SchemaAndValue schemaAndValue = parseString(value.toString());
- value = schemaAndValue.value();
+ return convertToArray(toSchema, value);
+ case MAP:
+ return convertToMapInternal(toSchema, value);
+ case STRUCT:
+ return convertToStructInternal(toSchema, value);
+ }
+ throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
+ }
+
+ private static Serializable convertMaybeLogicalBytes(Schema toSchema, Object value) {
+ if (Decimal.LOGICAL_NAME.equals(toSchema.name())) {
+ return convertToDecimal(toSchema, value);
+ }
+ return convertToBytes(toSchema, value);
+ }
+
+ private static BigDecimal convertToDecimal(Schema toSchema, Object value) {
+ if (value instanceof ByteBuffer) {
+ value = Utils.toArray((ByteBuffer) value);
+ }
+ if (value instanceof byte[]) {
+ return Decimal.toLogical(toSchema, (byte[]) value);
+ }
+ if (value instanceof BigDecimal) {
+ return (BigDecimal) value;
+ }
+ if (value instanceof Number) {
+ // Not already a decimal, so treat it as a double ...
+ double converted = ((Number) value).doubleValue();
+ return BigDecimal.valueOf(converted);
+ }
+ if (value instanceof String) {
+ return new BigDecimal(value.toString());
+ }
+ throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
+ }
+
+ private static byte[] convertToBytes(Schema toSchema, Object value) {
+ if (value instanceof ByteBuffer) {
+ return Utils.toArray((ByteBuffer) value);
+ }
+ if (value instanceof byte[]) {
+ return (byte[]) value;
+ }
+ if (value instanceof BigDecimal) {
+ return Decimal.fromLogical(toSchema, (BigDecimal) value);
+ }
+ throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
+ }
+
+ private static Serializable convertMaybeLogicalInteger(Schema toSchema, Schema fromSchema, Object value) {
+ if (Date.LOGICAL_NAME.equals(toSchema.name())) {
+ return convertToDate(toSchema, fromSchema, value);
+ }
+ if (Time.LOGICAL_NAME.equals(toSchema.name())) {
+ return convertToTime(toSchema, fromSchema, value);
+ }
+ return convertToInteger(fromSchema, value);
+ }
+
+ private static java.util.Date convertToDate(Schema toSchema, Schema fromSchema, Object value) {
+ if (value == null) {
+ return null;
+ } else if (value instanceof String) {
+ SchemaAndValue parsed = parseString(value.toString());
+ value = parsed.value();
+ }
+ if (value instanceof java.util.Date) {
+ if (fromSchema != null) {
+ String fromSchemaName = fromSchema.name();
+ if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
+ return (java.util.Date) value;
}
- if (value instanceof List) {
- return value;
+ if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
+ // Just get the number of days from this timestamp
+ long millis = ((java.util.Date) value).getTime();
+ int days = (int) (millis / MILLIS_PER_DAY); // truncates
+ return Date.toLogical(toSchema, days);
}
- break;
- case MAP:
- if (value instanceof String) {
- SchemaAndValue schemaAndValue = parseString(value.toString());
- value = schemaAndValue.value();
+ } else {
+ // There is no fromSchema, so no conversion is needed
+ return (java.util.Date) value;
+ }
+ }
+ long numeric = asLong(value, fromSchema, null);
+ return Date.toLogical(toSchema, (int) numeric);
+ }
+
+ private static java.util.Date convertToTime(Schema toSchema, Schema fromSchema, Object value) {
+ if (value == null) {
+ return null;
+ } else if (value instanceof String) {
+ SchemaAndValue parsed = parseString(value.toString());
+ value = parsed.value();
+ }
+ if (value instanceof java.util.Date) {
+ if (fromSchema != null) {
+ String fromSchemaName = fromSchema.name();
+ if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
+ return (java.util.Date) value;
}
- if (value instanceof Map) {
- return value;
+ if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
+ // Just get the time portion of this timestamp
+ Calendar calendar = Calendar.getInstance(UTC);
+ calendar.setTime((java.util.Date) value);
+ calendar.set(Calendar.YEAR, 1970);
+ calendar.set(Calendar.MONTH, 0); // Months are zero-based
+ calendar.set(Calendar.DAY_OF_MONTH, 1);
+ return Time.toLogical(toSchema, (int) calendar.getTimeInMillis());
}
- break;
- case STRUCT:
- if (value instanceof Struct) {
- return value;
+ } else {
+ // There is no fromSchema, so no conversion is needed
+ return (java.util.Date) value;
+ }
+ }
+ long numeric = asLong(value, fromSchema, null);
+ return Time.toLogical(toSchema, (int) numeric);
+ }
+
+ private static Serializable convertMaybeLogicalLong(Schema toSchema, Schema fromSchema, Object value) {
+ if (Timestamp.LOGICAL_NAME.equals(toSchema.name())) {
+ return convertToTimestamp(toSchema, fromSchema, value);
+ }
+ return convertToLong(fromSchema, value);
+ }
+
+ private static java.util.Date convertToTimestamp(Schema toSchema, Schema fromSchema, Object value) {
+ if (value == null) {
+ return null;
+ } else if (value instanceof String) {
+ SchemaAndValue parsed = parseString(value.toString());
+ value = parsed.value();
+ }
+ if (value instanceof java.util.Date) {
+ java.util.Date date = (java.util.Date) value;
+ if (fromSchema != null) {
+ String fromSchemaName = fromSchema.name();
+ if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
+ int days = Date.fromLogical(fromSchema, date);
+ long millis = days * MILLIS_PER_DAY;
+ return Timestamp.toLogical(toSchema, millis);
}
+ if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
+ long millis = Time.fromLogical(fromSchema, date);
+ return Timestamp.toLogical(toSchema, millis);
+ }
+ if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
+ return date;
+ }
+ } else {
+ // There is no fromSchema, so no conversion is needed
+ return date;
+ }
+ }
+ long numeric = asLong(value, fromSchema, null);
+ return Timestamp.toLogical(toSchema, numeric);
+ }
+
+ private static List> convertToArray(Schema toSchema, Object value) {
+ if (value == null) {
+ throw new DataException("Unable to convert a null value to a schema that requires a value");
+ } else if (value instanceof String) {
+ SchemaAndValue schemaAndValue = parseString(value.toString());
+ value = schemaAndValue.value();
+ }
+ if (value instanceof List) {
+ return (List>) value;
+ }
+ throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
+ }
+
+ private static Map, ?> convertToMapInternal(Schema toSchema, Object value) {
+ if (value == null) {
+ throw new DataException("Unable to convert a null value to a schema that requires a value");
+ } else if (value instanceof String) {
+ SchemaAndValue schemaAndValue = parseString(value.toString());
+ value = schemaAndValue.value();
+ }
+ if (value instanceof Map) {
+ return (Map, ?>) value;
+ }
+ throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
+ }
+
+ private static Struct convertToStructInternal(Schema toSchema, Object value) {
+ if (value == null) {
+ throw new DataException("Unable to convert a null value to a schema that requires a value");
+ } else if (value instanceof Struct) {
+ return (Struct) value;
}
throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
}
@@ -744,174 +797,212 @@ public static DateFormat dateFormatFor(java.util.Date value) {
return new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN);
}
- protected static boolean canParseSingleTokenLiteral(Parser parser, boolean embedded, String tokenLiteral) {
- int startPosition = parser.mark();
- // If the next token is what we expect, then either...
- if (parser.canConsume(tokenLiteral)) {
- // ...we're reading an embedded value, in which case the next token will be handled appropriately
- // by the caller if it's something like an end delimiter for a map or array, or a comma to
- // separate multiple embedded values...
- // ...or it's being parsed as part of a top-level string, in which case, any other tokens should
- // cause use to stop parsing this single-token literal as such and instead just treat it like
- // a string. For example, the top-level string "true}" will be tokenized as the tokens "true" and
- // "}", but should ultimately be parsed as just the string "true}" instead of the boolean true.
- if (embedded || !parser.hasNext()) {
- return true;
- }
+ private static class ValueParser {
+
+ private static final Logger log = LoggerFactory.getLogger(ValueParser.class);
+ private static final SchemaAndValue TRUE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.TRUE);
+ private static final SchemaAndValue FALSE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.FALSE);
+ private static final String TRUE_LITERAL = Boolean.TRUE.toString();
+ private static final String FALSE_LITERAL = Boolean.FALSE.toString();
+ private static final BigInteger LONG_MIN = BigInteger.valueOf(Long.MIN_VALUE);
+ private static final BigInteger LONG_MAX = BigInteger.valueOf(Long.MAX_VALUE);
+ private static final String QUOTE_DELIMITER = "\"";
+ private static final String COMMA_DELIMITER = ",";
+ private static final String ENTRY_DELIMITER = ":";
+ private static final String ARRAY_BEGIN_DELIMITER = "[";
+ private static final String ARRAY_END_DELIMITER = "]";
+ private static final String MAP_BEGIN_DELIMITER = "{";
+ private static final String MAP_END_DELIMITER = "}";
+ private static final int ISO_8601_DATE_LENGTH = ISO_8601_DATE_FORMAT_PATTERN.length();
+ private static final int ISO_8601_TIME_LENGTH = ISO_8601_TIME_FORMAT_PATTERN.length() - 2; // subtract single quotes
+ private static final int ISO_8601_TIMESTAMP_LENGTH = ISO_8601_TIMESTAMP_FORMAT_PATTERN.length() - 4; // subtract single quotes
+
+ private final Parser parser;
+
+ private ValueParser(Parser parser) {
+ this.parser = parser;
}
- parser.rewindTo(startPosition);
- return false;
- }
- protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
- if (!parser.hasNext()) {
- return null;
- }
- if (embedded) {
- if (parser.canConsume(QUOTE_DELIMITER)) {
- StringBuilder sb = new StringBuilder();
- while (parser.hasNext()) {
- if (parser.canConsume(QUOTE_DELIMITER)) {
- break;
- }
- sb.append(parser.next());
- }
- String content = sb.toString();
- // We can parse string literals as temporal logical types, but all others
- // are treated as strings
- SchemaAndValue parsed = parseString(content);
- if (parsed != null && TEMPORAL_LOGICAL_TYPE_NAMES.contains(parsed.schema().name())) {
- return parsed;
+ private boolean canParseSingleTokenLiteral(boolean embedded, String tokenLiteral) {
+ int startPosition = parser.mark();
+ // If the next token is what we expect, then either...
+ if (parser.canConsume(tokenLiteral)) {
+ // ...we're reading an embedded value, in which case the next token will be handled appropriately
+ // by the caller if it's something like an end delimiter for a map or array, or a comma to
+ // separate multiple embedded values...
+ // ...or it's being parsed as part of a top-level string, in which case, any other tokens should
+ // cause use to stop parsing this single-token literal as such and instead just treat it like
+ // a string. For example, the top-level string "true}" will be tokenized as the tokens "true" and
+ // "}", but should ultimately be parsed as just the string "true}" instead of the boolean true.
+ if (embedded || !parser.hasNext()) {
+ return true;
}
- return new SchemaAndValue(Schema.STRING_SCHEMA, content);
}
+ parser.rewindTo(startPosition);
+ return false;
}
- if (canParseSingleTokenLiteral(parser, embedded, NULL_VALUE)) {
- return null;
- }
- if (canParseSingleTokenLiteral(parser, embedded, TRUE_LITERAL)) {
- return TRUE_SCHEMA_AND_VALUE;
- }
- if (canParseSingleTokenLiteral(parser, embedded, FALSE_LITERAL)) {
- return FALSE_SCHEMA_AND_VALUE;
+ public SchemaAndValue parse(boolean embedded) throws NoSuchElementException {
+ if (!parser.hasNext()) {
+ return null;
+ } else if (embedded && parser.canConsume(QUOTE_DELIMITER)) {
+ return parseQuotedString();
+ } else if (canParseSingleTokenLiteral(embedded, NULL_VALUE)) {
+ return null;
+ } else if (canParseSingleTokenLiteral(embedded, TRUE_LITERAL)) {
+ return TRUE_SCHEMA_AND_VALUE;
+ } else if (canParseSingleTokenLiteral(embedded, FALSE_LITERAL)) {
+ return FALSE_SCHEMA_AND_VALUE;
+ }
+
+ int startPosition = parser.mark();
+
+ try {
+ if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
+ return parseArray();
+ } else if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
+ return parseMap();
+ }
+ } catch (DataException e) {
+ log.trace("Unable to parse the value as a map or an array; reverting to string", e);
+ parser.rewindTo(startPosition);
+ }
+
+ String token = parser.next();
+ if (Utils.isBlank(token)) {
+ return new SchemaAndValue(Schema.STRING_SCHEMA, token);
+ } else {
+ return parseNextToken(embedded, token.trim());
+ }
}
- int startPosition = parser.mark();
+ private SchemaAndValue parseNextToken(boolean embedded, String token) {
+ char firstChar = token.charAt(0);
+ boolean firstCharIsDigit = Character.isDigit(firstChar);
- try {
- if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
- List