From ca6478a521e7a1c8b8d38c84b378eff2aae47d02 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Sun, 14 Jul 2019 17:55:05 +0100 Subject: [PATCH] feat: add basic support for key syntax (#3034) * feat: add basic support for key syntax Adds ability to explicitly include key fields in C* statements so long as: - the key field is named `ROWKEY` - it of type `STRING`. i.e. you can now do: `CREATE STREAM FOO AS (ROWKEY STRING KEY, ...) ...;` --- .../ksql/benchmark/SerdeBenchmark.java | 21 +- .../ksql/schema/ksql/LogicalSchema.java | 27 +- .../io/confluent/ksql/util/SchemaUtil.java | 1 - .../ksql/schema/ksql/LogicalSchemaTest.java | 430 ++++++++++++++---- .../ddl/commands/CreateSourceCommand.java | 15 +- .../ksql/planner/LogicalPlanner.java | 20 +- .../ksql/planner/plan/AggregateNode.java | 5 +- .../confluent/ksql/planner/plan/JoinNode.java | 45 +- .../ksql/structured/SchemaKStream.java | 19 +- .../ddl/commands/CommandFactoriesTest.java | 7 +- .../ddl/commands/CreateSourceCommandTest.java | 72 ++- .../ddl/commands/CreateStreamCommandTest.java | 5 +- .../ddl/commands/CreateTableCommandTest.java | 3 +- .../ksql/planner/plan/DataSourceNodeTest.java | 16 +- .../inference/DefaultSchemaInjectorTest.java | 23 +- .../confluent/ksql/datagen/AvroProducer.java | 7 +- .../ksql/datagen/DataGenProducer.java | 6 + .../ksql/datagen/DelimitedProducer.java | 7 +- .../confluent/ksql/datagen/JsonProducer.java | 7 +- .../query-validation-tests/key-schemas.json | 81 ++++ .../io/confluent/ksql/parser/SqlBase.g4 | 4 +- .../io/confluent/ksql/parser/AstBuilder.java | 2 + .../confluent/ksql/parser/SchemaParser.java | 5 +- .../confluent/ksql/parser/SqlFormatter.java | 4 +- .../ksql/parser/tree/TableElement.java | 45 +- .../ksql/parser/tree/TableElements.java | 50 +- .../confluent/ksql/parser/KsqlParserTest.java | 100 ++-- .../ksql/parser/SchemaParserTest.java | 29 +- .../ksql/parser/SqlFormatterTest.java | 48 +- .../parser/rewrite/StatementRewriterTest.java | 113 ++--- .../ksql/parser/tree/CreateStreamTest.java | 3 +- .../ksql/parser/tree/CreateTableTest.java | 3 +- .../ksql/parser/tree/TableElementTest.java | 77 +++- .../ksql/parser/tree/TableElementsTest.java | 98 +++- .../KsqlRestServiceContextFactory.java | 1 - .../security/KsqlUserContextProvider.java | 1 - .../AuthorizationFunctionalTest.java | 25 +- .../MockKsqlSecurityExtension.java | 1 - .../rest/server/StandaloneExecutorTest.java | 3 +- .../KsqlRestServiceContextFactoryTest.java | 24 +- .../server/resources/KsqlResourceTest.java | 3 +- .../metrics/collector/BasicCollectorTest.java | 1 - 42 files changed, 1075 insertions(+), 382 deletions(-) diff --git a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java index 4ed82f8d53fa..0fdd07525630 100644 --- a/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java +++ b/ksql-benchmark/src/main/java/io/confluent/ksql/benchmark/SerdeBenchmark.java @@ -33,6 +33,7 @@ import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SchemaUtil; import java.io.InputStream; import java.nio.file.Path; import java.nio.file.Paths; @@ -117,6 +118,11 @@ private InputStream getSchemaStream() { @State(Scope.Thread) public static class SerdeState { + + private static final org.apache.kafka.connect.data.Schema KEY_SCHEMA = SchemaBuilder.struct() + .field(SchemaUtil.ROWKEY_NAME, org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA) + .build(); + Serializer serializer; Deserializer deserializer; GenericRow row; @@ -165,9 +171,14 @@ private static org.apache.kafka.connect.data.Schema convertFieldNamesToUppercase private static Serde getJsonSerdeHelper( final org.apache.kafka.connect.data.Schema schema ) { + final PhysicalSchema physicalSchema = PhysicalSchema.from( + LogicalSchema.of(KEY_SCHEMA, schema), + SerdeOption.none() + ); + return GenericRowSerDe.from( new KsqlJsonSerdeFactory(), - PhysicalSchema.from(LogicalSchema.of(schema), SerdeOption.none()), + physicalSchema, new KsqlConfig(Collections.emptyMap()), () -> null, "benchmark", @@ -178,9 +189,15 @@ private static Serde getAvroSerde( final org.apache.kafka.connect.data.Schema schema ) { final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); + + final PhysicalSchema physicalSchema = PhysicalSchema.from( + LogicalSchema.of(KEY_SCHEMA, schema), + SerdeOption.none() + ); + return GenericRowSerDe.from( new KsqlAvroSerdeFactory("benchmarkSchema"), - PhysicalSchema.from(LogicalSchema.of(schema), SerdeOption.none()), + physicalSchema, new KsqlConfig(Collections.emptyMap()), () -> schemaRegistryClient, "benchmark", diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index 7c9cb6ee7846..5f62d5fd1504 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -63,7 +63,7 @@ public final class LogicalSchema { .field(SchemaUtil.ROWTIME_NAME, Schema.OPTIONAL_INT64_SCHEMA) .build(); - private static final Schema KEY_SCHEMA = SchemaBuilder + private static final Schema IMPLICIT_KEY_SCHEMA = SchemaBuilder .struct() .field(SchemaUtil.ROWKEY_NAME, Schema.OPTIONAL_STRING_SCHEMA) .build(); @@ -90,7 +90,14 @@ public final class LogicalSchema { private final ConnectSchema valueSchema; public static LogicalSchema of(final Schema valueSchema) { - return new LogicalSchema(METADATA_SCHEMA, KEY_SCHEMA, valueSchema, Optional.empty()); + return LogicalSchema.of(IMPLICIT_KEY_SCHEMA, valueSchema); + } + + public static LogicalSchema of( + final Schema keySchema, + final Schema valueSchema + ) { + return new LogicalSchema(METADATA_SCHEMA, keySchema, valueSchema, Optional.empty()); } private LogicalSchema( @@ -105,6 +112,10 @@ private LogicalSchema( this.alias = requireNonNull(alias, "alias"); } + public ConnectSchema keySchema() { + return keySchema; + } + public ConnectSchema valueSchema() { return valueSchema; } @@ -237,6 +248,13 @@ public LogicalSchema withoutAlias() { ); } + /** + * @return {@code true} is aliased, {@code false} otherwise. + */ + public boolean isAliased() { + return alias.isPresent(); + } + /** * Copies metadata and key fields to the value schema. * @@ -310,12 +328,13 @@ public boolean equals(final Object o) { return false; } final LogicalSchema that = (LogicalSchema) o; - return schemasAreEqual(valueSchema, that.valueSchema); + return Objects.equals(keySchema, that.keySchema) + && Objects.equals(valueSchema, that.valueSchema); } @Override public int hashCode() { - return Objects.hash(valueSchema); + return Objects.hash(keySchema, valueSchema); } @Override diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java index 9e2b34424dcb..ca28b5312fcd 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java @@ -50,7 +50,6 @@ public final class SchemaUtil { public static final String ROWKEY_NAME = "ROWKEY"; public static final String ROWTIME_NAME = "ROWTIME"; - public static final int ROWTIME_INDEX = 0; public static final int ROWKEY_INDEX = 1; private static final Map> typeToSchema = ImmutableMap.>builder() diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index bcf4ca8ba048..1601ee47b850 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -48,7 +48,7 @@ public class LogicalSchemaTest { .optional() .field("f0", Schema.OPTIONAL_INT64_SCHEMA); - private static final Schema SOME_CONNECT_SCHEMA = SchemaBuilder.struct() + private static final Schema SOME_VALUE_SCHEMA = SchemaBuilder.struct() .field("f0", SchemaBuilder.OPTIONAL_STRING_SCHEMA) .field("f1", SchemaBuilder.OPTIONAL_INT64_SCHEMA) .build(); @@ -62,7 +62,13 @@ public class LogicalSchemaTest { .field("bob.f1", SchemaBuilder.OPTIONAL_INT64_SCHEMA) .build(); - private static final LogicalSchema SOME_SCHEMA = LogicalSchema.of(SOME_CONNECT_SCHEMA); + private static final Schema SOME_KEY_SCHEMA = SchemaBuilder.struct() + .field("k0", SchemaBuilder.OPTIONAL_INT64_SCHEMA) + .build(); + + private static final LogicalSchema SOME_SCHEMA = + LogicalSchema.of(SOME_KEY_SCHEMA, SOME_VALUE_SCHEMA); + private static final LogicalSchema ALIASED_SCHEMA = SOME_SCHEMA.withAlias("bob"); @Rule @@ -70,30 +76,62 @@ public class LogicalSchemaTest { @Test public void shouldImplementEqualsProperly() { + final Schema implicitKeySchema = SchemaBuilder.struct() + .field(ROWKEY_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + final Schema otherKeySchema = SchemaBuilder.struct() + .field("k0", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + new EqualsTester() .addEqualityGroup( - LogicalSchema.of(SOME_CONNECT_SCHEMA), - LogicalSchema.of(SOME_CONNECT_SCHEMA), - LogicalSchema.of(SOME_CONNECT_SCHEMA).withAlias("bob").withoutAlias() + LogicalSchema.of(SOME_VALUE_SCHEMA), + LogicalSchema.of(implicitKeySchema, SOME_VALUE_SCHEMA), + LogicalSchema.of(SOME_VALUE_SCHEMA).withAlias("bob").withoutAlias(), + LogicalSchema.of(SOME_VALUE_SCHEMA) + .withMetaAndKeyFieldsInValue() + .withoutMetaAndKeyFieldsInValue() ) .addEqualityGroup( - LogicalSchema.of(SOME_CONNECT_SCHEMA).withAlias("bob") + LogicalSchema.of(SOME_VALUE_SCHEMA).withAlias("bob") ) .addEqualityGroup( LogicalSchema.of(OTHER_CONNECT_SCHEMA) ) + .addEqualityGroup( + LogicalSchema.of(otherKeySchema, SOME_VALUE_SCHEMA) + ) .testEquals(); } @Test - public void shouldThrowOnNoneSqlTypes() { + public void shouldThrowOnNoneSqlTypesInKey() { + Stream.of( + Schema.OPTIONAL_INT8_SCHEMA, + Schema.OPTIONAL_INT16_SCHEMA, + Schema.OPTIONAL_FLOAT32_SCHEMA + ).forEach(schema -> { + try { + final Schema keySchema = SchemaBuilder.struct().field("test", schema).build(); + LogicalSchema.of(keySchema, SOME_VALUE_SCHEMA); + fail(); + } catch (final IllegalArgumentException e) { + assertThat(schema.toString(), e.getMessage(), containsString("Unsupported schema type")); + } + }); + } + + @Test + public void shouldThrowOnNoneSqlTypesInValue() { Stream.of( Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_FLOAT32_SCHEMA ).forEach(schema -> { try { - LogicalSchema.of(SchemaBuilder.struct().field("test", schema).build()); + final Schema valueSchema = SchemaBuilder.struct().field("test", schema).build(); + LogicalSchema.of(SOME_KEY_SCHEMA, valueSchema); fail(); } catch (final IllegalArgumentException e) { assertThat(schema.toString(), e.getMessage(), containsString("Unsupported schema type")); @@ -102,145 +140,336 @@ public void shouldThrowOnNoneSqlTypes() { } @Test - public void shouldThrowIfNotTopLevelStruct() { + public void shouldThrowIfKeyNotTopLevelStruct() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Top level schema must be STRUCT"); + + // When: + LogicalSchema.of(Schema.OPTIONAL_INT64_SCHEMA, SOME_VALUE_SCHEMA); + } + + @Test + public void shouldThrowIfValueNotTopLevelStruct() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Top level schema must be STRUCT"); // When: - LogicalSchema.of(Schema.OPTIONAL_INT64_SCHEMA); + LogicalSchema.of(SOME_KEY_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); } @Test - public void shouldThrowOnMutableStructFields() { + public void shouldThrowOnMutableStructFieldsInKey() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Mutable schema found"); // When: - LogicalSchema.of(nested( - SchemaBuilder.struct() + LogicalSchema.of( + nested(SchemaBuilder.struct() .field("fieldWithMutableSchema", MUTABLE_SCHEMA) .optional() - .build() - )); + .build()), + SOME_VALUE_SCHEMA + ); } @Test - public void shouldThrowOnMutableMapKeys() { + public void shouldThrowOnMutableStructFieldsInValue() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Mutable schema found"); // When: - LogicalSchema.of(nested( - SchemaBuilder.map(new SchemaBuilder(Type.STRING).optional(), IMMUTABLE_SCHEMA) + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .struct() + .field("fieldWithMutableSchema", MUTABLE_SCHEMA) .optional() - .build() - )); + .build()) + ); } @Test - public void shouldThrowOnMutableMapValues() { + public void shouldThrowOnMutableMapKeysInKey() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Mutable schema found"); // When: - LogicalSchema.of(nested( - SchemaBuilder.map(IMMUTABLE_SCHEMA, MUTABLE_SCHEMA) + LogicalSchema.of( + nested(SchemaBuilder + .map(new SchemaBuilder(Type.STRING).optional(), IMMUTABLE_SCHEMA) .optional() - .build() - )); + .build()), + SOME_VALUE_SCHEMA + ); } @Test - public void shouldThrowOnMutableArrayElements() { + public void shouldThrowOnMutableMapKeysInValue() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Mutable schema found"); // When: - LogicalSchema.of(nested( - SchemaBuilder.array(MUTABLE_SCHEMA) + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .map(new SchemaBuilder(Type.STRING).optional(), IMMUTABLE_SCHEMA) .optional() - .build() - )); + .build()) + ); } @Test - public void shouldThrowOnNoneOptionalMapKeys() { + public void shouldThrowOnMutableMapValuesInKey() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Mutable schema found"); + + // When: + LogicalSchema.of( + nested(SchemaBuilder + .map(IMMUTABLE_SCHEMA, MUTABLE_SCHEMA) + .optional() + .build()), + SOME_VALUE_SCHEMA + ); + } + + @Test + public void shouldThrowOnMutableMapValuesInValue() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Mutable schema found"); + + // When: + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .map(IMMUTABLE_SCHEMA, MUTABLE_SCHEMA) + .optional() + .build()) + ); + } + + @Test + public void shouldThrowOnMutableArrayElementsInKey() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Mutable schema found"); + + // When: + LogicalSchema.of( + nested(SchemaBuilder + .array(MUTABLE_SCHEMA) + .optional() + .build()), + SOME_VALUE_SCHEMA + ); + } + + @Test + public void shouldThrowOnMutableArrayElementsInValue() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Mutable schema found"); + + // When: + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .array(MUTABLE_SCHEMA) + .optional() + .build()) + ); + } + + @Test + public void shouldThrowOnNoneOptionalMapKeysInKey() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Non-optional field found"); // When: - LogicalSchema.of(nested( - SchemaBuilder.map(IMMUTABLE_SCHEMA, IMMUTABLE_SCHEMA) - .build() - )); + LogicalSchema.of( + nested(SchemaBuilder + .map(IMMUTABLE_SCHEMA, IMMUTABLE_SCHEMA) + .build()), + SOME_VALUE_SCHEMA + ); } @Test - public void shouldThrowOnNoneOptionalMapValues() { + public void shouldThrowOnNoneOptionalMapKeysInValue() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Non-optional field found"); // When: - LogicalSchema.of(nested( - SchemaBuilder.map(IMMUTABLE_SCHEMA, IMMUTABLE_SCHEMA).build() - )); + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .map(IMMUTABLE_SCHEMA, IMMUTABLE_SCHEMA) + .build()) + ); } @Test - public void shouldThrowOnNoneOptionalElements() { + public void shouldThrowOnNoneOptionalMapValuesInKey() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Non-optional field found"); // When: - LogicalSchema.of(nested( - SchemaBuilder.array(IMMUTABLE_SCHEMA).build() - )); + LogicalSchema.of( + nested(SchemaBuilder + .map(IMMUTABLE_SCHEMA, IMMUTABLE_SCHEMA) + .build()), + SOME_VALUE_SCHEMA + ); } @Test - public void shouldNotThrowIfTopLevelNotOptional() { + public void shouldThrowOnNoneOptionalMapValuesInValue() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Non-optional field found"); + + // When: + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .map(IMMUTABLE_SCHEMA, IMMUTABLE_SCHEMA) + .build()) + ); + } + + @Test + public void shouldThrowOnNoneOptionalElementsInKey() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Non-optional field found"); + + // When: + LogicalSchema.of( + nested(SchemaBuilder + .array(IMMUTABLE_SCHEMA) + .build()), + SOME_VALUE_SCHEMA + ); + } + + @Test + public void shouldThrowOnNoneOptionalElementsInValue() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Non-optional field found"); + + // When: + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .array(IMMUTABLE_SCHEMA) + .build()) + ); + } + + @Test + public void shouldNotThrowIfTopLevelNotOptionalInKey() { + // Given: + final Schema schema = SchemaBuilder.struct() + .field("f0", Schema.OPTIONAL_INT64_SCHEMA) + .build(); + + // When: + LogicalSchema.of(schema, SOME_VALUE_SCHEMA); + + // Then: did not throw. + } + + @Test + public void shouldNotThrowIfTopLevelNotOptionalInValue() { // Given: final Schema schema = SchemaBuilder.struct() .field("f0", Schema.OPTIONAL_INT64_SCHEMA) .build(); // When: - LogicalSchema.of(schema); + LogicalSchema.of(SOME_KEY_SCHEMA, schema); // Then: did not throw. } @Test - public void shouldThrowOnNoneStringMapLey() { + public void shouldThrowOnNoneStringMapKeyInKey() { // Then: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("MAP only supports STRING keys"); // When: - LogicalSchema.of(nested( - SchemaBuilder.map(Schema.OPTIONAL_INT64_SCHEMA, IMMUTABLE_SCHEMA) + LogicalSchema.of( + nested(SchemaBuilder + .map(Schema.OPTIONAL_INT64_SCHEMA, IMMUTABLE_SCHEMA) .optional() - .build() - )); + .build()), + SOME_VALUE_SCHEMA + ); + } + + @Test + public void shouldThrowOnNoneStringMapKeyInValue() { + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("MAP only supports STRING keys"); + + // When: + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .map(Schema.OPTIONAL_INT64_SCHEMA, IMMUTABLE_SCHEMA) + .optional() + .build()) + ); + } + + @Test + public void shouldThrowOnNonDecimalBytesInKey() { + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Expected schema of type DECIMAL but got a schema of type BYTES and name foobar"); + + // When: + LogicalSchema.of( + nested(SchemaBuilder + .bytes() + .name("foobar") + .optional() + .build()), + SOME_VALUE_SCHEMA + ); } @Test - public void shouldThrowOnNonDecimalBytes() { + public void shouldThrowOnNonDecimalBytesInValue() { // Then: expectedException.expect(KsqlException.class); expectedException.expectMessage( "Expected schema of type DECIMAL but got a schema of type BYTES and name foobar"); // When: - LogicalSchema.of(nested(SchemaBuilder.bytes().name("foobar").optional().build())); + LogicalSchema.of( + SOME_KEY_SCHEMA, + nested(SchemaBuilder + .bytes() + .name("foobar") + .optional() + .build()) + ); } @Test @@ -266,6 +495,9 @@ public void shouldThrowIfAlreadyAliased() { public void shouldOnlyAddAliasToTopLevelFields() { // Given: final LogicalSchema schema = LogicalSchema.of( + SchemaBuilder.struct() + .field("k0", Schema.OPTIONAL_INT64_SCHEMA) + .build(), SchemaBuilder.struct() .field("f0", SchemaBuilder.OPTIONAL_STRING_SCHEMA) .field("f1", SchemaBuilder.struct() @@ -279,14 +511,19 @@ public void shouldOnlyAddAliasToTopLevelFields() { final LogicalSchema result = schema.withAlias("bob"); // Then: - assertThat(result, is(LogicalSchema.of(SchemaBuilder.struct() - .field("bob.f0", SchemaBuilder.OPTIONAL_STRING_SCHEMA) - .field("bob.f1", SchemaBuilder - .struct() - .field("nested", Schema.OPTIONAL_INT64_SCHEMA) - .optional() - .build()) - .build()))); + assertThat(result, is(LogicalSchema.of( + SchemaBuilder.struct() + .field("bob.k0", Schema.OPTIONAL_INT64_SCHEMA) + .build(), + SchemaBuilder.struct() + .field("bob.f0", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("bob.f1", SchemaBuilder + .struct() + .field("nested", Schema.OPTIONAL_INT64_SCHEMA) + .optional() + .build()) + .build() + ))); } @Test @@ -309,6 +546,9 @@ public void shouldThrowIfNotAliased() { public void shouldOnlyRemoveAliasFromTopLevelFields() { // Given: final LogicalSchema schema = LogicalSchema.of( + SchemaBuilder.struct() + .field("k0", Schema.OPTIONAL_INT32_SCHEMA) + .build(), SchemaBuilder.struct() .field("f0", SchemaBuilder.OPTIONAL_STRING_SCHEMA) .field("f1", SchemaBuilder.struct() @@ -322,14 +562,18 @@ public void shouldOnlyRemoveAliasFromTopLevelFields() { final LogicalSchema result = schema.withoutAlias(); // Then: - assertThat(result, is(LogicalSchema.of(SchemaBuilder.struct() - .field("f0", SchemaBuilder.OPTIONAL_STRING_SCHEMA) - .field("f1", SchemaBuilder - .struct() - .field("bob.nested", Schema.OPTIONAL_INT64_SCHEMA) - .optional() - .build()) - .build()))); + assertThat(result, is(LogicalSchema.of( + SchemaBuilder.struct() + .field("k0", Schema.OPTIONAL_INT32_SCHEMA) + .build(), + SchemaBuilder.struct() + .field("f0", SchemaBuilder.OPTIONAL_STRING_SCHEMA) + .field("f1", SchemaBuilder + .struct() + .field("bob.nested", Schema.OPTIONAL_INT64_SCHEMA) + .optional() + .build()) + .build()))); } @Test @@ -338,7 +582,7 @@ public void shouldGetFieldByName() { final Optional result = SOME_SCHEMA.findValueField("f0"); // Then: - assertThat(result, is(Optional.of(SOME_CONNECT_SCHEMA.field("f0")))); + assertThat(result, is(Optional.of(SOME_VALUE_SCHEMA.field("f0")))); } @Test @@ -347,7 +591,7 @@ public void shouldGetFieldByAliasedName() { final Optional result = SOME_SCHEMA.findValueField("SomeAlias.f0"); // Then: - assertThat(result, is(Optional.of(SOME_CONNECT_SCHEMA.field("f0")))); + assertThat(result, is(Optional.of(SOME_VALUE_SCHEMA.field("f0")))); } @Test @@ -384,7 +628,7 @@ public void shouldNotGetMetaFieldFromValue() { @Test public void shouldNotGetKeyFieldFromValue() { - assertThat(SOME_SCHEMA.findValueField("ROWKEY"), is(Optional.empty())); + assertThat(SOME_SCHEMA.findValueField("k0"), is(Optional.empty())); } @Test @@ -395,7 +639,7 @@ public void shouldGetMetaFieldFromValueIfAdded() { @Test public void shouldGetKeyFieldFromValueIfAdded() { - assertThat(SOME_SCHEMA.withMetaAndKeyFieldsInValue().findValueField("ROWKEY"), + assertThat(SOME_SCHEMA.withMetaAndKeyFieldsInValue().findValueField("k0"), is(not(Optional.empty()))); } @@ -407,9 +651,9 @@ public void shouldGetMetaFields() { } @Test - public void shouldGetFields() { - assertThat(SOME_SCHEMA.findField("ROWKEY"), is(Optional.of( - new Field("ROWKEY", 0, Schema.OPTIONAL_STRING_SCHEMA) + public void shouldGetKeyFields() { + assertThat(SOME_SCHEMA.findField("k0"), is(Optional.of( + new Field("k0", 0, Schema.OPTIONAL_INT64_SCHEMA) ))); } @@ -475,7 +719,7 @@ public void shouldExposeAliasedMetaFields() { @Test public void shouldExposeKeyFields() { assertThat(SOME_SCHEMA.keyFields(), is(ImmutableList.of( - new Field(ROWKEY_NAME, 0, Schema.OPTIONAL_STRING_SCHEMA) + new Field("k0", 0, Schema.OPTIONAL_INT64_SCHEMA) ))); } @@ -489,13 +733,13 @@ public void shouldExposeAliasedKeyFields() { // Then: assertThat(fields, is(ImmutableList.of( - new Field("fred." + ROWKEY_NAME, 0, Schema.OPTIONAL_STRING_SCHEMA) + new Field("fred.k0", 0, Schema.OPTIONAL_INT64_SCHEMA) ))); } @Test public void shouldExposeValueFields() { - assertThat(SOME_SCHEMA.valueFields(), is(SOME_CONNECT_SCHEMA.fields())); + assertThat(SOME_SCHEMA.valueFields(), is(SOME_VALUE_SCHEMA.fields())); } @Test @@ -514,7 +758,7 @@ public void shouldExposeAliasedValueFields() { public void shouldExposeAllFields() { assertThat(SOME_SCHEMA.fields(), is(ImmutableList.of( new Field(ROWTIME_NAME, 0, Schema.OPTIONAL_INT64_SCHEMA), - new Field(ROWKEY_NAME, 0, Schema.OPTIONAL_STRING_SCHEMA), + new Field("k0", 0, Schema.OPTIONAL_INT64_SCHEMA), new Field("f0", 0, SchemaBuilder.OPTIONAL_STRING_SCHEMA), new Field("f1", 1, SchemaBuilder.OPTIONAL_INT64_SCHEMA) ))); @@ -531,7 +775,7 @@ public void shouldExposeAliasedAllFields() { // Then: assertThat(fields, is(ImmutableList.of( new Field("bob." + ROWTIME_NAME, 0, Schema.OPTIONAL_INT64_SCHEMA), - new Field("bob." + ROWKEY_NAME, 0, Schema.OPTIONAL_STRING_SCHEMA), + new Field("bob.k0", 0, Schema.OPTIONAL_INT64_SCHEMA), new Field("bob.f0", 0, SchemaBuilder.OPTIONAL_STRING_SCHEMA), new Field("bob.f1", 1, SchemaBuilder.OPTIONAL_INT64_SCHEMA) ))); @@ -636,14 +880,14 @@ public void shouldConvertAliasedSchemaToString() { @Test public void shouldAddMetaAndKeyColumns() { // Given: - final LogicalSchema schema = LogicalSchema.of(SOME_CONNECT_SCHEMA); + final LogicalSchema schema = LogicalSchema.of(SOME_VALUE_SCHEMA); // When: final LogicalSchema result = schema .withMetaAndKeyFieldsInValue(); // Then: - assertThat(result.valueFields(), hasSize(SOME_CONNECT_SCHEMA.fields().size() + 2)); + assertThat(result.valueFields(), hasSize(SOME_VALUE_SCHEMA.fields().size() + 2)); assertThat(result.valueFields().get(0).name(), is(SchemaUtil.ROWTIME_NAME)); assertThat(result.valueFields().get(0).index(), is(0)); assertThat(result.valueFields().get(0).schema(), is(Schema.OPTIONAL_INT64_SCHEMA)); @@ -655,7 +899,7 @@ public void shouldAddMetaAndKeyColumns() { @Test public void shouldAddMetaAndKeyColumnsWhenAliased() { // Given: - final LogicalSchema schema = LogicalSchema.of(SOME_CONNECT_SCHEMA) + final LogicalSchema schema = LogicalSchema.of(SOME_VALUE_SCHEMA) .withAlias("bob"); // When: @@ -663,7 +907,7 @@ public void shouldAddMetaAndKeyColumnsWhenAliased() { .withMetaAndKeyFieldsInValue(); // Then: - assertThat(result.valueFields(), hasSize(SOME_CONNECT_SCHEMA.fields().size() + 2)); + assertThat(result.valueFields(), hasSize(SOME_VALUE_SCHEMA.fields().size() + 2)); assertThat(result.valueFields().get(0).name(), is("bob." + SchemaUtil.ROWTIME_NAME)); assertThat(result.valueFields().get(0).index(), is(0)); assertThat(result.valueFields().get(0).schema(), is(Schema.OPTIONAL_INT64_SCHEMA)); @@ -675,7 +919,7 @@ public void shouldAddMetaAndKeyColumnsWhenAliased() { @Test public void shouldAddMetaAndKeyColumnsOnlyOnce() { // Given: - final LogicalSchema ksqlSchema = LogicalSchema.of(SOME_CONNECT_SCHEMA) + final LogicalSchema ksqlSchema = LogicalSchema.of(SOME_VALUE_SCHEMA) .withMetaAndKeyFieldsInValue(); // When: @@ -765,10 +1009,14 @@ public void shouldRemoveMetaFieldsEvenIfAliased() { final LogicalSchema result = schema.withoutMetaAndKeyFieldsInValue(); // Then: - assertThat(result, is(LogicalSchema.of(SchemaBuilder.struct() - .field("bob.f0", Schema.OPTIONAL_INT64_SCHEMA) - .field("bob.f1", Schema.OPTIONAL_INT64_SCHEMA) - .build() + assertThat(result, is(LogicalSchema.of( + SchemaBuilder.struct() + .field("bob.ROWKEY", Schema.OPTIONAL_STRING_SCHEMA) + .build(), + SchemaBuilder.struct() + .field("bob.f0", Schema.OPTIONAL_INT64_SCHEMA) + .field("bob.f1", Schema.OPTIONAL_INT64_SCHEMA) + .build() ))); } @@ -780,8 +1028,8 @@ public void shouldMatchMetaFieldName() { @Test public void shouldMatchKeyFieldName() { - assertThat(SOME_SCHEMA.isMetaField(ROWKEY_NAME), is(false)); - assertThat(SOME_SCHEMA.isKeyField(ROWKEY_NAME), is(true)); + assertThat(SOME_SCHEMA.isMetaField("k0"), is(false)); + assertThat(SOME_SCHEMA.isKeyField("k0"), is(true)); } @Test diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java index 7171920eaaec..b2fd54db8923 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java @@ -25,6 +25,7 @@ import io.confluent.ksql.parser.properties.with.CreateSourceProperties; import io.confluent.ksql.parser.tree.CreateSource; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SchemaConverters; @@ -37,6 +38,7 @@ import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.SchemaUtil; import io.confluent.ksql.util.StringUtil; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicyFactory; @@ -144,16 +146,25 @@ private static LogicalSchema buildSchema(final TableElements tableElements) { throw new KsqlException("The statement does not define any columns."); } + final SchemaBuilder keySchema = SchemaBuilder.struct(); final SchemaBuilder valueSchema = SchemaBuilder.struct(); for (final TableElement tableElement : tableElements) { final String fieldName = tableElement.getName(); final Schema fieldSchema = SchemaConverters.sqlToLogicalConverter() .fromSqlType(tableElement.getType().getSqlType()); - valueSchema.field(fieldName, fieldSchema); + if (tableElement.getNamespace() == Namespace.KEY) { + keySchema.field(fieldName, fieldSchema); + } else { + valueSchema.field(fieldName, fieldSchema); + } } - return LogicalSchema.of(valueSchema.build()); + if (keySchema.fields().isEmpty()) { + keySchema.field(SchemaUtil.ROWKEY_NAME, Schema.OPTIONAL_STRING_SCHEMA); + } + + return LogicalSchema.of(keySchema.build(), valueSchema.build()); } static void checkMetaData( diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index 0a62c9632166..d503664253d7 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -42,6 +42,7 @@ import io.confluent.ksql.util.timestamp.TimestampExtractionPolicyFactory; import java.util.List; import java.util.Optional; +import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -182,10 +183,19 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) { } } + final ConnectSchema keySchema = sourcePlanNode.getSchema().isAliased() + ? sourcePlanNode.getSchema().withoutAlias().keySchema() + : sourcePlanNode.getSchema().keySchema(); + + final LogicalSchema schema = LogicalSchema.of( + keySchema, + aggregateSchema.build() + ); + return new AggregateNode( new PlanNodeId("Aggregate"), sourcePlanNode, - LogicalSchema.of(aggregateSchema.build()), + schema, keyField, analysis.getGroupByExpressions(), analysis.getWindowExpression(), @@ -224,10 +234,16 @@ private ProjectNode buildProjectNode(final PlanNode sourcePlanNode) { } } + final ConnectSchema keySchema = sourcePlanNode.getSchema().isAliased() + ? sourcePlanNode.getSchema().withoutAlias().keySchema() + : sourcePlanNode.getSchema().keySchema(); + + final LogicalSchema schema = LogicalSchema.of(keySchema, projectionSchema.build()); + return new ProjectNode( new PlanNodeId("Project"), sourcePlanNode, - LogicalSchema.of(projectionSchema.build()), + schema, keyFieldName, analysis.getSelectExpressions() ); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java index ccfec9f744a2..322e8b6323eb 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/AggregateNode.java @@ -379,7 +379,10 @@ private LogicalSchema buildAggregateSchema( ); } - return LogicalSchema.of(schemaBuilder.build()); + return LogicalSchema.of( + schema.keySchema(), + schemaBuilder.build() + ); } private static class InternalSchema { diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index 3d7fc5915218..351e44ae327d 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -43,6 +43,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -77,7 +78,6 @@ public JoinNode( this.joinType = joinType; this.left = Objects.requireNonNull(left, "left"); this.right = Objects.requireNonNull(right, "right"); - this.schema = buildSchema(left, right); this.leftJoinFieldName = Objects.requireNonNull(leftJoinFieldName, "leftJoinFieldName"); this.rightJoinFieldName = Objects.requireNonNull(rightJoinFieldName, "rightJoinFieldName"); this.withinExpression = Objects.requireNonNull(withinExpression, "withinExpression"); @@ -86,23 +86,8 @@ public JoinNode( validateFieldInSchema(rightJoinFieldName, right.getSchema()); this.keyField = KeyField.of(leftJoinFieldName, leftKeyField); - } - - private static LogicalSchema buildSchema(final PlanNode left, final PlanNode right) { - - final LogicalSchema leftSchema = left.getSchema(); - final LogicalSchema rightSchema = right.getSchema(); - - final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); - for (final Field field : leftSchema.valueSchema().fields()) { - schemaBuilder.field(field.name(), field.schema()); - } - - for (final Field field : rightSchema.valueSchema().fields()) { - schemaBuilder.field(field.name(), field.schema()); - } - return LogicalSchema.of(schemaBuilder.build()); + this.schema = buildSchema(left, right); } @Override @@ -516,4 +501,30 @@ private static DataSourceType calculateSinkType( ? DataSourceType.KTABLE : DataSourceType.KSTREAM; } + + private static LogicalSchema buildSchema( + final PlanNode left, + final PlanNode right + ) { + + final LogicalSchema leftSchema = left.getSchema(); + final LogicalSchema rightSchema = right.getSchema(); + + final SchemaBuilder valueSchema = SchemaBuilder.struct(); + + for (final Field field : leftSchema.valueSchema().fields()) { + valueSchema.field(field.name(), field.schema()); + } + + for (final Field field : rightSchema.valueSchema().fields()) { + valueSchema.field(field.name(), field.schema()); + } + + // Hard-wire for now, until we support custom type/name of key fields: + final Schema keySchema = SchemaBuilder.struct() + .field(SchemaUtil.ROWKEY_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + return LogicalSchema.of(keySchema, valueSchema.build()); + } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index eed50c110eff..5a89999060b0 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -49,6 +49,7 @@ import java.util.stream.Stream; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -223,7 +224,8 @@ class Selection { Selection( final List selectExpressions, - final ProcessingLogger processingLogger) { + final ProcessingLogger processingLogger + ) { key = findKeyField(selectExpressions); final List expressionEvaluators = buildExpressions(selectExpressions); schema = buildSchema(selectExpressions, expressionEvaluators); @@ -309,13 +311,19 @@ private Optional doFindKeyField( private LogicalSchema buildSchema( final List selectExpressions, - final List expressionEvaluators) { - final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); + final List expressionEvaluators + ) { + final SchemaBuilder valueSchema = SchemaBuilder.struct(); IntStream.range(0, selectExpressions.size()).forEach( - i -> schemaBuilder.field( + i -> valueSchema.field( selectExpressions.get(i).getName(), expressionEvaluators.get(i).getExpressionType())); - return LogicalSchema.of(schemaBuilder.build()); + + final ConnectSchema keySchema = SchemaKStream.this.schema.isAliased() + ? SchemaKStream.this.schema.withoutAlias().keySchema() + : SchemaKStream.this.schema.keySchema(); + + return LogicalSchema.of(keySchema, valueSchema.build()); } List buildExpressions(final List selectExpressions @@ -507,7 +515,6 @@ public SchemaKStream outerJoin( ); } - @SuppressWarnings("unchecked") public SchemaKStream selectKey( final String fieldName, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java index 340268694da5..87dd92667444 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java @@ -36,6 +36,7 @@ import io.confluent.ksql.parser.tree.QualifiedName; import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.properties.with.CommonCreateConfigs; @@ -60,7 +61,7 @@ public class CommandFactoriesTest { private static final Map NO_PROPS = Collections.emptyMap(); private static final String sqlExpression = "sqlExpression"; private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("bob", new Type(SqlTypes.STRING))); + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING))); private static final Map MINIMIM_PROPS = ImmutableMap.of( CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("JSON"), @@ -105,8 +106,8 @@ public void shouldCreateCommandForCreateTable() { // Given: final CreateTable ddlStatement = new CreateTable(SOME_NAME, TableElements.of( - new TableElement("COL1", new Type(SqlTypes.BIGINT)), - new TableElement("COL2", new Type(SqlTypes.STRING))), + new TableElement(Namespace.VALUE, "COL1", new Type(SqlTypes.BIGINT)), + new TableElement(Namespace.VALUE, "COL2", new Type(SqlTypes.STRING))), true, withProperties); // When: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java index 25485792e520..ab3822ea4ea4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java @@ -34,6 +34,7 @@ import io.confluent.ksql.parser.tree.QualifiedName; import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.properties.with.CommonCreateConfigs; @@ -65,11 +66,11 @@ public class CreateSourceCommandTest { private static final String TOPIC_NAME = "some topic"; private static final TableElements ONE_ELEMENT = TableElements.of( - new TableElement("bob", new Type(SqlTypes.STRING))); + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING))); private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("bob", new Type(SqlTypes.STRING)), - new TableElement("hojjat", new Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "hojjat", new Type(SqlTypes.STRING)) ); private static final Set SOME_SERDE_OPTIONS = ImmutableSet @@ -313,6 +314,71 @@ public void shouldBuildSerdeOptions() { assertThat(cmd.getSerdeOptions(), is(SOME_SERDE_OPTIONS)); } + @Test + public void shouldBuildSchemaWithImplicitKeyField() { + // Given: + when(statement.getElements()).thenReturn(TableElements.of( + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "hojjat", new Type(SqlTypes.STRING)) + )); + + // When: + final TestCmd result = new TestCmd( + "look mum, no columns", + statement, + ksqlConfig, + kafkaTopicClient, + serdeOptions, + serdeFactories + ); + + // Then: + assertThat(result.schema, is(LogicalSchema.of( + SchemaBuilder + .struct() + .field("ROWKEY", Schema.OPTIONAL_STRING_SCHEMA) + .build(), + SchemaBuilder + .struct() + .field("bob", Schema.OPTIONAL_STRING_SCHEMA) + .field("hojjat", Schema.OPTIONAL_STRING_SCHEMA) + .build() + ))); + } + + @Test + public void shouldBuildSchemaWithExplicitKeyField() { + // Given: + when(statement.getElements()).thenReturn(TableElements.of( + new TableElement(Namespace.KEY, "ROWKEY", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "hojjat", new Type(SqlTypes.STRING)) + )); + + // When: + final TestCmd result = new TestCmd( + "look mum, no columns", + statement, + ksqlConfig, + kafkaTopicClient, + serdeOptions, + serdeFactories + ); + + // Then: + assertThat(result.schema, is(LogicalSchema.of( + SchemaBuilder + .struct() + .field("ROWKEY", Schema.OPTIONAL_STRING_SCHEMA) + .build(), + SchemaBuilder + .struct() + .field("bob", Schema.OPTIONAL_STRING_SCHEMA) + .field("hojjat", Schema.OPTIONAL_STRING_SCHEMA) + .build() + ))); + } + private static Map minValidProps() { return ImmutableMap.of( CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json"), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java index 8b0d054f2382..d469a251a7db 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java @@ -32,6 +32,7 @@ import io.confluent.ksql.parser.tree.QualifiedName; import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.properties.with.CommonCreateConfigs; @@ -60,8 +61,8 @@ public class CreateStreamCommandTest { private static final String STREAM_NAME = "s1"; private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("ID", new Type(SqlTypes.BIGINT)), - new TableElement("bob", new Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "ID", new Type(SqlTypes.BIGINT)), + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING)) ); @Mock diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java index 4ada46ce4103..1348656b409a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java @@ -32,6 +32,7 @@ import io.confluent.ksql.parser.tree.QualifiedName; import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.properties.with.CommonCreateConfigs; @@ -78,7 +79,7 @@ public void setUp() { givenPropertiesWith((Collections.emptyMap())); when(createTableStatement.getName()).thenReturn(QualifiedName.of(TABLE_NAME)); when(createTableStatement.getElements()).thenReturn(TableElements.of( - new TableElement("SOME-KEY", new Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "SOME-KEY", new Type(SqlTypes.STRING)) )); when(topicClient.isTopicExists(any())).thenReturn(true); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index 9c1af65514c6..3b9ce74ba356 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -366,14 +366,14 @@ public void shouldHaveFullyQualifiedSchema() { // Then: assertThat(schema, is( LogicalSchema.of(SchemaBuilder.struct() - .field(sourceName + "." + SchemaUtil.ROWTIME_NAME, Schema.OPTIONAL_INT64_SCHEMA) - .field(sourceName + "." + SchemaUtil.ROWKEY_NAME, Schema.OPTIONAL_STRING_SCHEMA) - .field(sourceName + ".field1", Schema.OPTIONAL_STRING_SCHEMA) - .field(sourceName + ".field2", Schema.OPTIONAL_STRING_SCHEMA) - .field(sourceName + ".field3", Schema.OPTIONAL_STRING_SCHEMA) - .field(sourceName + "." + TIMESTAMP_FIELD, Schema.OPTIONAL_INT64_SCHEMA) - .field(sourceName + ".key", Schema.OPTIONAL_STRING_SCHEMA) - .build()))); + .field(SchemaUtil.ROWTIME_NAME, Schema.OPTIONAL_INT64_SCHEMA) + .field(SchemaUtil.ROWKEY_NAME, Schema.OPTIONAL_STRING_SCHEMA) + .field("field1", Schema.OPTIONAL_STRING_SCHEMA) + .field("field2", Schema.OPTIONAL_STRING_SCHEMA) + .field("field3", Schema.OPTIONAL_STRING_SCHEMA) + .field(TIMESTAMP_FIELD, Schema.OPTIONAL_INT64_SCHEMA) + .field("key", Schema.OPTIONAL_STRING_SCHEMA) + .build()).withAlias(sourceName))); } @Test diff --git a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java index 9899003f4387..e5ccef30d901 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java @@ -41,6 +41,7 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaResult; @@ -70,7 +71,7 @@ public class DefaultSchemaInjectorTest { private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("bob", new Type(SqlTypes.STRING))); + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING))); private static final String KAFKA_TOPIC = "some-topic"; private static final Map UNSUPPORTED_PROPS = ImmutableMap.of( "VALUE_FORMAT", new StringLiteral("json"), @@ -106,17 +107,19 @@ public class DefaultSchemaInjectorTest { .build(); private static final TableElements EXPECTED_KSQL_SCHEMA = TableElements.of( - new TableElement("INTFIELD", new Type(SqlTypes.INTEGER)), - new TableElement("BIGINTFIELD", new Type(SqlTypes.BIGINT)), - new TableElement("DOUBLEFIELD", new Type(SqlTypes.DOUBLE)), - new TableElement("STRINGFIELD", new Type(SqlTypes.STRING)), - new TableElement("BOOLEANFIELD", new Type(SqlTypes.BOOLEAN)), - new TableElement("ARRAYFIELD", new Type(SqlTypes.array(SqlTypes.INTEGER))), - new TableElement("MAPFIELD", new Type(SqlTypes.map(SqlTypes.BIGINT))), - new TableElement("STRUCTFIELD", new Type(SqlStruct.builder() + new TableElement(Namespace.VALUE, "INTFIELD", new Type(SqlTypes.INTEGER)), + new TableElement(Namespace.VALUE, "BIGINTFIELD", new Type(SqlTypes.BIGINT)), + new TableElement(Namespace.VALUE, "DOUBLEFIELD", new Type(SqlTypes.DOUBLE)), + new TableElement(Namespace.VALUE, "STRINGFIELD", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "BOOLEANFIELD", new Type(SqlTypes.BOOLEAN)), + new TableElement(Namespace.VALUE, "ARRAYFIELD", new Type(SqlTypes.array(SqlTypes.INTEGER))), + new TableElement(Namespace.VALUE, "MAPFIELD", new Type(SqlTypes.map(SqlTypes.BIGINT))), + new TableElement(Namespace.VALUE, "STRUCTFIELD", new Type(SqlStruct.builder() .field("S0", SqlTypes.BIGINT) .build())), - new TableElement("DECIMALFIELD", new Type(SqlTypes.decimal(4, 2)))); + new TableElement(Namespace.VALUE, + "DECIMALFIELD", new Type(SqlTypes.decimal(4, 2)) + )); private static final int SCHEMA_ID = 5; diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java index 812c54efbc2b..ca95bf023fda 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java @@ -54,9 +54,14 @@ protected Serializer getSerializer( final org.apache.kafka.connect.data.Schema kafkaSchema, final String topicName ) { + final PhysicalSchema physicalSchema = PhysicalSchema.from( + LogicalSchema.of(KEY_SCHEMA, kafkaSchema), + SerdeOption.none() + ); + return GenericRowSerDe.from( new KsqlAvroSerdeFactory(KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME), - PhysicalSchema.from(LogicalSchema.of(kafkaSchema), SerdeOption.none()), + physicalSchema, ksqlConfig, () -> schemaRegistryClient, "", diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java index a3e494582633..40a777f1ce33 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java @@ -19,6 +19,7 @@ import io.confluent.connect.avro.AvroData; import io.confluent.ksql.GenericRow; import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SchemaUtil; import java.util.Objects; import java.util.Properties; import org.apache.avro.Schema; @@ -28,9 +29,14 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.data.SchemaBuilder; public abstract class DataGenProducer { + static final org.apache.kafka.connect.data.Schema KEY_SCHEMA = SchemaBuilder.struct() + .field(SchemaUtil.ROWKEY_NAME, org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA) + .build(); + // Max 100 ms between messsages. public static final long INTER_MESSAGE_MAX_INTERVAL = 500; diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java index 4df869bc97ec..5d2fc6972961 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java @@ -35,9 +35,14 @@ protected Serializer getSerializer( final org.apache.kafka.connect.data.Schema kafkaSchema, final String topicName ) { + final PhysicalSchema physicalSchema = PhysicalSchema.from( + LogicalSchema.of(KEY_SCHEMA, kafkaSchema), + SerdeOption.none() + ); + return GenericRowSerDe.from( new KsqlDelimitedSerdeFactory(), - PhysicalSchema.from(LogicalSchema.of(kafkaSchema), SerdeOption.none()), + physicalSchema, new KsqlConfig(ImmutableMap.of()), () -> null, "", diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java index 81981ec2afa5..cf26d1e59ba4 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java @@ -35,9 +35,14 @@ protected Serializer getSerializer( final org.apache.kafka.connect.data.Schema kafkaSchema, final String topicName ) { + final PhysicalSchema physicalSchema = PhysicalSchema.from( + LogicalSchema.of(KEY_SCHEMA, kafkaSchema), + SerdeOption.none() + ); + return GenericRowSerDe.from( new KsqlJsonSerdeFactory(), - PhysicalSchema.from(LogicalSchema.of(kafkaSchema), SerdeOption.none()), + physicalSchema, new KsqlConfig(ImmutableMap.of()), () -> null, "", diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json b/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json index 754c61a5f99b..46e753a6a3a7 100644 --- a/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/key-schemas.json @@ -37,6 +37,87 @@ {"topic": "OUTPUT", "key": "", "value": {"ID": 3, "KEY": ""}}, {"topic": "OUTPUT", "key": null, "value": {"ID": 4, "KEY": ""}} ] + }, + { + "name": "stream explicit STRING ROWKEY", + "statements": [ + "CREATE STREAM INPUT (ROWKEY STRING KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');", + "CREATE STREAM OUTPUT as SELECT ID, ROWKEY as KEY FROM INPUT;" + ], + "inputs": [ + {"topic": "input", "key": 1, "value": {"id": 1}}, + {"topic": "input", "key": "1", "value": {"id": 2}}, + {"topic": "input", "key": "", "value": {"id": 3}}, + {"topic": "input", "key": null, "value": {"id": 4}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"ID": 1, "KEY": "1"}}, + {"topic": "OUTPUT", "key": "1", "value": {"ID": 2, "KEY": "1"}}, + {"topic": "OUTPUT", "key": "", "value": {"ID": 3, "KEY": ""}}, + {"topic": "OUTPUT", "key": null, "value": {"ID": 4, "KEY": ""}} + ] + }, + { + "name": "table explicit STRING ROWKEY", + "statements": [ + "CREATE TABLE INPUT (ROWKEY STRING KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');", + "CREATE TABLE OUTPUT as SELECT ID, ROWKEY as KEY FROM INPUT;" + ], + "inputs": [ + {"topic": "input", "key": 1, "value": {"id": 1}}, + {"topic": "input", "key": "1", "value": {"id": 2}}, + {"topic": "input", "key": "", "value": {"id": 3}}, + {"topic": "input", "key": null, "value": {"id": 4}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"ID": 1, "KEY": "1"}}, + {"topic": "OUTPUT", "key": "1", "value": {"ID": 2, "KEY": "1"}}, + {"topic": "OUTPUT", "key": "", "value": {"ID": 3, "KEY": ""}}, + {"topic": "OUTPUT", "key": null, "value": {"ID": 4, "KEY": ""}} + ] + }, + { + "name": "explicit non-STRING ROWKEY", + "statements": [ + "CREATE STREAM INPUT (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "line 1:23: 'ROWKEY' is a KEY field with an unsupported type. KSQL currently only supports KEY fields of type STRING." + } + }, + { + "name": "explicit key field named other than ROWKEY", + "statements": [ + "CREATE STREAM INPUT (OTHER STRING KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "line 1:23: 'OTHER' is an invalid KEY field name. KSQL currently only supports KEY fields named ROWKEY." + } + }, + { + "name": "KEY key field name", + "statements": [ + "CREATE STREAM INPUT (KEY STRING KEY, ID bigint) WITH (kafka_topic='input',value_format='JSON');" + ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "line 1:23: 'KEY' is an invalid KEY field name. KSQL currently only supports KEY fields named ROWKEY." + } + }, + { + "name": "KEY value field name", + "statements": [ + "CREATE STREAM INPUT (KEY STRING, ID bigint) WITH (kafka_topic='input',value_format='JSON');", + "CREATE STREAM OUTPUT as SELECT * FROM INPUT;" + ], + "inputs": [ + {"topic": "input", "key": 1, "value": {"key": "a", "id": 1}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "1", "value": {"ID": 1, "KEY": "a"}} + ] } ] } \ No newline at end of file diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index e60123a4a834..572e628f0137 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -79,7 +79,7 @@ tableElements ; tableElement - : identifier type + : identifier type (KEY)? ; tableProperties @@ -316,6 +316,7 @@ nonReserved | EXPLAIN | ANALYZE | TYPE | SET | RESET | IF + | KEY ; SELECT: 'SELECT'; @@ -426,6 +427,7 @@ UNSET: 'UNSET'; RUN: 'RUN'; SCRIPT: 'SCRIPT'; DECIMAL: 'DECIMAL'; +KEY: 'KEY'; IF: 'IF'; diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 691738045f63..6ecb1d18c5ec 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -100,6 +100,7 @@ import io.confluent.ksql.parser.tree.SubscriptExpression; import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.parser.tree.TimeLiteral; @@ -1010,6 +1011,7 @@ public Node visitFunctionCall(final SqlBaseParser.FunctionCallContext context) { public Node visitTableElement(final SqlBaseParser.TableElementContext context) { return new TableElement( getLocation(context), + context.KEY() == null ? Namespace.VALUE : Namespace.KEY, ParserUtil.getIdentifierText(context.identifier()), getType(context.type()) ); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/SchemaParser.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/SchemaParser.java index 0b54f2bcd896..a11c21f07f8b 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/SchemaParser.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/SchemaParser.java @@ -19,6 +19,7 @@ import static io.confluent.ksql.util.ParserUtil.getLocation; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.ParserUtil; @@ -73,8 +74,10 @@ public void syntaxError( .stream() .map(ctx -> new TableElement( getLocation(ctx), + ctx.KEY() == null ? Namespace.VALUE : Namespace.KEY, ParserUtil.getIdentifierText(ctx.identifier()), - getType(ctx.type()))) + getType(ctx.type()) + )) .collect(Collectors.toList()); return TableElements.of(elements); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 982739a0b8b1..eeca61011268 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -48,6 +48,7 @@ import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.util.ParserUtil; import java.util.List; import java.util.Optional; @@ -423,7 +424,8 @@ private void formatCreateAs(final CreateAsSelect node, final Integer indent) { private static String formatTableElement(final TableElement e) { return ParserUtil.escapeIfReservedIdentifier(e.getName()) + " " - + e.getType(); + + e.getType() + + (e.getNamespace() == Namespace.KEY ? " KEY" : ""); } } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElement.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElement.java index 817e85f33400..c3252d1a366f 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElement.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElement.java @@ -19,6 +19,7 @@ import com.google.errorprone.annotations.Immutable; import io.confluent.ksql.parser.ParsingException; +import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.util.SchemaUtil; import java.util.Objects; import java.util.Optional; @@ -29,31 +30,42 @@ @Immutable public final class TableElement extends Node { + public enum Namespace { + KEY, + VALUE + } + + private final Namespace namespace; private final String name; private final Type type; /** + * @param namespace indicates if the element is part of the key or value. * @param name the name of the element. * @param type the sql type of the element. */ public TableElement( + final Namespace namespace, final String name, final Type type ) { - this(Optional.empty(), name, type); + this(Optional.empty(), namespace, name, type); } /** * @param location the location in the SQL text. + * @param namespace indicates if the element is part of the key or value. * @param name the name of the element. * @param type the sql type of the element. */ public TableElement( final Optional location, + final Namespace namespace, final String name, final Type type ) { super(location); + this.namespace = requireNonNull(namespace, "namespace"); this.name = requireNonNull(name, "name"); this.type = requireNonNull(type, "type"); @@ -68,6 +80,10 @@ public Type getType() { return type; } + public Namespace getNamespace() { + return namespace; + } + @Override public R accept(final AstVisitor visitor, final C context) { return visitor.visitTableElement(this, context); @@ -83,12 +99,13 @@ public boolean equals(final Object obj) { } final TableElement o = (TableElement) obj; return Objects.equals(this.name, o.name) - && Objects.equals(this.type, o.type); + && Objects.equals(this.type, o.type) + && Objects.equals(this.namespace, o.namespace); } @Override public int hashCode() { - return Objects.hash(name, type); + return Objects.hash(name, type, namespace); } @Override @@ -96,13 +113,31 @@ public String toString() { return "TableElement{" + "name='" + name + '\'' + ", type=" + type + + ", namespace=" + namespace + '}'; } private void validate() { - if (name.toUpperCase().equals(SchemaUtil.ROWTIME_NAME) - || name.toUpperCase().equals(SchemaUtil.ROWKEY_NAME)) { + if (name.toUpperCase().equals(SchemaUtil.ROWTIME_NAME)) { throw new ParsingException("'" + name + "' is a reserved field name.", getLocation()); } + + final boolean isRowKey = name.toUpperCase().equals(SchemaUtil.ROWKEY_NAME); + + if (namespace == Namespace.KEY) { + if (!isRowKey) { + throw new ParsingException("'" + name + "' is an invalid KEY field name. " + + "KSQL currently only supports KEY fields named ROWKEY.", getLocation()); + } + + if (type.getSqlType().baseType() != SqlBaseType.STRING) { + throw new ParsingException("'" + name + "' is a KEY field with an unsupported type. " + + "KSQL currently only supports KEY fields of type " + SqlBaseType.STRING + ".", + getLocation()); + } + } else if (isRowKey) { + throw new ParsingException("'" + name + "' is a reserved field name. " + + "It can only be used for KEY fields.", getLocation()); + } } } diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElements.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElements.java index a98df57c28e8..a4980acce47e 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElements.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/TableElements.java @@ -16,11 +16,16 @@ package io.confluent.ksql.parser.tree; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.parser.tree.TableElement.Namespace; +import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.util.KsqlException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.stream.Collectors; @@ -76,17 +81,60 @@ private TableElements(final ImmutableList elements) { } private static TableElements build(final Stream elements) { - final List valueColumns = elements.collect(Collectors.toList()); + final Map> split = splitByElementType(elements); + final List keyColumns = split.getOrDefault(Boolean.TRUE, ImmutableList.of()); + final List valueColumns = split.getOrDefault(Boolean.FALSE, ImmutableList.of()); + + throwOnDuplicateNames(keyColumns, "KEY"); throwOnDuplicateNames(valueColumns, "non-KEY"); + final long numKeyColumns = keyColumns.size(); + + if (numKeyColumns > 1) { + throw new KsqlException("KSQL does not yet support multiple KEY columns"); + } + + if (numKeyColumns == 1 + && keyColumns.get(0).getType().getSqlType().baseType() != SqlBaseType.STRING) { + throw new KsqlException("KEY columns must be of type STRING: " + keyColumns.get(0).getName()); + } + final ImmutableList.Builder builder = ImmutableList.builder(); + builder.addAll(keyColumns); builder.addAll(valueColumns); return new TableElements(builder.build()); } + private static Map> splitByElementType( + final Stream elements + ) { + final List keyFields = new ArrayList<>(); + final List valueFields = new ArrayList<>(); + + elements.forEach(element -> { + if (element.getNamespace() == Namespace.VALUE) { + valueFields.add(element); + return; + } + + if (!valueFields.isEmpty()) { + throw new KsqlException("KEY column declared after VALUE column: " + element.getName() + + System.lineSeparator() + + "All KEY columns must be declared before any VALUE column(s)."); + } + + keyFields.add(element); + }); + + return ImmutableMap.of( + Boolean.TRUE, keyFields, + Boolean.FALSE, valueFields + ); + } + private static void throwOnDuplicateNames( final List columns, final String type diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 197e24371ff8..927d4e378bb8 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -69,7 +69,6 @@ import io.confluent.ksql.parser.tree.WithinExpression; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; -import io.confluent.ksql.schema.ksql.types.SqlStruct; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory; @@ -488,85 +487,42 @@ public void testUDF() { Assert.assertTrue(column2.getExpression().toString().equalsIgnoreCase("FLOOR(ABS(T1.COL3))")); } - @Test - public void testCreateStreamWithTopic() { - final String - queryStr = - "CREATE STREAM orders (ordertime bigint, orderid varchar, itemid varchar, orderunits " - + "double) WITH (key='ordertime', kafka_topic='foo', value_format='json');"; - final Statement statement = KsqlParserTestUtil.buildSingleAst(queryStr, metaStore).getStatement(); - Assert.assertTrue(statement instanceof CreateStream); - final CreateStream createStream = (CreateStream)statement; - Assert.assertTrue(createStream.getName().toString().equalsIgnoreCase("ORDERS")); - assertThat(Iterables.size(createStream.getElements()), is(4)); - assertThat(Iterables.get(createStream.getElements(), 0).getName(), is("ORDERTIME")); - } - - @Test - public void testCreateStreamWithTopicWithStruct() { - final String - queryStr = - "CREATE STREAM orders (ordertime bigint, orderid varchar, itemid varchar, orderunits " - + "double, arraycol array, mapcol map, " - + "order_address STRUCT< number VARCHAR, street VARCHAR, zip INTEGER, city " - + "VARCHAR, state VARCHAR >) WITH (key='ordertime', value_format='json', kafka_topic='foo');"; - final Statement statement = KsqlParserTestUtil.buildSingleAst(queryStr, metaStore).getStatement(); - Assert.assertTrue(statement instanceof CreateStream); - final CreateStream createStream = (CreateStream)statement; - assertThat(createStream.getName().toString().toUpperCase(), equalTo("ORDERS")); - assertThat(Iterables.size(createStream.getElements()), equalTo(7)); - assertThat(Iterables.get(createStream.getElements(), 0).getName().toLowerCase(), equalTo("ordertime")); - assertThat(Iterables.get(createStream.getElements(), 6).getType().getSqlType().baseType(), equalTo(SqlBaseType.STRUCT)); - final SqlStruct struct = (SqlStruct) Iterables.get(createStream.getElements(), 6).getType().getSqlType(); - assertThat(struct.getFields(), hasSize(5)); - assertThat(struct.getFields().get(0).getType().baseType(), equalTo(SqlBaseType.STRING)); - } - @Test public void testCreateStream() { - final String - queryStr = - "CREATE STREAM orders (ordertime bigint, orderid varchar, itemid varchar, orderunits " - + "double) WITH (value_format = 'avro', kafka_topic='orders_topic');"; - final Statement statement = KsqlParserTestUtil.buildSingleAst(queryStr, metaStore).getStatement(); - Assert.assertTrue(statement instanceof CreateStream); - final CreateStream createStream = (CreateStream)statement; - Assert.assertTrue(createStream.getName().toString().equalsIgnoreCase("ORDERS")); - assertThat(Iterables.size(createStream.getElements()), is(4)); - assertThat(Iterables.get(createStream.getElements(), 0).getName(), is("ORDERTIME")); - Assert.assertTrue(createStream.getProperties().getKafkaTopic().equalsIgnoreCase("orders_topic")); - Assert.assertTrue(createStream.getProperties().getValueFormat().equals(Format.AVRO)); - - } + // When: + final CreateStream result = (CreateStream) KsqlParserTestUtil.buildSingleAst("CREATE STREAM orders (" + + "ordertime bigint, " + + "orderid varchar, " + + "itemid varchar, " + + "orderunits double, " + + "arraycol array, " + + "mapcol map, " + + "order_address STRUCT" + + ") WITH (value_format = 'avro',kafka_topic='orders_topic');", metaStore).getStatement(); - @Test - public void testCreateTableWithTopic() { - final String - queryStr = - "CREATE TABLE users (usertime bigint, userid varchar, regionid varchar, gender varchar) " - + "WITH (kafka_topic='foo', value_format='json', key='userid');"; - final Statement statement = KsqlParserTestUtil.buildSingleAst(queryStr, metaStore).getStatement(); - Assert.assertTrue(statement instanceof CreateTable); - final CreateTable createTable = (CreateTable)statement; - Assert.assertTrue("testCreateTable failed.", createTable.getName().toString().equalsIgnoreCase("USERS")); - assertThat(Iterables.size(createTable.getElements()), is(4)); - assertThat(Iterables.get(createTable.getElements(), 0).getName(), is("USERTIME")); + // Then: + assertThat(result.getName().toString(), equalTo("ORDERS")); + assertThat(Iterables.size(result.getElements()), equalTo(7)); + assertThat(Iterables.get(result.getElements(), 0).getName(), equalTo("ORDERTIME")); + assertThat(Iterables.get(result.getElements(), 6).getType().getSqlType().baseType(), equalTo(SqlBaseType.STRUCT)); + assertThat(result.getProperties().getKafkaTopic(), equalTo("orders_topic")); + assertThat(result.getProperties().getValueFormat(), equalTo(Format.AVRO)); } @Test public void testCreateTable() { - final String - queryStr = + // When: + final CreateTable result = (CreateTable) KsqlParserTestUtil.buildSingleAst( "CREATE TABLE users (usertime bigint, userid varchar, regionid varchar, gender varchar) " - + "WITH (kafka_topic = 'users_topic', value_format='json', key = 'userid');"; - final Statement statement = KsqlParserTestUtil.buildSingleAst(queryStr, metaStore).getStatement(); - Assert.assertTrue(statement instanceof CreateTable); - final CreateTable createTable = (CreateTable)statement; - Assert.assertTrue(createTable.getName().toString().equalsIgnoreCase("USERS")); - assertThat(Iterables.size(createTable.getElements()), is(4)); - assertThat(Iterables.get(createTable.getElements(), 0).getName(), is("USERTIME")); - Assert.assertTrue(createTable.getProperties().getKafkaTopic().equalsIgnoreCase("users_topic")); - Assert.assertTrue(createTable.getProperties().getValueFormat().equals(Format.JSON)); + + "WITH (kafka_topic='foo', value_format='json', key='userid');", metaStore).getStatement(); + + // Then: + assertThat(result.getName().toString(), equalTo("USERS")); + assertThat(Iterables.size(result.getElements()), equalTo(4)); + assertThat(Iterables.get(result.getElements(), 0).getName(), equalTo("USERTIME")); + assertThat(result.getProperties().getKafkaTopic(), equalTo("foo")); + assertThat(result.getProperties().getValueFormat(), equalTo(Format.JSON)); + assertThat(result.getProperties().getKeyField(), equalTo(Optional.of("userid"))); } @Test diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/SchemaParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/SchemaParserTest.java index 22fe0550b8d6..87c9e16213db 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/SchemaParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/SchemaParserTest.java @@ -18,9 +18,11 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasItem; import com.google.common.collect.Iterables; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -44,8 +46,23 @@ public void shouldParseValidSchema() { // Then: assertThat(elements, contains( - new TableElement("FOO", new Type(SqlTypes.INTEGER)), - new TableElement("BAR", new Type(SqlTypes.map(SqlTypes.STRING))) + new TableElement(Namespace.VALUE, "FOO", new Type(SqlTypes.INTEGER)), + new TableElement(Namespace.VALUE, "BAR", new Type(SqlTypes.map(SqlTypes.STRING))) + )); + } + + @Test + public void shouldParseValidSchemaWithKeyField() { + // Given: + final String schema = "ROWKEY STRING KEY, bar INT"; + + // When: + final TableElements elements = SchemaParser.parse(schema); + + // Then: + assertThat(elements, contains( + new TableElement(Namespace.KEY, "ROWKEY", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "BAR", new Type(SqlTypes.INTEGER)) )); } @@ -58,8 +75,8 @@ public void shouldParseQuotedSchema() { final TableElements elements = SchemaParser.parse(schema); // Then: - assertThat(elements, contains( - new TableElement("END", new Type(SqlTypes.STRING)) + assertThat(elements, hasItem( + new TableElement(Namespace.VALUE, "END", new Type(SqlTypes.STRING)) )); } @@ -72,8 +89,8 @@ public void shouldParseQuotedMixedCase() { final TableElements elements = SchemaParser.parse(schema); // Then: - assertThat(elements, contains( - new TableElement("End", new Type(SqlTypes.STRING)) + assertThat(elements, hasItem( + new TableElement(Namespace.VALUE, "End", new Type(SqlTypes.STRING)) )); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java index 3cafbe8e315b..4d1400c31e19 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/SqlFormatterTest.java @@ -46,6 +46,7 @@ import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.parser.tree.WithinExpression; @@ -122,9 +123,14 @@ CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("JSON"), CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("topic_test")) ); + private static final TableElements ELEMENTS_WITH_KEY = TableElements.of( + new TableElement(Namespace.KEY, "ROWKEY", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "Foo", new Type(SqlTypes.STRING)) + ); + private static final TableElements ELEMENTS_WITHOUT_KEY = TableElements.of( - new TableElement("Foo", new Type(SqlTypes.STRING)), - new TableElement("Bar", new Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "Foo", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "Bar", new Type(SqlTypes.STRING)) ); @Before @@ -175,6 +181,23 @@ public void setUp() { metaStore.putSource(ksqlTableOrders); } + @Test + public void shouldFormatCreateStreamStatementWithExplicitKey() { + // Given: + final CreateStream createStream = new CreateStream( + QualifiedName.of("TEST"), + ELEMENTS_WITH_KEY, + false, + SOME_WITH_PROPS); + + // When: + final String sql = SqlFormatter.formatSql(createStream); + + // Then: + assertThat(sql, is("CREATE STREAM TEST (ROWKEY STRING KEY, Foo STRING) " + + "WITH (KAFKA_TOPIC='topic_test', KEY='ORDERID', VALUE_FORMAT='JSON');")); + } + @Test public void shouldFormatCreateStreamStatementWithImplicitKey() { // Given: @@ -192,6 +215,23 @@ public void shouldFormatCreateStreamStatementWithImplicitKey() { + "WITH (KAFKA_TOPIC='topic_test', KEY='ORDERID', VALUE_FORMAT='JSON');")); } + @Test + public void shouldFormatCreateTableStatementWithExplicitKey() { + // Given: + final CreateTable createTable = new CreateTable( + QualifiedName.of("TEST"), + ELEMENTS_WITH_KEY, + false, + SOME_WITH_PROPS); + + // When: + final String sql = SqlFormatter.formatSql(createTable); + + // Then: + assertThat(sql, is("CREATE TABLE TEST (ROWKEY STRING KEY, Foo STRING) " + + "WITH (KAFKA_TOPIC='topic_test', KEY='ORDERID', VALUE_FORMAT='JSON');")); + } + @Test public void shouldFormatCreateTableStatementWithImplicitKey() { // Given: @@ -213,8 +253,8 @@ public void shouldFormatCreateTableStatementWithImplicitKey() { public void shouldFormatTableElementsNamedAfterReservedWords() { // Given: final TableElements tableElements = TableElements.of( - new TableElement("GROUP", new Type(SqlTypes.STRING)), - new TableElement("Having", new Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "GROUP", new Type(SqlTypes.STRING)), + new TableElement(Namespace.VALUE, "Having", new Type(SqlTypes.STRING)) ); final CreateStream createStream = new CreateStream( diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriterTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriterTest.java index 815e0f9a9127..d602be1a71d4 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriterTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/rewrite/StatementRewriterTest.java @@ -18,7 +18,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; @@ -40,9 +39,9 @@ import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.schema.ksql.SqlBaseType; -import io.confluent.ksql.schema.ksql.types.SqlStruct; import io.confluent.ksql.serde.Format; import io.confluent.ksql.util.MetaStoreFixture; +import java.util.Optional; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -304,92 +303,48 @@ public void testUDF() { assertThat(column2.getExpression().toString(), equalTo("FLOOR(ABS(T1.COL3))")); } - @Test - public void testCreateStreamWithTopic() { - final String queryStr = - "CREATE STREAM orders (ordertime bigint, orderid varchar, itemid varchar, orderunits " - + "double) WITH (kafka_topic = 'foo', value_format = 'json', key='ordertime');"; - final Statement statement = parse(queryStr); - - final Statement rewrittenStatement = (Statement) statementRewriter.process(statement, null); - - assertThat(rewrittenStatement, instanceOf(CreateStream.class)); - final CreateStream createStream = (CreateStream)rewrittenStatement; - assertThat(createStream.getName().toString(), equalTo("ORDERS")); - assertThat(Iterables.size(createStream.getElements()), equalTo(4)); - assertThat(Iterables.get(createStream.getElements(), 0).getName(), equalTo("ORDERTIME")); - } - - @Test - public void testCreateStreamWithTopicWithStruct() { - final String queryStr = - "CREATE STREAM orders (ordertime bigint, orderid varchar, itemid varchar, orderunits " - + "double, arraycol array, mapcol map, " - + "order_address STRUCT< number VARCHAR, street VARCHAR, zip INTEGER, city " - + "VARCHAR, state VARCHAR >) WITH (kafka_topic='foo', value_format='json', key='ordertime');"; - final Statement statement = parse(queryStr); - - final Statement rewrittenStatement = (Statement) statementRewriter.process(statement, null); - - assertThat(rewrittenStatement, instanceOf(CreateStream.class)); - final CreateStream createStream = (CreateStream)rewrittenStatement; - assertThat(createStream.getName().toString().toUpperCase(), equalTo("ORDERS")); - assertThat(Iterables.size(createStream.getElements()), equalTo(7)); - assertThat(Iterables.get(createStream.getElements(), 0).getName().toLowerCase(), equalTo("ordertime")); - assertThat(Iterables.get(createStream.getElements(), 6).getType().getSqlType().baseType(), equalTo(SqlBaseType.STRUCT)); - final SqlStruct struct = (SqlStruct) Iterables.get(createStream.getElements(), 6).getType().getSqlType(); - assertThat(struct.getFields(), hasSize(5)); - assertThat(struct.getFields().get(0).getType().baseType(), equalTo(SqlBaseType.STRING)); - } - @Test public void testCreateStream() { - final String queryStr = - "CREATE STREAM orders " - + "(ordertime bigint, orderid varchar, itemid varchar, orderunits double) " - + "WITH (value_format = 'avro',kafka_topic='orders_topic');"; - final Statement statement = parse(queryStr); - - final Statement rewrittenStatement = (Statement) statementRewriter.process(statement, null); - - assertThat(rewrittenStatement, instanceOf(CreateStream.class)); - final CreateStream createStream = (CreateStream)rewrittenStatement; + // Given: + final Statement statement = parse("CREATE STREAM orders (" + + "ordertime bigint, " + + "orderid varchar, " + + "itemid varchar, " + + "orderunits double, " + + "arraycol array, " + + "mapcol map, " + + "order_address STRUCT" + + ") WITH (value_format = 'avro',kafka_topic='orders_topic');"); - assertThat(createStream.getName().toString(), equalTo("ORDERS")); - assertThat(Iterables.size(createStream.getElements()), equalTo(4)); - assertThat(Iterables.get(createStream.getElements(), 0).getName(), equalTo("ORDERTIME")); - assertThat(createStream.getProperties().getKafkaTopic(), equalTo("orders_topic")); - assertThat(createStream.getProperties().getValueFormat(), equalTo(Format.AVRO)); - } + // When: + final CreateStream result = (CreateStream) statementRewriter.process(statement, null); - @Test - public void testCreateTableWithTopic() { - final String queryStr = - "CREATE TABLE users (usertime bigint, userid varchar, regionid varchar, gender varchar) " - + "WITH (kafka_topic='foo', value_format='json', key='userid');"; - final Statement statement = parse(queryStr); - final Statement rewrittenStatement = (Statement) statementRewriter.process(statement, null); - assertThat(rewrittenStatement, is(instanceOf(CreateTable.class))); - final CreateTable createTable = (CreateTable)rewrittenStatement; - assertThat(createTable.getName().toString(), equalTo("USERS")); - assertThat(Iterables.size(createTable.getElements()), equalTo(4)); - assertThat(Iterables.get(createTable.getElements(), 0).getName(), equalTo("USERTIME")); + // Then: + assertThat(result.getName().toString(), equalTo("ORDERS")); + assertThat(Iterables.size(result.getElements()), equalTo(7)); + assertThat(Iterables.get(result.getElements(), 0).getName(), equalTo("ORDERTIME")); + assertThat(Iterables.get(result.getElements(), 6).getType().getSqlType().baseType(), equalTo(SqlBaseType.STRUCT)); + assertThat(result.getProperties().getKafkaTopic(), equalTo("orders_topic")); + assertThat(result.getProperties().getValueFormat(), equalTo(Format.AVRO)); } @Test public void testCreateTable() { - final String queryStr = + // Given: + final Statement statement = parse( "CREATE TABLE users (usertime bigint, userid varchar, regionid varchar, gender varchar) " - + "WITH (kafka_topic = 'users_topic', value_format='json', key = 'userid');"; - final Statement statement = parse(queryStr); - final Statement rewrittenStatement = (Statement) statementRewriter.process(statement, null); - assertThat(rewrittenStatement, is(instanceOf(CreateTable.class))); - final CreateTable createTable = (CreateTable)rewrittenStatement; - assertThat(createTable.getName().toString(), equalTo("USERS")); - assertThat(Iterables.size(createTable.getElements()), equalTo(4)); - assertThat(Iterables.get(createTable.getElements(), 0).getName(), equalTo("USERTIME")); - assertThat(createTable.getProperties().getKafkaTopic(), equalTo("users_topic")); - assertThat(createTable.getProperties().getValueFormat(), equalTo(Format.JSON)); + + "WITH (kafka_topic='foo', value_format='json', key='userid');"); + + // When: + final CreateTable result = (CreateTable) statementRewriter.process(statement, null); + + // Then: + assertThat(result.getName().toString(), equalTo("USERS")); + assertThat(Iterables.size(result.getElements()), equalTo(4)); + assertThat(Iterables.get(result.getElements(), 0).getName(), equalTo("USERTIME")); + assertThat(result.getProperties().getKafkaTopic(), equalTo("foo")); + assertThat(result.getProperties().getValueFormat(), equalTo(Format.JSON)); + assertThat(result.getProperties().getKeyField(), equalTo(Optional.of("userid"))); } @Test diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamTest.java index 3315170343c5..644a720a0ae5 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateStreamTest.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.testing.EqualsTester; import io.confluent.ksql.parser.properties.with.CreateSourceProperties; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.util.Optional; @@ -29,7 +30,7 @@ public class CreateStreamTest { public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); private static final QualifiedName SOME_NAME = QualifiedName.of("bob"); private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("Bob", new Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "Bob", new Type(SqlTypes.STRING)) ); private static final CreateSourceProperties SOME_PROPS = CreateSourceProperties.from( ImmutableMap.of( diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java index 127803ea5826..41d2aff1c1ce 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/CreateTableTest.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.testing.EqualsTester; import io.confluent.ksql.parser.properties.with.CreateSourceProperties; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.properties.with.CommonCreateConfigs; import io.confluent.ksql.schema.ksql.types.SqlTypes; import java.util.Optional; @@ -29,7 +30,7 @@ public class CreateTableTest { public static final NodeLocation OTHER_LOCATION = new NodeLocation(1, 0); private static final QualifiedName SOME_NAME = QualifiedName.of("bob"); private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("Bob", new Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "Bob", new Type(SqlTypes.STRING)) ); private static final CreateSourceProperties SOME_PROPS = CreateSourceProperties.from( ImmutableMap.of( diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementTest.java index 40f080572dbd..a4ae71fcf864 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementTest.java @@ -15,6 +15,8 @@ package io.confluent.ksql.parser.tree; +import static io.confluent.ksql.parser.tree.TableElement.Namespace.KEY; +import static io.confluent.ksql.parser.tree.TableElement.Namespace.VALUE; import static io.confluent.ksql.util.SchemaUtil.ROWKEY_NAME; import static io.confluent.ksql.util.SchemaUtil.ROWTIME_NAME; import static org.hamcrest.MatcherAssert.assertThat; @@ -40,16 +42,19 @@ public class TableElementTest { public void shouldImplementEquals() { new EqualsTester() .addEqualityGroup( - new TableElement(A_LOCATION, "name", new Type(SqlTypes.STRING)), - new TableElement("name", new Type(SqlTypes.STRING)) + new TableElement(A_LOCATION, VALUE, "name", new Type(SqlTypes.STRING)), + new TableElement(VALUE, "name", new Type(SqlTypes.STRING)) ) .addEqualityGroup( - new TableElement("different", new Type(SqlTypes.STRING)) + new TableElement(VALUE, "different", new Type(SqlTypes.STRING)) ) .addEqualityGroup( - new TableElement("name", new Type(SqlTypes.INTEGER)) + new TableElement(VALUE, "name", new Type(SqlTypes.INTEGER)) ) - .testEquals(); + .addEqualityGroup( + new TableElement(KEY, "ROWKEY", new Type(SqlTypes.STRING)) + ) + .testEquals(); } @Test @@ -59,7 +64,17 @@ public void shouldThrowOnRowTimeValueColumn() { expectedException.expectMessage("line 2:6: 'ROWTIME' is a reserved field name."); // When: - new TableElement(A_LOCATION, ROWTIME_NAME, new Type(SqlTypes.BIGINT)); + new TableElement(A_LOCATION, VALUE, ROWTIME_NAME, new Type(SqlTypes.BIGINT)); + } + + @Test + public void shouldThrowOnRowTimeKeyColumn() { + // Then: + expectedException.expect(ParsingException.class); + expectedException.expectMessage("line 2:6: 'ROWTIME' is a reserved field name."); + + // When: + new TableElement(A_LOCATION, VALUE, ROWTIME_NAME, new Type(SqlTypes.BIGINT)); } @Test @@ -67,16 +82,49 @@ public void shouldThrowOnRowKeyValueColumn() { // Then: expectedException.expect(ParsingException.class); expectedException.expectMessage( - "line 2:6: 'ROWKEY' is a reserved field name."); + "line 2:6: 'ROWKEY' is a reserved field name. It can only be used for KEY fields."); // When: - new TableElement(A_LOCATION, ROWKEY_NAME, new Type(SqlTypes.STRING)); + new TableElement(A_LOCATION, VALUE, ROWKEY_NAME, new Type(SqlTypes.STRING)); + } + + @Test + public void shouldNotThrowOnRowKeyKeyColumn() { + new TableElement( + A_LOCATION, + KEY, + ROWKEY_NAME, + new Type(SqlTypes.STRING) + ); + } + + @Test + public void shouldThrowOnRowKeyIfNotString() { + // Then: + expectedException.expect(ParsingException.class); + expectedException.expectMessage("line 2:6: 'ROWKEY' is a KEY field with an unsupported type. " + + "KSQL currently only supports KEY fields of type STRING."); + + // When: + new TableElement(A_LOCATION, KEY, ROWKEY_NAME, new Type(SqlTypes.INTEGER)); + } + + @Test + public void shouldThrowOnKeyColumnThatIsNotCalledRowKey() { + // Then: + expectedException.expect(ParsingException.class); + expectedException.expectMessage("line 2:6: 'someKey' is an invalid KEY field name. " + + "KSQL currently only supports KEY fields named ROWKEY."); + + // When: + new TableElement(A_LOCATION, KEY, "someKey", new Type(SqlTypes.INTEGER)); } @Test public void shouldReturnName() { // Given: - final TableElement element = new TableElement("name", new Type(SqlTypes.STRING)); + final TableElement element = + new TableElement(VALUE, "name", new Type(SqlTypes.STRING)); // Then: assertThat(element.getName(), is("name")); @@ -85,9 +133,18 @@ public void shouldReturnName() { @Test public void shouldReturnType() { // Given: - final TableElement element = new TableElement("name", new Type(SqlTypes.STRING)); + final TableElement element = new TableElement(VALUE, "name", new Type(SqlTypes.STRING)); // Then: assertThat(element.getType(), is(new Type(SqlTypes.STRING))); } + + @Test + public void shouldReturnNamespace() { + // Given: + final TableElement valueElement = new TableElement(VALUE, "name", new Type(SqlTypes.STRING)); + + // Then: + assertThat(valueElement.getNamespace(), is(VALUE)); + } } \ No newline at end of file diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementsTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementsTest.java index 8a0f344d616a..be6544168251 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementsTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/TableElementsTest.java @@ -15,6 +15,8 @@ package io.confluent.ksql.parser.tree; +import static io.confluent.ksql.parser.tree.TableElement.Namespace.KEY; +import static io.confluent.ksql.parser.tree.TableElement.Namespace.VALUE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; @@ -23,6 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.testing.EqualsTester; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.util.KsqlException; import java.util.List; @@ -42,7 +45,7 @@ public class TableElementsTest { @Test public void shouldImplementHashCodeAndEqualsProperty() { final List someElements = ImmutableList.of( - tableElement("bob", SOME_TYPE) + tableElement(VALUE, "bob", SOME_TYPE) ); new EqualsTester() @@ -51,14 +54,50 @@ public void shouldImplementHashCodeAndEqualsProperty() { .testEquals(); } + @Test + public void shouldThrowOnOutOfOrderKeyColumns() { + // Given: + final List elements = ImmutableList.of( + tableElement(VALUE, "v0", SOME_TYPE), + tableElement(KEY, "key", STRING_TYPE) + ); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("KEY column declared after VALUE column: key"); + + // When: + TableElements.of(elements); + } + + @Test + public void shouldThrowOnDuplicateKeyColumns() { + // Given: + final List elements = ImmutableList.of( + tableElement(KEY, "k0", STRING_TYPE), + tableElement(KEY, "k0", STRING_TYPE), + tableElement(KEY, "k1", STRING_TYPE), + tableElement(KEY, "k1", STRING_TYPE) + ); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("Duplicate KEY column names:"); + expectedException.expectMessage("k0"); + expectedException.expectMessage("k1"); + + // When: + TableElements.of(elements); + } + @Test public void shouldThrowOnDuplicateValueColumns() { // Given: final List elements = ImmutableList.of( - tableElement("v0", SOME_TYPE), - tableElement("v0", SOME_TYPE), - tableElement("v1", SOME_TYPE), - tableElement("v1", SOME_TYPE) + tableElement(VALUE, "v0", SOME_TYPE), + tableElement(VALUE, "v0", SOME_TYPE), + tableElement(VALUE, "v1", SOME_TYPE), + tableElement(VALUE, "v1", SOME_TYPE) ); // Then: @@ -75,7 +114,7 @@ public void shouldThrowOnDuplicateValueColumns() { public void shouldNotThrowOnNoKeyElements() { // Given: final List elements = ImmutableList.of( - tableElement("v0", new Type(SqlTypes.INTEGER)) + tableElement(VALUE, "v0", new Type(SqlTypes.INTEGER)) ); // When: @@ -84,11 +123,44 @@ public void shouldNotThrowOnNoKeyElements() { // Then: did not throw. } + @Test + public void shouldThrowIfMoreThatOneKeyColumn() { + // Given: + final List elements = ImmutableList.of( + tableElement(KEY, "k0", STRING_TYPE), + tableElement(KEY, "k1", STRING_TYPE), + tableElement(VALUE, "v0", SOME_TYPE) + ); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("KSQL does not yet support multiple KEY columns"); + + // When: + TableElements.of(elements); + } + + @Test + public void shouldThrowIfKeyColumnNotString() { + // Given: + final List elements = ImmutableList.of( + tableElement(KEY, "k0", new Type(SqlTypes.INTEGER)), + tableElement(VALUE, "v0", SOME_TYPE) + ); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage("KEY columns must be of type STRING: k0"); + + // When: + TableElements.of(elements); + } + @Test public void shouldIterateElements() { // Given: - final TableElement te1 = tableElement("k0", STRING_TYPE); - final TableElement te2 = tableElement("v0", SOME_TYPE); + final TableElement te1 = tableElement(KEY, "k0", STRING_TYPE); + final TableElement te2 = tableElement(VALUE, "v0", SOME_TYPE); // When: final Iterable iterable = TableElements.of(ImmutableList.of(te1, te2)); @@ -101,8 +173,8 @@ public void shouldIterateElements() { public void shouldStreamElements() { // Given: final List elements = ImmutableList.of( - tableElement("k0", STRING_TYPE), - tableElement("v0", SOME_TYPE) + tableElement(KEY, "k0", STRING_TYPE), + tableElement(VALUE, "v0", SOME_TYPE) ); final TableElements tableElements = TableElements.of(elements); @@ -118,8 +190,8 @@ public void shouldStreamElements() { @Test public void shouldToString() { // Given: - final TableElement element0 = tableElement("k0", STRING_TYPE); - final TableElement element1 = tableElement("v0", SOME_TYPE); + final TableElement element0 = tableElement(KEY, "k0", STRING_TYPE); + final TableElement element1 = tableElement(VALUE, "v0", SOME_TYPE); final TableElements tableElements = TableElements.of(element0, element1); @@ -131,12 +203,14 @@ public void shouldToString() { } private static TableElement tableElement( + final Namespace namespace, final String name, final Type type ) { final TableElement te = mock(TableElement.class, name); when(te.getName()).thenReturn(name); when(te.getType()).thenReturn(type); + when(te.getNamespace()).thenReturn(namespace); return te; } } \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java index 3405aac861c4..da8b47425783 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactory.java @@ -27,7 +27,6 @@ import java.util.function.Supplier; import javax.inject.Inject; import javax.ws.rs.core.SecurityContext; - import org.apache.kafka.streams.KafkaClientSupplier; import org.glassfish.hk2.api.Factory; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/security/KsqlUserContextProvider.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/security/KsqlUserContextProvider.java index 5dd081384151..470c07036f79 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/security/KsqlUserContextProvider.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/security/KsqlUserContextProvider.java @@ -16,7 +16,6 @@ package io.confluent.ksql.rest.server.security; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; - import io.confluent.ksql.rest.server.context.ConfiguredKafkaClientSupplier; import java.security.Principal; import java.util.function.Supplier; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java index 28c9ebc7f8be..6b66f690c05a 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/AuthorizationFunctionalTest.java @@ -15,6 +15,15 @@ package io.confluent.ksql.rest.integration; +import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.JAAS_KAFKA_PROPS_NAME; +import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER1; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; + import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.rest.entity.KafkaTopicInfo; @@ -25,6 +34,9 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.test.util.secure.Credentials; import io.confluent.ksql.util.KsqlConfig; +import java.security.Principal; +import java.util.List; +import java.util.Optional; import org.apache.kafka.common.errors.AuthorizationException; import org.glassfish.hk2.api.Factory; import org.glassfish.hk2.utilities.binding.AbstractBinder; @@ -42,19 +54,6 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.security.Principal; -import java.util.List; -import java.util.Optional; - -import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.JAAS_KAFKA_PROPS_NAME; -import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.VALID_USER1; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; - @Category({IntegrationTest.class}) @RunWith(MockitoJUnitRunner.class) public class AuthorizationFunctionalTest { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java index 8f38631bdc0c..a9408495e699 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/MockKsqlSecurityExtension.java @@ -4,7 +4,6 @@ import io.confluent.ksql.rest.server.security.KsqlSecurityExtension; import io.confluent.ksql.rest.server.security.KsqlUserContextProvider; import io.confluent.ksql.util.KsqlConfig; - import java.util.Optional; /** diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java index 4b32f003f004..57ccd89630cb 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorTest.java @@ -58,6 +58,7 @@ import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.Table; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.Type; import io.confluent.ksql.parser.tree.UnsetProperty; @@ -112,7 +113,7 @@ public class StandaloneExecutorTest { private static final KsqlConfig ksqlConfig = new KsqlConfig(emptyMap()); private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("bob", new Type(SqlTypes.STRING))); + new TableElement(Namespace.VALUE, "bob", new Type(SqlTypes.STRING))); private static final QualifiedName SOME_NAME = QualifiedName.of("Bob"); private static final String SOME_TOPIC = "some-topic"; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactoryTest.java index dd205c66d432..abcefecc3f39 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/context/KsqlRestServiceContextFactoryTest.java @@ -15,29 +15,27 @@ package io.confluent.ksql.rest.server.context; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import io.confluent.ksql.rest.server.security.KsqlSecurityExtension; import io.confluent.ksql.rest.server.security.KsqlUserContextProvider; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; +import java.security.Principal; +import java.util.Optional; +import java.util.function.Function; +import javax.ws.rs.core.SecurityContext; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import javax.ws.rs.core.SecurityContext; - -import java.security.Principal; -import java.util.Optional; -import java.util.function.Function; - -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - @RunWith(MockitoJUnitRunner.class) public class KsqlRestServiceContextFactoryTest { private KsqlRestServiceContextFactory serviceContextFactory; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index c1e574d6ad7c..6b50a8fa0876 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -81,6 +81,7 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.StringLiteral; import io.confluent.ksql.parser.tree.TableElement; +import io.confluent.ksql.parser.tree.TableElement.Namespace; import io.confluent.ksql.parser.tree.TableElements; import io.confluent.ksql.parser.tree.TerminateQuery; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; @@ -187,7 +188,7 @@ public class KsqlResourceTest { private static final ClusterTerminateRequest VALID_TERMINATE_REQUEST = new ClusterTerminateRequest(ImmutableList.of("Foo")); private static final TableElements SOME_ELEMENTS = TableElements.of( - new TableElement("f0", new io.confluent.ksql.parser.tree.Type(SqlTypes.STRING)) + new TableElement(Namespace.VALUE, "f0", new io.confluent.ksql.parser.tree.Type(SqlTypes.STRING)) ); private static final PreparedStatement STMT_0_WITH_SCHEMA = PreparedStatement.of( "sql with schema", diff --git a/ksql-version-metrics-client/src/test/java/io/confluent/ksql/version/metrics/collector/BasicCollectorTest.java b/ksql-version-metrics-client/src/test/java/io/confluent/ksql/version/metrics/collector/BasicCollectorTest.java index 557c11dd5473..b35f2d00fbca 100644 --- a/ksql-version-metrics-client/src/test/java/io/confluent/ksql/version/metrics/collector/BasicCollectorTest.java +++ b/ksql-version-metrics-client/src/test/java/io/confluent/ksql/version/metrics/collector/BasicCollectorTest.java @@ -24,7 +24,6 @@ import java.time.Clock; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith;