-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixing transaction_timed_out errors with long running reads. #182
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
first = false; | ||
try { | ||
loop choose { | ||
when(Reference<ScanReturnedContext> doc = waitNext(docs)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the priority at which this stream is yielding results?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doc Layer code that is sending documents to this stream is not setting any priority explicitly. Source of the data is FDB flow bindings. Flow bindings set the priority to TaskDefaultOnMainThread
(7500) and delay default priority is TaskDefaultDelay
(7010). I tried this with TaskProxyGRVTimer
(8510) instead of max priority, that too seems to be fixing the issue. Shall I go with it?
loop choose { | ||
when(state Reference<ScanReturnedContext> doc = waitNext(docs)) { | ||
// throws end_of_stream when totally finished | ||
Void _ = wait(outerLock->take()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be equivalent and simpler to just do Void _ = wait( outerLock->take() || timeout );
instead of splitting this into two functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am thinking right, it would look like this
loop choose {
when(state Reference<ScanReturnedContext> doc = waitNext(docs)) {
// throws end_of_stream when totally finished
Void _ = wait(outerLock->take() || timeout);
if (!timeout.isReady()) {
.....
}
}
when(Void _ = wait(timeout)) { break; }
}
// squeeze remaining documents from the stream
// checkpoint planner
This will need special handling with !timeout.isReady()
.
And due to the way query planner checkpoint scheme works, it's important there are no documents pending in document stream when the checkpoint is being called, that would be below this loop. I guess I can have extra code before the checkpoint to squeeze the documents left in the stream.
If so, probably having 3 when
blocks probably clearer code.
innerCheckpoint = innerCheckpoint->stopAndCheckpoint(); | ||
|
||
while (!bufferedDocs.empty()) { | ||
Void _ = wait(bufferedDocs.front().second); | ||
output.send(bufferedDocs.front().first); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you not need to innerLock->release();
here also?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of checkpoint at line 730, innerLock
is not valid anymore, no one is waiting on it.
// throws end_of_stream when totally finished | ||
bufferedDocs.push_back(std::make_pair(doc, outerLock->take())); | ||
} | ||
when(Void _ = wait(bufferedDocs.empty() ? Never() : bufferedDocs.front().second)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're taking outerLock, but releasing innerLock, so what's making sure that you actually process documents as you add them?
src/QLPlan.actor.cpp
Outdated
loop choose { | ||
when(Reference<ScanReturnedContext> doc = waitNext(docs)) { | ||
// throws end_of_stream when totally finished | ||
bufferedDocs.push_back(std::make_pair(doc, outerLock->take())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to pass a priority to take() as well?
Co-Authored-By: apkar <[email protected]>
…nto read_timeouts
@@ -699,22 +699,23 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint, | |||
state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute(innerCheckpoint.getPtr(), dtr); | |||
state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock(); | |||
state bool first = true; | |||
state Future<Void> timeout = delay(3.0, TaskMaxPriority); | |||
state Future<Void> timeout = delay(3.0, g_network->getCurrentTask() + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
g_network->getCurrentTask() + 1
could still give you a task priority lower than TaskDefaultOnMainThread
right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idea is current task is running at TaskDefaultMainThread
as that’s the one setting the promise stream.
This patch attempts to fix
transaction_timed_out
errors in read pathwait()
inchoose-when
block ofdoNonIsolatedRO
, instead added anotherwhen
block to represent the same logic.delay
, that controls transaction split logic, at a higher priority.