Skip to content

Commit

Permalink
feat: add source statement to SourceDescription
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Dec 16, 2019
1 parent 3eecd40 commit 39b7232
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ private void printSourceDescription(final SourceDescription source) {
writer().println(String.format("%-20s : %s", "Type", source.getType()));

printTopicInfo(source);
writer().println(String.format("%-20s : %s", "Statement", source.getStatement()));
writer().println("");

printSchema(source.getFields(), source.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public class ConsoleTest {
"avro",
"kadka-topic",
2,
1
1,
"statement"
);

@Parameterized.Parameters(name = "{0}")
Expand Down Expand Up @@ -349,7 +350,8 @@ public void testPrintSourceDescription() {
"avro",
"kadka-topic",
1,
1
1,
"sql statement"
),
Collections.emptyList()
)
Expand Down Expand Up @@ -471,7 +473,8 @@ public void testPrintSourceDescription() {
+ " \"format\" : \"avro\",\n"
+ " \"topic\" : \"kadka-topic\",\n"
+ " \"partitions\" : 1,\n"
+ " \"replication\" : 1\n"
+ " \"replication\" : 1,\n"
+ " \"statement\" : \"sql statement\"\n"
+ " },\n"
+ " \"warnings\" : [ ]\n"
+ "} ]\n"));
Expand Down Expand Up @@ -613,7 +616,8 @@ public void testPrintConnectorDescription() {
+ " \"format\" : \"avro\",\n"
+ " \"topic\" : \"kadka-topic\",\n"
+ " \"partitions\" : 2,\n"
+ " \"replication\" : 1\n"
+ " \"replication\" : 1,\n"
+ " \"statement\" : \"statement\"\n"
+ " } ],\n"
+ " \"topics\" : [ \"a-jdbc-topic\" ],\n"
+ " \"warnings\" : [ ]\n"
Expand Down Expand Up @@ -990,7 +994,8 @@ public void shouldPrintTopicDescribeExtended() {
true,
"avro",
"kadka-topic",
2, 1
2, 1,
"sql statement text"
),
Collections.emptyList()
))
Expand Down Expand Up @@ -1048,7 +1053,8 @@ public void shouldPrintTopicDescribeExtended() {
+ " \"format\" : \"avro\",\n"
+ " \"topic\" : \"kadka-topic\",\n"
+ " \"partitions\" : 2,\n"
+ " \"replication\" : 1\n"
+ " \"replication\" : 1,\n"
+ " \"statement\" : \"sql statement text\"\n"
+ " },\n"
+ " \"warnings\" : [ ]\n"
+ "} ]\n"));
Expand All @@ -1061,6 +1067,7 @@ public void shouldPrintTopicDescribeExtended() {
+ "Timestamp field : 2000-01-01\n"
+ "Value format : avro\n"
+ "Kafka topic : kadka-topic (partitions: 2, replication: 1)\n"
+ "Statement : sql statement text\n"
+ "\n"
+ " Field | Type \n"
+ "-------------------------------------\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public static SourceDescription create(
format,
dataSource.getKafkaTopicName(),
topicDescription.map(td -> td.partitions().size()).orElse(0),
topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0)
topicDescription.map(td -> td.partitions().get(0).replicas().size()).orElse(0),
dataSource.getSqlExpression()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
public class DescribeConnectorExecutorTest {

private static final String TOPIC = "kafka-topic";
private static final String STATEMENT = "statement";
private static final String CONNECTOR_NAME = "connector";
private static final String CONNECTOR_CLASS = "io.confluent.ConnectorClazz";

Expand Down Expand Up @@ -122,6 +123,7 @@ public void setUp() {
when(serviceContext.getAdminClient()).thenReturn(adminClient);
when(metaStore.getAllDataSources()).thenReturn(ImmutableMap.of(SourceName.of("source"), source));
when(source.getKafkaTopicName()).thenReturn(TOPIC);
when(source.getSqlExpression()).thenReturn(STATEMENT);
when(source.getKsqlTopic()).thenReturn(
new KsqlTopic(
TOPIC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class SourceDescription {
private final String topic;
private final int partitions;
private final int replication;
private final String statement;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
@JsonCreator
Expand All @@ -60,23 +61,32 @@ public SourceDescription(
@JsonProperty("format") final String format,
@JsonProperty("topic") final String topic,
@JsonProperty("partitions") final int partitions,
@JsonProperty("replication") final int replication
@JsonProperty("replication") final int replication,
@JsonProperty("statement") final String statement
) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.name = name;
this.readQueries = Collections.unmodifiableList(readQueries);
this.writeQueries = Collections.unmodifiableList(writeQueries);
this.fields = Collections.unmodifiableList(fields);
this.type = type;
this.key = key;
this.timestamp = timestamp;
this.statistics = statistics;
this.errorStats = errorStats;
this.extended = extended;
this.format = format;
this.topic = topic;
this.name = Objects.requireNonNull(name, "name");
this.readQueries =
Collections.unmodifiableList(Objects.requireNonNull(readQueries, "readQueries"));
this.writeQueries =
Collections.unmodifiableList(Objects.requireNonNull(writeQueries, "writeQueries"));
this.fields =
Collections.unmodifiableList(Objects.requireNonNull(fields, "fields"));
this.type = Objects.requireNonNull(type, "type");
this.key = Objects.requireNonNull(key, "key");
this.timestamp = Objects.requireNonNull(timestamp, "timestamp");
this.statistics = Objects.requireNonNull(statistics, "statistics");
this.errorStats = Objects.requireNonNull(errorStats, "errorStats");
this.extended = Objects.requireNonNull(extended, "extended");
this.format = Objects.requireNonNull(format, "format");
this.topic = Objects.requireNonNull(topic, "topic");
this.partitions = partitions;
this.replication = replication;
this.statement = Objects.requireNonNull(statement, "statement");
}

public String getStatement() {
return statement;
}

public int getPartitions() {
Expand Down Expand Up @@ -157,7 +167,7 @@ private boolean equals2(final SourceDescription that) {
if (!Objects.equals(errorStats, that.errorStats)) {
return false;
}
return true;
return Objects.equals(statement, that.statement);
}

@Override
Expand Down Expand Up @@ -189,6 +199,22 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(name, fields, type, key, timestamp);
return Objects.hash(
name,
readQueries,
writeQueries,
fields,
type,
key,
timestamp,
statistics,
errorStats,
extended,
format,
topic,
partitions,
replication,
statement
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.entity;

import com.google.common.testing.EqualsTester;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;

@RunWith(MockitoJUnitRunner.class)
public class SourceDescriptionTest {

private static final String SOME_STRING = "some string";
private static final int SOME_INT = 3;
private static final boolean SOME_BOOL = true;

@Mock
private RunningQuery runningQuery;

@Mock
private FieldInfo fieldInfo;

@Test
public void shouldImplementHashCodeAndEqualsProperty() {
new EqualsTester()
.addEqualityGroup(
new SourceDescription(
SOME_STRING, Collections.singletonList(runningQuery), Collections.singletonList(runningQuery),
Collections.singletonList(fieldInfo), SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING,
SOME_STRING, SOME_BOOL, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, SOME_STRING),
new SourceDescription(
SOME_STRING, Collections.singletonList(runningQuery), Collections.singletonList(runningQuery),
Collections.singletonList(fieldInfo), SOME_STRING, SOME_STRING, SOME_STRING, SOME_STRING,
SOME_STRING, SOME_BOOL, SOME_STRING, SOME_STRING, SOME_INT, SOME_INT, SOME_STRING)
)
.testEquals();
}
}

0 comments on commit 39b7232

Please sign in to comment.