diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index 7cc90f5c8799..c2c6d39d0d27 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -44,6 +44,7 @@ import io.confluent.ksql.cli.console.table.builder.TopicDescriptionTableBuilder; import io.confluent.ksql.cli.console.table.builder.TypeListTableBuilder; import io.confluent.ksql.json.JsonMapper; +import io.confluent.ksql.model.WindowType; import io.confluent.ksql.rest.entity.ArgumentInfo; import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.ConnectorDescription; @@ -448,22 +449,40 @@ private void printWarnings(final KsqlEntity entity) { } } - private static String formatFieldType(final FieldInfo field, final String keyField) { - if (field.getName().equals("ROWTIME") || field.getName().equals("ROWKEY")) { + private static String formatFieldType( + final FieldInfo field, + final Optional windowType, + final String keyField + ) { + if (field.getName().equals("ROWTIME")) { return String.format("%-16s %s", field.getSchema().toTypeString(), "(system)"); - } else if (keyField != null && keyField.contains("." + field.getName())) { + } + + if (field.getName().equals("ROWKEY")) { + final String wt = windowType + .map(v -> " (Window type: " + v + ")") + .orElse(""); + + return String.format("%-16s %s%s", field.getSchema().toTypeString(), "(system)", wt); + } + + if (keyField != null && keyField.contains("." + field.getName())) { return String.format("%-16s %s", field.getSchema().toTypeString(), "(key)"); - } else { - return field.getSchema().toTypeString(); } + + return field.getSchema().toTypeString(); } - private void printSchema(final List fields, final String keyField) { + private void printSchema( + final Optional windowType, + final List fields, + final String keyField + ) { final Table.Builder tableBuilder = new Table.Builder(); if (!fields.isEmpty()) { tableBuilder.withColumnHeaders("Field", "Type"); fields.forEach( - f -> tableBuilder.withRow(f.getName(), formatFieldType(f, keyField))); + f -> tableBuilder.withRow(f.getName(), formatFieldType(f, windowType, keyField))); tableBuilder.build().print(this); } } @@ -474,9 +493,9 @@ private void printTopicInfo(final SourceDescription source) { : source.getTimestamp(); writer().println(String.format("%-20s : %s", "Key field", source.getKey())); - writer().println(String.format("%-20s : %s", "Key format", "STRING")); writer().println(String.format("%-20s : %s", "Timestamp field", timestamp)); - writer().println(String.format("%-20s : %s", "Value format", source.getFormat())); + writer().println(String.format("%-20s : %s", "Key format", source.getKeyFormat())); + writer().println(String.format("%-20s : %s", "Value format", source.getValueFormat())); if (!source.getTopic().isEmpty()) { String topicInformation = String.format("%-20s : %s", @@ -509,7 +528,9 @@ private void printQueries( "-----------------------------------" )); for (final RunningQuery writeQuery : queries) { - writer().println(writeQuery.getId().getId() + " : " + writeQuery.getQueryString()); + writer().println(writeQuery.getId().getId() + + " (" + writeQuery.getState().orElse("N/A") + + ") : " + writeQuery.getQuerySingleLine()); } writer().println("\nFor query topology and execution plan please run: EXPLAIN "); } @@ -562,7 +583,7 @@ private void printOverriddenProperties(final QueryDescription queryDescription) private void printSourceDescription(final SourceDescription source) { writer().println(String.format("%-20s : %s", "Name", source.getName())); if (!source.isExtended()) { - printSchema(source.getFields(), source.getKey()); + printSchema(source.getWindowType(), source.getFields(), source.getKey()); writer().println( "For runtime statistics and query details run: DESCRIBE EXTENDED ;"); return; @@ -573,7 +594,7 @@ private void printSourceDescription(final SourceDescription source) { writer().println(String.format("%-20s : %s", "Statement", source.getStatement())); writer().println(""); - printSchema(source.getFields(), source.getKey()); + printSchema(source.getWindowType(), source.getFields(), source.getKey()); printQueries(source.getReadQueries(), source.getType(), "read"); @@ -638,7 +659,7 @@ private void printQueryDescription(final QueryDescription query) { writer().println(String.format("%-20s : %s", "Status", query.getState().get())); } writer().println(); - printSchema(query.getFields(), ""); + printSchema(query.getWindowType(), query.getFields(), ""); printQuerySources(query); printQuerySinks(query); printExecutionPlan(query); diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilder.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilder.java index 079f507f9ece..52d74297a78e 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilder.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/table/builder/QueriesTableBuilder.java @@ -25,14 +25,17 @@ public class QueriesTableBuilder implements TableBuilder { private static final List HEADERS = - ImmutableList.of("Query ID", "Kafka Topic", "Query String"); + ImmutableList.of("Query ID", "Status", "Kafka Topic", "Query String"); @Override public Table buildTable(final Queries entity) { final Stream> rows = entity.getQueries().stream() .map(r -> ImmutableList.of( r.getId().getId(), - String.join(",", r.getSinks()), r.getQueryString())); + r.getState().orElse("N/A"), + String.join(",", r.getSinks()), + r.getQuerySingleLine() + )); return new Builder() .withColumnHeaders(HEADERS) diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index aa8f29638bfd..ea63de882e3a 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -80,6 +80,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; @@ -109,6 +110,7 @@ public class ConsoleTest { private final CliSpecificCommand cliCommand; private final SourceDescription sourceDescription = new SourceDescription( "TestSource", + Optional.empty(), Collections.emptyList(), Collections.emptyList(), buildTestSchema(SqlTypes.INTEGER, SqlTypes.STRING), @@ -118,6 +120,7 @@ public class ConsoleTest { "stats", "errors", true, + "kafka", "avro", "kadka-topic", 2, @@ -285,7 +288,7 @@ public void testPrintQueries() { final List queries = new ArrayList<>(); queries.add( new RunningQuery( - "select * from t1", Collections.singleton("Test"), new QueryId("0"))); + "select * from t1", Collections.singleton("Test"), new QueryId("0"), Optional.of("Running"))); final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( new Queries("e", queries) @@ -301,18 +304,19 @@ public void testPrintQueries() { + " \"@type\" : \"queries\",\n" + " \"statementText\" : \"e\",\n" + " \"queries\" : [ {\n" + + " \"queryString\" : \"select * from t1\",\n" + " \"sinks\" : [ \"Test\" ],\n" + " \"id\" : \"0\",\n" - + " \"queryString\" : \"select * from t1\"\n" + + " \"state\" : \"Running\"\n" + " } ],\n" + " \"warnings\" : [ ]\n" + "} ]\n")); } else { assertThat(output, is("\n" - + " Query ID | Kafka Topic | Query String \n" - + "-------------------------------------------\n" - + " 0 | Test | select * from t1 \n" - + "-------------------------------------------\n" + + " Query ID | Status | Kafka Topic | Query String \n" + + "-----------------------------------------------------\n" + + " 0 | Running | Test | select * from t1 \n" + + "-----------------------------------------------------\n" + "For detailed information on a Query run: EXPLAIN ;\n")); } } @@ -334,10 +338,10 @@ public void testPrintSourceDescription() { ); final List readQueries = ImmutableList.of( - new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId")) + new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running")) ); final List writeQueries = ImmutableList.of( - new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId")) + new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running")) ); final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( @@ -345,6 +349,7 @@ public void testPrintSourceDescription() { "some sql", new SourceDescription( "TestSource", + Optional.empty(), readQueries, writeQueries, fields, @@ -354,6 +359,7 @@ public void testPrintSourceDescription() { "stats", "errors", false, + "kafka", "avro", "kadka-topic", 1, @@ -375,15 +381,18 @@ public void testPrintSourceDescription() { + " \"statementText\" : \"some sql\",\n" + " \"sourceDescription\" : {\n" + " \"name\" : \"TestSource\",\n" + + " \"windowType\" : null,\n" + " \"readQueries\" : [ {\n" + + " \"queryString\" : \"read query\",\n" + " \"sinks\" : [ \"sink1\" ],\n" + " \"id\" : \"readId\",\n" - + " \"queryString\" : \"read query\"\n" + + " \"state\" : \"Running\"\n" + " } ],\n" + " \"writeQueries\" : [ {\n" + + " \"queryString\" : \"write query\",\n" + " \"sinks\" : [ \"sink2\" ],\n" + " \"id\" : \"writeId\",\n" - + " \"queryString\" : \"write query\"\n" + + " \"state\" : \"Running\"\n" + " } ],\n" + " \"fields\" : [ {\n" + " \"name\" : \"ROWTIME\",\n" @@ -477,7 +486,8 @@ public void testPrintSourceDescription() { + " \"statistics\" : \"stats\",\n" + " \"errorStats\" : \"errors\",\n" + " \"extended\" : false,\n" - + " \"format\" : \"avro\",\n" + + " \"keyFormat\" : \"kafka\",\n" + + " \"valueFormat\" : \"avro\",\n" + " \"topic\" : \"kadka-topic\",\n" + " \"partitions\" : 1,\n" + " \"replication\" : 1,\n" @@ -583,6 +593,7 @@ public void testPrintConnectorDescription() { + " },\n" + " \"sources\" : [ {\n" + " \"name\" : \"TestSource\",\n" + + " \"windowType\" : null,\n" + " \"readQueries\" : [ ],\n" + " \"writeQueries\" : [ ],\n" + " \"fields\" : [ {\n" @@ -620,7 +631,8 @@ public void testPrintConnectorDescription() { + " \"statistics\" : \"stats\",\n" + " \"errorStats\" : \"errors\",\n" + " \"extended\" : true,\n" - + " \"format\" : \"avro\",\n" + + " \"keyFormat\" : \"kafka\",\n" + + " \"valueFormat\" : \"avro\",\n" + " \"topic\" : \"kadka-topic\",\n" + " \"partitions\" : 2,\n" + " \"replication\" : 1,\n" @@ -979,10 +991,10 @@ public void testPrintExecuptionPlan() { public void shouldPrintTopicDescribeExtended() { // Given: final List readQueries = ImmutableList.of( - new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId")) + new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running")) ); final List writeQueries = ImmutableList.of( - new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId")) + new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running")) ); final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of( @@ -990,6 +1002,7 @@ public void shouldPrintTopicDescribeExtended() { "e", new SourceDescription( "TestSource", + Optional.empty(), readQueries, writeQueries, buildTestSchema(SqlTypes.STRING), @@ -999,6 +1012,7 @@ public void shouldPrintTopicDescribeExtended() { "stats", "errors", true, + "kafka", "avro", "kadka-topic", 2, 1, @@ -1019,15 +1033,18 @@ public void shouldPrintTopicDescribeExtended() { + " \"statementText\" : \"e\",\n" + " \"sourceDescription\" : {\n" + " \"name\" : \"TestSource\",\n" + + " \"windowType\" : null,\n" + " \"readQueries\" : [ {\n" + + " \"queryString\" : \"read query\",\n" + " \"sinks\" : [ \"sink1\" ],\n" + " \"id\" : \"readId\",\n" - + " \"queryString\" : \"read query\"\n" + + " \"state\" : \"Running\"\n" + " } ],\n" + " \"writeQueries\" : [ {\n" + + " \"queryString\" : \"write query\",\n" + " \"sinks\" : [ \"sink2\" ],\n" + " \"id\" : \"writeId\",\n" - + " \"queryString\" : \"write query\"\n" + + " \"state\" : \"Running\"\n" + " } ],\n" + " \"fields\" : [ {\n" + " \"name\" : \"ROWTIME\",\n" @@ -1057,7 +1074,8 @@ public void shouldPrintTopicDescribeExtended() { + " \"statistics\" : \"stats\",\n" + " \"errorStats\" : \"errors\",\n" + " \"extended\" : true,\n" - + " \"format\" : \"avro\",\n" + + " \"keyFormat\" : \"kafka\",\n" + + " \"valueFormat\" : \"avro\",\n" + " \"topic\" : \"kadka-topic\",\n" + " \"partitions\" : 2,\n" + " \"replication\" : 1,\n" @@ -1070,8 +1088,8 @@ public void shouldPrintTopicDescribeExtended() { + "Name : TestSource\n" + "Type : TABLE\n" + "Key field : key\n" - + "Key format : STRING\n" + "Timestamp field : 2000-01-01\n" + + "Key format : kafka\n" + "Value format : avro\n" + "Kafka topic : kadka-topic (partitions: 2, replication: 1)\n" + "Statement : sql statement text\n" @@ -1085,13 +1103,13 @@ public void shouldPrintTopicDescribeExtended() { + "\n" + "Queries that read from this TABLE\n" + "-----------------------------------\n" - + "readId : read query\n" + + "readId (Running) : read query\n" + "\n" + "For query topology and execution plan please run: EXPLAIN \n" + "\n" + "Queries that write from this TABLE\n" + "-----------------------------------\n" - + "writeId : write query\n" + + "writeId (Running) : write query\n" + "\n" + "For query topology and execution plan please run: EXPLAIN \n" + "\n" diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java index 869f2ad9d715..d323904d819e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/QueryDescriptionFactory.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.entity; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.model.WindowType; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.util.EntityUtil; @@ -37,6 +38,7 @@ public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadat return create( persistentQuery.getQueryId(), persistentQuery, + persistentQuery.getResultTopic().getKeyFormat().getWindowType(), ImmutableSet.of(persistentQuery.getSinkName()), Optional.of(persistentQuery.getState()) ); @@ -45,6 +47,7 @@ public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadat return create( new QueryId(""), queryMetadata, + Optional.empty(), Collections.emptySet(), Optional.empty() ); @@ -53,12 +56,14 @@ public static QueryDescription forQueryMetadata(final QueryMetadata queryMetadat private static QueryDescription create( final QueryId id, final QueryMetadata queryMetadata, + final Optional windowType, final Set sinks, final Optional state ) { return new QueryDescription( id, queryMetadata.getStatementString(), + windowType, EntityUtil.buildSourceSchemaEntity(queryMetadata.getLogicalSchema()), queryMetadata.getSourceNames().stream().map(SourceName::name).collect(Collectors.toSet()), sinks.stream().map(SourceName::name).collect(Collectors.toSet()), diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java index ea357831049a..c7ac042aad7a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java @@ -32,13 +32,13 @@ private SourceDescriptionFactory() { public static SourceDescription create( final DataSource dataSource, final boolean extended, - final String format, final List readQueries, final List writeQueries, final Optional topicDescription ) { return new SourceDescription( dataSource.getName().toString(FormatOptions.noEscape()), + dataSource.getKsqlTopic().getKeyFormat().getWindowType(), readQueries, writeQueries, EntityUtil.buildSourceSchemaEntity(dataSource.getSchema()), @@ -54,7 +54,8 @@ public static SourceDescription create( ? MetricCollectors.getAndFormatStatsFor( dataSource.getKafkaTopicName(), true) : ""), extended, - format, + dataSource.getKsqlTopic().getKeyFormat().getFormat().name(), + dataSource.getKsqlTopic().getValueFormat().getFormat().name(), dataSource.getKafkaTopicName(), topicDescription.map(td -> td.partitions().size()).orElse(0), topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0), diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java index fa1385c27b37..293ceff571cd 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutor.java @@ -106,7 +106,6 @@ public Optional execute( .map(source -> SourceDescriptionFactory.create( source, false, - source.getKsqlTopic().getValueFormat().getFormat().name(), ImmutableList.of(), ImmutableList.of(), Optional.empty())) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java index 90f1265d0a81..e57caf60ef06 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutor.java @@ -51,11 +51,12 @@ public static Optional execute( statement.getStatementText(), executionContext.getPersistentQueries() .stream() - .map( - q -> new RunningQuery( + .map(q -> new RunningQuery( q.getStatementString(), ImmutableSet.of(q.getSinkName().name()), - q.getQueryId())) + q.getQueryId(), + Optional.of(q.getState()) + )) .collect(Collectors.toList()))); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java index 6b2b03b6bb99..7b474dd3e76f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListSourceExecutor.java @@ -205,7 +205,6 @@ private static SourceDescriptionWithWarnings describeSource( SourceDescriptionFactory.create( dataSource, extended, - dataSource.getKsqlTopic().getValueFormat().getFormat().name(), getQueries(ksqlEngine, q -> q.getSourceNames().contains(dataSource.getName())), getQueries(ksqlEngine, q -> q.getSinkName().equals(dataSource.getName())), topicDescription @@ -223,7 +222,8 @@ private static List getQueries( .map(q -> new RunningQuery( q.getStatementString(), ImmutableSet.of(q.getSinkName().name()), - q.getQueryId() + q.getQueryId(), + Optional.of(q.getState()) )) .collect(Collectors.toList()); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java index 79c11ad3f9ff..f4ac5003f630 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionFactoryTest.java @@ -32,6 +32,9 @@ import io.confluent.ksql.schema.ksql.PhysicalSchema; import io.confluent.ksql.schema.ksql.SqlBaseType; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -97,6 +100,8 @@ public void setUp() { when(topology.describe()).thenReturn(topologyDescription); when(queryStreams.state()).thenReturn(State.RUNNING); + when(sinkTopic.getKeyFormat()).thenReturn(KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA))); + transientQuery = new TransientQueryMetadata( SQL_TEXT, queryStreams, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java index 27fcacf6c4f0..a0f4da4c9552 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionFactoryTest.java @@ -118,7 +118,6 @@ public void shouldReturnStatsBasedOnKafkaTopic() { final SourceDescription sourceDescription = SourceDescriptionFactory.create( dataSource, true, - "json", Collections.emptyList(), Collections.emptyList(), Optional.empty()); @@ -142,7 +141,6 @@ public void shouldReturnEmptyTimestampColumn() { final SourceDescription sourceDescription = SourceDescriptionFactory.create( dataSource, true, - "json", Collections.emptyList(), Collections.emptyList(), Optional.empty()); @@ -165,7 +163,6 @@ public void shouldReturnTimestampColumnIfPresent() { final SourceDescription sourceDescription = SourceDescriptionFactory.create( dataSource, true, - "json", Collections.emptyList(), Collections.emptyList(), Optional.empty()); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java index a4bf771c84f7..a55116ca5de5 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ExplainExecutorTest.java @@ -26,11 +26,15 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.QueryDescriptionEntity; import io.confluent.ksql.rest.entity.QueryDescriptionFactory; import io.confluent.ksql.rest.server.TemporaryEngine; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -54,7 +58,6 @@ public void shouldExplainQueryId() { // Given: final ConfiguredStatement explain = engine.configure("EXPLAIN id;"); final PersistentQueryMetadata metadata = givenPersistentQuery("id"); - when(metadata.getState()).thenReturn("Running"); final KsqlEngine engine = mock(KsqlEngine.class); when(engine.getPersistentQuery(metadata.getQueryId())).thenReturn(Optional.of(metadata)); @@ -153,6 +156,14 @@ public static PersistentQueryMetadata givenPersistentQuery(final String id) { when(metadata.getQueryId()).thenReturn(new QueryId(id)); when(metadata.getSinkName()).thenReturn(SourceName.of(id)); when(metadata.getLogicalSchema()).thenReturn(TemporaryEngine.SCHEMA); + when(metadata.getState()).thenReturn("Running"); + when(metadata.getTopologyDescription()).thenReturn("topology"); + when(metadata.getExecutionPlan()).thenReturn("plan"); + when(metadata.getStatementString()).thenReturn("sql"); + + final KsqlTopic sinkTopic = mock(KsqlTopic.class); + when(sinkTopic.getKeyFormat()).thenReturn(KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA))); + when(metadata.getResultTopic()).thenReturn(sinkTopic); return metadata; } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java index b5fe79efddf4..579fd7b14959 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListQueriesExecutorTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.engine.KsqlEngine; +import io.confluent.ksql.execution.ddl.commands.KsqlTopic; import io.confluent.ksql.name.SourceName; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.Queries; @@ -33,8 +34,12 @@ import io.confluent.ksql.rest.entity.QueryDescriptionList; import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.server.TemporaryEngine; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.FormatInfo; +import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.PersistentQueryMetadata; +import java.util.Optional; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -79,7 +84,9 @@ public void shouldListQueriesBasic() { new RunningQuery( metadata.getStatementString(), ImmutableSet.of(metadata.getSinkName().name()), - metadata.getQueryId()))); + metadata.getQueryId(), + Optional.of(metadata.getState()) + ))); } @Test @@ -107,9 +114,17 @@ public void shouldListQueriesExtended() { @SuppressWarnings("SameParameterValue") public static PersistentQueryMetadata givenPersistentQuery(final String id) { final PersistentQueryMetadata metadata = mock(PersistentQueryMetadata.class); + when(metadata.getStatementString()).thenReturn("sql"); when(metadata.getQueryId()).thenReturn(new QueryId(id)); when(metadata.getSinkName()).thenReturn(SourceName.of(id)); when(metadata.getLogicalSchema()).thenReturn(TemporaryEngine.SCHEMA); + when(metadata.getState()).thenReturn("Running"); + when(metadata.getTopologyDescription()).thenReturn("topology"); + when(metadata.getExecutionPlan()).thenReturn("plan"); + + final KsqlTopic sinkTopic = mock(KsqlTopic.class); + when(sinkTopic.getKeyFormat()).thenReturn(KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA))); + when(metadata.getResultTopic()).thenReturn(sinkTopic); return metadata; } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java index 7cbbe411bd07..baa8890122f8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListSourceExecutorTest.java @@ -22,9 +22,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -56,10 +58,15 @@ import java.util.Arrays; import java.util.Optional; import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -70,6 +77,17 @@ public class ListSourceExecutorTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); + @Mock + private TopicDescription topicWith1PartitionAndRfOf1; + + @Before + public void setUp() { + final Node node = mock(Node.class); + final TopicPartitionInfo topicInfo = mock(TopicPartitionInfo.class); + when(topicInfo.replicas()).thenReturn(ImmutableList.of(node)); + when(topicWith1PartitionAndRfOf1.partitions()).thenReturn(ImmutableList.of(topicInfo)); + } + @Test public void shouldShowStreams() { // Given: @@ -122,17 +140,15 @@ public void shouldShowStreamsExtended() { SourceDescriptionFactory.create( stream1, true, - "JSON", ImmutableList.of(), ImmutableList.of(), - Optional.empty()), + Optional.of(topicWith1PartitionAndRfOf1)), SourceDescriptionFactory.create( stream2, true, - "JSON", ImmutableList.of(), ImmutableList.of(), - Optional.empty()) + Optional.of(topicWith1PartitionAndRfOf1)) )); } @@ -191,7 +207,6 @@ public void shouldShowTablesExtended() { SourceDescriptionFactory.create( table1, true, - "JSON", ImmutableList.of(), ImmutableList.of(), Optional.of(client.describeTopic(table1.getKafkaTopicName())) @@ -199,7 +214,6 @@ public void shouldShowTablesExtended() { SourceDescriptionFactory.create( table2, true, - "JSON", ImmutableList.of(), ImmutableList.of(), Optional.of(client.describeTopic(table1.getKafkaTopicName())) @@ -239,12 +253,13 @@ public void shouldShowColumnsSource() { equalTo(SourceDescriptionFactory.create( stream, false, - "JSON", ImmutableList.of(), ImmutableList.of(new RunningQuery( metadata.getStatementString(), ImmutableSet.of(metadata.getSinkName().toString(FormatOptions.noEscape())), - metadata.getQueryId())), + metadata.getQueryId(), + Optional.of(metadata.getState()) + )), Optional.empty()))); } @@ -288,7 +303,7 @@ public void shouldNotCallTopicClientForExtendedDescription() { verify(spyTopicClient, never()).describeTopic(anyString()); } - private void assertSourceListWithWarning( + private static void assertSourceListWithWarning( final KsqlEntity entity, final DataSource... sources) { assertThat(entity, instanceOf(SourceDescriptionList.class)); @@ -302,7 +317,6 @@ private void assertSourceListWithWarning( SourceDescriptionFactory.create( s, true, - "JSON", ImmutableList.of(), ImmutableList.of(), Optional.empty() @@ -389,7 +403,6 @@ public void shouldAddWarningOnClientExceptionForDescription() { SourceDescriptionFactory.create( stream1, true, - "JSON", ImmutableList.of(), ImmutableList.of(), Optional.empty() diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index ca0a2e48258c..d73a51925300 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -473,11 +473,11 @@ public void shouldShowStreamsExtended() { assertThat(descriptionList.getSourceDescriptions(), containsInAnyOrder( SourceDescriptionFactory.create( ksqlEngine.getMetaStore().getSource(SourceName.of("TEST_STREAM")), - true, "JSON", Collections.emptyList(), Collections.emptyList(), + true, Collections.emptyList(), Collections.emptyList(), Optional.of(kafkaTopicClient.describeTopic("KAFKA_TOPIC_2"))), SourceDescriptionFactory.create( ksqlEngine.getMetaStore().getSource(SourceName.of("new_stream")), - true, "JSON", Collections.emptyList(), Collections.emptyList(), + true, Collections.emptyList(), Collections.emptyList(), Optional.of(kafkaTopicClient.describeTopic("new_topic")))) ); } @@ -502,11 +502,11 @@ public void shouldShowTablesExtended() { assertThat(descriptionList.getSourceDescriptions(), containsInAnyOrder( SourceDescriptionFactory.create( ksqlEngine.getMetaStore().getSource(SourceName.of("TEST_TABLE")), - true, "JSON", Collections.emptyList(), Collections.emptyList(), + true, Collections.emptyList(), Collections.emptyList(), Optional.of(kafkaTopicClient.describeTopic("KAFKA_TOPIC_1"))), SourceDescriptionFactory.create( ksqlEngine.getMetaStore().getSource(SourceName.of("new_table")), - true, "JSON", Collections.emptyList(), Collections.emptyList(), + true, Collections.emptyList(), Collections.emptyList(), Optional.of(kafkaTopicClient.describeTopic("new_topic")))) ); } @@ -547,7 +547,6 @@ public void shouldDescribeStatement() { final SourceDescription expectedDescription = SourceDescriptionFactory.create( ksqlEngine.getMetaStore().getSource(SourceName.of("DESCRIBED_STREAM")), false, - "JSON", Collections.singletonList(queries.get(1)), Collections.singletonList(queries.get(0)), Optional.empty() @@ -1959,7 +1958,9 @@ private List createRunningQueries( .map(md -> new RunningQuery( md.getStatementString(), ImmutableSet.of(md.getSinkName().toString(FormatOptions.noEscape())), - md.getQueryId())) + md.getQueryId(), + Optional.of(md.getState()) + )) .collect(Collectors.toList()); } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java index d62ec29b6894..cd6bbe599a50 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/QueryDescription.java @@ -18,8 +18,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.model.WindowType; import io.confluent.ksql.query.QueryId; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -31,6 +34,7 @@ public class QueryDescription { private final QueryId id; private final String statementText; + private final Optional windowType; private final List fields; private final Set sources; private final Set sinks; @@ -44,6 +48,7 @@ public class QueryDescription { public QueryDescription( @JsonProperty("id") final QueryId id, @JsonProperty("statementText") final String statementText, + @JsonProperty("windowType") final Optional windowType, @JsonProperty("fields") final List fields, @JsonProperty("sources") final Set sources, @JsonProperty("sinks") final Set sinks, @@ -52,14 +57,16 @@ public QueryDescription( @JsonProperty("overriddenProperties") final Map overriddenProperties, @JsonProperty("state") final Optional state ) { - this.id = id; - this.statementText = statementText; - this.fields = Collections.unmodifiableList(fields); - this.sources = Collections.unmodifiableSet(sources); - this.sinks = Collections.unmodifiableSet(sinks); - this.topology = topology; - this.executionPlan = executionPlan; - this.overriddenProperties = Collections.unmodifiableMap(overriddenProperties); + this.id = Objects.requireNonNull(id, "id"); + this.statementText = Objects.requireNonNull(statementText, "statementText"); + this.windowType = Objects.requireNonNull(windowType, "windowType"); + this.fields = ImmutableList.copyOf(Objects.requireNonNull(fields, "fields")); + this.sources = ImmutableSet.copyOf(Objects.requireNonNull(sources, "sources")); + this.sinks = ImmutableSet.copyOf(Objects.requireNonNull(sinks, "sinks")); + this.topology = Objects.requireNonNull(topology, "topology"); + this.executionPlan = Objects.requireNonNull(executionPlan, "executionPlan"); + this.overriddenProperties = ImmutableMap.copyOf(Objects + .requireNonNull(overriddenProperties, "overriddenProperties")); this.state = Objects.requireNonNull(state, "state"); } @@ -71,6 +78,10 @@ public String getStatementText() { return statementText; } + public Optional getWindowType() { + return windowType; + } + public List getFields() { return fields; } @@ -112,6 +123,7 @@ public boolean equals(final Object o) { final QueryDescription that = (QueryDescription) o; return Objects.equals(id, that.id) && Objects.equals(statementText, that.statementText) + && Objects.equals(windowType, that.windowType) && Objects.equals(fields, that.fields) && Objects.equals(topology, that.topology) && Objects.equals(executionPlan, that.executionPlan) @@ -126,6 +138,7 @@ public int hashCode() { return Objects.hash( id, statementText, + windowType, fields, topology, executionPlan, diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java index fb03cafbae2d..6b28978686a6 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/RunningQuery.java @@ -16,33 +16,44 @@ package io.confluent.ksql.rest.entity; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import io.confluent.ksql.query.QueryId; import java.util.Objects; +import java.util.Optional; import java.util.Set; @JsonIgnoreProperties(ignoreUnknown = true) public class RunningQuery { + private final String queryString; private final Set sinks; private final QueryId id; + private final Optional state; @JsonCreator public RunningQuery( - @JsonProperty("statementText") final String queryString, + @JsonProperty("queryString") final String queryString, @JsonProperty("sinks") final Set sinks, - @JsonProperty("id") final QueryId id + @JsonProperty("id") final QueryId id, + @JsonProperty("state") final Optional state ) { - this.queryString = queryString; - this.sinks = sinks; - this.id = id; + this.queryString = Objects.requireNonNull(queryString, "queryString"); + this.sinks = Objects.requireNonNull(sinks, "sinks"); + this.id = Objects.requireNonNull(id, "id"); + this.state = Objects.requireNonNull(state, "state"); } public String getQueryString() { return queryString; } + @JsonIgnore + public String getQuerySingleLine() { + return queryString.replaceAll(System.lineSeparator(), ""); + } + public Set getSinks() { return sinks; } @@ -51,6 +62,10 @@ public QueryId getId() { return id; } + public Optional getState() { + return state; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -62,11 +77,12 @@ public boolean equals(final Object o) { final RunningQuery that = (RunningQuery) o; return Objects.equals(id, that.id) && Objects.equals(queryString, that.queryString) - && Objects.equals(sinks, that.sinks); + && Objects.equals(sinks, that.sinks) + && Objects.equals(state, that.state); } @Override public int hashCode() { - return Objects.hash(id, queryString, id); + return Objects.hash(id, queryString, id, state); } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java index 19416348a798..6affc6eeba41 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java @@ -20,9 +20,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeName; +import io.confluent.ksql.model.WindowType; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; @JsonIgnoreProperties(ignoreUnknown = true) @JsonTypeName("description") @@ -30,6 +32,7 @@ public class SourceDescription { private final String name; + private final Optional windowType; private final List readQueries; private final List writeQueries; private final List fields; @@ -39,7 +42,8 @@ public class SourceDescription { private final String statistics; private final String errorStats; private final boolean extended; - private final String format; + private final String keyFormat; + private final String valueFormat; private final String topic; private final int partitions; private final int replication; @@ -49,6 +53,7 @@ public class SourceDescription { @JsonCreator public SourceDescription( @JsonProperty("name") final String name, + @JsonProperty("windowType") final Optional windowType, @JsonProperty("readQueries") final List readQueries, @JsonProperty("writeQueries") final List writeQueries, @JsonProperty("fields") final List fields, @@ -58,7 +63,8 @@ public SourceDescription( @JsonProperty("statistics") final String statistics, @JsonProperty("errorStats") final String errorStats, @JsonProperty("extended") final boolean extended, - @JsonProperty("format") final String format, + @JsonProperty("keyFormat") final String keyFormat, + @JsonProperty("valueFormat") final String valueFormat, @JsonProperty("topic") final String topic, @JsonProperty("partitions") final int partitions, @JsonProperty("replication") final int replication, @@ -66,6 +72,7 @@ public SourceDescription( ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.name = Objects.requireNonNull(name, "name"); + this.windowType = Objects.requireNonNull(windowType, "windowType"); this.readQueries = Collections.unmodifiableList(Objects.requireNonNull(readQueries, "readQueries")); this.writeQueries = @@ -77,8 +84,9 @@ public SourceDescription( this.timestamp = Objects.requireNonNull(timestamp, "timestamp"); this.statistics = Objects.requireNonNull(statistics, "statistics"); this.errorStats = Objects.requireNonNull(errorStats, "errorStats"); - this.extended = Objects.requireNonNull(extended, "extended"); - this.format = Objects.requireNonNull(format, "format"); + this.extended = extended; + this.keyFormat = Objects.requireNonNull(keyFormat, "keyFormat"); + this.valueFormat = Objects.requireNonNull(valueFormat, "valueFormat"); this.topic = Objects.requireNonNull(topic, "topic"); this.partitions = partitions; this.replication = replication; @@ -89,6 +97,10 @@ public String getStatement() { return statement; } + public Optional getWindowType() { + return windowType; + } + public int getPartitions() { return partitions; } @@ -113,8 +125,12 @@ public String getType() { return type; } - public String getFormat() { - return format; + public String getKeyFormat() { + return keyFormat; + } + + public String getValueFormat() { + return valueFormat; } public String getTopic() { @@ -145,62 +161,41 @@ public String getErrorStats() { return errorStats; } - private boolean equals2(final SourceDescription that) { - if (!Objects.equals(topic, that.topic)) { - return false; - } - if (!Objects.equals(key, that.key)) { - return false; - } - if (!Objects.equals(writeQueries, that.writeQueries)) { - return false; - } - if (!Objects.equals(readQueries, that.readQueries)) { - return false; - } - if (!Objects.equals(timestamp, that.timestamp)) { - return false; - } - if (!Objects.equals(statistics, that.statistics)) { - return false; - } - if (!Objects.equals(errorStats, that.errorStats)) { - return false; - } - return Objects.equals(statement, that.statement); - } - + // CHECKSTYLE_RULES.OFF: CyclomaticComplexity @Override public boolean equals(final Object o) { + // CHECKSTYLE_RULES.ON: CyclomaticComplexity if (this == o) { return true; } - if (!(o instanceof SourceDescription)) { + if (o == null || getClass() != o.getClass()) { return false; } final SourceDescription that = (SourceDescription) o; - if (!Objects.equals(name, that.name)) { - return false; - } - if (!Objects.equals(fields, that.fields)) { - return false; - } - if (!Objects.equals(extended, that.extended)) { - return false; - } - if (!Objects.equals(type, that.type)) { - return false; - } - if (!Objects.equals(format, that.format)) { - return false; - } - return equals2(that); + return extended == that.extended + && partitions == that.partitions + && replication == that.replication + && Objects.equals(name, that.name) + && Objects.equals(windowType, that.windowType) + && Objects.equals(readQueries, that.readQueries) + && Objects.equals(writeQueries, that.writeQueries) + && Objects.equals(fields, that.fields) + && Objects.equals(type, that.type) + && Objects.equals(key, that.key) + && Objects.equals(timestamp, that.timestamp) + && Objects.equals(statistics, that.statistics) + && Objects.equals(errorStats, that.errorStats) + && Objects.equals(keyFormat, that.keyFormat) + && Objects.equals(valueFormat, that.valueFormat) + && Objects.equals(topic, that.topic) + && Objects.equals(statement, that.statement); } @Override public int hashCode() { return Objects.hash( name, + windowType, readQueries, writeQueries, fields, @@ -210,7 +205,8 @@ public int hashCode() { statistics, errorStats, extended, - format, + keyFormat, + valueFormat, topic, partitions, replication, diff --git a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java index 5027bbd9b229..6e793f28ad39 100644 --- a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java +++ b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java @@ -15,8 +15,12 @@ package io.confluent.ksql.rest.entity; +import com.google.common.collect.ImmutableList; import com.google.common.testing.EqualsTester; +import io.confluent.ksql.model.WindowType; import java.util.Collections; +import java.util.List; +import java.util.Optional; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -28,26 +32,146 @@ public class SourceDescriptionTest { private static final String SOME_STRING = "some string"; private static final int SOME_INT = 3; private static final boolean SOME_BOOL = true; - - @Mock - private RunningQuery runningQuery; + @Mock + private RunningQuery query1; + @Mock + private RunningQuery query2; @Mock private FieldInfo fieldInfo; + @SuppressWarnings("UnstableApiUsage") @Test public void shouldImplementHashCodeAndEqualsProperty() { - new EqualsTester() - .addEqualityGroup( - new SourceDescription( - SOME_STRING, Collections.singletonList(runningQuery), Collections.singletonList(runningQuery), - Collections.singletonList(fieldInfo), SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, - SOME_STRING, SOME_BOOL, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, SOME_STRING), - new SourceDescription( - SOME_STRING, Collections.singletonList(runningQuery), Collections.singletonList(runningQuery), - Collections.singletonList(fieldInfo), SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, - SOME_STRING, SOME_BOOL, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, SOME_STRING) - ) - .testEquals(); + final List readQueries = Collections.singletonList(query1); + final List writeQueries = Collections.singletonList(query2); + final List fields = Collections.singletonList(fieldInfo); + + new EqualsTester() + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING), + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + "diff", Optional.of(WindowType.SESSION), readQueries, writeQueries, fields, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), ImmutableList.of(), writeQueries, fields, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, ImmutableList.of(), fields, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, ImmutableList.of(), + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, "diff", + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + "diff", SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, "diff", SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, "diff", SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, "diff", + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, "diff", SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + !SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, "diff", SOME_STRING, SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, "diff", SOME_INT, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT + 1, SOME_INT, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT + 1, + SOME_STRING) + ) + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Optional.empty(), readQueries, writeQueries, fields, SOME_STRING, + SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_BOOL, SOME_STRING, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, + "diff") + ) + .testEquals(); } }