Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix retry flow control issue #8401

Merged
merged 4 commits into from
Aug 11, 2021

Conversation

dapengzhang0
Copy link
Member

@dapengzhang0 dapengzhang0 commented Aug 10, 2021

There has been an issue about flow control when retry is enabled.

Currently we call masterListener.onReady() whenever substreamListener.onReady() is called.

The user's onReady() implementation might do

while(observer.isReady()) {
  // send one more message.
}

However, currently if the RetriableStream is still draining, isReady() is false, and user's onReady() exits immediately. And because substreamListener.onReady() is already called, it may not be called again after drained.

This PR fixes the issue by

  • Use a SerializeExecutor to call all masterListener callbacks.
  • Once RetriableStream is drained, check isReady() and if so call onReady().
  • Once substreamListener.onReady() is called, check isReady() and only if so we call masterListener.onReady().

@@ -64,6 +64,7 @@

private final MethodDescriptor<ReqT, ?> method;
private final Executor callExecutor;
private final Executor listenerSerializeExecutor = new SerializeReentrantCallsDirectExecutor();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SerializeReentrantCallsDirectExecutor is single-producer (not thread safe). It doesn't work in this case. I think SynchronizationContext is probably better suited.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

core/src/main/java/io/grpc/internal/RetriableStream.java Outdated Show resolved Hide resolved
core/src/main/java/io/grpc/internal/RetriableStream.java Outdated Show resolved Hide resolved
@dapengzhang0 dapengzhang0 added the TODO:backport PR needs to be backported. Removed after backport complete label Aug 11, 2021
@dapengzhang0 dapengzhang0 merged commit 2142902 into grpc:master Aug 11, 2021
@dapengzhang0 dapengzhang0 deleted the fix-retry-flow-control branch August 11, 2021 17:26
dapengzhang0 added a commit to dapengzhang0/grpc-java that referenced this pull request Aug 12, 2021
There has been an issue about flow control when retry is enabled.

Currently we call `masterListener.onReady()` whenever `substreamListener.onReady()` is called.

The user's `onReady()` implementation might do

```
while(observer.isReady()) {
  // send one more message.
}
```

However, currently if the `RetriableStream` is still draining, `isReady()` is false, and user's `onReady()` exits immediately. And because `substreamListener.onReady()` is already called, it may not be called again after drained.

This PR fixes the issue by

- Use a SerializeExecutor to call all `masterListener` callbacks.
- Once `RetriableStream` is drained, check `isReady()` and if so call `onReady()`.
- Once `substreamListener.onReady()` is called, check `isReady()` and only if so we call `masterListener.onReady()`.
@dapengzhang0 dapengzhang0 removed the TODO:backport PR needs to be backported. Removed after backport complete label Aug 12, 2021
dapengzhang0 added a commit that referenced this pull request Aug 12, 2021
There has been an issue about flow control when retry is enabled.

Currently we call `masterListener.onReady()` whenever `substreamListener.onReady()` is called.

The user's `onReady()` implementation might do

```
while(observer.isReady()) {
  // send one more message.
}
```

However, currently if the `RetriableStream` is still draining, `isReady()` is false, and user's `onReady()` exits immediately. And because `substreamListener.onReady()` is already called, it may not be called again after drained.

This PR fixes the issue by

- Use a SerializeExecutor to call all `masterListener` callbacks.
- Once `RetriableStream` is drained, check `isReady()` and if so call `onReady()`.
- Once `substreamListener.onReady()` is called, check `isReady()` and only if so we call `masterListener.onReady()`.
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Nov 11, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants