Rethink TIMESTAMP on CSAS/CTAS #836

Closed
big-andy-coates opened this issue Mar 2, 2018 · 9 comments · Fixed by #4489

@big-andy-coates
Contributor

Following on from the discussion here:
#756 (comment)

We should consider changing the semantics of the WITH TIMESTAMP property on CSAS and CTAS statements, or adding a new property that lets users override the timestamp extractor used within the query.

Currently, using WITH TIMESTAMP on CS or CT statements makes sense: it allows users to override the timestamp extractor used when reading the source topic. These statements are about importing existing datasets into KSQL. The data may come from disparate systems and may not have the correct value in the record's metadata, so providing a way to override that when importing is useful.

Conversely, using WITH TIMESTAMP on CSAS or CTAS statements is not necessarily intuitive (or particularly useful?). The property does not currently change how the statement is run. It does not change the timestamp extractor used. It does not affect the contents of ROWTIME or the timestamp used to publish output records to Kafka. All it does is register metadata that derived streams and tables will use to configure their timestamp extractors.

So currently, WITH TIMESTAMP, in both cases, controls only the timestamp extractor downstream/derived queries should use when reading the data.
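The current behavior can be modeled in a few lines of Python. This is a hypothetical sketch, not KSQL code: `Record`, `Stream`, `rowtime` and `csas_current` are illustrative names, and it assumes a simple projection (no aggregation or join), where each output record inherits the ROWTIME the query read from its source.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Record:
    """Simplified Kafka record: a metadata timestamp plus named value columns."""
    timestamp: int
    value: Dict[str, Any]


@dataclass
class Stream:
    """Hypothetical model of a KSQL stream: its topic plus its TIMESTAMP metadata."""
    topic: List[Record]
    timestamp_column: Optional[str] = None  # WITH (TIMESTAMP=...), if set


def rowtime(record: Record, stream: Stream) -> int:
    """ROWTIME seen by a query reading `stream`: the registered TIMESTAMP
    column if there is one, otherwise the Kafka record timestamp."""
    if stream.timestamp_column is not None:
        return record.value[stream.timestamp_column]
    return record.timestamp


def csas_current(source: Stream, timestamp_column: Optional[str]) -> Stream:
    """Current CSAS behavior for a simple projection: each output record keeps
    the ROWTIME read from the source, and WITH (TIMESTAMP=...) is merely stored
    as metadata for downstream readers of the new stream."""
    sink = [Record(rowtime(r, source), dict(r.value)) for r in source.topic]
    return Stream(sink, timestamp_column)


source = Stream([Record(timestamp=1000, value={"id": 1, "event_ts": 5000})])
derived = csas_current(source, timestamp_column="event_ts")
print(derived.topic[0].timestamp)          # 1000: the sink topic still carries the source time
print(rowtime(derived.topic[0], derived))  # 5000: only downstream readers see event_ts
```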

One downside to this approach is that, should a downstream query want to use a different timestamp, it would need an intermediate query that does nothing except SELECT * with a different timestamp, with all the associated costs.

Given that, for CSAS and CTAS, it's no longer about importing data into KSQL but about deriving data, what would be more intuitive and useful, IMHO, would be for WITH TIMESTAMP to override the timestamp extractor the query itself uses when reading a source stream. This would allow each statement to be more self-contained, and would allow such a query to control the record timestamp used when outputting to Kafka, and hence the default timestamp used by downstream systems, without the need for any additional metadata.
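For comparison, a minimal sketch of the proposed semantics under the same simplifying assumptions (a projection only; all names are hypothetical): when the CSAS has its own TIMESTAMP property, it replaces the source stream's extractor for this one query, so it drives both ROWTIME and the sink record timestamp.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class Record:
    timestamp: int         # Kafka record timestamp
    value: Dict[str, Any]  # named value columns


def extract(record: Record, column: Optional[str]) -> int:
    """Timestamp extractor: use `column` as ROWTIME if given, else the record timestamp."""
    return record.value[column] if column is not None else record.timestamp


def csas_proposed(source: List[Record],
                  source_ts_column: Optional[str],
                  with_timestamp: Optional[str]) -> List[Record]:
    """Proposed semantics (hypothetical sketch): WITH (TIMESTAMP=...) on the CSAS
    overrides the extractor this query uses to read its source, so the same
    value becomes ROWTIME inside the query and the timestamp of every record
    written to the sink topic."""
    column = with_timestamp if with_timestamp is not None else source_ts_column
    return [Record(extract(r, column), dict(r.value)) for r in source]


source = [Record(timestamp=1000, value={"id": 1, "event_ts": 5000})]
print(csas_proposed(source, None, "event_ts")[0].timestamp)  # 5000
print(csas_proposed(source, None, None)[0].timestamp)        # 1000
```

Because the override lives in the statement that needs it, a downstream query no longer needs an intermediate SELECT * just to change timestamps.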

It's possible this change could be made by adding a new property, rather than changing the semantics of the existing one.

We should probably also think ahead to how this would work with stream-to-stream joins, where users should be able to override the timestamp extractor of each source stream independently.

@rodesai
Contributor

rodesai commented Mar 3, 2018

Continuing the discussion from: #756

Plus I'm struggling to think of a use-case where the current TIMESTAMP semantics within a CSAS would be useful. Can you?

Sure. I may have a time in some format that isn't easy to handle with KSQL UDFs, so I need some intermediate queries to distill it down to a timestamp (e.g. maybe my date is split across two or more columns). It's convenient to declare the timestamp column in the CSAS that creates the stream with the final timestamp column, rather than in each query that consumes that stream. Or maybe my stream has two time fields, and I want to split it into separate streams to be processed independently. For example, a stream of data about people visiting an office building may have a field called entry time and a field called exit time. I could CSAS one stream that declares the entry time as the timestamp and CSAS another stream that declares the exit time as the timestamp. I could then write queries on those streams without having to specify the timestamp each time.
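The entry/exit example can be sketched as follows, assuming a toy in-memory model in which a derived stream carries the TIMESTAMP column its CSAS declared; `DerivedStream`, `entry_time` and `exit_time` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class DerivedStream:
    """Hypothetical model of a CSAS result: its rows plus the TIMESTAMP column
    declared once in the CSAS's WITH clause."""
    rows: List[Dict[str, Any]]
    timestamp_column: str

    def rowtimes(self) -> List[int]:
        # Every query reading this stream reuses the declared column,
        # so none of them has to restate which field is the timestamp.
        return [row[self.timestamp_column] for row in self.rows]


visits = [
    {"person": "alice", "entry_time": 900, "exit_time": 1700},
    {"person": "bob", "entry_time": 930, "exit_time": 1200},
]

# Stands in for two CSAS statements over the same visits stream, one declaring
# WITH (TIMESTAMP='entry_time') and the other WITH (TIMESTAMP='exit_time').
entries = DerivedStream(visits, "entry_time")
exits = DerivedStream(visits, "exit_time")

print(entries.rowtimes())  # [900, 930]
print(exits.rowtimes())    # [1700, 1200]
```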

But I don't think this is about which pattern is more common. We're defining a language here, and we should keep its semantics consistent so it's easier to reason about as it grows. When we say CREATE STREAM|TABLE <name> WITH (<properties>), IMO the properties in WITH should describe the stream or table being created, not the query being run to populate it. If we want to support overriding the configured stream/table timestamp field for a given query, then we should use a different config, preferably somewhere outside the WITH.

@big-andy-coates
Contributor Author

big-andy-coates commented Mar 7, 2018

Yep, makes sense.

Maybe we need to support CREATE STREAM|TABLE <name> WITH (<properties>) AS <query> WITH (<properties>).

BTW, I think the updated docs around TIMESTAMP in the change below are incorrect. I think they match Damian's approach, not the current functionality: #783

@big-andy-coates
Contributor Author

big-andy-coates commented Feb 4, 2020

I actually think this is bad behaviour. In general:

  • things in the WITH clause of a CREATE STREAM or CREATE TABLE statement affect the source (topic), and
  • things in the WITH clause of a CREATE STREAM AS SELECT or CREATE TABLE AS SELECT affect the sink (topic).

For example, think about VALUE_FORMAT. A CSAS with a WITH(VALUE_FORMAT) changes the format of how the data is written to the sink. Obviously, this then means the stream created in the CSAS also has the specified format, and this format will be used when reading the data if the source is used in any downstream queries.

In the same way, TIMESTAMP should change the ROWTIME used to set the Kafka record's timestamp. This would also mean that, when the stream is used by downstream statements, its ROWTIME would correctly match the column specified in the TIMESTAMP property.
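A minimal sketch of these sink-side semantics, assuming a toy writer that only handles JSON values; `SinkRecord` and `write_sink` are hypothetical names, not ksqlDB internals. VALUE_FORMAT and TIMESTAMP are both applied when writing, which is the symmetry described above.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class SinkRecord:
    timestamp: int  # Kafka record timestamp
    payload: bytes  # serialized value


def write_sink(rows: List[Dict[str, Any]],
               input_rowtimes: List[int],
               value_format: str,
               timestamp: Optional[str]) -> List[SinkRecord]:
    """Hypothetical sink writer for a CSAS, assuming every WITH property
    describes the sink: VALUE_FORMAT picks the serialization and TIMESTAMP picks
    the record timestamp, both applied on the way out."""
    if value_format != "JSON":
        raise ValueError("this sketch only models VALUE_FORMAT='JSON'")
    out = []
    for row, rowtime in zip(rows, input_rowtimes):
        ts = row[timestamp] if timestamp is not None else rowtime
        out.append(SinkRecord(ts, json.dumps(row).encode("utf-8")))
    return out


sink = write_sink([{"id": 1, "event_ts": 5000}], [1000], "JSON", "event_ts")
print(sink[0].timestamp)            # 5000
print(json.loads(sink[0].payload))  # {'id': 1, 'event_ts': 5000}
```

Any reader of this topic, whether a downstream KSQL query or a plain Kafka consumer, then sees 5000 as the record timestamp, so ROWTIME already matches the TIMESTAMP column.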

@apurvam
Contributor

apurvam commented Feb 5, 2020

What exactly is the proposed change? To use a new property or to redefine what the existing WITH property means? If the latter, it is going to be a breaking change right?

@big-andy-coates
Contributor Author

big-andy-coates commented Feb 5, 2020

Redefine what the current one does.

Is it a breaking change? Not really.

In terms of KSQL functionality and how it processes data nothing will change.

The only thing that changes is that the timestamp on the sink topic is now correctly set.

Nothing changes for KSQL processing, as it's just a move from changing the timestamp on the way back in, i.e. when downstream queries read the sink topic, to changing it on the way out, i.e. when writing to the sink topic.

Personally, I see this as a bug fix.

@apurvam
Contributor

apurvam commented Feb 6, 2020

But this changes the contents of what's stored in the sink topics right?

If I understand correctly, today a given sink topic T for some CSAS or CTAS statement will set the timestamp of its records to the timestamp of the records in the input topic. After upgrading to the version with the present patch, the messages in topic T will have their timestamp set to the field specified in the WITH clause. Is this true, or am I misunderstanding?

This may not affect the behavior of other KSQL queries reading the output topic T, but it will affect non-KSQL consumers, right?

@spena
Member

spena commented Feb 7, 2020

Correct. After the upgrade, the messages in topic T will have the timestamp set to the field specified in the WITH clause. Note that only the CSAS/CTAS statements that use the WITH TIMESTAMP clause will be affected.

By the way, while debugging the code, I saw that the WITH TIMESTAMP in the CSAS/CTAS is used neither for writing nor for reading the input topics. The input topics' timestamps come from the TIMESTAMP specified on the input stream, so users hit a bug when using this WITH property in a CSAS.

@apurvam
Contributor

apurvam commented Feb 7, 2020

I think if this changes the contents of the output topic with the same set of input queries, it is a backward incompatible change. I believe our QTT tests also failed because the output of the new topologies didn't match the output from the old ones.

Should we use the compatibility breaking config defs to preserve the old behavior for old queries?

@spena
Member

spena commented Feb 10, 2020

Yes, compatibility breaking config should be good to add here as well. What would the default behavior be, then?

If the goal is not to break compatibility for current users, then I'd say keep the option disabled for now. Users can enable it if they hit this issue. And we could enable it by default in a major ksqlDB release?
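One way to gate the change, sketched under assumptions: `apply_timestamp_on_sink` is a hypothetical flag name, not a real ksqlDB config, and its value is assumed to be fixed per query when the query is created, so existing queries keep the old behavior across an upgrade while the default stays off.

```python
from typing import Any, Dict, Optional


def sink_timestamp(input_rowtime: int,
                   row: Dict[str, Any],
                   with_timestamp: Optional[str],
                   apply_timestamp_on_sink: bool) -> int:
    """Timestamp written to the sink topic for one CSAS/CTAS output row.

    `apply_timestamp_on_sink` is a hypothetical compatibility flag. When False
    (old behavior), the sink record inherits the input ROWTIME and
    WITH (TIMESTAMP=...) is metadata only. When True (new behavior), the
    TIMESTAMP column is applied as the record is written.
    """
    if apply_timestamp_on_sink and with_timestamp is not None:
        return row[with_timestamp]
    return input_rowtime


row = {"id": 1, "event_ts": 5000}
# Queries created before the change keep the flag off, so non-KSQL consumers of
# their sink topics see unchanged timestamps.
print(sink_timestamp(1000, row, "event_ts", apply_timestamp_on_sink=False))  # 1000
print(sink_timestamp(1000, row, "event_ts", apply_timestamp_on_sink=True))   # 5000
# Statements without WITH (TIMESTAMP=...) are unaffected either way.
print(sink_timestamp(1000, row, None, apply_timestamp_on_sink=True))         # 1000
```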


4 participants