Skip to content

Commit

Permalink
refactor: type safety for column, source and function name strings (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Sep 25, 2019
1 parent 49f456e commit 437d252
Show file tree
Hide file tree
Showing 262 changed files with 2,904 additions and 2,642 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand Down Expand Up @@ -110,10 +109,6 @@ private InputStream getSchemaStream() {
@State(Scope.Thread)
public static class SerdeState {

private static final org.apache.kafka.connect.data.Schema KEY_SCHEMA = SchemaBuilder.struct()
.field(SchemaUtil.ROWKEY_NAME, org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA)
.build();

Serializer<GenericRow> serializer;
Deserializer<GenericRow> deserializer;
GenericRow row;
Expand Down
17 changes: 9 additions & 8 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.confluent.ksql.cli.console.cmd.RemoteServerSpecificCommand;
import io.confluent.ksql.cli.console.cmd.RequestPipeliningCommand;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.KsqlRestClientException;
Expand Down Expand Up @@ -536,9 +537,9 @@ public void testSelectProject() {

final PhysicalSchema resultSchema = PhysicalSchema.from(
LogicalSchema.builder()
.valueColumn("ITEMID", SqlTypes.STRING)
.valueColumn("ORDERUNITS", SqlTypes.DOUBLE)
.valueColumn("PRICEARRAY", SqlTypes.array(SqlTypes.DOUBLE))
.valueColumn(ColumnName.of("ITEMID"), SqlTypes.STRING)
.valueColumn(ColumnName.of("ORDERUNITS"), SqlTypes.DOUBLE)
.valueColumn(ColumnName.of("PRICEARRAY"), SqlTypes.array(SqlTypes.DOUBLE))
.build(),
SerdeOption.none()
);
Expand Down Expand Up @@ -670,11 +671,11 @@ public void testSelectUDFs() {

final PhysicalSchema resultSchema = PhysicalSchema.from(
LogicalSchema.builder()
.valueColumn("ITEMID", SqlTypes.STRING)
.valueColumn("COL1", SqlTypes.DOUBLE)
.valueColumn("COL2", SqlTypes.DOUBLE)
.valueColumn("COL3", SqlTypes.DOUBLE)
.valueColumn("COL4", SqlTypes.BOOLEAN)
.valueColumn(ColumnName.of("ITEMID"), SqlTypes.STRING)
.valueColumn(ColumnName.of("COL1"), SqlTypes.DOUBLE)
.valueColumn(ColumnName.of("COL2"), SqlTypes.DOUBLE)
.valueColumn(ColumnName.of("COL3"), SqlTypes.DOUBLE)
.valueColumn(ColumnName.of("COL4"), SqlTypes.BOOLEAN)
.build(),
SerdeOption.none()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.cli.console.Console.NoOpRowCaptor;
import io.confluent.ksql.cli.console.cmd.CliSpecificCommand;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ArgumentInfo;
import io.confluent.ksql.rest.entity.CommandId;
Expand Down Expand Up @@ -1414,7 +1415,7 @@ private static List<FieldInfo> buildTestSchema(final SqlType... fieldTypes) {
final Builder schemaBuilder = LogicalSchema.builder();

for (int idx = 0; idx < fieldTypes.length; idx++) {
schemaBuilder.valueColumn("f_" + idx, fieldTypes[idx]);
schemaBuilder.valueColumn(ColumnName.of("f_" + idx), fieldTypes[idx]);
}

final LogicalSchema schema = schemaBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.entity.TableRowsEntity;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
Expand All @@ -38,35 +39,35 @@ public class TableRowsTableBuilderTest {
private static final String SOME_SQL = "some sql";

private static final LogicalSchema SCHEMA = LogicalSchema.builder()
.keyColumn("k0", SqlTypes.BIGINT)
.keyColumn("k1", SqlTypes.DOUBLE)
.valueColumn("v0", SqlTypes.STRING)
.valueColumn("v1", SqlTypes.INTEGER)
.keyColumn(ColumnName.of("k0"), SqlTypes.BIGINT)
.keyColumn(ColumnName.of("k1"), SqlTypes.DOUBLE)
.valueColumn(ColumnName.of("v0"), SqlTypes.STRING)
.valueColumn(ColumnName.of("v1"), SqlTypes.INTEGER)
.build();

private static final List<?> VALUES = ImmutableList.of(
10L, 5.1D, "x", 5
);

private static final LogicalSchema TIME_WINDOW_SCHEMA = LogicalSchema.builder()
.keyColumn("k0", SqlTypes.BIGINT)
.keyColumn("k1", SqlTypes.DOUBLE)
.keyColumn("WINDOWSTART", SqlTypes.BIGINT)
.valueColumn("v0", SqlTypes.STRING)
.valueColumn("v1", SqlTypes.INTEGER)
.keyColumn(ColumnName.of("k0"), SqlTypes.BIGINT)
.keyColumn(ColumnName.of("k1"), SqlTypes.DOUBLE)
.keyColumn(ColumnName.of("WINDOWSTART"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("v0"), SqlTypes.STRING)
.valueColumn(ColumnName.of("v1"), SqlTypes.INTEGER)
.build();

private static final List<?> TIME_WINDOW_VALUES = ImmutableList.of(
10L, 5.1D, 123456L, "x", 5
);

private static final LogicalSchema SESSION_WINDOW_SCHEMA = LogicalSchema.builder()
.keyColumn("k0", SqlTypes.BIGINT)
.keyColumn("k1", SqlTypes.DOUBLE)
.keyColumn("WINDOWSTART", SqlTypes.BIGINT)
.keyColumn("WINDOWEND", SqlTypes.BIGINT)
.valueColumn("v0", SqlTypes.STRING)
.valueColumn("v1", SqlTypes.INTEGER)
.keyColumn(ColumnName.of("k0"), SqlTypes.BIGINT)
.keyColumn(ColumnName.of("k1"), SqlTypes.DOUBLE)
.keyColumn(ColumnName.of("WINDOWSTART"), SqlTypes.BIGINT)
.keyColumn(ColumnName.of("WINDOWEND"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("v0"), SqlTypes.STRING)
.valueColumn(ColumnName.of("v1"), SqlTypes.INTEGER)
.build();

private static final List<?> SESSION_WINDOW_VALUES = ImmutableList.of(
Expand Down
44 changes: 44 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/name/ColumnName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.name;

import com.google.errorprone.annotations.Immutable;

/**
* The name of a column within a source.
*/
@Immutable
public final class ColumnName extends Name<ColumnName> {

private static final String AGGREGATE_COLUMN_PREFIX = "KSQL_AGG_VARIABLE_";

public static ColumnName aggregate(final int idx) {
return of(AGGREGATE_COLUMN_PREFIX + idx);
}

public static ColumnName of(final String name) {
return new ColumnName(name);
}

private ColumnName(final String name) {
super(name);
}

public boolean isAggregate() {
return name.startsWith(AGGREGATE_COLUMN_PREFIX);
}

}
33 changes: 33 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/name/FunctionName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.name;

import com.google.errorprone.annotations.Immutable;

/**
* The name for a function (UDF or UDAF).
*/
@Immutable
public final class FunctionName extends Name<FunctionName> {

public static FunctionName of(final String name) {
return new FunctionName(name);
}

private FunctionName(final String name) {
super(name);
}
}
96 changes: 96 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/name/Name.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.name;

import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.util.Identifiers;
import java.util.Objects;

/**
* The base type for all names, which just wraps a String in
* a type-safe wrapper and supplies formatting options.
*
* @param <T> ensure type safety of methods
*/
@Immutable
public abstract class Name<T extends Name<?>> {

protected final String name;

protected Name(final String name) {
this.name = Identifiers.ensureTrimmed(Objects.requireNonNull(name, "name"), "name");
}

// we should remove this getter after all code has
// migrated to use Name instead of Strings to make
// sure that we never lose type safety
public String name() {
return name;
}

/**
* @see String#equalsIgnoreCase(String)
*/
public boolean equalsIgnoreCase(final T o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

return name.equalsIgnoreCase(o.name());
}

public boolean startsWith(final T o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

return name.startsWith(o.name());
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Name<?> that = (Name<?>) o;
return Objects.equals(name, that.name);
}

@Override
public int hashCode() {
return Objects.hash(getClass(), name);
}

@Override
public String toString() {
return toString(FormatOptions.none());
}

public String toString(final FormatOptions formatOptions) {
return formatOptions.escape(name);
}

}
34 changes: 34 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/name/SourceName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.name;

import com.google.errorprone.annotations.Immutable;

/**
* The name of a source (stream or table).
*/
@Immutable
public final class SourceName extends Name<SourceName> {

public static SourceName of(final String name) {
return new SourceName(name);
}

private SourceName(final String name) {
super(name);
}

}
Loading

0 comments on commit 437d252

Please sign in to comment.