diff --git a/pom.xml b/pom.xml
index 26acad2..8183a7f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -500,18 +500,6 @@
-                    <execution>
-                        <id>get-flink-sql-example-flink-deployment</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>wget</goal>
-                        </goals>
-                        <configuration>
-                            <url>https://raw.githubusercontent.com/streamshub/flink-sql-examples/main/recommendation-app/flink-deployment.yaml</url>
-                            <outputDirectory>${basedir}/${operator.files.destination}/examples/sql-example</outputDirectory>
-                            <outputFileName>flink-deployment.yaml</outputFileName>
-                        </configuration>
-                    </execution>
                     <execution>
                         <id>get-flink-sql-example-data-app</id>
                         <phase>generate-sources</phase>
diff --git a/src/main/java/io/streams/constants/FlinkConstants.java b/src/main/java/io/streams/constants/FlinkConstants.java
new file mode 100644
index 0000000..c8bc27a
--- /dev/null
+++ b/src/main/java/io/streams/constants/FlinkConstants.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright streamshub authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.streams.constants;
+
+public interface FlinkConstants {
+    String TEST_SQL_EXAMPLE_STATEMENT =
+        "CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING ) " +
+        "WITH ( 'connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv', " +
+        "'format' = 'csv', 'csv.ignore-parse-errors' = 'true' ); CREATE TABLE ClickStreamTable " +
+        "( user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
+        "WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
+        "'topic' = 'flink.click.streams', 'properties.bootstrap.servers' = " +
+        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group', " +
+        "'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " +
+        "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
+        "'latest-offset' ); CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING, " +
+        "quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
+        "WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
+        "'topic' = 'flink.sales.records', 'properties.bootstrap.servers' = " +
+        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group', " +
+        "'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " +
+        "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
+        "'latest-offset' ); CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, " +
+        "`event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', " +
+        "'topic' = 'flink.recommended.products', 'properties.bootstrap.servers' = " +
+        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.client.id' = " +
+        "'recommended-products-producer-client', 'properties.transaction.timeout.ms' = '800000', " +
+        "'key.format' = 'csv', 'value.format' = 'csv', 'value.fields-include' = 'ALL' ); CREATE TEMPORARY " +
+        "VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category " +
+        "FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id; " +
+        "CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id, " +
+        "p.category, p.stock, p.rating, sr.user_id as purchased FROM clicked_products cp JOIN " +
+        "ProductInventoryTable AS p ON cp.category = p.category LEFT JOIN SalesRecordTable sr ON " +
+        "cp.user_id = sr.user_id AND p.product_id = sr.product_id WHERE p.stock > 0 GROUP BY p.product_id, " +
+        "p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating; CREATE TEMPORARY VIEW " +
+        "top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category, cp.stock, cp.rating, " +
+        "cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id ORDER BY cp.purchased DESC, cp.rating DESC) " +
+        "AS rn FROM category_products cp; INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') " +
+        "AS top_product_ids, TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6 GROUP " +
+        "BY user_id, TUMBLE(event_time, INTERVAL '5' SECOND);";
+}
diff --git a/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java b/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java
new file mode 100644
index 0000000..838e248
--- /dev/null
+++ b/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright streamshub authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.streams.operands.flink.templates;
+
+import org.apache.flink.v1beta1.FlinkDeploymentBuilder;
+import org.apache.flink.v1beta1.FlinkDeploymentSpec;
+import org.apache.flink.v1beta1.flinkdeploymentspec.Job;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * FlinkDeployment templates
+ */
+public class FlinkDeploymentTemplate {
+
+    /**
+     * Returns the default FlinkDeployment for the SQL runner.
+     *
+     * @param namespace namespace of the FlinkDeployment
+     * @param name      name of the deployment
+     * @param args      args passed to the SQL runner
+     * @return FlinkDeployment builder
+     */
+    public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, String name, List<String> args) {
+        return new FlinkDeploymentBuilder()
+            .withNewMetadata()
+            .withName(name)
+            .withNamespace(namespace)
+            .endMetadata()
+            .withNewSpec()
+            .withImage("quay.io/streamshub/flink-sql-runner:latest")
+            .withFlinkVersion(FlinkDeploymentSpec.FlinkVersion.v1_19)
+            .withFlinkConfiguration(
+                Map.of("taskmanager.numberOfTaskSlots", "1")
+            )
+            .withServiceAccount("flink")
+            .withNewPodTemplate()
+            .withKind("Pod")
+            .withNewMetadata()
+            .withName(name)
+            .endFlinkdeploymentspecMetadata()
+            .withNewSpec()
+            .addNewContainer()
+            .withName("flink-main-container")
+            .withImage("quay.io/streamshub/flink-sql-runner:latest")
+            .addNewVolumeMount()
+            .withName("product-inventory-vol")
+            .withMountPath("/opt/flink/data")
+            .endFlinkdeploymentspecVolumeMount()
+            .addNewVolumeMount()
+            .withName("flink-logs")
+            .withMountPath("/opt/flink/log")
+            .endFlinkdeploymentspecVolumeMount()
+            .addNewVolumeMount()
+            .withName("flink-artifacts")
+            .withMountPath("/opt/flink/artifacts")
+            .endFlinkdeploymentspecVolumeMount()
+            .endFlinkdeploymentspecContainer()
+            .addNewVolume()
+            .withName("product-inventory-vol")
+            .withNewConfigMap()
+            .withName("product-inventory")
+            .addNewItem()
+            .withKey("productInventory.csv")
+            .withPath("productInventory.csv")
+            .endFlinkdeploymentspecItem()
+            .endFlinkdeploymentspecConfigMap()
+            .endFlinkdeploymentspecVolume()
+            .addNewVolume()
+            .withName("flink-logs")
+            .withNewEmptyDir()
+            .endFlinkdeploymentspecEmptyDir()
+            .endFlinkdeploymentspecVolume()
+            .addNewVolume()
+            .withName("flink-artifacts")
+            .withNewEmptyDir()
+            .endFlinkdeploymentspecEmptyDir()
+            .endFlinkdeploymentspecVolume()
+            .endFlinkdeploymentspecSpec()
+            .endPodTemplate()
+            .withNewJobManager()
+            .withNewResource()
+            .withCpu(1.0)
.withMemory("2048m") + .endResource() + .endJobManager() + .withNewTaskManager() + .withNewResource() + .withCpu(1.0) + .withMemory("2048m") + .endTaskmanagerResource() + .endTaskManager() + .withNewJob() + .withJarURI("local:///opt/flink/usrlib/flink-sql-runner.jar") + .withParallelism(1L) + .withUpgradeMode(Job.UpgradeMode.stateless) + .withArgs(args) + .endJob() + .endSpec(); + } +} diff --git a/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java b/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java index b3ed24a..68af94c 100644 --- a/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java +++ b/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java @@ -5,10 +5,11 @@ import io.fabric8.kubernetes.api.model.NamespaceBuilder; import io.skodjob.testframe.executor.ExecResult; import io.skodjob.testframe.resources.KubeResourceManager; +import io.streams.constants.FlinkConstants; import io.streams.constants.TestConstants; import io.streams.e2e.Abstract; import io.streams.operands.apicurio.templates.ApicurioRegistryTemplate; -import io.streams.operands.flink.resoruces.FlinkDeploymentType; +import io.streams.operands.flink.templates.FlinkDeploymentTemplate; import io.streams.operands.flink.templates.FlinkRBAC; import io.streams.operands.strimzi.templates.KafkaNodePoolTemplate; import io.streams.operands.strimzi.templates.KafkaTemplate; @@ -105,9 +106,8 @@ void testFlinkSqlExample() throws IOException { KubeResourceManager.getInstance().createOrUpdateResourceWithWait(dataApp.toArray(new HasMetadata[0])); // Deploy flink - FlinkDeployment flinkApp = new FlinkDeploymentType().getClient() - .load(exampleFiles.resolve("flink-deployment.yaml").toFile()).item(); - flinkApp.getMetadata().setNamespace(namespace); + FlinkDeployment flinkApp = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace, + "recommendation-app", List.of(FlinkConstants.TEST_SQL_EXAMPLE_STATEMENT)).build(); KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flinkApp); // Run internal consumer and check if topic contains messages