Skip to content

Commit

Permalink
refactor: clean up of test to ensure all queries are closed (MINOR) (#…
Browse files Browse the repository at this point in the history
…3243)

* refactor: clean up of test to ensure all queries are closed (MINOR)
  • Loading branch information
big-andy-coates authored Aug 22, 2019
1 parent ddb970b commit 1552f92
Showing 1 changed file with 42 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.util.PersistentQueryMetadata;
Expand Down Expand Up @@ -77,6 +76,7 @@
* This test emulates the end to end flow in the quick start guide and ensures that the outputs at each stage
* are what we expect. This tests a broad set of KSQL functionality and is a good catch-all.
*/
@SuppressWarnings("ConstantConditions")
@Category({IntegrationTest.class})
public class EndToEndIntegrationTest {

Expand All @@ -87,6 +87,9 @@ public class EndToEndIntegrationTest {
private static final String PAGE_VIEW_STREAM = "pageviews_original";
private static final String USER_TABLE = "users_original";

private static final Format VALUE_FORMAT = JSON;
private static final UserDataProvider USER_DATA_PROVIDER = new UserDataProvider();

private static final AtomicInteger CONSUMED_COUNT = new AtomicInteger();
private static final AtomicInteger PRODUCED_COUNT = new AtomicInteger();
private static final PageViewDataProvider PAGE_VIEW_DATA_PROVIDER = new PageViewDataProvider();
Expand Down Expand Up @@ -121,21 +124,30 @@ public class EndToEndIntegrationTest {
@Rule
public final Timeout timeout = Timeout.seconds(120);

private QueryMetadata toClose;
private final List<QueryMetadata> toClose = new ArrayList<>();

@Before
public void before() {
ConfigurableUdf.PASSED_CONFIG = null;
PRODUCED_COUNT.set(0);
CONSUMED_COUNT.set(0);
toClose.clear();

TEST_HARNESS.ensureTopics(PAGE_VIEW_TOPIC, USERS_TOPIC);

TEST_HARNESS
.produceRows(USERS_TOPIC, new UserDataProvider(), JSON,
() -> System.currentTimeMillis() - 10000);
TEST_HARNESS
.produceRows(PAGE_VIEW_TOPIC, PAGE_VIEW_DATA_PROVIDER, JSON, System::currentTimeMillis);
TEST_HARNESS.produceRows(
USERS_TOPIC,
USER_DATA_PROVIDER,
VALUE_FORMAT,
() -> System.currentTimeMillis() - 10000
);

TEST_HARNESS.produceRows(
PAGE_VIEW_TOPIC,
PAGE_VIEW_DATA_PROVIDER,
VALUE_FORMAT,
System::currentTimeMillis
);

ksqlContext.sql("CREATE TABLE " + USER_TABLE
+ " (registertime bigint, gender varchar, regionid varchar, userid varchar)"
Expand All @@ -148,18 +160,14 @@ public void before() {

@After
public void after() {
if (toClose != null) {
toClose.close();
}
toClose.forEach(QueryMetadata::close);
}

@Test
public void shouldSelectAllFromUsers() throws Exception {
final TransientQueryMetadata queryMetadata = executeQuery(
"SELECT * from %s;", USER_TABLE);
final TransientQueryMetadata queryMetadata = executeStatement("SELECT * from %s;", USER_TABLE);

final Set<String> expectedUsers = ImmutableSet
.of("USER_0", "USER_1", "USER_2", "USER_3", "USER_4");
final Set<?> expectedUsers = USER_DATA_PROVIDER.data().keySet();

final List<GenericRow> rows = verifyAvailableRows(queryMetadata, expectedUsers.size());

Expand All @@ -176,7 +184,7 @@ public void shouldSelectAllFromUsers() throws Exception {
@Test
public void shouldSelectFromPageViewsWithSpecificColumn() throws Exception {
final TransientQueryMetadata queryMetadata =
executeQuery("SELECT pageid from %s;", PAGE_VIEW_STREAM);
executeStatement("SELECT pageid from %s;", PAGE_VIEW_STREAM);

final List<String> expectedPages =
Arrays.asList("PAGE_1", "PAGE_2", "PAGE_3", "PAGE_4", "PAGE_5", "PAGE_5", "PAGE_5");
Expand Down Expand Up @@ -204,7 +212,7 @@ public void shouldSelectAllFromDerivedStream() throws Exception {
USER_TABLE, PAGE_VIEW_STREAM, USER_TABLE, PAGE_VIEW_STREAM,
USER_TABLE);

final TransientQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeStatement(
"SELECT * from pageviews_female;");

final List<KeyValue<String, GenericRow>> results = new ArrayList<>();
Expand Down Expand Up @@ -267,7 +275,7 @@ public void shouldCreateStreamUsingLikeClause() throws Exception {
PAGE_VIEW_STREAM);

final TransientQueryMetadata queryMetadata =
executeQuery("SELECT userid, pageid from pageviews_like_p5;");
executeStatement("SELECT userid, pageid from pageviews_like_p5;");

final List<Object> columns = waitForFirstRow(queryMetadata);

Expand All @@ -284,7 +292,7 @@ public void shouldRetainSelectedColumnsInPartitionBy() throws Exception {
+ "partition by viewtime;",
PAGE_VIEW_STREAM);

final TransientQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeStatement(
"SELECT * from pageviews_by_viewtime;");

final List<Object> columns = waitForFirstRow(queryMetadata);
Expand All @@ -311,7 +319,7 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception {

executeStatement(createStreamStatement);

final TransientQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeStatement(
"SELECT * from cart_event_product;");

final List<Object> columns = waitForFirstRow(queryMetadata);
Expand Down Expand Up @@ -347,7 +355,7 @@ public void shouldCleanUpAvroSchemaOnDropSource() throws Exception {
@Test
public void shouldSupportConfigurableUdfs() throws Exception {
// When:
final TransientQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeStatement(
"SELECT E2EConfigurableUdf(registertime) AS x from %s;", USER_TABLE);

// Then:
Expand All @@ -363,30 +371,30 @@ public void shouldSupportConfigurableUdfs() throws Exception {
rows.forEach(row -> assertThat(row.getColumns().get(0), is(-1L)));
}

private QueryMetadata executeStatement(final String statement,
final String... args) {
@SuppressWarnings("unchecked")
private <T extends QueryMetadata> T executeStatement(
final String statement,
final String... args
) {
final String formatted = String.format(statement, (Object[])args);
log.debug("Sending statement: {}", formatted);

final List<QueryMetadata> queries = ksqlContext.sql(formatted);

queries.stream()
final List<QueryMetadata> newQueries = queries.stream()
.filter(q -> !(q instanceof PersistentQueryMetadata))
.forEach(QueryMetadata::start);
.collect(Collectors.toList());

return queries.isEmpty() ? null : queries.get(0);
}
newQueries.forEach(QueryMetadata::start);

private TransientQueryMetadata executeQuery(final String statement,
final String... args) {
final QueryMetadata queryMetadata = executeStatement(statement, args);
assertThat(queryMetadata, instanceOf(TransientQueryMetadata.class));
toClose = queryMetadata;
return (TransientQueryMetadata) queryMetadata;
toClose.addAll(newQueries);

return queries.isEmpty() ? null : (T) queries.get(0);
}

private static List<Object> waitForFirstRow(
final TransientQueryMetadata queryMetadata) throws Exception {
final TransientQueryMetadata queryMetadata
) throws Exception {
return verifyAvailableRows(queryMetadata, 1).get(0).getColumns();
}

Expand Down

0 comments on commit 1552f92

Please sign in to comment.