Skip to content

Commit

Permalink
feat: use coherent naming scheme for generated java code
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Sep 25, 2019
1 parent 437d252 commit 8371eb8
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 101 deletions.
41 changes: 30 additions & 11 deletions ksql-common/src/main/java/io/confluent/ksql/schema/ksql/Column.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,28 @@
import java.util.Optional;

/**
* A named field within KSQL schema types.
* A named column within KSQL schema types.
*/
@Immutable
public final class Column {

/**
* Unknown index, used for key columns that have not
* been resolved to a value column.
*/
public static final int UNKNOWN_IDX = -1;

private final ColumnRef ref;
private final SqlType type;
private final int index;

/**
* @param name the name of the field.
* @param type the type of the field.
* @return the immutable field.
*/
public static Column of(final ColumnName name, final SqlType type) {
return new Column(ColumnRef.of(name), type);
return new Column(ColumnRef.of(name), type, UNKNOWN_IDX);
}

/**
Expand All @@ -47,7 +54,7 @@ public static Column of(final ColumnName name, final SqlType type) {
* @return the immutable field.
*/
public static Column of(final SourceName source, final ColumnName name, final SqlType type) {
return new Column(ColumnRef.of(source, name), type);
return new Column(ColumnRef.of(source, name), type, UNKNOWN_IDX);
}

/**
Expand All @@ -61,16 +68,17 @@ public static Column of(
final ColumnName name,
final SqlType type
) {
return new Column(ColumnRef.of(source, name), type);
return new Column(ColumnRef.of(source, name), type, UNKNOWN_IDX);
}

private Column(final ColumnRef ref, final SqlType type) {
private Column(final ColumnRef ref, final SqlType type, final int index) {
this.ref = Objects.requireNonNull(ref, "name");
this.type = Objects.requireNonNull(type, "type");
this.index = index;
}

/**
* @return the fully qualified field name.
* @return the fully qualified column name.
*/
public String fullName() {
return ref.aliasedFieldName();
Expand All @@ -84,27 +92,38 @@ public Optional<SourceName> source() {
}

/**
* @return the name of the field, without any source / alias.
* @return the name of the column, without any source / alias.
*/
public ColumnName name() {
return ref.name();
}

/**
* @return the type of the field.
* @return the type of the column.
*/
public SqlType type() {
return type;
}

/**
* @return the index of the column, or {@link #UNKNOWN_IDX} if unknown
*/
public int index() {
return index;
}

/**
* Create a new Field that matches the current, but with the supplied {@code source}.
*
* @param source the source to set of the new field.
* @return the new field.
* @param source the source to set of the new column.
* @return the new column.
*/
public Column withSource(final SourceName source) {
return new Column(ref.withSource(source), type);
return new Column(ref.withSource(source), type, index);
}

public Column withIndex(final int index) {
return new Column(ref, type, index);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -179,12 +178,10 @@ public OptionalInt valueColumnIndex(final ColumnName fullColumnName) {

@VisibleForTesting
public OptionalInt valueColumnIndex(final String fullColumnName) {
int idx = 0;
for (final Column column : value) {
if (column.fullName().equals(fullColumnName)) {
return OptionalInt.of(idx);
return OptionalInt.of(column.index());
}
++idx;
}

return OptionalInt.empty();
Expand All @@ -205,11 +202,11 @@ public LogicalSchema withAlias(final SourceName alias) {
throw new IllegalStateException("Already aliased");
}

return new LogicalSchema(
addAlias(alias, metadata),
addAlias(alias, key),
addAlias(alias, value)
);
return new Builder()
.metadataColumns(addAlias(alias, metadata))
.keyColumns(addAlias(alias, key))
.valueColumns(addAlias(alias, value))
.build();
}

/**
Expand All @@ -222,11 +219,11 @@ public LogicalSchema withoutAlias() {
throw new IllegalStateException("Not aliased");
}

return new LogicalSchema(
removeAlias(metadata),
removeAlias(key),
removeAlias(value)
);
return new Builder()
.metadataColumns(removeAlias(metadata))
.keyColumns(removeAlias(key))
.valueColumns(removeAlias(value))
.build();
}

/**
Expand All @@ -245,28 +242,19 @@ public boolean isAliased() {
* @return the new schema.
*/
public LogicalSchema withMetaAndKeyColsInValue() {
final List<Column> newValueColumns = new ArrayList<>(
metadata.size()
+ key.size()
+ value.size());
final Builder builder = new Builder();

newValueColumns.addAll(metadata);
newValueColumns.addAll(key);

value.forEach(f -> {
if (!doFindColumn(f.name().name(), newValueColumns).isPresent()) {
newValueColumns.add(f);
}
});
builder.metadataColumns(metadata);
builder.keyColumns(key);
builder.valueColumns(metadata);
builder.valueColumns(key);

final ImmutableList.Builder<Column> builder = ImmutableList.builder();
newValueColumns.forEach(builder::add);
value.stream()
.filter(c -> !doFindColumn(c.fullName(), key).isPresent())
.filter(c -> !doFindColumn(c.fullName(), metadata).isPresent())
.forEach(builder::valueColumn);

return new LogicalSchema(
metadata,
key,
builder.build()
);
return builder.build();
}

/**
Expand All @@ -275,19 +263,17 @@ public LogicalSchema withMetaAndKeyColsInValue() {
* @return the new schema with the columns removed.
*/
public LogicalSchema withoutMetaAndKeyColsInValue() {
final ImmutableList.Builder<Column> builder = ImmutableList.builder();
final Builder builder = new Builder();

final Set<ColumnName> excluded = metaAndKeyColumnNames();
builder.metadataColumns(metadata);
builder.keyColumns(key);

value.stream()
.filter(f -> !excluded.contains(f.name()))
.forEach(builder::add);
.filter(c -> !doFindColumn(c.fullName(), key).isPresent())
.filter(c -> !doFindColumn(c.fullName(), metadata).isPresent())
.forEach(builder::valueColumn);

return new LogicalSchema(
metadata,
key,
builder.build()
);
return builder.build();
}

/**
Expand Down Expand Up @@ -409,12 +395,21 @@ private static ConnectSchema toConnectSchema(

public static class Builder {

private final ImmutableList.Builder<Column> metadataBuilder = ImmutableList.builder();
private final ImmutableList.Builder<Column> keyBuilder = ImmutableList.builder();
private final ImmutableList.Builder<Column> valueBuilder = ImmutableList.builder();

private final Set<String> seenKeys = new HashSet<>();
private final Set<String> seenValues = new HashSet<>();

int valueIndex = 0;

// used only for internal copy (keeps the alias from the original schema)
private Builder metadataColumns(final List<Column> metadata) {
metadataBuilder.addAll(metadata);
return this;
}

public Builder keyColumn(final ColumnName columnName, final SqlType type) {
keyColumn(Column.of(columnName, type));
return this;
Expand Down Expand Up @@ -447,7 +442,7 @@ public Builder valueColumn(final Column column) {
if (!seenValues.add(column.fullName())) {
throw new KsqlException("Duplicate values found in schema: " + column);
}
valueBuilder.add(column);
valueBuilder.add(column.withIndex(valueIndex++));
return this;
}

Expand All @@ -457,14 +452,15 @@ public Builder valueColumns(final Iterable<? extends Column> column) {
}

public LogicalSchema build() {
final List<Column> metadata = metadataBuilder.build();
final List<Column> suppliedKey = keyBuilder.build();

final List<Column> key = suppliedKey.isEmpty()
? IMPLICIT_KEY_SCHEMA
: suppliedKey;

return new LogicalSchema(
METADATA_SCHEMA,
metadata.isEmpty() ? METADATA_SCHEMA : metadata,
key,
valueBuilder.build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public void shouldImplementEqualsProperly() {
new EqualsTester()
.addEqualityGroup(
Column.of(SOME_NAME, SqlTypes.INTEGER),
Column.of(SOME_NAME, SqlTypes.INTEGER)
Column.of(SOME_NAME, SqlTypes.INTEGER),
Column.of(SOME_NAME, SqlTypes.INTEGER).withIndex(1) // index is not considered
)
.addEqualityGroup(
Column.of(SOME_OHTER_NAME, SqlTypes.INTEGER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ private void addParameter(final Column schemaColumn) {
parameters.add(new ParameterType(
SQL_TO_JAVA_TYPE_CONVERTER.toJavaType(schemaColumn.type()),
schemaColumn.fullName(),
schemaColumn.fullName().replace(".", "_"),
CodeGenUtils.paramName(schemaColumn),
ksqlConfig));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.execution.codegen;

import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.util.KsqlPreconditions;

final class CodeGenUtils {

private static final String PARAM_PREFIX = "var";

private CodeGenUtils() {
}

static String paramName(final Column column) {
KsqlPreconditions.checkArgument(
column.index() != Column.UNKNOWN_IDX,
"Expected known index for column '" + column.fullName() + "'. This is likely a bug!"
);

return PARAM_PREFIX + column.index();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public Pair<String, Schema> visitQualifiedNameReference(
new KsqlException("Field not found: " + fieldName));

final Schema schema = SQL_TO_CONNECT_SCHEMA_CONVERTER.toConnectSchema(schemaColumn.type());
return new Pair<>(fieldName.replace(".", "_"), schema);
return new Pair<>(CodeGenUtils.paramName(schemaColumn), schema);
}

@Override
Expand Down
Loading

0 comments on commit 8371eb8

Please sign in to comment.