Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INSERT INTO doesn't work if the target stream's name doesn't match the underlying topic ("Sink topic …does not exist in th e metastore.") #2123

Closed
rmoff opened this issue Nov 6, 2018 · 2 comments

Comments

@rmoff
Copy link
Member

rmoff commented Nov 6, 2018

Populate two topics (using this docker-compose file)

docker run --rm -it --network docker-compose_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t sourceTopic1 -P << EOF
{"category":"Foo1","source":"bar", "type": "x","id" : "myid1", "payload":"asdf"}
EOF

docker run --rm -it --network docker-compose_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t sourceTopic2 -P << EOF
{"category":"Foo2","source":"bar", "type": "x","id" : "myid2", "payload":"asdf"}
EOF

Register the two topics as STREAMs:

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM sourceTopic1Stream (category varchar, source varchar, type varchar, id varchar, payload varchar) WITH (KAFKA_TOPIC='sourceTopic1', VALUE_FORMAT='json');
CREATE STREAM sourceTopic2Stream (category varchar, source varchar, type varchar, id varchar, payload varchar) WITH (KAFKA_TOPIC='sourceTopic2', VALUE_FORMAT='json');

Problem: INSERT INTO only works if the target CSAS has the same name as the underlying topic. Otherwise it fails:

ksql> CREATE STREAM TargetStream01 WITH (kafka_topic='FOOBARWIBBLE', partitions=3) AS select 'sourceTopic1' topicname, category, source, type, id, payload from sourceTopic1Stream where id like 'myid%';

 Message
----------------------------
 Stream created and running
----------------------------
ksql> INSERT INTO TargetStream01 SELECT 'sourceTopic2' AS topicname, * FROM sourceTopic2Stream where id like 'myid%';
io.confluent.ksql.util.KsqlException: Sink topic TARGETSTREAM01 does not exist in th e metastore.
ksql>

(also note typo s/th e/the)

Also doesn't work if the topic & stream name are the same but the topic is mixed case:

ksql> CREATE STREAM foobar WITH (kafka_topic='foobar', partitions=3) AS select 'sourceTopic1' topicname, category, source, type, id, payload from sourceTopic1Stream where id like 'myid%';

 Message
----------------------------
 Stream created and running
----------------------------
ksql> INSERT INTO foobar SELECT 'sourceTopic2' AS topicname, * FROM sourceTopic2Stream where id like 'myid%';
io.confluent.ksql.util.KsqlException: Sink topic FOOBAR does not exist in th e metastore.
ksql>

Current fix: don't specify kafka_topic and just use the stream name.

ksql>
ksql> CREATE STREAM TargetStream WITH (partitions=3) AS select 'sourceTopic1' topicname, category, source, type, id, payload from sourceTopic1Stream where id like 'myid%';

 Message
----------------------------
 Stream created and running
----------------------------
ksql> INSERT INTO TargetStream SELECT 'sourceTopic2' AS topicname, * FROM sourceTopic2Stream where id like 'myid%';

 Message
-------------------------------
 Insert Into query is running.
-------------------------------
ksql> SELECT * FROM TargetStream;
1541496149897 | null | sourceTopic2 | Foo2 | bar: | x | myid2 | asdf
1541496141671 | null | sourceTopic1 | Foo1 | bar: | x | myid1 | asdf
@apurvam
Copy link
Contributor

apurvam commented Nov 30, 2018

Isn't this fixed by #2149 @hjafarpour ?

@rmoff rmoff changed the title INSERT INTO doesn't work if the target stream's name doesn't match the underlying topic ("Sink topic …does not exist in th e metastore.") INSERT INTO doesn't work if the target stream's name doesn't match the underlying topic ("Sink topic …does not exist in the metastore.") Feb 5, 2019
@rmoff rmoff changed the title INSERT INTO doesn't work if the target stream's name doesn't match the underlying topic ("Sink topic …does not exist in the metastore.") INSERT INTO doesn't work if the target stream's name doesn't match the underlying topic ("Sink topic …does not exist in th e metastore.") Feb 5, 2019
@rmoff
Copy link
Member Author

rmoff commented Feb 5, 2019

Fixed in 5.1

ksql> CREATE STREAM TargetStream01 WITH (kafka_topic='FOOBARWIBBLE', partitions=3) AS select 'sourceTopic1' topicname, category, source, type, id, payload from sourceTopic1Stream where id like 'myid%';

 Message
----------------------------
 Stream created and running
----------------------------
ksql> INSERT INTO TargetStream01 SELECT 'sourceTopic2' AS topicname, * FROM sourceTopic2Stream where id like 'myid%';

 Message
-------------------------------
 Insert Into query is running.
-------------------------------
ksql>
ksql> select * from targetstream01;
1549375509196 | null | sourceTopic1 | Foo1 | bar | x | myid1 | asdf
1549375512959 | null | sourceTopic2 | Foo2 | bar | x | myid2 | asdf

@rmoff rmoff closed this as completed Feb 5, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants