join should preserve key column for result stream #2470

Closed
rodesai opened this issue Feb 21, 2019 · 2 comments · Fixed by #2735

Comments

@rodesai
Contributor

rodesai commented Feb 21, 2019

Take the following series of statements:

create stream s1 (k string, v string) with (kafka_topic='s1', value_format='json', key='k');
create table t1 (k string, v string) with (kafka_topic='t1', value_format='json', key='k');
create table t2 (k string, v string) with (kafka_topic='t2', value_format='json', key='k');
create stream s1t1 as select s1.k, s1.v, t1.v from s1 join t1 on s1.k = t1.k;
create stream s1t1t2 as select s1_k, s1_v, t1_v, t2.v from s1t1 join t2 on s1t1.s1_k=t2.k;

The first join does not repartition:

ksql> explain CSAS_S1T1_0;

ID                   : CSAS_S1T1_0
SQL                  : create stream s1t1 as select s1.k, s1.v, t1.v from s1 join t1 on s1.k = t1.k;

 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 S1_K    | VARCHAR(STRING)
 S1_V    | VARCHAR(STRING)
 T1_V    | VARCHAR(STRING)
-------------------------------------

Sources that this query reads from:
-----------------------------------
T1
S1

For source description please run: DESCRIBE [EXTENDED] <SourceId>

Sinks that this query writes to:
-----------------------------------
S1T1

For sink description please run: DESCRIBE [EXTENDED] <SinkId>

Execution plan
--------------
 > [ SINK ] | Schema: [S1_K : VARCHAR, S1_V : VARCHAR, T1_V : VARCHAR] | Logger: CSAS_S1T1_0.S1T1
		 > [ PROJECT ] | Schema: [S1_K : VARCHAR, S1_V : VARCHAR, T1_V : VARCHAR] | Logger: CSAS_S1T1_0.Project
				 > [ JOIN ] | Schema: [S1.ROWTIME : BIGINT, S1.ROWKEY : VARCHAR, S1.K : VARCHAR, S1.V : VARCHAR, T1.ROWTIME : BIGINT, T1.ROWKEY : VARCHAR, T1.K : VARCHAR, T1.V : VARCHAR] | Logger: CSAS_S1T1_0.Join
						 > [ SOURCE ] | Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, K : VARCHAR, V : VARCHAR] | Logger: CSAS_S1T1_0.KafkaTopic_Left
						 > [ SOURCE ] | Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, K : VARCHAR, V : VARCHAR] | Logger: CSAS_S1T1_0.KafkaTopic_Right


Processing topology
-------------------
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000005 (topics: [s1])
      --> KSTREAM-MAPVALUES-0000000006
    Processor: KSTREAM-MAPVALUES-0000000006 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000007
      <-- KSTREAM-SOURCE-0000000005
    Source: KSTREAM-SOURCE-0000000000 (topics: [t1])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000007 (stores: [])
      --> KSTREAM-JOIN-0000000008
      <-- KSTREAM-MAPVALUES-0000000006
    Processor: KSTREAM-JOIN-0000000008 (stores: [KafkaTopic_Right-reduce])
      --> KSTREAM-MAPVALUES-0000000009
      <-- KSTREAM-TRANSFORMVALUES-0000000007
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000009 (stores: [])
      --> KSTREAM-MAPVALUES-0000000010
      <-- KSTREAM-JOIN-0000000008
    Processor: KSTREAM-TRANSFORMVALUES-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-AGGREGATE-0000000004
      <-- KSTREAM-TRANSFORMVALUES-0000000002
    Processor: KSTREAM-MAPVALUES-0000000010 (stores: [])
      --> KSTREAM-SINK-0000000011
      <-- KSTREAM-MAPVALUES-0000000009
    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KafkaTopic_Right-reduce])
      --> none
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000011 (topic: S1T1)
      <-- KSTREAM-MAPVALUES-0000000010

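The second join does repartition (note the REKEY node in the execution plan and the Join-repartition topic in the topology):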

ksql> explain CSAS_S1T1T2_1;

ID                   : CSAS_S1T1T2_1
SQL                  : create stream s1t1t2 as select s1_k, s1_v, t1_v, t2.v from s1t1 join t2 on s1t1.s1_k=t2.k;

 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 S1_K    | VARCHAR(STRING)
 S1_V    | VARCHAR(STRING)
 T1_V    | VARCHAR(STRING)
 V       | VARCHAR(STRING)
-------------------------------------

Sources that this query reads from:
-----------------------------------
S1T1
T2

For source description please run: DESCRIBE [EXTENDED] <SourceId>

Sinks that this query writes to:
-----------------------------------
S1T1T2

For sink description please run: DESCRIBE [EXTENDED] <SinkId>

Execution plan
--------------
 > [ SINK ] | Schema: [S1_K : VARCHAR, S1_V : VARCHAR, T1_V : VARCHAR, V : VARCHAR] | Logger: CSAS_S1T1T2_1.S1T1T2
		 > [ PROJECT ] | Schema: [S1_K : VARCHAR, S1_V : VARCHAR, T1_V : VARCHAR, V : VARCHAR] | Logger: CSAS_S1T1T2_1.Project
				 > [ JOIN ] | Schema: [S1T1.ROWTIME : BIGINT, S1T1.ROWKEY : VARCHAR, S1T1.S1_K : VARCHAR, S1T1.S1_V : VARCHAR, S1T1.T1_V : VARCHAR, T2.ROWTIME : BIGINT, T2.ROWKEY : VARCHAR, T2.K : VARCHAR, T2.V : VARCHAR] | Logger: CSAS_S1T1T2_1.Join
						 > [ REKEY ] | Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, S1_K : VARCHAR, S1_V : VARCHAR, T1_V : VARCHAR] | Logger: CSAS_S1T1T2_1.Join
								 > [ SOURCE ] | Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, S1_K : VARCHAR, S1_V : VARCHAR, T1_V : VARCHAR] | Logger: CSAS_S1T1T2_1.KafkaTopic_Left
						 > [ SOURCE ] | Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, K : VARCHAR, V : VARCHAR] | Logger: CSAS_S1T1T2_1.KafkaTopic_Right


Processing topology
-------------------
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [t2])
      --> KSTREAM-MAPVALUES-0000000001
    Source: KSTREAM-SOURCE-0000000013 (topics: [Join-repartition])
      --> KSTREAM-JOIN-0000000014
    Processor: KSTREAM-JOIN-0000000014 (stores: [KafkaTopic_Right-reduce])
      --> KSTREAM-MAPVALUES-0000000015
      <-- KSTREAM-SOURCE-0000000013
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000015 (stores: [])
      --> KSTREAM-MAPVALUES-0000000016
      <-- KSTREAM-JOIN-0000000014
    Processor: KSTREAM-TRANSFORMVALUES-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-AGGREGATE-0000000004
      <-- KSTREAM-TRANSFORMVALUES-0000000002
    Processor: KSTREAM-MAPVALUES-0000000016 (stores: [])
      --> KSTREAM-SINK-0000000017
      <-- KSTREAM-MAPVALUES-0000000015
    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KafkaTopic_Right-reduce])
      --> none
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000017 (topic: S1T1T2)
      <-- KSTREAM-MAPVALUES-0000000016

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [S1T1])
      --> KSTREAM-MAPVALUES-0000000006
    Processor: KSTREAM-MAPVALUES-0000000006 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000007
      <-- KSTREAM-SOURCE-0000000005
    Processor: KSTREAM-TRANSFORMVALUES-0000000007 (stores: [])
      --> KSTREAM-FILTER-0000000008
      <-- KSTREAM-MAPVALUES-0000000006
    Processor: KSTREAM-FILTER-0000000008 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000009
      <-- KSTREAM-TRANSFORMVALUES-0000000007
    Processor: KSTREAM-KEY-SELECT-0000000009 (stores: [])
      --> KSTREAM-MAPVALUES-0000000010
      <-- KSTREAM-FILTER-0000000008
    Processor: KSTREAM-MAPVALUES-0000000010 (stores: [])
      --> KSTREAM-FILTER-0000000012
      <-- KSTREAM-KEY-SELECT-0000000009
    Processor: KSTREAM-FILTER-0000000012 (stores: [])
      --> KSTREAM-SINK-0000000011
      <-- KSTREAM-MAPVALUES-0000000010
    Sink: KSTREAM-SINK-0000000011 (topic: Join-repartition)
      <-- KSTREAM-FILTER-0000000012
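
The difference between the two plans can also be checked mechanically. The Python sketch below is a hypothetical helper, not part of ksql. It assumes the "Processing topology" text from EXPLAIN has been copied into a string, and it looks for the two signs of a repartition: a KSTREAM-KEY-SELECT processor, which is the name Kafka Streams gives a `selectKey` step, and an internal topic whose name ends in `-repartition`.

```python
import re
from typing import List

# Kafka Streams gives the internal topics it creates for re-keyed streams
# a "-repartition" suffix, e.g. "Join-repartition" in the output above.
_REPARTITION_TOPIC = re.compile(r"([\w.-]+-repartition)\b")


def repartition_topics(topology: str) -> List[str]:
    """Hypothetical helper: repartition topics named in an EXPLAIN topology dump."""
    return sorted(set(_REPARTITION_TOPIC.findall(topology)))


def repartitions(topology: str) -> bool:
    """True if the plan re-keys a stream (selectKey) or routes it through a repartition topic."""
    return "KSTREAM-KEY-SELECT" in topology or bool(repartition_topics(topology))


# Excerpts copied from the two EXPLAIN outputs above.
FIRST_JOIN = """
    Source: KSTREAM-SOURCE-0000000005 (topics: [s1])
    Processor: KSTREAM-JOIN-0000000008 (stores: [KafkaTopic_Right-reduce])
    Sink: KSTREAM-SINK-0000000011 (topic: S1T1)
"""
SECOND_JOIN = """
    Source: KSTREAM-SOURCE-0000000013 (topics: [Join-repartition])
    Processor: KSTREAM-KEY-SELECT-0000000009 (stores: [])
    Sink: KSTREAM-SINK-0000000011 (topic: Join-repartition)
"""

print(repartitions(FIRST_JOIN), repartition_topics(FIRST_JOIN))    # False []
print(repartitions(SECOND_JOIN), repartition_topics(SECOND_JOIN))  # True ['Join-repartition']
```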

The second join shouldn't need to repartition, though: S1T1 is still keyed by S1_K (its key has not changed since the first join), and S1_K is the join column.
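
The rule relied on here fits in a one-line check. This Python sketch is a simplified model, not ksql's planner code. The `Source` type and `needs_rekey` function are hypothetical, and the sketch assumes the planner tracks, for each input, which value column (if any) is known to equal ROWKEY.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Source:
    """Hypothetical model of what the planner knows about a join input."""
    name: str
    columns: Tuple[str, ...]
    key_field: Optional[str]  # value column known to equal ROWKEY, if any


def needs_rekey(source: Source, join_column: str) -> bool:
    """A stream can be joined in place only if it is already keyed by the
    join column; otherwise it must be re-keyed and sent through a
    repartition topic first. An unknown (None) key field never matches."""
    return source.key_field != join_column


s1 = Source("S1", ("K", "V"), key_field="K")
s1t1 = Source("S1T1", ("S1_K", "S1_V", "T1_V"), key_field="S1_K")

print(needs_rekey(s1, "K"))       # False: the first join runs in place
print(needs_rekey(s1t1, "S1_K"))  # False: what the second join should see
```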

Note that the repartition can be avoided by adding an explicit PARTITION BY to the first join:

create stream s2 (k string, v string) with (kafka_topic='s2', value_format='json', key='k');
create stream s2t1 as select s2.k, s2.v, t1.v from s2 join t1 on s2.k = t1.k partition by s2_k;
create stream s2t1t2 as select s2_k, s2_v, t1_v, t2.v from s2t1 join t2 on s2t1.s2_k=t2.k;
explain CSAS_S2T1T2_3;

ID                   : CSAS_S2T1T2_3
SQL                  : create stream s2t1t2 as select s2_k, s2_v, t1_v, t2.v from s2t1 join t2 on s2t1.s2_k=t2.k;

 Field   | Type
-------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 S2_K    | VARCHAR(STRING)
 S2_V    | VARCHAR(STRING)
 T1_V    | VARCHAR(STRING)
 V       | VARCHAR(STRING)
-------------------------------------

Sources that this query reads from:
-----------------------------------
S2T1
T2

For source description please run: DESCRIBE [EXTENDED] <SourceId>

Sinks that this query writes to:
-----------------------------------
S2T1T2

For sink description please run: DESCRIBE [EXTENDED] <SinkId>

Execution plan
--------------
 > [ SINK ] | Schema: [S2_K : VARCHAR, S2_V : VARCHAR, T1_V : VARCHAR, V : VARCHAR] | Logger: CSAS_S2T1T2_3.S2T1T2
		 > [ PROJECT ] | Schema: [S2_K : VARCHAR, S2_V : VARCHAR, T1_V : VARCHAR, V : VARCHAR] | Logger: CSAS_S2T1T2_3.Project
				 > [ JOIN ] | Schema: [S2T1.ROWTIME : BIGINT, S2T1.ROWKEY : VARCHAR, S2T1.S2_K : VARCHAR, S2T1.S2_V : VARCHAR, S2T1.T1_V : VARCHAR, T2.ROWTIME : BIGINT, T2.ROWKEY : VARCHAR, T2.K : VARCHAR, T2.V : VARCHAR] | Logger: CSAS_S2T1T2_3.Join
						 > [ SOURCE ] | Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, S2_K : VARCHAR, S2_V : VARCHAR, T1_V : VARCHAR] | Logger: CSAS_S2T1T2_3.KafkaTopic_Left
						 > [ SOURCE ] | Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, K : VARCHAR, V : VARCHAR] | Logger: CSAS_S2T1T2_3.KafkaTopic_Right


Processing topology
-------------------
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000005 (topics: [S2T1])
      --> KSTREAM-MAPVALUES-0000000006
    Processor: KSTREAM-MAPVALUES-0000000006 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000007
      <-- KSTREAM-SOURCE-0000000005
    Source: KSTREAM-SOURCE-0000000000 (topics: [t2])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000007 (stores: [])
      --> KSTREAM-JOIN-0000000008
      <-- KSTREAM-MAPVALUES-0000000006
    Processor: KSTREAM-JOIN-0000000008 (stores: [KafkaTopic_Right-reduce])
      --> KSTREAM-MAPVALUES-0000000009
      <-- KSTREAM-TRANSFORMVALUES-0000000007
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-TRANSFORMVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000009 (stores: [])
      --> KSTREAM-MAPVALUES-0000000010
      <-- KSTREAM-JOIN-0000000008
    Processor: KSTREAM-TRANSFORMVALUES-0000000002 (stores: [])
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-AGGREGATE-0000000004
      <-- KSTREAM-TRANSFORMVALUES-0000000002
    Processor: KSTREAM-MAPVALUES-0000000010 (stores: [])
      --> KSTREAM-SINK-0000000011
      <-- KSTREAM-MAPVALUES-0000000009
    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KafkaTopic_Right-reduce])
      --> none
      <-- KSTREAM-MAPVALUES-0000000003
    Sink: KSTREAM-SINK-0000000011 (topic: S2T1T2)
      <-- KSTREAM-MAPVALUES-0000000010

The underlying cause is that StructuredDataOutputNode sets the key field to the fully qualified source field name (s1.k), while the actual schema uses the projected names (s1_k).
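
The mismatch can be reproduced in a small Python model. All names are hypothetical; this is not the actual StructuredDataOutputNode code. The projection of the first CSAS maps qualified source expressions to output column names. If the key field is recorded verbatim as `S1.K`, it names no column of S1T1. Assuming the planner treats such a key field as unknown, the next join has to re-key. Translating the key through the projection's aliases instead yields `S1_K`, which matches the join column. Identifiers are upper-cased, as ksql displays them.

```python
from typing import Dict, Optional, Sequence

# Projection of CSAS_S1T1_0: qualified source expression -> output column.
PROJECTION: Dict[str, str] = {"S1.K": "S1_K", "S1.V": "S1_V", "T1.V": "T1_V"}
SINK_COLUMNS = list(PROJECTION.values())


def key_field_as_recorded(source_key: str) -> str:
    # The behaviour described in this issue: the fully qualified source
    # field name is copied to the sink unchanged.
    return source_key


def key_field_via_aliases(source_key: str, projection: Dict[str, str]) -> Optional[str]:
    # Hypothetical fix: translate the key through the projection's aliases.
    # If the key column was not selected, the sink has no known key field.
    return projection.get(source_key)


def usable_key_field(key_field: Optional[str], sink_columns: Sequence[str]) -> Optional[str]:
    # Assumption: a key field that names no sink column is treated as unknown.
    return key_field if key_field in sink_columns else None


print(usable_key_field(key_field_as_recorded("S1.K"), SINK_COLUMNS))              # None
print(usable_key_field(key_field_via_aliases("S1.K", PROJECTION), SINK_COLUMNS))  # S1_K
```

Returning None when the key column is not projected is the conservative choice. A downstream join then repartitions rather than trusting a key that may not hold.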

@rodesai changed the title from "stream-table join should preserve key column for result stream" to "join should preserve key column for result stream" on Feb 21, 2019
@big-andy-coates
Contributor

Hey @rodesai, is this related to the tests I added in #2574 to key-fields.json?

@big-andy-coates
Contributor

This should be fixed by all the work on key-fields, including #2735.
