-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pubsub: fix race condition in streaming connection #2416
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would replace ReentrantLock with ReadWriteLock. I think it will simplify the concurrency model + we can allow almost all operations here to happen concurrently, except for recreating the stream so that we only call Ack on the new stream.
private final AtomicLong channelReconnectBackoffMillis = | ||
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); | ||
|
||
private final Lock lock = new ReentrantLock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
lock.lock(); | ||
try { | ||
requestObserver.onError(Status.CANCELLED.asException()); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
StreamingSubscriberConnection.this.requestObserver = requestObserver; | ||
requestObserver.disableAutoInboundFlowControl(); | ||
thisRequestObserver = requestObserver; | ||
lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
requestObserver.request(1); | ||
lock.lock(); | ||
try { | ||
thisRequestObserver.request(1); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -220,8 +260,13 @@ public void sendAckOperations( | |||
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) { | |||
List<StreamingPullRequest> requests = | |||
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES); | |||
for (StreamingPullRequest request : requests) { | |||
requestObserver.onNext(request); | |||
lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
StreamingPullRequest.newBuilder() | ||
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds) | ||
.build()); | ||
lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
requestObserver.request(1); | ||
lock.lock(); | ||
try { | ||
thisRequestObserver.request(1); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -220,8 +260,13 @@ public void sendAckOperations( | |||
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) { | |||
List<StreamingPullRequest> requests = | |||
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES); | |||
for (StreamingPullRequest request : requests) { | |||
requestObserver.onNext(request); | |||
lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
StreamingPullRequest.newBuilder() | ||
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds) | ||
.build()); | ||
lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@mdietz94 PTAL |
} finally { | ||
lock.unlock(); | ||
} | ||
thisRequestObserver.request(1); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still require a reader lock around request to avoid concurrent access with onNext/onError
Thinking about this more, I think I found more issues with streams. I'll follow up with you and David on email. |
@mdietz94 @davidcavazos Please let me know if this looks OK to you. The error handling is still obviously wrong. However, I think this is a big enough improvement that we can submit this as an intermediate state.
|
Did you mean @davidtorres ? |
Yes I did! Sorry for the spam! |
// If errorFuture is done, the stream has either failed or hung up, | ||
// and we don't need to request. | ||
if (isAlive() && !errorFuture.isDone()) { | ||
lock.lock(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
No description provided.