KSQL partitions by column in source when there is a name collision with a column in the sink #2525

Closed · apurvam opened this issue Mar 5, 2019 · 0 comments · Fixed by #2735

apurvam commented Mar 5, 2019

Setup:

ksql> create stream TEST_1 (a varchar, b varchar)  \
  WITH (KAFKA_TOPIC='TEST_PART_1', value_format='delimited', \
  key='a');

 Message
----------------
 Stream created
----------------
ksql> create stream TEST_2 WITH (KAFKA_TOPIC='TEST_PART_2', \
  value_format='delimited') as select a + '_NEW' as a, b \
  from TEST_1 partition by a;

 Message
----------------------------
 Stream created and running
----------------------------

Produce to TEST_PART_1:

$ kafkacat -b localhost:9092 -t TEST_PART_1 -P -K:
C:C,D
E:E,F

See that the key in stream TEST_2 is the original value from TEST_1, not the modified value:

ksql> select * from TEST_2;
1551824886814 | C | C_NEW | D
1551824930017 | E | E_NEW | F

As we can see, the TEST_2 query is picking the key column a from the source, rather than the modified column with the same name a from the TEST_2 select statement.
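
The raw key in the sink topic can also be checked directly with kafkacat. A minimal consumer sketch (assuming the same local broker as above; %k prints the record key and %s the value):

$ kafkacat -b localhost:9092 -t TEST_PART_2 -C -o beginning -e -f '%k:%s\n'
C:C_NEW,D
E:E_NEW,F

The keys are still C and E rather than C_NEW and E_NEW, matching the ROWKEY column in the select output above.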

If we instead alias the modified column to a_new and partition by a_new, as in stream TEST_3 below, then it behaves as expected:

ksql> create stream TEST_3 WITH (KAFKA_TOPIC='TEST_PART_3', \
   value_format='delimited') as select a + '_NEW' as a_new, b \
   from TEST_1 partition by a_new;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> select * from TEST_3;
1551825264867 | K_NEW | K_NEW | L

The output above shows the correct key for this input to TEST_PART_1:

$ kafkacat -b localhost:9092 -t TEST_PART_1 -P -K:
K:K,L

This is odd default behavior. When there is a name collision, we should always choose the key from the columns in the sink rather than the source.
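
For completeness, DESCRIBE EXTENDED reports the key field KSQL has registered for each stream, so the collision can also be inspected without consuming the topics (a sketch; output omitted since its exact layout varies by KSQL version):

ksql> describe extended TEST_2;
ksql> describe extended TEST_3;

In the output, the Key field line indicates which column each stream is keyed on.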
