Skip to content

Commit

Permalink
Simplified the kafka integration tests
Browse files Browse the repository at this point in the history
- Using `testcontainers` instead of `org.apache.kafka/kafka_2.12`, to spin a Kafka dind container. This simplified the Kafka cluster startup and leaves the configuration to the battle tested `testcontainers`.
  • Loading branch information
yevgeni.t committed May 31, 2021
1 parent deadce6 commit b532c06
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 144 deletions.
6 changes: 5 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
[metosin/sieppari "0.0.0-alpha13"]
[commons-io/commons-io "2.6"]
[ch.qos.logback/logback-classic "1.2.3"]
[org.apache.kafka/kafka_2.12 "2.5.1"]]

; Kafka (docker in docker)
[org.testcontainers/kafka "1.15.3"]
[clj-test-containers "0.4.0"]
[net.java.dev.jna/jna "5.8.0"]]
:jvm-opts ["-Dlogback.configurationFile=dev-logback.xml"]}

;; Tests only, silent logs
Expand Down
47 changes: 25 additions & 22 deletions test/ketu/async/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@
[spy.core :as spy]
[spy.assert]
[ketu.test.util :as u]
[ketu.test.embedded-kafka :as embedded-kafka]
[clojure.core.async :as async]
[ketu.clients.consumer :as consumer]
[ketu.clients.producer :as producer]
[ketu.async.source :as source]
[ketu.async.sink :as sink])
[ketu.async.sink :as sink]
[ketu.test.kafka-setup :as kafka-setup])
(:import (java.time Duration)
(org.apache.kafka.common PartitionInfo TopicPartition)
(org.apache.kafka.clients.producer RecordMetadata)
(org.apache.kafka.clients.consumer Consumer)
(org.apache.kafka.clients.admin AdminClient NewTopic)
(java.util.concurrent TimeUnit)))

(use-fixtures :each (fn [f] (embedded-kafka/with-embedded-kafka {} {} f)))
(use-fixtures :each kafka-setup/with-kafka-container)

(defn- fill-topic [topic values]
(let [;; Start a producer-sink with results chan
Expand All @@ -27,7 +27,7 @@
:name "test-sink"
:key-type :string
:value-type :string
:brokers "127.0.0.1:9999"
:brokers (kafka-setup/get-bootstrap-servers)
:ketu.sink/callback-obj (producer/callback (fn [_ _] (async/>!! result-chan ::done)))}
sink (sink/sink producer-chan sink-opts)]
;; Produce the values
Expand All @@ -41,7 +41,7 @@
(deftest end-to-end
(let [consumer-chan (async/chan 10)
clicks-consumer-opts {:name "clicks-consumer"
:brokers "127.0.0.1:9999"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:group-id "clicks-test-consumer"
:auto-offset-reset "earliest"
Expand All @@ -51,7 +51,7 @@
:shape [:vector :key :value :offset]}
source (source/source consumer-chan clicks-consumer-opts)
clicks-producer-opts {:name "clicks-producer"
:brokers "127.0.0.1:9999"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:key-type :string
:internal-config {"value.serializer" "org.apache.kafka.common.serialization.StringSerializer"}
Expand All @@ -67,15 +67,15 @@
(deftest shapes-and-topic-list
(let [;; Producer-sink that will send data to clicks and matches topics
multi-producer-opts {:name "multi-producer"
:brokers "127.0.0.1:9999"
:brokers (kafka-setup/get-bootstrap-servers)
:key-type :string
:value-type :string
:shape [:vector :topic :key :value]}
producer-chan (async/chan 10)
sink (sink/sink producer-chan multi-producer-opts)
;; Consumer-source that will read from clicks and matches
multi-consumer-opts {:name "multi-consumer"
:brokers "127.0.0.1:9999"
:brokers (kafka-setup/get-bootstrap-servers)
:topic-list ["clicks" "matches"]
:group-id "multi-group"
:auto-offset-reset "earliest"
Expand All @@ -101,7 +101,7 @@
(deftest producer-callback
(testing "Resolve a promise when its associated message is sent"
(let [clicks-producer-opts {:name "clicks-producer"
:brokers "127.0.0.1:9999"
:brokers (kafka-setup/get-bootstrap-servers)
:topic "clicks"
:key-type :string
:value-type :string
Expand Down Expand Up @@ -132,7 +132,7 @@

(testing "End-offset directly with end-offsets function (recommended)"
(let [;; Start a consumer without subscribing and without a group
consumer (consumer/consumer {"bootstrap.servers" "127.0.0.1:9999"
consumer (consumer/consumer {"bootstrap.servers" (kafka-setup/get-bootstrap-servers)
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
;; Get all the topic partitions info
Expand All @@ -148,7 +148,7 @@

(testing "End-offset with assign and seek-to-end! function (not recommended)"
(let [;; Start a consumer without subscribing and without a group
consumer (consumer/consumer {"bootstrap.servers" "127.0.0.1:9999"
consumer (consumer/consumer {"bootstrap.servers" (kafka-setup/get-bootstrap-servers)
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
;; Get all the topic partitions info
Expand All @@ -169,7 +169,8 @@
(testing "Create ConsumerRebalanceListener object when source subscribes"
(let [topic "test-topic"
;; Create test-topic with two partitions
_ (-> (AdminClient/create {"bootstrap.servers" "127.0.0.1:9999"})
admin-client (AdminClient/create {"bootstrap.servers" (kafka-setup/get-bootstrap-servers)})
_ (-> admin-client
(.createTopics [(NewTopic. topic (int 2) (short 1))])
(.all)
(.get 5 TimeUnit/SECONDS))
Expand All @@ -187,7 +188,7 @@
:consumer-assignment (->partitions (.assignment consumer))}))]
(consumer/rebalance-listener assigned revoked)))
opts {:name "test"
:brokers "127.0.0.1:9999"
:brokers (kafka-setup/get-bootstrap-servers)
:topic topic
:group-id "test-group"
:key-type :string
Expand All @@ -199,12 +200,14 @@
(is (= {:assigned [0 1] :consumer-assignment [0 1]}
(u/try-take! assignment-chan)))
;; Start another identical consumer to trigger a rebalance
(source/source ch opts)
;; First, the original consumer partitions are revoked.
(is (= {:revoked [0 1] :consumer-assignment [0 1]}
(u/try-take! assignment-chan)))
;; Then, each consumer is assigned one partition
(is (= #{{:assigned [0] :consumer-assignment [0]}
{:assigned [1] :consumer-assignment [1]}}
(hash-set (u/try-take! assignment-chan) (u/try-take! assignment-chan))))
(source/stop! source))))
(let [source-rebalance (source/source ch opts)]
;; First, the original consumer partitions are revoked.
(is (= {:revoked [0 1] :consumer-assignment [0 1]}
(u/try-take! assignment-chan)))
;; Then, each consumer is assigned one partition
(is (= #{{:assigned [0] :consumer-assignment [0]}
{:assigned [1] :consumer-assignment [1]}}
(hash-set (u/try-take! assignment-chan) (u/try-take! assignment-chan))))
(source/stop! source-rebalance)
(source/stop! source)
(.close ^AdminClient admin-client)))))
121 changes: 0 additions & 121 deletions test/ketu/test/embedded_kafka.clj

This file was deleted.

29 changes: 29 additions & 0 deletions test/ketu/test/kafka_setup.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
(ns ketu.test.kafka-setup
(:require [clj-test-containers.core :as tc])
(:import (org.testcontainers.containers KafkaContainer)
(org.testcontainers.utility DockerImageName)))

(def ^:private kafka-container (atom nil))
(def ^:private ^:const container-port (KafkaContainer/KAFKA_PORT))

(defn get-bootstrap-servers
([]
(get-bootstrap-servers @kafka-container))
([container]
(.getBootstrapServers ^KafkaContainer (:container container))))

(defn start-container []
(-> {:container (KafkaContainer. (DockerImageName/parse "confluentinc/cp-kafka:5.5.3"))
:exposed-ports [container-port]}
tc/init
tc/start!))

(defn- stop-container! [container]
(tc/stop! container))

(defn with-kafka-container [test-fn]
(let [container (start-container)]
(reset! kafka-container container)
(test-fn)
(stop-container! container)
(reset! kafka-container nil)))

0 comments on commit b532c06

Please sign in to comment.