From 87dbc0401629dd855d6d6ee6bcf749e678249d4b Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 25 Feb 2020 17:31:56 +0000 Subject: [PATCH 1/3] fix: support partial schemas fixes: https://github.com/confluentinc/ksql/issues/4566 With this change users can now supply just the key schema and use schema inference to get the value columns. For example, if the key is an `INT` serialized using Kafka's `IntegerSerializer` and the value is an Avro record with the schema stored in the Scheme Registry, then such a stream can be registered in ksqlDB with a statement such as: ```sql -- note: only the key columns are provided between the first set of brackets -- the value columns will be inferred from the Avro schema in the Schema Registry CREATE STREAM users (ROWKEY INT KET) WITH (kafka_topic='users', value_format='avro'); ``` --- .../ksql/inference/DefaultSchemaInjector.java | 76 ++++++++++++------ .../inference/DefaultSchemaInjectorTest.java | 9 +-- .../query-validation-tests/elements.json | 78 +++++++++++++++++++ 3 files changed, 133 insertions(+), 30 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java index a18d9e573b7..230260afcbb 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java @@ -15,7 +15,6 @@ package io.confluent.ksql.schema.ksql.inference; -import com.google.common.collect.Iterables; import io.confluent.ksql.metastore.TypeRegistry; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.SchemaParser; @@ -23,6 +22,8 @@ import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.CreateSource; import io.confluent.ksql.parser.tree.Statement; +import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.schema.connect.SqlSchemaFormatter; import io.confluent.ksql.schema.connect.SqlSchemaFormatter.Option; @@ -35,20 +36,25 @@ import io.confluent.ksql.util.IdentifierUtil; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Stream; import org.apache.kafka.connect.data.Schema; /** - * An injector which injects the schema into the supplied {@code statement}. + * An injector which injects the value columns into the supplied {@code statement}. * - *

The schema is only injected if: + *

The value columns are only injected if: *

* + *

Any key columns present are passed through unchanged. + * *

If any of the above are not true then the {@code statement} is returned unchanged. */ public class DefaultSchemaInjector implements Injector { @@ -62,7 +68,6 @@ public DefaultSchemaInjector(final TopicSchemaSupplier schemaSupplier) { this.schemaSupplier = Objects.requireNonNull(schemaSupplier, "schemaSupplier"); } - @SuppressWarnings("unchecked") @Override public ConfiguredStatement inject( @@ -90,17 +95,18 @@ public ConfiguredStatement inject( private Optional> forCreateStatement( final ConfiguredStatement statement ) { - if (hasElements(statement) + if (hasValueElements(statement) || !statement.getStatement().getProperties().getValueFormat().supportsSchemaInference()) { return Optional.empty(); } final SchemaAndId valueSchema = getValueSchema(statement); final CreateSource withSchema = addSchemaFields(statement, valueSchema); - final PreparedStatement prepared = - buildPreparedStatement(withSchema); - return Optional.of(ConfiguredStatement.of( - prepared, statement.getOverrides(), statement.getConfig())); + final PreparedStatement prepared = buildPreparedStatement(withSchema); + final ConfiguredStatement configured = ConfiguredStatement + .of(prepared, statement.getOverrides(), statement.getConfig()); + + return Optional.of(configured); } private SchemaAndId getValueSchema( @@ -123,10 +129,11 @@ private SchemaAndId getValueSchema( return result.schemaAndId.get(); } - private static boolean hasElements( + private static boolean hasValueElements( final ConfiguredStatement statement ) { - return !Iterables.isEmpty(statement.getStatement().getElements()); + return statement.getStatement().getElements().stream() + .anyMatch(e -> e.getNamespace().equals(Namespace.VALUE)); } private static CreateSource addSchemaFields( @@ -145,23 +152,44 @@ private static CreateSource addSchemaFields( } private static TableElements buildElements( - final Schema schema, + final Schema valueSchema, final ConfiguredStatement preparedStatement ) { - try { - throwOnInvalidSchema(schema); - // custom types cannot be injected, so we can pass in an EMPTY type registry - return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY); - } catch (final Exception e) { - throw new KsqlStatementException( - "Failed to convert schema to KSQL model: " + e.getMessage(), - preparedStatement.getStatementText(), - e); - } + throwOnInvalidSchema(valueSchema); + + final List elements = new ArrayList<>(); + + getKeyColumns(preparedStatement) + .forEach(elements::add); + + getColumnsFromSchema(valueSchema) + .forEach(elements::add); + + return TableElements.of(elements); + } + + private static Stream getKeyColumns( + final ConfiguredStatement preparedStatement + ) { + return preparedStatement.getStatement().getElements().stream() + .filter(e -> e.getNamespace() == Namespace.KEY); + } + + private static Stream getColumnsFromSchema(final Schema schema) { + return SchemaParser.parse(FORMATTER.format(schema), TypeRegistry.EMPTY).stream(); } private static void throwOnInvalidSchema(final Schema schema) { - SchemaConverters.connectToSqlConverter().toSqlType(schema); + try { + SchemaConverters.connectToSqlConverter().toSqlType(schema); + } catch (final Exception e) { + throw new KsqlException( + "Schema contains types not supported by KSQL: " + e.getMessage() + + System.lineSeparator() + + "Schema: " + schema, + e + ); + } } private static PreparedStatement buildPreparedStatement( diff --git a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java index 2b4278ad02f..fb78dccebea 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java @@ -18,9 +18,7 @@ import static io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId.schemaAndId; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.fail; @@ -381,7 +379,7 @@ public void shouldAddSchemaIdIfNotPresentAlready() { final ConfiguredStatement result = injector.inject(csStatement); // Then: - assertThat(result.getStatement().getProperties().getAvroSchemaId().get(), is(SCHEMA_ID)); + assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID))); assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID=5")); } @@ -395,8 +393,7 @@ public void shouldNotOverwriteExistingSchemaId() { final ConfiguredStatement result = injector.inject(csStatement); // Then: - assertThat(result.getStatement().getProperties().getAvroSchemaId().get(), - is(42)); + assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(42))); assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID='42'")); } @@ -416,7 +413,7 @@ public void shouldThrowOnUnsupportedType() { fail("Expected KsqlStatementException. schema: " + unsupportedSchema); } catch (final KsqlStatementException e) { assertThat(e.getRawMessage(), - containsString("Failed to convert schema to KSQL model:")); + containsString("Schema contains types not supported by KSQL:")); assertThat(e.getSqlStatement(), is(csStatement.getStatementText())); } diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index 7f83947d8af..cf272ef0d28 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -60,6 +60,84 @@ "inputs": [{"topic": "input", "value": {"c1": 4}}], "outputs": [{"topic": "OUTPUT", "value": {"C1": 4}}] }, + { + "name": "with invalid or reserved words in schema - JSON", + "statements": [ + "CREATE STREAM INPUT WITH (kafka_topic='input', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM input;" + ], + "topics": [ + { + "name": "input", + "schema": {"type": "object","properties": { + "@timestamp": {"type": "integer"}, + "from": {"type": "integer"} + }}, + "format": "JSON" + }, + { + "name": "OUTPUT", + "format": "JSON" + } + ], + "inputs": [{"topic": "input", "value": {"@timestamp": 4, "from": 5}}], + "outputs": [{"topic": "OUTPUT", "value": {"@TIMESTAMP": 4, "FROM": 5}}], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT"} + ] + } + }, + { + "name": "validate without value elements OK - AVRO", + "statements": [ + "CREATE STREAM INPUT (rowkey int key) WITH (kafka_topic='input', value_format='AvRo');", + "CREATE STREAM OUTPUT AS SELECT * FROM input;" + ], + "topics": [ + { + "name": "input", + "schema": {"name": "blah", "type": "record", "fields": [{"name": "c1", "type": "int"}]}, + "format": "AVRO" + }, + { + "name": "OUTPUT", + "format": "AVRO" + } + ], + "inputs": [{"topic": "input", "key": 42, "value": {"c1": 4}}], + "outputs": [{"topic": "OUTPUT", "key": 42, "value": {"C1": 4}}], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, `C1` INT"} + ] + } + }, + { + "name": "validate without value elements OK - JSON SCHEMA", + "statements": [ + "CREATE STREAM INPUT (rowkey int key) WITH (kafka_topic='input', value_format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM input;" + ], + "topics": [ + { + "name": "input", + "schema": {"type": "object","properties": {"c1": {"type": "integer"}}}, + "format": "JSON" + }, + { + "name": "OUTPUT", + "format": "JSON" + } + ], + "inputs": [{"topic": "input", "key": 42, "value": {"c1": 4}}], + "outputs": [{"topic": "OUTPUT", "key": 42, "value": {"C1": 4}}], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, `C1` BIGINT"} + ] + } + }, { "name": "validate with elements OK", "format": ["JSON", "PROTOBUF"], From 0859c0c0ad22dfa2f45fc8939aa737bebfad1e49 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 26 Feb 2020 09:54:53 +0000 Subject: [PATCH 2/3] chore: fix build failure due to bad merge from 5.5 branch The `spec.json` in the 5.5.x branch has been split into a `spec.json` and `plan.json` in the master branch. --- .../ksql/test/planned/PlannedTestPath.java | 6 + .../ksql/test/planned/TestCasePlanLoader.java | 36 ++-- .../ksql/test/planned/TestCaseSpecNode.java | 6 +- .../5.5.0_1582662519759/plan.json | 144 ++++++++++++++++ .../5.5.0_1582662519759/spec.json | 156 ++---------------- 5 files changed, 192 insertions(+), 156 deletions(-) create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/plan.json diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestPath.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestPath.java index 2663c47166a..b97901790ea 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestPath.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/PlannedTestPath.java @@ -22,6 +22,7 @@ import java.util.Objects; public final class PlannedTestPath { + private static final String INVALID_FILENAME_CHARS_PATTERN = "\\s|/|\\\\|:|\\*|\\?|\"|<|>|\\|"; private static final String BASE_DIRECTORY = "src/test/resources/"; private static final String PLANS_DIR = "historical_plans/"; @@ -80,5 +81,10 @@ private static String formatName(final String originalName) { return originalName .replaceAll(INVALID_FILENAME_CHARS_PATTERN, "_"); } + + @Override + public String toString() { + return path.toString(); + } } diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java index ddf8df8a069..32f97c60c9a 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCasePlanLoader.java @@ -15,6 +15,7 @@ package io.confluent.ksql.test.planned; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; @@ -29,6 +30,7 @@ import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; +import io.confluent.ksql.test.TestFrameworkException; import io.confluent.ksql.test.loader.JsonTestLoader; import io.confluent.ksql.test.model.KsqlVersion; import io.confluent.ksql.test.model.RecordNode; @@ -40,7 +42,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Collections; import java.util.List; @@ -146,22 +148,32 @@ private static TestCasePlan parseSpec(final PlannedTestPath versionDir) { final PlannedTestPath planPath = versionDir.resolve(PlannedTestPath.PLAN_FILE); final PlannedTestPath specPath = versionDir.resolve(PlannedTestPath.SPEC_FILE); final PlannedTestPath topologyPath = versionDir.resolve(PlannedTestPath.TOPOLOGY_FILE); + + return new TestCasePlan( + parseJson(specPath, JsonTestLoader.OBJECT_MAPPER, TestCaseSpecNode.class), + parseJson(planPath, PlannedTestUtils.PLAN_MAPPER, TestCasePlanNode.class), + slurp(topologyPath) + ); + } + + private static T parseJson(final PlannedTestPath path, final ObjectMapper mapper, + final Class type) { try { - return new TestCasePlan( - JsonTestLoader.OBJECT_MAPPER.readValue(slurp(specPath), TestCaseSpecNode.class), - PlannedTestUtils.PLAN_MAPPER.readValue(slurp(planPath), TestCasePlanNode.class), - slurp(topologyPath) - ); + return mapper.readValue(slurp(path), type); } catch (final IOException e) { - throw new RuntimeException(e); + throw new TestFrameworkException("Error parsing json in file: " + path, e); } } - private static String slurp(final PlannedTestPath path) throws IOException { - return new String( - Files.readAllBytes(path.relativePath()), - Charset.defaultCharset() - ); + private static String slurp(final PlannedTestPath path) { + try { + return new String( + Files.readAllBytes(path.relativePath()), + StandardCharsets.UTF_8 + ); + } catch (final IOException e) { + throw new TestFrameworkException("Error reading file: " + path, e); + } } private static TestCasePlan buildStatementsInTestCase( diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCaseSpecNode.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCaseSpecNode.java index 936b72701c1..d68bcb29b31 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCaseSpecNode.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/planned/TestCaseSpecNode.java @@ -1,12 +1,14 @@ package io.confluent.ksql.test.planned; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; import io.confluent.ksql.test.model.RecordNode; import java.util.List; import java.util.Map; import java.util.Objects; public class TestCaseSpecNode { + private final String version; private final long timestamp; private final Map schemas; @@ -23,8 +25,8 @@ public TestCaseSpecNode( this.version = Objects.requireNonNull(version, "version"); this.timestamp = timestamp; this.schemas = Objects.requireNonNull(schemas, "schemas"); - this.inputs = inputs; // Objects.requireNonNull(inputs, "inputs"); - this.outputs = outputs; // Objects.requireNonNull(outputs, "outputs"); + this.inputs = ImmutableList.copyOf(Objects.requireNonNull(inputs, "inputs")); + this.outputs = ImmutableList.copyOf(Objects.requireNonNull(outputs, "outputs")); } public long getTimestamp() { diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/plan.json new file mode 100644 index 00000000000..616aa38a303 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/plan.json @@ -0,0 +1,144 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (C1 INTEGER) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='PROTOBUF');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `C1` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT WITH (PARTITIONS=4) AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `C1` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `C1` INTEGER" + }, + "selectExpressions" : [ "C1 AS C1" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7581701272565776623", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/spec.json index 63ba95cecd5..7196123d191 100644 --- a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/spec.json +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_elements_OK_-_PROTOBUF/5.5.0_1582662519759/spec.json @@ -1,150 +1,22 @@ { "version" : "5.5.0", "timestamp" : 1582662519759, - "plan" : [ { - "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM INPUT (C1 INTEGER) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='PROTOBUF');", - "ddlCommand" : { - "@type" : "createStreamV1", - "sourceName" : "INPUT", - "schema" : "`ROWKEY` STRING KEY, `C1` INTEGER", - "keyField" : null, - "timestampColumn" : null, - "topicName" : "input", - "formats" : { - "keyFormat" : { - "format" : "KAFKA", - "properties" : { } - }, - "valueFormat" : { - "format" : "PROTOBUF", - "properties" : { } - }, - "options" : [ ] - }, - "windowInfo" : null - }, - "queryPlan" : null - }, { - "@type" : "ksqlPlanV1", - "statementText" : "CREATE STREAM OUTPUT WITH (PARTITIONS=4) AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", - "ddlCommand" : { - "@type" : "createStreamV1", - "sourceName" : "OUTPUT", - "schema" : "`ROWKEY` STRING KEY, `C1` INTEGER", - "keyField" : null, - "timestampColumn" : null, - "topicName" : "OUTPUT", - "formats" : { - "keyFormat" : { - "format" : "KAFKA", - "properties" : { } - }, - "valueFormat" : { - "format" : "PROTOBUF", - "properties" : { } - }, - "options" : [ ] - }, - "windowInfo" : null - }, - "queryPlan" : { - "sources" : [ "INPUT" ], - "sink" : "OUTPUT", - "physicalPlan" : { - "@type" : "streamSinkV1", - "properties" : { - "queryContext" : "OUTPUT" - }, - "source" : { - "@type" : "streamSelectV1", - "properties" : { - "queryContext" : "Project" - }, - "source" : { - "@type" : "streamSourceV1", - "properties" : { - "queryContext" : "KsqlTopic/Source" - }, - "topicName" : "input", - "formats" : { - "keyFormat" : { - "format" : "KAFKA", - "properties" : { } - }, - "valueFormat" : { - "format" : "PROTOBUF", - "properties" : { } - }, - "options" : [ ] - }, - "timestampColumn" : null, - "sourceSchema" : "`ROWKEY` STRING KEY, `C1` INTEGER" - }, - "selectExpressions" : [ "C1 AS C1" ] - }, - "formats" : { - "keyFormat" : { - "format" : "KAFKA", - "properties" : { } - }, - "valueFormat" : { - "format" : "PROTOBUF", - "properties" : { } - }, - "options" : [ ] - }, - "topicName" : "OUTPUT" - }, - "queryId" : "CSAS_OUTPUT_0" - } - } ], "schemas" : { "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" }, - "configs" : { - "ksql.extension.dir" : "ext", - "ksql.streams.cache.max.bytes.buffering" : "0", - "ksql.security.extension.class" : null, - "ksql.transient.prefix" : "transient_", - "ksql.persistence.wrap.single.values" : "true", - "ksql.authorization.cache.expiry.time.secs" : "30", - "ksql.schema.registry.url" : "", - "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", - "ksql.output.topic.name.prefix" : "", - "ksql.streams.auto.offset.reset" : "earliest", - "ksql.query.pull.enable.standby.reads" : "false", - "ksql.connect.url" : "http://localhost:8083", - "ksql.service.id" : "some.ksql.service.id", - "ksql.internal.topic.min.insync.replicas" : "1", - "ksql.streams.shutdown.timeout.ms" : "300000", - "ksql.new.api.enabled" : "false", - "ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent7581701272565776623", - "ksql.internal.topic.replicas" : "1", - "ksql.insert.into.values.enabled" : "true", - "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", - "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", - "ksql.access.validator.enable" : "auto", - "ksql.streams.bootstrap.servers" : "localhost:0", - "ksql.streams.commit.interval.ms" : "2000", - "ksql.metric.reporters" : "", - "ksql.streams.auto.commit.interval.ms" : "0", - "ksql.metrics.extension" : null, - "ksql.streams.topology.optimization" : "all", - "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", - "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", - "ksql.streams.num.stream.threads" : "4", - "ksql.authorization.cache.max.entries" : "10000", - "ksql.metrics.tags.custom" : "", - "ksql.pull.queries.enable" : "true", - "ksql.udfs.enabled" : "true", - "ksql.udf.enable.security.manager" : "true", - "ksql.connect.worker.config" : "", - "ksql.sink.window.change.log.additional.retention" : "1000000", - "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", - "ksql.udf.collect.metrics" : "false", - "ksql.persistent.prefix" : "query_", - "ksql.query.persistent.active.limit" : "2147483647" - } + "inputs" : [ { + "topic" : "input", + "key" : "", + "value" : { + "c1" : 4 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "C1" : 4 + } + } ] } \ No newline at end of file From b2d8d0730fcd364ccf7636eb6f8c12c87d95c349 Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Wed, 26 Feb 2020 10:10:09 +0000 Subject: [PATCH 3/3] chore: add historic plans --- .../ksql/test/PlannedTestGeneratorTest.java | 5 +- .../6.0.0_1582711670275/plan.json | 144 ++++++++++++++++++ .../6.0.0_1582711670275/spec.json | 22 +++ .../6.0.0_1582711670275/topology | 13 ++ .../6.0.0_1582711670325/plan.json | 144 ++++++++++++++++++ .../6.0.0_1582711670325/spec.json | 22 +++ .../6.0.0_1582711670325/topology | 13 ++ .../6.0.0_1582711670295/plan.json | 144 ++++++++++++++++++ .../6.0.0_1582711670295/spec.json | 22 +++ .../6.0.0_1582711670295/topology | 13 ++ .../6.0.0_1582711670237/plan.json | 144 ++++++++++++++++++ .../6.0.0_1582711670237/spec.json | 24 +++ .../6.0.0_1582711670237/topology | 13 ++ .../query-validation-tests/elements.json | 25 +++ 14 files changed, 746 insertions(+), 2 deletions(-) create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/plan.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/spec.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/topology create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/plan.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/spec.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/topology create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/plan.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/spec.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/topology create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/plan.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/spec.json create mode 100644 ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/topology diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/PlannedTestGeneratorTest.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/PlannedTestGeneratorTest.java index 2463461e78a..7305d022f22 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/PlannedTestGeneratorTest.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/PlannedTestGeneratorTest.java @@ -24,8 +24,9 @@ public class PlannedTestGeneratorTest { /** * Run this test to generate new query plans for the {@link QueryTranslationTest} test cases. * - *

Check the new query plans in with your change. Otherwise, {@link PlannedTestsUpToDateTest} - * fill fail if there are missing or changed query plans. + *

Ensure only the test plans you expected have changed, then check the new query plans in + * with your change. Otherwise, {@link PlannedTestsUpToDateTest} fill fail if there are missing + * or changed query plans. */ @Test @Ignore diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/plan.json new file mode 100644 index 00000000000..f6a50c358c4 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/plan.json @@ -0,0 +1,144 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 INTEGER) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='AvRo');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER" + }, + "selectExpressions" : [ "C1 AS C1" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent3048982612619405901", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/spec.json new file mode 100644 index 00000000000..d6570bf810b --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/spec.json @@ -0,0 +1,22 @@ +{ + "version" : "6.0.0", + "timestamp" : 1582711670275, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "input", + "key" : 42, + "value" : { + "c1" : 4 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 42, + "value" : { + "C1" : 4 + } + } ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/topology new file mode 100644 index 00000000000..a70a4e91e30 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_AVRO/6.0.0_1582711670275/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/plan.json new file mode 100644 index 00000000000..553030d2ad0 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/plan.json @@ -0,0 +1,144 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 BIGINT) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT" + }, + "selectExpressions" : [ "C1 AS C1" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent3048982612619405901", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/spec.json new file mode 100644 index 00000000000..8c5bbb326dc --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/spec.json @@ -0,0 +1,22 @@ +{ + "version" : "6.0.0", + "timestamp" : 1582711670325, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "input", + "key" : 42, + "value" : { + "c1" : 4 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 42, + "value" : { + "C1" : 4 + } + } ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/topology new file mode 100644 index 00000000000..a70a4e91e30 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_JSON_SCHEMA/6.0.0_1582711670325/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/plan.json new file mode 100644 index 00000000000..edd3386daa4 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/plan.json @@ -0,0 +1,144 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 INTEGER) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='PROTOBUF');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER" + }, + "selectExpressions" : [ "C1 AS C1" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "PROTOBUF", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent3048982612619405901", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/spec.json new file mode 100644 index 00000000000..0c5e1f24828 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/spec.json @@ -0,0 +1,22 @@ +{ + "version" : "6.0.0", + "timestamp" : 1582711670295, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "input", + "key" : 42, + "value" : { + "c1" : 4 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : 42, + "value" : { + "C1" : 4 + } + } ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/topology new file mode 100644 index 00000000000..a70a4e91e30 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_validate_without_value_elements_OK_-_PROTOBUF/6.0.0_1582711670295/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/plan.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/plan.json new file mode 100644 index 00000000000..7ca63a54610 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/plan.json @@ -0,0 +1,144 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (`@TIMESTAMP` BIGINT, `FROM` BIGINT) WITH (AVRO_SCHEMA_ID=1, KAFKA_TOPIC='input', VALUE_FORMAT='JSON');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ROWKEY` STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT", + "keyField" : null, + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "windowInfo" : null + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "timestampColumn" : null, + "sourceSchema" : "`ROWKEY` STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT" + }, + "selectExpressions" : [ "`@TIMESTAMP` AS `@TIMESTAMP`", "`FROM` AS `FROM`" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA", + "properties" : { } + }, + "valueFormat" : { + "format" : "JSON", + "properties" : { } + }, + "options" : [ ] + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.new.api.enabled" : "false", + "ksql.streams.state.dir" : "/var/folders/2d/3pt97ylj3zngd51bwl91bl3r0000gp/T/confluent3048982612619405901", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/spec.json b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/spec.json new file mode 100644 index 00000000000..a926c858911 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/spec.json @@ -0,0 +1,24 @@ +{ + "version" : "6.0.0", + "timestamp" : 1582711670237, + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<@TIMESTAMP BIGINT, FROM BIGINT> NOT NULL", + "CSAS_OUTPUT_0.OUTPUT" : "STRUCT<@TIMESTAMP BIGINT, FROM BIGINT> NOT NULL" + }, + "inputs" : [ { + "topic" : "input", + "key" : "", + "value" : { + "@timestamp" : 4, + "from" : 5 + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "", + "value" : { + "@TIMESTAMP" : 4, + "FROM" : 5 + } + } ] +} \ No newline at end of file diff --git a/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/topology b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/topology new file mode 100644 index 00000000000..a70a4e91e30 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/historical_plans/elements_-_with_invalid_or_reserved_words_in_schema_-_JSON/6.0.0_1582711670237/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json index a967e3bc537..b4017bcbe24 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/elements.json @@ -134,6 +134,31 @@ ] } }, + { + "name": "validate without value elements OK - PROTOBUF", + "statements": [ + "CREATE STREAM INPUT (rowkey int key) WITH (kafka_topic='input', value_format='PROTOBUF');", + "CREATE STREAM OUTPUT AS SELECT * FROM input;" + ], + "topics": [ + { + "name": "input", + "schema": "syntax = \"proto3\"; message ConnectDefault1 { int32 c1 = 1; }", + "format": "PROTOBUF" + }, + { + "name": "OUTPUT", + "format": "PROTOBUF" + } + ], + "inputs": [{"topic": "input", "key": 42, "value": {"c1": 4}}], + "outputs": [{"topic": "OUTPUT", "key": 42, "value": {"C1": 4}}], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ROWKEY INT KEY, `C1` INT"} + ] + } + }, { "name": "validate without value elements OK - JSON SCHEMA", "statements": [