# fix: report window type and query status better from API (#4313)
This commit:
1. exposes the window type of the key of a query/source, i.e. `HOPPING`, `TUMBLING`, `SESSION`, or none.
2. makes the status of a query easier to find.
3. fixes a bug that meant the statement text of a query was not displayed in the CLI.

BREAKING CHANGE: This commit changes the RESTful API response for some commands: the `SourceDescription` type no longer has a `format` field. Instead, it has separate `keyFormat` and `valueFormat` fields.
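For example, in the JSON for a `DESCRIBE` response the old single field is replaced by the two new ones; the fragment below is taken from the updated test expectations further down in this diff (surrounding fields elided):

```diff
-  "format" : "avro",
+  "keyFormat" : "kafka",
+  "valueFormat" : "avro",
```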

## `SHOW QUERIES` changes:

The response now includes a `state` property for each query, indicating the state of the query.

e.g.

```json
{
  "queryString" : "create table OUTPUT as select * from INPUT;",
  "sinks" : [ "OUTPUT" ],
  "id" : "CSAS_OUTPUT_0",
  "state" : "Running"
}
```
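
A response like the one above can be fetched straight from the RESTful API; a minimal sketch, assuming a local server listening on the default port 8088:

```
curl -s -X POST http://localhost:8088/ksql \
     -H "Content-Type: application/vnd.ksql.v1+json" \
     -d '{"ksql": "SHOW QUERIES;"}'
```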

The CLI output was:

```
 ksql> show queries;

  Query ID                   | Kafka Topic         | Query String

   CSAS_OUTPUT_0              | OUTPUT              | CREATE STREAM OUTPUT WITH (KAFKA_TOPIC='OUTPUT', PARTITIONS=1, REPLICAS=1) AS SELECT *
 FROM INPUT INPUT
 EMIT CHANGES;
  CTAS_CLICK_USER_SESSIONS_5 | CLICK_USER_SESSIONS | CREATE TABLE CLICK_USER_SESSIONS WITH (KAFKA_TOPIC='CLICK_USER_SESSIONS', PARTITIONS=1, REPLICAS=1) AS SELECT
   CLICKSTREAM.USERID USERID,
   COUNT(*) COUNT
 FROM CLICKSTREAM CLICKSTREAM
 WINDOW SESSION ( 300 SECONDS )
 GROUP BY CLICKSTREAM.USERID
 EMIT CHANGES;

 For detailed information on a Query run: EXPLAIN <Query ID>;
```

and is now:

```
 Query ID                   | Status      | Kafka Topic         | Query String

 CSAS_OUTPUT_0              | RUNNING     | OUTPUT              | CREATE STREAM OUTPUT WITH (KAFKA_TOPIC='OUTPUT', PARTITIONS=1, REPLICAS=1) AS SELECT *FROM INPUT INPUTEMIT CHANGES;

For detailed information on a Query run: EXPLAIN <Query ID>;

```
Note the addition of the `Status` column and that the `Query String` is no longer written across multiple lines.
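
The diff below does not include `RunningQuery.getQuerySingleLine()` itself, but judging from the rendered output above (note `*FROM` and `INPUTEMIT`), the conversion appears to strip newlines outright rather than replace them with spaces. A minimal sketch of that behaviour, as a hypothetical helper rather than the actual implementation:

```java
final class SingleLine {

  // Hypothetical sketch of the single-line conversion implied by the new CLI
  // output; the real RunningQuery.getQuerySingleLine() is not shown in this diff.
  static String toSingleLine(final String statementText) {
    // Newlines are removed rather than replaced with spaces, matching the
    // rendered "SELECT *FROM INPUT INPUTEMIT CHANGES;" example above.
    return statementText.replace("\n", "");
  }

  public static void main(final String[] args) {
    // Reproduces the collapsed form shown in the new SHOW QUERIES output.
    System.out.println(toSingleLine("SELECT *\nFROM INPUT INPUT\nEMIT CHANGES;"));
    // -> SELECT *FROM INPUT INPUTEMIT CHANGES;
  }
}
```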

## `DESCRIBE <source>;` changes:

old CLI output:

```
ksql> describe CLICK_USER_SESSIONS;

Name                 : CLICK_USER_SESSIONS
 Field   | Type

 ROWTIME | BIGINT           (system)
 ROWKEY  | INTEGER          (system)
 USERID  | INTEGER
 COUNT   | BIGINT

For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
```

New CLI output:

```
ksql> describe CLICK_USER_SESSIONS;

Name                 : CLICK_USER_SESSIONS
 Field   | Type

 ROWTIME | BIGINT           (system)
 ROWKEY  | INTEGER          (system) (Window type: SESSION)
 USERID  | INTEGER
 COUNT   | BIGINT

For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
```

Note the addition of the window type information on the `ROWKEY` system field.
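
The same information is exposed over the RESTful API as a new `windowType` field on `SourceDescription` (`null` for unwindowed sources, as the test expectations below show). A sketch of the relevant fragment for this session-windowed table, with the value assumed from the CLI output above and all other fields elided:

```json
{
  "name" : "CLICK_USER_SESSIONS",
  "windowType" : "SESSION"
}
```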

The extended version of the command has also changed.

Old output:

```
ksql> describe extended CLICK_USER_SESSIONS;

Name                 : CLICK_USER_SESSIONS
Type                 : TABLE
Key field            : USERID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value Format                : JSON
Kafka topic          : CLICK_USER_SESSIONS (partitions: 1, replication: 1)
Statement            : CREATE TABLE CLICK_USER_SESSIONS WITH (KAFKA_TOPIC='CLICK_USER_SESSIONS', PARTITIONS=1, REPLICAS=1) AS SELECT
  CLICKSTREAM.USERID USERID,
  COUNT(*) COUNT
FROM CLICKSTREAM CLICKSTREAM
WINDOW SESSION ( 300 SECONDS )
GROUP BY CLICKSTREAM.USERID
EMIT CHANGES;

 Field   | Type

 ROWTIME | BIGINT           (system)
 ROWKEY  | INTEGER          (system)
 USERID  | INTEGER
 COUNT   | BIGINT

Queries that write from this TABLE
-----------------------------------
CTAS_CLICK_USER_SESSIONS_5 (RUNNING) : CREATE TABLE CLICK_USER_SESSIONS WITH (KAFKA_TOPIC='CLICK_USER_SESSIONS', PARTITIONS=1, REPLICAS=1) AS SELECT
 CLICKSTREAM.USERID USERID,
 COUNT(*) COUNT
 FROM CLICKSTREAM CLICKSTREAM
 WINDOW SESSION ( 300 SECONDS )
 GROUP BY CLICKSTREAM.USERID
 EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic CLICK_USER_SESSIONS)
```

New output:

```
ksql> describe extended CLICK_USER_SESSIONS;

Name                 : CLICK_USER_SESSIONS
Type                 : TABLE
Key field            : USERID
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : CLICK_USER_SESSIONS (partitions: 1, replication: 1)
Statement            : CREATE TABLE CLICK_USER_SESSIONS WITH (KAFKA_TOPIC='CLICK_USER_SESSIONS', PARTITIONS=1, REPLICAS=1) AS SELECT
  CLICKSTREAM.USERID USERID,
  COUNT(*) COUNT
FROM CLICKSTREAM CLICKSTREAM
WINDOW SESSION ( 300 SECONDS )
GROUP BY CLICKSTREAM.USERID
EMIT CHANGES;

 Field   | Type

 ROWTIME | BIGINT           (system)
 ROWKEY  | INTEGER          (system) (Window type: SESSION)
 USERID  | INTEGER
 COUNT   | BIGINT

Queries that write from this TABLE
-----------------------------------
CTAS_CLICK_USER_SESSIONS_5 (RUNNING) : CREATE TABLE CLICK_USER_SESSIONS WITH (KAFKA_TOPIC='CLICK_USER_SESSIONS', PARTITIONS=1, REPLICAS=1) AS SELECT  CLICKSTREAM.USERID USERID,  COUNT(*) COUNTFROM CLICKSTREAM CLICKSTREAMWINDOW SESSION ( 300 SECONDS ) GROUP BY CLICKSTREAM.USERIDEMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic CLICK_USER_SESSIONS)
```

Note the change of `Key format` from `STRING` to `KAFKA`, the window type information output for windowed schemas, and that SQL statements are now output on a single line.
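
For client code built against the REST entity classes, both new pieces of information are optional: the diff below shows `SourceDescription#getWindowType` returning an `Optional<WindowType>` and `RunningQuery#getState` an `Optional<String>`. A minimal, hypothetical consumption sketch (not part of this commit):

```java
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SourceDescription;

final class DescribeSummary {

  // Hypothetical client-side helper; the "N/A" fallback mirrors the CLI's own
  // handling of a missing query state in this commit.
  static void printSummary(final SourceDescription source, final RunningQuery query) {
    final String windowType = source.getWindowType()
        .map(Object::toString)
        .orElse("none");                                  // unwindowed source
    final String state = query.getState().orElse("N/A"); // state may be absent

    System.out.println("Window type : " + windowType);
    System.out.println("Query state : " + state);
  }
}
```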
big-andy-coates authored Jan 16, 2020
1 parent 0a74151 commit ca9368a
Showing 18 changed files with 384 additions and 145 deletions.
ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java (34 additions & 13 deletions)

```diff
@@ -44,6 +44,7 @@
 import io.confluent.ksql.cli.console.table.builder.TopicDescriptionTableBuilder;
 import io.confluent.ksql.cli.console.table.builder.TypeListTableBuilder;
 import io.confluent.ksql.json.JsonMapper;
+import io.confluent.ksql.model.WindowType;
 import io.confluent.ksql.rest.entity.ArgumentInfo;
 import io.confluent.ksql.rest.entity.CommandStatusEntity;
 import io.confluent.ksql.rest.entity.ConnectorDescription;
@@ -448,22 +449,40 @@ private void printWarnings(final KsqlEntity entity) {
     }
   }
 
-  private static String formatFieldType(final FieldInfo field, final String keyField) {
-    if (field.getName().equals("ROWTIME") || field.getName().equals("ROWKEY")) {
+  private static String formatFieldType(
+      final FieldInfo field,
+      final Optional<WindowType> windowType,
+      final String keyField
+  ) {
+    if (field.getName().equals("ROWTIME")) {
       return String.format("%-16s %s", field.getSchema().toTypeString(), "(system)");
-    } else if (keyField != null && keyField.contains("." + field.getName())) {
+    }
+
+    if (field.getName().equals("ROWKEY")) {
+      final String wt = windowType
+          .map(v -> " (Window type: " + v + ")")
+          .orElse("");
+
+      return String.format("%-16s %s%s", field.getSchema().toTypeString(), "(system)", wt);
+    }
+
+    if (keyField != null && keyField.contains("." + field.getName())) {
       return String.format("%-16s %s", field.getSchema().toTypeString(), "(key)");
-    } else {
-      return field.getSchema().toTypeString();
     }
+
+    return field.getSchema().toTypeString();
   }
 
-  private void printSchema(final List<FieldInfo> fields, final String keyField) {
+  private void printSchema(
+      final Optional<WindowType> windowType,
+      final List<FieldInfo> fields,
+      final String keyField
+  ) {
     final Table.Builder tableBuilder = new Table.Builder();
     if (!fields.isEmpty()) {
       tableBuilder.withColumnHeaders("Field", "Type");
       fields.forEach(
-          f -> tableBuilder.withRow(f.getName(), formatFieldType(f, keyField)));
+          f -> tableBuilder.withRow(f.getName(), formatFieldType(f, windowType, keyField)));
       tableBuilder.build().print(this);
     }
   }
@@ -474,9 +493,9 @@ private void printTopicInfo(final SourceDescription source) {
         : source.getTimestamp();
 
     writer().println(String.format("%-20s : %s", "Key field", source.getKey()));
-    writer().println(String.format("%-20s : %s", "Key format", "STRING"));
     writer().println(String.format("%-20s : %s", "Timestamp field", timestamp));
-    writer().println(String.format("%-20s : %s", "Value format", source.getFormat()));
+    writer().println(String.format("%-20s : %s", "Key format", source.getKeyFormat()));
+    writer().println(String.format("%-20s : %s", "Value format", source.getValueFormat()));
 
     if (!source.getTopic().isEmpty()) {
       String topicInformation = String.format("%-20s : %s",
@@ -509,7 +528,9 @@ private void printQueries(
         "-----------------------------------"
     ));
     for (final RunningQuery writeQuery : queries) {
-      writer().println(writeQuery.getId().getId() + " : " + writeQuery.getQueryString());
+      writer().println(writeQuery.getId().getId()
+          + " (" + writeQuery.getState().orElse("N/A")
+          + ") : " + writeQuery.getQuerySingleLine());
     }
     writer().println("\nFor query topology and execution plan please run: EXPLAIN <QueryId>");
   }
@@ -562,7 +583,7 @@ private void printOverriddenProperties(final QueryDescription queryDescription)
   private void printSourceDescription(final SourceDescription source) {
     writer().println(String.format("%-20s : %s", "Name", source.getName()));
     if (!source.isExtended()) {
-      printSchema(source.getFields(), source.getKey());
+      printSchema(source.getWindowType(), source.getFields(), source.getKey());
       writer().println(
           "For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;");
       return;
@@ -573,7 +594,7 @@ private void printSourceDescription(final SourceDescription source) {
     writer().println(String.format("%-20s : %s", "Statement", source.getStatement()));
     writer().println("");
 
-    printSchema(source.getFields(), source.getKey());
+    printSchema(source.getWindowType(), source.getFields(), source.getKey());
 
     printQueries(source.getReadQueries(), source.getType(), "read");
 
@@ -638,7 +659,7 @@ private void printQueryDescription(final QueryDescription query) {
       writer().println(String.format("%-20s : %s", "Status", query.getState().get()));
     }
     writer().println();
-    printSchema(query.getFields(), "");
+    printSchema(query.getWindowType(), query.getFields(), "");
     printQuerySources(query);
     printQuerySinks(query);
    printExecutionPlan(query);
```
```diff
@@ -25,14 +25,17 @@
 public class QueriesTableBuilder implements TableBuilder<Queries> {
 
   private static final List<String> HEADERS =
-      ImmutableList.of("Query ID", "Kafka Topic", "Query String");
+      ImmutableList.of("Query ID", "Status", "Kafka Topic", "Query String");
 
   @Override
   public Table buildTable(final Queries entity) {
     final Stream<List<String>> rows = entity.getQueries().stream()
         .map(r -> ImmutableList.of(
             r.getId().getId(),
-            String.join(",", r.getSinks()), r.getQueryString()));
+            r.getState().orElse("N/A"),
+            String.join(",", r.getSinks()),
+            r.getQuerySingleLine()
+        ));
 
     return new Builder()
         .withColumnHeaders(HEADERS)
```
```diff
@@ -80,6 +80,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -109,6 +110,7 @@ public class ConsoleTest {
   private final CliSpecificCommand cliCommand;
   private final SourceDescription sourceDescription = new SourceDescription(
       "TestSource",
+      Optional.empty(),
       Collections.emptyList(),
       Collections.emptyList(),
       buildTestSchema(SqlTypes.INTEGER, SqlTypes.STRING),
@@ -118,6 +120,7 @@
       "stats",
       "errors",
       true,
+      "kafka",
       "avro",
       "kadka-topic",
       2,
@@ -285,7 +288,7 @@ public void testPrintQueries() {
     final List<RunningQuery> queries = new ArrayList<>();
     queries.add(
         new RunningQuery(
-            "select * from t1", Collections.singleton("Test"), new QueryId("0")));
+            "select * from t1", Collections.singleton("Test"), new QueryId("0"), Optional.of("Running")));
 
     final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
         new Queries("e", queries)
@@ -301,18 +304,19 @@
           + "  \"@type\" : \"queries\",\n"
           + "  \"statementText\" : \"e\",\n"
           + "  \"queries\" : [ {\n"
+          + "    \"queryString\" : \"select * from t1\",\n"
           + "    \"sinks\" : [ \"Test\" ],\n"
           + "    \"id\" : \"0\",\n"
-          + "    \"queryString\" : \"select * from t1\"\n"
+          + "    \"state\" : \"Running\"\n"
           + "  } ],\n"
           + "  \"warnings\" : [ ]\n"
           + "} ]\n"));
     } else {
       assertThat(output, is("\n"
-          + " Query ID | Kafka Topic | Query String     \n"
-          + "-------------------------------------------\n"
-          + " 0        | Test        | select * from t1 \n"
-          + "-------------------------------------------\n"
+          + " Query ID | Status  | Kafka Topic | Query String     \n"
+          + "-----------------------------------------------------\n"
+          + " 0        | Running | Test        | select * from t1 \n"
+          + "-----------------------------------------------------\n"
          + "For detailed information on a Query run: EXPLAIN <Query ID>;\n"));
     }
   }
@@ -334,17 +338,18 @@ public void testPrintSourceDescription() {
     );
 
     final List<RunningQuery> readQueries = ImmutableList.of(
-        new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"))
+        new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running"))
     );
     final List<RunningQuery> writeQueries = ImmutableList.of(
-        new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"))
+        new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running"))
     );
 
     final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
         new SourceDescriptionEntity(
             "some sql",
             new SourceDescription(
                 "TestSource",
+                Optional.empty(),
                 readQueries,
                 writeQueries,
                 fields,
@@ -354,6 +359,7 @@
                 "stats",
                 "errors",
                 false,
+                "kafka",
                 "avro",
                 "kadka-topic",
                 1,
@@ -375,15 +381,18 @@
          + "  \"statementText\" : \"some sql\",\n"
          + "  \"sourceDescription\" : {\n"
          + "    \"name\" : \"TestSource\",\n"
+          + "    \"windowType\" : null,\n"
          + "    \"readQueries\" : [ {\n"
+          + "      \"queryString\" : \"read query\",\n"
          + "      \"sinks\" : [ \"sink1\" ],\n"
          + "      \"id\" : \"readId\",\n"
-          + "      \"queryString\" : \"read query\"\n"
+          + "      \"state\" : \"Running\"\n"
          + "    } ],\n"
          + "    \"writeQueries\" : [ {\n"
+          + "      \"queryString\" : \"write query\",\n"
          + "      \"sinks\" : [ \"sink2\" ],\n"
          + "      \"id\" : \"writeId\",\n"
-          + "      \"queryString\" : \"write query\"\n"
+          + "      \"state\" : \"Running\"\n"
          + "    } ],\n"
          + "    \"fields\" : [ {\n"
          + "      \"name\" : \"ROWTIME\",\n"
@@ -477,7 +486,8 @@ public void testPrintSourceDescription() {
          + "    \"statistics\" : \"stats\",\n"
          + "    \"errorStats\" : \"errors\",\n"
          + "    \"extended\" : false,\n"
-          + "    \"format\" : \"avro\",\n"
+          + "    \"keyFormat\" : \"kafka\",\n"
+          + "    \"valueFormat\" : \"avro\",\n"
          + "    \"topic\" : \"kadka-topic\",\n"
          + "    \"partitions\" : 1,\n"
          + "    \"replication\" : 1,\n"
@@ -583,6 +593,7 @@ public void testPrintConnectorDescription() {
          + "  },\n"
          + "  \"sources\" : [ {\n"
          + "    \"name\" : \"TestSource\",\n"
+          + "    \"windowType\" : null,\n"
          + "    \"readQueries\" : [ ],\n"
          + "    \"writeQueries\" : [ ],\n"
          + "    \"fields\" : [ {\n"
@@ -620,7 +631,8 @@ public void testPrintConnectorDescription() {
          + "    \"statistics\" : \"stats\",\n"
          + "    \"errorStats\" : \"errors\",\n"
          + "    \"extended\" : true,\n"
-          + "    \"format\" : \"avro\",\n"
+          + "    \"keyFormat\" : \"kafka\",\n"
+          + "    \"valueFormat\" : \"avro\",\n"
          + "    \"topic\" : \"kadka-topic\",\n"
          + "    \"partitions\" : 2,\n"
          + "    \"replication\" : 1,\n"
@@ -979,17 +991,18 @@ public void testPrintExecuptionPlan() {
   public void shouldPrintTopicDescribeExtended() {
     // Given:
     final List<RunningQuery> readQueries = ImmutableList.of(
-        new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"))
+        new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running"))
     );
     final List<RunningQuery> writeQueries = ImmutableList.of(
-        new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"))
+        new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running"))
     );
 
     final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
         new SourceDescriptionEntity(
             "e",
             new SourceDescription(
                 "TestSource",
+                Optional.empty(),
                 readQueries,
                 writeQueries,
                 buildTestSchema(SqlTypes.STRING),
@@ -999,6 +1012,7 @@
                 "stats",
                 "errors",
                 true,
+                "kafka",
                 "avro",
                 "kadka-topic",
                 2, 1,
@@ -1019,15 +1033,18 @@
          + "  \"statementText\" : \"e\",\n"
          + "  \"sourceDescription\" : {\n"
          + "    \"name\" : \"TestSource\",\n"
+          + "    \"windowType\" : null,\n"
          + "    \"readQueries\" : [ {\n"
+          + "      \"queryString\" : \"read query\",\n"
          + "      \"sinks\" : [ \"sink1\" ],\n"
          + "      \"id\" : \"readId\",\n"
-          + "      \"queryString\" : \"read query\"\n"
+          + "      \"state\" : \"Running\"\n"
          + "    } ],\n"
          + "    \"writeQueries\" : [ {\n"
+          + "      \"queryString\" : \"write query\",\n"
          + "      \"sinks\" : [ \"sink2\" ],\n"
          + "      \"id\" : \"writeId\",\n"
-          + "      \"queryString\" : \"write query\"\n"
+          + "      \"state\" : \"Running\"\n"
          + "    } ],\n"
          + "    \"fields\" : [ {\n"
          + "      \"name\" : \"ROWTIME\",\n"
@@ -1057,7 +1074,8 @@ public void shouldPrintTopicDescribeExtended() {
          + "    \"statistics\" : \"stats\",\n"
          + "    \"errorStats\" : \"errors\",\n"
          + "    \"extended\" : true,\n"
-          + "    \"format\" : \"avro\",\n"
+          + "    \"keyFormat\" : \"kafka\",\n"
+          + "    \"valueFormat\" : \"avro\",\n"
          + "    \"topic\" : \"kadka-topic\",\n"
          + "    \"partitions\" : 2,\n"
          + "    \"replication\" : 1,\n"
@@ -1070,8 +1088,8 @@
          + "Name                 : TestSource\n"
          + "Type                 : TABLE\n"
          + "Key field            : key\n"
-          + "Key format           : STRING\n"
          + "Timestamp field      : 2000-01-01\n"
+          + "Key format           : kafka\n"
          + "Value format         : avro\n"
          + "Kafka topic          : kadka-topic (partitions: 2, replication: 1)\n"
          + "Statement            : sql statement text\n"
@@ -1085,13 +1103,13 @@
          + "\n"
          + "Queries that read from this TABLE\n"
          + "-----------------------------------\n"
-          + "readId : read query\n"
+          + "readId (Running) : read query\n"
          + "\n"
          + "For query topology and execution plan please run: EXPLAIN <QueryId>\n"
          + "\n"
          + "Queries that write from this TABLE\n"
          + "-----------------------------------\n"
-          + "writeId : write query\n"
+          + "writeId (Running) : write query\n"
          + "\n"
          + "For query topology and execution plan please run: EXPLAIN <QueryId>\n"
          + "\n"
```
```diff
@@ -16,6 +16,7 @@
 package io.confluent.ksql.rest.entity;
 
 import com.google.common.collect.ImmutableSet;
+import io.confluent.ksql.model.WindowType;
 import io.confluent.ksql.name.SourceName;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.rest.util.EntityUtil;
@@ -37,6 +38,7 @@ public static QueryDescription forQueryMetadata(final QueryMetadat
     return create(
         persistentQuery.getQueryId(),
         persistentQuery,
+        persistentQuery.getResultTopic().getKeyFormat().getWindowType(),
         ImmutableSet.of(persistentQuery.getSinkName()),
         Optional.of(persistentQuery.getState())
     );
@@ -45,6 +47,7 @@ public static QueryDescription forQueryMetadata(final QueryMetadat
     return create(
         new QueryId(""),
         queryMetadata,
+        Optional.empty(),
         Collections.emptySet(),
         Optional.empty()
     );
@@ -53,12 +56,14 @@ public static QueryDescription forQueryMetadata(final QueryMetadat
   private static QueryDescription create(
       final QueryId id,
       final QueryMetadata queryMetadata,
+      final Optional<WindowType> windowType,
      final Set<SourceName> sinks,
       final Optional<String> state
   ) {
     return new QueryDescription(
         id,
         queryMetadata.getStatementString(),
+        windowType,
         EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema()),
         queryMetadata.getSourceNames().stream().map(SourceName::name).collect(Collectors.toSet()),
         sinks.stream().map(SourceName::name).collect(Collectors.toSet()),
```
