Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding FLUSHING state to tasks and stages #4290

Merged
merged 3 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ private static boolean isScheduled(Optional<StageInfo> rootStage)
}
return getAllStages(rootStage).stream()
.map(StageInfo::getState)
.allMatch(state -> (state == StageState.RUNNING) || state.isDone());
.allMatch(state -> state == StageState.RUNNING || state == StageState.FLUSHING || state.isDone());
}

public Optional<ExecutionFailureInfo> getFailureInfo()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public final class SqlStageExecution
@GuardedBy("this")
private final Set<TaskId> finishedTasks = newConcurrentHashSet();
@GuardedBy("this")
private final Set<TaskId> flushingTasks = newConcurrentHashSet();
@GuardedBy("this")
private final Set<TaskId> tasksWithFinalInfo = newConcurrentHashSet();
@GuardedBy("this")
private final AtomicBoolean splitsScheduled = new AtomicBoolean();
Expand Down Expand Up @@ -225,6 +227,9 @@ public synchronized void schedulingComplete()
if (getAllTasks().stream().anyMatch(task -> getState() == StageState.RUNNING)) {
stateMachine.transitionToRunning();
}
if (isFlushing()) {
rohangarg marked this conversation as resolved.
Show resolved Hide resolved
stateMachine.transitionToFlushing();
}
if (finishedTasks.containsAll(allTasks)) {
stateMachine.transitionToFinished();
}
Expand Down Expand Up @@ -505,14 +510,21 @@ else if (taskState == TaskState.ABORTED) {
// A task should only be in the aborted state if the STAGE is done (ABORTED or FAILED)
stateMachine.transitionToFailed(new PrestoException(GENERIC_INTERNAL_ERROR, "A task is in the ABORTED state but stage is " + stageState));
}
else if (taskState == TaskState.FLUSHING) {
flushingTasks.add(taskStatus.getTaskId());
}
else if (taskState == TaskState.FINISHED) {
finishedTasks.add(taskStatus.getTaskId());
flushingTasks.remove(taskStatus.getTaskId());
}

if (stageState == StageState.SCHEDULED || stageState == StageState.RUNNING) {
if (stageState == StageState.SCHEDULED || stageState == StageState.RUNNING || stageState == StageState.FLUSHING) {
if (taskState == TaskState.RUNNING) {
stateMachine.transitionToRunning();
}
if (isFlushing()) {
rohangarg marked this conversation as resolved.
Show resolved Hide resolved
stateMachine.transitionToFlushing();
}
if (finishedTasks.containsAll(allTasks)) {
stateMachine.transitionToFinished();
}
Expand All @@ -524,6 +536,13 @@ else if (taskState == TaskState.FINISHED) {
}
}

private synchronized boolean isFlushing()
{
// to transition to flushing, there must be at least one flushing task, and all others must be flushing or finished.
return !flushingTasks.isEmpty()
&& allTasks.stream().allMatch(taskId -> finishedTasks.contains(taskId) || flushingTasks.contains(taskId));
}

private synchronized void updateFinalTaskInfo(TaskInfo finalTaskInfo)
{
tasksWithFinalInfo.add(finalTaskInfo.getTaskStatus().getTaskId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ private synchronized void checkTaskCompletion()

// are there still pages in the output buffer
if (!outputBuffer.isFinished()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dain is this a valid transition for grouped execution or there should be additional conditions? I'm thinking that if there are more execution groups, the task should stay in RUNNING state.

taskStateMachine.transitionToFlushing();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public enum StageState
* Stage is running.
*/
RUNNING(false, false),
/**
rohangarg marked this conversation as resolved.
Show resolved Hide resolved
* Stage has finished executing and output being consumed.
rohangarg marked this conversation as resolved.
Show resolved Hide resolved
* In this state, at-least one of the tasks is flushing and the non-flushing tasks are finished
*/
FLUSHING(false, false),
/**
* Stage has finished executing and all output has been consumed.
*/
Expand Down Expand Up @@ -99,6 +104,7 @@ public boolean canScheduleMoreTasks()
case SCHEDULING_SPLITS:
case SCHEDULED:
case RUNNING:
case FLUSHING:
case FINISHED:
case CANCELED:
// no more workers will be added to the query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static io.prestosql.execution.StageState.CANCELED;
import static io.prestosql.execution.StageState.FAILED;
import static io.prestosql.execution.StageState.FINISHED;
import static io.prestosql.execution.StageState.FLUSHING;
import static io.prestosql.execution.StageState.PLANNED;
import static io.prestosql.execution.StageState.RUNNING;
import static io.prestosql.execution.StageState.SCHEDULED;
Expand Down Expand Up @@ -161,7 +162,12 @@ public synchronized boolean transitionToScheduled()

public boolean transitionToRunning()
{
return stageState.setIf(RUNNING, currentState -> currentState != RUNNING && !currentState.isDone());
return stageState.setIf(RUNNING, currentState -> currentState != RUNNING && currentState != FLUSHING && !currentState.isDone());
}

public boolean transitionToFlushing()
{
return stageState.setIf(FLUSHING, currentState -> currentState != FLUSHING && !currentState.isDone());
}

public boolean transitionToFinished()
Expand Down Expand Up @@ -253,7 +259,7 @@ public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfos
// information, the stage could finish, and the task states would
// never be visible.
StageState state = stageState.get();
boolean isScheduled = (state == RUNNING) || state.isDone();
boolean isScheduled = state == RUNNING || state == FLUSHING || state.isDone();

List<TaskInfo> taskInfos = ImmutableList.copyOf(taskInfosSupplier.get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static io.prestosql.execution.StageState.FLUSHING;
import static io.prestosql.execution.StageState.RUNNING;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -422,7 +423,7 @@ public List<OperatorStats> getOperatorSummaries()

public BasicStageStats toBasicStageStats(StageState stageState)
{
boolean isScheduled = (stageState == RUNNING) || stageState.isDone();
boolean isScheduled = stageState == RUNNING || stageState == FLUSHING || stageState.isDone();

OptionalDouble progressPercentage = OptionalDouble.empty();
if (isScheduled && totalDrivers != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public enum TaskState
* Task is running.
*/
RUNNING(false),
/**
* Task has finished executing and output is left to be consumed.
* In this state, there will be no new drivers, the existing drivers have finished
* and the output buffer of the task is at-least in a 'no-more-pages' state.
*/
FLUSHING(false),
/**
* Task has finished executing and all output has been consumed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.execution.TaskState.FLUSHING;
import static io.prestosql.execution.TaskState.RUNNING;
import static io.prestosql.execution.TaskState.TERMINAL_TASK_STATES;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -80,6 +82,11 @@ public LinkedBlockingQueue<Throwable> getFailureCauses()
return failureCauses;
}

public void transitionToFlushing()
{
taskState.setIf(FLUSHING, currentState -> currentState == RUNNING);
}

public void finished()
{
transitionToDoneState(TaskState.FINISHED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.prestosql.execution.StageState.FLUSHING;
import static io.prestosql.execution.StageState.RUNNING;
import static io.prestosql.execution.StageState.SCHEDULED;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -71,7 +72,7 @@ public Set<SqlStageExecution> getStagesToSchedule()
{
for (Iterator<SqlStageExecution> iterator = schedulingStages.iterator(); iterator.hasNext(); ) {
StageState state = iterator.next().getState();
if (state == SCHEDULED || state == RUNNING || state.isDone()) {
if (state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone()) {
iterator.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.prestosql.execution.StageState.FLUSHING;
import static io.prestosql.execution.StageState.RUNNING;
import static io.prestosql.execution.StageState.SCHEDULED;
import static io.prestosql.sql.planner.plan.ExchangeNode.Scope.LOCAL;
Expand Down Expand Up @@ -92,7 +93,7 @@ private void removeCompletedStages()
{
for (Iterator<SqlStageExecution> stageIterator = activeSources.iterator(); stageIterator.hasNext(); ) {
StageState state = stageIterator.next().getState();
if (state == SCHEDULED || state == RUNNING || state.isDone()) {
if (state == SCHEDULED || state == RUNNING || state == FLUSHING || state.isDone()) {
stageIterator.remove();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import static io.prestosql.execution.StageState.CANCELED;
import static io.prestosql.execution.StageState.FAILED;
import static io.prestosql.execution.StageState.FINISHED;
import static io.prestosql.execution.StageState.FLUSHING;
import static io.prestosql.execution.StageState.RUNNING;
import static io.prestosql.execution.StageState.SCHEDULED;
import static io.prestosql.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler;
Expand Down Expand Up @@ -595,7 +596,7 @@ else if (!result.getBlocked().isDone()) {

for (SqlStageExecution stage : stages.values()) {
StageState state = stage.getState();
if (state != SCHEDULED && state != RUNNING && !state.isDone()) {
if (state != SCHEDULED && state != RUNNING && state != FLUSHING && !state.isDone()) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stage.getStageId(), state));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,18 @@ public void testSimpleQuery()
{
SqlTask sqlTask = createInitialTask();

TaskInfo taskInfo = sqlTask.updateTask(TEST_SESSION,
assertEquals(sqlTask.getTaskStatus().getState(), TaskState.RUNNING);
sqlTask.updateTask(TEST_SESSION,
Optional.of(PLAN_FRAGMENT),
ImmutableList.of(new TaskSource(TABLE_SCAN_NODE_ID, ImmutableSet.of(SPLIT), true)),
createInitialEmptyOutputBuffers(PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds(),
OptionalInt.empty());
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
rohangarg marked this conversation as resolved.
Show resolved Hide resolved

taskInfo = sqlTask.getTaskInfo();
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
TaskInfo taskInfo = sqlTask.getTaskInfo(TaskState.RUNNING).get(1, SECONDS);
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.FLUSHING);

BufferResult results = sqlTask.getTaskResults(OUT, 0, DataSize.of(1, MEGABYTE)).get();
assertEquals(results.isBufferComplete(), false);
assertFalse(results.isBufferComplete());
assertEquals(results.getSerializedPages().size(), 1);
assertEquals(results.getSerializedPages().get(0).getPositionCount(), 1);

Expand Down Expand Up @@ -202,15 +202,15 @@ public void testAbort()
{
SqlTask sqlTask = createInitialTask();

TaskInfo taskInfo = sqlTask.updateTask(TEST_SESSION,
assertEquals(sqlTask.getTaskStatus().getState(), TaskState.RUNNING);
sqlTask.updateTask(TEST_SESSION,
Optional.of(PLAN_FRAGMENT),
ImmutableList.of(new TaskSource(TABLE_SCAN_NODE_ID, ImmutableSet.of(SPLIT), true)),
createInitialEmptyOutputBuffers(PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds(),
OptionalInt.empty());
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
rohangarg marked this conversation as resolved.
Show resolved Hide resolved

taskInfo = sqlTask.getTaskInfo();
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
TaskInfo taskInfo = sqlTask.getTaskInfo(TaskState.RUNNING).get(1, SECONDS);
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.FLUSHING);

sqlTask.abortTaskResults(OUT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,9 @@ public void testSimple(PipelineExecutionStrategy executionStrategy)
throw new UnsupportedOperationException();
}

assertEquals(taskStateMachine.getStateChange(TaskState.RUNNING).get(10, SECONDS), TaskState.FLUSHING);
outputBufferConsumer.abort(); // complete the task by calling abort on it
TaskState taskState = taskStateMachine.getStateChange(TaskState.RUNNING).get(10, SECONDS);
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
TaskState taskState = taskStateMachine.getStateChange(TaskState.FLUSHING).get(10, SECONDS);
assertEquals(taskState, TaskState.FINISHED);
}
finally {
Expand Down Expand Up @@ -579,8 +580,9 @@ public void testComplex(PipelineExecutionStrategy executionStrategy)
throw new UnsupportedOperationException();
}

assertEquals(taskStateMachine.getStateChange(TaskState.RUNNING).get(10, SECONDS), TaskState.FLUSHING);
outputBufferConsumer.abort(); // complete the task by calling abort on it
TaskState taskState = taskStateMachine.getStateChange(TaskState.RUNNING).get(10, SECONDS);
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
TaskState taskState = taskStateMachine.getStateChange(TaskState.FLUSHING).get(10, SECONDS);
assertEquals(taskState, TaskState.FINISHED);
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@
import static io.prestosql.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.prestosql.testing.TestingSession.testSessionBuilder;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

@Test
public class TestSqlTaskManager
Expand Down Expand Up @@ -110,21 +112,20 @@ public void testSimpleQuery()
{
try (SqlTaskManager sqlTaskManager = createSqlTaskManager(new TaskManagerConfig())) {
TaskId taskId = TASK_ID;
TaskInfo taskInfo = createTask(sqlTaskManager, taskId, ImmutableSet.of(SPLIT), createInitialEmptyOutputBuffers(PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds());
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
createTask(sqlTaskManager, taskId, ImmutableSet.of(SPLIT), createInitialEmptyOutputBuffers(PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds());

taskInfo = sqlTaskManager.getTaskInfo(taskId);
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
TaskInfo taskInfo = sqlTaskManager.getTaskInfo(taskId, TaskState.RUNNING).get(1, TimeUnit.SECONDS);
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.FLUSHING);

BufferResult results = sqlTaskManager.getTaskResults(taskId, OUT, 0, DataSize.of(1, Unit.MEGABYTE)).get();
assertEquals(results.isBufferComplete(), false);
assertFalse(results.isBufferComplete());
assertEquals(results.getSerializedPages().size(), 1);
assertEquals(results.getSerializedPages().get(0).getPositionCount(), 1);

for (boolean moreResults = true; moreResults; moreResults = !results.isBufferComplete()) {
results = sqlTaskManager.getTaskResults(taskId, OUT, results.getToken() + results.getSerializedPages().size(), DataSize.of(1, Unit.MEGABYTE)).get();
}
assertEquals(results.isBufferComplete(), true);
assertTrue(results.isBufferComplete());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit this should be separate commit

assertEquals(results.getSerializedPages().size(), 0);

// complete the task by calling abort on it
Expand Down Expand Up @@ -190,11 +191,10 @@ public void testAbortResults()
{
try (SqlTaskManager sqlTaskManager = createSqlTaskManager(new TaskManagerConfig())) {
TaskId taskId = TASK_ID;
TaskInfo taskInfo = createTask(sqlTaskManager, taskId, ImmutableSet.of(SPLIT), createInitialEmptyOutputBuffers(PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds());
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
createTask(sqlTaskManager, taskId, ImmutableSet.of(SPLIT), createInitialEmptyOutputBuffers(PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds());

taskInfo = sqlTaskManager.getTaskInfo(taskId);
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.RUNNING);
TaskInfo taskInfo = sqlTaskManager.getTaskInfo(taskId, TaskState.RUNNING).get(1, TimeUnit.SECONDS);
assertEquals(taskInfo.getTaskStatus().getState(), TaskState.FLUSHING);

sqlTaskManager.abortTaskResults(taskId, OUT);

Expand Down
Loading