Skip to content

Commit

Permalink
Fix MessageConsumerImpl tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 21, 2016
1 parent 092acfc commit 61bdd81
Showing 1 changed file with 67 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class MessageConsumerImplTest {

Expand Down Expand Up @@ -213,20 +214,33 @@ public void testMessageConsumerMultipleCallsAck() throws Exception {
PullResponse response1 = PullResponse.newBuilder()
.addReceivedMessages(MESSAGE1_PB)
.build();
PullResponse response2 = PullResponse.newBuilder()
final PullResponse response2 = PullResponse.newBuilder()
.addReceivedMessages(MESSAGE2_PB)
.build();
EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
EasyMock.expect(options.service()).andReturn(pubsub);
EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();
final CountDownLatch nextPullLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(2);
EasyMock.expect(pubsub.options()).andReturn(options);
EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer<Future<Void>>() {
@Override
public Future<Void> answer() throws Throwable {
nextPullLatch.await();
return null;
}
});
EasyMock.expect(pubsub.options()).andReturn(options);
EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
EasyMock.replay(pubsub);
EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1));
EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2));
EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer<PullFuture>() {
@Override
public PullFuture answer() throws Throwable {
nextPullLatch.countDown();
return new TestPullFuture(response2);
}
});
EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
.andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
renewer.add(SUBSCRIPTION, ACK_ID1);
Expand All @@ -253,20 +267,33 @@ public void testMessageConsumerMultipleCallsNack() throws Exception {
PullResponse response1 = PullResponse.newBuilder()
.addReceivedMessages(MESSAGE1_PB)
.build();
PullResponse response2 = PullResponse.newBuilder()
final PullResponse response2 = PullResponse.newBuilder()
.addReceivedMessages(MESSAGE2_PB)
.build();
EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
EasyMock.expect(options.service()).andReturn(pubsub);
EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();
final CountDownLatch nextPullLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(2);
EasyMock.expect(pubsub.options()).andReturn(options);
EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andAnswer(new IAnswer<Future<Void>>() {
@Override
public Future<Void> answer() throws Throwable {
nextPullLatch.await();
return null;
}
});
EasyMock.expect(pubsub.options()).andReturn(options);
EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
EasyMock.replay(pubsub);
EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(response1));
EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(response2));
EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer<PullFuture>() {
@Override
public PullFuture answer() throws Throwable {
nextPullLatch.countDown();
return new TestPullFuture(response2);
}
});
EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
.andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
renewer.add(SUBSCRIPTION, ACK_ID1);
Expand All @@ -289,22 +316,35 @@ public void testMessageConsumerMultipleCallsNack() throws Exception {
@Test
public void testMessageConsumerMaxCallbacksAck() throws Exception {
PullRequest request1 = pullRequest(2);
PullRequest request2 = pullRequest(2);
PullResponse otherPullResponse = PullResponse.newBuilder()
PullRequest request2 = pullRequest(1);
final PullResponse otherPullResponse = PullResponse.newBuilder()
.addReceivedMessages(MESSAGE1_PB)
.build();
EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
EasyMock.expect(options.service()).andReturn(pubsub);
EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();
EasyMock.expect(pubsub.options()).andReturn(options).times(2);
final CountDownLatch nextPullLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(3);
EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer<Future<Void>>() {
@Override
public Future<Void> answer() throws Throwable {
nextPullLatch.await();
return null;
}
});
EasyMock.expect(pubsub.options()).andReturn(options);
EasyMock.expect(pubsub.ackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
EasyMock.replay(pubsub);
EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE));
EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse));
EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer<PullFuture>() {
@Override
public PullFuture answer() throws Throwable {
nextPullLatch.countDown();
return new TestPullFuture(otherPullResponse);
}
});
EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
.andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
renewer.add(SUBSCRIPTION, ACK_ID1);
Expand All @@ -331,22 +371,35 @@ public void testMessageConsumerMaxCallbacksAck() throws Exception {
@Test
public void testMessageConsumerMaxCallbacksNack() throws Exception {
PullRequest request1 = pullRequest(2);
PullRequest request2 = pullRequest(2);
PullResponse otherPullResponse = PullResponse.newBuilder()
PullRequest request2 = pullRequest(1);
final PullResponse otherPullResponse = PullResponse.newBuilder()
.addReceivedMessages(MESSAGE1_PB)
.build();
EasyMock.expect(options.rpc()).andReturn(pubsubRpc);
EasyMock.expect(options.service()).andReturn(pubsub);
EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes();
EasyMock.expect(pubsub.options()).andReturn(options).times(2);
final CountDownLatch nextPullLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(3);
EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andReturn(null);
EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID2)).andAnswer(new IAnswer<Future<Void>>() {
@Override
public Future<Void> answer() throws Throwable {
nextPullLatch.await();
return null;
}
});
EasyMock.expect(pubsub.options()).andReturn(options);
EasyMock.expect(pubsub.nackAsync(SUBSCRIPTION, ACK_ID1)).andReturn(null);
EasyMock.replay(pubsub);
EasyMock.expect(pubsubRpc.pull(request1)).andReturn(new TestPullFuture(PULL_RESPONSE));
EasyMock.expect(pubsubRpc.pull(request2)).andReturn(new TestPullFuture(otherPullResponse));
EasyMock.expect(pubsubRpc.pull(request2)).andAnswer(new IAnswer<PullFuture>() {
@Override
public PullFuture answer() throws Throwable {
nextPullLatch.countDown();
return new TestPullFuture(otherPullResponse);
}
});
EasyMock.expect(pubsubRpc.pull(EasyMock.<PullRequest>anyObject()))
.andReturn(new TestPullFuture(EMPTY_RESPONSE)).anyTimes();
renewer.add(SUBSCRIPTION, ACK_ID1);
Expand Down

0 comments on commit 61bdd81

Please sign in to comment.