From 2ace6488210fd14df5b979d61c743afe68256ef1 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 8 Jan 2020 11:30:34 +0000 Subject: [PATCH] fix: better error message on self-join Fixes: https://github.com/confluentinc/ksql/issues/4241 Self-joins are not yet supported. Previously they resulted in an confusing error message: > Invalid topology: Topic has already been registered by another source. They not result in: > Can not join 'something' to 'something': self joins are not yet supported. --- .../io/confluent/ksql/analyzer/Analyzer.java | 8 ++++++ .../ksql/analyzer/AnalyzerFunctionalTest.java | 28 ++++++++++++++++--- .../query-validation-tests/joins.json | 11 ++++++++ 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index cb23dec9a5e2..7f0c174529ce 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -341,6 +341,14 @@ protected AstNode visitJoin(final Join node, final Void context) { throw new KsqlException("Only equality join criteria is supported."); } + if (left.getDataSource().getName().equals(right.getDataSource().getName())) { + throw new KsqlException( + "Can not join '" + left.getDataSource().getName().toString(FormatOptions.noEscape()) + + "' to '" + right.getDataSource().getName().toString(FormatOptions.noEscape()) + + "': self joins are not yet supported." + ); + } + final ColumnRef leftJoinField = getJoinFieldName( comparisonExpression, left.getAlias(), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java index e9c5ad74986b..4c8587a61d2c 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java @@ -106,7 +106,7 @@ public class AnalyzerFunctionalTest { public final ExpectedException expectedException = ExpectedException.none(); @Mock - private SerdeOptionsSupplier serdeOptiponsSupplier; + private SerdeOptionsSupplier serdeOptionsSupplier; @Mock private Sink sink; @@ -127,7 +127,7 @@ public void init() { jsonMetaStore, "", DEFAULT_SERDE_OPTIONS, - serdeOptiponsSupplier + serdeOptionsSupplier ); when(sink.getName()).thenReturn(SourceName.of("TEST0")); @@ -388,7 +388,7 @@ public void shouldFailIfExplicitNamespaceIsProvidedButEmpty() { public void shouldGetSerdeOptions() { // Given: final Set serdeOptions = ImmutableSet.of(SerdeOption.UNWRAP_SINGLE_VALUES); - when(serdeOptiponsSupplier.build(any(), any(), any(), any())).thenReturn(serdeOptions); + when(serdeOptionsSupplier.build(any(), any(), any(), any())).thenReturn(serdeOptions); givenSinkValueFormat(Format.AVRO); givenWrapSingleValues(true); @@ -397,7 +397,7 @@ public void shouldGetSerdeOptions() { final Analysis result = analyzer.analyze(query, Optional.of(sink)); // Then: - verify(serdeOptiponsSupplier).build( + verify(serdeOptionsSupplier).build( ImmutableList.of("COL0", "COL1").stream().map(ColumnName::of).collect(Collectors.toList()), Format.AVRO, Optional.of(true), @@ -484,6 +484,26 @@ public void shouldNotIncludeMetaColumnsForSelectStartOnStaticQueries() { ))); } + @Test + public void shouldThrowOnSelfJoin() { + // Given: + final CreateStreamAsSelect createStreamAsSelect = parseSingle( + "CREATE STREAM FOO AS " + + "SELECT * FROM test1 t1 JOIN test1 t2 ON t1.rowkey = t2.rowkey;" + ); + + final Query query = createStreamAsSelect.getQuery(); + + final Analyzer analyzer = new Analyzer(jsonMetaStore, "", DEFAULT_SERDE_OPTIONS); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Can not join 'TEST1' to 'TEST1': self joins are not yet supported."); + + // When: + analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink())); + } + @SuppressWarnings("unchecked") private T parseSingle(final String simpleQuery) { return (T) Iterables.getOnlyElement(parse(simpleQuery, jsonMetaStore)); diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json index 7f10d5d5a06d..125764a25f8f 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/joins.json @@ -1922,6 +1922,17 @@ {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, L_ROWKEY STRING, L1 INT, R1 INT"} ] } + }, + { + "name": "self join", + "statements": [ + "CREATE STREAM INPUT (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');", + "CREATE STREAM OUTPUT as SELECT * FROM INPUT s1 JOIN INPUT s2 WITHIN 1 HOUR ON s1.id = s2.id;" + ], + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Can not join 'INPUT' to 'INPUT': self joins are not yet supported." + } } ] }