-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding FLUSHING state to tasks and stages #4290
Conversation
If the build side completed building the lookup, the source is finished, otherwise the build side would not know the lookup is "full". To put differently, I do not understand the situation this is helping with. |
consider a broadcast join scenario where the build side is very small (1KB) and is computed and stored in build source's output buffer. Being a broadcast buffer, it will keep running until it is told that no new tasks will be added in the join stage - since it would have to serve the new join tasks if created. There are scenarios where it will never be told that no new tasks are coming (that logic will be improved as well in future changes). Now since the build source is running, its sub stages will also keep on running until they get blocked (because the build source is not consuming their output). The time and memory taken to reach to blocking state can be significant. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great job! left some comments. I think we need to improve testing for new FLUSHING
state
@@ -112,4 +116,9 @@ public boolean canScheduleMoreTasks() | |||
} | |||
return true; | |||
} | |||
|
|||
public boolean isRunning() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename it to:
isRunningOrFlushing
@@ -524,6 +536,12 @@ else if (taskState == TaskState.FINISHED) { | |||
} | |||
} | |||
|
|||
private boolean isFlushing() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make it synchronized
@@ -64,4 +68,9 @@ public boolean isDone() | |||
{ | |||
return doneState; | |||
} | |||
|
|||
public boolean isRunning() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to isRunningOrFlushing
@@ -30,6 +30,10 @@ | |||
* Task is running. | |||
*/ | |||
RUNNING(false), | |||
/** | |||
* Task has finished executing and output is left to be consumed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a sentence from PR description:
Flushing state signifies that there will be no new drivers,
the existing drivers have finished and the output buffer of
the task is at-least in a 'no-more-pages' state.
@@ -218,7 +218,7 @@ private boolean memoryRevokingNeeded(MemoryPool memoryPool) | |||
private long getMemoryAlreadyBeingRevoked(Collection<SqlTask> sqlTasks, MemoryPool memoryPool) | |||
{ | |||
return sqlTasks.stream() | |||
.filter(task -> task.getTaskStatus().getState() == TaskState.RUNNING) | |||
.filter(task -> task.getTaskStatus().getState().isRunning()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can keep == TaskState.RUNNING
as FLUSHING
tasks have no memory to revoke
@@ -253,7 +259,7 @@ public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfos | |||
// information, the stage could finish, and the task states would | |||
// never be visible. | |||
StageState state = stageState.get(); | |||
boolean isScheduled = (state == RUNNING) || state.isDone(); | |||
boolean isScheduled = state.isRunning() || state.isDone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should have StageState#isScheduled
method instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already a scheduled
state for a stage - so having isScheduled
could lead to ambiguity. Although, by that logic - the variable can be named differently (like begunExecution
)
@@ -114,10 +114,10 @@ public void testEmptyQuery() | |||
createInitialEmptyOutputBuffers(PARTITIONED) | |||
.withNoMoreBufferIds(), | |||
OptionalInt.empty()); | |||
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING); | |||
assertTrue(taskInfo.getTaskStatus().getState().isRunning()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please test actual states (if possible) and don't use isRunning()
. We need to have test where we validate FLUSHING
state happens.
@@ -177,7 +178,7 @@ public void testSimple(PipelineExecutionStrategy executionStrategy) | |||
|
|||
// | |||
// test body | |||
assertEquals(taskStateMachine.getState(), TaskState.RUNNING); | |||
assertTrue(taskStateMachine.getState().isRunning()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please test actual states (if possible) and don't use isRunning()
. We need to have test where we validate FLUSHING
state happens
@@ -279,6 +280,9 @@ public void testSimple(PipelineExecutionStrategy executionStrategy) | |||
|
|||
outputBufferConsumer.abort(); // complete the task by calling abort on it | |||
TaskState taskState = taskStateMachine.getStateChange(TaskState.RUNNING).get(10, SECONDS); | |||
if (taskState == TaskState.FLUSHING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should incorporate check for FLUSHING
state into test scenario
@@ -419,7 +420,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { | |||
} | |||
Set<SqlStageExecution> childStages = childStagesBuilder.build(); | |||
stage.addStateChangeListener(newState -> { | |||
if (newState.isDone()) { | |||
if (newState.isDone() || newState == FLUSHING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test for this. The test could be:
- staring
SELECT * FROM large_table LIMIT 1
- NOT fetching results and waiting for SOURCE stage to be cancelled
@sopel39 : Thank you for the review! I have addressed all the comments w.r.t non-test code. Also, I have made the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of minor comments from me. I'll leave the final approval up to @sopel39
@@ -524,6 +536,12 @@ else if (taskState == TaskState.FINISHED) { | |||
} | |||
} | |||
|
|||
private synchronized boolean isFlushing() | |||
{ | |||
return flushingTasks.size() > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment explaining that to be flushing, there must be at least one flushing task, and all others must be flushing or finished.
also replace flushingTasks.size() > 0
with !flushingTasks.isEmpty()
@@ -64,4 +70,9 @@ public boolean isDone() | |||
{ | |||
return doneState; | |||
} | |||
|
|||
public boolean isRunningOrFlushing() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of this method. In all uses, it seem to be used in an ||
with some other clause on the state. Instead, I would just inline this method.
cfc220e
to
ddc6f5e
Compare
@dain : Thanks for the review! I have incorporated the current review comments and added a test for the case
Will non query based UTs for |
ddc6f5e
to
581af4e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly lgtm % styling and testing comments.
good job!
presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java
Outdated
Show resolved
Hide resolved
presto-main/src/test/java/io/prestosql/execution/TestSqlTask.java
Outdated
Show resolved
Hide resolved
presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java
Show resolved
Hide resolved
public void testFlushingState() | ||
throws Exception | ||
{ | ||
queryRunner = createQueryRunner(ImmutableMap.of("node-scheduler.include-coordinator", "false")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need distributed runner for this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we don't need a distributed runner for this test. Used it because TestQueryRunnerUtil
provided a clean interface for waitForQueryState
(without fetching results) - is any existing one with LocalQueryRunner
too?
presto-tests/src/test/java/io/prestosql/execution/TestSqlStageExecution.java
Outdated
Show resolved
Hide resolved
presto-tests/src/test/java/io/prestosql/execution/TestSqlStageExecution.java
Outdated
Show resolved
Hide resolved
presto-tests/src/test/java/io/prestosql/execution/TestSqlStageExecution.java
Outdated
Show resolved
Hide resolved
tests are failing (I've seen some checkstyle issues) |
581af4e
to
875ed51
Compare
@@ -641,6 +641,7 @@ private synchronized void checkTaskCompletion() | |||
|
|||
// are there still pages in the output buffer | |||
if (!outputBuffer.isFinished()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@dain is this a valid transition for grouped execution or there should be additional conditions? I'm thinking that if there are more execution groups, the task should stay in RUNNING
state.
@@ -209,6 +212,7 @@ public void testSimple(PipelineExecutionStrategy executionStrategy) | |||
// assert that task result is produced | |||
outputBufferConsumer.consume(300 + 200, ASSERT_WAIT_TIMEOUT); | |||
outputBufferConsumer.assertBufferComplete(ASSERT_WAIT_TIMEOUT); | |||
assertEquals(taskStateMachine.getStateChange(RUNNING).get(10, SECONDS), FLUSHING); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't that be moved above outputBufferConsumer.consume(300 + 200, ASSERT_WAIT_TIMEOUT);
to prevent race condition?
Does outputBufferConsumer.assertBufferComplete(ASSERT_WAIT_TIMEOUT);
consume all output?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, assertBufferComplete
makes sure that the buffer is complete. Added the assertions as discussed offline.
waitForQueryState(queryRunner, queryId, RUNNING); | ||
|
||
QueryInfo queryInfo = queryRunner.getCoordinator().getFullQueryInfo(queryId); | ||
while (queryInfo.getOutputStage().get().getState() != FLUSHING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please use assertEventually
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed - also moved the creation of queryRunner to an unbounded setup
. I think the test after setup should run within 30sec.
875ed51
to
875693f
Compare
875693f
to
d887993
Compare
assertEquals(results.getSerializedPages().size(), 1); | ||
assertEquals(results.getSerializedPages().get(0).getPositionCount(), 1); | ||
|
||
for (boolean moreResults = true; moreResults; moreResults = !results.isBufferComplete()) { | ||
results = sqlTaskManager.getTaskResults(taskId, OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, Unit.MEGABYTE)).get(); | ||
} | ||
assertEquals(results.isBufferComplete(), true); | ||
assertTrue(results.isBufferComplete()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit this should be separate commit
merged, thanks! |
Adds a flushing state to tasks and stages.
This is useful in case of broadcast joins where if the build source stage is done processing and is just running to serving data from its output buffer to join tasks, its sub-stages can be shutdown by cancellation (since the new output produced by the sub stages will not be useful). The cancellation is helpful since the sub stages under build source can be complex and take significant memory (the output buffer size is configurable) and CPU (due to the processing) to get to a blocked state.
For tasks, flushing state signifies that there will be no new drivers, the existing drivers have finished and the output buffer of the task is at-least in a 'no-more-pages' state.
For stages, flushing state signifies that at-least one of its task is flushing and the non-flushing tasks are finished.
For both, once the state has started flushing it will never go back to 'RUNNING' state.