Skip to content

Commit

Permalink
feat(cli): improve CLI transient queries with headers and spacing (pa…
Browse files Browse the repository at this point in the history
…rtial fix for #892) (#3047)
  • Loading branch information
agavra authored Jul 15, 2019
1 parent 55bfceb commit 050b72a
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 16 deletions.
23 changes: 19 additions & 4 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.FieldInfo;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.Errors;
import io.confluent.ksql.util.ErrorMessageUtil;
Expand Down Expand Up @@ -323,6 +325,17 @@ private void printKsqlResponse(final RestResponse<KsqlEntityList> response) thro

@SuppressWarnings("try")
private void handleStreamedQuery(final String query) throws IOException {
final RestResponse<KsqlEntityList> explainResponse = restClient
.makeKsqlRequest("EXPLAIN " + query);
if (!explainResponse.isSuccessful()) {
terminal.printErrorMessage(explainResponse.getErrorMessage());
return;
}

final QueryDescriptionEntity description =
(QueryDescriptionEntity) explainResponse.getResponse().get(0);
final List<FieldInfo> fields = description.getQueryDescription().getFields();
terminal.printRowHeader(fields);

final RestResponse<KsqlRestClient.QueryStream> queryResponse =
makeKsqlRequest(query, restClient::makeQueryRequest);
Expand All @@ -334,18 +347,20 @@ private void handleStreamedQuery(final String query) throws IOException {
} else {
try (KsqlRestClient.QueryStream queryStream = queryResponse.getResponse();
StatusClosable ignored = terminal.setStatusMessage("Press CTRL-C to interrupt")) {
streamResults(queryStream);
streamResults(queryStream, fields);
}
}
}

private void streamResults(final QueryStream queryStream) {

private void streamResults(
final QueryStream queryStream,
final List<FieldInfo> fields
) {
final Future<?> queryStreamFuture = queryStreamExecutorService.submit(() -> {
for (long rowsRead = 0; limitNotReached(rowsRead) && queryStream.hasNext(); rowsRead++) {
try {
final StreamedRow row = queryStream.next();
terminal.printStreamedRow(row);
terminal.printStreamedRow(row, fields);
if (row.isTerminal()) {
break;
}
Expand Down
35 changes: 26 additions & 9 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap1;
import io.confluent.ksql.util.HandlerMaps.Handler1;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.TabularRow;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -282,7 +283,10 @@ public void printError(final String shortMsg, final String fullMsg) {
writer().println(shortMsg);
}

public void printStreamedRow(final StreamedRow row) throws IOException {
public void printStreamedRow(
final StreamedRow row,
final List<FieldInfo> fields
) throws IOException {
if (row.getErrorMessage() != null) {
printErrorMessage(row.getErrorMessage());
return;
Expand All @@ -298,7 +302,7 @@ public void printStreamedRow(final StreamedRow row) throws IOException {
printAsJson(row.getRow().getColumns());
break;
case TABULAR:
printAsTable(row.getRow());
printAsTable(row.getRow(), fields);
break;
default:
throw new RuntimeException(String.format(
Expand Down Expand Up @@ -331,6 +335,21 @@ public void printKsqlEntityList(final List<KsqlEntity> entityList) throws IOExce
}
}

public void printRowHeader(final List<FieldInfo> fields) throws IOException {
switch (outputFormat) {
case JSON:
break;
case TABULAR:
writer().println(TabularRow.createHeader(getWidth(), fields));
break;
default:
throw new RuntimeException(String.format(
"Unexpected output format: '%s'",
outputFormat.name()
));
}
}

public void registerCliSpecificCommand(final CliSpecificCommand cliSpecificCommand) {
cliSpecificCommands.put(cliSpecificCommand.getName().toLowerCase(), cliSpecificCommand);
}
Expand Down Expand Up @@ -369,14 +388,12 @@ private Optional<CliCmdExecutor> getCliCommand(final String line) {
.findFirst();
}

private void printAsTable(final GenericRow row) {
private void printAsTable(
final GenericRow row,
final List<FieldInfo> fields
) {
rowCaptor.addRow(row);
writer().println(
String.join(
" | ",
row.getColumns().stream().map(Objects::toString).collect(Collectors.toList())
)
);
writer().println(TabularRow.createRow(getWidth(), fields, row));
flush();
}

Expand Down
155 changes: 155 additions & 0 deletions ksql-cli/src/main/java/io/confluent/ksql/util/TabularRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.rest.entity.FieldInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class TabularRow {

private static final int MIN_CELL_WIDTH = 5;

private final int width;
private final List<String> value;
private final List<String> header;
private final boolean isHeader;

public static TabularRow createHeader(final int width, final List<FieldInfo> header) {
return new TabularRow(
width,
header.stream().map(FieldInfo::getName).collect(Collectors.toList()),
null);
}

public static TabularRow createRow(
final int width,
final List<FieldInfo> header,
final GenericRow value
) {
return new TabularRow(
width,
header.stream().map(FieldInfo::getName).collect(Collectors.toList()),
value.getColumns().stream().map(Objects::toString).collect(Collectors.toList())
);
}

@VisibleForTesting
TabularRow(
final int width,
final List<String> header,
final List<String> value
) {
this.header = Objects.requireNonNull(header, "header");
this.width = width;
this.value = value;
this.isHeader = value == null;
}

@Override
public String toString() {
final List<String> columns = isHeader ? header : value;

if (columns.isEmpty()) {
return "";
}

final int cellWidth = Math.max(width / columns.size() - 2, MIN_CELL_WIDTH);
final StringBuilder builder = new StringBuilder();

if (isHeader) {
separatingLine(builder, cellWidth, columns.size());
builder.append('\n');
}

// split each column into fix length chunks
final List<List<String>> split = columns.stream()
.map(col -> splitToFixed(col, cellWidth))
.collect(Collectors.toList());

// buffer each column vertically to have the max number of splits
final int maxSplit = split.stream().mapToInt(List::size).max().orElse(0);
final List<List<String>> buffered = split.stream()
.map(s -> addUntil(s, createCell("", cellWidth), maxSplit))
.collect(Collectors.toList());

formatRow(builder, buffered, maxSplit);

if (isHeader) {
builder.append('\n');
separatingLine(builder, cellWidth, columns.size());
}

return builder.toString();
}

@SuppressWarnings("ForLoopReplaceableByForEach") // clearer to read this way
private static void formatRow(
final StringBuilder builder,
final List<List<String>> columns,
final int numRows
) {
for (int row = 0; row < numRows; row++) {
builder.append('|');
for (int col = 0; col < columns.size(); col++) {
builder.append(columns.get(col).get(row));
}
if (row != numRows - 1) {
builder.append('\n');
}
}
}

@SuppressWarnings("UnstableApiUsage")
private static List<String> splitToFixed(final String value, final int width) {
return Splitter.fixedLength(width)
.splitToList(value)
.stream()
.map(line -> createCell(line, width))
.collect(Collectors.toList());
}

private static void separatingLine(
final StringBuilder builder,
final int cellWidth,
final int numColumns
) {
builder.append("+");
for (int i = 0; i < numColumns; i++) {
builder.append(Strings.repeat("-", cellWidth));
builder.append("+");
}
}

private static String createCell(final String value, final int width) {
final String format = "%-" + width + "s|";
return String.format(format, value);
}

private static <T> List<T> addUntil(final List<T> source, final T value, final int desiredSize) {
final List<T> copy = new ArrayList<>(source) ;
while (copy.size() < desiredSize) {
copy.add(value);
}
return copy;
}
}
17 changes: 17 additions & 0 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,23 @@ public void testTransientSelectStar() {
));
}

@Test
public void testTransientHeader() {
// When:
rowCaptor.resetTestResult();
run("SELECT * FROM " + orderDataProvider.kstreamName() + " LIMIT 1", localCli);

// Then: (note that some of these are truncated because of header wrapping)
assertThat(terminal.getOutputString(), containsString("ROWTIME"));
assertThat(terminal.getOutputString(), containsString("ROWKEY"));
assertThat(terminal.getOutputString(), containsString("ITEMID"));
assertThat(terminal.getOutputString(), containsString("ORDERID"));
assertThat(terminal.getOutputString(), containsString("ORDERUNIT"));
assertThat(terminal.getOutputString(), containsString("TIMESTAMP"));
assertThat(terminal.getOutputString(), containsString("PRICEARRA"));
assertThat(terminal.getOutputString(), containsString("KEYVALUEM"));
}

@Test
public void testSelectUDFs() {
final String queryString = String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.Queries;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SchemaInfo;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
import io.confluent.ksql.rest.entity.SourceInfo;
Expand All @@ -58,6 +59,7 @@
import io.confluent.ksql.rest.server.computation.CommandId;
import io.confluent.ksql.rest.util.EntityUtil;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.Format;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -79,6 +81,10 @@ public class ConsoleTest {

private static final String CLI_CMD_NAME = "some command";
private static final String WHITE_SPACE = " \t ";
private static final List<FieldInfo> HEADER =
ImmutableList.of(
new FieldInfo("foo", new SchemaInfo(SqlBaseType.STRING, null, null)),
new FieldInfo("bar", new SchemaInfo(SqlBaseType.STRING, null, null)));

private final TestTerminal terminal;
private final Console console;
Expand Down Expand Up @@ -108,22 +114,43 @@ public void after() {

@Test
public void testPrintGenericStreamedRow() throws IOException {
// Given:
final StreamedRow row = StreamedRow.row(new GenericRow(ImmutableList.of("col_1", "col_2")));
console.printStreamedRow(row);

// When:
console.printStreamedRow(row, HEADER);

// Then:
if (console.getOutputFormat() == OutputFormat.TABULAR) {
assertThat(terminal.getOutputString(), containsString("col_1"));
assertThat(terminal.getOutputString(), containsString("col_2"));
}
}

@Test
public void testPrintHeader() throws IOException {
// When:
console.printRowHeader(HEADER);

// Then:
if (console.getOutputFormat() == OutputFormat.TABULAR) {
assertThat(terminal.getOutputString(), containsString("foo"));
assertThat(terminal.getOutputString(), containsString("bar"));
}
}

@Test
public void testPrintErrorStreamedRow() throws IOException {
final FakeException exception = new FakeException();

console.printStreamedRow(StreamedRow.error(exception));
console.printStreamedRow(StreamedRow.error(exception), HEADER);

assertThat(terminal.getOutputString(), is(exception.getMessage() + "\n"));
}

@Test
public void testPrintFinalMessageStreamedRow() throws IOException {
console.printStreamedRow(StreamedRow.finalMessage("Some message"));
console.printStreamedRow(StreamedRow.finalMessage("Some message"), HEADER);
assertThat(terminal.getOutputString(), is("Some message\n"));
}

Expand Down
Loading

0 comments on commit 050b72a

Please sign in to comment.