Supporting the variable precision of DateTime64
Paultagoras committed Jan 30, 2024
1 parent 87ec89d commit ce9ac31
Showing 2 changed files with 38 additions and 6 deletions.
@@ -13,9 +13,7 @@
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.sink.db.mapping.Type;
import com.clickhouse.kafka.connect.sink.dlq.DuplicateException;
import com.clickhouse.kafka.connect.sink.dlq.ErrorReporter;
import com.clickhouse.kafka.connect.util.Mask;

import com.clickhouse.kafka.connect.util.QueryIdentifier;
import com.clickhouse.kafka.connect.util.Utils;
@@ -24,6 +22,7 @@

import java.math.BigDecimal;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.util.*;
import java.util.concurrent.TimeUnit;

@@ -196,7 +195,7 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField
return validSchema;
}

private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value) throws IOException {
private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data value, int precision) throws IOException {
// TODO: develop more specific tests to have better coverage
if (value.getObject() == null) {
BinaryStreamUtils.writeNull(stream);
@@ -239,8 +238,16 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data va
} else {
BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value.getObject());
}
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((String) value.getObject());
LOGGER.trace("Writing epoch seconds: {}", zonedDateTime.toInstant().getEpochSecond());
BinaryStreamUtils.writeUnsignedInt32(stream, zonedDateTime.toInstant().getEpochSecond());
} catch (Exception e) {
LOGGER.error("Error parsing date time string: {}", value.getObject());
unsupported = true;
}
} else {

unsupported = true;
}
break;
@@ -256,7 +263,24 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data va
} else if (value.getFieldType().equals(Schema.Type.STRING)) {
try {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((String) value.getObject());
BinaryStreamUtils.writeInt64(stream, zonedDateTime.toInstant().toEpochMilli());
long seconds = zonedDateTime.toInstant().getEpochSecond();
long milliSeconds = zonedDateTime.toInstant().toEpochMilli();
long microSeconds = TimeUnit.MICROSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.get(ChronoField.MICRO_OF_SECOND);
long nanoSeconds = TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS) + zonedDateTime.getNano();

if (precision == 3) {
LOGGER.trace("Writing epoch milliseconds: {}", milliSeconds);
BinaryStreamUtils.writeInt64(stream, milliSeconds);
} else if (precision == 6) {
LOGGER.trace("Writing epoch microseconds: {}", microSeconds);
BinaryStreamUtils.writeInt64(stream, microSeconds);
} else if (precision == 9) {
LOGGER.trace("Writing epoch nanoseconds: {}", nanoSeconds);
BinaryStreamUtils.writeInt64(stream, nanoSeconds);
} else {
LOGGER.trace("Writing epoch seconds: {}", seconds);
BinaryStreamUtils.writeInt64(stream, seconds);
}
} catch (Exception e) {
LOGGER.error("Error parsing date time string: {}", value.getObject());
unsupported = true;
@@ -405,7 +429,7 @@ private void doWriteCol(Record record, Column col, ClickHousePipedOutputStream s
case Date32:
case DateTime:
case DateTime64:
doWriteDates(colType, stream, value);
doWriteDates(colType, stream, value, col.getPrecision());
break;
case Decimal:
if (value.getObject() == null) {
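Taken out of the diff context, the new string-handling path boils down to: parse the ISO-8601 value with ZonedDateTime.parse, then scale the instant to whatever the column's DateTime64 precision calls for. Below is a minimal standalone sketch of that conversion; the class and method names are illustrative, not part of the connector, and the ClickHouse stream plumbing is left out.

import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: mirrors the precision branching added to doWriteDates.
public class DateTime64Sketch {

    // Convert an ISO-8601 zoned timestamp into the integer written for a
    // DateTime64 column of the given precision (3, 6, 9, else whole seconds).
    static long toEpochValue(String text, int precision) {
        ZonedDateTime zdt = ZonedDateTime.parse(text);
        long seconds = zdt.toInstant().getEpochSecond();
        if (precision == 3) {
            return zdt.toInstant().toEpochMilli();
        } else if (precision == 6) {
            return TimeUnit.MICROSECONDS.convert(seconds, TimeUnit.SECONDS)
                    + zdt.get(ChronoField.MICRO_OF_SECOND);
        } else if (precision == 9) {
            return TimeUnit.NANOSECONDS.convert(seconds, TimeUnit.SECONDS)
                    + zdt.getNano();
        } else {
            return seconds; // any other precision falls back to whole seconds, as in the diff
        }
    }

    public static void main(String[] args) {
        String ts = "2024-01-30T12:34:56.123456789Z";
        System.out.println(toEpochValue(ts, 0)); // 1706618096
        System.out.println(toEpochValue(ts, 3)); // 1706618096123
        System.out.println(toEpochValue(ts, 6)); // 1706618096123456
        System.out.println(toEpochValue(ts, 9)); // 1706618096123456789
    }
}

The remaining hunks below are from the Column type-mapping class, which is where that precision value now gets captured.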
@@ -30,6 +30,7 @@ private Column(String name, Type type, boolean isNullable, boolean hasDefaultVa
this.isNullable = isNullable;
this.hasDefaultValue = hasDefaultValue;
this.subType = null;
this.precision = 0;
}

private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Type mapKeyType, Type mapValueType) {
@@ -40,6 +41,7 @@ private Column(String name, Type type, boolean isNullable, boolean hasDefaultVal
this.subType = null;
this.mapKeyType = mapKeyType;
this.mapValueType = mapValueType;
this.precision = 0;
}

private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, Column subType) {
@@ -48,6 +50,7 @@ private Column(String name, Type type, boolean isNullable, boolean hasDefaultVal
this.isNullable = isNullable;
this.hasDefaultValue = hasDefaultValue;
this.subType = subType;
this.precision = 0;
}

private Column(String name, Type type, boolean isNullable, boolean hasDefaultValue, int precision, int scale) {
@@ -186,6 +189,11 @@ public static Column extractColumn(String name, String valueType, boolean isNull
return extractColumn(name, valueType.substring("LowCardinality".length() + 1, valueType.length() - 1), isNull, hasDefaultValue);
} else if (valueType.startsWith("Nullable")) {
return extractColumn(name, valueType.substring("Nullable".length() + 1, valueType.length() - 1), true, hasDefaultValue);
} else if (type == Type.DateTime64) {
String[] scaleAndTimezone = valueType.substring("DateTime64".length() + 1, valueType.length() - 1).split(",");
int precision = Integer.parseInt(scaleAndTimezone[0].trim());
LOGGER.trace("Precision is {}", precision);
return new Column(name, type, isNull, hasDefaultValue, precision, 0);
} else if (type == Type.Decimal) {
final Pattern patter = Pattern.compile("Decimal(?<size>\\d{2,3})?\\s*(\\((?<a1>\\d{1,}\\s*)?,*\\s*(?<a2>\\d{1,})?\\))?");
Matcher match = patter.matcher(valueType);
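The new DateTime64 branch in extractColumn simply takes the text between the parentheses, splits it on the comma, and parses the first argument as the precision; the optional timezone argument is ignored. A minimal standalone sketch of that parsing, with an illustrative class and helper name that are not part of the connector:

// Illustrative sketch of the precision extraction added to extractColumn:
// "DateTime64(3, 'UTC')" -> 3, "DateTime64(9)" -> 9.
public class DateTime64PrecisionSketch {

    static int extractPrecision(String valueType) {
        // Strip the leading "DateTime64(" and the trailing ")", then parse the
        // first comma-separated argument as the precision.
        String args = valueType.substring("DateTime64".length() + 1, valueType.length() - 1);
        String[] scaleAndTimezone = args.split(",");
        return Integer.parseInt(scaleAndTimezone[0].trim());
    }

    public static void main(String[] args) {
        System.out.println(extractPrecision("DateTime64(3, 'UTC')")); // 3
        System.out.println(extractPrecision("DateTime64(9)"));        // 9
    }
}

Like the diff, this assumes the type string carries parenthesised arguments; ClickHouse normally reports DateTime64 columns with an explicit precision, e.g. DateTime64(3).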
