Skip to content

Commit

Permalink
feat: serialize expressions (#3721)
Browse files Browse the repository at this point in the history
feat: serialize expressions

Adds serialization modules for serializing/parsing Expression, ColumnRef,
SelectExpression, and WindowExpression into/from ksql snippets.

ColumnRef is serialized to / parsed from column reference expressions.

SelectExpression is serialized to / parsed from SingleColumn AST nodes.
  • Loading branch information
rodesai authored Nov 15, 2019
1 parent a6fa318 commit e1cd477
Show file tree
Hide file tree
Showing 32 changed files with 1,159 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,10 @@ public void shouldPreserveAliasIfPresent() {
private static Statement givenQuery(final String sql) {
final List<ParsedStatement> statements = new DefaultKsqlParser().parse(sql);
assertThat(statements, hasSize(1));
return new AstBuilder(META_STORE).build(statements.get(0).getStatement());
return new AstBuilder(META_STORE).buildStatement(statements.get(0).getStatement());
}

private static ColumnReferenceExp column(final SourceName source, final String fieldName) {
return new ColumnReferenceExp(ColumnRef.of(source, ColumnName.of(fieldName)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,6 @@ public void shouldThrowIfRightJoinSourceDoesNotExist() {
private static AstNode givenQuery(final String sql) {
final List<ParsedStatement> statements = new DefaultKsqlParser().parse(sql);
assertThat(statements, hasSize(1));
return new AstBuilder(META_STORE).build(statements.get(0).getStatement());
return new AstBuilder(META_STORE).buildStatement(statements.get(0).getStatement());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package io.confluent.ksql.execution.plan;

import io.confluent.ksql.execution.expression.formatter.ExpressionFormatter;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.FormatOptions;
import java.util.Objects;
import javax.annotation.concurrent.Immutable;

Expand All @@ -25,6 +27,7 @@
*/
@Immutable
public final class SelectExpression {
private static final String FMT = "%s AS %s";

private final ColumnName alias;
private final Expression expression;
Expand Down Expand Up @@ -66,9 +69,14 @@ public int hashCode() {

@Override
public String toString() {
return "SelectExpression{"
+ "name='" + alias + '\''
+ ", expression=" + expression
+ '}';
return format(FormatOptions.none());
}

public String format(final FormatOptions formatOptions) {
return String.format(
FMT,
ExpressionFormatter.formatExpression(expression, formatOptions),
alias.toString(formatOptions)
);
}
}
34 changes: 23 additions & 11 deletions ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import io.confluent.ksql.parser.SqlBaseParser.ListConnectorsContext;
import io.confluent.ksql.parser.SqlBaseParser.ListTypesContext;
import io.confluent.ksql.parser.SqlBaseParser.RegisterTypeContext;
import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext;
import io.confluent.ksql.parser.SqlBaseParser.SourceNameContext;
import io.confluent.ksql.parser.SqlBaseParser.TablePropertiesContext;
import io.confluent.ksql.parser.SqlBaseParser.TablePropertyContext;
Expand Down Expand Up @@ -158,12 +157,25 @@ public AstBuilder(final TypeRegistry typeRegistry) {
this.typeRegistry = requireNonNull(typeRegistry, "typeRegistry");
}

public Statement build(final SingleStatementContext statement) {
final Node result = new Visitor(
getSources(statement),
typeRegistry
).visit(statement);
return (Statement) result;
@SuppressWarnings("unchecked")
public Statement buildStatement(final ParserRuleContext parseTree) {
return build(Optional.of(getSources(parseTree)), typeRegistry, parseTree);
}

public Expression buildExpression(final ParserRuleContext parseTree) {
return build(Optional.empty(), typeRegistry, parseTree);
}

public WindowExpression buildWindowExpression(final ParserRuleContext parseTree) {
return build(Optional.empty(), typeRegistry, parseTree);
}

@SuppressWarnings("unchecked")
private <T extends Node> T build(
final Optional<Set<SourceName>> sources,
final TypeRegistry typeRegistry,
final ParserRuleContext parseTree) {
return (T) new Visitor(sources, typeRegistry).visit(parseTree);
}

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
Expand All @@ -172,13 +184,13 @@ private static final class Visitor extends SqlBaseBaseVisitor<Node> {

private static final String DEFAULT_WINDOW_NAME = "StreamWindow";

private final Set<SourceName> sources;
private final Optional<Set<SourceName>> sources;
private final SqlTypeParser typeParser;

private boolean buildingPersistentQuery = false;

Visitor(final Set<SourceName> sources, final TypeRegistry typeRegistry) {
this.sources = ImmutableSet.copyOf(Objects.requireNonNull(sources, "sources"));
Visitor(final Optional<Set<SourceName>> sources, final TypeRegistry typeRegistry) {
this.sources = Objects.requireNonNull(sources, "sources").map(ImmutableSet::copyOf);
this.typeParser = SqlTypeParser.create(typeRegistry);
}

Expand Down Expand Up @@ -1088,7 +1100,7 @@ public Node visitRegisterType(final RegisterTypeContext context) {
}

private void throwOnUnknownNameOrAlias(final SourceName name) {
if (!sources.contains(name)) {
if (sources.isPresent() && !sources.get().contains(name)) {
throw new KsqlException("'" + name.name() + "' is not a valid stream/table name or alias.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public PreparedStatement<?> prepare(
) {
try {
final AstBuilder astBuilder = new AstBuilder(typeRegistry);
final Statement root = astBuilder.build(stmt.getStatement());
final Statement root = astBuilder.buildStatement(stmt.getStatement());

return PreparedStatement.of(stmt.getStatementText(), root);
} catch (final ParseFailedException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser;

import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
import io.confluent.ksql.metastore.TypeRegistry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.util.ParserUtil;
import org.antlr.v4.runtime.ParserRuleContext;

public final class ExpressionParser {
private ExpressionParser() {
}

public static SelectExpression parseSelectExpression(final String expressionText) {
final SqlBaseParser.SelectItemContext parseCtx = GrammarParseUtil.getParseTree(
expressionText,
SqlBaseParser::selectItem
);
if (!(parseCtx instanceof SqlBaseParser.SelectSingleContext)) {
throw new IllegalArgumentException("Illegal select item type in: " + expressionText);
}
final SqlBaseParser.SelectSingleContext selectSingleContext =
(SqlBaseParser.SelectSingleContext) parseCtx;
if (selectSingleContext.identifier() == null) {
throw new IllegalArgumentException("Select item must have identifier in: " + expressionText);
}
return SelectExpression.of(
ColumnName.of(ParserUtil.getIdentifierText(selectSingleContext.identifier())),
new AstBuilder(TypeRegistry.EMPTY).buildExpression(selectSingleContext.expression())
);
}

public static Expression parseExpression(final String expressionText) {
final ParserRuleContext parseTree = GrammarParseUtil.getParseTree(
expressionText,
SqlBaseParser::singleExpression
);
return new AstBuilder(TypeRegistry.EMPTY).buildExpression(parseTree);
}

public static KsqlWindowExpression parseWindowExpression(final String expressionText) {
final ParserRuleContext parseTree = GrammarParseUtil.getParseTree(
expressionText,
SqlBaseParser::windowExpression
);
final WindowExpression windowExpression =
new AstBuilder(TypeRegistry.EMPTY).buildWindowExpression(parseTree);
return windowExpression.getKsqlWindowExpression();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser;

import java.util.function.Function;
import org.antlr.v4.runtime.BaseErrorListener;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.RecognitionException;
import org.antlr.v4.runtime.Recognizer;
import org.antlr.v4.runtime.atn.PredictionMode;
import org.antlr.v4.runtime.misc.ParseCancellationException;

final class GrammarParseUtil {
private static final BaseErrorListener ERROR_LISTENER = new BaseErrorListener() {
@Override
public void syntaxError(
final Recognizer<?, ?> recognizer,
final Object offendingSymbol,
final int line,
final int charPositionInLine,
final String message,
final RecognitionException e
) {
throw new ParsingException(message, e, line, charPositionInLine);
}
};

private GrammarParseUtil() {
}

static <T extends ParserRuleContext> T getParseTree(
final String text,
final Function<SqlBaseParser, T> parseFunction) {
final SqlBaseLexer sqlBaseLexer = new SqlBaseLexer(
new CaseInsensitiveStream(CharStreams.fromString(text)));
final CommonTokenStream tokenStream = new CommonTokenStream(sqlBaseLexer);
final SqlBaseParser sqlBaseParser = new SqlBaseParser(tokenStream);

sqlBaseLexer.removeErrorListeners();
sqlBaseLexer.addErrorListener(ERROR_LISTENER);

sqlBaseParser.removeErrorListeners();
sqlBaseParser.addErrorListener(ERROR_LISTENER);

try {
// first, try parsing w/ potentially faster SLL mode
sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
return castContext(parseFunction.apply(sqlBaseParser));
} catch (final ParseCancellationException ex) {
// if we fail, parse with LL mode
tokenStream.seek(0); // rewind input stream
sqlBaseParser.reset();

sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.LL);
return castContext(parseFunction.apply(sqlBaseParser));
}
}

@SuppressWarnings("unchecked")
private static <T extends ParserRuleContext> T castContext(final ParserRuleContext ctx) {
return (T) ctx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public ParsingException(final String message, final Optional<NodeLocation> nodeL
);
}

ParsingException(
public ParsingException(
final String message,
final RecognitionException cause,
final int line,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.parser.ExpressionParser;
import io.confluent.ksql.schema.ksql.ColumnRef;
import java.io.IOException;

class ColumnRefDeserializer extends JsonDeserializer<ColumnRef> {
@Override
public ColumnRef deserialize(final JsonParser parser, final DeserializationContext ctx)
throws IOException {
final Expression expression
= ExpressionParser.parseExpression(parser.readValueAs(String.class));
if (expression instanceof ColumnReferenceExp) {
return ((ColumnReferenceExp) expression).getReference();
}
throw new IllegalArgumentException("Passed JSON is not a column reference: " + expression);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.util.IdentifierUtil;
import java.io.IOException;

class ColumnRefSerializer extends JsonSerializer<ColumnRef> {
@Override
public void serialize(
final ColumnRef columnRef,
final JsonGenerator gen,
final SerializerProvider provider) throws IOException {
gen.writeString(columnRef.toString(FormatOptions.of(IdentifierUtil::needsQuotes)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser.json;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.parser.ExpressionParser;
import java.io.IOException;

class ExpressionDeserializer<T extends Expression> extends JsonDeserializer<T> {
@Override
@SuppressWarnings("unchecked")
public T deserialize(final JsonParser parser, final DeserializationContext ctx)
throws IOException {
return (T) ExpressionParser.parseExpression(parser.readValueAs(String.class));
}
}
Loading

0 comments on commit e1cd477

Please sign in to comment.