Switch to using ExternalResource
Charith Ellawala committed Mar 8, 2015
1 parent b1bc7c0 commit 0bb6586
Showing 4 changed files with 152 additions and 62 deletions.
15 changes: 13 additions & 2 deletions README.md
@@ -13,14 +13,25 @@ Use https://jitpack.io/ until I get around to doing a proper release to Maven Ce
Usage
------

Create an instance of the rule in your test class and annotate it with `@Rule`.
Create an instance of the rule in your test class and annotate it with `@Rule`. This will start and stop the
broker between each test invocation.

```java
@Rule
public KafkaJunitRule kafkaRule = new KafkaJunitRule();
```

`kafkaRule` can now be referenced from within your test methods.

To spin up the broker at the beginning of a test suite and tear it down at the end, use `@ClassRule`.

```java
@ClassRule
public static KafkaJunitRule kafkaRule = new KafkaJunitRule();
```



`kafkaRule` can be referenced from within your test methods to obtain information about the Kafka broker.

```java
@Test
// … (remainder of the example is collapsed in this diff view)
```
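
To give a flavour of what such a test can do, here is a brief sketch (illustrative only, not the collapsed example above). The accessors it uses — `producerConfig()`, `consumerConfig()`, `kafkaBrokerPort()` and `zookeeperConnectionString()` — are the ones defined in `KafkaJunitRule` below.

```java
@Test
public void brokerIsUsableFromTests() {
    // The rule exposes the randomly assigned broker port and the Zookeeper connection string
    int brokerPort = kafkaRule.kafkaBrokerPort();
    String zkConnect = kafkaRule.zookeeperConnectionString();

    // It also builds ready-to-use producer and consumer configurations
    kafka.producer.ProducerConfig producerConf = kafkaRule.producerConfig();
    kafka.consumer.ConsumerConfig consumerConf = kafkaRule.consumerConfig();

    // ... create a Producer / ConsumerConnector from these configs, as in the tests below
}
```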
99 changes: 47 additions & 52 deletions src/main/java/com/github/charithe/kafka/KafkaJunitRule.java
@@ -22,9 +22,7 @@
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingServer;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -39,41 +37,66 @@
/**
* Starts up a local Zookeeper and a Kafka broker
*/
public class KafkaJunitRule implements TestRule {
public class KafkaJunitRule extends ExternalResource {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJunitRule.class);

private TestingServer zookeeper;
private KafkaServerStartable kafkaServer;

private int zookeeperPort;
private String zookeeperConnectionString;
private int kafkaPort = 9092;
private Path kafkaLogDir;

@Override
public Statement apply(final Statement statement, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
try {
startKafkaServer();
statement.evaluate();
} finally {
stopKafkaServer();
}
}
};
}

private void startKafkaServer() throws Exception {
@Override
protected void before() throws Throwable {
zookeeper = new TestingServer(true);
String zkQuorumStr = zookeeper.getConnectString();
KafkaConfig kafkaConfig = buildKafkaConfig(zkQuorumStr);
zookeeperPort = zookeeper.getPort();
zookeeperConnectionString = zookeeper.getConnectString();
KafkaConfig kafkaConfig = buildKafkaConfig(zookeeperConnectionString);

LOGGER.info("Starting Kafka server with config: {}", kafkaConfig.props().props());
kafkaServer = new KafkaServerStartable(kafkaConfig);
kafkaServer.startup();
}

@Override
protected void after() {
try {
if (kafkaServer != null) {
LOGGER.info("Shutting down Kafka Server");
kafkaServer.shutdown();
}

if (zookeeper != null) {
LOGGER.info("Shutting down Zookeeper");
zookeeper.close();
}

if (Files.exists(kafkaLogDir)) {
LOGGER.info("Deleting the log dir: {}", kafkaLogDir);
Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.deleteIfExists(file);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.deleteIfExists(dir);
return FileVisitResult.CONTINUE;
}
});
}
}
catch (Exception e) {
LOGGER.error("Failed to clean-up Kafka", e);
}
}

private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException {
kafkaLogDir = Files.createTempDirectory("kafka_junit");
kafkaPort = InstanceSpec.getRandomPort();
@@ -88,34 +111,6 @@ private KafkaConfig buildKafkaConfig(String zookeeperQuorum) throws IOException
return new KafkaConfig(props);
}

private void stopKafkaServer() throws IOException {
if (kafkaServer != null) {
LOGGER.info("Shutting down Kafka Server");
kafkaServer.shutdown();
}

if (zookeeper != null) {
LOGGER.info("Shutting down Zookeeper");
zookeeper.close();
}

if (Files.exists(kafkaLogDir)) {
LOGGER.info("Deleting the log dir: {}", kafkaLogDir);
Files.walkFileTree(kafkaLogDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.deleteIfExists(file);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.deleteIfExists(dir);
return FileVisitResult.CONTINUE;
}
});
}
}

/**
* Create a producer configuration.
@@ -140,7 +135,7 @@ public ProducerConfig producerConfig() {
*/
public ConsumerConfig consumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper.getConnectString());
props.put("zookeeper.connect", zookeeperConnectionString);
props.put("group.id", "kafka-junit-consumer");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
@@ -170,14 +165,14 @@ public int kafkaBrokerPort(){
* @return zookeeper port
*/
public int zookeeperPort(){
return zookeeper.getPort();
return zookeeperPort;
}

/**
* Get the zookeeper connection string
* @return zookeeper connection string
*/
public String zookeeperConnectionString(){
return zookeeper.getConnectString();
return zookeeperConnectionString;
}
}
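
The net effect of this change is that the hand-rolled `apply()`/`Statement` wrapper above is replaced by JUnit's `ExternalResource` template, which calls `before()` ahead of the test (or once per class under `@ClassRule`) and guarantees `after()` runs when the test finishes. Roughly, `ExternalResource` behaves like the following sketch (an approximation for illustration, not JUnit's actual source):

```java
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

// Approximate behaviour of org.junit.rules.ExternalResource:
// run before() ahead of the wrapped test, then always run after(), mirroring
// the try/finally that KafkaJunitRule previously implemented by hand.
public abstract class ExternalResourceSketch {

    public Statement apply(final Statement base, Description description) {
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                before();               // subclass starts Zookeeper and Kafka here
                try {
                    base.evaluate();    // run the actual test(s)
                } finally {
                    after();            // subclass shuts everything down here
                }
            }
        };
    }

    protected void before() throws Throwable {
        // default: no-op; overridden by KafkaJunitRule to start the broker
    }

    protected void after() {
        // default: no-op; overridden by KafkaJunitRule to stop the broker and clean up
    }
}
```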
78 changes: 78 additions & 0 deletions src/test/java/com/github/charithe/kafka/KafkaJunitClassRuleTest.java
@@ -0,0 +1,78 @@
/*
* Copyright 2015 Charith Ellawala
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.charithe.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringDecoder;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;


public class KafkaJunitClassRuleTest {

private static final String TOPIC = "topicY";
private static final String KEY = "keyY";
private static final String VALUE = "valueY";

@ClassRule
public static KafkaJunitRule kafkaRule = new KafkaJunitRule();

@Test
public void testKafkaServerIsUp() {
ProducerConfig conf = kafkaRule.producerConfig();
Producer<String, String> producer = new Producer<>(conf);
producer.send(new KeyedMessage<>(TOPIC, KEY, VALUE));
producer.close();


ConsumerConfig consumerConf = kafkaRule.consumerConfig();
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConf);
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(TOPIC, 1);
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
.createMessageStreams(topicCountMap, new StringDecoder(consumerConf.props()),
new StringDecoder(consumerConf.props()));
List<KafkaStream<String, String>> streams = consumerMap.get(TOPIC);

assertThat(streams, is(notNullValue()));
assertThat(streams.size(), is(equalTo(1)));

KafkaStream<String, String> ks = streams.get(0);
ConsumerIterator<String, String> iterator = ks.iterator();
MessageAndMetadata<String, String> msg = iterator.next();

assertThat(msg, is(notNullValue()));
assertThat(msg.key(), is(equalTo(KEY)));
assertThat(msg.message(), is(equalTo(VALUE)));
}
}
22 changes: 14 additions & 8 deletions src/test/java/com/github/charithe/kafka/KafkaJunitRuleTest.java
@@ -39,34 +39,40 @@

public class KafkaJunitRuleTest {

private static final String TOPIC = "topicX";
private static final String KEY = "keyX";
private static final String VALUE = "valueX";

@Rule
public KafkaJunitRule kafkaRule = new KafkaJunitRule();

@Test
public void testKafkaServerIsUp() {
ProducerConfig conf = kafkaRule.producerConfig();
Producer<String, String> producer = new Producer<>(conf);
producer.send(new KeyedMessage<>("topic", "k1", "value1"));
producer.send(new KeyedMessage<>(TOPIC, KEY, VALUE));
producer.close();


ConsumerConfig consumerConf = kafkaRule.consumerConfig();
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConf);

Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put("topic", 1);
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, new StringDecoder(consumerConf.props()), new StringDecoder(consumerConf.props()));
List<KafkaStream<String, String>> streams = consumerMap.get("topic");

topicCountMap.put(TOPIC, 1);
Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
.createMessageStreams(topicCountMap, new StringDecoder(consumerConf.props()),
new StringDecoder(consumerConf.props()));
List<KafkaStream<String, String>> streams = consumerMap.get(TOPIC);

assertThat(streams, is(notNullValue()));
assertThat(streams.size(), is(equalTo(1)));

KafkaStream<String, String> ks = streams.get(0);
ConsumerIterator<String, String> iterator = ks.iterator();
MessageAndMetadata<String, String> msg = iterator.next();

assertThat(msg, is(notNullValue()));
assertThat(msg.key(), is(equalTo("k1")));
assertThat(msg.message(), is(equalTo("value1")));
assertThat(msg.key(), is(equalTo(KEY)));
assertThat(msg.message(), is(equalTo(VALUE)));
}

}
