From ade35f001d06404719cb176096fc73eb49e016ad Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Fri, 19 Jul 2024 15:16:15 +0300 Subject: [PATCH 1/3] Add sql-runner module Signed-off-by: Gantigmaa Selenge --- .gitignore | 7 + README.md | 17 ++- examples/FlinkDeployment.yaml | 24 +++ flink-sql-runner/Dockerfile | 4 + flink-sql-runner/pom.xml | 103 +++++++++++++ .../flink/KubernetesSecretReplacer.java | 41 +++++ .../github/streamshub/flink/SqlRunner.java | 142 ++++++++++++++++++ pom.xml | 8 + 8 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 examples/FlinkDeployment.yaml create mode 100644 flink-sql-runner/Dockerfile create mode 100644 flink-sql-runner/pom.xml create mode 100644 flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java create mode 100644 flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java diff --git a/.gitignore b/.gitignore index 524f096..e091671 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,12 @@ # Mobile Tools for Java (J2ME) .mtj.tmp/ +### IntelliJ IDEA ### +.idea/* +*.iws +*.iml +*.ipr + # Package Files # *.jar *.war @@ -22,3 +28,4 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* replay_pid* +/flink-sql-runner/target/ diff --git a/README.md b/README.md index 34627ee..b9fd3d7 100644 --- a/README.md +++ b/README.md @@ -1 +1,16 @@ -# flink-sql \ No newline at end of file +# Flink SQL Runner + +An application to execute Flink SQL jobs. + +## Building and running Flink SQL Runner + +1. Build application + ``` + mvn package + ``` +2. Build an image + ``` + minikube image build flink-sql-runner -t flink-sql-runner:latest + ``` + +See /examples directory to see how to run FlinkDeployment using the SQL runner. \ No newline at end of file diff --git a/examples/FlinkDeployment.yaml b/examples/FlinkDeployment.yaml new file mode 100644 index 0000000..84c772f --- /dev/null +++ b/examples/FlinkDeployment.yaml @@ -0,0 +1,24 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: flink-deployment-example +spec: + image: flink-sql-runner:latest + imagePullPolicy: Never + flinkVersion: v1_19 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar + args: [""] + parallelism: 1 + upgradeMode: stateless \ No newline at end of file diff --git a/flink-sql-runner/Dockerfile b/flink-sql-runner/Dockerfile new file mode 100644 index 0000000..0783529 --- /dev/null +++ b/flink-sql-runner/Dockerfile @@ -0,0 +1,4 @@ +FROM flink:1.19.0 + +RUN mkdir /opt/flink/usrlib +ADD target/flink-sql-runner-*.jar /opt/flink/usrlib/flink-sql-runner.jar \ No newline at end of file diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml new file mode 100644 index 0000000..571ca7a --- /dev/null +++ b/flink-sql-runner/pom.xml @@ -0,0 +1,103 @@ + + + 4.0.0 + + com.github.streamshub + flink-sql + 0.0.1-SNAPSHOT + + + flink-sql-runner + + + 11 + UTF-8 + + + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + org.apache.flink + flink-csv + ${flink.version} + provided + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + + + + io.fabric8 + kubernetes-client + ${fabric8.kubernetes-client.version} + + + + + org.apache.flink + flink-connector-kafka + ${flink.kafka.connector.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:flink-shaded-force-shading + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.github.streamshub.flink.SqlRunner + + + + + + + + + + \ No newline at end of file diff --git a/flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java b/flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java new file mode 100644 index 0000000..aa8b48f --- /dev/null +++ b/flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java @@ -0,0 +1,41 @@ +package com.github.streamshub.flink; + +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class KubernetesSecretReplacer { + private static final Pattern SECRET_PATTERN = Pattern.compile("\\{\\{secret:([^/]+)/([^}]+)/([^}]+)}}"); + static String replaceSecrets(String input) { + Matcher matcher = SECRET_PATTERN.matcher(input); + StringBuffer result = new StringBuffer(); + + while (matcher.find()) { + String namespace = matcher.group(1); + String secretName = matcher.group(2); + String secretKey = matcher.group(3); + String secretValue = getSecretValue(namespace, secretName, secretKey); + matcher.appendReplacement(result, secretValue != null ? secretValue : ""); + } + matcher.appendTail(result); + + return result.toString(); + } + + private static String getSecretValue(String namespace, String secretName, String secretKey) { + try (KubernetesClient client = new KubernetesClientBuilder().build()){ + Secret secret = client.secrets().inNamespace(namespace).withName(secretName).get(); + if (secret != null && secret.getData() != null && secret.getData().containsKey(secretKey)) { + return new String(Base64.getDecoder().decode(secret.getData().get(secretKey)), StandardCharsets.UTF_8); + } + } catch (KubernetesClientException e) { + e.printStackTrace(); + } + return null; + } +} \ No newline at end of file diff --git a/flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java b/flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java new file mode 100644 index 0000000..c7bc77d --- /dev/null +++ b/flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.streamshub.flink; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Parses and executes SQL statements. */ +public class SqlRunner { + + private static final Logger LOG = LoggerFactory.getLogger(SqlRunner.class); + + private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;` + private static final String LINE_DELIMITER = "\n"; + + private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))"; + + private static final Pattern SET_STATEMENT_PATTERN = + Pattern.compile("SET\\s+'(\\S+)'\\s+=\\s+'(.*)';", Pattern.CASE_INSENSITIVE); + + private static final Pattern STATEMENT_SET_PATTERN = + Pattern.compile("(EXECUTE STATEMENT SET BEGIN.*?END;)", Pattern.CASE_INSENSITIVE | Pattern.DOTALL); + + private static final String BEGIN_CERTIFICATE = "-----BEGIN CERTIFICATE-----"; + private static final String END_CERTIFICATE = "-----END CERTIFICATE-----"; + private static final String ESCAPED_BEGIN_CERTIFICATE = "======BEGIN CERTIFICATE====="; + private static final String ESCAPED_END_CERTIFICATE = "=====END CERTIFICATE====="; + + private static final String BEGIN_PRIVATE_KEY = "-----BEGIN PRIVATE KEY-----"; + private static final String END_PRIVATE_KEY = "-----END PRIVATE KEY-----"; + private static final String ESCAPED_BEGIN_PRIVATE_KEY = "======BEGIN PRIVATE KEY====="; + private static final String ESCAPED_END_PRIVATE_KEY = "=====END PRIVATE KEY====="; + + public static void main(String[] args) throws Exception { + if (args.length != 1) { + throw new Exception("Exactly 1 argument is expected."); + } + + var statements = parseStatements(args[0]); + + EnvironmentSettings settings = EnvironmentSettings + .newInstance() + .inStreamingMode() + .build(); + var tableEnv = TableEnvironment.create(settings); + LOG.debug("TableEnvironment config: " + tableEnv.getConfig().toMap()); + + for (String statement : statements) { + var processedStatement = interpolateSecrets(statement); + Matcher setMatcher = SET_STATEMENT_PATTERN.matcher(statement.trim()); + + if (setMatcher.matches()) { + // Handle SET statements + String key = setMatcher.group(1); + String value = setMatcher.group(2); + LOG.debug("Setting configurations:\n{}={}", key, value); + tableEnv.getConfig().getConfiguration().setString(key, value); + } else { + LOG.info("Executing:\n{}", statement); + tableEnv.executeSql(processedStatement); + } + } + } + + private static List parseStatements(String rawStatements) { + var formatted = formatSqlStatements(rawStatements.trim()); + + var statements = new ArrayList(); + StringBuilder current = new StringBuilder(); + Matcher matcher = STATEMENT_SET_PATTERN.matcher(formatted); + + String statementSet = ""; + String otherStatements = formatted; + + if (matcher.find()) { + statementSet = matcher.group(1); + otherStatements = formatted.replace(statementSet, "").trim(); + } + + for (char c : otherStatements.toCharArray()) { + if (c == STATEMENT_DELIMITER.charAt(0)) { + current.append(c); + statements.add(current.toString().trim()); + current = new StringBuilder(); + } else { + current.append(c); + } + } + + if (statementSet.length() > 0) { + statements.add(statementSet); + } + + return statements; + } + + private static String interpolateSecrets(String statement) { + return KubernetesSecretReplacer.replaceSecrets(statement) + .replaceAll(BEGIN_CERTIFICATE, ESCAPED_BEGIN_CERTIFICATE) + .replaceAll(END_CERTIFICATE, ESCAPED_END_CERTIFICATE) + .replaceAll(BEGIN_PRIVATE_KEY, ESCAPED_BEGIN_PRIVATE_KEY) + .replaceAll(END_PRIVATE_KEY, ESCAPED_END_PRIVATE_KEY) + .replaceAll(COMMENT_PATTERN, "") + .replaceAll(ESCAPED_BEGIN_CERTIFICATE, BEGIN_CERTIFICATE) + .replaceAll(ESCAPED_END_CERTIFICATE, END_CERTIFICATE) + .replaceAll(ESCAPED_BEGIN_PRIVATE_KEY, BEGIN_PRIVATE_KEY) + .replaceAll(ESCAPED_END_PRIVATE_KEY, END_PRIVATE_KEY); + } + + private static String formatSqlStatements(String content) { + StringBuilder formatted = new StringBuilder(); + formatted.append(content); + if (!content.endsWith(STATEMENT_DELIMITER)) { + formatted.append(STATEMENT_DELIMITER); + } + formatted.append(LINE_DELIMITER); + return formatted.toString(); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9743632..1341aed 100644 --- a/pom.xml +++ b/pom.xml @@ -15,9 +15,17 @@ 3.10.1 + + + 1.19.0 + 3.7.0 + 1.7.36 + 6.13.0 + 3.2.0-1.19 + flink-sql-runner From e9f9ca71eeb8770a36b31267470053625fc33f0e Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Mon, 29 Jul 2024 16:40:34 +0100 Subject: [PATCH 2/3] Add avro dependency --- flink-sql-runner/pom.xml | 6 ++++++ pom.xml | 1 + 2 files changed, 7 insertions(+) diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml index 571ca7a..146dc2f 100644 --- a/flink-sql-runner/pom.xml +++ b/flink-sql-runner/pom.xml @@ -48,6 +48,12 @@ ${fabric8.kubernetes-client.version} + + org.apache.flink + flink-sql-avro-confluent-registry + ${flink.avro.confluent.registry.version} + + org.apache.flink diff --git a/pom.xml b/pom.xml index 1341aed..8c932e6 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 3.7.0 1.7.36 6.13.0 + 1.19.1 3.2.0-1.19 From a9059367ab0057e326095631a762cc8621acdc65 Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Wed, 31 Jul 2024 17:17:11 +0100 Subject: [PATCH 3/3] Address review comments --- .gitignore | 3 ++- flink-sql-runner/Dockerfile | 2 +- flink-sql-runner/pom.xml | 4 +-- .../flink/KubernetesSecretReplacer.java | 13 ++++++--- .../github/streamshub/flink/SqlRunner.java | 27 +------------------ pom.xml | 2 +- 6 files changed, 16 insertions(+), 35 deletions(-) diff --git a/.gitignore b/.gitignore index e091671..33f0bc7 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,5 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* replay_pid* -/flink-sql-runner/target/ +target +dependency-reduced-pom.xml diff --git a/flink-sql-runner/Dockerfile b/flink-sql-runner/Dockerfile index 0783529..3b0d827 100644 --- a/flink-sql-runner/Dockerfile +++ b/flink-sql-runner/Dockerfile @@ -1,4 +1,4 @@ -FROM flink:1.19.0 +FROM flink:1.19.1 RUN mkdir /opt/flink/usrlib ADD target/flink-sql-runner-*.jar /opt/flink/usrlib/flink-sql-runner.jar \ No newline at end of file diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml index 146dc2f..50d0618 100644 --- a/flink-sql-runner/pom.xml +++ b/flink-sql-runner/pom.xml @@ -50,7 +50,7 @@ org.apache.flink - flink-sql-avro-confluent-registry + flink-avro-confluent-registry ${flink.avro.confluent.registry.version} @@ -67,7 +67,7 @@ org.apache.maven.plugins maven-shade-plugin - 3.1.1 + 3.6.0 diff --git a/flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java b/flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java index aa8b48f..a1ca0d4 100644 --- a/flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java +++ b/flink-sql-runner/src/main/java/com/github/streamshub/flink/KubernetesSecretReplacer.java @@ -10,8 +10,9 @@ import java.util.regex.Pattern; public class KubernetesSecretReplacer { + // Expected pattern for a secret is {{secret://}} private static final Pattern SECRET_PATTERN = Pattern.compile("\\{\\{secret:([^/]+)/([^}]+)/([^}]+)}}"); - static String replaceSecrets(String input) { + static String interpolateSecrets(String input) { Matcher matcher = SECRET_PATTERN.matcher(input); StringBuffer result = new StringBuffer(); @@ -30,12 +31,16 @@ static String replaceSecrets(String input) { private static String getSecretValue(String namespace, String secretName, String secretKey) { try (KubernetesClient client = new KubernetesClientBuilder().build()){ Secret secret = client.secrets().inNamespace(namespace).withName(secretName).get(); - if (secret != null && secret.getData() != null && secret.getData().containsKey(secretKey)) { + if (secret == null) { + throw new RuntimeException("Secret" + secretName + " does not exist"); + } + if (secret.getData() != null && secret.getData().containsKey(secretKey)) { return new String(Base64.getDecoder().decode(secret.getData().get(secretKey)), StandardCharsets.UTF_8); + } else { + throw new RuntimeException("Could not read data with key " + secretKey + "from secret " + secretName); } } catch (KubernetesClientException e) { - e.printStackTrace(); + throw new RuntimeException(e); } - return null; } } \ No newline at end of file diff --git a/flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java b/flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java index c7bc77d..010fb55 100644 --- a/flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java +++ b/flink-sql-runner/src/main/java/com/github/streamshub/flink/SqlRunner.java @@ -36,24 +36,12 @@ public class SqlRunner { private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;` private static final String LINE_DELIMITER = "\n"; - private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))"; - private static final Pattern SET_STATEMENT_PATTERN = Pattern.compile("SET\\s+'(\\S+)'\\s+=\\s+'(.*)';", Pattern.CASE_INSENSITIVE); private static final Pattern STATEMENT_SET_PATTERN = Pattern.compile("(EXECUTE STATEMENT SET BEGIN.*?END;)", Pattern.CASE_INSENSITIVE | Pattern.DOTALL); - private static final String BEGIN_CERTIFICATE = "-----BEGIN CERTIFICATE-----"; - private static final String END_CERTIFICATE = "-----END CERTIFICATE-----"; - private static final String ESCAPED_BEGIN_CERTIFICATE = "======BEGIN CERTIFICATE====="; - private static final String ESCAPED_END_CERTIFICATE = "=====END CERTIFICATE====="; - - private static final String BEGIN_PRIVATE_KEY = "-----BEGIN PRIVATE KEY-----"; - private static final String END_PRIVATE_KEY = "-----END PRIVATE KEY-----"; - private static final String ESCAPED_BEGIN_PRIVATE_KEY = "======BEGIN PRIVATE KEY====="; - private static final String ESCAPED_END_PRIVATE_KEY = "=====END PRIVATE KEY====="; - public static void main(String[] args) throws Exception { if (args.length != 1) { throw new Exception("Exactly 1 argument is expected."); @@ -69,7 +57,7 @@ public static void main(String[] args) throws Exception { LOG.debug("TableEnvironment config: " + tableEnv.getConfig().toMap()); for (String statement : statements) { - var processedStatement = interpolateSecrets(statement); + var processedStatement = KubernetesSecretReplacer.interpolateSecrets(statement); Matcher setMatcher = SET_STATEMENT_PATTERN.matcher(statement.trim()); if (setMatcher.matches()) { @@ -117,19 +105,6 @@ private static List parseStatements(String rawStatements) { return statements; } - private static String interpolateSecrets(String statement) { - return KubernetesSecretReplacer.replaceSecrets(statement) - .replaceAll(BEGIN_CERTIFICATE, ESCAPED_BEGIN_CERTIFICATE) - .replaceAll(END_CERTIFICATE, ESCAPED_END_CERTIFICATE) - .replaceAll(BEGIN_PRIVATE_KEY, ESCAPED_BEGIN_PRIVATE_KEY) - .replaceAll(END_PRIVATE_KEY, ESCAPED_END_PRIVATE_KEY) - .replaceAll(COMMENT_PATTERN, "") - .replaceAll(ESCAPED_BEGIN_CERTIFICATE, BEGIN_CERTIFICATE) - .replaceAll(ESCAPED_END_CERTIFICATE, END_CERTIFICATE) - .replaceAll(ESCAPED_BEGIN_PRIVATE_KEY, BEGIN_PRIVATE_KEY) - .replaceAll(ESCAPED_END_PRIVATE_KEY, END_PRIVATE_KEY); - } - private static String formatSqlStatements(String content) { StringBuilder formatted = new StringBuilder(); formatted.append(content); diff --git a/pom.xml b/pom.xml index 8c932e6..b79bf3e 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 3.10.1 - 1.19.0 + 1.19.1 3.7.0 1.7.36 6.13.0