From 39b7232b924631500b51363f125ca904dc31ac2b Mon Sep 17 00:00:00 2001 From: Steven Zhang Date: Fri, 13 Dec 2019 13:51:26 -0800 Subject: [PATCH] feat: add source statement to SourceDescription --- .../confluent/ksql/cli/console/Console.java | 1 + .../ksql/cli/console/ConsoleTest.java | 19 +++++-- .../rest/entity/SourceDescriptionFactory.java | 3 +- .../DescribeConnectorExecutorTest.java | 2 + .../ksql/rest/entity/SourceDescription.java | 56 ++++++++++++++----- .../rest/entity/SourceDescriptionTest.java | 54 ++++++++++++++++++ 6 files changed, 113 insertions(+), 22 deletions(-) create mode 100644 ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index a6a0ce7e3fd2..f27bcffd0ba7 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -570,6 +570,7 @@ private void printSourceDescription(final SourceDescription source) { writer().println(String.format("%-20s : %s", "Type", source.getType())); printTopicInfo(source); + writer().println(String.format("%-20s : %s", "Statement", source.getStatement())); writer().println(""); printSchema(source.getFields(), source.getKey()); diff --git a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index 14d2feafbd20..31b5bbf9b18e 100644 --- a/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksql-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -122,7 +122,8 @@ public class ConsoleTest { "avro", "kadka-topic", 2, - 1 + 1, + "statement" ); @Parameterized.Parameters(name = "{0}") @@ -349,7 +350,8 @@ public void testPrintSourceDescription() { "avro", "kadka-topic", 1, - 1 + 1, + "sql statement" ), Collections.emptyList() ) @@ -471,7 +473,8 @@ public void testPrintSourceDescription() { + " \"format\" : \"avro\",\n" + " \"topic\" : \"kadka-topic\",\n" + " \"partitions\" : 1,\n" - + " \"replication\" : 1\n" + + " \"replication\" : 1,\n" + + " \"statement\" : \"sql statement\"\n" + " },\n" + " \"warnings\" : [ ]\n" + "} ]\n")); @@ -613,7 +616,8 @@ public void testPrintConnectorDescription() { + " \"format\" : \"avro\",\n" + " \"topic\" : \"kadka-topic\",\n" + " \"partitions\" : 2,\n" - + " \"replication\" : 1\n" + + " \"replication\" : 1,\n" + + " \"statement\" : \"statement\"\n" + " } ],\n" + " \"topics\" : [ \"a-jdbc-topic\" ],\n" + " \"warnings\" : [ ]\n" @@ -990,7 +994,8 @@ public void shouldPrintTopicDescribeExtended() { true, "avro", "kadka-topic", - 2, 1 + 2, 1, + "sql statement text" ), Collections.emptyList() )) @@ -1048,7 +1053,8 @@ public void shouldPrintTopicDescribeExtended() { + " \"format\" : \"avro\",\n" + " \"topic\" : \"kadka-topic\",\n" + " \"partitions\" : 2,\n" - + " \"replication\" : 1\n" + + " \"replication\" : 1,\n" + + " \"statement\" : \"sql statement text\"\n" + " },\n" + " \"warnings\" : [ ]\n" + "} ]\n")); @@ -1061,6 +1067,7 @@ public void shouldPrintTopicDescribeExtended() { + "Timestamp field : 2000-01-01\n" + "Value format : avro\n" + "Kafka topic : kadka-topic (partitions: 2, replication: 1)\n" + + "Statement : sql statement text\n" + "\n" + " Field | Type \n" + "-------------------------------------\n" diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java index 1c0ca86be7ce..ea357831049a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/SourceDescriptionFactory.java @@ -57,7 +57,8 @@ public static SourceDescription create( format, dataSource.getKafkaTopicName(), topicDescription.map(td -> td.partitions().size()).orElse(0), - topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0) + topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0), + dataSource.getSqlExpression() ); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java index f6f5eeaaca55..103957867e7a 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/DescribeConnectorExecutorTest.java @@ -76,6 +76,7 @@ public class DescribeConnectorExecutorTest { private static final String TOPIC = "kafka-topic"; + private static final String STATEMENT = "statement"; private static final String CONNECTOR_NAME = "connector"; private static final String CONNECTOR_CLASS = "io.confluent.ConnectorClazz"; @@ -122,6 +123,7 @@ public void setUp() { when(serviceContext.getAdminClient()).thenReturn(adminClient); when(metaStore.getAllDataSources()).thenReturn(ImmutableMap.of(SourceName.of("source"), source)); when(source.getKafkaTopicName()).thenReturn(TOPIC); + when(source.getSqlExpression()).thenReturn(STATEMENT); when(source.getKsqlTopic()).thenReturn( new KsqlTopic( TOPIC, diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java index 5066c778fbd8..19416348a798 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/SourceDescription.java @@ -43,6 +43,7 @@ public class SourceDescription { private final String topic; private final int partitions; private final int replication; + private final String statement; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck @JsonCreator @@ -60,23 +61,32 @@ public SourceDescription( @JsonProperty("format") final String format, @JsonProperty("topic") final String topic, @JsonProperty("partitions") final int partitions, - @JsonProperty("replication") final int replication + @JsonProperty("replication") final int replication, + @JsonProperty("statement") final String statement ) { // CHECKSTYLE_RULES.ON: ParameterNumberCheck - this.name = name; - this.readQueries = Collections.unmodifiableList(readQueries); - this.writeQueries = Collections.unmodifiableList(writeQueries); - this.fields = Collections.unmodifiableList(fields); - this.type = type; - this.key = key; - this.timestamp = timestamp; - this.statistics = statistics; - this.errorStats = errorStats; - this.extended = extended; - this.format = format; - this.topic = topic; + this.name = Objects.requireNonNull(name, "name"); + this.readQueries = + Collections.unmodifiableList(Objects.requireNonNull(readQueries, "readQueries")); + this.writeQueries = + Collections.unmodifiableList(Objects.requireNonNull(writeQueries, "writeQueries")); + this.fields = + Collections.unmodifiableList(Objects.requireNonNull(fields, "fields")); + this.type = Objects.requireNonNull(type, "type"); + this.key = Objects.requireNonNull(key, "key"); + this.timestamp = Objects.requireNonNull(timestamp, "timestamp"); + this.statistics = Objects.requireNonNull(statistics, "statistics"); + this.errorStats = Objects.requireNonNull(errorStats, "errorStats"); + this.extended = Objects.requireNonNull(extended, "extended"); + this.format = Objects.requireNonNull(format, "format"); + this.topic = Objects.requireNonNull(topic, "topic"); this.partitions = partitions; this.replication = replication; + this.statement = Objects.requireNonNull(statement, "statement"); + } + + public String getStatement() { + return statement; } public int getPartitions() { @@ -157,7 +167,7 @@ private boolean equals2(final SourceDescription that) { if (!Objects.equals(errorStats, that.errorStats)) { return false; } - return true; + return Objects.equals(statement, that.statement); } @Override @@ -189,6 +199,22 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(name, fields, type, key, timestamp); + return Objects.hash( + name, + readQueries, + writeQueries, + fields, + type, + key, + timestamp, + statistics, + errorStats, + extended, + format, + topic, + partitions, + replication, + statement + ); } } diff --git a/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java new file mode 100644 index 000000000000..ba4c587c3caa --- /dev/null +++ b/ksql-rest-model/src/test/java/io/confluent/ksql/rest/entity/SourceDescriptionTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.entity; + +import com.google.common.testing.EqualsTester; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; + +@RunWith(MockitoJUnitRunner.class) +public class SourceDescriptionTest { + + private static final String SOME_STRING = "some string"; + private static final int SOME_INT = 3; + private static final boolean SOME_BOOL = true; + + @Mock + private RunningQuery runningQuery; + + @Mock + private FieldInfo fieldInfo; + + @Test + public void shouldImplementHashCodeAndEqualsProperty() { + new EqualsTester() + .addEqualityGroup( + new SourceDescription( + SOME_STRING, Collections.singletonList(runningQuery), Collections.singletonList(runningQuery), + Collections.singletonList(fieldInfo), SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_STRING, SOME_BOOL, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, SOME_STRING), + new SourceDescription( + SOME_STRING, Collections.singletonList(runningQuery), Collections.singletonList(runningQuery), + Collections.singletonList(fieldInfo), SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING, + SOME_STRING, SOME_BOOL, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, SOME_STRING) + ) + .testEquals(); + } + }