restClusterClient;
+
+    /**
+     * Starts the ClickHouse container and a single-JobManager / single-TaskManager Flink
+     * session cluster on the shared Docker network before each test.
+     */
+    @Before
+    public void setUp() throws Exception {
+        CLICKHOUSE_CONTAINER.start();
+
+        // Minimal Flink configuration, injected into both containers via FLINK_PROPERTIES.
+        String properties =
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "jobmanager.rpc.address: jobmanager",
+                                "heartbeat.timeout: 60000",
+                                "parallelism.default: 1"));
+        jobManager =
+                new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12"))
+                        .withCommand("jobmanager")
+                        .withNetwork(NETWORK)
+                        .withExtraHost("host.docker.internal", "host-gateway")
+                        // Alias must match jobmanager.rpc.address above so the TM can connect.
+                        .withNetworkAliases("jobmanager")
+                        // 8081 = REST/UI (used by getRestClusterClient), 6123 = RPC.
+                        .withExposedPorts(8081, 6123)
+                        .dependsOn(CLICKHOUSE_CONTAINER)
+                        .withLabel("com.testcontainers.allow-filesystem-access", "true")
+                        .withEnv("FLINK_PROPERTIES", properties)
+                        .withLogConsumer(new Slf4jLogConsumer(logger));
+        taskManager =
+                new GenericContainer<>(new DockerImageName("flink:1.19.0-scala_2.12"))
+                        .withCommand("taskmanager")
+                        .withExtraHost("host.docker.internal", "host-gateway")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("taskmanager")
+                        .withEnv("FLINK_PROPERTIES", properties)
+                        // TM starts only after the JM container is up.
+                        .dependsOn(jobManager)
+                        .withLabel("com.testcontainers.allow-filesystem-access", "true")
+                        .withLogConsumer(new Slf4jLogConsumer(logger));
+        Startables.deepStart(Stream.of(jobManager)).join();
+        Startables.deepStart(Stream.of(taskManager)).join();
+        // NOTE(review): fixed sleep is brittle — presumably gives the cluster time to come up;
+        // a port/log Wait strategy would be more reliable. TODO confirm and replace.
+        Thread.sleep(5000);
+        logger.info("Containers are started.");
+    }
+
+    /**
+     * Returns the {@link RestClusterClient} for the running cluster, creating it on first use
+     * and caching it for subsequent calls.
+     *
+     * NOTE: The client is created lazily and should only be retrieved after the cluster
+     * is running.
+     */
+    public RestClusterClient getRestClusterClient() {
+        if (restClusterClient == null) {
+            checkState(
+                    jobManager.isRunning(),
+                    "Cluster client should only be retrieved for a running cluster");
+            try {
+                // Point the client at the host-mapped REST port of the JobManager container.
+                final Configuration clientConfiguration = new Configuration();
+                clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
+                clientConfiguration.set(RestOptions.PORT, jobManager.getMappedPort(8081));
+                this.restClusterClient =
+                        new RestClusterClient<>(
+                                clientConfiguration, StandaloneClusterId.getInstance());
+            } catch (Exception e) {
+                throw new IllegalStateException(
+                        "Failed to create client for Flink container cluster", e);
+            }
+        }
+        return restClusterClient;
+    }
+
+    /**
+     * Submits a SQL job to the running cluster by piping the script into {@code sql-client.sh}
+     * inside the JobManager container.
+     *
+     * NOTE: You should not use {@code '\t'}.
+     *
+     * @param sqlLines SQL statements, one statement per list element
+     * @param jars connector/user jars, copied into the container and passed via {@code --jar}
+     * @throws AssertionError if the sql-client process exits with a non-zero code
+     */
+    public void submitSQLJob(List<String> sqlLines, Path... jars)
+            throws IOException, InterruptedException {
+        logger.info("submitting flink sql task");
+
+        SQLJobSubmission job =
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines).addJars(jars).build();
+        final List<String> commands = new ArrayList<>();
+        // Write the script to a host temp file, then copy it into the container.
+        Path script = temporaryFolder.newFile().toPath();
+        Files.write(script, job.getSqlLines());
+        jobManager.copyFileToContainer(MountableFile.forHostPath(script), "/tmp/script.sql");
+        commands.add("cat /tmp/script.sql | ");
+        commands.add("bin/sql-client.sh");
+        for (String jar : job.getJars()) {
+            commands.add("--jar");
+            String containerPath = copyAndGetContainerPath(jobManager, jar);
+            commands.add(containerPath);
+        }
+
+        Container.ExecResult execResult =
+                jobManager.execInContainer("bash", "-c", String.join(" ", commands));
+        logger.info("execute result:" + execResult.getStdout());
+        logger.error("execute error:" + execResult.getStderr());
+        if (execResult.getExitCode() != 0) {
+            // Surface the exit code and stderr so failures are diagnosable from the test log.
+            throw new AssertionError(
+                    "Failed when submitting the SQL job. Exit code: "
+                            + execResult.getExitCode()
+                            + ", stderr: "
+                            + execResult.getStderr());
+        }
+    }
+
+    /**
+     * Copies a host file into {@code /tmp} inside the container and returns the container path.
+     *
+     * @param container target container (must be started)
+     * @param filePath host path of the file to copy
+     * @return absolute path of the copied file inside the container
+     */
+    private String copyAndGetContainerPath(GenericContainer<?> container, String filePath) {
+        Path path = Paths.get(filePath);
+        // Keep only the file name; everything lands directly under /tmp.
+        String containerPath = "/tmp/" + path.getFileName();
+        container.copyFileToContainer(MountableFile.forHostPath(path), containerPath);
+        return containerPath;
+    }
+
+    /**
+     * Reads a SQL file from the test classpath and returns its lines.
+     *
+     * @param resourceName resource name relative to the classpath root (without leading '/')
+     * @return all lines of the resource file
+     * @throws java.io.FileNotFoundException if the resource is missing from the classpath
+     */
+    private static List<String> readSqlFile(final String resourceName) throws Exception {
+        java.net.URL resource = ClickhouseE2ECase.class.getResource("/" + resourceName);
+        if (resource == null) {
+            // Fail with a clear message instead of an opaque NullPointerException.
+            throw new java.io.FileNotFoundException(
+                    "SQL resource not found on classpath: " + resourceName);
+        }
+        return Files.readAllLines(Paths.get(resource.toURI()));
+    }
+
+ public void waitUntilJobRunning(Duration timeout) {
+ RestClusterClient> clusterClient = getRestClusterClient();
+ Deadline deadline = Deadline.fromNow(timeout);
+ while (deadline.hasTimeLeft()) {
+ Collection jobStatusMessages;
+ try {
+ jobStatusMessages = clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn("Error when fetching job status.", e);
+ continue;
+ }
+ if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
+ JobStatusMessage message = jobStatusMessages.iterator().next();
+ JobStatus jobStatus = message.getJobState();
+ if (jobStatus.isTerminalState() && message.getJobState() == JobStatus.FAILED) {
+ throw new ValidationException(
+ String.format(
+ "Job has been terminated! JobName: %s, JobID: %s, Status: %s",
+ message.getJobName(),
+ message.getJobId(),
+ message.getJobState()));
+ } else if (jobStatus == JobStatus.RUNNING) {
+ return;
+ }
+ }
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7881d18..c1aeee3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,9 @@ limitations under the License.
32.1.3-jre
2.12.4
3.13.0
+ 1.19.8
+ 5.2.1
+ 5.2
org.apache.flink.shaded.clickhouse
flink-connector-clickhouse-parent
3.8.1