From d935af3b6c8a4ec4ff2f408c5a854e38f11c8009 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Fri, 27 Sep 2019 10:23:47 +0100 Subject: [PATCH] feat(static): allow logical schema to have fields in any order (#3422) * feat(static): allow logical schema to have fields in any order Our logical schema should be able to support schemas where the is not key column, no rowtime, or has key columns interleaved with value columns. This is because a projection does not necessarily include key at the start, or at all, or rowtime. --- .../ksql/schema/ksql/LogicalSchema.java | 361 ++++++++++-------- .../io/confluent/ksql/util/SchemaUtil.java | 9 - .../ksql/schema/ksql/LogicalSchemaTest.java | 200 ++++++---- .../ksql/analyzer/SourceSchemas.java | 5 +- .../confluent/ksql/planner/plan/JoinNode.java | 6 +- .../ksql/query/id/HybridQueryIdGenerator.java | 1 - .../query/id/SpecificQueryIdGenerator.java | 1 - .../confluent/ksql/KsqlContextTestUtil.java | 3 +- .../ksql/analyzer/AnalyzerFunctionalTest.java | 1 - .../ksql/engine/KsqlEngineTestUtil.java | 4 +- .../ksql/function/udf/math/RoundTest.java | 2 - .../ksql/integration/JsonFormatTest.java | 2 +- .../integration/SecureIntegrationTest.java | 2 +- .../physical/PhysicalPlanBuilderTest.java | 1 - .../ksql/planner/plan/JoinNodeTest.java | 10 +- .../planner/plan/KsqlBareOutputNodeTest.java | 2 +- .../query/id/HybridQueryIdGeneratorTest.java | 2 +- .../execution/testutil/TestExpressions.java | 22 +- .../util/ExpressionTypeManagerTest.java | 2 +- .../rest/server/computation/RecoveryTest.java | 2 +- .../computation/StatementExecutorTest.java | 4 +- 21 files changed, 371 insertions(+), 271 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index cbf496efee41..59b6257ab7d7 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -27,13 +27,16 @@ import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; -import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; @@ -45,72 +48,65 @@ @Immutable public final class LogicalSchema { - private static final String KEY_KEYWORD = "KEY"; + private static final NamespacedColumn IMPLICIT_TIME_COLUMN = NamespacedColumn.of( + Column.of(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT), + Namespace.META + ); - private static final List METADATA_SCHEMA = ImmutableList.builder() - .add(Column.of(SchemaUtil.ROWTIME_NAME, SqlTypes.BIGINT)) - .build(); + private static final NamespacedColumn IMPLICIT_KEY_COLUMN = NamespacedColumn.of( + Column.of(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING), + Namespace.KEY + ); - private static final List IMPLICIT_KEY_SCHEMA = ImmutableList.builder() - .add(Column.of(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)) - .build(); - - private final List metadata; - private final List key; - private final List value; + private final ImmutableList columns; public static Builder builder() { return new Builder(); } - private LogicalSchema( - final List metadata, - final List key, - final List value - ) { - this.metadata = requireNonNull(metadata, "metadata"); - this.key = requireNonNull(key, "key"); - this.value = requireNonNull(value, "value"); + private LogicalSchema(final ImmutableList columns) { + this.columns = Objects.requireNonNull(columns, "columns"); } public ConnectSchema keyConnectSchema() { - return toConnectSchema(key); + return toConnectSchema(key()); } public ConnectSchema valueConnectSchema() { - return toConnectSchema(value); + return toConnectSchema(value()); } /** * @return the schema of the metadata. */ public List metadata() { - return metadata; + return byNamespace(NamespacedColumn::column) + .get(Namespace.META); } /** * @return the schema of the key. */ public List key() { - return key; + return byNamespace(NamespacedColumn::column) + .get(Namespace.KEY); } /** * @return the schema of the value. */ public List value() { - return value; + return byNamespace(NamespacedColumn::column) + .get(Namespace.VALUE); } /** * @return all columns in the schema. */ public List columns() { - return ImmutableList.builder() - .addAll(metadata) - .addAll(key) - .addAll(value) - .build(); + final ImmutableList.Builder builder = ImmutableList.builder(); + columns.forEach(c -> builder.add(c.column())); + return builder.build(); } /** @@ -127,22 +123,8 @@ public List columns() { * @return the column if found, else {@code Optional.empty()}. */ public Optional findColumn(final ColumnName columnName) { - return findColumn(columnName.name()); - } - - @VisibleForTesting - private Optional findColumn(final String columnName) { - Optional found = doFindColumn(columnName, metadata); - if (found.isPresent()) { - return found; - } - - found = doFindColumn(columnName, key); - if (found.isPresent()) { - return found; - } - - return doFindColumn(columnName, value); + return findNamespacedColumn(withName(columnName)) + .map(NamespacedColumn::column); } /** @@ -157,14 +139,15 @@ private Optional findColumn(final String columnName) { * @return the value column if found, else {@code Optional.empty()}. */ public Optional findValueColumn(final ColumnName columnName) { - return doFindColumn(columnName.name(), value); + return findValueColumn(columnName.name()); } /** * @see #findValueColumn(ColumnName) */ public Optional findValueColumn(final String columnName) { - return doFindColumn(columnName, value); + return findNamespacedColumn(withNamespace(Namespace.VALUE).and(withLaxName(columnName))) + .map(NamespacedColumn::column); } /** @@ -180,7 +163,7 @@ public OptionalInt valueColumnIndex(final ColumnName fullColumnName) { @VisibleForTesting public OptionalInt valueColumnIndex(final String fullColumnName) { int idx = 0; - for (final Column column : value) { + for (final Column column : value()) { if (column.fullName().equals(fullColumnName)) { return OptionalInt.of(idx); } @@ -205,11 +188,12 @@ public LogicalSchema withAlias(final SourceName alias) { throw new IllegalStateException("Already aliased"); } - return new LogicalSchema( - addAlias(alias, metadata), - addAlias(alias, key), - addAlias(alias, value) - ); + final ImmutableList.Builder builder = ImmutableList.builder(); + columns.stream() + .map(c -> c.withSource(alias)) + .forEach(builder::add); + + return new LogicalSchema(builder.build()); } /** @@ -222,11 +206,12 @@ public LogicalSchema withoutAlias() { throw new IllegalStateException("Not aliased"); } - return new LogicalSchema( - removeAlias(metadata), - removeAlias(key), - removeAlias(value) - ); + final ImmutableList.Builder builder = ImmutableList.builder(); + columns.stream() + .map(NamespacedColumn::noSource) + .forEach(builder::add); + + return new LogicalSchema(builder.build()); } /** @@ -234,7 +219,7 @@ public LogicalSchema withoutAlias() { */ public boolean isAliased() { // Either all columns are aliased, or none: - return metadata.get(0).source().isPresent(); + return columns.get(0).column().source().isPresent(); } /** @@ -245,28 +230,7 @@ public boolean isAliased() { * @return the new schema. */ public LogicalSchema withMetaAndKeyColsInValue() { - final List newValueColumns = new ArrayList<>( - metadata.size() - + key.size() - + value.size()); - - newValueColumns.addAll(metadata); - newValueColumns.addAll(key); - - value.forEach(f -> { - if (!doFindColumn(f.name().name(), newValueColumns).isPresent()) { - newValueColumns.add(f); - } - }); - - final ImmutableList.Builder builder = ImmutableList.builder(); - newValueColumns.forEach(builder::add); - - return new LogicalSchema( - metadata, - key, - builder.build() - ); + return rebuild(true); } /** @@ -275,19 +239,7 @@ public LogicalSchema withMetaAndKeyColsInValue() { * @return the new schema with the columns removed. */ public LogicalSchema withoutMetaAndKeyColsInValue() { - final ImmutableList.Builder builder = ImmutableList.builder(); - - final Set excluded = metaAndKeyColumnNames(); - - value.stream() - .filter(f -> !excluded.contains(f.name())) - .forEach(builder::add); - - return new LogicalSchema( - metadata, - key, - builder.build() - ); + return rebuild(false); } /** @@ -295,7 +247,8 @@ public LogicalSchema withoutMetaAndKeyColsInValue() { * @return {@code true} if the column matches the name of any metadata column. */ public boolean isMetaColumn(final ColumnName columnName) { - return metaColumnNames().contains(columnName); + return findNamespacedColumn(withNamespace(Namespace.META).and(withName(columnName))) + .isPresent(); } /** @@ -303,7 +256,8 @@ public boolean isMetaColumn(final ColumnName columnName) { * @return {@code true} if the column matches the name of any key column. */ public boolean isKeyColumn(final ColumnName columnName) { - return keyColumnNames().contains(columnName); + return findNamespacedColumn(withNamespace(Namespace.KEY).and(withName(columnName))) + .isPresent(); } @Override @@ -315,14 +269,12 @@ public boolean equals(final Object o) { return false; } final LogicalSchema that = (LogicalSchema) o; - // Meta Columns deliberately excluded. - return Objects.equals(key, that.key) - && Objects.equals(value, that.value); + return Objects.equals(columns, that.columns); } @Override public int hashCode() { - return Objects.hash(key, value); + return Objects.hash(columns); } @Override @@ -333,64 +285,75 @@ public String toString() { public String toString(final FormatOptions formatOptions) { // Meta columns deliberately excluded. - final String keys = key.stream() - .map(f -> f.toString(formatOptions) + " " + KEY_KEYWORD) - .collect(Collectors.joining(", ")); + return columns.stream() + .filter(withNamespace(Namespace.META).negate()) + .map(c -> c.toString(formatOptions)) + .collect(Collectors.joining(", ", "[", "]")); + } - final String values = value.stream() - .map(f -> f.toString(formatOptions)) - .collect(Collectors.joining(", ")); + private Optional findNamespacedColumn( + final Predicate predicate + ) { + return columns.stream() + .filter(predicate) + .findFirst(); + } - final String join = keys.isEmpty() || values.isEmpty() - ? "" - : ", "; + private Map> byNamespace(final Function mapper) { + final Map> byNamespace = columns.stream() + .collect(Collectors.groupingBy( + NamespacedColumn::namespace, + Collectors.mapping(mapper, Collectors.toList()) + )); - return "[" + keys + join + values + "]"; - } + Arrays.stream(Namespace.values()) + .forEach(ns -> byNamespace.putIfAbsent(ns, ImmutableList.of())); - private Set metaColumnNames() { - return columnNames(metadata); + return byNamespace; } - private Set keyColumnNames() { - return columnNames(key); - } + private LogicalSchema rebuild(final boolean withMetaAndKeyColsInValue) { + final Map> byNamespace = byNamespace(Function.identity()); - private Set metaAndKeyColumnNames() { - final Set names = metaColumnNames(); - names.addAll(keyColumnNames()); - return names; - } + final List metadata = byNamespace.get(Namespace.META); + final List key = byNamespace.get(Namespace.KEY); + final List value = byNamespace.get(Namespace.VALUE); - private static Set columnNames(final List struct) { - return struct.stream() - .map(Column::name) - .collect(Collectors.toSet()); - } + final ImmutableList.Builder builder = ImmutableList.builder(); - private static List addAlias(final SourceName alias, final List columns) { - final ImmutableList.Builder builder = ImmutableList.builder(); + builder.addAll(metadata); + builder.addAll(key); + + if (withMetaAndKeyColsInValue) { + metadata.stream() + .map(c -> NamespacedColumn.of(c.column(), Namespace.VALUE)) + .forEach(builder::add); - for (final Column col : columns) { - builder.add(col.withSource(alias)); + key.stream() + .map(c -> NamespacedColumn.of(c.column(), Namespace.VALUE)) + .forEach(builder::add); } - return builder.build(); + + value.stream() + .filter(c -> !findNamespacedColumn( + (withNamespace(Namespace.META).or(withNamespace(Namespace.KEY)) + .and(withName(c.column().name())) + )).isPresent()) + .forEach(builder::add); + + return new LogicalSchema(builder.build()); } - private static List removeAlias(final List columns) { - final ImmutableList.Builder builder = ImmutableList.builder(); + private static Predicate withName(final ColumnName name) { + return c -> c.column().name().equals(name); + } - for (final Column col : columns) { - builder.add(Column.of(col.name(), col.type())); - } - return builder.build(); + private static Predicate withNamespace(final Namespace ns) { + return c -> c.namespace() == ns; } - private static Optional doFindColumn(final String column, final List columns) { - return columns - .stream() - .filter(f -> SchemaUtil.isFieldName(column, f.fullName())) - .findFirst(); + private static Predicate withLaxName(final String name) { + return c -> SchemaUtil.isFieldName(name, c.column().fullName()); } private static ConnectSchema toConnectSchema( @@ -409,12 +372,20 @@ private static ConnectSchema toConnectSchema( public static class Builder { - private final ImmutableList.Builder keyBuilder = ImmutableList.builder(); - private final ImmutableList.Builder valueBuilder = ImmutableList.builder(); + private final ImmutableList.Builder explicitColumns = ImmutableList.builder(); private final Set seenKeys = new HashSet<>(); private final Set seenValues = new HashSet<>(); + private boolean addImplicitRowKey = true; + private boolean addImplicitRowTime = true; + + public Builder noImplicitColumns() { + addImplicitRowKey = false; + addImplicitRowTime = false; + return this; + } + public Builder keyColumn(final ColumnName columnName, final SqlType type) { keyColumn(Column.of(columnName, type)); return this; @@ -424,7 +395,8 @@ public Builder keyColumn(final Column column) { if (!seenKeys.add(column.fullName())) { throw new KsqlException("Duplicate keys found in schema: " + column); } - keyBuilder.add(column); + explicitColumns.add(NamespacedColumn.of(column, Namespace.KEY)); + addImplicitRowKey = false; return this; } @@ -447,7 +419,7 @@ public Builder valueColumn(final Column column) { if (!seenValues.add(column.fullName())) { throw new KsqlException("Duplicate values found in schema: " + column); } - valueBuilder.add(column); + explicitColumns.add(NamespacedColumn.of(column, Namespace.VALUE)); return this; } @@ -457,17 +429,94 @@ public Builder valueColumns(final Iterable column) { } public LogicalSchema build() { - final List suppliedKey = keyBuilder.build(); + final ImmutableList.Builder allColumns = ImmutableList.builder(); + + if (addImplicitRowTime) { + allColumns.add(IMPLICIT_TIME_COLUMN); + } + + if (addImplicitRowKey) { + allColumns.add(IMPLICIT_KEY_COLUMN); + } + + allColumns.addAll(explicitColumns.build()); + + return new LogicalSchema(allColumns.build()); + } + } + + private enum Namespace { + META, + KEY, + VALUE + } + + @Immutable + private static final class NamespacedColumn { + + private final Column column; + private final Namespace namespace; + + static NamespacedColumn of( + final Column column, + final Namespace namespace + ) { + return new NamespacedColumn(column, namespace); + } + + private NamespacedColumn( + final Column column, + final Namespace namespace + ) { + this.column = requireNonNull(column, "column"); + this.namespace = requireNonNull(namespace, "namespace"); + } + + Column column() { + return column; + } + + Namespace namespace() { + return namespace; + } + + NamespacedColumn withSource(final SourceName sourceName) { + return NamespacedColumn.of(column.withSource(sourceName), namespace); + } + + NamespacedColumn noSource() { + return NamespacedColumn.of(Column.of(column.name(), column.type()), namespace); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final NamespacedColumn that = (NamespacedColumn) o; + return Objects.equals(column, that.column) + && namespace == that.namespace; + } + + @Override + public int hashCode() { + return Objects.hash(column, namespace); + } + + @Override + public String toString() { + return toString(FormatOptions.none()); + } - final List key = suppliedKey.isEmpty() - ? IMPLICIT_KEY_SCHEMA - : suppliedKey; + public String toString(final FormatOptions formatOptions) { + final String postFix = namespace == Namespace.VALUE + ? "" + : " " + namespace; - return new LogicalSchema( - METADATA_SCHEMA, - key, - valueBuilder.build() - ); + return column.toString(formatOptions) + postFix; } } } \ No newline at end of file diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java index 9e25afc5e715..4eab6c459778 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/SchemaUtil.java @@ -81,15 +81,6 @@ public final class SchemaUtil { private static final char FIELD_NAME_DELIMITER = '.'; - private static final ImmutableMap SCHEMA_TYPE_TO_CAST_STRING = - new ImmutableMap.Builder() - .put(Schema.Type.INT32, "(Integer)") - .put(Schema.Type.INT64, "(Long)") - .put(Schema.Type.FLOAT64, "(Double)") - .put(Schema.Type.STRING, "(String)") - .put(Schema.Type.BOOLEAN, "(Boolean)") - .build(); - private static final Map> CUSTOM_SCHEMA_EQ = ImmutableMap.>builder() .put(Type.MAP, SchemaUtil::mapCompatible) diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index 55c3ac607162..1dff769bbe62 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -19,6 +19,7 @@ import static io.confluent.ksql.util.SchemaUtil.ROWTIME_NAME; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -44,8 +45,10 @@ public class LogicalSchemaTest { private static final ColumnName K0 = ColumnName.of("k0"); + private static final ColumnName K1 = ColumnName.of("k1"); private static final ColumnName KEY = ColumnName.of("key"); private static final ColumnName V0 = ColumnName.of("v0"); + private static final ColumnName V1 = ColumnName.of("v1"); private static final ColumnName F0 = ColumnName.of("f0"); private static final ColumnName F1 = ColumnName.of("f1"); private static final ColumnName VALUE = ColumnName.of("value"); @@ -54,8 +57,8 @@ public class LogicalSchemaTest { private static final SourceName FRED = SourceName.of("fred"); private static final LogicalSchema SOME_SCHEMA = LogicalSchema.builder() - .keyColumn(K0, SqlTypes.BIGINT) .valueColumn(F0, SqlTypes.STRING) + .keyColumn(K0, SqlTypes.BIGINT) .valueColumn(F1, SqlTypes.BIGINT) .build(); @@ -84,6 +87,12 @@ public void shouldImplementEqualsProperly() { .valueColumn(V0, SqlTypes.STRING) .build() ) + .addEqualityGroup( + LogicalSchema.builder() + .valueColumn(V0, SqlTypes.STRING) + .keyColumn(K0, SqlTypes.BIGINT) + .build() + ) .addEqualityGroup( LogicalSchema.builder() .valueColumn(F0, SqlTypes.STRING) @@ -136,7 +145,7 @@ public void shouldThrowIfAlreadyAliased() { } @Test - public void shouldOnlyAddAliasToTopLevelFields() { + public void shouldOnlyAddAliasToTopLevelColumns() { // Given: final LogicalSchema schema = LogicalSchema.builder() .keyColumn(K0, SqlTypes.BIGINT) @@ -150,13 +159,15 @@ public void shouldOnlyAddAliasToTopLevelFields() { final LogicalSchema result = schema.withAlias(BOB); // Then: - assertThat(result, is(LogicalSchema.builder() - .keyColumn(Column.of(BOB, K0, SqlTypes.BIGINT)) - .valueColumn(Column.of(BOB, F0, SqlTypes.STRING)) - .valueColumn(Column.of(BOB, F1, SqlTypes.struct() + assertThat(result.key(), is(contains( + Column.of(BOB, K0, SqlTypes.BIGINT) + ))); + assertThat(result.value(), is(contains( + Column.of(BOB, F0, SqlTypes.STRING), + Column.of(BOB, F1, SqlTypes.struct() .field("nested", SqlTypes.BIGINT) - .build())) - .build())); + .build()) + ))); } @Test @@ -176,7 +187,7 @@ public void shouldThrowIfNotAliased() { } @Test - public void shouldOnlyRemoveAliasFromTopLevelFields() { + public void shouldOnlyRemoveAliasFromTopLevelColumns() { // Given: final LogicalSchema schema = LogicalSchema.builder() .keyColumn(K0, SqlTypes.INTEGER) @@ -200,7 +211,7 @@ public void shouldOnlyRemoveAliasFromTopLevelFields() { } @Test - public void shouldGetFieldByName() { + public void shouldGetColumnByName() { // When: final Optional result = SOME_SCHEMA.findValueColumn(F0); @@ -209,7 +220,7 @@ public void shouldGetFieldByName() { } @Test - public void shouldGetFieldByAliasedName() { + public void shouldGetColumnByAliasedName() { // When: final Optional result = SOME_SCHEMA.findValueColumn("SomeAlias.f0"); @@ -218,7 +229,7 @@ public void shouldGetFieldByAliasedName() { } @Test - public void shouldNotGetFieldByNameIfWrongCase() { + public void shouldNotGetColumnByNameIfWrongCase() { // When: final Optional result = SOME_SCHEMA.findValueColumn(ColumnName.of("F0")); @@ -227,7 +238,7 @@ public void shouldNotGetFieldByNameIfWrongCase() { } @Test - public void shouldNotGetFieldByNameIfFieldIsAliasedAndNameIsNot() { + public void shouldNotGetColumnByNameIfColumnIsAliasedAndNameIsNot() { // When: final Optional result = ALIASED_SCHEMA.findValueColumn(F0); @@ -236,7 +247,7 @@ public void shouldNotGetFieldByNameIfFieldIsAliasedAndNameIsNot() { } @Test - public void shouldGetFieldByNameIfBothFieldAndNameAreAliased() { + public void shouldGetColumnByNameIfBothColumnAndNameAreAliased() { // When: final Optional result = ALIASED_SCHEMA.findValueColumn("bob.f0"); @@ -245,23 +256,23 @@ public void shouldGetFieldByNameIfBothFieldAndNameAreAliased() { } @Test - public void shouldNotGetMetaFieldFromValue() { + public void shouldNotGetMetaColumnFromValue() { assertThat(SOME_SCHEMA.findValueColumn("ROWTIME"), is(Optional.empty())); } @Test - public void shouldNotGetKeyFieldFromValue() { + public void shouldNotGetKeyColumnFromValue() { assertThat(SOME_SCHEMA.findValueColumn(K0), is(Optional.empty())); } @Test - public void shouldGetMetaFieldFromValueIfAdded() { + public void shouldGetMetaColumnFromValueIfAdded() { assertThat(SOME_SCHEMA.withMetaAndKeyColsInValue().findValueColumn("ROWTIME"), is(not(Optional.empty()))); } @Test - public void shouldGetKeyFieldFromValueIfAdded() { + public void shouldGetKeyColumnFromValueIfAdded() { assertThat(SOME_SCHEMA.withMetaAndKeyColsInValue().findValueColumn(K0), is(not(Optional.empty()))); } @@ -274,94 +285,94 @@ public void shouldGetMetaFields() { } @Test - public void shouldGetKeyFields() { + public void shouldGetKeyColumns() { assertThat(SOME_SCHEMA.findColumn(K0), is(Optional.of( Column.of(K0, SqlTypes.BIGINT) ))); } @Test - public void shouldGetValueFields() { + public void shouldGetValueColumns() { assertThat(SOME_SCHEMA.findColumn(F0), is(Optional.of( Column.of(F0, SqlTypes.STRING) ))); } @Test - public void shouldGetFieldIndex() { + public void shouldGetColumnIndex() { assertThat(SOME_SCHEMA.valueColumnIndex(F0), is(OptionalInt.of(0))); assertThat(SOME_SCHEMA.valueColumnIndex(F1), is(OptionalInt.of(1))); } @Test - public void shouldReturnMinusOneForIndexIfFieldNotFound() { + public void shouldReturnMinusOneForIndexIfColumnNotFound() { assertThat(SOME_SCHEMA.valueColumnIndex("wontfindme"), is(OptionalInt.empty())); } @Test - public void shouldNotFindFieldIfDifferentCase() { + public void shouldNotFindColumnIfDifferentCase() { assertThat(SOME_SCHEMA.valueColumnIndex(ColumnName.of("F0")), is(OptionalInt.empty())); } @Test - public void shouldGetAliasedFieldIndex() { + public void shouldGetAliasedColumnIndex() { assertThat(ALIASED_SCHEMA.valueColumnIndex("bob.f1"), is(OptionalInt.of(1))); } @Test - public void shouldNotFindUnaliasedFieldIndexInAliasedSchema() { + public void shouldNotFindUnaliasedColumnIndexInAliasedSchema() { assertThat(ALIASED_SCHEMA.valueColumnIndex(F1), is(OptionalInt.empty())); } @Test - public void shouldNotFindAliasedFieldIndexInUnaliasedSchema() { + public void shouldNotFindAliasedColumnIndexInUnaliasedSchema() { assertThat(SOME_SCHEMA.valueColumnIndex("bob.f1"), is(OptionalInt.empty())); } @Test - public void shouldExposeMetaFields() { + public void shouldExposeMetaColumns() { assertThat(SOME_SCHEMA.metadata(), is(ImmutableList.of( Column.of(ROWTIME_NAME, SqlTypes.BIGINT) ))); } @Test - public void shouldExposeAliasedMetaFields() { + public void shouldExposeAliasedMetaColumns() { // Given: final LogicalSchema schema = SOME_SCHEMA.withAlias(BOB); // When: - final List fields = schema.metadata(); + final List columns = schema.metadata(); // Then: - assertThat(fields, is(ImmutableList.of( + assertThat(columns, is(ImmutableList.of( Column.of(BOB, ROWTIME_NAME, SqlTypes.BIGINT) ))); } @Test - public void shouldExposeKeyFields() { + public void shouldExposeKeyColumns() { assertThat(SOME_SCHEMA.key(), is(ImmutableList.of( Column.of(K0, SqlTypes.BIGINT) ))); } @Test - public void shouldExposeAliasedKeyFields() { + public void shouldExposeAliasedKeyColumns() { // Given: final LogicalSchema schema = SOME_SCHEMA.withAlias(BOB); // When: - final List fields = schema.key(); + final List columns = schema.key(); // Then: - assertThat(fields, is(ImmutableList.of( + assertThat(columns, is(ImmutableList.of( Column.of(BOB, K0, SqlTypes.BIGINT) ))); } @Test - public void shouldExposeValueFields() { + public void shouldExposeValueColumns() { assertThat(SOME_SCHEMA.value(), contains( Column.of(F0, SqlTypes.STRING), Column.of(F1, SqlTypes.BIGINT) @@ -369,43 +380,60 @@ public void shouldExposeValueFields() { } @Test - public void shouldExposeAliasedValueFields() { + public void shouldExposeAliasedValueColumns() { // Given: final LogicalSchema schema = SOME_SCHEMA.withAlias(BOB); // When: - final List fields = schema.value(); + final List columns = schema.value(); // Then: - assertThat(fields, contains( + assertThat(columns, contains( Column.of(BOB, F0, SqlTypes.STRING), Column.of(BOB, F1, SqlTypes.BIGINT) )); } @Test - public void shouldExposeAllFields() { + public void shouldExposeAllColumns() { assertThat(SOME_SCHEMA.columns(), is(ImmutableList.of( Column.of(ROWTIME_NAME, SqlTypes.BIGINT), + Column.of(F0, SqlTypes.STRING), Column.of(K0, SqlTypes.BIGINT), + Column.of(F1, SqlTypes.BIGINT) + ))); + } + + @Test + public void shouldExposeAllColumnsWithoutImplicits() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .noImplicitColumns() + .valueColumn(F0, SqlTypes.STRING) + .keyColumn(K0, SqlTypes.BIGINT) + .valueColumn(F1, SqlTypes.BIGINT) + .build(); + + assertThat(schema.columns(), is(ImmutableList.of( Column.of(F0, SqlTypes.STRING), + Column.of(K0, SqlTypes.BIGINT), Column.of(F1, SqlTypes.BIGINT) ))); } @Test - public void shouldExposeAliasedAllFields() { + public void shouldExposeAliasedAllColumns() { // Given: final LogicalSchema schema = SOME_SCHEMA.withAlias(BOB); // When: - final List fields = schema.columns(); + final List columns = schema.columns(); // Then: - assertThat(fields, is(ImmutableList.of( + assertThat(columns, is(ImmutableList.of( Column.of(BOB, ROWTIME_NAME, SqlTypes.BIGINT), - Column.of(BOB, K0, SqlTypes.BIGINT), Column.of(BOB, F0, SqlTypes.STRING), + Column.of(BOB, K0, SqlTypes.BIGINT), Column.of(BOB, F1, SqlTypes.BIGINT) ))); } @@ -414,6 +442,8 @@ public void shouldExposeAliasedAllFields() { public void shouldConvertSchemaToString() { // Given: final LogicalSchema schema = LogicalSchema.builder() + .keyColumn(K0, SqlTypes.BIGINT) + .keyColumn(K1, SqlTypes.DOUBLE) .valueColumn(F0, SqlTypes.BOOLEAN) .valueColumn(F1, SqlTypes.INTEGER) .valueColumn(ColumnName.of("f2"), SqlTypes.BIGINT) @@ -424,8 +454,6 @@ public void shouldConvertSchemaToString() { .build()) .valueColumn(ColumnName.of("f7"), SqlTypes.array(SqlTypes.STRING)) .valueColumn(ColumnName.of("f8"), SqlTypes.map(SqlTypes.STRING)) - .keyColumn(K0, SqlTypes.BIGINT) - .keyColumn(ColumnName.of("k1"), SqlTypes.DOUBLE) .build(); // When: @@ -447,6 +475,31 @@ public void shouldConvertSchemaToString() { + "]")); } + @Test + public void shouldSupportKeyInterleavedWithValueColumns() { + // Given: + final LogicalSchema schema = LogicalSchema.builder() + .valueColumn(F0, SqlTypes.BOOLEAN) + .keyColumn(K0, SqlTypes.BIGINT) + .valueColumn(V0, SqlTypes.INTEGER) + .keyColumn(K1, SqlTypes.DOUBLE) + .valueColumn(V1, SqlTypes.BOOLEAN) + .build(); + + // When: + final String s = schema.toString(); + + // Then: + assertThat(s, is( + "[" + + "`f0` BOOLEAN, " + + "`k0` BIGINT KEY, " + + "`v0` INTEGER, " + + "`k1` DOUBLE KEY, " + + "`v1` BOOLEAN" + + "]")); + } + @Test public void shouldConvertSchemaToStringWithReservedWords() { // Given: @@ -550,7 +603,7 @@ public void shouldAddMetaAndKeyColumnsOnlyOnce() { } @Test - public void shouldRemoveOthersWhenAddingMetasAndKeyFields() { + public void shouldRemoveOthersWhenAddingMetasAndKeyColumns() { // Given: final LogicalSchema ksqlSchema = LogicalSchema.builder() .valueColumn(F0, SqlTypes.BIGINT) @@ -573,7 +626,7 @@ public void shouldRemoveOthersWhenAddingMetasAndKeyFields() { } @Test - public void shouldRemoveMetaFields() { + public void shouldRemoveMetaColumns() { // Given: final LogicalSchema schema = LogicalSchema.builder() .valueColumn(F0, SqlTypes.BIGINT) @@ -593,7 +646,7 @@ public void shouldRemoveMetaFields() { } @Test - public void shouldRemoveMetaFieldsWhereEverTheyAre() { + public void shouldRemoveMetaColumnsWhereEverTheyAre() { // Given: final LogicalSchema schema = LogicalSchema.builder() .valueColumn(F0, SqlTypes.BIGINT) @@ -614,7 +667,7 @@ public void shouldRemoveMetaFieldsWhereEverTheyAre() { } @Test - public void shouldRemoveMetaFieldsEvenIfAliased() { + public void shouldRemoveMetaColumnsEvenIfAliased() { // Given: final LogicalSchema schema = LogicalSchema.builder() .valueColumn(F0, SqlTypes.BIGINT) @@ -627,43 +680,44 @@ public void shouldRemoveMetaFieldsEvenIfAliased() { final LogicalSchema result = schema.withoutMetaAndKeyColsInValue(); // Then: - assertThat(result, is(LogicalSchema.builder() - .keyColumn(Column.of(BOB, ROWKEY_NAME, SqlTypes.STRING)) - .valueColumn(Column.of(BOB, F0, SqlTypes.BIGINT)) - .valueColumn(Column.of(BOB, F1, SqlTypes.BIGINT)) - .build() - )); + assertThat(result.key(), is(contains( + Column.of(BOB, ROWKEY_NAME, SqlTypes.STRING) + ))); + assertThat(result.value(), is(contains( + Column.of(BOB, F0, SqlTypes.BIGINT), + Column.of(BOB, F1, SqlTypes.BIGINT) + ))); } @Test - public void shouldMatchMetaFieldName() { + public void shouldMatchMetaColumnName() { assertThat(SOME_SCHEMA.isMetaColumn(ROWTIME_NAME), is(true)); assertThat(SOME_SCHEMA.isKeyColumn(ROWTIME_NAME), is(false)); } @Test - public void shouldMatchKeyFieldName() { + public void shouldMatchKeyColumnName() { assertThat(SOME_SCHEMA.isMetaColumn(K0), is(false)); assertThat(SOME_SCHEMA.isKeyColumn(K0), is(true)); } @Test - public void shouldNotMatchValueFieldsAsBeingMetaOrKeyFields() { - SOME_SCHEMA.value().forEach(field -> + public void shouldNotMatchValueColumnsAsBeingMetaOrKeyColumns() { + SOME_SCHEMA.value().forEach(column -> { - assertThat(SOME_SCHEMA.isMetaColumn(field.name()), is(false)); - assertThat(SOME_SCHEMA.isKeyColumn(field.name()), is(false)); + assertThat(SOME_SCHEMA.isMetaColumn(column.name()), is(false)); + assertThat(SOME_SCHEMA.isKeyColumn(column.name()), is(false)); }); } @Test - public void shouldNotMatchRandomFieldNameAsBeingMetaOrKeyFields() { + public void shouldNotMatchRandomColumnNameAsBeingMetaOrKeyColumns() { assertThat(SOME_SCHEMA.isMetaColumn(ColumnName.of("well_this_ain't_in_the_schema")), is(false)); assertThat(SOME_SCHEMA.isKeyColumn(ColumnName.of("well_this_ain't_in_the_schema")), is(false)); } @Test - public void shouldThrowOnDuplicateKeyFieldName() { + public void shouldThrowOnDuplicateKeyColumnName() { // Given: final Builder builder = LogicalSchema.builder() .keyColumn(KEY, SqlTypes.BIGINT); @@ -677,7 +731,7 @@ public void shouldThrowOnDuplicateKeyFieldName() { } @Test - public void shouldThrowOnDuplicateValueFieldName() { + public void shouldThrowOnDuplicateValueColumnName() { // Given: final Builder builder = LogicalSchema.builder() .valueColumn(VALUE, SqlTypes.BIGINT); @@ -691,7 +745,7 @@ public void shouldThrowOnDuplicateValueFieldName() { } @Test - public void shouldAllowKeyFieldsWithSameNameButDifferentSource() { + public void shouldAllowKeyColumnsWithSameNameButDifferentSource() { // Given: final Builder builder = LogicalSchema.builder(); @@ -710,7 +764,7 @@ public void shouldAllowKeyFieldsWithSameNameButDifferentSource() { } @Test - public void shouldAllowValueFieldsWithSameNameButDifferentSource() { + public void shouldAllowValueColumnsWithSameNameButDifferentSource() { // Given: final Builder builder = LogicalSchema.builder(); @@ -772,6 +826,20 @@ public void shouldGetValueConnectSchema() { )); } + @Test + public void shouldBuildSchemaWithNoImplicitColumns() { + // When: + final LogicalSchema schema = LogicalSchema.builder() + .noImplicitColumns() + .valueColumn(F0, SqlTypes.BIGINT) + .build(); + + // Then: + assertThat(schema.metadata(), is(empty())); + assertThat(schema.key(), is(empty())); + assertThat(schema.value(), contains(Column.of(F0, SqlTypes.BIGINT))); + } + private static org.apache.kafka.connect.data.Field connectField( final String fieldName, final int index, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java index aa8eb6e4ff87..abc6eb2b0a62 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/SourceSchemas.java @@ -72,14 +72,15 @@ Set sourcesWithField(final ColumnName columnName) { } final SourceName sourceName = SourceName.of(maybeSourceName.get()); - final String baseColumnName = SchemaUtil.getFieldNameWithNoAlias(columnName.name()); + final ColumnName baseColumnName = ColumnName + .of(SchemaUtil.getFieldNameWithNoAlias(columnName.name())); final LogicalSchema sourceSchema = sourceSchemas.get(sourceName); if (sourceSchema == null) { return ImmutableSet.of(); } - return sourceSchema.findColumn(ColumnName.of(baseColumnName)).isPresent() + return sourceSchema.findColumn(baseColumnName).isPresent() ? ImmutableSet.of(sourceName) : ImmutableSet.of(); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index 9a0706144056..d4ffa9c79669 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -503,13 +503,13 @@ private static LogicalSchema buildSchema( final LogicalSchema.Builder joinSchema = LogicalSchema.builder(); + // Hard-wire for now, until we support custom type/name of key fields: + joinSchema.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING); + joinSchema.valueColumns(leftSchema.value()); joinSchema.valueColumns(rightSchema.value()); - // Hard-wire for now, until we support custom type/name of key fields: - joinSchema.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING); - return joinSchema.build(); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/id/HybridQueryIdGenerator.java b/ksql-engine/src/main/java/io/confluent/ksql/query/id/HybridQueryIdGenerator.java index 04acf5c13b60..0cb31173f9a8 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/id/HybridQueryIdGenerator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/id/HybridQueryIdGenerator.java @@ -16,7 +16,6 @@ package io.confluent.ksql.query.id; import com.google.common.annotations.VisibleForTesting; - import java.util.concurrent.atomic.AtomicReference; /** diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java b/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java index 46aef6a68bb2..512dec937217 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/id/SpecificQueryIdGenerator.java @@ -16,7 +16,6 @@ package io.confluent.ksql.query.id; import io.confluent.ksql.util.KsqlServerException; - import javax.annotation.concurrent.NotThreadSafe; /** diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java index 876aadb14ae8..10892a22f826 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTestUtil.java @@ -19,6 +19,7 @@ import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; +import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.services.DefaultConnectClient; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.KafkaTopicClientImpl; @@ -27,8 +28,6 @@ import io.confluent.ksql.statement.Injectors; import io.confluent.ksql.util.KsqlConfig; import java.util.concurrent.atomic.AtomicInteger; - -import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java index aa588ac8df15..30180cd35135 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/AnalyzerFunctionalTest.java @@ -70,7 +70,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java index 6cfc2ee02128..431072ede56f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/KsqlEngineTestUtil.java @@ -23,14 +23,14 @@ import io.confluent.ksql.metastore.MutableMetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; +import io.confluent.ksql.query.id.QueryIdGenerator; +import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.schema.ksql.inference.DefaultSchemaInjector; import io.confluent.ksql.schema.ksql.inference.SchemaRegistryTopicSchemaSupplier; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; -import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlStatementException; -import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.util.QueryMetadata; import java.util.Collections; import java.util.List; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java index c921340578ee..bbedb5edf3d3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/RoundTest.java @@ -16,11 +16,9 @@ package io.confluent.ksql.function.udf.math; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import java.math.BigDecimal; -import java.math.RoundingMode; import org.junit.Before; import org.junit.Test; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java index e5a8c0fe3397..2b10d1f7ae50 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java @@ -31,6 +31,7 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -43,7 +44,6 @@ import io.confluent.ksql.util.OrderDataProvider; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.util.TopicConsumer; import io.confluent.ksql.util.TopicProducer; import java.util.Arrays; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java index 881f1b9c2a32..81292a42581b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/SecureIntegrationTest.java @@ -43,6 +43,7 @@ import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.KafkaTopicClientImpl; import io.confluent.ksql.services.ServiceContext; @@ -55,7 +56,6 @@ import io.confluent.ksql.util.OrderDataProvider; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.util.TopicConsumer; import io.confluent.ksql.util.TopicProducer; import java.util.Collections; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index ff4d81e9229d..701a31f1fd91 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -66,7 +66,6 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; import io.confluent.ksql.testutils.AnalysisTestUtil; -import io.confluent.ksql.query.id.SequentialQueryIdGenerator; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index 22abd7d468a4..57fa2e7308d4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -172,7 +172,6 @@ public class JoinNodeTest { @Mock private KeySerde reboundKeySerde; - @SuppressWarnings("unchecked") @Before public void setUp() { builder = new StreamsBuilder(); @@ -979,10 +978,11 @@ private void setupStream( @SuppressWarnings("Duplicates") private static LogicalSchema joinSchema() { - final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder(); - schemaBuilder.valueColumns(LEFT_NODE_SCHEMA.value()); - schemaBuilder.valueColumns(RIGHT_NODE_SCHEMA.value()); - return schemaBuilder.build(); + return LogicalSchema.builder() + .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) + .valueColumns(LEFT_NODE_SCHEMA.value()) + .valueColumns(RIGHT_NODE_SCHEMA.value()) + .build(); } private void buildJoin() { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java index 02e75c52127a..5764d1bd870b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java @@ -33,6 +33,7 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.query.id.QueryIdGenerator; import io.confluent.ksql.schema.ksql.Column; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -41,7 +42,6 @@ import io.confluent.ksql.testutils.AnalysisTestUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; -import io.confluent.ksql.query.id.QueryIdGenerator; import java.util.Collections; import java.util.List; import java.util.Set; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/query/id/HybridQueryIdGeneratorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/query/id/HybridQueryIdGeneratorTest.java index 9b3f9cb4213a..c8849572dc43 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/query/id/HybridQueryIdGeneratorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/query/id/HybridQueryIdGeneratorTest.java @@ -17,11 +17,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import org.junit.Before; import org.junit.Test; diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/testutil/TestExpressions.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/testutil/TestExpressions.java index a7d2379d6693..d0f2e32785dd 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/testutil/TestExpressions.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/testutil/TestExpressions.java @@ -1,11 +1,11 @@ package io.confluent.ksql.execution.testutil; +import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.IntegerLiteral; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.schema.ksql.ColumnRef; -import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlStruct; import io.confluent.ksql.schema.ksql.types.SqlTypes; @@ -23,16 +23,16 @@ private TestExpressions() { .build(); public final static LogicalSchema SCHEMA = LogicalSchema.builder() - .valueColumn(ColumnName.of("TEST1.COL0"), SqlTypes.BIGINT) - .valueColumn(ColumnName.of("TEST1.COL1"), SqlTypes.STRING) - .valueColumn(ColumnName.of("TEST1.COL2"), SqlTypes.STRING) - .valueColumn(ColumnName.of("TEST1.COL3"), SqlTypes.DOUBLE) - .valueColumn(ColumnName.of("TEST1.COL4"), SqlTypes.array(SqlTypes.DOUBLE)) - .valueColumn(ColumnName.of("TEST1.COL5"), SqlTypes.map(SqlTypes.DOUBLE)) - .valueColumn(ColumnName.of("TEST1.COL6"), ADDRESS_SCHEMA) - .valueColumn(ColumnName.of("TEST1.COL7"), SqlTypes.INTEGER) - .valueColumn(ColumnName.of("TEST1.COL8"), SqlTypes.decimal(2, 1)) - .valueColumn(ColumnName.of("TEST1.COL9"), SqlTypes.decimal(2, 1)) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL0"), SqlTypes.BIGINT) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL1"), SqlTypes.STRING) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL2"), SqlTypes.STRING) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL3"), SqlTypes.DOUBLE) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL4"), SqlTypes.array(SqlTypes.DOUBLE)) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL5"), SqlTypes.map(SqlTypes.DOUBLE)) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL6"), ADDRESS_SCHEMA) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL7"), SqlTypes.INTEGER) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL8"), SqlTypes.decimal(2, 1)) + .valueColumn(SourceName.of("TEST1"), ColumnName.of("COL9"), SqlTypes.decimal(2, 1)) .build(); private static final String TEST1 = "TEST1"; diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java index d7b2f645e61c..7cc80ef1555f 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/ExpressionTypeManagerTest.java @@ -397,7 +397,7 @@ public void shouldGetCorrectSchemaForSearchedCaseWhenStruct() { final SqlType result = expressionTypeManager.getExpressionSqlType(expression); // Then: - final SqlType sqlType = SCHEMA.findColumn(ColumnName.of(ADDRESS.toString())).get().type(); + final SqlType sqlType = SCHEMA.findColumn(ADDRESS.getReference().name()).get().type(); assertThat(result, is(sqlType)); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 91442539b0e9..9cf9727c256c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -36,6 +36,7 @@ import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; +import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; import io.confluent.ksql.rest.entity.CommandId.Type; @@ -49,7 +50,6 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; -import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java index 36abd2720225..878a03b881dd 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java @@ -55,7 +55,7 @@ import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.query.QueryId; -import io.confluent.ksql.query.id.QueryIdGenerator; +import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandId.Action; import io.confluent.ksql.rest.entity.CommandId.Type; @@ -68,7 +68,6 @@ import io.confluent.ksql.services.TestServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; -import io.confluent.ksql.query.id.HybridQueryIdGenerator; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -78,7 +77,6 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.TimeUnit; - import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.streams.StreamsConfig; import org.easymock.EasyMockSupport;