Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding resume publish. #5046

Merged
merged 2 commits into from
May 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,29 +148,21 @@ private Publisher(Builder builder) throws IOException {
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
}

Set<StatusCode.Code> retryCodes;
if (enableMessageOrdering) {
retryCodes = EnumSet.allOf(StatusCode.Code.class);
} else {
retryCodes =
EnumSet.of(
StatusCode.Code.ABORTED,
StatusCode.Code.CANCELLED,
StatusCode.Code.DEADLINE_EXCEEDED,
StatusCode.Code.INTERNAL,
StatusCode.Code.RESOURCE_EXHAUSTED,
StatusCode.Code.UNKNOWN,
StatusCode.Code.UNAVAILABLE);
}

PublisherStubSettings.Builder stubSettings =
PublisherStubSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setExecutorProvider(FixedExecutorProvider.create(executor))
.setTransportChannelProvider(builder.channelProvider);
stubSettings
.publishSettings()
.setRetryableCodes(retryCodes)
.setRetryableCodes(EnumSet.of(
StatusCode.Code.ABORTED,
StatusCode.Code.CANCELLED,
StatusCode.Code.DEADLINE_EXCEEDED,
StatusCode.Code.INTERNAL,
StatusCode.Code.RESOURCE_EXHAUSTED,
StatusCode.Code.UNKNOWN,
StatusCode.Code.UNAVAILABLE))
.setRetrySettings(retrySettingsBuilder.build())
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
Expand Down Expand Up @@ -263,6 +255,18 @@ public void run() {
return outstandingPublish.publishResult;
}

/**
* There may be non-recoverable problems with a request for an ordering key. In that case, all
* subsequent requests will fail until this method is called. If the key is not currently paused,
* calling this method will be a no-op.
*

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment that calling this function when the key has not been failed is no-op?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* @param key The key for which to resume publishing.
*/
public void resumePublish(String key) {
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
sequentialExecutor.resumePublish(key);
}

private void setupAlarm() {
if (!messagesBatches.isEmpty()) {
if (!activeAlarm.getAndSet(true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -147,6 +151,8 @@ static class CallbackExecutor extends SequentialExecutor<CancellableRunnable> {
new CancellationException(
"Execution cancelled because executing previous runnable failed.");

private final Set<String> keysWithErrors = Collections.synchronizedSet(new HashSet<String>());

CallbackExecutor(Executor executor) {
super(executor);
}
Expand Down Expand Up @@ -186,6 +192,11 @@ <T> ApiFuture<T> submit(final String key, final Callable<ApiFuture<T>> callable)
// Step 1: create a future for the user
final SettableApiFuture<T> future = SettableApiFuture.create();

if (keysWithErrors.contains(key)) {
future.setException(CANCELLATION_EXCEPTION);
return future;
}

// Step 2: create the CancellableRunnable
// Step 3: add the task to queue via `execute`
CancellableRunnable task =
Expand Down Expand Up @@ -213,6 +224,7 @@ public void onSuccess(T msg) {
// Step 5.2: on failure
@Override
public void onFailure(Throwable e) {
keysWithErrors.add(key);
future.setException(e);
cancelQueuedTasks(key, CANCELLATION_EXCEPTION);
}
Expand All @@ -233,7 +245,11 @@ public void cancel(Throwable e) {
return future;
}

/** Cancels every task in the queue assoicated with {@code key}. */
void resumePublish(String key) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intended that the function name in Publisher is resumePublishing and this one is resumePublish?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made both resumePublish

keysWithErrors.remove(key);
}

/** Cancels every task in the queue associated with {@code key}. */
private void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeUnit;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -443,6 +444,95 @@ public void testEnableMessageOrdering_dontSendWhileInflight() throws Exception {
publisher.shutdown();
}

@Test
/**
* Make sure that resume publishing works as expected:
*
* <ol>
* <li> publish with key orderA which returns a failure.</li>
* <li> publish with key orderA again, which should fail immediately</li>
* <li> publish with key orderB, which should succeed</li>
* <li> resume publishing on key orderA</li>
* <li> publish with key orderA, which should now succeed</li>
* </ol>
*
*/
public void testResumePublish() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.build())
.setEnableMessageOrdering(true)
.build();

ApiFuture<String> future1 = sendTestMessageWithOrderingKey(publisher, "m1", "orderA");
ApiFuture<String> future2 = sendTestMessageWithOrderingKey(publisher, "m2", "orderA");

fakeExecutor.advanceTime(Duration.ZERO);
assertFalse(future1.isDone());
assertFalse(future2.isDone());

// This exception should stop future publishing to the same key
testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));

fakeExecutor.advanceTime(Duration.ZERO);

try {
future1.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
}

try {
future2.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
}

// Submit new requests with orderA that should fail.
ApiFuture<String> future3 = sendTestMessageWithOrderingKey(publisher, "m3", "orderA");
ApiFuture<String> future4 = sendTestMessageWithOrderingKey(publisher, "m4", "orderA");

try {
future3.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

try {
future4.get();
Assert.fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

// Submit a new request with orderB, which should succeed
ApiFuture<String> future5 = sendTestMessageWithOrderingKey(publisher, "m5", "orderB");
ApiFuture<String> future6 = sendTestMessageWithOrderingKey(publisher, "m6", "orderB");

testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("5").addMessageIds("6"));

Assert.assertEquals("5", future5.get());
Assert.assertEquals("6", future6.get());

// Resume publishing of "orderA", which should now succeed
publisher.resumePublish("orderA");

ApiFuture<String> future7 = sendTestMessageWithOrderingKey(publisher, "m7", "orderA");
ApiFuture<String> future8 = sendTestMessageWithOrderingKey(publisher, "m8", "orderA");

testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("7").addMessageIds("8"));

Assert.assertEquals("7", future7.get());
Assert.assertEquals("8", future8.get());

publisher.shutdown();
}

private ApiFuture<String> sendTestMessageWithOrderingKey(
Publisher publisher, String data, String orderingKey) {
return publisher.publish(
Expand Down