Skip to content

Commit

Permalink
Implement modifyAckDeadline methods, add javadoc and tests (#1022)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed May 25, 2016
1 parent fe4c90c commit 235152e
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,78 @@ interface MessageConsumer extends AutoCloseable {

Future<Void> nackAsync(String subscription, Iterable<String> ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
* (corresponds to calling {@link #nack(String, String, String...)}).
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackId the ack id of the first message for which the acknowledge deadline must be
* modified
* @param ackIds other ack ids of messages for which the acknowledge deadline must be modified
* @throws PubSubException upon failure, or if the subscription was not found
*/
void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId,
String... ackIds);

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
* a {@code Future} object that can be used to wait for the modify operation to be completed.
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackId the ack id of the first message for which the acknowledge deadline must be
* modified
* @param ackIds other ack ids of messages for which the acknowledge deadline must be modified
*/
Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
String ackId, String... ackIds);

/**
* Modifies the acknowledge deadline of the given messages. {@code deadline} must be >= 0 and is
* the new deadline with respect to the time the modify request was received by the Pub/Sub
* service. For example, if {@code deadline} is 10 and {@code unit} is {@link TimeUnit#SECONDS},
* the new ack deadline will expire 10 seconds after the modify request was received by the
* service. Specifying 0 may be used to make the message available for another pull request
* (corresponds to calling {@link #nack(String, Iterable)}).
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackIds the ack ids of messages for which the acknowledge deadline must be modified
* @throws PubSubException upon failure, or if the subscription was not found
*/
void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, Iterable<String> ackIds);

/**
* Sends a request to modify the acknowledge deadline of the given messages. {@code deadline}
* must be >= 0 and is the new deadline with respect to the time the modify request was received
* by the Pub/Sub service. For example, if {@code deadline} is 10 and {@code unit} is
* {@link TimeUnit#SECONDS}, the new ack deadline will expire 10 seconds after the modify request
* was received by the service. Specifying 0 may be used to make the message available for another
* pull request (corresponds to calling {@link #nackAsync(String, Iterable)}). The method returns
* a {@code Future} object that can be used to wait for the modify operation to be completed.
*
* @param subscription the subscription whose messages need to update their acknowledge deadline
* @param deadline the new deadline, relative to the time the modify request is received by the
* Pub/Sub service
* @param unit time unit for the {@code deadline} parameter
* @param ackIds the ack ids of messages for which the acknowledge deadline must be modified
*/
Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
Iterable<String> ackIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package com.google.cloud.pubsub;

import static com.google.api.client.util.Preconditions.checkArgument;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE;
import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.lazyTransform;

import com.google.cloud.AsyncPage;
Expand Down Expand Up @@ -47,6 +47,7 @@
import com.google.pubsub.v1.ListTopicSubscriptionsResponse;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ListTopicsResponse;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ModifyPushConfigRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
Expand Down Expand Up @@ -504,25 +505,30 @@ public Future<Void> nackAsync(String subscription, Iterable<String> ackIds) {
@Override
public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit, String ackId,
String... ackIds) {

get(modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds)));
}

@Override
public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
String ackId, String... ackIds) {
return null;
return modifyAckDeadlineAsync(subscription, deadline, unit, Lists.asList(ackId, ackIds));
}

@Override
public void modifyAckDeadline(String subscription, int deadline, TimeUnit unit,
Iterable<String> ackIds) {

get(modifyAckDeadlineAsync(subscription, deadline, unit, ackIds));
}

@Override
public Future<Void> modifyAckDeadlineAsync(String subscription, int deadline, TimeUnit unit,
Iterable<String> ackIds) {
return null;
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription))
.setAckDeadlineSeconds((int) TimeUnit.SECONDS.convert(deadline, unit))
.addAllAckIds(ackIds)
.build();
return lazyTransform(rpc.modify(request), EMPTY_TO_VOID_FUNCTION);
}

static <T extends Option.OptionType> Map<Option.OptionType, ?> optionMap(Option... options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import org.joda.time.Duration;

import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import org.joda.time.Duration;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import com.google.cloud.RetryParams;
import com.google.cloud.pubsub.PubSubOptions;

import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -31,10 +35,6 @@
import java.util.List;
import java.util.UUID;

import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

/**
* A class that runs a Pubsub emulator instance for use in tests.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void startServer() throws IOException, InterruptedException {

@AfterClass
public static void stopServer() throws Exception {
pubsub.options().rpc().close();
pubsub.close();
pubsubHelper.reset();
pubsubHelper.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.google.pubsub.v1.ListTopicSubscriptionsResponse;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ListTopicsResponse;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ModifyPushConfigRequest;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
Expand All @@ -61,6 +62,7 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PubSubImplTest {

Expand Down Expand Up @@ -1187,6 +1189,100 @@ public void testListTopicSubscriptionsAsyncWithOptions()
Iterables.toArray(page.values(), SubscriptionId.class));
}

@Test
public void testModifyAckDeadlineOneMessage() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId");
}

@Test
public void testModifyAckDeadlineOneMessageAsync()
throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAckIds("ackId")
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future =
pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId");
assertNull(future.get());
}

@Test
public void testModifyAckDeadlineMoreMessages() {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2");
}

@Test
public void testModifyAckDeadlineMoreMessagesAsync()
throws ExecutionException, InterruptedException {
pubsub = options.service();
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ImmutableList.of("ackId1", "ackId2"))
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future =
pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2");
assertNull(future.get());
}

@Test
public void testModifyAckDeadlineMessageList() {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds);
}

@Test
public void testModifyAckDeadlineMessageListAsync()
throws ExecutionException, InterruptedException {
pubsub = options.service();
List<String> ackIds = ImmutableList.of("ackId1", "ackId2");
ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder()
.setAckDeadlineSeconds(10)
.setSubscription(SUBSCRIPTION_NAME_PB)
.addAllAckIds(ackIds)
.build();
Future<Empty> response = Futures.immediateFuture(Empty.getDefaultInstance());
EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response);
EasyMock.replay(pubsubRpcMock);
Future<Void> future = pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds);
assertNull(future.get());
}

@Test
public void testClose() throws Exception {
pubsub = options.service();
Expand Down

0 comments on commit 235152e

Please sign in to comment.