Add Kafka Json encoder #4477
54fb472
to
355841e
Compare
Implemented all the changes in the resolved review comments; will make it through the rest on Monday. I changed the date/time formatting to precompile a list of functions to format the values, lmk what you think of this.
163919d
to
b3c8fd0
Compare
Thanks. I left some comments, but this is not a full review. Structurally it looks much better. Timestamp handling is hard, as I am honestly not sure what the expected behaviour is here.
b3c8fd0
to
3ddeb43
Compare
955c7f6
to
30b4b63
Compare
I think I made the suggested legacy timestamp changes. The legacy format functions definitely need a close look, though.
fdcb87f
to
33f8a82
Compare
Legacy semantics imply that temporal types are interpreted in the session’s time zone. See for example the Javadoc on `TimestampType`. It seems to me that throughout these changes, the UTC timezone is used instead.
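To make the difference concrete, here is a minimal sketch using `java.time` rather than the Joda Time classes in this PR. The class name, the sample instant, and the `America/New_York` zone are assumptions chosen for illustration only. It shows that the same epoch millis render as different wall-clock strings depending on whether UTC or a session zone is applied.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration; not code from the PR.
final class SessionZoneFormatting
{
    private static final DateTimeFormatter PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private SessionZoneFormatting() {}

    // Renders epoch millis as wall-clock time in the given zone.
    static String format(long epochMillis, ZoneId zone)
    {
        return PATTERN.withZone(zone).format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args)
    {
        long epochMillis = 0L; // 1970-01-01T00:00:00Z
        System.out.println(format(epochMillis, ZoneOffset.UTC));
        // Assumed session zone; the printed wall-clock time differs from the UTC one.
        System.out.println(format(epochMillis, ZoneId.of("America/New_York")));
    }
}
```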
33f8a82
to
34cb597
Compare
34cb597
to
a900543
Compare
Mostly stylistic suggestions.
if (type == DATE) {
    return JsonFormatFunction.builder().setFormatDateFunc(getDateFormatFunction(dataFormat, formatHint)).build();
}
else if (type == TIME) {
You actually don't need all the `else`s.
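A minimal sketch of the shape this suggests, assuming each branch returns. The enum and method names below are hypothetical and do not come from the PR; the point is only that once every branch returns, plain `if` statements behave the same as an `else if` chain.

```java
// Hypothetical illustration; not code from the PR.
final class EarlyReturn
{
    enum TemporalType { DATE, TIME, TIMESTAMP }

    private EarlyReturn() {}

    static String describe(TemporalType type)
    {
        // Each branch returns, so no `else` is needed before the next check.
        if (type == TemporalType.DATE) {
            return "date";
        }
        if (type == TemporalType.TIME) {
            return "time";
        }
        if (type == TemporalType.TIMESTAMP) {
            return "timestamp";
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    public static void main(String[] args)
    {
        System.out.println(describe(TemporalType.TIME));
    }
}
```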
public static Function<Long, String> formatDateFunc(String formatHint)
{
    try {
        DateTimeFormatter formatter = DateTimeFormat.forPattern(formatHint).withLocale(Locale.ENGLISH).withZoneUTC();
You can use a static import (below too).
{
    try {
        DateTimeFormatter formatter = DateTimeFormat.forPattern(formatHint).withLocale(Locale.ENGLISH).withZoneUTC();
        return millis -> (new DateTime(millis, DateTimeZone.UTC)).toString(formatter);
Let's be consistent in the way we format. Either `new DateTime(...).toString(formatter)` as here, or `formatter.print(new DateTime(...))` as below.
public static class Builder
{
    private Function<Long, String> formatDateFunc = (ignored) -> { throw new RuntimeException("unsupported argument type"); };
You could define
private static final Function<Long, String> UNIMPLEMENTED = ignored -> { throw new RuntimeException("unsupported argument type"); };
and reference that:
private Function<Long, String> formatDateFunc = UNIMPLEMENTED;
private Function<Long, String> formatTimeFunc = UNIMPLEMENTED;
...
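A self-contained sketch of how that shared default could look in a builder-like class. The class name `FormatFunctionBuilder` and its methods are hypothetical simplifications, not the PR's actual `Builder`; only the `UNIMPLEMENTED` constant mirrors the suggestion above.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical illustration; not code from the PR.
final class FormatFunctionBuilder
{
    // One shared default instead of repeating the throwing lambda per field.
    private static final Function<Long, String> UNIMPLEMENTED = ignored -> {
        throw new RuntimeException("unsupported argument type");
    };

    private Function<Long, String> formatDate = UNIMPLEMENTED;
    private Function<Long, String> formatTime = UNIMPLEMENTED;

    FormatFunctionBuilder setFormatDate(Function<Long, String> function)
    {
        this.formatDate = Objects.requireNonNull(function, "function is null");
        return this;
    }

    String formatDate(long value)
    {
        return formatDate.apply(value);
    }

    String formatTime(long value)
    {
        return formatTime.apply(value);
    }

    public static void main(String[] args)
    {
        FormatFunctionBuilder builder = new FormatFunctionBuilder().setFormatDate(days -> "day " + days);
        System.out.println(builder.formatDate(3));
    }
}
```

Fields that are never set keep the shared default, so calling them fails fast with the same message.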
public static long daysToEpochMillis(long value)
{
    return DAYS.toMillis(value);
This is subjective, but you could just as well inline. I think `DAYS.toMillis(value)` reads pretty well.
public static Function<Long, String> formatMillisWithTZFunc()
{
    return encodedMillisWithTZ -> String.valueOf(unpackMillisUtc(encodedMillisWithTZ));
Perhaps we should not support this scenario. This does not seem a meaningful return value - without the tz info, the milliseconds don't make much sense here, right?
Sorry, that was wrong. This format implies millis since epoch.
        unpackZoneKey(encodedMillisWithTZ).getZoneId()));
}

private static LocalDateTime localDateTimeOfEpochMillis(long epochMillis)
Perhaps this would warrant being public in `JsonFormatFunctions`. It's being used in `ISO8601FormatFunctions`, as well.
    return millis -> String.valueOf(millisToSeconds(millis));
}

public static Function<Long, String> formatSecondsWithTZFunc()
Another situation in which we're throwing away tz info.
This is like Unix `time()`, it's fine.
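A small sketch of why dropping the zone is harmless for an epoch-based format. It uses `java.time` instead of Presto's packed millis-with-zone encoding, and the class name and sample value are assumptions for illustration. Seconds since the epoch identify an instant, so the same instant viewed in two zones yields the same number.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical illustration; not code from the PR.
final class EpochSecondsIgnoreZone
{
    private EpochSecondsIgnoreZone() {}

    // Attaches a zone to the instant, then reads back seconds since the epoch.
    static long epochSeconds(long epochMillis, ZoneId zone)
    {
        ZonedDateTime dateTime = Instant.ofEpochMilli(epochMillis).atZone(zone);
        return dateTime.toEpochSecond();
    }

    public static void main(String[] args)
    {
        long epochMillis = 1_500_000_000_000L; // arbitrary sample instant
        System.out.println(epochSeconds(epochMillis, ZoneOffset.UTC));
        System.out.println(epochSeconds(epochMillis, ZoneId.of("Asia/Tokyo")));
    }
}
```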
Skimming.
I have several code-style comments which are generally applicable to other places as well.
I have some comments about timestamp formatting semantics.
Let's talk about them more.
checkArgument(isSupportedType(columnHandle.getType()), "Unsupported column type '%s' for column '%s'", columnHandle.getType(), columnHandle.getName());

if (isDateTimeType(columnHandle.getType())) {
    checkArgument(columnHandle.getDataFormat() != null, "Unsupported or no dataFormat '%s' defined for temporal column '%s'", columnHandle.getDataFormat(), columnHandle.getName());
- checkArgument(columnHandle.getDataFormat() != null, "Unsupported or no dataFormat '%s' defined for temporal column '%s'", columnHandle.getDataFormat(), columnHandle.getName());
+ checkArgument(columnHandle.getDataFormat() != null, "No dataFormat defined for temporal column '%s'", columnHandle.getName());
else if (type == TIME) {
    return JsonFormatFunction.builder().setFormatTimeFunc(getTimeFormatFunction(dataFormat, formatHint)).build();
}
else if (type == TIME_WITH_TIME_ZONE) {
{
    private CustomDateTimeFormatFunctions() {}

    public static Function<Long, String> formatDateFunc(String formatHint)
avoid abbreviations
func -> function
public static Function<Long, String> formatDateFunc(String formatHint)
{
    try {
        DateTimeFormatter formatter = DateTimeFormat.forPattern(formatHint).withLocale(Locale.ENGLISH).withZoneUTC();
is the format hint documented to be Joda Time?
    DateTimeFormatter formatter = DateTimeFormat.forPattern(formatHint).withLocale(Locale.ENGLISH).withZoneUTC();
    return millis -> (new DateTime(millis, DateTimeZone.UTC)).toString(formatter);
}
catch (IllegalArgumentException e) {
Code is correct but I would find it more readable if the lambda was outside of try block...
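One way this could look, sketched with `java.time.format.DateTimeFormatter` instead of Joda Time so the example stays self-contained. The class and method names are hypothetical. Only the pattern compilation, which is the call that can throw, sits inside the `try`; the lambda is returned afterwards.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

// Hypothetical illustration; not code from the PR.
final class PatternFormatFunctions
{
    private PatternFormatFunctions() {}

    static Function<Long, String> formatDateFunction(String formatHint)
    {
        DateTimeFormatter formatter;
        try {
            // Only the call that can reject the pattern is guarded.
            formatter = DateTimeFormatter.ofPattern(formatHint).withZone(ZoneOffset.UTC);
        }
        catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(String.format("Invalid pattern '%s' passed as format hint", formatHint), e);
        }
        // The lambda lives outside the try block and captures the compiled formatter.
        return millis -> formatter.format(Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args)
    {
        System.out.println(formatDateFunction("yyyy-MM-dd").apply(0L));
    }
}
```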
{
    try {
        DateTimeFormatter formatter = DateTimeFormat.forPattern(formatHint).withLocale(Locale.ENGLISH).withZoneUTC();
        return millis -> (new DateTime(millis, DateTimeZone.UTC)).toString(formatter);
I think it would be better to supply the session zone here.
This would make it more similar to the non-legacy case, when we implement it.
I.e. INSERT ... TIMESTAMP 'some_date_time' would be `some_date_time session_zone` in legacy and `some_date_time` in non-legacy, and if the format hint does not contain the zone, they would actually be the same.
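One reading of this suggestion, as a hedged sketch in `java.time`. The class, the method names, and the `Europe/Warsaw` session zone are assumptions for illustration, and this is not how the PR implements it. In the legacy path the literal is pinned to the session zone and formatted in that zone; in the non-legacy path it is formatted as a plain wall-clock value.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration; not code from the PR.
final class LegacyTimestampSketch
{
    private LegacyTimestampSketch() {}

    // Legacy: the literal is interpreted in the session zone, stored as an instant,
    // and formatted back in the same session zone.
    static String formatLegacy(LocalDateTime literal, ZoneId sessionZone, String pattern)
    {
        long epochMillis = literal.atZone(sessionZone).toInstant().toEpochMilli();
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(sessionZone)
                .format(Instant.ofEpochMilli(epochMillis));
    }

    // Non-legacy: the literal is a wall-clock value with no zone attached.
    static String formatNonLegacy(LocalDateTime literal, String pattern)
    {
        return DateTimeFormatter.ofPattern(pattern).format(literal);
    }

    public static void main(String[] args)
    {
        LocalDateTime literal = LocalDateTime.of(2020, 7, 1, 12, 30, 0);
        ZoneId sessionZone = ZoneId.of("Europe/Warsaw"); // assumed session zone
        System.out.println(formatLegacy(literal, sessionZone, "yyyy-MM-dd HH:mm:ss VV"));
        System.out.println(formatNonLegacy(literal, "yyyy-MM-dd HH:mm:ss"));
    }
}
```

With a pattern that has no zone component, both paths produce the same string, which matches the last point of the comment.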
{
    try {
        DateTimeFormatter formatter = DateTimeFormat.forPattern(formatHint).withLocale(Locale.ENGLISH).withZoneUTC();
        return encodedMillisWithTZ -> formatter.print(new DateTime(
encodedMillisWithTZ -> value
            DateTimeZone.forID(unpackZoneKey(encodedMillisWithTZ).getId())));
}
catch (IllegalArgumentException e) {
    throw new IllegalArgumentException(format("Invalid joda pattern '%s' passed as format hint", formatHint), e);
joda -> Joda Time
    return formatTimeFunc.apply(value);
}

public String formatTimeWithTZ(long value)
withTZ -> withTimeZone
a900543
to
99ad68e
Compare
Removed temporal column support from this PR; will open another PR for that.
LGTM
Merged, thanks!
Add `JsonRowEncoder` and `JsonRowEncoderFactory`
Add test case in `io.prestosql.plugin.kafka.TestKafkaIntegrationSmokeTest#testRoundTripAllFormats`
Might it be better to have separate formatters for the date/time types rather than methods in `JsonRowEncoder`?
closes #3980