From 0bb6586ea167379e0ecd41da4840850b681d12f2 Mon Sep 17 00:00:00 2001 From: Charith Ellawala Date: Sun, 8 Mar 2015 16:14:44 +0000 Subject: [PATCH] Switch to using ExternalResource --- README.md | 15 ++- .../github/charithe/kafka/KafkaJunitRule.java | 99 +++++++++---------- .../kafka/KafkaJunitClassRuleTest.java | 78 +++++++++++++++ .../charithe/kafka/KafkaJunitRuleTest.java | 22 +++-- 4 files changed, 152 insertions(+), 62 deletions(-) create mode 100644 src/test/java/com/github/charithe/kafka/KafkaJunitClassRuleTest.java diff --git a/README.md b/README.md index 11bde1d..197c1b5 100644 --- a/README.md +++ b/README.md @@ -13,14 +13,25 @@ Use https://jitpack.io/ until I get around to doing a proper release to Maven Ce Usage ------ -Create an instance of the rule in your test class and annotate it with `@Rule`. +Create an instance of the rule in your test class and annotate it with `@Rule`. This will start and stop the +broker between each test invocation. ```java @Rule public KafkaJunitRule kafkaRule = new KafkaJunitRule(); ``` -`kafkaRule` can now be referenced from within your test methods. + + To spin up the broker at the beginning of a test suite and tear it down at the end, use `@ClassRule`. + + ```java + @ClassRule + public static KafkaJunitRule kafkaRule = new KafkaJunitRule(); + ``` + + + +`kafkaRule` can be referenced from within your test methods to obtain information about the Kafka broker. ```java @Test diff --git a/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java b/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java index 1c2d1d7..a870d3a 100644 --- a/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java +++ b/src/main/java/com/github/charithe/kafka/KafkaJunitRule.java @@ -22,9 +22,7 @@ import kafka.server.KafkaServerStartable; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingServer; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; +import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,41 +37,66 @@ /** * Starts up a local Zookeeper and a Kafka broker */ -public class KafkaJunitRule implements TestRule { +public class KafkaJunitRule extends ExternalResource { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJunitRule.class); private TestingServer zookeeper; private KafkaServerStartable kafkaServer; + private int zookeeperPort; + private String zookeeperConnectionString; private int kafkaPort = 9092; private Path kafkaLogDir; - @Override - public Statement apply(final Statement statement, Description description) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - try { - startKafkaServer(); - statement.evaluate(); - } finally { - stopKafkaServer(); - } - } - }; - } - private void startKafkaServer() throws Exception { + @Override + protected void before() throws Throwable { zookeeper = new TestingServer(true); - String zkQuorumStr = zookeeper.getConnectString(); - KafkaConfig kafkaConfig = buildKafkaConfig(zkQuorumStr); + zookeeperPort = zookeeper.getPort(); + zookeeperConnectionString = zookeeper.getConnectString(); + KafkaConfig kafkaConfig = buildKafkaConfig(zookeeperConnectionString); LOGGER.info("Starting Kafka server with config: {}", kafkaConfig.props().props()); kafkaServer = new KafkaServerStartable(kafkaConfig); kafkaServer.startup(); } + @Override + protected void after() { + try { + if (kafkaServer != null) { + LOGGER.info("Shutting down Kafka Server"); + kafkaServer.shutdown(); + } + + if (zookeeper != null) { + LOGGER.info("Shutting down Zookeeper"); + zookeeper.close(); + } + + if (Files.exists(kafkaLogDir)) { + LOGGER.info("Deleting the log dir: {}", kafkaLogDir); + Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.deleteIfExists(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.deleteIfExists(dir); + return FileVisitResult.CONTINUE; + } + }); + } + } + catch(Exception e){ + LOGGER.error("Failed to clean-up Kafka",e); + } + } + private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException { kafkaLogDir = Files.createTempDirectory("kafka_junit"); kafkaPort = InstanceSpec.getRandomPort(); @@ -88,34 +111,6 @@ private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException return new KafkaConfig(props); } - private void stopKafkaServer() throws IOException { - if (kafkaServer != null) { - LOGGER.info("Shutting down Kafka Server"); - kafkaServer.shutdown(); - } - - if (zookeeper != null) { - LOGGER.info("Shutting down Zookeeper"); - zookeeper.close(); - } - - if (Files.exists(kafkaLogDir)) { - LOGGER.info("Deleting the log dir: {}", kafkaLogDir); - Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor() { - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - Files.deleteIfExists(file); - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - Files.deleteIfExists(dir); - return FileVisitResult.CONTINUE; - } - }); - } - } /** * Create a producer configuration. @@ -140,7 +135,7 @@ public ProducerConfig producerConfig() { */ public ConsumerConfig consumerConfig() { Properties props = new Properties(); - props.put("zookeeper.connect", zookeeper.getConnectString()); + props.put("zookeeper.connect", zookeeperConnectionString); props.put("group.id", "kafka-junit-consumer"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); @@ -170,7 +165,7 @@ public int kafkaBrokerPort(){ * @return zookeeper port */ public int zookeeperPort(){ - return zookeeper.getPort(); + return zookeeperPort; } /** @@ -178,6 +173,6 @@ public int zookeeperPort(){ * @return zookeeper connection string */ public String zookeeperConnectionString(){ - return zookeeper.getConnectString(); + return zookeeperConnectionString; } } diff --git a/src/test/java/com/github/charithe/kafka/KafkaJunitClassRuleTest.java b/src/test/java/com/github/charithe/kafka/KafkaJunitClassRuleTest.java new file mode 100644 index 0000000..870ac8b --- /dev/null +++ b/src/test/java/com/github/charithe/kafka/KafkaJunitClassRuleTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015 Charith Ellawala + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.charithe.kafka; + +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.javaapi.producer.Producer; +import kafka.message.MessageAndMetadata; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; +import kafka.serializer.StringDecoder; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertThat; + + +public class KafkaJunitClassRuleTest { + + private static final String TOPIC = "topicY"; + private static final String KEY = "keyY"; + private static final String VALUE = "valueY"; + + @ClassRule + public static KafkaJunitRule kafkaRule = new KafkaJunitRule(); + + @Test + public void testKafkaServerIsUp() { + ProducerConfig conf = kafkaRule.producerConfig(); + Producer producer = new Producer<>(conf); + producer.send(new KeyedMessage<>(TOPIC, KEY, VALUE)); + producer.close(); + + + ConsumerConfig consumerConf = kafkaRule.consumerConfig(); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConf); + Map topicCountMap = new HashMap<>(); + topicCountMap.put(TOPIC, 1); + Map>> consumerMap = consumer + .createMessageStreams(topicCountMap, new StringDecoder(consumerConf.props()), + new StringDecoder(consumerConf.props())); + List> streams = consumerMap.get(TOPIC); + + assertThat(streams, is(notNullValue())); + assertThat(streams.size(), is(equalTo(1))); + + KafkaStream ks = streams.get(0); + ConsumerIterator iterator = ks.iterator(); + MessageAndMetadata msg = iterator.next(); + + assertThat(msg, is(notNullValue())); + assertThat(msg.key(), is(equalTo(KEY))); + assertThat(msg.message(), is(equalTo(VALUE))); + } +} diff --git a/src/test/java/com/github/charithe/kafka/KafkaJunitRuleTest.java b/src/test/java/com/github/charithe/kafka/KafkaJunitRuleTest.java index fd1202a..cb7b7c5 100644 --- a/src/test/java/com/github/charithe/kafka/KafkaJunitRuleTest.java +++ b/src/test/java/com/github/charithe/kafka/KafkaJunitRuleTest.java @@ -39,6 +39,10 @@ public class KafkaJunitRuleTest { + private static final String TOPIC = "topicX"; + private static final String KEY = "keyX"; + private static final String VALUE = "valueX"; + @Rule public KafkaJunitRule kafkaRule = new KafkaJunitRule(); @@ -46,17 +50,18 @@ public class KafkaJunitRuleTest { public void testKafkaServerIsUp() { ProducerConfig conf = kafkaRule.producerConfig(); Producer producer = new Producer<>(conf); - producer.send(new KeyedMessage<>("topic", "k1", "value1")); + producer.send(new KeyedMessage<>(TOPIC, KEY, VALUE)); producer.close(); + ConsumerConfig consumerConf = kafkaRule.consumerConfig(); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConf); - Map topicCountMap = new HashMap<>(); - topicCountMap.put("topic", 1); - Map>> consumerMap = consumer.createMessageStreams(topicCountMap, new StringDecoder(consumerConf.props()), new StringDecoder(consumerConf.props())); - List> streams = consumerMap.get("topic"); - + topicCountMap.put(TOPIC, 1); + Map>> consumerMap = consumer + .createMessageStreams(topicCountMap, new StringDecoder(consumerConf.props()), + new StringDecoder(consumerConf.props())); + List> streams = consumerMap.get(TOPIC); assertThat(streams, is(notNullValue())); assertThat(streams.size(), is(equalTo(1))); @@ -64,9 +69,10 @@ public void testKafkaServerIsUp() { KafkaStream ks = streams.get(0); ConsumerIterator iterator = ks.iterator(); MessageAndMetadata msg = iterator.next(); + assertThat(msg, is(notNullValue())); - assertThat(msg.key(), is(equalTo("k1"))); - assertThat(msg.message(), is(equalTo("value1"))); + assertThat(msg.key(), is(equalTo(KEY))); + assertThat(msg.message(), is(equalTo(VALUE))); } }