Skip to content

Publish from Kafka, Persist on MinIO in k8s

Cesar Celis Hernandez edited this page Jan 11, 2023 · 77 revisions

Diagram:

telegram-cloud-photo-size-1-5001574061863709587-y

Objective

Publish from Kafka, Persist on MinIO in k8s

Links:

Pre-Steps:

  1. Create the cluster
  • File: kind-config.yaml
# four node (two workers) cluster config
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
  - role: worker
  - role: worker
  - role: worker
kind delete cluster
kind create cluster --config kind-config.yaml
  1. From https://github.com/cniackz/public/wiki/Publish-from-Kafka,-Persist-on-MinIO you need to install the plugin with Confluent Hub so that you can build the docker image in quay and load the plugin:
$ confluent-hub install confluentinc/kafka-connect-s3:latest \
>    --component-dir /Users/cniackz/confluent-plugins \
>    --worker-configs /Users/cniackz/kafka/kafka_2.13-3.3.1/config/connect-distributed.properties
 
Component's license: 
Confluent Community License 
http://www.confluent.io/confluent-community-license 
I agree to the software license agreement (yN) y

Downloading component Kafka Connect S3 10.3.0, provided by Confluent, Inc. from Confluent Hub and installing into /Users/cniackz/confluent-plugins 
Do you want to uninstall existing version 10.3.0? (yN) y

Adding installation directory to plugin path in the following files: 
  /Users/cniackz/kafka/kafka_2.13-3.3.1/config/connect-distributed.properties 
 
Completed 

Steps:

  1. Get MinIO UP and Running!
kubectl apply -k github.com/minio/operator/
kubectl apply -k ~/operator/examples/kustomization/tenant-lite
kubectl create namespace kafka
k apply -f ~/minio/ubuntu.yaml -n kafka
apt update
apt install -y wget
apt install -y iputils-ping
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mv mc /usr/local/bin/mc
mc alias set myminio http://minio.tenant-lite.svc.cluster.local minio minio123
mc mb myminio/kafka-bucket
mc ls myminio/kafka-bucket
Screenshot 2022-12-29 at 5 55 36 AM Screenshot 2022-12-29 at 5 55 51 AM Screenshot 2022-12-28 at 10 52 05 AM
  1. Get Kafka Running:
# Delete any previous Kafka as it may be stuck or outdated:
kubectl delete -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl delete -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
kubectl delete -f /Users/cniackz/strimzi-kafka-operator/examples/connect/kafka-connect.yaml -n kafka
# TODO: Make sure to delete all PVC related to Kafka.
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
Screenshot 2022-12-28 at 10 44 11 AM
  1. Create the topics: minio-topic-1 and connect-offsets:
exec kubectl exec -i -t -n kafka my-cluster-kafka-0 -c kafka -- sh -c "clear; (bash || ash || sh)"
cd bin
./kafka-topics.sh --bootstrap-server localhost:9092 --topic minio-topic-1 --create --partitions 1 --replication-factor 1 --config cleanup.policy=compact
./kafka-topics.sh --bootstrap-server localhost:9092 --topic connect-offsets --create --partitions 1 --replication-factor 1 --config cleanup.policy=compact

Expected:

192:~ cniackz$ exec kubectl exec -i -t -n kafka my-cluster-kafka-0 -c kafka -- sh -c "clear; (bash || ash || sh)"
sh: clear: command not found
[kafka@my-cluster-kafka-0 kafka]$ cd bin
[kafka@my-cluster-kafka-0 bin]$ ./kafka-topics.sh \
>   --bootstrap-server localhost:9092 \
>   --topic minio-topic-1 \
>   --create --partitions 1 \
>   --replication-factor 1 \
>   --config cleanup.policy=compact
Created topic minio-topic-1.
[kafka@my-cluster-kafka-0 bin]$ 
[kafka@my-cluster-kafka-0 bin]$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic connect-offsets --create --partitions 1 --replication-factor 1 --config cleanup.policy=compact
Created topic connect-offsets.
  1. Produce 4 messages to send data to minio flush.size: '3':
kubectl -n kafka run kafka-producer -ti \
  --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
  --rm=true --restart=Never -- bin/kafka-console-producer.sh \
  --bootstrap-server my-cluster-kafka-bootstrap:9092 \
  --topic minio-topic-1 \
  --property parse.key=true \
  --property key.separator=,
$ kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic minio-topic-1 --property parse.key=true --property key.separator=,
If you don't see a command prompt, try pressing enter.
>123,123
>234,234
>345,345
>456,456
>
  1. Create image for plugin:
cd /Users/cniackz/kafka-connect
docker build .
docker login --username cniackz4 https://quay.io
docker tag 721f43699515 quay.io/cniackz4/kafkaconnect:latest
docker push quay.io/cniackz4/kafkaconnect:latest
  • When running docker build . I did this in Ubuntu x86 for our Intel cluster otherwise there will be architecture issue, if you run local in kind, it can be build on MacBook Pro M1.

  • After docker push don't forget to make it Public so that it can be downloaded from within the cluster.

  • File: /Users/cniackz/kafka-connect/Dockerfile

  • Don't forget to update accordingly: ENV AWS_ACCESS_KEY_ID=<USER> and ENV AWS_SECRET_ACCESS_KEY=<PASSWORD>

FROM quay.io/strimzi/kafka:latest-kafka-3.3.1
USER root:root
COPY ./confluent-plugins/ /opt/kafka/plugins/
USER kafka:kafka
ENV AWS_ACCESS_KEY_ID=<USER>
ENV AWS_SECRET_ACCESS_KEY=<PASSWORD>
  1. Apply Kafka Connect:
  • Make sure IP is correct bootstrap.servers: 10.244.1.5:9092

  • Make sure to update store.url: http://10.244.4.13:9000 accordingly get the IP from the MinIO Pod.

  • Don't forget to create the bucket: s3.bucket.name: kafka-bucket

  • Remember you need more than 3 messages to send file to MinIO: flush.size: '3'

  • Also remember to get/update topic accordingly: topics: minio-topic-1

  • File: /Users/cniackz/strimzi-kafka-operator/examples/connect/kafka-connect.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: quay.io/cniackz4/kafkaconnect:latest
  version: 3.3.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    bootstrap.servers: 10.244.1.5:9092
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
    offset.storage.file.filename: /tmp/connect.offsets

---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "minio-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    task.max: '1'
    topics: minio-topic-1
    s3.region: us-east-1
    s3.bucket.name: kafka-bucket
    s3.part.size: '5242880'
    flush.size: '3'
    store.url: http://10.244.3.10:9000
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility: NONE
    behavior.on.null.values: ignore
kubectl config use-context kubernetes-admin@kubernetes
cd /Users/cniackz/strimzi-kafka-operator
kubectl delete -f examples/connect/kafka-connect.yaml -n kafka
kubectl apply -f examples/connect/kafka-connect.yaml -n kafka

Expected Log is from pod my-connect-cluster-connect-c47f64b46-d4xnz:

2022-12-29 12:22:16,084 INFO [minio-connector|task-0] Files committed to S3. Target commit offset for minio-topic-1-0 is 3 (io.confluent.connect.s3.TopicPartitionWriter) [task-thread-minio-connector-0]

Expected output is:

root@ubuntu:/# mc ls myminio/kafka-bucket
[2022-12-29 12:22:38 UTC]     0B topics/
root@ubuntu:/# mc ls myminio/kafka-bucket/topics
[2022-12-29 12:26:10 UTC]     0B minio-topic-1/
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1
[2022-12-29 12:27:57 UTC]     0B partition=0/
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0
[2022-12-29 12:22:16 UTC]    12B STANDARD minio-topic-1+0+0000000000.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000000.json
123
234
345
root@ubuntu:/# 

Tested

  • Tested locally on MacBook Pro M1: Passed [Wed Dec 28 2022]

  • Tested locally on MacBook Pro M1: Passed [Fri Jan 06 2023]

  • Tested in our DC cluster: Passed [Fri Dec 30 2022]

Screenshot 2022-12-30 at 9 01 43 AM

How to send JSON data:

192:~ cniackz$ kubectl -n kafka run kafka-producer -ti \
>   --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
>   --rm=true --restart=Never -- bin/kafka-console-producer.sh \
>   --bootstrap-server my-cluster-kafka-bootstrap:9092 \
>   --topic minio-topic-1 \
>   --property parse.key=true \
>   --property key.separator=,
If you don't see a command prompt, try pressing enter.
>123,123
>234,234
>345,345
>456,456
>555,555                                 
>666,666
>777,777
>888,{"a":"a"}
>999,{"a":"a"}
>AAA,{"a":"a"} <--- This will break us: Unrecognized token 'AAA': was expecting:
  |
  |___ JSON String, Number, Array, Object or token 'null', 'true' or 'false'
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0
[2023-01-06 17:25:14 UTC]    12B STANDARD minio-topic-1+0+0000000000.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000000.json
123
234
345
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000000.json
123
234
345
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0                                
[2023-01-06 17:25:14 UTC]    12B STANDARD minio-topic-1+0+0000000000.json
[2023-01-06 17:30:03 UTC]    12B STANDARD minio-topic-1+0+0000000003.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000003.json
456
555
666
root@ubuntu:/# mc ls myminio/kafka-bucket/topics/minio-topic-1/partition=0
[2023-01-06 17:25:14 UTC]    12B STANDARD minio-topic-1+0+0000000000.json
[2023-01-06 17:30:03 UTC]    12B STANDARD minio-topic-1+0+0000000003.json
[2023-01-06 17:33:00 UTC]    24B STANDARD minio-topic-1+0+0000000006.json
root@ubuntu:/# mc cat myminio/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000006.json
777
{"a":"a"}
{"a":"a"}
root@ubuntu:/# 
Screenshot 2023-01-06 at 11 35 46 AM

Script to convert from csv to json for Kafka

import pdb
import csv 
import json 

def csv_to_json(csvFilePath, jsonFilePath):
    jsonArray = []
      
    #read csv file
    with open(csvFilePath, encoding='utf-8') as csvf: 
        #load csv file data using csv library's dictionary reader
        csvReader = csv.DictReader(csvf) 

        #convert each csv row into python dict
        for row in csvReader: 
            #add this python dict to json array
            jsonArray.append(row)
            #pdb.set_trace()
  
    f = open(jsonFilePath, "a")
    counter = 0
    for item in jsonArray:
        line = str(counter) + ","  + json.dumps(item, indent=None, separators=(",",":"))
        counter = counter + 1
        f.write(line + '\n')
    f.close()
          
csvFilePath = r'taxi-data.csv'
jsonFilePath = r'taxi-data.json'
csv_to_json(csvFilePath, jsonFilePath)
  • From:
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,07/11/2018 07:15:43 PM,07/11/2018 07:19:03 PM,1,0.45,1,N,249,158,1,4.5,1,0.5,1.08,0,0.3,7.38
2,07/11/2018 07:36:35 PM,07/11/2018 07:46:31 PM,1,1.24,1,N,100,48,1,8,1,0.5,2.45,0,0.3,12.25
2,07/11/2018 07:51:37 PM,07/11/2018 07:58:25 PM,1,0.97,1,N,48,142,1,6.5,1,0.5,1.66,0,0.3,9.96
1,07/11/2018 07:13:51 PM,07/11/2018 07:16:29 PM,1,0.6,1,N,90,90,2,4,1,0.5,0,0,0.3,5.8
1,07/11/2018 07:06:21 PM,07/11/2018 07:20:18 PM,1,2,1,N,234,231,1,11,1,0.5,2.55,0,0.3,15.35
1,07/11/2018 07:24:54 PM,07/11/2018 07:26:24 PM,1,0.5,1,N,125,249,1,3.5,1,0.5,0.79,0,0.3,6.09
1,07/11/2018 07:27:31 PM,07/11/2018 07:47:54 PM,2,5.5,1,N,249,239,1,19,1,0.5,4.15,0,0.3,24.95
1,07/11/2018 07:15:18 PM,07/11/2018 07:21:40 PM,1,1.1,1,N,162,170,1,6,1,0.5,1.55,0,0.3,9.35
1,07/11/2018 07:24:52 PM,07/11/2018 07:30:04 PM,1,1.2,1,N,233,141,2,6,1,0.5,0,0,0.3,7.8
  • To
0,{"VendorID":"2","tpep_pickup_datetime":"07/11/2018 07:15:43 PM","tpep_dropoff_datetime":"07/11/2018 07:19:03 PM","passenger_count":"1","trip_distance":"0.45","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"249","DOLocationID":"158","payment_type":"1","fare_amount":"4.5","extra":"1","mta_tax":"0.5","tip_amount":"1.08","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"7.38"}
1,{"VendorID":"2","tpep_pickup_datetime":"07/11/2018 07:36:35 PM","tpep_dropoff_datetime":"07/11/2018 07:46:31 PM","passenger_count":"1","trip_distance":"1.24","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"100","DOLocationID":"48","payment_type":"1","fare_amount":"8","extra":"1","mta_tax":"0.5","tip_amount":"2.45","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"12.25"}
2,{"VendorID":"2","tpep_pickup_datetime":"07/11/2018 07:51:37 PM","tpep_dropoff_datetime":"07/11/2018 07:58:25 PM","passenger_count":"1","trip_distance":"0.97","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"48","DOLocationID":"142","payment_type":"1","fare_amount":"6.5","extra":"1","mta_tax":"0.5","tip_amount":"1.66","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"9.96"}
3,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:13:51 PM","tpep_dropoff_datetime":"07/11/2018 07:16:29 PM","passenger_count":"1","trip_distance":"0.6","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"90","DOLocationID":"90","payment_type":"2","fare_amount":"4","extra":"1","mta_tax":"0.5","tip_amount":"0","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"5.8"}
4,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:06:21 PM","tpep_dropoff_datetime":"07/11/2018 07:20:18 PM","passenger_count":"1","trip_distance":"2","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"234","DOLocationID":"231","payment_type":"1","fare_amount":"11","extra":"1","mta_tax":"0.5","tip_amount":"2.55","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"15.35"}
5,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:24:54 PM","tpep_dropoff_datetime":"07/11/2018 07:26:24 PM","passenger_count":"1","trip_distance":"0.5","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"125","DOLocationID":"249","payment_type":"1","fare_amount":"3.5","extra":"1","mta_tax":"0.5","tip_amount":"0.79","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"6.09"}
6,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:27:31 PM","tpep_dropoff_datetime":"07/11/2018 07:47:54 PM","passenger_count":"2","trip_distance":"5.5","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"249","DOLocationID":"239","payment_type":"1","fare_amount":"19","extra":"1","mta_tax":"0.5","tip_amount":"4.15","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"24.95"}
7,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:15:18 PM","tpep_dropoff_datetime":"07/11/2018 07:21:40 PM","passenger_count":"1","trip_distance":"1.1","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"162","DOLocationID":"170","payment_type":"1","fare_amount":"6","extra":"1","mta_tax":"0.5","tip_amount":"1.55","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"9.35"}
8,{"VendorID":"1","tpep_pickup_datetime":"07/11/2018 07:24:52 PM","tpep_dropoff_datetime":"07/11/2018 07:30:04 PM","passenger_count":"1","trip_distance":"1.2","RatecodeID":"1","store_and_fwd_flag":"N","PULocationID":"233","DOLocationID":"141","payment_type":"2","fare_amount":"6","extra":"1","mta_tax":"0.5","tip_amount":"0","tolls_amount":"0","improvement_surcharge":"0.3","total_amount":"7.8"}
  • You see all events:
Screenshot 2023-01-06 at 4 11 19 PM
  • Example:
root@ubuntu:/# mc cat kafka/kafka-bucket/topics/minio-topic-1/partition=0/minio-topic-1+0+0000000339.json
{"DOLocationID":"209","RatecodeID":"1","fare_amount":"5.5","tpep_dropoff_datetime":"07/11/2018 07:55:40 PM","VendorID":"2","passenger_count":"2","tolls_amount":"0","improvement_surcharge":"0.3","trip_distance":"0.92","store_and_fwd_flag":"N","payment_type":"1","total_amount":"8.3","extra":"1","tip_amount":"1","mta_tax":"0.5","tpep_pickup_datetime":"07/11/2018 07:50:18 PM","PULocationID":"261"}
{"DOLocationID":"48","RatecodeID":"1","fare_amount":"8","tpep_dropoff_datetime":"07/11/2018 07:15:46 PM","VendorID":"2","passenger_count":"2","tolls_amount":"0","improvement_surcharge":"0.3","trip_distance":"0.97","store_and_fwd_flag":"N","payment_type":"2","total_amount":"9.8","extra":"1","tip_amount":"0","mta_tax":"0.5","tpep_pickup_datetime":"07/11/2018 07:05:17 PM","PULocationID":"186"}
{"DOLocationID":"114","RatecodeID":"1","fare_amount":"11.5","tpep_dropoff_datetime":"07/11/2018 07:36:22 PM","VendorID":"2","passenger_count":"2","tolls_amount":"0","improvement_surcharge":"0.3","trip_distance":"2.4","store_and_fwd_flag":"N","payment_type":"1","total_amount":"16.62","extra":"1","tip_amount":"3.32","mta_tax":"0.5","tpep_pickup_datetime":"07/11/2018 07:21:14 PM","PULocationID":"48"}

Connector for ns3

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: quay.io/cniackz4/kafkaconnect:latest
  version: 3.3.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    bootstrap.servers: 192.168.177.26:9092
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
    offset.storage.file.filename: /tmp/connect.offsets

---

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "minio-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    task.max: '1'
    topics: minio-topic-1
    s3.region: us-east-1
    s3.bucket.name: kafka-bucket
    s3.part.size: '5242880'
    flush.size: '3'
    store.url: https://ns-3.ic.min.dev
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
    schema.compatibility: NONE
    behavior.on.null.values: ignore
Clone this wiki locally