Skip to content

Commit

Permalink
feat(static): initial drop of static query functionality (#3340)
Browse files Browse the repository at this point in the history
* feat(static): initial drop of static query functionality

The change brings in the first batch of functionality for static query.
It allows static queries on both windowed and non-windowed aggregate tables, i.e. tables build with a `GROUP BY` clause.

For non-windowed queries the following form is accepted:

```
SELECT * FROM X WHERE ROWKEY=Y;
```

- The only filter allowed is an equals filter on ROWKEY.
- Only a single ROWKEY is supported at the moment.
- Only `SELECT *` is supported at the moment.

For windowed queries the following forms is accepted:

```
-- exact window start:
SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART=X;
SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART='2020-02-23T23:45:12.000';

-- windwo start range:
SELECT * FROM X WHERE ROWKEY=Y AND WINDOWSTART>A AND WINDOWSTART<=B;
SELECT * FROM X WHERE ROWKEY=Y AND '2020-02-23T23:45:12.000' < WINDOWSTART AND WINDOWSTART <= '2020-02-24T23:45:12.000';
```

All types of windowed tables are supported.

At the moment, static query requests are served over the standard `/ksql` HTTP POST request. But this may change to being a websocket request.

* chore: fix test

* chore: reviewer's requested changes
  • Loading branch information
big-andy-coates authored Sep 13, 2019
1 parent 6747d5c commit 54c5139
Show file tree
Hide file tree
Showing 64 changed files with 6,844 additions and 145 deletions.
18 changes: 16 additions & 2 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.confluent.ksql.parser.DefaultKsqlParser;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.parser.SqlBaseParser.QueryStatementContext;
import io.confluent.ksql.parser.SqlBaseParser.SingleStatementContext;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
Expand Down Expand Up @@ -257,8 +258,21 @@ private void handleStatements(final String line)
final SingleStatementContext statementContext = statement.getStatement();
final String statementText = statement.getStatementText();

if (statementContext.statement() instanceof SqlBaseParser.QueryStatementContext
|| statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
if (statementContext.statement() instanceof SqlBaseParser.QueryStatementContext) {

final QueryStatementContext queryContext =
(QueryStatementContext) statementContext.statement();

if (queryContext.query().EMIT() != null) {
consecutiveStatements = printOrDisplayQueryResults(
consecutiveStatements,
statementContext,
statementText
);
} else {
makeKsqlRequest(statementText);
}
} else if (statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
consecutiveStatements = printOrDisplayQueryResults(
consecutiveStatements,
statementContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.cli.console.table.builder.KafkaTopicsListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.PropertiesListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.QueriesTableBuilder;
import io.confluent.ksql.cli.console.table.builder.QueryResultTableBuilder;
import io.confluent.ksql.cli.console.table.builder.StreamsListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.TableBuilder;
import io.confluent.ksql.cli.console.table.builder.TablesListTableBuilder;
Expand Down Expand Up @@ -66,6 +67,7 @@
import io.confluent.ksql.rest.entity.QueryDescription;
import io.confluent.ksql.rest.entity.QueryDescriptionEntity;
import io.confluent.ksql.rest.entity.QueryDescriptionList;
import io.confluent.ksql.rest.entity.QueryResultEntity;
import io.confluent.ksql.rest.entity.RunningQuery;
import io.confluent.ksql.rest.entity.SourceDescription;
import io.confluent.ksql.rest.entity.SourceDescriptionEntity;
Expand Down Expand Up @@ -161,6 +163,8 @@ public class Console implements Closeable {
tablePrinter(TypeList.class, TypeListTableBuilder::new))
.put(ErrorEntity.class,
tablePrinter(ErrorEntity.class, ErrorEntityTableBuilder::new))
.put(QueryResultEntity.class,
tablePrinter(QueryResultEntity.class, QueryResultTableBuilder::new))
.build();

private static <T extends KsqlEntity> Handler1<KsqlEntity, Console> tablePrinter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package io.confluent.ksql.cli.console.table;

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.cli.console.Console;
import java.util.Arrays;
import java.util.LinkedList;
Expand All @@ -30,12 +33,26 @@ public final class Table {
private final List<String> header;
private final List<String> footer;

private Table(final List<String> columnHeaders, final List<List<String>> rowValues,
final List<String> header, final List<String> footer) {
this.columnHeaders = columnHeaders;
this.rowValues = rowValues;
this.header = header;
this.footer = footer;
private Table(
final List<String> columnHeaders,
final List<List<String>> rowValues,
final List<String> header,
final List<String> footer
) {
this.columnHeaders = requireNonNull(columnHeaders, "columnHeaders");
this.rowValues = requireNonNull(rowValues, "rowValues");
this.header = requireNonNull(header, "header");
this.footer = requireNonNull(footer, "footer");
}

@VisibleForTesting
public List<String> headers() {
return columnHeaders;
}

@VisibleForTesting
public List<List<String>> rows() {
return rowValues;
}

public static final class Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.cli.console.table.builder;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.rest.entity.QueryResultEntity;
import io.confluent.ksql.rest.entity.QueryResultEntity.Row;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class QueryResultTableBuilder implements TableBuilder<QueryResultEntity> {

private static final List<String> TIME_WINDOW_HEADINGS = ImmutableList
.of("WINDOWSTART BIGINT");

private static final List<String> SESSION_WINDOW_HEADINGS = ImmutableList.<String>builder()
.addAll(TIME_WINDOW_HEADINGS)
.add("WINDOWEND BIGINT")
.build();

@Override
public Table buildTable(final QueryResultEntity entity) {

final List<String> headers = buildHeadings(entity);

final Stream<List<String>> rows = entity
.getRows()
.stream()
.map(r -> buildRow(r, entity.getSchema().value()));

return new Builder()
.withColumnHeaders(headers)
.withRows(rows)
.build();
}

private static List<String> buildHeadings(final QueryResultEntity entity) {
final LogicalSchema schema = entity.getSchema();

final Stream<String> keys = schema.key().stream()
.map(f -> f.fullName() + " " + f.type() + " KEY");

final Stream<String> window = entity.getWindowType()
.map(wt -> wt == WindowType.SESSION
? SESSION_WINDOW_HEADINGS
: TIME_WINDOW_HEADINGS)
.orElse(ImmutableList.of())
.stream();

final Stream<String> values = schema.value().stream()
.map(f -> f.fullName() + " " + f.type());

return Stream.concat(keys, Stream.concat(window, values))
.collect(Collectors.toList());
}

private static List<String> buildRow(
final Row row,
final List<Column> valueSchema
) {
final Stream<?> keys = row.getKey().values().stream();

final Stream<?> window = row.getWindow()
.map(w -> w.getEnd().isPresent()
? Stream.of(w.getStart(), w.getEnd().getAsLong())
: Stream.of(w.getStart()))
.orElse(Stream.of());

final Stream<?> values = row.getValue() == null
? valueSchema.stream().map(f -> null)
: row.getValue().values().stream();

return Stream.concat(keys, Stream.concat(window, values))
.map(Objects::toString)
.collect(Collectors.toList());
}
}
60 changes: 37 additions & 23 deletions ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -54,7 +55,6 @@
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.ServerInfo;
import io.confluent.ksql.rest.server.KsqlRestApplication;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
Expand Down Expand Up @@ -143,7 +143,6 @@ public class CliTest {
.withLookingForStuckThread(true)
.build();

private static final String COMMANDS_KSQL_TOPIC_NAME = KsqlRestApplication.COMMANDS_STREAM_NAME;
private static final OutputFormat CLI_OUTPUT_FORMAT = OutputFormat.TABULAR;

private static final long STREAMED_QUERY_ROW_LIMIT = 10000;
Expand Down Expand Up @@ -184,6 +183,9 @@ public static void classSetUp() throws Exception {

@Before
public void setUp() {
REST_APP.closePersistentQueries();
REST_APP.dropSourcesExcept(orderDataProvider.kstreamName());

streamName = KsqlIdentifierTestUtil.uniqueIdentifierName();
tableName = KsqlIdentifierTestUtil.uniqueIdentifierName();
terminal = new TestTerminal(lineSupplier);
Expand All @@ -196,8 +198,6 @@ public void setUp() {
restClient,
console
);

maybeDropStream("SHOULDRUNSCRIPT");
}

private void assertRunListCommand(
Expand Down Expand Up @@ -349,18 +349,6 @@ private static void dropStreamOrTable(final boolean stream, final String name) {
runStatement(dropStatement, restClient);
}

private static void maybeDropStream(final String name) {
final String dropStatement = String.format("drop stream %s;", name);

final RestResponse<?> response = restClient.makeKsqlRequest(dropStatement, null);
if (response.isSuccessful()
|| response.getErrorMessage().toString().contains("does not exist")) {
return;
}

dropStream(name);
}

private void selectWithLimit(
final String selectQuery,
final int limit,
Expand Down Expand Up @@ -604,7 +592,38 @@ public void testTransientSelect() {
}

@Test
public void testTransientSelectStar() {
public void testTransientStaticSelectStar() {
// Given:
run("CREATE TABLE X AS SELECT COUNT(1) AS COUNT "
+ "FROM " + orderDataProvider.kstreamName()
+ " GROUP BY ITEMID;",
localCli
);

assertRunCommand(
"SELECT * FROM X WHERE ROWKEY='unknowwn';",
is(emptyIterable())
);
}

@Test
public void testTransientStaticHeader() {
// Given:
run("CREATE TABLE Y AS SELECT COUNT(1) AS COUNT "
+ "FROM " + orderDataProvider.kstreamName()
+ " GROUP BY ITEMID;",
localCli
);

// When:
run("SELECT * FROM Y WHERE ROWKEY='ITEM_1';", localCli);

assertThat(terminal.getOutputString(), containsString("ROWKEY STRING KEY"));
assertThat(terminal.getOutputString(), containsString("COUNT BIGINT"));
}

@Test
public void testTransientContinuousSelectStar() {
final Map<String, GenericRow> streamData = orderDataProvider.data();
final List<Object> row1 = streamData.get("1").getColumns();
final List<Object> row2 = streamData.get("2").getColumns();
Expand All @@ -621,9 +640,8 @@ public void testTransientSelectStar() {
}

@Test
public void testTransientHeader() {
public void testTransientContinuousHeader() {
// When:
rowCaptor.resetTestResult();
run("SELECT * FROM " + orderDataProvider.kstreamName() + " EMIT CHANGES LIMIT 1", localCli);

// Then: (note that some of these are truncated because of header wrapping)
Expand Down Expand Up @@ -683,10 +701,6 @@ public void testCreateTable() {
dropTable(tableName);
}

// ===================================================================
// Below Tests are only used for coverage, not for results validation.
// ===================================================================

@Test
public void testRunInteractively() {
// Given:
Expand Down
Loading

0 comments on commit 54c5139

Please sign in to comment.