
RUN SCRIPT - Provide way to wait for a stream/topic to exist before continuing #2880

Closed · rmoff opened this issue May 24, 2019 · 5 comments
Labels: code-lifecycle-ux (issues relating to the lifecycle of developing KSQL applications and deploying them to Production), user-experience

rmoff commented May 24, 2019

I've got a CSAS which writes to a topic on which I want to declare a table. Standard re-key requirement. The problem is that if I run the CT straight after the CSAS, the CSAS hasn't yet populated the topic and the CT fails.

From the interactive CLI I work around this with a SELECT … LIMIT 1, which forces execution to wait until a message has been read from the new stream; by that point the topic exists and my CT works.

CREATE STREAM FOO WITH (KAFKA_TOPIC='test', VALUE_FORMAT='AVRO');
CREATE STREAM FOO_REKEY AS SELECT * FROM FOO PARTITION BY BAR;
SELECT * FROM FOO_REKEY LIMIT 1;
CREATE TABLE FOO_T WITH (KAFKA_TOPIC='FOO_REKEY', VALUE_FORMAT='AVRO');

But if I bundle this into a script to call with RUN SCRIPT, it fails because the SELECT is rejected:

SELECT and PRINT queries must use the /query endpoint

How can I automate the deployment of this without having to run it manually from the CLI?
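
The only stopgap I can think of for automation is to drive the deployment from a shell script instead, polling the KSQL server's REST API until the topic exists before submitting the dependent statement. A rough, untested sketch follows; it assumes a KSQL server on localhost:8088 and reuses the stream names from the example above, and the ksql_post helper is just for illustration:

#!/usr/bin/env bash
KSQL_URL="http://localhost:8088"

# Helper: POST a single KSQL statement to the /ksql endpoint.
ksql_post() {
  curl -s -X POST "$KSQL_URL/ksql" \
       -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
       -d "{\"ksql\": \"$1\", \"streamsProperties\": {}}"
}

# Source stream and the re-keying CSAS.
ksql_post "CREATE STREAM FOO WITH (KAFKA_TOPIC='test', VALUE_FORMAT='AVRO');"
ksql_post "CREATE STREAM FOO_REKEY AS SELECT * FROM FOO PARTITION BY BAR;"

# Wait for the CSAS to have created its output topic
# (crude check: grep the raw SHOW TOPICS response for the topic name).
until ksql_post "SHOW TOPICS;" | grep -q "FOO_REKEY"; do
  echo "Waiting for topic FOO_REKEY to appear..."
  sleep 5
done

# Note: if the CT relies on Avro schema inference, the value schema also
# needs to exist in the Schema Registry by this point.
ksql_post "CREATE TABLE FOO_T WITH (KAFKA_TOPIC='FOO_REKEY', VALUE_FORMAT='AVRO');"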

rmoff commented Jun 11, 2019

@vcrfxia since you asked about this on Slack, I'm tagging you here :)

Here's an example of the problem.

Code is https://github.com/confluentinc/demo-scene/tree/blog-idea-may19/rail-data-streaming-pipeline/data/ksql/01_corpus_location

When I run it, this happens:

ksql> RUN SCRIPT '/data/ksql/01_corpus_location/01_corpus.ksql';

CREATE STREAM CORPUS_RAW (NLCDESC VARCHAR, NLC VARCHAR, TIPLOC VARCHAR, 3ALPHA VARCHAR, STANOX VARCHAR, NLCDESC16 VARCHAR, UIC VARCHAR) WITH (KAFKA_TOPIC='corpus', VALUE_FORMAT='JSON');
 Message
----------------
 Stream created
----------------

CREATE STREAM CORPUS_BY_STANOX
  WITH (VALUE_FORMAT='AVRO') AS
  SELECT NLCDESC,
         STANOX
    FROM CORPUS_RAW
  PARTITION BY STANOX;
 Message
----------------------------
 Stream created and running
----------------------------
ksql> RUN SCRIPT '/data/ksql/01_corpus_location/02_stanox.ksql';
Avro schema for message values on topic CORPUS_BY_STANOX does not exist in the Schema Registry.
Subject: CORPUS_BY_STANOX-value

Possible causes include:
- The topic itself does not exist
        -> Use SHOW TOPICS; to check
- Messages on the topic are not Avro serialized
        -> Use PRINT 'CORPUS_BY_STANOX' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using the Confluent Schema Registry Avro serializer
        -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry
        -> Use the REST API to list available subjects
        https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects

If I re-run the same thing straight after, it works (because the topic now exists):

ksql> RUN SCRIPT '/data/ksql/01_corpus_location/02_stanox.ksql';

 Message
---------------
 Table created
---------------
ksql>

/cc @MichaelDrogalis this is exactly what I was trying to describe on our call yesterday :)

vcrfxia commented Jun 11, 2019

Two things:

  1. As of KSQL 5.2, when submitting a CSAS followed by a CT on the topic created by the CSAS, you shouldn't need the SELECT … LIMIT 1 workaround that you described, since statements submitted through the CLI wait for previous statements to finish executing by default (see #2280, "Make CLI requests wait for last command sequence number"; a REST-level sketch of the same wait is at the end of this comment).

  2. The above is also true (as of KSQL 5.2) when submitting a CSAS followed by a CT via RUN SCRIPT from the CLI. For example, I can submit the following script through RUN SCRIPT perfectly fine (without the topic FOO existing beforehand):

CREATE STREAM FOO AS SELECT * FROM KSQL_PROCESSING_LOG;
CREATE TABLE FOO_T (id VARCHAR) WITH (KAFKA_TOPIC='FOO', VALUE_FORMAT='JSON', KEY='id');

From looking at your scripts, you've separated the CSAS and CT into two separate script files. I haven't yet had a chance to dig into the code to see whether that makes a difference, but I'm curious whether your commands work when they're submitted as part of the same script, since I believe they should.
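
For driving this from automation rather than the CLI, the same wait can be expressed over the REST API by passing the commandSequenceNumber returned from one /ksql request into the next. Below is a rough, untested curl/jq sketch reusing the two statements above; it assumes a server on localhost:8088, and the field names are taken from the REST API docs, so adjust if your version differs:

KSQL_URL="http://localhost:8088"

# Submit the CSAS and capture the sequence number assigned to that command.
SEQ=$(curl -s -X POST "$KSQL_URL/ksql" \
           -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
           -d '{"ksql": "CREATE STREAM FOO AS SELECT * FROM KSQL_PROCESSING_LOG;", "streamsProperties": {}}' \
      | jq '.[-1].commandSequenceNumber')

# Ask the server not to run the CT until that command has completed.
curl -s -X POST "$KSQL_URL/ksql" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d "{\"ksql\": \"CREATE TABLE FOO_T (id VARCHAR) WITH (KAFKA_TOPIC='FOO', VALUE_FORMAT='JSON', KEY='id');\", \"streamsProperties\": {}, \"commandSequenceNumber\": $SEQ}"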

rmoff commented Jun 11, 2019

On 5.2.1, I can't get either of these to work.

As part of the same script

I moved the CT into the first script, and it doesn't work:

ksql> RUN SCRIPT '/data/ksql/01_corpus_location/01_corpus.ksql';
Avro schema for message values on topic CORPUS_BY_STANOX does not exist in the Schema Registry.
Subject: CORPUS_BY_STANOX-value

Possible causes include:
- The topic itself does not exist
        -> Use SHOW TOPICS; to check
- Messages on the topic are not Avro serialized
        -> Use PRINT 'CORPUS_BY_STANOX' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using the Confluent Schema Registry Avro serializer
        -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry
        -> Use the REST API to list available subjects
        https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects

From the CLI

It fails the first time it's run:

ksql> CREATE STREAM CORPUS_RAW (NLCDESC VARCHAR, NLC VARCHAR, TIPLOC VARCHAR, 3ALPHA VARCHAR, STANOX VARCHAR, NLCDESC16 VARCHAR, UIC VARCHAR) WITH (KAFKA_TOPIC='corpus', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> CREATE STREAM CORPUS_BY_STANOX
>  WITH (VALUE_FORMAT='AVRO') AS
>  SELECT NLCDESC,
>         STANOX
>    FROM CORPUS_RAW
>  PARTITION BY STANOX;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>
ksql> CREATE TABLE STANOX WITH (KAFKA_TOPIC='CORPUS_BY_STANOX', VALUE_FORMAT='AVRO', KEY='STANOX');
Avro schema for message values on topic CORPUS_BY_STANOX does not exist in the Schema Registry.
Subject: CORPUS_BY_STANOX-value

Possible causes include:
- The topic itself does not exist
        -> Use SHOW TOPICS; to check
- Messages on the topic are not Avro serialized
        -> Use PRINT 'CORPUS_BY_STANOX' FROM BEGINNING; to verify
- Messages on the topic have not been serialized using the Confluent Schema Registry Avro serializer
        -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
- The schema is registered on a different instance of the Schema Registry
        -> Use the REST API to list available subjects
        https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects

It works when re-run straight after:

ksql> CREATE TABLE STANOX WITH (KAFKA_TOPIC='CORPUS_BY_STANOX', VALUE_FORMAT='AVRO', KEY='STANOX');

 Message
---------------
 Table created
---------------
ksql>

vcrfxia commented Jun 11, 2019

I see; apologies for not having read your example carefully enough, @rmoff. The trouble you're experiencing is specific to Avro schema inference (see the discussion in #1394). The workaround is to specify the schema for the table yourself rather than relying on KSQL's schema inference. For example, I have verified that I can submit the following script via RUN SCRIPT successfully:

CREATE STREAM CORPUS_RAW (NLCDESC VARCHAR, NLC VARCHAR, TIPLOC VARCHAR, 3ALPHA VARCHAR, STANOX VARCHAR, NLCDESC16 VARCHAR, UIC VARCHAR) WITH (KAFKA_TOPIC='corpus', VALUE_FORMAT='JSON');
CREATE STREAM CORPUS_BY_STANOX WITH (VALUE_FORMAT='AVRO') AS SELECT NLCDESC, STANOX FROM CORPUS_RAW PARTITION BY STANOX;
CREATE TABLE STANOX (NLCDESC VARCHAR, STANOX VARCHAR) WITH (KAFKA_TOPIC='CORPUS_BY_STANOX', VALUE_FORMAT='AVRO', KEY='STANOX');

rmoff commented Jun 14, 2019

Thanks @vcrfxia. Let's track this in #1394.
