diff --git a/build.gradle b/build.gradle index 3bb2d867..09a98d78 100644 --- a/build.gradle +++ b/build.gradle @@ -109,6 +109,8 @@ subprojects { ext.bootstrapVer = '3.3.5' ext.hibernateVer = '4.3.11.Final' + ext.kafkaClientsVer = '0.9.0.0' // ''0.8.2.2' + configurations.all { resolutionStrategy { failOnVersionConflict() @@ -435,7 +437,7 @@ project (':comsat-okhttp') { project (':comsat-kafka') { dependencies { - compile "org.apache.kafka:kafka-clients:0.8.2.2" + compile "org.apache.kafka:kafka-clients:$kafkaClientsVer" } } diff --git a/comsat-kafka/src/main/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducer.java b/comsat-kafka/src/main/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducer.java index 082f097e..a31eb693 100644 --- a/comsat-kafka/src/main/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducer.java +++ b/comsat-kafka/src/main/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducer.java @@ -1,6 +1,6 @@ /* * COMSAT - * Copyright (C) 2013-2015, Parallel Universe Software Co. All rights reserved. + * Copyright (C) 2013-2016, Parallel Universe Software Co. All rights reserved. * * This program and the accompanying materials are dual-licensed under * either the terms of the Eclipse Public License v1.0 as published by @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; public class FiberKafkaProducer implements Producer { @@ -38,11 +39,16 @@ public Future send(ProducerRecord record) { @Override public Future send(ProducerRecord record, Callback callback) { - SettableFuture future = new SettableFuture<>(); + final SettableFuture future = new SettableFuture<>(); producer.send(record, new CallbackWrapper(future, callback)); return future; } + @Override + public void flush() { + producer.flush(); + } + @Override public List partitionsFor(String topic) { return producer.partitionsFor(topic); @@ -58,6 +64,11 @@ public void close() { producer.close(); } + @Override + public void close(long timeout, TimeUnit unit) { + producer.close(timeout, unit); + } + private static class CallbackWrapper implements Callback { private final SettableFuture future; diff --git a/comsat-kafka/src/test/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducerTest.java b/comsat-kafka/src/test/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducerTest.java index ea6e6a27..e1d23048 100644 --- a/comsat-kafka/src/test/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducerTest.java +++ b/comsat-kafka/src/test/java/co/paralleluniverse/fibers/kafka/FiberKafkaProducerTest.java @@ -1,6 +1,6 @@ /* * COMSAT - * Copyright (C) 2013-2015, Parallel Universe Software Co. All rights reserved. + * Copyright (C) 2013-2016, Parallel Universe Software Co. All rights reserved. * * This program and the accompanying materials are dual-licensed under * either the terms of the Eclipse Public License v1.0 as published by @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.Before; import org.junit.Test; @@ -34,13 +35,13 @@ public class FiberKafkaProducerTest { - private MockProducer mockProducer; + private MockProducer mockProducer; private FiberKafkaProducer fiberProducer; private co.paralleluniverse.strands.concurrent.Phaser phaser; @Before public void setUp() { - mockProducer = new MockProducer(false); + mockProducer = new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer()); fiberProducer = new FiberKafkaProducer<>(mockProducer); phaser = new co.paralleluniverse.strands.concurrent.Phaser(2); }