fix: csas/ctas with timestamp column is used for output rowtime #4489
Conversation
Resolved review thread (outdated): ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java
@@ -12,7 +12,7 @@
   "name": "timestamp format",
   "statements": [
     "CREATE STREAM TEST (ID bigint, event_timestamp VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON', timestamp='event_timestamp', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX');",
-    "CREATE STREAM TS AS select id, stringtotimestamp(event_timestamp, 'yyyy-MM-dd''T''HH:mm:ssX') as ets from test;"
+    "CREATE STREAM TS WITH (timestamp='ets') AS select id, stringtotimestamp(event_timestamp, 'yyyy-MM-dd''T''HH:mm:ssX') as ets from test;"
I added the WITH TIMESTAMP here to use the source timestamp as the output timestamp.
Force-pushed from 95d22bf to 2a587f1.
Thanks @spena
I think there are some missing test cases for this new functionality (which I've called out below). I think these will highlight some holes in the implementation: specifically, this doesn't look like it will work if timestamp_format is specified in the WITH clause of C*AS statements.
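To make the gap concrete, here is a sketch of the shape a qtt test case covering this could take. The statement text and column names are illustrative, not taken from the PR; the point is only that timestamp_format appears in the WITH clause of the CSAS itself:

```json
{
  "name": "timestamp and timestamp_format in C*AS WITH clause",
  "statements": [
    "CREATE STREAM TEST (ID bigint, ets VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');",
    "CREATE STREAM TS WITH (timestamp='ets', timestamp_format='yyyy-MM-dd''T''HH:mm:ssX') AS SELECT id, ets FROM test;"
  ]
}
```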
Resolved review threads (outdated):
- ksql-execution/src/main/java/io/confluent/ksql/execution/plan/StreamSink.java
- ksql-execution/src/main/java/io/confluent/ksql/execution/plan/TableSink.java
- ...onal-tests/src/test/resources/expected_topology/0_6_0-pre/timestampformat_-_timestamp_format
- ksql-functional-tests/src/test/resources/query-validation-tests/timestampformat.json (three threads)
- ksql-streams/src/main/java/io/confluent/ksql/execution/streams/SinkBuilder.java (three threads)
- ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSinkBuilderTest.java
Force-pushed from 0e10dcb to d4989d2.
Thanks @spena
Couple of suggestions and questions below, plus what looks to me like a bug in your change to the JSON schema file.
Otherwise, LGTM!
     }
   },
   "title" : "streamSinkV1",
-  "required" : [ "@type", "properties", "source", "formats", "topicName" ]
+  "required" : [ "@type", "properties", "source", "formats", "topicName", "timestampColumn" ]
`timestampColumn` should be optional, right? Not required. I think with it the way it is, the JSON would fail to validate if it had no timestamp column defined.
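For illustration, a sketch of how the streamSinkV1 entry could declare the field without requiring it. The structure is abbreviated to the relevant keys, and the empty object for `timestampColumn` is a placeholder for whatever sub-schema the generator emits; the key simply stays out of the `required` array so plans without a timestamp column still validate:

```json
{
  "title" : "streamSinkV1",
  "properties" : {
    "timestampColumn" : { }
  },
  "required" : [ "@type", "properties", "source", "formats", "topicName" ]
}
```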
@rodesai is there a test somewhere that tests existing plans validate against the schema?
our qtt tests should catch any existing plans that fail to deserialize. In this case I think the schema is just wrong. @spena how did you generate it?
But do we deserialize using the schema?
     }
   },
   "title" : "tableSinkV1",
-  "required" : [ "@type", "properties", "source", "formats", "topicName" ]
+  "required" : [ "@type", "properties", "source", "formats", "topicName", "timestampColumn" ]
As above: `timestampColumn` should be optional, right? Not required. I think with it the way it is, the JSON would fail to validate if it had no timestamp column defined.
   );

   final KStream<K, GenericRow> transformed = tsTransformer
       .map(t -> stream.transform(t))
Can we use the variant of `transform` that takes a `Named`, please, so that we can give the step in the topology a more descriptive name than just KSTREAM-TRANSFORM-0003, e.g. name it Apply-timestamp or similar.
(You'll need to re-write the topologies again once you've done this.)
   .map(c -> sourceSchema.findValueColumn(c).orElseThrow(IllegalStateException::new))
   .map(Column::index)
   .map(timestampPolicy::create)
   .filter(te -> te instanceof AbstractColumnTimestampExtractor)
Remove this filter (and cast) by making `TimestampExtractionPolicy.create` return `KsqlTimestampExtraction`, where `KsqlTimestampExtraction` extends `TimestampExtraction`.
public abstract class AbstractColumnTimestampExtractor implements TimestampExtractor {
  protected final int timetampColumnIndex;

  AbstractColumnTimestampExtractor(final int timestampColumnindex) {
    Preconditions.checkArgument(timestampColumnindex >= 0, "timestampColumnindex must be >= 0");
    this.timetampColumnIndex = timestampColumnindex;
  }

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    if (record.value() instanceof GenericRow) {
      try {
        return extract((GenericRow) record.value());
      } catch (final Exception e) {
        throw new KsqlException("Unable to extract timestamp from record."
            + " record=" + record,
            e);
      }
    }

    return 0;
  }

  public abstract long extract(GenericRow row);
}
I'd consider just making this an interface:

public interface KsqlTimestampExtractor extends TimestampExtractor {
  long extract(GenericRow row);

  default long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    return extract((GenericRow) record.value());
  }
}

This is because the base class can't actually handle getting the column at index `timetampColumnIndex` out of the row for the subclasses, so the abstract class isn't adding much.
Note:
- This impl throws if the value is not a GenericRow, as it should always be a GenericRow and it should fail if it's not. Returning a timestamp of 0 just doesn't make any sense.
- I've removed the common exception handling. Why? Because we'd want the same exception handling when calling extract(row) directly, so it seems to me we still need the exception handling in the subclasses.
   processorContext.forward(
       key,
       row,
       To.all().withTimestamp(timestampExtractor.extract(row))
If this throws.... does the exception get logged? Does it kill the streams task?
@rodesai what do you think should be the behaviour here? I guess it should be inline with other parts of the processing. What happens with (de)serialization errors?
   }

   @Test(expected = NullPointerException.class)
   public void shouldThrowIfNegativeProcessorContext() {
Suggested change:
-  public void shouldThrowIfNegativeProcessorContext() {
+  public void shouldThrowOnNullProcessorContext() {
Force-pushed from 5df1126 to 2eece42, then from 2eece42 to aa40d7b.
Description
Fixes #836 (comment)

BREAKING CHANGE:
This change breaks the behavior of CSAS/CTAS by transforming the ROWTIME of the output/sink topic to the one specified in the WITH TIMESTAMP property. Previously, the ROWTIME was derived from the input/source topics.

Although this PR is mostly a fix to correctly write the ROWTIME in the sink topic, I flagged it as a breaking change because users will experience different results when moving to this new behavior.
How to review:
The main change is in the new SinkBuilder class, which builds a stream using the transform method to modify the timestamp. The rest of the changes are just passing the right output timestamp column from the plan to the stream/table builder.
Testing done
Reviewer checklist