Skip to content

Commit

Permalink
[CCDB-4843] Register TimestampConverter in DebeziumLogicalConverters …
Browse files Browse the repository at this point in the history
…behind a config for BigQuerySink Connector
  • Loading branch information
sp-gupta committed Mar 16, 2023
1 parent db332ed commit c525134
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ public class BigQuerySinkConfig extends AbstractConfig {
private static final String BIGQUERY_CLUSTERING_FIELD_NAMES_DOC =
"List of fields on which data should be clustered by in BigQuery, separated by commas";

public static final String CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG = "convertDebeziumTimestampToInteger";
private static final ConfigDef.Type CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_TYPE = ConfigDef.Type.BOOLEAN;
private static final Boolean CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_DEFAULT = false;
private static final ConfigDef.Importance CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_IMPORTANCE =
ConfigDef.Importance.MEDIUM;

/**
* Return the ConfigDef object used to define this config's fields.
*
Expand Down Expand Up @@ -546,6 +552,11 @@ public static ConfigDef getConfig() {
BIGQUERY_CLUSTERING_FIELD_NAMES_VALIDATOR,
BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE,
BIGQUERY_CLUSTERING_FIELD_NAMES_DOC
).defineInternal(
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG,
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_TYPE,
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_DEFAULT,
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_IMPORTANCE
);
}

Expand Down Expand Up @@ -780,7 +791,7 @@ public SchemaConverter<Schema> getSchemaConverter() {
* @return a {@link RecordConverter} for BigQuery.
*/
public RecordConverter<Map<String, Object>> getRecordConverter() {
return new BigQueryRecordConverter(getBoolean(CONVERT_DOUBLE_SPECIAL_VALUES_CONFIG));
return new BigQueryRecordConverter(getBoolean(CONVERT_DOUBLE_SPECIAL_VALUES_CONFIG), getBoolean(CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ public class BigQueryRecordConverter implements RecordConverter<Map<String, Obje
Integer.class, Long.class, Float.class, Double.class, String.class)
);
private boolean shouldConvertSpecialDouble;
private boolean shouldConvertDebeziumTimestampToInteger;

static {
// force registration
new DebeziumLogicalConverters();
new KafkaLogicalConverters();
}

public BigQueryRecordConverter(boolean shouldConvertDoubleSpecial) {
public BigQueryRecordConverter(boolean shouldConvertDoubleSpecial, boolean shouldConvertDebeziumTimestampToInteger) {
this.shouldConvertSpecialDouble = shouldConvertDoubleSpecial;
this.shouldConvertDebeziumTimestampToInteger = shouldConvertDebeziumTimestampToInteger;
}

/**
Expand Down Expand Up @@ -236,6 +238,10 @@ private Object convertLogical(Object kafkaConnectObject,
Schema kafkaConnectSchema) {
LogicalTypeConverter converter =
LogicalConverterRegistry.getConverter(kafkaConnectSchema.name());

if(shouldConvertDebeziumTimestampToInteger && converter instanceof DebeziumLogicalConverters.TimestampConverter) {
return (Long) kafkaConnectObject;
}
return converter.convert(kafkaConnectObject);
}

Expand Down
Loading

0 comments on commit c525134

Please sign in to comment.