Skip to content

Commit

Permalink
Update location of flink sql jar and update sql registry config (#37)
Browse files Browse the repository at this point in the history
Signed-off-by: David Kornel <[email protected]>
  • Loading branch information
kornys authored Sep 6, 2024
1 parent 7e62f43 commit 2c6403f
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/streams/constants/FlinkConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ public interface FlinkConstants {
"WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
"'topic' = 'flink.click.streams', 'properties.bootstrap.servers' = " +
"'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group', " +
"'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " +
"'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " +
"'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
"'latest-offset' ); CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING, " +
"quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
"WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
"'topic' = 'flink.sales.records', 'properties.bootstrap.servers' = " +
"'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group', " +
"'value.format' = 'avro-confluent', 'value.avro-confluent.schema-registry.url' = " +
"'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " +
"'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
"'latest-offset' ); CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, " +
"`event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, St
.addNewContainer()
.withName("flink-main-container")
.withImage("quay.io/streamshub/flink-sql-runner:latest")
.withImagePullPolicy("Always")
.addNewVolumeMount()
.withName("product-inventory-vol")
.withMountPath("/opt/flink/data")
Expand Down Expand Up @@ -94,7 +95,7 @@ public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, St
.endTaskmanagerResource()
.endTaskManager()
.withNewJob()
.withJarURI("local:///opt/flink/usrlib/flink-sql-runner.jar")
.withJarURI("local:///opt/streamshub/flink-sql-runner.jar")
.withParallelism(1L)
.withUpgradeMode(Job.UpgradeMode.stateless)
.withArgs(args)
Expand Down

0 comments on commit 2c6403f

Please sign in to comment.