Skip to content

Commit

Permalink
Using ksql topic name instead of Kafka topic name in topic map in met…
Browse files Browse the repository at this point in the history
…astore. (#2149)

This was a cherry-picked commit.
  • Loading branch information
hjafarpour authored Nov 14, 2018
1 parent 5156eda commit 108f88f
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,15 @@ private void analyzeNonStdOutSink(final boolean doCreateInto) {
}

newIntoKsqlTopic = new KsqlTopic(
intoKafkaTopicName,
intoStructuredDataSource.getName(),
intoKafkaTopicName,
intoTopicSerde
);
} else {
newIntoKsqlTopic = metaStore.getTopic(intoKafkaTopicName);
newIntoKsqlTopic = metaStore.getTopic(intoStructuredDataSource.getName());
if (newIntoKsqlTopic == null) {
throw new KsqlException(
"Sink topic " + intoKafkaTopicName + " does not exist in th e metastore.");
"Sink topic " + intoKafkaTopicName + " does not exist in the metastore.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ private QueryMetadata buildPlanForStructuredOutputNode(
final String statement
) {

if (metaStore.getTopic(outputNode.getKafkaTopicName()) == null) {
if (metaStore.getTopic(outputNode.getKsqlTopic().getName()) == null) {
metaStore.putTopic(outputNode.getKsqlTopic());
}
final StructuredDataSource sinkDataSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private OutputNode buildOutputNode(final Schema inputSchema,
extractionPolicy,
sourcePlanNode.getKeyField(),
intoDataSource.getKsqlTopic(),
intoDataSource.getKsqlTopic().getTopicName(),
intoDataSource.getKsqlTopic().getKafkaTopicName(),
intoProperties,
analysis.getLimitClause(),
analysis.isDoCreateInto()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public KsqlStructuredDataOutputNode(
@JsonProperty("timestamp") final TimestampExtractionPolicy timestampExtractionPolicy,
@JsonProperty("key") final Field keyField,
@JsonProperty("ksqlTopic") final KsqlTopic ksqlTopic,
@JsonProperty("topicName") final String topicName,
@JsonProperty("topicName") final String kafkaTopicName,
@JsonProperty("outputProperties") final Map<String, Object> outputProperties,
@JsonProperty("limit") final Optional<Integer> limit,
@JsonProperty("doCreateInto") final boolean doCreateInto) {
super(id, source, schema, limit, timestampExtractionPolicy);
this.kafkaTopicName = topicName;
this.kafkaTopicName = kafkaTopicName;
this.keyField = keyField;
this.ksqlTopic = ksqlTopic;
this.outputProperties = outputProperties;
Expand Down Expand Up @@ -263,7 +263,7 @@ public static class Builder {
private TimestampExtractionPolicy timestampExtractionPolicy;
private Field keyField;
private KsqlTopic ksqlTopic;
private String topicName;
private String kafkaTopicName;
private Map<String, Object> outputProperties;
private Optional<Integer> limit;
private boolean doCreateInto;
Expand All @@ -276,7 +276,7 @@ public KsqlStructuredDataOutputNode build() {
timestampExtractionPolicy,
keyField,
ksqlTopic,
topicName,
kafkaTopicName,
outputProperties,
limit,
doCreateInto);
Expand All @@ -290,7 +290,7 @@ public static Builder from(final KsqlStructuredDataOutputNode original) {
.withTimestampExtractionPolicy(original.getTimestampExtractionPolicy())
.withKeyField(original.getKeyField())
.withKsqlTopic(original.getKsqlTopic())
.withTopicName(original.getKafkaTopicName())
.withKafkaTopicName(original.getKafkaTopicName())
.withOutputProperties(original.getOutputProperties())
.withLimit(original.getLimit())
.withDoCreateInto(original.isDoCreateInto());
Expand All @@ -307,8 +307,8 @@ Builder withOutputProperties(final Map<String, Object> outputProperties) {
return this;
}

Builder withTopicName(final String topicName) {
this.topicName = topicName;
Builder withKafkaTopicName(final String kafkaTopicName) {
this.kafkaTopicName = kafkaTopicName;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ public void shouldThrowExpectedExceptionForDuplicateTable() {
new CommandFactories(topicClient, schemaRegistryClient));
try {
final List<PreparedStatement> statementList = ksqlEngine.parseStatements(
"CREATE TABLE FOO AS SELECT * FROM TEST2; CREATE TABLE BAR WITH (KAFKA_TOPIC='FOO') AS SELECT * FROM TEST2;", metaStore.clone(), true);
"CREATE TABLE FOO AS SELECT * FROM TEST2; CREATE TABLE FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM TEST2;", metaStore.clone(), true);
queryEngine.buildLogicalPlans(metaStore, statementList, ksqlConfig);
Assert.fail();
} catch (final KsqlException e) {
assertThat(e.getMessage(), equalTo("Cannot create the stream/table. The output topic FOO is already used by FOO"));
assertThat(e.getMessage(), equalTo("Exception while processing statement: Cannot add the new data source. Another data source with the same name already exists: KsqlStream name:FOO"));
}

}
Expand All @@ -77,11 +77,11 @@ public void shouldThrowExpectedExceptionForDuplicateStream() {
new CommandFactories(topicClient, schemaRegistryClient));
try {
final List<PreparedStatement> statementList = ksqlEngine.parseStatements(
"CREATE STREAM FOO AS SELECT * FROM ORDERS; CREATE STREAM BAR WITH (KAFKA_TOPIC='FOO') AS SELECT * FROM ORDERS;", metaStore.clone(), true);
"CREATE STREAM FOO AS SELECT * FROM ORDERS; CREATE STREAM FOO WITH (KAFKA_TOPIC='BAR') AS SELECT * FROM ORDERS;", metaStore.clone(), true);
queryEngine.buildLogicalPlans(metaStore, statementList, ksqlConfig);
Assert.fail();
} catch (final KsqlException e) {
assertThat(e.getMessage(), equalTo("Cannot create the stream/table. The output topic FOO is already used by FOO"));
assertThat(e.getMessage(), equalTo("Exception while processing statement: Cannot add the new data source. Another data source with the same name already exists: KsqlStream name:FOO"));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,25 @@
package io.confluent.ksql.analyzer;

import static io.confluent.ksql.testutils.AnalysisTestUtil.analyzeQuery;
import static io.confluent.ksql.testutils.AnalysisTestUtil.getPreparedStatements;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.metastore.StructuredDataSource;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.QualifiedName;
import io.confluent.ksql.parser.tree.QuerySpecification;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
import io.confluent.ksql.util.MetaStoreFixture;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -188,4 +202,42 @@ public void testFilterAnalysis() {
Assert.assertTrue("testFilterAnalysis failed.", analysis.getWhereExpression().toString().equalsIgnoreCase("(TEST1.COL0 > 20)"));

}

@Test
public void shouldCreateCorrectSinkKsqlTopic() {
final String simpleQuery = "CREATE STREAM FOO WITH (KAFKA_TOPIC='TEST_TOPIC1') AS SELECT col0, col2, col3 FROM test1 WHERE col0 > 100;";
// The following few lines are only needed for this test
final MetaStore testMetastore = metaStore.clone();
final KsqlTopic ksqlTopic = new KsqlTopic("FOO", "TEST_TOPIC1", new KsqlJsonTopicSerDe());
testMetastore.putTopic(ksqlTopic);
final List<Statement> statements = getPreparedStatements(simpleQuery, testMetastore)
.stream()
.map(PreparedStatement::getStatement)
.collect(Collectors.toList());
final CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statements.get(0);
final Table intoTable = new Table(QualifiedName.of(createStreamAsSelect.getName().toString()));
intoTable.setProperties(createStreamAsSelect.getProperties());
final QuerySpecification querySpecification = (QuerySpecification) createStreamAsSelect.getQuery().getQueryBody();
final QuerySpecification newQuerySpecification = new QuerySpecification(
querySpecification.getSelect(),
intoTable,
false,
querySpecification.getFrom(),
querySpecification.getWindowExpression(),
querySpecification.getWhere(),
querySpecification.getGroupBy(),
querySpecification.getHaving(),
querySpecification.getLimit()
);
final Analysis analysis = new Analysis();
final Analyzer analyzer = new Analyzer("sqlExpression", analysis, testMetastore, "");
analyzer.visitQuerySpecification(newQuerySpecification, new AnalysisContext(null));

Assert.assertNotNull("INTO is null", analysis.getInto());
final StructuredDataSource structuredDataSource = analysis.getInto();
final KsqlTopic createdKsqlTopic = structuredDataSource.getKsqlTopic();
assertThat(createdKsqlTopic.getTopicName(), equalTo("FOO"));
assertThat(createdKsqlTopic.getKafkaTopicName(), equalTo("TEST_TOPIC1"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public final class AnalysisTestUtil {
private AnalysisTestUtil() {
}

public static List<PreparedStatement> getPreparedStatements(final String queryStr, final MetaStore metaStore) {
return KSQL_PARSER.buildAst(queryStr, metaStore);
}

public static Analysis analyzeQuery(final String queryStr, final MetaStore metaStore) {
final List<PreparedStatement> statements = KSQL_PARSER.buildAst(queryStr, metaStore);
final List<PreparedStatement> statements = getPreparedStatements(queryStr, metaStore);
final Analysis analysis = new Analysis();
final Analyzer analyzer = new Analyzer(queryStr, analysis, metaStore, "");
analyzer.process(statements.get(0).getStatement(), new AnalysisContext(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,20 @@
{"topic": "S1", "key": 0, "value": {"S__END": "foo"}, "timestamp": 0},
{"topic": "S1", "key": 0, "value": {"S__END": "bar"}, "timestamp": 0}
]
},
{
"name": "CSAS with custom Kafka topic name",
"statements": [
"CREATE STREAM TEST (C1 BIGINT, C2 INTEGER, C3 STRING) WITH (KAFKA_TOPIC='test_topic', value_format='DELIMITED');",
"CREATE STREAM S1 WITH (KAFKA_TOPIC='topic_s') AS SELECT * FROM TEST WHERE C1 = 4294967296;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": "123,456,foo", "timestamp": 0},
{"topic": "test_topic", "key": 0, "value": "4294967296,456,foo", "timestamp": 0}
],
"outputs": [
{"topic": "topic_s", "key": 0, "value": "4294967296,456,foo", "timestamp": 0}
]
}
]
}

0 comments on commit 108f88f

Please sign in to comment.