diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index eb3eb925f68f..d985cb2bbcad 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -62,6 +62,7 @@ import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.MessageEntity; import io.confluent.ksql.rest.entity.PropertiesList; import io.confluent.ksql.rest.entity.Queries; import io.confluent.ksql.rest.entity.QueryDescription; @@ -166,6 +167,7 @@ public class Console implements Closeable { tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new)) .put(QueryResultEntity.class, tablePrinter(QueryResultEntity.class, QueryResultTableBuilder::new)) + .put(MessageEntity.class, Console::printMessage) .build(); private static Handler1 tablePrinter( @@ -787,6 +789,12 @@ private void printAsJson(final Object o) { } } + private void printMessage(final MessageEntity message) { + if (message.getMessage().isPresent()) { + writer().println(message.getMessage().get()); + } + } + static class NoOpRowCaptor implements RowCaptor { @Override public void addRow(final GenericRow row) { diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 8946999ed2a4..c93ec446c565 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -246,6 +246,9 @@ public void tearDown() { System.out.println("[Terminal Output]"); System.out.println(terminal.getOutputString()); + dropStream(streamName); + dropTable(tableName); + localCli.close(); console.close(); } @@ -277,8 +280,6 @@ private void testCreateStreamAsSelect( final Map results = topicConsumer .readResults(streamName, resultSchema, expectedResults.size(), new StringDeserializer()); - dropStream(streamName); - assertThat(results, equalTo(expectedResults)); } @@ -698,7 +699,6 @@ public void testCreateTable() { isRow(is("Parsing statement")), isRow(is("Executing statement")))); - dropTable(tableName); } @Test @@ -859,6 +859,29 @@ public void shouldDescribeAggregateFunction() throws Exception { assertThat(output, containsString(expectedVariant)); } + @Test + public void shouldDisplayInsertedKeyValue() throws Exception { + + final String message = + "Inserted:\n" + + "key:null\n" + + "value:[ 294 | 8.1 ]"; + + // Given: + final String line = + "CREATE STREAM " + streamName + "(id INT, rating DOUBLE) WITH " + + "(kafka_topic='ratings', partitions=1, value_format='json');\n" + + "INSERT INTO " + streamName + "(id, rating) VALUES (294, 8.1);"; + + // When: + localCli.handleLine(line); + + final String output = terminal.getOutputString(); + + // Then: + assertThat(output, containsString(message)); + } + @Test public void shouldPrintErrorIfCantFindFunction() throws Exception { localCli.handleLine("describe function foobar;"); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 01ee6835ea47..18c20cc43ec4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -131,7 +131,7 @@ private InsertValuesExecutor( this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory"); } - public void execute( + public Optional execute( final ConfiguredStatement statement, final KsqlExecutionContext executionContext, final ServiceContext serviceContext @@ -140,11 +140,14 @@ public void execute( final KsqlConfig config = statement.getConfig() .cloneWithPropertyOverwrite(statement.getOverrides()); - final ProducerRecord record = + final ProducerRecordInfo producerRecordInfo = buildRecord(statement, executionContext, serviceContext); try { - producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps()); + producer.sendRecord(producerRecordInfo.record, + serviceContext, config.getProducerClientConfigProps()); + + return Optional.of(producerRecordInfo.toString()); } catch (final TopicAuthorizationException e) { // TopicAuthorizationException does not give much detailed information about why it failed, // except which topics are denied. Here we just add the ACL to make the error message @@ -160,7 +163,7 @@ public void execute( } } - private ProducerRecord buildRecord( + private ProducerRecordInfo buildRecord( final ConfiguredStatement statement, final KsqlExecutionContext executionContext, final ServiceContext serviceContext @@ -188,16 +191,15 @@ private ProducerRecord buildRecord( final RowData row = extractRow(insertValues, dataSource); final byte[] key = serializeKey(row.key, dataSource, config, serviceContext); final byte[] value = serializeValue(row.value, dataSource, config, serviceContext); - final String topicName = dataSource.getKafkaTopicName(); - return new ProducerRecord<>( + return new ProducerRecordInfo(row, new ProducerRecord<>( topicName, null, row.ts, key, value - ); + )); } catch (Exception e) { throw new KsqlStatementException( createInsertFailedExceptionMessage(insertValues) + " " + e.getMessage(), @@ -465,6 +467,26 @@ private RowData(final long ts, final Struct key, final GenericRow value) { } } + private static class ProducerRecordInfo { + final RowData row; + final ProducerRecord record; + + ProducerRecordInfo(final RowData row, + final ProducerRecord record) { + this.row = Objects.requireNonNull(row); + this.record = Objects.requireNonNull(record); + } + + @Override + public String toString() { + return String.format( + "Inserted:%nkey:%s%nvalue:%s%n", + row.key.get("ROWKEY"), + row.value + ); + } + } + private static class ExpressionResolver extends VisitParentExpressionVisitor { private final SqlType fieldType; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index d4688895bf41..22f5421b08bc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -38,6 +38,7 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.MessageEntity; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.EnumSet; @@ -114,10 +115,10 @@ public Optional execute( private static StatementExecutor insertValuesExecutor() { final InsertValuesExecutor executor = new InsertValuesExecutor(); - return (statement, executionContext, serviceContext) -> { - executor.execute(statement, executionContext, serviceContext); - return Optional.empty(); + final Optional res = executor.execute(statement, executionContext, serviceContext); + return res.map(msg -> new MessageEntity(statement.getStatementText(), + null, Optional.of(msg))); }; } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index b89d7be63985..5f6b2a074f40 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -49,7 +49,8 @@ @JsonSubTypes.Type(value = ConnectorDescription.class, name = "connector_description"), @JsonSubTypes.Type(value = TypeList.class, name = "type_list"), @JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity"), - @JsonSubTypes.Type(value = QueryResultEntity.class, name = "row") + @JsonSubTypes.Type(value = QueryResultEntity.class, name = "row"), + @JsonSubTypes.Type(value = MessageEntity.class, name = "message") }) public abstract class KsqlEntity { private final String statementText; diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java new file mode 100644 index 000000000000..3be33b9eb5c4 --- /dev/null +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java @@ -0,0 +1,59 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageEntity extends KsqlEntity { + + private final Optional message; + + public MessageEntity( + @JsonProperty("statementText") final String statementText, + @JsonProperty("warnings") final List warnings, + @JsonProperty("message") final Optional message) { + super(statementText, warnings); + this.message = Objects.requireNonNull(message); + } + + public Optional getMessage() { + return message; + } + + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final MessageEntity that = (MessageEntity) o; + return message.equals(that.message); + } + + @Override + public int hashCode() { + return Objects.hash(message); + } +} +