From deb8b206a2a1a0fb41abb3fe709190899a717496 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Fri, 15 Nov 2019 20:43:19 +0000 Subject: [PATCH] Enable pull query RQTT tests (#3868) * chore: re-enable pull query tests fixes: #3864 --- .../ksql/test/rest/RestTestExecutor.java | 274 +++++++--- .../insert-values.json | 4 +- ...ries-against-materialized-aggregates.json} | 517 ++++++++---------- .../streaming/StreamedQueryResource.java | 3 + 4 files changed, 426 insertions(+), 372 deletions(-) rename ksql-functional-tests/src/test/resources/rest-query-validation-tests/{materialized-aggregate-static-queries.json => pull-queries-against-materialized-aggregates.json} (68%) diff --git a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java index 0803c3de7d47..ba30cc1b6912 100644 --- a/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java +++ b/ksql-functional-tests/src/test/java/io/confluent/ksql/test/rest/RestTestExecutor.java @@ -15,23 +15,27 @@ package io.confluent.ksql.test.rest; +import static java.util.Objects.requireNonNull; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.rest.client.KsqlRestClient; +import io.confluent.ksql.rest.client.QueryStream; import io.confluent.ksql.rest.client.RestResponse; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; +import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.test.rest.model.Response; import io.confluent.ksql.test.tools.ExpectedRecordComparator; @@ -83,8 +87,8 @@ public class RestTestExecutor implements Closeable { ImmutableMap.of(), Optional.empty() ); - this.kafkaCluster = Objects.requireNonNull(kafkaCluster, "kafkaCluster"); - this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); + this.kafkaCluster = requireNonNull(kafkaCluster, "kafkaCluster"); + this.serviceContext = requireNonNull(serviceContext, "serviceContext"); } void buildAndExecuteQuery(final RestTestCase testCase) { @@ -92,7 +96,7 @@ void buildAndExecuteQuery(final RestTestCase testCase) { produceInputs(testCase.getInputsByTopic()); - final Optional> responses = sendStatements(testCase); + final Optional> responses = sendStatements(testCase); if (!responses.isPresent()) { return; } @@ -157,60 +161,59 @@ private void produceInputs(final Map> inputs) { }); } - private Optional> sendStatements(final RestTestCase testCase) { + private Optional> sendStatements(final RestTestCase testCase) { final List allStatements = testCase.getStatements(); - int firstStatic = 0; - for (; firstStatic < allStatements.size(); firstStatic++) { - final boolean isStatic = allStatements.get(firstStatic).startsWith("SELECT "); - if (isStatic) { + int firstQuery = 0; + for (; firstQuery < allStatements.size(); firstQuery++) { + final boolean isQuery = allStatements.get(firstQuery).startsWith("SELECT "); + if (isQuery) { break; } } - final List nonStatics = IntStream.range(0, firstStatic) + final List nonQuery = IntStream.range(0, firstQuery) .mapToObj(allStatements::get) .collect(Collectors.toList()); - final Optional> results = sendStatements(testCase, nonStatics); - if (!results.isPresent()) { + final Optional> adminResults = sendAdminStatements(testCase, nonQuery); + if (!adminResults.isPresent()) { return Optional.empty(); } - final List statics = IntStream.range(firstStatic, allStatements.size()) + final List queries = IntStream.range(firstQuery, allStatements.size()) .mapToObj(allStatements::get) .collect(Collectors.toList()); - if (statics.isEmpty()) { + if (queries.isEmpty()) { failIfExpectingError(testCase); - return results; + return adminResults; } if (!testCase.expectedError().isPresent()) { - for (int idx = firstStatic; testCase.getExpectedResponses().size() > idx; ++idx) { - final String staticStatement = allStatements.get(idx); - final Response staticResponse = testCase.getExpectedResponses().get(idx); + for (int idx = firstQuery; testCase.getExpectedResponses().size() > idx; ++idx) { + final String queryStatement = allStatements.get(idx); + final Response queryResponse = testCase.getExpectedResponses().get(idx); - waitForWarmStateStores(staticStatement, staticResponse); + waitForWarmStateStores(queryStatement, queryResponse); } } - final Optional> moreResults = sendStatements(testCase, statics); - if (!moreResults.isPresent()) { + final List moreResults = sendQueryStatements(testCase, queries); + if (moreResults.isEmpty()) { return Optional.empty(); } failIfExpectingError(testCase); - return moreResults - .map(ksqlEntities -> ImmutableList.builder() - .addAll(results.get()) - .addAll(ksqlEntities) + return Optional.of(ImmutableList.builder() + .addAll(adminResults.get()) + .addAll(moreResults) .build()); } - private Optional> sendStatements( + private Optional> sendAdminStatements( final RestTestCase testCase, final List statements ) { @@ -220,24 +223,34 @@ private Optional> sendStatements( final RestResponse resp = restClient.makeKsqlRequest(sql); if (resp.isErroneous()) { - final Optional>> expectedError = testCase.expectedError(); - if (!expectedError.isPresent()) { - final String statement = resp.getErrorMessage() instanceof KsqlStatementErrorMessage - ? ((KsqlStatementErrorMessage)resp.getErrorMessage()).getStatementText() - : ""; - - throw new AssertionError( - "Server failed to execute statement" + System.lineSeparator() - + "statement: " + statement + System.lineSeparator() - + "reason: " + resp.getErrorMessage() - ); - } + handleErrorResponse(testCase, resp); + return Optional.empty(); + } - final String reason = "Expected error mismatch." - + System.lineSeparator() - + "Actual: " + resp.getErrorMessage(); + final KsqlEntityList entity = resp.getResponse(); + return Optional.of(RqttResponse.admin(entity)); + } - assertThat(reason, resp, expectedError.get()); + private List sendQueryStatements( + final RestTestCase testCase, + final List statements + ) { + return statements.stream() + .map(stmt -> sendQueryStatement(testCase, stmt)) + .filter(Optional::isPresent) + .map(Optional::get) + .map(RqttResponse::query) + .collect(Collectors.toList()); + } + + private Optional sendQueryStatement( + final RestTestCase testCase, + final String sql + ) { + final RestResponse resp = restClient.makeQueryRequest(sql, null); + + if (resp.isErroneous()) { + handleErrorResponse(testCase, resp); return Optional.empty(); } @@ -269,8 +282,29 @@ private void verifyOutput(final RestTestCase testCase) { }); } + private static void handleErrorResponse(final RestTestCase testCase, final RestResponse resp) { + final Optional>> expectedError = testCase.expectedError(); + if (!expectedError.isPresent()) { + final String statement = resp.getErrorMessage() instanceof KsqlStatementErrorMessage + ? ((KsqlStatementErrorMessage) resp.getErrorMessage()).getStatementText() + : ""; + + throw new AssertionError( + "Server failed to execute statement" + System.lineSeparator() + + "statement: " + statement + System.lineSeparator() + + "reason: " + resp.getErrorMessage() + ); + } + + final String reason = "Expected error mismatch." + + System.lineSeparator() + + "Actual: " + resp.getErrorMessage(); + + assertThat(reason, resp, expectedError.get()); + } + private static void verifyResponses( - final List actualResponses, + final List actualResponses, final List expectedResponses, final List statements ) { @@ -281,17 +315,15 @@ private static void verifyResponses( ); for (int idx = 0; idx < expectedResponses.size(); idx++) { - final Map expected = expectedResponses.get(idx).getContent(); - final Map actual = asJsonMap(actualResponses.get(idx)); + final Map expectedResponse = expectedResponses.get(idx).getContent(); - // Expected does not need to include everything, only needs to be tested: - for (final Entry e : expected.entrySet()) { - final String key = e.getKey(); - final Object value = replaceMacros(e.getValue(), statements, idx); - final String baseReason = "Response mismatch at index " + idx; - assertThat(baseReason, actual, hasKey(key)); - assertThat(baseReason + " on key: " + key, actual.get(key), is(value)); - } + assertThat(expectedResponse.entrySet(), hasSize(1)); + + final String expectedType = expectedResponse.keySet().iterator().next(); + final Object expectedPayload = expectedResponse.values().iterator().next(); + + final RqttResponse actualResponse = actualResponses.get(idx); + actualResponse.verify(expectedType, expectedPayload, statements, idx); } } @@ -352,30 +384,28 @@ private static void compareKeyValueTimestamp( } } - private static Map asJsonMap(final KsqlEntity response) { + private static T asJson(final Object response, final TypeReference type) { try { - final ObjectMapper mapper = JsonMapper.INSTANCE.mapper; - final String text = mapper.writeValueAsString(response); - return mapper.readValue(text, new TypeReference>() { - }); + final String text = JsonMapper.INSTANCE.mapper.writeValueAsString(response); + return JsonMapper.INSTANCE.mapper.readValue(text, type); } catch (final Exception e) { throw new AssertionError("Failed to serialize response to JSON: " + response); } } private void waitForWarmStateStores( - final String staticStatement, - final Response staticResponse + final String querySql, + final Response queryResponse ) { - // Special handling for static queries is required, as they depend on materialized state stores - // being warmed up. Initial requests may return null values. + // Special handling for pull queries is required, as they depend on materialized state stores + // being warmed up. Initial requests may return no rows. - final ImmutableList expectedResponse = ImmutableList.of(staticResponse); - final ImmutableList statements = ImmutableList.of(staticStatement); + final ImmutableList expectedResponse = ImmutableList.of(queryResponse); + final ImmutableList statements = ImmutableList.of(querySql); final long threshold = System.currentTimeMillis() + MAX_STATIC_WARMUP.toMillis(); while (System.currentTimeMillis() < threshold) { - final RestResponse resp = restClient.makeKsqlRequest(staticStatement); + final RestResponse resp = restClient.makeQueryRequest(querySql, null); if (resp.isErroneous()) { Thread.yield(); LOG.info("Server responded with an error code to a static query. " @@ -383,7 +413,8 @@ private void waitForWarmStateStores( continue; } - final KsqlEntityList actualResponses = resp.getResponse(); + final List actualResponses = ImmutableList + .of(RqttResponse.query(resp.getResponse())); try { verifyResponses(actualResponses, expectedResponse, statements); @@ -396,4 +427,115 @@ private void waitForWarmStateStores( } } } + + private interface RqttResponse { + + static List admin(final KsqlEntityList adminResponses) { + return adminResponses.stream() + .map(RqttAdminResponse::new) + .collect(Collectors.toList()); + } + + static RqttResponse query(final QueryStream queryStream) { + final Builder responses = ImmutableList.builder(); + + while (queryStream.hasNext()) { + final StreamedRow row = queryStream.next(); + responses.add(row); + } + + queryStream.close(); + + return new RqttQueryResponse(responses.build()); + } + + void verify( + String expectedType, + Object expectedPayload, + List statements, + int idx + ); + } + + private static class RqttAdminResponse implements RqttResponse { + + private static final TypeReference> PAYLOAD_TYPE = + new TypeReference>() { + }; + + private final KsqlEntity entity; + + RqttAdminResponse(final KsqlEntity entity) { + this.entity = requireNonNull(entity, "entity"); + } + + @SuppressWarnings("unchecked") + @Override + public void verify( + final String expectedType, + final Object expectedPayload, + final List statements, + final int idx + ) { + assertThat("Expected admin response", expectedType, is("admin")); + assertThat("Admin payload should be JSON object", expectedPayload, is(instanceOf(Map.class))); + + final Map expected = (Map)expectedPayload; + + final Map actualPayload = asJson(entity, PAYLOAD_TYPE); + + // Expected does not need to include everything, only keys that need to be tested: + for (final Entry e : expected.entrySet()) { + final String key = e.getKey(); + final Object value = replaceMacros(e.getValue(), statements, idx); + final String baseReason = "Response mismatch at index " + idx; + assertThat(baseReason, actualPayload, hasKey(key)); + assertThat(baseReason + " on key: " + key, actualPayload.get(key), is(value)); + } + } + } + + private static class RqttQueryResponse implements RqttResponse { + + private static final TypeReference> PAYLOAD_TYPE = + new TypeReference>() { + }; + + private final List rows; + + RqttQueryResponse(final List rows) { + this.rows = requireNonNull(rows, "rows"); + } + + @SuppressWarnings("unchecked") + @Override + public void verify( + final String expectedType, + final Object expectedPayload, + final List statements, + final int idx + ) { + assertThat("Expected query response", expectedType, is("query")); + assertThat("Query response should be an array", expectedPayload, is(instanceOf(List.class))); + + final List expectedRows = (List) expectedPayload; + + assertThat("row count mismatch", rows.size(), is(expectedRows.size())); + + for (int i = 0; i != rows.size(); ++i) { + assertThat("Each row should JSON object", expectedRows.get(i), is(instanceOf(Map.class))); + final Map actual = asJson(rows.get(i), PAYLOAD_TYPE); + final Map expected = (Map) expectedRows.get(i); + + // Expected does not need to include everything, only keys that need to be tested: + for (final Entry e : expected.entrySet()) { + final String key = e.getKey(); + final Object value = replaceMacros(e.getValue(), statements, idx); + final String baseReason = "Response mismatch at index " + idx; + assertThat(baseReason, actual, hasKey(key)); + assertThat(baseReason + " on key: " + key, expected.get(key), is(value)); + } + } + } + } } diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json index e7b2c36fdadc..3cdf825a9d26 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/insert-values.json @@ -15,7 +15,7 @@ {"topic": "test_topic", "timestamp": 1234, "key": "key", "value": {"ID": 10}} ], "responses": [ - {"@type": "currentStatus", "statementText": "{STATEMENT}"} + {"admin": {"@type": "currentStatus", "statementText": "{STATEMENT}"}} ] }, { @@ -270,7 +270,7 @@ {"topic": "test_topic", "timestamp": 1234, "key": "key", "value": {"id!": 10}} ], "responses": [ - {"@type": "currentStatus", "statementText": "{STATEMENT}"} + {"admin": {"@type": "currentStatus", "statementText": "{STATEMENT}"}} ] } ] diff --git a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json similarity index 68% rename from ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json rename to ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index fc5ba7779ecc..0829bb11a1f6 100644 --- a/ksql-functional-tests/src/test/resources/rest-query-validation-tests/materialized-aggregate-static-queries.json +++ b/ksql-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -1,12 +1,10 @@ { "comments": [ - "Tests covering static queries of (materialized) aggregate tables" + "Tests covering Pull queries of (materialized) aggregate tables" ], "tests": [ { "name": "non-windowed single key lookup", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -18,20 +16,19 @@ {"topic": "test_topic", "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT", - "rows": [["10", 1]] - }, - {"@type": "rows", "rows": []} + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `COUNT` BIGINT"}} + ]} ] }, { "name": "tumbling windowed single key lookup with exact window start", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -44,20 +41,19 @@ {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT", - "rows": [["10", 12000, 1]] - }, - {"@type": "rows", "rows": []} + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}} + ]} ] }, { "name": "hopping windowed single key lookup with exact window start", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW HOPPING(SIZE 10 SECOND, ADVANCE BY 1 SECONDS) GROUP BY ROWKEY;", @@ -71,25 +67,23 @@ {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT", - "rows": [["10", 12000, 1]] - }, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT", - "rows": [["10", 13000, 1]] - }, - {"@type": "rows", "rows": []} + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 13000, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}} + ]} ] }, { "name": "session windowed single key lookup with exact window start", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW SESSION(10 SECOND) GROUP BY ROWKEY;", @@ -103,21 +97,22 @@ {"topic": "test_topic", "timestamp": 12366, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT", - "rows": [["10", 12345, 12366, 2]] - }, - {"@type": "rows", "rows": []}, - {"@type": "rows", "rows": []} + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12345, 12366, 2]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}} + ]} ] }, { "name": "tumbling windowed single key lookup with window start range", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -134,38 +129,29 @@ {"topic": "test_topic", "timestamp": 15364, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT", - "rows": [ - ["10", 12000, 2], - ["10", 14000, 1], - ["10", 15000, 1] - ] - }, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT", - "rows": [ - ["10", 12000, 2] - ] - }, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT", - "rows": [ - ["10", 14000, 1] - ] - }, - {"@type": "rows", "rows": []} + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 2]}}, + {"row":{"columns":["10", 14000, 1]}}, + {"row":{"columns":["10", 15000, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 2]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 14000, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}} + ]} ] }, { "name": "hopping windowed single key lookup with window start range", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW HOPPING(SIZE 5 SECOND, ADVANCE BY 1 SECONDS) GROUP BY ROWKEY;", @@ -179,33 +165,29 @@ {"topic": "test_topic", "timestamp": 13251, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "rows": [ - ["10", 7000, 1], - ["10", 8000, 1], - ["10", 9000, 2], - ["10", 10000, 2] - ] - }, - { - "@type": "rows", - "rows": [ - ["10", 8000, 1], - ["10", 9000, 2], - ["10", 10000, 2], - ["10", 11000, 1] - ] - }, - {"@type": "rows", "rows": []} + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 7000, 1]}}, + {"row":{"columns":["10", 8000, 1]}}, + {"row":{"columns":["10", 9000, 2]}}, + {"row":{"columns":["10", 10000, 2]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 8000, 1]}}, + {"row":{"columns":["10", 9000, 2]}}, + {"row":{"columns":["10", 10000, 2]}}, + {"row":{"columns":["10", 11000, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}} + ]} ] }, { "name": "session windowed single key lookup with window start range", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW SESSION(5 SECOND) GROUP BY ROWKEY;", @@ -220,33 +202,25 @@ {"topic": "test_topic", "timestamp": 19444, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "rows": [ - ["10", 8001, 10456, 2], - ["10", 19444, 19444, 1] - ] - }, - { - "@type": "rows", - "rows": [ - ["10", 8001, 10456, 2] - ] - }, - { - "@type": "rows", - "rows": [ - ["10", 19444, 19444, 1] - ] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 8001, 10456, 2]}}, + {"row":{"columns":["10", 19444, 19444, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 8001, 10456, 2]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 19444, 19444, 1]}} + ]} ] }, { "name": "tumbling windowed single key lookup with unbounded window start", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -258,21 +232,17 @@ {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "rows": [ - ["10", 11000, 1], - ["10", 12000, 1] - ] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 11000, 1]}}, + {"row":{"columns":["10", 12000, 1]}} + ]} ] }, { "name": "hopping windowed single key lookup with unbounded window start", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW HOPPING(SIZE 4 SECOND, ADVANCE BY 1 SECONDS) GROUP BY ROWKEY;", @@ -284,26 +254,22 @@ {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "rows": [ - ["10", 7000, 1], - ["10", 8000, 1], - ["10", 9000, 1], - ["10", 10000, 2], - ["10", 11000, 1], - ["10", 12000, 1], - ["10", 13000, 1] - ] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 7000, 1]}}, + {"row":{"columns":["10", 8000, 1]}}, + {"row":{"columns":["10", 9000, 1]}}, + {"row":{"columns":["10", 10000, 2]}}, + {"row":{"columns":["10", 11000, 1]}}, + {"row":{"columns":["10", 12000, 1]}}, + {"row":{"columns":["10", 13000, 1]}} + ]} ] }, { "name": "session windowed single key lookup with unbounded window start", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW SESSION(10 SECOND) GROUP BY ROWKEY;", @@ -315,18 +281,16 @@ {"topic": "test_topic", "timestamp": 12366, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "rows": [["10", 12345, 12366, 2]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12345, 12366, 2]}} + ]} ] }, { "name": "non-windowed with projection", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -337,19 +301,16 @@ {"topic": "test_topic", "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`COUNT` BIGINT, `ID` STRING, `KSQL_COL_2` BIGINT", - "rows": [[1, "10x", 2]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`COUNT` BIGINT, `ID` STRING, `KSQL_COL_2` BIGINT"}}, + {"row":{"columns":[1, "10x", 2]}} + ]} ] }, { "name": "windowed with projection", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -361,19 +322,16 @@ {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`COUNT` BIGINT, `ID` STRING, `KSQL_COL_2` BIGINT", - "rows": [[1, "10x", 2]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`COUNT` BIGINT, `ID` STRING, `KSQL_COL_2` BIGINT"}}, + {"row":{"columns":[1, "10x", 2]}} + ]} ] }, { "name": "non-windowed projection WITH ROWKEY", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -384,19 +342,16 @@ {"topic": "test_topic", "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT", - "rows": [["10", 1]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 1]}} + ]} ] }, { "name": "windowed with projection with ROWKEY", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -408,19 +363,16 @@ {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`COUNT` BIGINT, `ROWKEY` STRING KEY", - "rows": [[1, "10"]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`COUNT` BIGINT, `ROWKEY` STRING KEY"}}, + {"row":{"columns":[1, "10"]}} + ]} ] }, { "name": "non-windowed projection WITH ROWTIME", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -428,14 +380,12 @@ ], "expectedError": { "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Static queries don't support ROWTIME in select columns.", + "message": "Pull queries don't support ROWTIME in select columns.", "status": 400 } }, { "name": "windowed with projection with ROWTIME", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -443,14 +393,12 @@ ], "expectedError": { "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Static queries don't support ROWTIME in select columns.", + "message": "Pull queries don't support ROWTIME in select columns.", "status": 400 } }, { "name": "non-windowed projection with ROWMEY and more columns in aggregate", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (VAL INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT, SUM(VAL) AS SUM, MIN(VAL) AS MIN FROM INPUT GROUP BY ROWKEY;", @@ -461,19 +409,16 @@ {"topic": "test_topic", "key": "10", "value": {"val": 2}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT", - "rows": [["10", 1]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 1]}} + ]} ] }, { "name": "non-windowed projection with ROWMEY and more columns in lookup", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -485,20 +430,17 @@ {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`COUNT` BIGINT, `ROWKEY` STRING KEY, `COUNT2` BIGINT", - "rows": [[2,"10",2]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`COUNT` BIGINT, `ROWKEY` STRING KEY, `COUNT2` BIGINT"}}, + {"row":{"columns":[2,"10",2]}} + ]} ] }, { "name": "text datetime window bounds", "comment": "Note: this test must specify a timezone in the exact lookup so that it works when run from any TZ.", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -509,17 +451,16 @@ {"topic": "test_topic", "timestamp": 1582501512456, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - {"@type": "rows", "rows": [ - ["10", 1582501512000, 1] + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`COUNT` BIGINT, `ROWKEY` STRING KEY, `COUNT2` BIGINT"}}, + {"row":{"columns":["10", 1582501512000, 1]}} ]} ] }, { "name": "text datetime + timezone window bounds", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -530,18 +471,17 @@ {"topic": "test_topic", "timestamp": 1582501512456, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - {"@type": "rows", "rows": [ - ["10", 1582501512000, 1] + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`COUNT` BIGINT, `ROWKEY` STRING KEY, `COUNT2` BIGINT"}}, + {"row":{"columns":["10", 1582501512000, 1]}} ]} ] }, { "name": "partial text datetime window bounds", "comment": "Note: this test has side enough range on dates to ensure running in different timezones do not cause it to fail", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -554,20 +494,17 @@ {"topic": "test_topic", "timestamp": 1582501552456, "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "rows": [ - ["10", 1582501512000, 1], - ["10", 1582501552000, 1] + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`COUNT` BIGINT, `ROWKEY` STRING KEY, `COUNT2` BIGINT"}}, + {"row":{"columns":["10", 1582501512000, 1]}}, + {"row":{"columns":["10", 1582501552000, 1]}} ]} ] }, { "name": "aliased table", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -577,19 +514,16 @@ {"topic": "test_topic", "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT", - "rows": [["10", 1]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 1]}} + ]} ] }, { "name": "multiple aggregate columns", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT, SUM(CAST(ROWKEY AS INT)) AS SUM FROM INPUT GROUP BY ROWKEY;", @@ -599,19 +533,16 @@ {"topic": "test_topic", "key": "10", "value": {}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT, `SUM` INTEGER", - "rows": [["10", 1,10]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `COUNT` BIGINT, `SUM` INTEGER"}}, + {"row":{"columns":["10", 1, 10]}} + ]} ] }, { "name": "having clause on aggregate", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (X INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT SUM(X) AS SUM FROM INPUT GROUP BY ROWKEY HAVING SUM(X) < 10;", @@ -623,16 +554,19 @@ {"topic": "test_topic", "key": "missing", "value": {"X": 10}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - {"@type": "rows", "rows": [["10", 9]]}, - {"@type": "rows", "rows": []} + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `SUM` INTEGER"}}, + {"row":{"columns":["10", 9]}} + ]}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `SUM` INTEGER"}} + ]} ] }, { "name": "non-windowed with UDAF with different intermediate type", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (VAL INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT, AVG(VAL) AS AVG FROM INPUT GROUP BY ROWKEY;", @@ -644,19 +578,16 @@ {"topic": "test_topic", "key": "10", "value": {"val": 4}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT, `AVG` DOUBLE", - "rows": [["10", 2, 3.0]] - } + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `COUNT` BIGINT, `AVG` DOUBLE"}}, + {"row":{"columns":["10", 2, 3.0]}} + ]} ] }, { "name": "windowed with UDAF with different intermediate type", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (VAL INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT, AVG(VAL) AS AVG FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;", @@ -668,20 +599,16 @@ {"topic": "test_topic", "timestamp": 11346, "key": "10", "value": {"VAL": 4}} ], "responses": [ - {"@type": "currentStatus"}, - {"@type": "currentStatus"}, - { - "@type": "rows", - "schema": "`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT, `AVG` DOUBLE", - "rows": [ - ["10", 11000, 2, 5.0] + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ROWKEY` STRING KEY, `WINDOWSTART` BIGINT KEY, `COUNT` BIGINT, `AVG` DOUBLE"}}, + {"row":{"columns":["10", 11000, 2, 5.0]}} ]} ] }, { "name": "fail on unsupported query feature: join", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -689,14 +616,12 @@ ], "expectedError": { "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Static queries don't support JOIN clauses.", + "message": "Pull queries don't support JOIN clauses.", "status": 400 } }, { "name": "fail on unsupported query feature: window", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -704,14 +629,12 @@ ], "expectedError": { "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Static queries don't support WINDOW clauses.", + "message": "Pull queries don't support WINDOW clauses.", "status": 400 } }, { "name": "fail on unsupported query feature: group by", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -719,14 +642,12 @@ ], "expectedError": { "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Static queries don't support GROUP BY clauses", + "message": "Pull queries don't support GROUP BY clauses", "status": 400 } }, { "name": "fail on unsupported query feature: where multiple rowkeys", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -740,8 +661,6 @@ }, { "name": "fail on unsupported query feature: where rowkey range", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -755,8 +674,6 @@ }, { "name": "fail on unsupported query feature: where rowkey not equals", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -770,8 +687,6 @@ }, { "name": "fail on unsupported query feature: where rowkey not string", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -785,8 +700,6 @@ }, { "name": "fail on unsupported query feature: where not on rowkey", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", @@ -800,22 +713,18 @@ }, { "name": "fail on unsupported query feature: not materialized aggregate", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", "statements": [ "CREATE TABLE X (ID INT) WITH (kafka_topic='test_topic', value_format='JSON');", "SELECT * FROM X WHERE ROWKEY = '100';" ], "expectedError": { "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", - "message": "Table 'X' is not materialized. KSQL currently only supports pull queries on materialized aggregate tables. i.e. those created by a 'CREATE TABLE AS SELECT , FROM GROUP BY ' style statement.", + "message": "Table 'X' is not materialized.", "status": 400 } }, { - "name": "fail if WINDOWSTART used in non-windowed static query", - "enabled": false, - "comment": "disabled until RQTT can handle /query endpoint", + "name": "fail if WINDOWSTART used in non-windowed pull query", "statements": [ "CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;", diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index b1640bebba1f..ba01387fbcab 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -41,6 +41,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.QueryMetadata; import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; @@ -210,6 +211,8 @@ private Response handleStatement( statement.getClass().getName())); } catch (final TopicAuthorizationException e) { return Errors.accessDeniedFromKafka(e); + } catch (final KsqlStatementException e) { + return Errors.badStatement(e.getRawMessage(), e.getSqlStatement()); } catch (final KsqlException e) { return ErrorResponseUtil.generateResponse( e, Errors.badRequest(e));