Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:Output to console the key and value that was inserted by a I… #3377

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.MessageEntity;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.QueryDescription;
Expand Down Expand Up @@ -166,6 +167,7 @@ public class Console implements Closeable {
tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new))
.put(QueryResultEntity.class,
tablePrinter(QueryResultEntity.class, QueryResultTableBuilder::new))
.put(MessageEntity.class, Console::printMessage)
.build();

private static <T extends KsqlEntity> Handler1<KsqlEntity, Console> tablePrinter(
Expand Down Expand Up @@ -787,6 +789,12 @@ private void printAsJson(final Object o) {
}
}

private void printMessage(final MessageEntity message) {
if (message.getMessage().isPresent()) {
writer().println(message.getMessage().get());
}
}

static class NoOpRowCaptor implements RowCaptor {
@Override
public void addRow(final GenericRow row) {
Expand Down
29 changes: 26 additions & 3 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ public void tearDown() {
System.out.println("[Terminal Output]");
System.out.println(terminal.getOutputString());

dropStream(streamName);
dropTable(tableName);

localCli.close();
console.close();
}
Expand Down Expand Up @@ -277,8 +280,6 @@ private void testCreateStreamAsSelect(
final Map<String, GenericRow> results = topicConsumer
.readResults(streamName, resultSchema, expectedResults.size(), new StringDeserializer());

dropStream(streamName);

assertThat(results, equalTo(expectedResults));
}

Expand Down Expand Up @@ -698,7 +699,6 @@ public void testCreateTable() {
isRow(is("Parsing statement")),
isRow(is("Executing statement"))));

dropTable(tableName);
}

@Test
Expand Down Expand Up @@ -859,6 +859,29 @@ public void shouldDescribeAggregateFunction() throws Exception {
assertThat(output, containsString(expectedVariant));
}

@Test
public void shouldDisplayInsertedKeyValue() throws Exception {

final String message =
"Inserted:\n"
+ "key:null\n"
+ "value:[ 294 | 8.1 ]";

// Given:
final String line =
"CREATE STREAM " + streamName + "(id INT, rating DOUBLE) WITH "
+ "(kafka_topic='ratings', partitions=1, value_format='json');\n"
+ "INSERT INTO " + streamName + "(id, rating) VALUES (294, 8.1);";

// When:
localCli.handleLine(line);

final String output = terminal.getOutputString();

// Then:
assertThat(output, containsString(message));
}

@Test
public void shouldPrintErrorIfCantFindFunction() throws Exception {
localCli.handleLine("describe function foobar;");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private InsertValuesExecutor(
this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
}

public void execute(
public Optional<String> execute(
final ConfiguredStatement<InsertValues> statement,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
Expand All @@ -140,11 +140,14 @@ public void execute(
final KsqlConfig config = statement.getConfig()
.cloneWithPropertyOverwrite(statement.getOverrides());

final ProducerRecord<byte[], byte[]> record =
final ProducerRecordInfo producerRecordInfo =
buildRecord(statement, executionContext, serviceContext);

try {
producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps());
producer.sendRecord(producerRecordInfo.record,
serviceContext, config.getProducerClientConfigProps());

return Optional.of(producerRecordInfo.toString());
} catch (final TopicAuthorizationException e) {
// TopicAuthorizationException does not give much detailed information about why it failed,
// except which topics are denied. Here we just add the ACL to make the error message
Expand All @@ -160,7 +163,7 @@ public void execute(
}
}

private ProducerRecord<byte[], byte[]> buildRecord(
private ProducerRecordInfo buildRecord(
final ConfiguredStatement<InsertValues> statement,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
Expand Down Expand Up @@ -188,16 +191,15 @@ private ProducerRecord<byte[], byte[]> buildRecord(
final RowData row = extractRow(insertValues, dataSource);
final byte[] key = serializeKey(row.key, dataSource, config, serviceContext);
final byte[] value = serializeValue(row.value, dataSource, config, serviceContext);

final String topicName = dataSource.getKafkaTopicName();

return new ProducerRecord<>(
return new ProducerRecordInfo(row, new ProducerRecord<>(
topicName,
null,
row.ts,
key,
value
);
));
} catch (Exception e) {
throw new KsqlStatementException(
createInsertFailedExceptionMessage(insertValues) + " " + e.getMessage(),
Expand Down Expand Up @@ -465,6 +467,26 @@ private RowData(final long ts, final Struct key, final GenericRow value) {
}
}

private static class ProducerRecordInfo {
final RowData row;
final ProducerRecord<byte[], byte[]> record;

ProducerRecordInfo(final RowData row,
final ProducerRecord<byte[], byte[]> record) {
this.row = Objects.requireNonNull(row);
this.record = Objects.requireNonNull(record);
}

@Override
public String toString() {
return String.format(
"Inserted:%nkey:%s%nvalue:%s%n",
row.key.get("ROWKEY"),
row.value
);
}
}

private static class ExpressionResolver extends VisitParentExpressionVisitor<Object, Void> {

private final SqlType fieldType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.MessageEntity;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.util.EnumSet;
Expand Down Expand Up @@ -114,10 +115,10 @@ public Optional<KsqlEntity> execute(

private static StatementExecutor insertValuesExecutor() {
final InsertValuesExecutor executor = new InsertValuesExecutor();

return (statement, executionContext, serviceContext) -> {
executor.execute(statement, executionContext, serviceContext);
return Optional.empty();
final Optional<String> res = executor.execute(statement, executionContext, serviceContext);
return res.map(msg -> new MessageEntity(statement.getStatementText(),
null, Optional.of(msg)));
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
@JsonSubTypes.Type(value = ConnectorDescription.class, name = "connector_description"),
@JsonSubTypes.Type(value = TypeList.class, name = "type_list"),
@JsonSubTypes.Type(value = ErrorEntity.class, name = "error_entity"),
@JsonSubTypes.Type(value = QueryResultEntity.class, name = "row")
@JsonSubTypes.Type(value = QueryResultEntity.class, name = "row"),
@JsonSubTypes.Type(value = MessageEntity.class, name = "message")
})
public abstract class KsqlEntity {
private final String statementText;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.entity;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

@JsonIgnoreProperties(ignoreUnknown = true)
public class MessageEntity extends KsqlEntity {

private final Optional<String> message;

public MessageEntity(
@JsonProperty("statementText") final String statementText,
@JsonProperty("warnings") final List<KsqlWarning> warnings,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't accept warnings if you're only passing null / empty list. Just remove the constructor param and pass an empty list to the super class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have warnings for insert into, but I left it in there as MessageEntity could be used by a different endpoint which does produce warnings.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's easy enough for someone to put the warnings parameter back if they need it. The point is, at the moment its not needed, and I'm not even sure it a warning would get output in the CLI even if one was supplied.

Better to keep the API to want we require now. We may never need to pass a warning via this class...

@JsonProperty("message") final Optional<String> message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would the message not be provided? Isn't the whole point of this entity type to return a message?

Suggested change
@JsonProperty("message") final Optional<String> message) {
@JsonProperty("message") final String message) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is MessageEntity could be used by a different endpoint which produces an optional message.

E.g. maybe we decide to only print the insert results if the key is null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it... this is a MessageEntity.... what's a MessageEntity without a message?

super(statementText, warnings);
this.message = Objects.requireNonNull(message);
}

public Optional<String> getMessage() {
return message;
}


@Override
public boolean equals(final Object o) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect hashCode and equals, as they're not taking into account the parent class's fields.

I'd suggest adding a unit test using EqualsTester and NullPointerTester to ensure your class is well behaved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the other KSqlEntity subclasses, most (all?) seem to suffer from this problem.

So we should probably fix them all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually wonder why hashCode() and equals() are overridden for these classes in the first place.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we should probably fix them all.

If they are needed, then yes, but that's a different PR ;) . What's more important is ensuring any new code is correct.

I actually wonder why hashCode() and equals() are overridden for these classes in the first place.

Totally. Often its only for tests to ensure you can assert on output. I'm not normally a fan of having production code just for tests, but I can live with this. However, is it's in the production code base it should be correct!

So we should either fix this or remove it.

if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final MessageEntity that = (MessageEntity) o;
return message.equals(that.message);
}

@Override
public int hashCode() {
return Objects.hash(message);
}
}