Add FlinkDeploymentTemplate to get default deployment with sql-runner (#36)

Signed-off-by: David Kornel <[email protected]>
kornys authored Sep 3, 2024
1 parent 436de9c commit 7e62f43
Showing 4 changed files with 151 additions and 16 deletions.
12 changes: 0 additions & 12 deletions pom.xml
@@ -500,18 +500,6 @@
          </configuration>
        </execution>
        <!-- Flink SQL Example https://github.com/streamshub/flink-sql-examples -->
        <execution>
          <id>get-flink-sql-example-flink-deployment</id>
          <phase>generate-sources</phase>
          <goals>
            <goal>wget</goal>
          </goals>
          <configuration>
            <url>https://raw.githubusercontent.com/streamshub/flink-sql-examples/main/recommendation-app/flink-deployment.yaml</url>
            <outputDirectory>${basedir}/${operator.files.destination}/examples/sql-example</outputDirectory>
            <outputFileName>flink-deployment.yaml</outputFileName>
          </configuration>
        </execution>
        <execution>
          <id>get-flink-sql-example-data-app</id>
          <phase>generate-sources</phase>
43 changes: 43 additions & 0 deletions src/main/java/io/streams/constants/FlinkConstants.java
@@ -0,0 +1,43 @@
/*
* Copyright streamshub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streams.constants;

public interface FlinkConstants {
    String TEST_SQL_EXAMPLE_STATEMENT =
        "CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING ) " +
        "WITH ( 'connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv', " +
        "'format' = 'csv', 'csv.ignore-parse-errors' = 'true' ); CREATE TABLE ClickStreamTable " +
        "( user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
        "WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
        "'topic' = 'flink.click.streams', 'properties.bootstrap.servers' = " +
        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group', " +
        "'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " +
        "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
        "'latest-offset' ); CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING, " +
        "quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
        "WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
        "'topic' = 'flink.sales.records', 'properties.bootstrap.servers' = " +
        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group', " +
        "'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " +
        "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
        "'latest-offset' ); CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, " +
        "`event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', " +
        "'topic' = 'flink.recommended.products', 'properties.bootstrap.servers' = " +
        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.client.id' = " +
        "'recommended-products-producer-client', 'properties.transaction.timeout.ms' = '800000', " +
        "'key.format' = 'csv', 'value.format' = 'csv', 'value.fields-include' = 'ALL' ); CREATE TEMPORARY " +
        "VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category " +
        "FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id; " +
        "CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id, " +
        "p.category, p.stock, p.rating, sr.user_id as purchased FROM clicked_products cp JOIN " +
        "ProductInventoryTable AS p ON cp.category = p.category LEFT JOIN SalesRecordTable sr ON " +
        "cp.user_id = sr.user_id AND p.product_id = sr.product_id WHERE p.stock > 0 GROUP BY p.product_id, " +
        "p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating; CREATE TEMPORARY VIEW " +
        "top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category, cp.stock, cp.rating, " +
        "cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id ORDER BY cp.purchased DESC, cp.rating DESC) " +
        "AS rn FROM category_products cp; INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') " +
        "AS top_product_ids, TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6 GROUP " +
        "BY user_id, TUMBLE(event_time, INTERVAL '5' SECOND);";
}
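
The constant above packs the whole pipeline (source tables, the upsert-kafka sink, the temporary views, and the final INSERT) into one semicolon-separated string, so it can be handed to the SQL runner as a single job argument. A minimal sketch of the intended usage, mirroring the updated test further down (the "flink" namespace and the SqlRunnerExample class name are illustrative, not part of this commit):

import java.util.List;

import io.streams.constants.FlinkConstants;
import io.streams.operands.flink.templates.FlinkDeploymentTemplate;
import org.apache.flink.v1beta1.FlinkDeployment;

public class SqlRunnerExample {
    public static void main(String[] args) {
        // The whole statement chain travels as the single job argument of the
        // SQL runner; "flink" is an illustrative namespace.
        FlinkDeployment flinkApp = FlinkDeploymentTemplate
            .defaultFlinkDeployment("flink", "recommendation-app",
                List.of(FlinkConstants.TEST_SQL_EXAMPLE_STATEMENT))
            .build();
        System.out.println("Built FlinkDeployment " + flinkApp.getMetadata().getName());
    }
}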
104 changes: 104 additions & 0 deletions src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java
@@ -0,0 +1,104 @@
/*
* Copyright streamshub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streams.operands.flink.templates;

import org.apache.flink.v1beta1.FlinkDeploymentBuilder;
import org.apache.flink.v1beta1.FlinkDeploymentSpec;
import org.apache.flink.v1beta1.flinkdeploymentspec.Job;

import java.util.List;
import java.util.Map;

/**
* FlinkDeployment templates
*/
public class FlinkDeploymentTemplate {

    /**
     * Returns the default FlinkDeployment builder for the SQL runner.
     *
     * @param namespace namespace of the FlinkDeployment
     * @param name name of the deployment
     * @param args arguments passed to the SQL runner
     * @return FlinkDeployment builder
     */
    public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, String name, List<String> args) {
        return new FlinkDeploymentBuilder()
            .withNewMetadata()
                .withName(name)
                .withNamespace(namespace)
            .endMetadata()
            .withNewSpec()
                .withImage("quay.io/streamshub/flink-sql-runner:latest")
                .withFlinkVersion(FlinkDeploymentSpec.FlinkVersion.v1_19)
                .withFlinkConfiguration(
                    Map.of("taskmanager.numberOfTaskSlots", "1")
                )
                .withServiceAccount("flink")
                .withNewPodTemplate()
                    .withKind("Pod")
                    .withNewMetadata()
                        .withName(name)
                    .endFlinkdeploymentspecMetadata()
                    .withNewSpec()
                        .addNewContainer()
                            .withName("flink-main-container")
                            .withImage("quay.io/streamshub/flink-sql-runner:latest")
                            .addNewVolumeMount()
                                .withName("product-inventory-vol")
                                .withMountPath("/opt/flink/data")
                            .endFlinkdeploymentspecVolumeMount()
                            .addNewVolumeMount()
                                .withName("flink-logs")
                                .withMountPath("/opt/flink/log")
                            .endFlinkdeploymentspecVolumeMount()
                            .addNewVolumeMount()
                                .withName("flink-artifacts")
                                .withMountPath("/opt/flink/artifacts")
                            .endFlinkdeploymentspecVolumeMount()
                        .endFlinkdeploymentspecContainer()
                        .addNewVolume()
                            .withName("product-inventory-vol")
                            .withNewConfigMap()
                                .withName("product-inventory")
                                .addNewItem()
                                    .withKey("productInventory.csv")
                                    .withPath("productInventory.csv")
                                .endFlinkdeploymentspecItem()
                            .endFlinkdeploymentspecConfigMap()
                        .endFlinkdeploymentspecVolume()
                        .addNewVolume()
                            .withName("flink-logs")
                            .withNewEmptyDir()
                            .endFlinkdeploymentspecEmptyDir()
                        .endFlinkdeploymentspecVolume()
                        .addNewVolume()
                            .withName("flink-artifacts")
                            .withNewEmptyDir()
                            .endFlinkdeploymentspecEmptyDir()
                        .endFlinkdeploymentspecVolume()
                    .endFlinkdeploymentspecSpec()
                .endPodTemplate()
                .withNewJobManager()
                    .withNewResource()
                        .withCpu(1.0)
                        .withMemory("2048m")
                    .endResource()
                .endJobManager()
                .withNewTaskManager()
                    .withNewResource()
                        .withCpu(1.0)
                        .withMemory("2048m")
                    .endTaskmanagerResource()
                .endTaskManager()
                .withNewJob()
                    .withJarURI("local:///opt/flink/usrlib/flink-sql-runner.jar")
                    .withParallelism(1L)
                    .withUpgradeMode(Job.UpgradeMode.stateless)
                    .withArgs(args)
                .endJob()
            .endSpec();
    }
}
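
Because the template returns the builder rather than a finished resource, callers can adjust the defaults before calling build(). A minimal sketch under the assumption that the generated sundrio builder exposes editSpec()/endSpec(); the pinned image tag and the "SELECT 1;" statement are illustrative:

import java.util.List;

import io.streams.operands.flink.templates.FlinkDeploymentTemplate;
import org.apache.flink.v1beta1.FlinkDeployment;

public class PinnedImageExample {
    public static void main(String[] args) {
        FlinkDeployment pinned = FlinkDeploymentTemplate
            .defaultFlinkDeployment("flink", "recommendation-app", List.of("SELECT 1;"))
            .editSpec()
                // Override the default :latest runner image with a pinned tag;
                // editSpec()/endSpec() are assumed from the generated builder API.
                .withImage("quay.io/streamshub/flink-sql-runner:0.0.1")
            .endSpec()
            .build();
        System.out.println(pinned.getSpec().getImage());
    }
}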
8 changes: 4 additions & 4 deletions src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
@@ -5,10 +5,11 @@
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
import io.skodjob.testframe.executor.ExecResult;
import io.skodjob.testframe.resources.KubeResourceManager;
import io.streams.constants.FlinkConstants;
import io.streams.constants.TestConstants;
import io.streams.e2e.Abstract;
import io.streams.operands.apicurio.templates.ApicurioRegistryTemplate;
import io.streams.operands.flink.resoruces.FlinkDeploymentType;
import io.streams.operands.flink.templates.FlinkDeploymentTemplate;
import io.streams.operands.flink.templates.FlinkRBAC;
import io.streams.operands.strimzi.templates.KafkaNodePoolTemplate;
import io.streams.operands.strimzi.templates.KafkaTemplate;
@@ -105,9 +106,8 @@ void testFlinkSqlExample() throws IOException {
        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(dataApp.toArray(new HasMetadata[0]));

        // Deploy flink
        FlinkDeployment flinkApp = new FlinkDeploymentType().getClient()
            .load(exampleFiles.resolve("flink-deployment.yaml").toFile()).item();
        flinkApp.getMetadata().setNamespace(namespace);
        FlinkDeployment flinkApp = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace,
            "recommendation-app", List.of(FlinkConstants.TEST_SQL_EXAMPLE_STATEMENT)).build();
        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flinkApp);

        // Run internal consumer and check if topic contains messages
