Skip to content

Commit

Permalink
Refactor: Remove fuzzy logic used in LogicalSchema.findXXX methods (#…
Browse files Browse the repository at this point in the history
…3748)

* refactor: change LogicalSchema.findXXX to be exact match

Remove the fuzzy logic from `findColumn` and its friends, which would previously ignore the `SourceName` if either the schema column or the requested `ColumnRef` did not have a source.
  • Loading branch information
big-andy-coates authored Nov 5, 2019
1 parent accb1f3 commit 3d9a1e8
Show file tree
Hide file tree
Showing 18 changed files with 185 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public static ColumnRef withoutSource(final ColumnName name) {
return new ColumnRef(Optional.empty(), name);
}

public ColumnRef withoutSource() {
return withoutSource(name);
}

public ColumnRef withSource(final SourceName source) {
return of(source, name);
}

private ColumnRef(final Optional<SourceName> qualifier, final ColumnName name) {
this.qualifier = requireNonNull(qualifier, "qualifier");
this.name = requireNonNull(name, "name");
Expand Down Expand Up @@ -115,8 +123,4 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(qualifier, name);
}

public ColumnRef withSource(final SourceName source) {
return of(source, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,24 @@ public List<Column> columns() {
}

/**
* Search for a column with the supplied {@code target}.
* Search for a column with the supplied {@code columnRef}.
*
* @param target the column name, where any alias is ignored.
* @param columnRef the column source and name to match.
* @return the column if found, else {@code Optional.empty()}.
*/
public Optional<Column> findColumn(final ColumnRef target) {
return findNamespacedColumn(thatMatches(target))
public Optional<Column> findColumn(final ColumnRef columnRef) {
return findNamespacedColumn(withRef(columnRef))
.map(NamespacedColumn::column);
}

/**
* Search for a value column with the supplied {@code target}.
* Search for a value column with the supplied {@code columnRef}.
*
* @param target the column name, where any alias is ignored.
* @param columnRef the column source and name to match.
* @return the value column if found, else {@code Optional.empty()}.
*/
public Optional<Column> findValueColumn(final ColumnRef target) {
return findNamespacedColumn(withNamespace(Namespace.VALUE).and(thatMatches(target)))
public Optional<Column> findValueColumn(final ColumnRef columnRef) {
return findNamespacedColumn(withNamespace(Namespace.VALUE).and(withRef(columnRef)))
.map(NamespacedColumn::column);
}

Expand All @@ -139,7 +139,7 @@ public Optional<Column> findValueColumn(final ColumnRef target) {
public OptionalInt valueColumnIndex(final ColumnRef target) {
int idx = 0;
for (final Column column : value()) {
if (column.matches(target)) {
if (column.ref().equals(target)) {
return OptionalInt.of(idx);
}
++idx;
Expand Down Expand Up @@ -312,19 +312,19 @@ private LogicalSchema rebuild(final boolean withMetaAndKeyColsInValue) {
value.stream()
.filter(c -> !findNamespacedColumn(
(withNamespace(Namespace.META).or(withNamespace(Namespace.KEY))
.and(thatMatches(c.column().ref()))
.and(withRef(c.column().ref()))
)).isPresent())
.forEach(builder::add);

return new LogicalSchema(builder.build());
}

private static Predicate<NamespacedColumn> thatMatches(final ColumnRef ref) {
return c -> c.column().matches(ref);
private static Predicate<NamespacedColumn> withRef(final ColumnRef ref) {
return c -> c.column().ref().equals(ref);
}

private static Predicate<NamespacedColumn> withName(final ColumnName name) {
return c -> c.column().matches(ColumnRef.withoutSource(name));
return c -> c.column().name().equals(name);
}

private static Predicate<NamespacedColumn> withNamespace(final Namespace ns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ public void shouldGetColumnByName() {
@Test
public void shouldGetColumnByAliasedName() {
// When:
final Optional<Column> result = SOME_SCHEMA.findValueColumn(
ColumnRef.of(SourceName.of("SomeAlias"), F0)
final Optional<Column> result = ALIASED_SCHEMA.findValueColumn(
ColumnRef.of(BOB, F0)
);

// Then:
assertThat(result, is(Optional.of(Column.of(F0, SqlTypes.STRING))));
assertThat(result, is(Optional.of(Column.of(ColumnRef.of(BOB, F0), SqlTypes.STRING))));
}

@Test
Expand All @@ -243,7 +243,7 @@ public void shouldNotGetColumnByNameIfWrongCase() {
@Test
public void shouldGetColumnByNameIfBothColumnAndNameAreAliased() {
// When:
final Optional<Column> result = ALIASED_SCHEMA.findValueColumn(ColumnRef.of(SourceName.of("bob"), F0));
final Optional<Column> result = ALIASED_SCHEMA.findValueColumn(ColumnRef.of(BOB, F0));

// Then:
assertThat(result, is(Optional.of(Column.of(BOB, F0, SqlTypes.STRING))));
Expand Down Expand Up @@ -271,6 +271,26 @@ public void shouldGetKeyColumnFromValueIfAdded() {
is(not(Optional.empty())));
}

@Test
public void shouldFindExactValueColumn() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(F0, SqlTypes.BIGINT)
.valueColumn(BOB, F0, SqlTypes.STRING)
.build();

// Then:
assertThat(
schema.findValueColumn(ColumnRef.withoutSource(F0)),
is(Optional.of(Column.of(ColumnRef.withoutSource(F0), SqlTypes.BIGINT)))
);

assertThat(
schema.findValueColumn(ColumnRef.of(BOB, F0)),
is(Optional.of(Column.of(ColumnRef.of(BOB, F0), SqlTypes.STRING)))
);
}

@Test
public void shouldGetMetaFields() {
assertThat(SOME_SCHEMA.findColumn(ColumnRef.withoutSource(ROWTIME_NAME)), is(Optional.of(
Expand All @@ -293,7 +313,27 @@ public void shouldGetValueColumns() {
}

@Test
public void shouldGetColumnIndex() {
public void shouldFindExactColumn() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(F0, SqlTypes.BIGINT)
.valueColumn(BOB, F0, SqlTypes.STRING)
.build();

// Then:
assertThat(
schema.findColumn(ColumnRef.withoutSource(F0)),
is(Optional.of(Column.of(ColumnRef.withoutSource(F0), SqlTypes.BIGINT)))
);

assertThat(
schema.findColumn(ColumnRef.of(BOB, F0)),
is(Optional.of(Column.of(ColumnRef.of(BOB, F0), SqlTypes.STRING)))
);
}

@Test
public void shouldGetValueColumnIndex() {
assertThat(SOME_SCHEMA.valueColumnIndex(ColumnRef.withoutSource(F0)), is(OptionalInt.of(0)));
assertThat(SOME_SCHEMA.valueColumnIndex(ColumnRef.withoutSource(F1)), is(OptionalInt.of(1)));
}
Expand All @@ -310,7 +350,27 @@ public void shouldNotFindColumnIfDifferentCase() {

@Test
public void shouldGetAliasedColumnIndex() {
assertThat(ALIASED_SCHEMA.valueColumnIndex(ColumnRef.of(SourceName.of("bob"), F1)), is(OptionalInt.of(1)));
assertThat(ALIASED_SCHEMA.valueColumnIndex(ColumnRef.of(BOB, F1)), is(OptionalInt.of(1)));
}

@Test
public void shouldFindExactColumnIndex() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(F0, SqlTypes.BIGINT)
.valueColumn(BOB, F0, SqlTypes.STRING)
.build();

// Then:
assertThat(
schema.valueColumnIndex(ColumnRef.withoutSource(F0)),
is(OptionalInt.of(0))
);

assertThat(
schema.valueColumnIndex(ColumnRef.of(BOB, F0)),
is(OptionalInt.of(1))
);
}

@Test
Expand Down Expand Up @@ -564,10 +624,10 @@ public void shouldAddMetaAndKeyColumnsWhenAliased() {

// Then:
assertThat(result.value(), hasSize(schema.value().size() + 2));
assertThat(result.value().get(0).source().get(), is(SourceName.of("bob")));
assertThat(result.value().get(0).source().get(), is(BOB));
assertThat(result.value().get(0).name(), is(ROWTIME_NAME));
assertThat(result.value().get(0).type(), is(SqlTypes.BIGINT));
assertThat(result.value().get(1).source().get(), is(SourceName.of("bob")));
assertThat(result.value().get(1).source().get(), is(BOB));
assertThat(result.value().get(1).name(), is(ROWKEY_NAME));
assertThat(result.value().get(1).type(), is(SqlTypes.STRING));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private final class Visitor extends DefaultTraversalVisitor<AstNode, Void> {
private void analyzeNonStdOutSink(final Sink sink) {
analysis.setProperties(sink.getProperties());
sink.getPartitionBy()
.map(name -> ColumnRef.of(sink.getName(), name.name()))
.map(name -> ColumnRef.withoutSource(name.name()))
.ifPresent(analysis::setPartitionBy);

setSerdeOptions(sink);
Expand Down Expand Up @@ -459,7 +459,7 @@ private ColumnRef getJoinFieldName(
final ColumnRef fieldName = joinFieldName.get();

final Optional<ColumnRef> joinField =
getJoinFieldNameFromSource(fieldName, sourceAlias, sourceSchema);
getJoinFieldNameFromSource(fieldName.withoutSource(), sourceAlias, sourceSchema);

return joinField
.orElseThrow(() -> new KsqlException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import java.util.stream.Collectors;

/**
* Searches through the AST for any column references and throws if they are unknown fields.
* Searches through the AST for any column references and throws if they are unknown or ambiguous.
*/
class ExpressionAnalyzer {

Expand Down Expand Up @@ -122,7 +122,7 @@ public Object visitColumnReference(
final ColumnReferenceExp node,
final Object context
) {
throwOnUnknownColumn(node.getReference());
throwOnUnknownOrAmbiguousColumn(node.getReference());
return null;
}

Expand All @@ -135,11 +135,8 @@ public Object visitDereferenceExpression(
return null;
}

private void throwOnUnknownColumn(final ColumnRef name) {
// check all sources
final Set<SourceName> sourcesWithField = sourceSchemas.sourcesWithField(
ColumnRef.withoutSource(name.name())
);
private void throwOnUnknownOrAmbiguousColumn(final ColumnRef name) {
final Set<SourceName> sourcesWithField = sourceSchemas.sourcesWithField(name);

if (sourcesWithField.isEmpty()) {
if (allowWindowMetaFields && name.name().equals(SchemaUtil.WINDOWSTART_NAME)) {
Expand All @@ -150,13 +147,7 @@ private void throwOnUnknownColumn(final ColumnRef name) {
+ "' cannot be resolved.");
}

if (name.source().isPresent()) {
final SourceName qualifier = name.source().get();
if (!sourcesWithField.contains(qualifier)) {
throw new KsqlException("Source '" + qualifier.name() + "', "
+ "used in '" + name.aliasedFieldName() + "' cannot be resolved.");
}
} else if (sourcesWithField.size() > 1) {
if (sourcesWithField.size() > 1) {
final String possibilities = sourcesWithField.stream()
.map(source -> SchemaUtil.buildAliasedFieldName(source.name(), name.name().name()))
.sorted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Set<SourceName> sourcesWithField(final ColumnRef target) {
return ImmutableSet.of();
}

return sourceSchema.findColumn(target).isPresent()
return sourceSchema.findColumn(target.withoutSource()).isPresent()
? ImmutableSet.of(sourceName)
: ImmutableSet.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,13 @@ private static Optional<ColumnName> buildKeyFieldName(
final CreateSource statement,
final LogicalSchema schema) {
if (statement.getProperties().getKeyField().isPresent()) {
final ColumnName columnName = statement.getProperties().getKeyField().get().name();
schema.findValueColumn(ColumnRef.withoutSource(columnName)).orElseThrow(
() -> new KsqlException(
final ColumnRef column = statement.getProperties().getKeyField().get();
schema.findValueColumn(column)
.orElseThrow(() -> new KsqlException(
"The KEY column set in the WITH clause does not exist in the schema: '"
+ columnName.toString(FormatOptions.noEscape()) + "'"
)
);
return Optional.of(columnName);
+ column.toString(FormatOptions.noEscape()) + "'"
));
return Optional.of(column.name());
} else {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ private KeyField getKeyField(
return keyFieldOverride.get();
}
if (keyFieldName.isPresent()) {
final Column keyColumn = schema.findValueColumn(
// for DDL commands, the key name is never specified with a source
ColumnRef.withoutSource(keyFieldName.get()))
// for DDL commands, the key name is never specified with a source
final ColumnRef columnRef = ColumnRef.withoutSource(keyFieldName.get());
final Column keyColumn = schema.findValueColumn(columnRef)
.orElseThrow(() -> new IllegalStateException(
"The KEY column set in the WITH clause does not exist in the schema: '"
+ keyFieldName + "'"
));
return KeyField.of(ColumnRef.withoutSource(keyFieldName.get()), keyColumn);
return KeyField.of(columnRef, keyColumn);
} else {
return KeyField.none();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ List<Expression> resolveGroupByExpressions(
final ColumnReferenceExp nameRef = (ColumnReferenceExp) e;
return new ColumnReferenceExp(
nameRef.getLocation(),
ColumnRef.withoutSource(nameRef.getReference().name())
nameRef.getReference().withoutSource()
);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,7 @@ private int timestampIndex() {
}

return originalSchema.valueColumnIndex(timestampField)
.orElse(
originalSchema
.withAlias(alias)
.valueColumnIndex(timestampField)
.orElse(-1)
);
.orElseThrow(IllegalStateException::new);
}

private static Optional<Topology.AutoOffsetReset> getAutoOffsetReset(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public SchemaKStream<?> buildStream(final KsqlQueryBuilder builder) {
);
}

@SuppressWarnings("unchecked")
private SchemaKStream<?> createOutputStream(
final SchemaKStream schemaKStream,
final QueryContext.Stacker contextStacker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ public static SchemaKStream<?> forSource(
this.functionRegistry = requireNonNull(functionRegistry, "functionRegistry");
}

@SuppressWarnings("unchecked")
public SchemaKTable<K> toTable(
final KeyFormat keyFormat,
final ValueFormat valueFormat,
Expand Down Expand Up @@ -235,7 +234,6 @@ public SchemaKStream<K> withKeyField(final KeyField resultKeyField) {
);
}

@SuppressWarnings("unchecked")
public SchemaKStream<K> into(
final String kafkaTopicName,
final LogicalSchema outputSchema,
Expand Down Expand Up @@ -750,7 +748,7 @@ public FunctionRegistry getFunctionRegistry() {
return functionRegistry;
}

ColumnRef groupedKeyNameFor(final List<Expression> groupByExpressions) {
static ColumnRef groupedKeyNameFor(final List<Expression> groupByExpressions) {
if (groupByExpressions.size() == 1 && groupByExpressions.get(0) instanceof ColumnReferenceExp) {
return ((ColumnReferenceExp) groupByExpressions.get(0)).getReference();
}
Expand Down
Loading

0 comments on commit 3d9a1e8

Please sign in to comment.