Debug build failure with KeyedBroadcastProcessFunction #1

Closed · wants to merge 8 commits
README.md (21 changes: 12 additions & 9 deletions)
@@ -1,29 +1,32 @@
# Flink API Examples for DataStream API and Table API in Scala 3

-Flink is now Scala Free. In the upcoming 1.15 release, Flink will not expose any specific Scala version.
+Flink is now Scala free. As of the 1.15 release, Flink no longer exposes any specific Scala version.
Users can now choose whatever Scala version they need in their user code, including Scala 3.

This repository is a reimplementation of Timo Walther's [Flink API Examples for DataStream API and Table API](https://github.com/twalthr/flink-api-examples)
examples in Scala 3.
You can watch his talk [Flink's Table & DataStream API: A Perfect Symbiosis](https://youtu.be/vLLn5PxF2Lw) on YouTube which walks through the Java version of this code.

# How to Use This Repository

-1. Import this repository into your IDE (preferably IntelliJ IDEA). The project uses the latest Flink 1.15 nightly version.
+1. Import this repository into your IDE (preferably IntelliJ IDEA). The project uses Flink 1.15.1.

2. All examples are runnable from the IDE or sbt: execute the `main()` method of the example class you want to run.
Command line example:
```shell
sbt "runMain com.ververica.example3"
```

3. To run the examples within IntelliJ IDEA, tick the `Add dependencies with "provided" scope to classpath`
   option in the run configuration under `Modify options`.
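   For reference, a dependency lands in the `provided` scope when it is declared like this in sbt. This is a generic sketch with an illustrative version string, not a line from this project's `build.sbt` (the actual declarations are in the build.sbt diff below):

   ```scala
   // Marks the dependency as provided: it is on the compile classpath,
   // but excluded from the runtime classpath that sbt hands to `run`,
   // which is why the IntelliJ option above matters.
   libraryDependencies += "org.apache.flink" % "flink-streaming-java" % "1.15.1" % Provided
   ```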

4. For the Apache Kafka examples, download and unzip [Apache Kafka](https://kafka.apache.org/downloads).

5. Start up Kafka and Zookeeper:
   ```shell
-./bin/zookeeper-server-start.sh config/zookeeper.properties &
-
-```
-
-./bin/kafka-server-start.sh config/server.properties &
-```
+./bin/zookeeper-server-start.sh config/zookeeper.properties &
+./bin/kafka-server-start.sh config/server.properties &
+```

6. Run `FillKafkaWithCustomers` and `FillKafkaWithTransactions` to create and fill the Kafka topics with Flink.
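
   To sanity-check the result, you can read a few records back from a topic. This is an optional aside, not part of the PR; it assumes Kafka's default port and that the jobs create topics named `customers` and `transactions`:

   ```shell
   # print the first five records from the transactions topic
   ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic transactions --from-beginning --max-messages 5
   ```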
build.sbt (19 changes: 10 additions & 9 deletions)
@@ -2,19 +2,20 @@ name := "flink-scala-3"

version := "0.1"

scalaVersion := "3.0.2"
scalaVersion := "3.2.1"

resolvers += Resolver.mavenLocal

javacOptions ++= Seq("-source", "11", "-target", "11")

libraryDependencies += "org.apache.flink" % "flink-streaming-java" % "1.15-SNAPSHOT"
libraryDependencies += "org.apache.flink" % "flink-clients" % "1.15-SNAPSHOT"
libraryDependencies += "org.apache.flink" % "flink-table-planner-loader" % "1.15-SNAPSHOT"
libraryDependencies += "org.apache.flink" % "flink-table-common" % "1.15-SNAPSHOT"
libraryDependencies += "org.apache.flink" % "flink-table-api-java" % "1.15-SNAPSHOT"
libraryDependencies += "org.apache.flink" % "flink-table-api-java-bridge" % "1.15-SNAPSHOT"
libraryDependencies += "org.apache.flink" % "flink-table-runtime" % "1.15-SNAPSHOT"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "1.15-SNAPSHOT"
val flinkVersion = "1.15.1"
libraryDependencies += "org.apache.flink" % "flink-streaming-java" % flinkVersion
libraryDependencies += "org.apache.flink" % "flink-clients" % flinkVersion
libraryDependencies += "org.apache.flink" % "flink-table-planner-loader" % flinkVersion
libraryDependencies += "org.apache.flink" % "flink-table-common" % flinkVersion
libraryDependencies += "org.apache.flink" % "flink-table-api-java" % flinkVersion
libraryDependencies += "org.apache.flink" % "flink-table-api-java-bridge" % flinkVersion
libraryDependencies += "org.apache.flink" % "flink-table-runtime" % flinkVersion
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % flinkVersion

libraryDependencies += "com.github.losizm" %% "little-json" % "9.0.0"
New file (134 additions)
@@ -0,0 +1,134 @@
package com.ververica

import com.ververica.data.ExampleData
import com.ververica.models.{Customer, Transaction, TransactionDeserializer}
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.state.{
ListState,
ListStateDescriptor,
ValueState,
ValueStateDescriptor
}
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.co.{
KeyedBroadcastProcessFunction,
KeyedCoProcessFunction
}
import org.apache.flink.util.Collector
import scala.jdk.CollectionConverters._

/** Use Flink's state to perform efficient record joining based on business
* requirements.
*/
@main def example11 =
val env = StreamExecutionEnvironment.getExecutionEnvironment()

// switch to batch mode on demand
// env.setRuntimeMode(RuntimeExecutionMode.BATCH)

// read transactions
val transactionSource = KafkaSource
.builder[Transaction]
.setBootstrapServers("localhost:9092")
.setTopics("transactions")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new TransactionDeserializer)
// .setBounded(OffsetsInitializer.latest())
.build()

val transactionStream =
env.fromSource(
transactionSource,
WatermarkStrategy.noWatermarks(),
"Transactions"
)

// Deduplicate using the function
// defined in example 5
val deduplicatedStream =
transactionStream
.keyBy((t: Transaction) => t.t_id)
.process(new DataStreamDeduplicate)

// join transactions and customers
env
.fromElements(ExampleData.customers: _*)
.connect(deduplicatedStream)
.keyBy((c: Customer) => c.c_id, (t: Transaction) => t.t_customer_id)
.process(new JoinCustomersWithTransaction)
.executeAndCollect
.forEachRemaining(println)

/** Minimal KeyedBroadcastProcessFunction used to debug the build failure this
  * PR is about; the method bodies are intentionally left unimplemented.
  */
class MyFunction
    extends KeyedBroadcastProcessFunction[Int, Long, String, Double]:
override def processBroadcastElement(
value: String,
ctx: KeyedBroadcastProcessFunction[Int, Long, String, Double]#Context,
out: Collector[Double]
): Unit = ???

override def processElement(
value: Long,
ctx: KeyedBroadcastProcessFunction[
Int,
Long,
String,
Double
]#ReadOnlyContext,
out: Collector[Double]
): Unit = ???
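
// A minimal wiring sketch for MyFunction, not part of the original example:
// the stream contents and the "rules" state-descriptor name are made up for
// illustration. A keyed stream of Longs is connected to a broadcast stream of
// Strings and processed by the function above. Since MyFunction's methods are
// still ???, actually running this would throw NotImplementedError.
@main def myFunctionWiringSketch =
  val env = StreamExecutionEnvironment.getExecutionEnvironment()

  // descriptor for the broadcast state that processBroadcastElement could write
  val rulesDescriptor =
    new org.apache.flink.api.common.state.MapStateDescriptor(
      "rules",
      classOf[String],
      classOf[String]
    )

  val keyedNumbers = env
    .fromElements(1L, 2L, 3L)
    .keyBy((n: Long) => (n % 2).toInt) // key type Int, matching MyFunction

  val broadcastRules = env
    .fromElements("rule-a", "rule-b")
    .broadcast(rulesDescriptor)

  keyedNumbers
    .connect(broadcastRules)
    .process(new MyFunction)
    .print()

  env.execute("MyFunction wiring sketch")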

/** Stateful customer/transaction join, keyed by customer id (a copy of the
  * example's join function, as the name suggests).
  */
class JoinCustomersWithTransactionCopy
    extends KeyedCoProcessFunction[Long, Customer, Transaction, String]:

  // per-key state: the latest customer record and all transactions seen so far
  var customer: ValueState[Customer] = _
  var transactions: ListState[Transaction] = _

override def open(parameters: Configuration): Unit =
customer = getRuntimeContext.getState(
new ValueStateDescriptor("customer", classOf[Customer])
)
transactions = getRuntimeContext.getListState(
new ListStateDescriptor("transactions", classOf[Transaction])
)

override def processElement1(
in1: Customer,
context: KeyedCoProcessFunction[
Long,
Customer,
Transaction,
String
]#Context,
collector: Collector[String]
  ): Unit =
    // store/refresh the customer, then join it with any buffered transactions
    customer.update(in1)
    val txs = transactions.get().asScala.to(LazyList)

    if txs.nonEmpty then join(collector, in1, txs)

override def processElement2(
in2: Transaction,
context: KeyedCoProcessFunction[
Long,
Customer,
Transaction,
String
]#Context,
collector: Collector[String]
  ): Unit =
    // buffer the transaction, then join once the customer record has arrived
    transactions.add(in2)
    val c = customer.value

    if c != null then
      join(collector, c, transactions.get().asScala.to(LazyList))

  private def join(
      out: Collector[String],
      c: Customer,
      txs: LazyList[Transaction]
  ): Unit =
    // emit one "name amount" record per buffered transaction for this customer
    txs.foreach(t => out.collect(s"${c.c_name} ${t.t_amount}"))