Developers and users often want to quickly try out the Flink Operator with a long-running streaming application and test features like taking savepoints. The WordCount example included in the Flink release cannot do the job, because it exits after processing the input file. In this case, you need a streaming data source (e.g., an Apache Kafka cluster), a streaming data generator, and a Flink streaming application for testing purposes. This document describes how to set up such a test environment.
Before you start, make sure you have:
- a running Kubernetes cluster with enough capacity
- a running Flink Operator in the cluster
- Helm 3+ installed on your local machine
Create the namespace kafka and install Kafka, including Zookeeper, in it:
kubectl create ns kafka
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install my-kafka incubator/kafka --namespace kafka
After that, the Kafka broker service will be available at my-kafka.kafka.svc.cluster.local:9092. Run the following command to view more details:
helm status my-kafka -n kafka
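Before deploying anything that talks to Kafka, it can help to wait until the broker and Zookeeper pods report ready. The following is a minimal sketch, not part of the original setup: wait_for_kafka is a hypothetical helper name, the 300-second default timeout is an arbitrary assumption, and the helper assumes the namespace contains only the pods from this Helm release.

```shell
# Hypothetical helper: block until every pod in the namespace is Ready.
# Assumes the namespace holds only the Kafka and Zookeeper pods from the release.
wait_for_kafka() {
  ns="${1:-kafka}"        # namespace created above
  timeout="${2:-300s}"    # assumed default; raise it on slow clusters
  kubectl wait --for=condition=Ready pods --all -n "$ns" --timeout="$timeout"
}
```

Calling wait_for_kafka with no arguments waits on the kafka namespace. If no pods exist yet, kubectl wait may return an error immediately, so retry after a few seconds in that case.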
Deploy the ClickGenerator application from the Flink Operations Playground to write data to the Kafka cluster.
You can build a Docker image from the Dockerfile or use the existing image functicons/flink-ops-playground:2-FLINK-1.9-scala_2.11 in a deployment manifest such as kafka_click_generator.yaml:
# apps/v1beta1 was removed in Kubernetes 1.16; apps/v1 is the supported API group.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-click-generator
spec:
  replicas: 1
  # apps/v1 requires an explicit selector that matches the pod template labels.
  selector:
    matchLabels:
      app: kafka-click-generator
  template:
    metadata:
      labels:
        app: kafka-click-generator
    spec:
      containers:
        - name: kafka-click-generator
          image: functicons/flink-ops-playground:2-FLINK-1.9-scala_2.11
          command: ["java"]
          args:
            - "-classpath"
            - "/opt/ClickCountJob.jar:/opt/flink/lib/*"
            - "org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator"
            - "--bootstrap.servers"
            - "my-kafka.kafka.svc.cluster.local:9092"
            - "--topic"
            - "input"
Then run:
kubectl apply -f kafka_click_generator.yaml
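To confirm that the generator actually started, you can wait for the deployment to roll out and then look at its recent log lines. This is a rough sketch: check_click_generator is a hypothetical helper name, the 120-second timeout and 20-line tail are arbitrary assumptions, and the generator may not print anything useful to its logs.

```shell
# Hypothetical helper: wait for the generator deployment, then show recent logs.
check_click_generator() {
  name="${1:-kafka-click-generator}"   # deployment name from the manifest above
  # Only fetch logs if the rollout completed within the assumed timeout.
  kubectl rollout status "deployment/$name" --timeout=120s &&
    kubectl logs "deployment/$name" --tail=20
}
```

Run check_click_generator in the same namespace where you applied the manifest.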
Now you can create a Flink job cluster custom resource (CR) that runs the ClickEventCount application, flinkcluster_clickcount.yaml:
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkcluster-clickcount
spec:
  image:
    name: functicons/flink-ops-playground:2-FLINK-1.9-scala_2.11
  jobManager:
    ports:
      ui: 8081
    resources:
      limits:
        memory: "2Gi"
        cpu: "200m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2Gi"
        cpu: "200m"
  job:
    jarFile: /opt/ClickCountJob.jar
    className: org.apache.flink.playgrounds.ops.clickcount.ClickEventCount
    args:
      [
        "--bootstrap.servers",
        "my-kafka.kafka.svc.cluster.local:9092",
        "--checkpointing",
        "--event-time",
      ]
    parallelism: 2
Then run the following command to launch the streaming application:
kubectl apply -f flinkcluster_clickcount.yaml
After that, you can check the Flink cluster and job status with:
kubectl describe flinkclusters flinkcluster-clickcount
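For scripting, a single status value is easier to work with than the full describe output. The sketch below assumes the FlinkCluster resource exposes its overall state at .status.state; verify that path against the describe output for your operator version. flink_cluster_state is a hypothetical helper name.

```shell
# Hypothetical helper: print the overall state of a FlinkCluster resource.
# Assumption: the CR reports its state under .status.state.
flink_cluster_state() {
  name="${1:-flinkcluster-clickcount}"
  kubectl get flinkclusters "$name" -o jsonpath='{.status.state}'
}
```

You could call flink_cluster_state in a loop and stop once it prints the state you are waiting for.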
To clean up the test environment, first delete the FlinkCluster custom resource:
kubectl delete flinkclusters flinkcluster-clickcount
Delete ClickGenerator:
kubectl delete deployments kafka-click-generator
Delete Kafka:
helm uninstall my-kafka -n kafka
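The cleanup steps can be collected into one function. This is a sketch under assumptions: cleanup_test_env is a hypothetical name, and the final namespace deletion is an extra step not listed above. It is included because helm uninstall does not remove persistent volume claims created through a StatefulSet, whereas deleting the namespace removes any that the chart created there.

```shell
# Hypothetical helper: tear down everything created in this guide, in order.
cleanup_test_env() {
  kubectl delete flinkclusters flinkcluster-clickcount --ignore-not-found
  kubectl delete deployments kafka-click-generator --ignore-not-found
  helm uninstall my-kafka -n kafka
  # Extra step (assumption): drop the namespace and any leftover claims in it.
  kubectl delete ns kafka --ignore-not-found
}
```

Skip the last command if you plan to reinstall Kafka into the same namespace and want to keep its data.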