Skip to content

Commit

Permalink
⬆️ Upgrade Kafka to 0.9.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
circlespainter committed Feb 10, 2016
1 parent 88cdb54 commit 289f918
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ subprojects {
ext.bootstrapVer = '3.3.5'
ext.hibernateVer = '4.3.11.Final'

ext.kafkaClientsVer = '0.9.0.0' // ''0.8.2.2'

configurations.all {
resolutionStrategy {
failOnVersionConflict()
Expand Down Expand Up @@ -435,7 +437,7 @@ project (':comsat-okhttp') {

project (':comsat-kafka') {
dependencies {
compile "org.apache.kafka:kafka-clients:0.8.2.2"
compile "org.apache.kafka:kafka-clients:$kafkaClientsVer"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* COMSAT
* Copyright (C) 2013-2015, Parallel Universe Software Co. All rights reserved.
* Copyright (C) 2013-2016, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand All @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FiberKafkaProducer<K, V> implements Producer<K, V> {

Expand All @@ -38,11 +39,16 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
SettableFuture<RecordMetadata> future = new SettableFuture<>();
final SettableFuture<RecordMetadata> future = new SettableFuture<>();
producer.send(record, new CallbackWrapper(future, callback));
return future;
}

@Override
public void flush() {
producer.flush();
}

@Override
public List<PartitionInfo> partitionsFor(String topic) {
return producer.partitionsFor(topic);
Expand All @@ -58,6 +64,11 @@ public void close() {
producer.close();
}

@Override
public void close(long timeout, TimeUnit unit) {
producer.close(timeout, unit);
}

private static class CallbackWrapper implements Callback {

private final SettableFuture<RecordMetadata> future;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* COMSAT
* Copyright (C) 2013-2015, Parallel Universe Software Co. All rights reserved.
* Copyright (C) 2013-2016, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand All @@ -20,6 +20,7 @@
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -34,13 +35,13 @@

public class FiberKafkaProducerTest {

private MockProducer mockProducer;
private MockProducer<byte[], byte[]> mockProducer;
private FiberKafkaProducer<byte[], byte[]> fiberProducer;
private co.paralleluniverse.strands.concurrent.Phaser phaser;

@Before
public void setUp() {
mockProducer = new MockProducer(false);
mockProducer = new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer());
fiberProducer = new FiberKafkaProducer<>(mockProducer);
phaser = new co.paralleluniverse.strands.concurrent.Phaser(2);
}
Expand Down

0 comments on commit 289f918

Please sign in to comment.