Skip to content

Commit

Permalink
Implement ack and nack methods, add javadoc and tests (#1027)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed Jun 1, 2016
1 parent f4055bd commit 6df07ba
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,25 +387,107 @@ interface MessageConsumer extends AutoCloseable {

MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);

/**
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}.
*
* @param subscription the subscription whose messages must be acknowledged
* @param ackId the ack id of the first message to acknowledge
* @param ackIds other ack ids of messages to acknowledge
* @throws PubSubException upon failure, or if the subscription was not found
*/
void ack(String subscription, String ackId, String... ackIds);

/**
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
*
* @param subscription the subscription whose messages must be acknowledged
* @param ackId the ack id of the first message to acknowledge
* @param ackIds other ack ids of messages to acknowledge
*/
Future<Void> ackAsync(String subscription, String ackId, String... ackIds);

/**
* Acknowledges the given messages for the provided subscription. Ack ids identify the messages to
* acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}.
*
* @param subscription the subscription whose messages must be acknowledged
* @param ackIds the ack ids of messages to acknowledge
* @throws PubSubException upon failure, or if the subscription was not found
*/
void ack(String subscription, Iterable<String> ackIds);

/**
* Sends a request to acknowledge the given messages for the provided subscription. Ack ids
* identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a
* {@code Future} object that can be used to wait for the acknowledge operation to be completed.
*
* @param subscription the subscription whose messages must be acknowledged
* @param ackIds the ack ids of messages to acknowledge
*/
Future<Void> ackAsync(String subscription, Iterable<String> ackIds);

/**
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}. This method corresponds to calling
* {@link #modifyAckDeadline(String, int, TimeUnit, String, String...)} with a deadline of 0.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackId the ack id of the first message to "nack"
* @param ackIds other ack ids of messages to "nack"
* @throws PubSubException upon failure, or if the subscription was not found
*/
void nack(String subscription, String ackId, String... ackIds);

/**
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a
* deadline of 0. The method returns a {@code Future} object that can be used to wait for the
* "nack" operation to be completed.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackId the ack id of the first message to "nack"
* @param ackIds other ack ids of messages to "nack"
*/
Future<Void> nackAsync(String subscription, String ackId, String... ackIds);

/**
* "Nacks" the given messages for the provided subscription. Ack ids identify the messages to
* "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and
* {@link #pullAsync(String, int)}. This method corresponds to calling
* {@link #modifyAckDeadline(String, int, TimeUnit, Iterable)} with a deadline of 0.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackIds the ack ids of messages to "nack"
* @throws PubSubException upon failure, or if the subscription was not found
*/
void nack(String subscription, Iterable<String> ackIds);

/**
* Sends a request to "nack" the given messages for the provided subscription. Ack ids identify
* the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by
* {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to
* calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a deadline of 0.
* The method returns a {@code Future} object that can be used to wait for the "nack" operation to
* be completed.
*
* @param subscription the subscription whose messages must be "nacked"
* @param ackIds the ack ids of messages to "nack"
*/
Future<Void> nackAsync(String subscription, Iterable<String> ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be &gt;= 0 and
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
Expand All @@ -425,8 +507,8 @@ void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
Expand All @@ -444,8 +526,8 @@ Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit
String ackId, String... ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be &gt;= 0 and
* is the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
Expand All @@ -462,8 +544,8 @@ Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* must be &gt;= 0 and is the new deadline with respect to the time the modify request was
* received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
import com.google.pubsub.v1.DeleteTopicRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
Expand Down Expand Up @@ -466,40 +467,46 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback,

@Override
public void ack(String subscription, String ackId, String... ackIds) {
ack(subscription, Lists.asList(ackId, ackIds));
}

@Override
public Future<Void> ackAsync(String subscription, String ackId, String... ackIds) {
return null;
return ackAsync(subscription, Lists.asList(ackId, ackIds));
}

@Override
public void ack(String subscription, Iterable<String> ackIds) {

get(ackAsync(subscription, ackIds));
}

@Override
public Future<Void> ackAsync(String subscription, Iterable<String> ackIds) {
return null;
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.addAllAckIds(ackIds)
.build();
return lazyTransform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION);
}

@Override
public void nack(String subscription, String ackId, String... ackIds) {
nack(subscription, Lists.asList(ackId, ackIds));
}

@Override
public Future<Void> nackAsync(String subscription, String ackId, String... ackIds) {
return null;
return nackAsync(subscription, Lists.asList(ackId, ackIds));
}

@Override
public void nack(String subscription, Iterable<String> ackIds) {

get(nackAsync(subscription, ackIds));
}

@Override
public Future<Void> nackAsync(String subscription, Iterable<String> ackIds) {
return null;
return modifyAckDeadlineAsync(subscription, 0, TimeUnit.SECONDS, ackIds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.DeleteSubscriptionRequest;
import com.google.pubsub.v1.DeleteTopicRequest;
import com.google.pubsub.v1.GetSubscriptionRequest;
Expand Down Expand Up @@ -1189,6 +1190,178 @@ public void testListTopicSubscriptionsAsyncWithOptions()
Iterables.toArray(page.values(), SubscriptionId.class));
}

@Test
public void testAckOneMessage() {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.ack(SUBSCRIPTION, "ackId");
}

@Test
public void testAckOneMessageAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, "ackId");
assertNull(future.get());
}

@Test
public void testAckMoreMessages() {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.ack(SUBSCRIPTION, "ackId1", "ackId2");
}

@Test
public void testAckMoreMessagesAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, "ackId1", "ackId2");
assertNull(future.get());
}

@Test
public void testAckMessageList() {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.ack(SUBSCRIPTION, ackIds);
}

@Test
public void testAckMessageListAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
AcknowledgeRequest request = AcknowledgeRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.ackAsync(SUBSCRIPTION, ackIds);
assertNull(future.get());
}

@Test
public void testNackOneMessage() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.nack(SUBSCRIPTION, "ackId");
}

@Test
public void testNackOneMessageAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, "ackId");
assertNull(future.get());
}

@Test
public void testNackMoreMessages() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.nack(SUBSCRIPTION, "ackId1", "ackId2");
}

@Test
public void testNackMoreMessagesAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, "ackId1", "ackId2");
assertNull(future.get());
}

@Test
public void testNackMessageList() {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.nack(SUBSCRIPTION, ackIds);
}

@Test
public void testNackMessageListAsync() throws ExecutionException, InterruptedException {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(0)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.nackAsync(SUBSCRIPTION, ackIds);
assertNull(future.get());
}

@Test
public void testModifyAckDeadlineOneMessage() {
pubsub = options.service();
Expand Down

0 comments on commit 6df07ba

Please sign in to comment.