From 8ef82ebbc29b33179fd4c232ca440b9926b78fdf Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Tue, 15 Oct 2019 14:08:53 +0100 Subject: [PATCH] feat: expose query status through EXPLAIN (#3570) * feat(3208): expose query status through EXPLAIN With this commit the status of any persistent query is exposed as part of the response from `EXPLAIN ;` For example, ``` ksql>EXPLAIN CSAS_ID_0_1; ID : CSAS_ID_0_1 SQL : CREATE STREAM ID_0 WITH (KAFKA_TOPIC='ID_0', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM ORDER_KSTREAM ORDER_KSTREAM EMIT CHANGES; Status : REBALANCING Field | Type ----------------------------------------- ROWTIME | BIGINT (system) ROWKEY | VARCHAR(STRING) (system) ORDERTIME | BIGINT ORDERID | VARCHAR(STRING) ITEMID | VARCHAR(STRING) ORDERUNITS | DOUBLE TIMESTAMP | VARCHAR(STRING) PRICEARRAY | ARRAY KEYVALUEMAP | MAP ----------------------------------------- ... ``` Note the new `Status : REBALANCING` part. * chore: fix tests --- .../confluent/ksql/cli/console/Console.java | 3 + .../java/io/confluent/ksql/cli/CliTest.java | 60 ++++++++++++++----- .../ksql/util/PersistentQueryMetadata.java | 5 -- .../rest/entity/QueryDescriptionFactory.java | 20 +++++-- .../entity/QueryDescriptionFactoryTest.java | 46 ++++++++------ .../server/execution/ExplainExecutorTest.java | 8 ++- .../execution/ListQueriesExecutorTest.java | 1 + .../ksql/rest/entity/QueryDescription.java | 27 ++++++++- 8 files changed, 121 insertions(+), 49 deletions(-) diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index b2fa2b645111..5b4bbae4e509 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -640,6 +640,9 @@ private void printQueryDescription(final QueryDescription query) { if (query.getStatementText().length() > 0) { writer().println(String.format("%-20s : %s", "SQL", query.getStatementText())); } + if (query.getState().isPresent()) { + writer().println(String.format("%-20s : %s", "Status", query.getState().get())); + } writer().println(); printSchema(query.getFields(), ""); printQuerySources(query); diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index e790fb0eb2dd..f619b1738ffa 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -120,6 +121,8 @@ public class CliTest { private static final String SERVER_OVERRIDE = "SERVER"; private static final String SESSION_OVERRIDE = "SESSION"; + private static final Pattern QUERY_ID_PATTERN = Pattern.compile("with query ID: (\\S+)"); + private static final Pattern WRITE_QUERIES = Pattern .compile(".*The following queries write into this source: \\[(.+)].*", Pattern.DOTALL); @@ -779,7 +782,7 @@ public void shouldListFunctions() { } @Test - public void shouldDescribeScalarFunction() throws Exception { + public void shouldDescribeScalarFunction() { final String expectedOutput = "Name : TIMESTAMPTOSTRING\n" + "Author : Confluent\n" @@ -815,7 +818,7 @@ public void shouldDescribeScalarFunction() throws Exception { } @Test - public void shouldDescribeOverloadedScalarFunction() throws Exception { + public void shouldDescribeOverloadedScalarFunction() { // Given: localCli.handleLine("describe function substring;"); @@ -847,7 +850,7 @@ public void shouldDescribeOverloadedScalarFunction() throws Exception { } @Test - public void shouldDescribeAggregateFunction() throws Exception { + public void shouldDescribeAggregateFunction() { final String expectedSummary = "Name : TOPK\n" + "Author : confluent\n" + @@ -868,7 +871,30 @@ public void shouldDescribeAggregateFunction() throws Exception { } @Test - public void shouldPrintErrorIfCantFindFunction() throws Exception { + public void shouldExplainQueryId() { + // Given: + localCli.handleLine("CREATE STREAM " + streamName + " " + + "AS SELECT * FROM " + orderDataProvider.kstreamName() + ";"); + + final String queryId = extractQueryId(terminal.getOutputString()); + + final String explain = "EXPLAIN " + queryId + ";"; + + // When: + localCli.handleLine(explain); + + // Then: + assertThat(terminal.getOutputString(), containsString(queryId)); + assertThat(terminal.getOutputString(), containsString("Status")); + assertThat(terminal.getOutputString(), + either(containsString(": REBALANCING")) + .or(containsString("RUNNING"))); + + dropStream(streamName); + } + + @Test + public void shouldPrintErrorIfCantFindFunction() { localCli.handleLine("describe function foobar;"); assertThat(terminal.getOutputString(), @@ -876,19 +902,11 @@ public void shouldPrintErrorIfCantFindFunction() throws Exception { } @Test - public void shouldHandleSetPropertyAsPartOfMultiStatementLine() throws Exception { - // Given: - final String csas = - "CREATE STREAM " + streamName + " " - + "AS SELECT * FROM " + orderDataProvider.kstreamName() + ";"; - + public void shouldHandleSetPropertyAsPartOfMultiStatementLine() { // When: - localCli - .handleLine("set 'auto.offset.reset'='earliest'; " + csas); + localCli.handleLine("set 'auto.offset.reset'='earliest';"); // Then: - dropStream(streamName); - assertThat(terminal.getOutputString(), containsString("Successfully changed local property 'auto.offset.reset' to 'earliest'")); } @@ -1082,7 +1100,9 @@ private static CommandStatusEntity stubCommandStatusEntityWithSeqNum(final long } private void givenCommandSequenceNumber( - final KsqlRestClient mockRestClient, final long seqNum) throws Exception { + final KsqlRestClient mockRestClient, + final long seqNum + ) { final CommandStatusEntity stubEntity = stubCommandStatusEntityWithSeqNum(seqNum); when(mockRestClient.makeKsqlRequest(anyString(), anyLong())).thenReturn( RestResponse.successful( @@ -1093,7 +1113,9 @@ private void givenCommandSequenceNumber( } private void assertLastCommandSequenceNumber( - final KsqlRestClient mockRestClient, final long seqNum) throws Exception { + final KsqlRestClient mockRestClient, + final long seqNum + ) { // Given: reset(mockRestClient); final String statementText = "list streams;"; @@ -1169,6 +1191,12 @@ private static Matcher>> containsR return Matchers.contains(rows); } + private static String extractQueryId(final String outputString) { + final java.util.regex.Matcher matcher = QUERY_ID_PATTERN.matcher(outputString); + assertThat("Could not find query id in: " + outputString, matcher.find()); + return matcher.group(1); + } + private static class TestRowCaptor implements RowCaptor { private ImmutableList.Builder> rows = ImmutableList.builder(); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java index 3a981b4578ce..3947e378bdf6 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/PersistentQueryMetadata.java @@ -25,7 +25,6 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.serde.Format; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -123,10 +122,6 @@ public SourceName getSinkName() { return sinkName; } - public Format getResultTopicFormat() { - return resultTopic.getValueFormat().getFormat(); - } - public String getSchemasDescription() { return schemas.toString(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java index 68d1603f01a1..d1d60dabb5a3 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java @@ -22,6 +22,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; import java.util.Collections; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -37,17 +38,26 @@ public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadat persistentQuery.getQueryId(), persistentQuery, ImmutableSet.of(persistentQuery.getSinkName()), - false + false, + Optional.of(persistentQuery.getState()) ); } - return create(new QueryId(""), queryMetadata, Collections.emptySet(), true); + + return create( + new QueryId(""), + queryMetadata, + Collections.emptySet(), + true, + Optional.empty() + ); } private static QueryDescription create( final QueryId id, final QueryMetadata queryMetadata, final Set sinks, - final boolean valueSchemaOnly + final boolean valueSchemaOnly, + final Optional state ) { return new QueryDescription( id, @@ -57,8 +67,8 @@ private static QueryDescription create( sinks.stream().map(SourceName::name).collect(Collectors.toSet()), queryMetadata.getTopologyDescription(), queryMetadata.getExecutionPlan(), - queryMetadata.getOverriddenProperties() + queryMetadata.getOverriddenProperties(), + state ); } - } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java index 581d260895e0..f0a24bb21811 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.entity; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isEmptyString; @@ -44,9 +45,9 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -87,6 +88,7 @@ public class QueryDescriptionFactoryTest { @Before public void setUp() { when(topology.describe()).thenReturn(topologyDescription); + when(queryStreams.state()).thenReturn(State.RUNNING); transientQuery = new TransientQueryMetadata( SQL_TEXT, @@ -127,60 +129,70 @@ public void setUp() { @Test public void shouldHaveEmptyQueryIdFromTransientQuery() { - Assert.assertThat(transientQueryDescription.getId().getId(), is(isEmptyString())); + assertThat(transientQueryDescription.getId().getId(), is(isEmptyString())); } @Test public void shouldHaveQueryIdForPersistentQuery() { - Assert.assertThat(persistentQueryDescription.getId().getId(), is(QUERY_ID.getId())); + assertThat(persistentQueryDescription.getId().getId(), is(QUERY_ID.getId())); } @Test public void shouldExposeExecutionPlan() { - Assert.assertThat(transientQueryDescription.getExecutionPlan(), is("execution plan")); - Assert.assertThat(persistentQueryDescription.getExecutionPlan(), is("execution plan")); + assertThat(transientQueryDescription.getExecutionPlan(), is("execution plan")); + assertThat(persistentQueryDescription.getExecutionPlan(), is("execution plan")); } @Test public void shouldExposeSources() { - Assert.assertThat(transientQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect(Collectors.toSet()))); - Assert.assertThat(persistentQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect( Collectors.toSet()))); + assertThat(transientQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect(Collectors.toSet()))); + assertThat(persistentQueryDescription.getSources(), is(SOURCE_NAMES.stream().map(SourceName::name).collect( Collectors.toSet()))); } @Test public void shouldExposeStatementText() { - Assert.assertThat(transientQueryDescription.getStatementText(), is(SQL_TEXT)); - Assert.assertThat(persistentQueryDescription.getStatementText(), is(SQL_TEXT)); + assertThat(transientQueryDescription.getStatementText(), is(SQL_TEXT)); + assertThat(persistentQueryDescription.getStatementText(), is(SQL_TEXT)); } @Test public void shouldExposeTopology() { - Assert.assertThat(transientQueryDescription.getTopology(), is(TOPOLOGY_TEXT)); - Assert.assertThat(persistentQueryDescription.getTopology(), is(TOPOLOGY_TEXT)); + assertThat(transientQueryDescription.getTopology(), is(TOPOLOGY_TEXT)); + assertThat(persistentQueryDescription.getTopology(), is(TOPOLOGY_TEXT)); } @Test public void shouldExposeOverridenProperties() { - Assert.assertThat(transientQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); - Assert.assertThat(persistentQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); + assertThat(transientQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); + assertThat(persistentQueryDescription.getOverriddenProperties(), is(PROP_OVERRIDES)); } @Test public void shouldExposeValueFieldsForTransientQueries() { - Assert.assertThat(transientQueryDescription.getFields(), contains( + assertThat(transientQueryDescription.getFields(), contains( new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); } @Test public void shouldExposeAllFieldsForPersistentQueries() { - Assert.assertThat(persistentQueryDescription.getFields(), contains( + assertThat(persistentQueryDescription.getFields(), contains( new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)), new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)), new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); } + @Test + public void shouldReportPersistentQueriesStatus() { + assertThat(persistentQueryDescription.getState(), is(Optional.of("RUNNING"))); + } + + @Test + public void shouldNotReportTransientQueriesStatus() { + assertThat(transientQueryDescription.getState(), is(Optional.empty())); + } + @Test public void shouldHandleRowTimeInValueSchemaForTransientQuery() { // Given: @@ -208,7 +220,7 @@ public void shouldHandleRowTimeInValueSchemaForTransientQuery() { transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery); // Then: - Assert.assertThat(transientQueryDescription.getFields(), contains( + assertThat(transientQueryDescription.getFields(), contains( new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), new FieldInfo("ROWTIME", new SchemaInfo(SqlBaseType.BIGINT, null, null)), new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); @@ -241,7 +253,7 @@ public void shouldHandleRowKeyInValueSchemaForTransientQuery() { transientQueryDescription = QueryDescriptionFactory.forQueryMetadata(transientQuery); // Then: - Assert.assertThat(transientQueryDescription.getFields(), contains( + assertThat(transientQueryDescription.getFields(), contains( new FieldInfo("field1", new SchemaInfo(SqlBaseType.INTEGER, null, null)), new FieldInfo("ROWKEY", new SchemaInfo(SqlBaseType.STRING, null, null)), new FieldInfo("field2", new SchemaInfo(SqlBaseType.STRING, null, null)))); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java index dc51836e58f7..2da255335abe 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java @@ -44,14 +44,17 @@ @RunWith(MockitoJUnitRunner.class) public class ExplainExecutorTest { - @Rule public final TemporaryEngine engine = new TemporaryEngine(); - @Rule public ExpectedException expectedException = ExpectedException.none(); + @Rule + public final TemporaryEngine engine = new TemporaryEngine(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); @Test public void shouldExplainQueryId() { // Given: final ConfiguredStatement explain = engine.configure("EXPLAIN id;"); final PersistentQueryMetadata metadata = givenPersistentQuery("id"); + when(metadata.getState()).thenReturn("Running"); KsqlEngine engine = mock(KsqlEngine.class); when(engine.getPersistentQuery(metadata.getQueryId())).thenReturn(Optional.of(metadata)); @@ -68,7 +71,6 @@ public void shouldExplainQueryId() { assertThat(query.getQueryDescription(), equalTo(QueryDescriptionFactory.forQueryMetadata(metadata))); } - @Test public void shouldExplainPersistentStatement() { // Given: diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java index 4203efe4d4e9..b5fe79efddf4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java @@ -87,6 +87,7 @@ public void shouldListQueriesExtended() { // Given final ConfiguredStatement showQueries = engine.configure("SHOW QUERIES EXTENDED;"); final PersistentQueryMetadata metadata = givenPersistentQuery("id"); + when(metadata.getState()).thenReturn("Running"); final KsqlEngine engine = mock(KsqlEngine.class); when(engine.getPersistentQueries()).thenReturn(ImmutableList.of(metadata)); diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java index f4505a564429..d62ec29b6894 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; @JsonIgnoreProperties(ignoreUnknown = true) @@ -36,7 +37,9 @@ public class QueryDescription { private final String topology; private final String executionPlan; private final Map overriddenProperties; + private final Optional state; + @SuppressWarnings("WeakerAccess") // Invoked via reflection @JsonCreator public QueryDescription( @JsonProperty("id") final QueryId id, @@ -46,7 +49,8 @@ public QueryDescription( @JsonProperty("sinks") final Set sinks, @JsonProperty("topology") final String topology, @JsonProperty("executionPlan") final String executionPlan, - @JsonProperty("overriddenProperties") final Map overriddenProperties + @JsonProperty("overriddenProperties") final Map overriddenProperties, + @JsonProperty("state") final Optional state ) { this.id = id; this.statementText = statementText; @@ -56,6 +60,7 @@ public QueryDescription( this.topology = topology; this.executionPlan = executionPlan; this.overriddenProperties = Collections.unmodifiableMap(overriddenProperties); + this.state = Objects.requireNonNull(state, "state"); } public QueryId getId() { @@ -90,8 +95,14 @@ public Map getOverriddenProperties() { return overriddenProperties; } + public Optional getState() { + return state; + } + + // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @Override public boolean equals(final Object o) { + // CHECKSTYLE_RULES.ON: CyclomaticComplexity if (this == o) { return true; } @@ -106,12 +117,22 @@ public boolean equals(final Object o) { && Objects.equals(executionPlan, that.executionPlan) && Objects.equals(sources, that.sources) && Objects.equals(sinks, that.sinks) - && Objects.equals(overriddenProperties, that.overriddenProperties); + && Objects.equals(overriddenProperties, that.overriddenProperties) + && Objects.equals(state, that.state); } @Override public int hashCode() { return Objects.hash( - id, statementText, fields, topology, executionPlan, sources, sinks, overriddenProperties); + id, + statementText, + fields, + topology, + executionPlan, + sources, + sinks, + overriddenProperties, + state + ); } }