From ff91c096a74fa9b2de74d86c132a4a3d9a3f1d59 Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Wed, 18 Sep 2019 13:17:37 +0100 Subject: [PATCH 1/2] fix:3349 Output to console the key and value that was inserted by a INSERT INTO --- .../confluent/ksql/cli/console/Console.java | 8 +++ .../java/io/confluent/ksql/cli/CliTest.java | 22 +++++++ .../ksql/engine/InsertValuesExecutor.java | 32 +++++++--- .../server/execution/CustomExecutors.java | 6 +- .../ksql/rest/entity/KsqlEntity.java | 3 +- .../ksql/rest/entity/MessageEntity.java | 59 +++++++++++++++++++ 6 files changed, 120 insertions(+), 10 deletions(-) create mode 100644 ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index eb3eb925f68f..d985cb2bbcad 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -62,6 +62,7 @@ import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.MessageEntity; import io.confluent.ksql.rest.entity.PropertiesList; import io.confluent.ksql.rest.entity.Queries; import io.confluent.ksql.rest.entity.QueryDescription; @@ -166,6 +167,7 @@ public class Console implements Closeable { tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new)) .put(QueryResultEntity.class, tablePrinter(QueryResultEntity.class, QueryResultTableBuilder::new)) + .put(MessageEntity.class, Console::printMessage) .build(); private static Handler1 tablePrinter( @@ -787,6 +789,12 @@ private void printAsJson(final Object o) { } } + private void printMessage(final MessageEntity message) { + if (message.getMessage().isPresent()) { + writer().println(message.getMessage().get()); + } + } + static class NoOpRowCaptor implements RowCaptor { @Override public void addRow(final GenericRow row) { diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 8946999ed2a4..d6a24c7ed795 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -859,6 +859,28 @@ public void shouldDescribeAggregateFunction() throws Exception { assertThat(output, containsString(expectedVariant)); } + @Test + public void shouldDisplayInsertedKeyValue() throws Exception { + + final String message = String.format( + "Inserted:%nkey:%s%nvalue:%s%n", + "null", + "[ 294 | 8.1 ]" + ); + + final String line = + "CREATE STREAM " + streamName + "(id INT, rating DOUBLE) WITH " + + "(kafka_topic='ratings', partitions=1, value_format='json');\n" + + "INSERT INTO " + streamName + "(id, rating) VALUES (294, 8.1);"; + + localCli.handleLine(line); + + dropStream(streamName); + + final String output = terminal.getOutputString(); + assertThat(output, containsString(message)); + } + @Test public void shouldPrintErrorIfCantFindFunction() throws Exception { localCli.handleLine("describe function foobar;"); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 01ee6835ea47..c8560ee6cfe6 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -131,7 +131,7 @@ private InsertValuesExecutor( this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory"); } - public void execute( + public Optional execute( final ConfiguredStatement statement, final KsqlExecutionContext executionContext, final ServiceContext serviceContext @@ -140,11 +140,12 @@ public void execute( final KsqlConfig config = statement.getConfig() .cloneWithPropertyOverwrite(statement.getOverrides()); - final ProducerRecord record = + final ProducerRecordInfo producerRecordInfo = buildRecord(statement, executionContext, serviceContext); try { - producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps()); + producer.sendRecord(producerRecordInfo.record, + serviceContext, config.getProducerClientConfigProps()); } catch (final TopicAuthorizationException e) { // TopicAuthorizationException does not give much detailed information about why it failed, // except which topics are denied. Here we just add the ACL to make the error message @@ -158,9 +159,16 @@ public void execute( } catch (final Exception e) { throw new KsqlException(createInsertFailedExceptionMessage(insertValues), e); } + + final String message = String.format( + "Inserted:%nkey:%s%nvalue:%s%n", + producerRecordInfo.row.key.get("ROWKEY"), + producerRecordInfo.row.value + ); + return Optional.of(message); } - private ProducerRecord buildRecord( + private ProducerRecordInfo buildRecord( final ConfiguredStatement statement, final KsqlExecutionContext executionContext, final ServiceContext serviceContext @@ -188,16 +196,15 @@ private ProducerRecord buildRecord( final RowData row = extractRow(insertValues, dataSource); final byte[] key = serializeKey(row.key, dataSource, config, serviceContext); final byte[] value = serializeValue(row.value, dataSource, config, serviceContext); - final String topicName = dataSource.getKafkaTopicName(); - return new ProducerRecord<>( + return new ProducerRecordInfo(row, new ProducerRecord<>( topicName, null, row.ts, key, value - ); + )); } catch (Exception e) { throw new KsqlStatementException( createInsertFailedExceptionMessage(insertValues) + " " + e.getMessage(), @@ -465,6 +472,17 @@ private RowData(final long ts, final Struct key, final GenericRow value) { } } + private static class ProducerRecordInfo { + final RowData row; + final ProducerRecord record; + + ProducerRecordInfo(final RowData row, + final ProducerRecord record) { + this.row = row; + this.record = record; + } + } + private static class ExpressionResolver extends VisitParentExpressionVisitor { private final SqlType fieldType; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index d4688895bf41..a84a04f858b9 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -38,6 +38,7 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.MessageEntity; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import java.util.EnumSet; @@ -116,8 +117,9 @@ private static StatementExecutor insertValuesExecutor() { final InsertValuesExecutor executor = new InsertValuesExecutor(); return (statement, executionContext, serviceContext) -> { - executor.execute(statement, executionContext, serviceContext); - return Optional.empty(); + final Optional message = + executor.execute(statement, executionContext, serviceContext); + return Optional.of(new MessageEntity(statement.getStatementText(), null, message)); }; } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java index b89d7be63985..5f6b2a074f40 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java @@ -49,7 +49,8 @@ @JsonSubTypes.Type(value = ConnectorDescription.class, name = "connector_description"), @JsonSubTypes.Type(value = TypeList.class, name = "type_list"), @JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity"), - @JsonSubTypes.Type(value = QueryResultEntity.class, name = "row") + @JsonSubTypes.Type(value = QueryResultEntity.class, name = "row"), + @JsonSubTypes.Type(value = MessageEntity.class, name = "message") }) public abstract class KsqlEntity { private final String statementText; diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java new file mode 100644 index 000000000000..5518610c6de3 --- /dev/null +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java @@ -0,0 +1,59 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class MessageEntity extends KsqlEntity { + + private final Optional message; + + public MessageEntity( + @JsonProperty("statementText") final String statementText, + @JsonProperty("warnings") final List warnings, + @JsonProperty("message") final Optional message) { + super(statementText, warnings); + Objects.requireNonNull(message); + this.message = message; + } + + public Optional getMessage() { + return message; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final MessageEntity that = (MessageEntity) o; + return message.equals(that.message); + } + + @Override + public int hashCode() { + return Objects.hash(message); + } +} + From 92fb8e2ad7c7859aa6e7c5386a0d93151cf3014c Mon Sep 17 00:00:00 2001 From: Tim Fox Date: Wed, 18 Sep 2019 18:19:33 +0100 Subject: [PATCH 2/2] fix:nits --- .../java/io/confluent/ksql/cli/CliTest.java | 21 +++++++++--------- .../ksql/engine/InsertValuesExecutor.java | 22 +++++++++++-------- .../server/execution/CustomExecutors.java | 7 +++--- .../ksql/rest/entity/MessageEntity.java | 4 ++-- 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index d6a24c7ed795..c93ec446c565 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -246,6 +246,9 @@ public void tearDown() { System.out.println("[Terminal Output]"); System.out.println(terminal.getOutputString()); + dropStream(streamName); + dropTable(tableName); + localCli.close(); console.close(); } @@ -277,8 +280,6 @@ private void testCreateStreamAsSelect( final Map results = topicConsumer .readResults(streamName, resultSchema, expectedResults.size(), new StringDeserializer()); - dropStream(streamName); - assertThat(results, equalTo(expectedResults)); } @@ -698,7 +699,6 @@ public void testCreateTable() { isRow(is("Parsing statement")), isRow(is("Executing statement")))); - dropTable(tableName); } @Test @@ -862,22 +862,23 @@ public void shouldDescribeAggregateFunction() throws Exception { @Test public void shouldDisplayInsertedKeyValue() throws Exception { - final String message = String.format( - "Inserted:%nkey:%s%nvalue:%s%n", - "null", - "[ 294 | 8.1 ]" - ); + final String message = + "Inserted:\n" + + "key:null\n" + + "value:[ 294 | 8.1 ]"; + // Given: final String line = "CREATE STREAM " + streamName + "(id INT, rating DOUBLE) WITH " + "(kafka_topic='ratings', partitions=1, value_format='json');\n" + "INSERT INTO " + streamName + "(id, rating) VALUES (294, 8.1);"; + // When: localCli.handleLine(line); - dropStream(streamName); - final String output = terminal.getOutputString(); + + // Then: assertThat(output, containsString(message)); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index c8560ee6cfe6..18c20cc43ec4 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -146,6 +146,8 @@ public Optional execute( try { producer.sendRecord(producerRecordInfo.record, serviceContext, config.getProducerClientConfigProps()); + + return Optional.of(producerRecordInfo.toString()); } catch (final TopicAuthorizationException e) { // TopicAuthorizationException does not give much detailed information about why it failed, // except which topics are denied. Here we just add the ACL to make the error message @@ -159,13 +161,6 @@ public Optional execute( } catch (final Exception e) { throw new KsqlException(createInsertFailedExceptionMessage(insertValues), e); } - - final String message = String.format( - "Inserted:%nkey:%s%nvalue:%s%n", - producerRecordInfo.row.key.get("ROWKEY"), - producerRecordInfo.row.value - ); - return Optional.of(message); } private ProducerRecordInfo buildRecord( @@ -478,8 +473,17 @@ private static class ProducerRecordInfo { ProducerRecordInfo(final RowData row, final ProducerRecord record) { - this.row = row; - this.record = record; + this.row = Objects.requireNonNull(row); + this.record = Objects.requireNonNull(record); + } + + @Override + public String toString() { + return String.format( + "Inserted:%nkey:%s%nvalue:%s%n", + row.key.get("ROWKEY"), + row.value + ); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java index a84a04f858b9..22f5421b08bc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/CustomExecutors.java @@ -115,11 +115,10 @@ public Optional execute( private static StatementExecutor insertValuesExecutor() { final InsertValuesExecutor executor = new InsertValuesExecutor(); - return (statement, executionContext, serviceContext) -> { - final Optional message = - executor.execute(statement, executionContext, serviceContext); - return Optional.of(new MessageEntity(statement.getStatementText(), null, message)); + final Optional res = executor.execute(statement, executionContext, serviceContext); + return res.map(msg -> new MessageEntity(statement.getStatementText(), + null, Optional.of(msg))); }; } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java index 5518610c6de3..3be33b9eb5c4 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/MessageEntity.java @@ -31,14 +31,14 @@ public MessageEntity( @JsonProperty("warnings") final List warnings, @JsonProperty("message") final Optional message) { super(statementText, warnings); - Objects.requireNonNull(message); - this.message = message; + this.message = Objects.requireNonNull(message); } public Optional getMessage() { return message; } + @Override public boolean equals(final Object o) { if (this == o) {