diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index e6b496be8e5f..428f74b64024 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -387,25 +387,107 @@ interface MessageConsumer extends AutoCloseable { MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options); + /** + * Acknowledges the given messages for the provided subscription. Ack ids identify the messages to + * acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and + * {@link #pullAsync(String, int)}. + * + * @param subscription the subscription whose messages must be acknowledged + * @param ackId the ack id of the first message to acknowledge + * @param ackIds other ack ids of messages to acknowledge + * @throws PubSubException upon failure, or if the subscription was not found + */ void ack(String subscription, String ackId, String... ackIds); + /** + * Sends a request to acknowledge the given messages for the provided subscription. Ack ids + * identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by + * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a + * {@code Future} object that can be used to wait for the acknowledge operation to be completed. + * + * @param subscription the subscription whose messages must be acknowledged + * @param ackId the ack id of the first message to acknowledge + * @param ackIds other ack ids of messages to acknowledge + */ Future ackAsync(String subscription, String ackId, String... ackIds); + /** + * Acknowledges the given messages for the provided subscription. Ack ids identify the messages to + * acknowledge, as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and + * {@link #pullAsync(String, int)}. + * + * @param subscription the subscription whose messages must be acknowledged + * @param ackIds the ack ids of messages to acknowledge + * @throws PubSubException upon failure, or if the subscription was not found + */ void ack(String subscription, Iterable ackIds); + /** + * Sends a request to acknowledge the given messages for the provided subscription. Ack ids + * identify the messages to acknowledge, as returned in {@link ReceivedMessage#ackId()} by + * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. The method returns a + * {@code Future} object that can be used to wait for the acknowledge operation to be completed. + * + * @param subscription the subscription whose messages must be acknowledged + * @param ackIds the ack ids of messages to acknowledge + */ Future ackAsync(String subscription, Iterable ackIds); + /** + * "Nacks" the given messages for the provided subscription. Ack ids identify the messages to + * "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and + * {@link #pullAsync(String, int)}. This method corresponds to calling + * {@link #modifyAckDeadline(String, int, TimeUnit, String, String...)} with a deadline of 0. + * + * @param subscription the subscription whose messages must be "nacked" + * @param ackId the ack id of the first message to "nack" + * @param ackIds other ack ids of messages to "nack" + * @throws PubSubException upon failure, or if the subscription was not found + */ void nack(String subscription, String ackId, String... ackIds); + /** + * Sends a request to "nack" the given messages for the provided subscription. Ack ids identify + * the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by + * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to + * calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, String, String...)} with a + * deadline of 0. The method returns a {@code Future} object that can be used to wait for the + * "nack" operation to be completed. + * + * @param subscription the subscription whose messages must be "nacked" + * @param ackId the ack id of the first message to "nack" + * @param ackIds other ack ids of messages to "nack" + */ Future nackAsync(String subscription, String ackId, String... ackIds); + /** + * "Nacks" the given messages for the provided subscription. Ack ids identify the messages to + * "nack", as returned in {@link ReceivedMessage#ackId()} by {@link #pull(String, int)} and + * {@link #pullAsync(String, int)}. This method corresponds to calling + * {@link #modifyAckDeadline(String, int, TimeUnit, Iterable)} with a deadline of 0. + * + * @param subscription the subscription whose messages must be "nacked" + * @param ackIds the ack ids of messages to "nack" + * @throws PubSubException upon failure, or if the subscription was not found + */ void nack(String subscription, Iterable ackIds); + /** + * Sends a request to "nack" the given messages for the provided subscription. Ack ids identify + * the messages to "nack", as returned in {@link ReceivedMessage#ackId()} by + * {@link #pull(String, int)} and {@link #pullAsync(String, int)}. This method corresponds to + * calling {@link #modifyAckDeadlineAsync(String, int, TimeUnit, Iterable)} with a deadline of 0. + * The method returns a {@code Future} object that can be used to wait for the "nack" operation to + * be completed. + * + * @param subscription the subscription whose messages must be "nacked" + * @param ackIds the ack ids of messages to "nack" + */ Future nackAsync(String subscription, Iterable ackIds); /** - * Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is - * the new deadline with respect to the time the modify request was received by the Pub/Sub + * Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and + * is the new deadline with respect to the time the modify request was received by the Pub/Sub * service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS}, * the new ack deadline will expire 10 seconds after the modify request was received by the * service. Specifying 0 may be used to make the message available for another pull request @@ -425,8 +507,8 @@ void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String /** * Sends a request to modify the acknowledge deadline of the given messages. {@code deadline} - * must be >= 0 and is the new deadline with respect to the time the modify request was received - * by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is + * must be >= 0 and is the new deadline with respect to the time the modify request was + * received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is * {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request * was received by the service. Specifying 0 may be used to make the message available for another * pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns @@ -444,8 +526,8 @@ Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit String ackId, String... ackIds); /** - * Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is - * the new deadline with respect to the time the modify request was received by the Pub/Sub + * Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and + * is the new deadline with respect to the time the modify request was received by the Pub/Sub * service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS}, * the new ack deadline will expire 10 seconds after the modify request was received by the * service. Specifying 0 may be used to make the message available for another pull request @@ -462,8 +544,8 @@ Future modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit /** * Sends a request to modify the acknowledge deadline of the given messages. {@code deadline} - * must be >= 0 and is the new deadline with respect to the time the modify request was received - * by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is + * must be >= 0 and is the new deadline with respect to the time the modify request was + * received by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is * {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request * was received by the service. Specifying 0 may be used to make the message available for another * pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index ec1ca920d91d..042f76b5197c 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -37,6 +37,7 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; import com.google.pubsub.v1.DeleteTopicRequest; import com.google.pubsub.v1.GetSubscriptionRequest; @@ -466,40 +467,46 @@ public MessageConsumer pullAsync(String subscription, MessageProcessor callback, @Override public void ack(String subscription, String ackId, String... ackIds) { + ack(subscription, Lists.asList(ackId, ackIds)); } @Override public Future ackAsync(String subscription, String ackId, String... ackIds) { - return null; + return ackAsync(subscription, Lists.asList(ackId, ackIds)); } @Override public void ack(String subscription, Iterable ackIds) { - + get(ackAsync(subscription, ackIds)); } @Override public Future ackAsync(String subscription, Iterable ackIds) { - return null; + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .addAllAckIds(ackIds) + .build(); + return lazyTransform(rpc.acknowledge(request), EMPTY_TO_VOID_FUNCTION); } @Override public void nack(String subscription, String ackId, String... ackIds) { + nack(subscription, Lists.asList(ackId, ackIds)); } @Override public Future nackAsync(String subscription, String ackId, String... ackIds) { - return null; + return nackAsync(subscription, Lists.asList(ackId, ackIds)); } @Override public void nack(String subscription, Iterable ackIds) { - + get(nackAsync(subscription, ackIds)); } @Override public Future nackAsync(String subscription, Iterable ackIds) { - return null; + return modifyAckDeadlineAsync(subscription, 0, TimeUnit.SECONDS, ackIds); } @Override diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 3c5b811307f5..6084266492f0 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -37,6 +37,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; import com.google.pubsub.v1.DeleteTopicRequest; import com.google.pubsub.v1.GetSubscriptionRequest; @@ -1189,6 +1190,178 @@ public void testListTopicSubscriptionsAsyncWithOptions() Iterables.toArray(page.values(), SubscriptionId.class)); } + @Test + public void testAckOneMessage() { + pubsub = options.service(); + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAckIds("ackId") + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.ack(SUBSCRIPTION, "ackId"); + } + + @Test + public void testAckOneMessageAsync() throws ExecutionException, InterruptedException { + pubsub = options.service(); + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAckIds("ackId") + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = pubsub.ackAsync(SUBSCRIPTION, "ackId"); + assertNull(future.get()); + } + + @Test + public void testAckMoreMessages() { + pubsub = options.service(); + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.ack(SUBSCRIPTION, "ackId1", "ackId2"); + } + + @Test + public void testAckMoreMessagesAsync() throws ExecutionException, InterruptedException { + pubsub = options.service(); + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = pubsub.ackAsync(SUBSCRIPTION, "ackId1", "ackId2"); + assertNull(future.get()); + } + + @Test + public void testAckMessageList() { + pubsub = options.service(); + List ackIds = ImmutableList.of("ackId1", "ackId2"); + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ackIds) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.ack(SUBSCRIPTION, ackIds); + } + + @Test + public void testAckMessageListAsync() throws ExecutionException, InterruptedException { + pubsub = options.service(); + List ackIds = ImmutableList.of("ackId1", "ackId2"); + AcknowledgeRequest request = AcknowledgeRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ackIds) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = pubsub.ackAsync(SUBSCRIPTION, ackIds); + assertNull(future.get()); + } + + @Test + public void testNackOneMessage() { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(0) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAckIds("ackId") + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.nack(SUBSCRIPTION, "ackId"); + } + + @Test + public void testNackOneMessageAsync() throws ExecutionException, InterruptedException { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(0) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAckIds("ackId") + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = pubsub.nackAsync(SUBSCRIPTION, "ackId"); + assertNull(future.get()); + } + + @Test + public void testNackMoreMessages() { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(0) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.nack(SUBSCRIPTION, "ackId1", "ackId2"); + } + + @Test + public void testNackMoreMessagesAsync() throws ExecutionException, InterruptedException { + pubsub = options.service(); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(0) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = pubsub.nackAsync(SUBSCRIPTION, "ackId1", "ackId2"); + assertNull(future.get()); + } + + @Test + public void testNackMessageList() { + pubsub = options.service(); + List ackIds = ImmutableList.of("ackId1", "ackId2"); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(0) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ackIds) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + pubsub.nack(SUBSCRIPTION, ackIds); + } + + @Test + public void testNackMessageListAsync() throws ExecutionException, InterruptedException { + pubsub = options.service(); + List ackIds = ImmutableList.of("ackId1", "ackId2"); + ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() + .setAckDeadlineSeconds(0) + .setSubscription(SUBSCRIPTION_NAME_PB) + .addAllAckIds(ackIds) + .build(); + Future response = Futures.immediateFuture(Empty.getDefaultInstance()); + EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); + EasyMock.replay(pubsubRpcMock); + Future future = pubsub.nackAsync(SUBSCRIPTION, ackIds); + assertNull(future.get()); + } + @Test public void testModifyAckDeadlineOneMessage() { pubsub = options.service();