Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expression support for insert values #3612

Merged
merged 4 commits into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
package io.confluent.ksql.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.execution.codegen.CodeGenRunner;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
Expand Down Expand Up @@ -63,6 +65,7 @@
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpStatus;
import org.apache.kafka.clients.producer.Producer;
Expand Down Expand Up @@ -190,7 +193,12 @@ private ProducerRecord<byte[], byte[]> buildRecord(
}

try {
final RowData row = extractRow(insertValues, dataSource);
final RowData row = extractRow(
insertValues,
dataSource,
executionContext.getMetaStore(),
config);

final byte[] key = serializeKey(row.key, dataSource, config, serviceContext);
final byte[] value = serializeValue(row.value, dataSource, config, serviceContext);

Expand Down Expand Up @@ -226,15 +234,18 @@ private void throwIfDisabled(final KsqlConfig config) {

private RowData extractRow(
final InsertValues insertValues,
final DataSource<?> dataSource
final DataSource<?> dataSource,
final FunctionRegistry functionRegistry,
final KsqlConfig config
) {
final List<ColumnName> columns = insertValues.getColumns().isEmpty()
? implicitColumns(dataSource, insertValues.getValues())
: insertValues.getColumns();

final LogicalSchema schema = dataSource.getSchema();

final Map<ColumnName, Object> values = resolveValues(insertValues, columns, schema);
final Map<ColumnName, Object> values = resolveValues(
insertValues, columns, schema, functionRegistry, config);

handleExplicitKeyField(values, dataSource.getKeyField());

Expand Down Expand Up @@ -306,15 +317,18 @@ private static List<ColumnName> implicitColumns(
private static Map<ColumnName, Object> resolveValues(
final InsertValues insertValues,
final List<ColumnName> columns,
final LogicalSchema schema
final LogicalSchema schema,
final FunctionRegistry functionRegistry,
final KsqlConfig config
) {
final Map<ColumnName, Object> values = new HashMap<>();
for (int i = 0; i < columns.size(); i++) {
final ColumnName column = columns.get(i);
final SqlType columnType = columnType(column, schema);
final Expression valueExp = insertValues.getValues().get(i);

final Object value = new ExpressionResolver(columnType, column)
final Object value =
new ExpressionResolver(columnType, column, schema, functionRegistry, config)
.process(valueExp, null);

values.put(column, value);
Expand Down Expand Up @@ -482,26 +496,39 @@ private static class ExpressionResolver extends VisitParentExpressionVisitor<Obj

private final SqlType fieldType;
private final ColumnName fieldName;
private final LogicalSchema schema;
private final SqlValueCoercer defaultSqlValueCoercer = new DefaultSqlValueCoercer();

ExpressionResolver(final SqlType fieldType, final ColumnName fieldName) {
private final FunctionRegistry functionRegistry;
private final KsqlConfig config;

// Builds a resolver for a single INSERT VALUES column. Beyond the target
// column's type and name, it now also needs the source's logical schema, the
// function registry, and the KSQL config — these are what CodeGenRunner uses
// to compile arbitrary value expressions (not just literals) for this column.
// All arguments are mandatory; nulls fail fast with the argument's name.
ExpressionResolver(
final SqlType fieldType,
final ColumnName fieldName,
final LogicalSchema schema,
final FunctionRegistry functionRegistry,
final KsqlConfig config
) {
this.fieldType = Objects.requireNonNull(fieldType, "fieldType");
this.fieldName = Objects.requireNonNull(fieldName, "fieldName");
this.schema = Objects.requireNonNull(schema, "schema");
this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
this.config = Objects.requireNonNull(config, "config");
}

@Override
protected String visitExpression(final Expression expression, final Void context) {
throw new KsqlException(
"Only Literals are supported for INSERT INTO. Got: "
+ expression + " for field " + fieldName);
}

@Override
protected Object visitLiteral(final Literal node, final Void context) {
final Object value = node.getValue();
if (node instanceof NullLiteral || value == null) {
return null;
}
protected Object visitExpression(final Expression expression, final Void context) {
agavra marked this conversation as resolved.
Show resolved Hide resolved
final ExpressionMetadata metadata =
Iterables.getOnlyElement(
CodeGenRunner.compileExpressions(
Stream.of(expression),
"insert value",
schema,
config,
functionRegistry)
);

// we expect no column references, so we can pass in an empty generic row
final Object value = metadata.evaluate(new GenericRow());

return defaultSqlValueCoercer.coerce(value, fieldType)
.orElseThrow(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ArithmeticUnaryExpression;
import io.confluent.ksql.execution.expression.tree.BooleanLiteral;
import io.confluent.ksql.execution.expression.tree.DoubleLiteral;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.IntegerLiteral;
import io.confluent.ksql.execution.expression.tree.LongLiteral;
import io.confluent.ksql.execution.expression.tree.StringLiteral;
Expand All @@ -43,13 +45,16 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.metastore.model.KsqlTable;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.InsertValues;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.types.SqlArray;
import io.confluent.ksql.schema.ksql.types.SqlMap;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatInfo;
Expand Down Expand Up @@ -101,6 +106,15 @@ public class InsertValuesExecutorTest {
.valueColumn(COL0, SqlTypes.STRING)
.build();

// Schema with a single ARRAY<INTEGER> value column; used by the AS_ARRAY UDF test.
private static final LogicalSchema SINGLE_ARRAY_SCHEMA = LogicalSchema.builder()
.valueColumn(ColumnName.of("COL0"), SqlArray.of(SqlTypes.INTEGER))
.build();

// Schema with a single MAP value column whose value type is INTEGER; used by the
// nested-UDF (AS_MAP) test. Key type is presumably STRING per SqlMap's contract —
// TODO confirm against SqlMap.of.
private static final LogicalSchema SINGLE_MAP_SCHEMA = LogicalSchema.builder()
.valueColumn(ColumnName.of("COL0"), SqlMap.of(SqlTypes.INTEGER))
.build();


private static final LogicalSchema SCHEMA = LogicalSchema.builder()
.valueColumn(COL0, SqlTypes.STRING)
.valueColumn(ColumnName.of("COL1"), SqlTypes.BIGINT)
Expand Down Expand Up @@ -434,6 +448,74 @@ public void shouldHandleNullKeyForSourceWithKeyField() {
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldHandleNegativeValueExpression() {
// Given: a stream keyed on COL0, and an INSERT whose second value is the
// unary-negation expression -1 rather than a plain literal.
givenSourceStreamWithSchema(SCHEMA, SerdeOption.none(), Optional.of(ColumnName.of("COL0")));

final ImmutableList<String> insertColumns = ImmutableList.of("COL0", "COL1");
final ImmutableList<Expression> insertExpressions = ImmutableList.of(
new StringLiteral("str"),
ArithmeticUnaryExpression.negative(Optional.empty(), new LongLiteral(1))
);
final ConfiguredStatement<InsertValues> insertStatement =
givenInsertValuesStrings(insertColumns, insertExpressions);

// When:
executor.execute(insertStatement, ImmutableMap.of(), engine, serviceContext);

// Then: the expression was evaluated to -1L before serialization.
verify(keySerializer).serialize(TOPIC_NAME, keyStruct("str"));
verify(valueSerializer).serialize(TOPIC_NAME, new GenericRow(ImmutableList.of("str", -1L)));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldHandleUdfs() {
// Given: a stream whose single value column is ARRAY<INT>, and an INSERT
// whose value is a UDF invocation AS_ARRAY(1, 2) rather than a literal.
givenSourceStreamWithSchema(SINGLE_ARRAY_SCHEMA, SerdeOption.none(), Optional.empty());

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("COL0"),
ImmutableList.of(
new FunctionCall(
FunctionName.of("AS_ARRAY"),
ImmutableList.of(new IntegerLiteral(1), new IntegerLiteral(2))))
);

// When:
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);

// Then: the UDF was evaluated and the resulting list [1, 2] serialized.
verify(valueSerializer).serialize(TOPIC_NAME, new GenericRow(ImmutableList.of(ImmutableList.of(1, 2))));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldHandleNestedUdfs() {
// Given: a stream whose single value column is a MAP with INT values, and an
// INSERT whose value nests UDF calls: AS_MAP(AS_ARRAY('foo'), AS_ARRAY(1)).
givenSourceStreamWithSchema(SINGLE_MAP_SCHEMA, SerdeOption.none(), Optional.empty());

final ConfiguredStatement<InsertValues> statement = givenInsertValuesStrings(
ImmutableList.of("COL0"),
ImmutableList.of(
new FunctionCall(
FunctionName.of("AS_MAP"),
ImmutableList.of(
new FunctionCall(FunctionName.of("AS_ARRAY"), ImmutableList.of(new StringLiteral("foo"))),
new FunctionCall(FunctionName.of("AS_ARRAY"), ImmutableList.of(new IntegerLiteral(1)))
))
)
);

// When:
executor.execute(statement, ImmutableMap.of(), engine, serviceContext);

// Then: the nested calls were evaluated inside-out, yielding the map {foo=1}.
verify(valueSerializer).serialize(TOPIC_NAME, new GenericRow(ImmutableList.of(ImmutableMap.of("foo", 1))));
verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE));
}

@Test
public void shouldAllowUpcast() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@
"outputs": [
{"topic": "test_topic", "key": null, "value": {"I": 1, "BI": 2, "D": 3.0}}
]
},
{
"name": "should handle arbitrary expressions",
"statements": [
"CREATE STREAM TEST (I INT, A ARRAY<INT>) WITH (kafka_topic='test_topic', value_format='JSON');",
"INSERT INTO TEST (I, A) VALUES (-1, AS_ARRAY(1, 1 + 1, 3));"
],
"inputs": [
],
"outputs": [
{"topic": "test_topic", "key": null, "value": {"I": -1, "A": [1, 2, 3]}}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ groupingExpressions
;

// VALUES clause of INSERT INTO: a parenthesized, possibly empty,
// comma-separated list.
// NOTE(review): the next two alternatives are diff residue — the first line
// is the OLD rule body (literal-only) and the second is the NEW one
// (arbitrary valueExpression). Only the valueExpression alternative should
// survive in the merged grammar — confirm against the merged SqlBase.g4.
values
: '(' (literal (',' literal)*)? ')'
: '(' (valueExpression (',' valueExpression)*)? ')'
;

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public Node visitInsertValues(final InsertValuesContext context) {
targetLocation,
SourceName.of(targetName),
columns,
visit(context.values().literal(), Expression.class));
visit(context.values().valueExpression(), Expression.class));
}

@Override
Expand Down
Loading