Skip to content

Commit

Permalink
fix(3441): stabilize the StaticQueryFunctionalTest (#3442)
Browse files Browse the repository at this point in the history
Fixes: #3441

Fix is to use different tmp directories per app.  Two instances should not be using the same dir anyway.
  • Loading branch information
big-andy-coates authored Sep 29, 2019
1 parent 06a2a57 commit 44ae3a0
Showing 1 changed file with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.rest.integration;

import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -35,9 +36,9 @@
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.test.util.KsqlIdentifierTestUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.TestDataProvider;
import io.confluent.ksql.util.UserDataProvider;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -50,6 +51,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;

/**
* Test to ensure static queries route across multiple KSQL nodes correctly.
Expand Down Expand Up @@ -77,15 +79,27 @@ public class StaticQueryFunctionalTest {
SerdeOption.none()
);

private static final TemporaryFolder TMP = new TemporaryFolder();

static {
try {
TMP.create();
} catch (final IOException e) {
throw new AssertionError("Failed to init TMP", e);
}
}

private static final TestKsqlRestApp REST_APP_0 = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir())
.withStaticServiceContext(TEST_HARNESS::getServiceContext)
.build();

private static final TestKsqlRestApp REST_APP_1 = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.STATE_DIR_CONFIG, getNewStateDir())
.withStaticServiceContext(TEST_HARNESS::getServiceContext)
.build();

Expand Down Expand Up @@ -217,5 +231,13 @@ private void waitForTableRows() {
AGGREGATE_SCHEMA
);
}

private static String getNewStateDir() {
try {
return TMP.newFolder().getAbsolutePath();
} catch (final IOException e) {
throw new AssertionError("Failed to create new state dir", e);
}
}
}

0 comments on commit 44ae3a0

Please sign in to comment.