Skip to content

Commit

Permalink
pubsub: fix race condition in streaming connection (#2416)
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad authored Oct 4, 2017
1 parent 1fefc4c commit 97d50c2
Showing 1 changed file with 85 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -53,14 +55,15 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
private static final int MAX_PER_REQUEST_CHANGES = 10000;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

private final SubscriberStub asyncStub;

private final String subscription;
private final ScheduledExecutorService executor;
private final MessageDispatcher messageDispatcher;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

private final Lock lock = new ReentrantLock();
private ClientCallStreamObserver<StreamingPullRequest> requestObserver;

public StreamingSubscriberConnection(
Expand Down Expand Up @@ -104,22 +107,37 @@ protected void doStart() {
@Override
protected void doStop() {
messageDispatcher.stop();
requestObserver.onError(Status.CANCELLED.asException());
notifyStopped();

lock.lock();
try {
requestObserver.onError(Status.CANCELLED.asException());
} finally {
lock.unlock();
notifyStopped();
}
}

private class StreamingPullResponseObserver
implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {

final SettableFuture<Void> errorFuture;

/**
* When a batch finsihes processing, we want to request one more batch from the server. But by
* the time this happens, our stream might have already errored, and new stream created. We
* don't want to request more batches from the new stream -- that might pull more messages than
* the user can deal with -- so we save the request observer this response observer is "paired
* with". If the stream has already errored, requesting more messages is a no-op.
*/
ClientCallStreamObserver<StreamingPullRequest> thisRequestObserver;

StreamingPullResponseObserver(SettableFuture<Void> errorFuture) {
this.errorFuture = errorFuture;
}

@Override
public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
StreamingSubscriberConnection.this.requestObserver = requestObserver;
thisRequestObserver = requestObserver;
requestObserver.disableAutoInboundFlowControl();
}

Expand All @@ -131,9 +149,18 @@ public void onNext(StreamingPullResponse response) {
new Runnable() {
@Override
public void run() {
// Only if not shutdown we will request one more batches of messages to be delivered.
if (isAlive()) {
requestObserver.request(1);
// Only request more if we're not shutdown.
// If errorFuture is done, the stream has either failed or hung up,
// and we don't need to request.
if (isAlive() && !errorFuture.isDone()) {
lock.lock();
try {
thisRequestObserver.request(1);
} catch (Exception e) {
logger.log(Level.WARNING, "cannot request more messages", e);
} finally {
lock.unlock();
}
}
}
});
Expand Down Expand Up @@ -169,6 +196,18 @@ private void initialize() {
.build());
requestObserver.request(1);

/**
* Must make sure we do this after sending the subscription name and deadline. Otherwise, some
* other thread might use this stream to do something else before we could send the first
* request.
*/
lock.lock();
try {
this.requestObserver = requestObserver;
} finally {
lock.unlock();
}

Futures.addCallback(
errorFuture,
new FutureCallback<Void>() {
Expand All @@ -191,24 +230,24 @@ public void onFailure(Throwable cause) {
return;
}
logger.log(Level.WARNING, "Terminated streaming with exception", cause);
if (StatusUtil.isRetryable(cause)) {
long backoffMillis = channelReconnectBackoffMillis.get();
long newBackoffMillis =
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
channelReconnectBackoffMillis.set(newBackoffMillis);

executor.schedule(
new Runnable() {
@Override
public void run() {
initialize();
}
},
backoffMillis,
TimeUnit.MILLISECONDS);
} else {
if (!StatusUtil.isRetryable(cause)) {
notifyFailed(cause);
return;
}
long backoffMillis = channelReconnectBackoffMillis.get();
long newBackoffMillis =
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
channelReconnectBackoffMillis.set(newBackoffMillis);

executor.schedule(
new Runnable() {
@Override
public void run() {
initialize();
}
},
backoffMillis,
TimeUnit.MILLISECONDS);
}
},
executor);
Expand All @@ -223,8 +262,15 @@ public void sendAckOperations(
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
List<StreamingPullRequest> requests =
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES);
for (StreamingPullRequest request : requests) {
requestObserver.onNext(request);
lock.lock();
try {
for (StreamingPullRequest request : requests) {
requestObserver.onNext(request);
}
} catch (Exception e) {
logger.log(Level.WARNING, "failed to send acks", e);
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -274,9 +320,16 @@ static List<StreamingPullRequest> partitionAckOperations(

public void updateStreamAckDeadline(int newAckDeadlineSeconds) {
messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds);
requestObserver.onNext(
StreamingPullRequest.newBuilder()
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
.build());
lock.lock();
try {
requestObserver.onNext(
StreamingPullRequest.newBuilder()
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
.build());
} catch (Exception e) {
logger.log(Level.WARNING, "failed to set deadline", e);
} finally {
lock.unlock();
}
}
}

0 comments on commit 97d50c2

Please sign in to comment.