Skip to content

Commit

Permalink
fix: better error message on self-join (#4248)
Browse files Browse the repository at this point in the history
Fixes: #4241

Self-joins are not yet supported. Previously they resulted in an confusing error message:

>  Invalid topology: Topic <something> has already been registered by another source.

They not result in:

> Can not join 'something' to 'something': self joins are not yet supported.
  • Loading branch information
big-andy-coates authored Jan 10, 2020
1 parent 5ee1e9e commit 1281ab2
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ protected AstNode visitJoin(final Join node, final Void context) {
throw new KsqlException("Only equality join criteria is supported.");
}

if (left.getDataSource().getName().equals(right.getDataSource().getName())) {
throw new KsqlException(
"Can not join '" + left.getDataSource().getName().toString(FormatOptions.noEscape())
+ "' to '" + right.getDataSource().getName().toString(FormatOptions.noEscape())
+ "': self joins are not yet supported."
);
}

final ColumnRef leftJoinField = getJoinFieldName(
comparisonExpression,
left.getAlias(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class AnalyzerFunctionalTest {
public final ExpectedException expectedException = ExpectedException.none();

@Mock
private SerdeOptionsSupplier serdeOptiponsSupplier;
private SerdeOptionsSupplier serdeOptionsSupplier;
@Mock
private Sink sink;

Expand All @@ -127,7 +127,7 @@ public void init() {
jsonMetaStore,
"",
DEFAULT_SERDE_OPTIONS,
serdeOptiponsSupplier
serdeOptionsSupplier
);

when(sink.getName()).thenReturn(SourceName.of("TEST0"));
Expand Down Expand Up @@ -388,7 +388,7 @@ public void shouldFailIfExplicitNamespaceIsProvidedButEmpty() {
public void shouldGetSerdeOptions() {
// Given:
final Set<SerdeOption> serdeOptions = ImmutableSet.of(SerdeOption.UNWRAP_SINGLE_VALUES);
when(serdeOptiponsSupplier.build(any(), any(), any(), any())).thenReturn(serdeOptions);
when(serdeOptionsSupplier.build(any(), any(), any(), any())).thenReturn(serdeOptions);

givenSinkValueFormat(Format.AVRO);
givenWrapSingleValues(true);
Expand All @@ -397,7 +397,7 @@ public void shouldGetSerdeOptions() {
final Analysis result = analyzer.analyze(query, Optional.of(sink));

// Then:
verify(serdeOptiponsSupplier).build(
verify(serdeOptionsSupplier).build(
ImmutableList.of("COL0", "COL1").stream().map(ColumnName::of).collect(Collectors.toList()),
Format.AVRO,
Optional.of(true),
Expand Down Expand Up @@ -484,6 +484,26 @@ public void shouldNotIncludeMetaColumnsForSelectStartOnStaticQueries() {
)));
}

@Test
public void shouldThrowOnSelfJoin() {
// Given:
final CreateStreamAsSelect createStreamAsSelect = parseSingle(
"CREATE STREAM FOO AS "
+ "SELECT * FROM test1 t1 JOIN test1 t2 ON t1.rowkey = t2.rowkey;"
);

final Query query = createStreamAsSelect.getQuery();

final Analyzer analyzer = new Analyzer(jsonMetaStore, "", DEFAULT_SERDE_OPTIONS);

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Can not join 'TEST1' to 'TEST1': self joins are not yet supported.");

// When:
analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink()));
}

@SuppressWarnings("unchecked")
private <T extends Statement> T parseSingle(final String simpleQuery) {
return (T) Iterables.getOnlyElement(parse(simpleQuery, jsonMetaStore));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1922,6 +1922,17 @@
{"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, L_ROWKEY STRING, L1 INT, R1 INT"}
]
}
},
{
"name": "self join",
"statements": [
"CREATE STREAM INPUT (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM INPUT s1 JOIN INPUT s2 WITHIN 1 HOUR ON s1.id = s2.id;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Can not join 'INPUT' to 'INPUT': self joins are not yet supported."
}
}
]
}

0 comments on commit 1281ab2

Please sign in to comment.