diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index 32b0b1821..6f2d05446 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -346,6 +346,12 @@ public class BigQuerySinkConfig extends AbstractConfig { private static final String BIGQUERY_CLUSTERING_FIELD_NAMES_DOC = "List of fields on which data should be clustered by in BigQuery, separated by commas"; + public static final String CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG = "convertDebeziumTimestampToInteger"; + private static final ConfigDef.Type CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_TYPE = ConfigDef.Type.BOOLEAN; + private static final Boolean CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_DEFAULT = false; + private static final ConfigDef.Importance CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_IMPORTANCE = + ConfigDef.Importance.MEDIUM; + /** * Return the ConfigDef object used to define this config's fields. * @@ -546,6 +552,11 @@ public static ConfigDef getConfig() { BIGQUERY_CLUSTERING_FIELD_NAMES_VALIDATOR, BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE, BIGQUERY_CLUSTERING_FIELD_NAMES_DOC + ).defineInternal( + CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG, + CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_TYPE, + CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_DEFAULT, + CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_IMPORTANCE ); } @@ -780,7 +791,7 @@ public SchemaConverter getSchemaConverter() { * @return a {@link RecordConverter} for BigQuery. */ public RecordConverter> getRecordConverter() { - return new BigQueryRecordConverter(getBoolean(CONVERT_DOUBLE_SPECIAL_VALUES_CONFIG)); + return new BigQueryRecordConverter(getBoolean(CONVERT_DOUBLE_SPECIAL_VALUES_CONFIG), getBoolean(CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG)); } /** diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java index 4ba97174c..d5985000b 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverter.java @@ -54,6 +54,7 @@ public class BigQueryRecordConverter implements RecordConverter bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -93,7 +94,7 @@ public void testInteger() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); final Short fieldShortValue = (short) 4242; @@ -110,7 +111,7 @@ public void testInteger() { kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); final Integer fieldIntegerValue = 424242; @@ -127,7 +128,7 @@ public void testInteger() { kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); final Long fieldLongValue = 424242424242L; @@ -144,7 +145,7 @@ public void testInteger() { kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -165,7 +166,7 @@ public void testInteger() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); final Double fieldDoubleValue = 4242424242.4242; @@ -183,7 +184,7 @@ public void testInteger() { kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -210,7 +211,7 @@ public void testInteger() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } } @@ -233,7 +234,7 @@ public void testString() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -265,7 +266,7 @@ public void testStruct() { SinkRecord kafkaConnectInnerSinkRecord = spoofSinkRecord(kafkaConnectInnerSchema, kafkaConnectInnerStruct, false); Map bigQueryTestInnerRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER) .convertRecord(kafkaConnectInnerSinkRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedInnerRecord, bigQueryTestInnerRecord); @@ -287,7 +288,7 @@ public void testStruct() { SinkRecord kafkaConnectMiddleSinkRecord = spoofSinkRecord(kafkaConnectMiddleSchema, kafkaConnectMiddleStruct, true); Map bigQueryTestMiddleRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER) .convertRecord(kafkaConnectMiddleSinkRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedMiddleRecord, bigQueryTestMiddleRecord); @@ -309,7 +310,7 @@ public void testStruct() { SinkRecord kafkaConnectOuterSinkRecord = spoofSinkRecord(kafkaConnectOuterSchema, kafkaConnectOuterStruct, false); Map bigQueryTestOuterRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER) .convertRecord(kafkaConnectOuterSinkRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedOuterRecord, bigQueryTestOuterRecord); } @@ -325,7 +326,7 @@ public void testEmptyStruct() { SinkRecord kafkaConnectSinkRecord = spoofSinkRecord(kafkaConnectInnerSchema, kafkaConnectInnerStruct, false); Map bigQueryTestInnerRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER) .convertRecord(kafkaConnectSinkRecord, KafkaSchemaRecordType.VALUE); assertEquals(new HashMap(), bigQueryTestInnerRecord); } @@ -358,7 +359,7 @@ public void testEmptyInnerStruct() { SinkRecord kafkaConnectOuterSinkRecord = spoofSinkRecord(kafkaConnectOuterSchema, kafkaConnectOuterStruct, false); Map bigQueryTestOuterRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER) .convertRecord(kafkaConnectOuterSinkRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedOuterRecord, bigQueryTestOuterRecord); @@ -398,7 +399,7 @@ public void testMap() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -420,7 +421,7 @@ public void testIntegerArray() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -447,7 +448,7 @@ public void testStructArray() { SinkRecord kafkaConnectInnerSinkRecord = spoofSinkRecord(kafkaConnectInnerSchema, kafkaConnectInnerStruct, true); Map bigQueryTestInnerRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER) .convertRecord(kafkaConnectInnerSinkRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedInnerRecord, bigQueryTestInnerRecord); @@ -468,7 +469,7 @@ public void testStructArray() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -491,7 +492,7 @@ public void testStringArray() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -515,12 +516,13 @@ public void testBytes() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @Test public void testDebeziumLogicalType() { + // Test-1 final String fieldName = "DebeziumDate"; final int fieldDate = 17226; @@ -537,7 +539,50 @@ public void testDebeziumLogicalType() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); + + // Test-2 + String timeStampFieldName = "DebeziumTimestamp"; + long fieldValue = 1611854944000l; + + bigQueryExpectedRecord = new HashMap<>(); + bigQueryExpectedRecord.put(timeStampFieldName, "2021-01-28 17:29:04.000"); + + kafkaConnectSchema = SchemaBuilder + .struct() + .field(timeStampFieldName, io.debezium.time.Timestamp.schema()) + .build(); + + kafkaConnectStruct = new Struct(kafkaConnectSchema); + kafkaConnectStruct.put(timeStampFieldName, fieldValue); + kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); + + bigQueryTestRecord = + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); + + // Test-3 + timeStampFieldName = "DebeziumTimestamp"; + fieldValue = 1611854944000l; + + // By default, it is set to false + SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER = true; + + bigQueryExpectedRecord = new HashMap<>(); + bigQueryExpectedRecord.put(timeStampFieldName, 1611854944000l); + + kafkaConnectSchema = SchemaBuilder + .struct() + .field(timeStampFieldName, io.debezium.time.Timestamp.schema()) + .build(); + + kafkaConnectStruct = new Struct(kafkaConnectSchema); + kafkaConnectStruct.put(timeStampFieldName, fieldValue); + kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); + + bigQueryTestRecord = + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -559,7 +604,7 @@ public void testKafkaLogicalType() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -586,7 +631,7 @@ public void testNullable() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, true); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -609,7 +654,7 @@ public void testNullableStruct() { SinkRecord kafkaConnectRecord = spoofSinkRecord(kafkaConnectSchema, kafkaConnectStruct, false); Map bigQueryTestRecord = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(bigQueryExpectedRecord, bigQueryTestRecord); } @@ -631,7 +676,7 @@ public void testValidMapSchemaless() { SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, true); Map convertedMap = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(kafkaConnectMap, convertedMap); } @@ -653,7 +698,7 @@ public void testInvalidMapSchemaless() { SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, false); Map convertedMap = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); } @Test @@ -665,7 +710,7 @@ public void testInvalidMapSchemalessNullValue() { }}; SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, true); - Map stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + Map stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); Assert.assertEquals(kafkaConnectMap, stringObjectMap ); } @@ -682,7 +727,7 @@ public void testInvalidMapSchemalessNestedMapNullValue() { }}; SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, true); - Map stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE) + Map stringObjectMap = new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER) .convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); Assert.assertEquals(kafkaConnectMap, stringObjectMap); } @@ -705,7 +750,7 @@ public void testMapSchemalessConvertDouble() { SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, true); Map convertedMap = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.KEY); assertEquals(convertedMap.get("f1"), Double.MAX_VALUE); assertEquals(((Map)(convertedMap.get("f3"))).get("f4"), Double.MAX_VALUE); assertEquals(((ArrayList)((Map)(convertedMap.get("f3"))).get("f6")).get(1), Double.MAX_VALUE); @@ -731,7 +776,7 @@ public void testMapSchemalessConvertBytes() { SinkRecord kafkaConnectRecord = spoofSinkRecord(null, kafkaConnectMap, false); Map convertedMap = - new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); + new BigQueryRecordConverter(SHOULD_CONVERT_DOUBLE, SHOULD_CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER).convertRecord(kafkaConnectRecord, KafkaSchemaRecordType.VALUE); assertEquals(convertedMap.get("f1"), Base64.getEncoder().encodeToString(helloWorld)); assertEquals(((Map)(convertedMap.get("f3"))).get("f4"), Base64.getEncoder().encodeToString(helloWorld)); }