[DISCUSS] Shuffle read-side error handling #326

Closed
abellina opened this issue Jul 6, 2020 · 6 comments
abellina commented Jul 6, 2020

Currently, any errors from the RapidsShuffleClient would cause an IllegalStateException, triggering an Executor failure (as this is a fatal exception). In our case, this means a new JVM would have to launch and take hold of the GPU.

Shutting down the Executor process can cause peers waiting on pending UCX transfers to fail, as the Executor is also a shuffle server, so we want to avoid exiting the process as much as possible.

The following is a plan to use FetchFailedException when appropriate.

The end result of this ticket should be documentation on how retried tasks are handled in the Rapids plugin, specifically how the shuffle component deals with map output caching, plus any code changes needed to ensure correctness and improve robustness.

How FetchFailedException is handled in Spark:

When constructing a new FetchFailedException, the TaskContext's setFetchFailed is called, recording a single fetch failure for a (shuffleId, mapId, mapIndex, reduceId) tuple. On the next call to next() on the shuffle iterator, this exception is thrown, so only a single exception is really observed. In Spark proper, if multiple running tasks hit corrupted output, we simply fail again when the next failure is detected.

There is also a MetadataFetchFailedException, which is used to retry the task when there is a failure fetching from the MapOutputTracker.
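
For illustration, here is a minimal sketch of a plugin-side fetch failed exception, assuming the Spark 3.0.x FetchFailedException constructor shape (bmAddress, shuffleId, mapId, mapIndex, reduceId, message, cause); the class below is a simplified stand-in for the real RapidsShuffleFetchFailedException that appears in the logs later in this issue.

// Sketch only: FetchFailedException is private[spark], so a subclass has to
// live in the org.apache.spark.shuffle package (which is why the logs show
// org.apache.spark.shuffle.RapidsShuffleFetchFailedException).
package org.apache.spark.shuffle

import org.apache.spark.storage.BlockManagerId

class RapidsShuffleFetchFailedExceptionSketch(
    bmAddress: BlockManagerId,
    shuffleId: Int,
    mapId: Long,
    mapIndex: Int,
    reduceId: Int,
    message: String,
    cause: Throwable = null)
  extends FetchFailedException(
    bmAddress, shuffleId, mapId, mapIndex, reduceId, message, cause)

// Constructing the parent FetchFailedException records the failure on the
// current TaskContext (setFetchFailed), so throwing this from the shuffle
// iterator's next() is enough for the executor to report FetchFailed to the
// driver instead of dying with a fatal IllegalStateException.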

On the various ids (for reference, more eyes on this could be great):

Executive summary:

On a FetchFailedException, Spark handles it by resubmitting certain stages. The failed map stage is not always fully resubmitted: given the failed stage and the mapIndex (encoded in the FetchFailedException), Spark fails the running stage and unregisters that map output, while any INDETERMINATE stage it depends on must be recomputed in full because its output is non-deterministic. Additionally, the DAGScheduler will unregister all of the failed executor's map output.

Specifics:

  1. The DAGScheduler receives a FetchFailed event from the Executor (who had a try/catch around the task, and caught the FetchFailedException).
  2. Once a task fails due to a fetch failure, its stage fails, and the number of stage failures is tracked (failedAttemptIds in Stage.scala) up to spark.stage.maxConsecutiveAttempts (4 by default).
  3. Recomputing map output:
    • For barrier tasks, all map output is recomputed (all of it is invalidated).
    • For non-barrier tasks, the specific mapIndex is unregistered (shuffleId, mapIndex, block manager)
      • Given the nature of TaskContext’s setFetchFailed/exception handling, it seems that only one mapIndex is invalidated.
    • For INDETERMINATE RDDs, all map tasks of the failed stage are retried.
    • For DETERMINATE and UNORDERED RDDs, only the specific failed map tasks are recomputed. DETERMINATE RDDs have key ordering and an aggregator; these are produced by RDD functions like reduceByKey. We should make sure the RapidsShuffleManager at least blows up if it sees a ShuffleDependency with these requirements (see the sketch after this list).
      • UNORDERED RDDs are treated like DETERMINATE as well, but they don't need to be sorted or map-side aggregated, and that is the category we fall into.
  4. Last, but not least, when the blockManagerId passed to FetchFailedException is specified, removeOutputsOnExecutor is called, clearing any map output that the peer executor had computed.
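
As a concrete illustration of the guard mentioned in item 3, here is a minimal sketch of rejecting shuffle dependencies the GPU shuffle cannot honor; the object and method names are hypothetical, but keyOrdering, aggregator, and mapSideCombine are real ShuffleDependency fields.

import org.apache.spark.ShuffleDependency

object ShuffleDependencyGuard {
  // Sketch only: refuse dependencies that require key ordering or map-side
  // aggregation, since the GPU shuffle read path does not perform either.
  def validate[K, V, C](dep: ShuffleDependency[K, V, C]): Unit = {
    require(dep.keyOrdering.isEmpty,
      "RapidsShuffleManager does not support shuffles requiring key ordering")
    require(dep.aggregator.isEmpty && !dep.mapSideCombine,
      "RapidsShuffleManager does not support map-side aggregation")
  }
}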

Question: Are map indices (partitions) always deterministic? And if not, are we making sure the RDD is marked non-deterministic?

  • All standard partitioners that cudf supports are deterministic, except possibly range? We should double-check and test error cases for partitioning.

When does Spark throw FetchFailedException:

  1. Zero-sized blocks
  2. Failure to open a local block
  3. Buffer format is corrupted (can’t be decompressed for example), or the buffer can't otherwise be read.
  4. Failure while fetching a block. There is try logic depending on the transport, but at the end there are catch-all cases where a FetchFailedException is thrown.
  5. When the peer is no longer alive
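
For reference, here is a minimal sketch of the analogous sanity checks the plugin's read side could run before handing a received buffer to the iterator (covering the zero-sized and corrupt-buffer cases above); ReceivedBlockMeta and validateReceivedBlock are hypothetical names, not the plugin's actual API.

import java.nio.ByteBuffer

final case class ReceivedBlockMeta(mapIndex: Int, expectedSize: Long)

object BlockValidation {
  // Sketch only: return an error message if the buffer should be surfaced as
  // a fetch failure, or None if it passes basic sanity checks.
  def validateReceivedBlock(meta: ReceivedBlockMeta, buf: ByteBuffer): Option[String] = {
    if (buf == null || buf.remaining() == 0) {
      Some(s"Zero-sized block for mapIndex=${meta.mapIndex}")
    } else if (meta.expectedSize > 0 && buf.remaining() != meta.expectedSize) {
      Some(s"Corrupt block for mapIndex=${meta.mapIndex}: expected " +
        s"${meta.expectedSize} bytes, received ${buf.remaining()}")
    } else {
      None
    }
  }
}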

Map Output Corruption

There has been plenty of work around how to deal with map output corruption (https://issues.apache.org/jira/browse/SPARK-8029). In the cases outlined, the issues are normally related to multiple live copies of a map task trying to commit output, due to speculation, or due to re-run caused by a fetch failure.

Presently, outputs have little chance of corrupting each other: speculative tasks are tried on different hosts and each gets its own taskAttemptId, retried tasks get different taskAttemptIds under the new shuffle protocol, and committing is "last writer wins".

Question: I believe our real issue is running out of memory. We will happily write the executor's blocks again to the catalog, but there is no callback from the MapOutputTracker saying we can forget those old blocks we are retrying. There may be enough hooks to do this, but I don't think we do now.

Current change proposal:

RapidsShuffleIterator will throw a RapidsShuffleFetchFailedException when there was a fetch failure and the caller calls next. The expectation is that the shuffle stage will be re-submitted, along with the map stage that computed the failed partition and, given apache/spark#18150, all map output for the executor we were fetching from, plus any of their dependents, as per DAGScheduler requirements. We'll complain about all the blocks the peer was responsible for at first, without changing the protocol. We can change the protocol to preserve the mapIndex, which would allow us to point at a particular bad partition in the future (since all map outputs for the executor will be recomputed anyway, it doesn't matter much in practice).

The error comes from a TransactionStatus.Error or TransactionStatus.Cancelled, and currently they are handled the same way; cancelled transactions are likely caused by an error (while tearing down UCX classes). In the future, cancelled vs. error may take two different error-handling paths (a simplified sketch of the mapping follows). This should handle most fetch failures, and other PRs should follow if we agree.
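
A minimal sketch of that mapping, assuming a simplified TransactionStatus enum and a hypothetical raiseFetchFailure callback that causes the iterator to throw RapidsShuffleFetchFailedException on its next call to next(); the real plugin types and callbacks differ.

object TransactionStatus extends Enumeration {
  val Success, Error, Cancelled = Value
}

class TransactionOutcomeHandler(
    onData: () => Unit,
    raiseFetchFailure: String => Unit) {
  // Sketch only: both Error and Cancelled are currently treated as a fetch
  // failure rather than a fatal executor error.
  def onTransactionComplete(status: TransactionStatus.Value, detail: String): Unit =
    status match {
      case TransactionStatus.Success =>
        onData() // hand the metadata or buffer to the iterator
      case TransactionStatus.Error | TransactionStatus.Cancelled =>
        raiseFetchFailure(s"Unsuccessful transaction: $detail")
    }
}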

I don't have a change yet that deals with old map outputs accumulating from retried tasks. We can update this (and the potential doc with some of this info) when that is ready.

Error examples:

Metadata fetch failure:

FetchFailed(BlockManagerId(7, 192.168.1.2, 36731, Some(rapids=44207)), shuffleId=1, mapIndex=15, mapId=192, reduceId=18, message=
org.apache.spark.shuffle.RapidsShuffleFetchFailedException: There was a fetch error from the rapids shuffle: Unsuccessful metadata request Transaction(txId=4, type=Request, connection=UCXClientConnection(ucx=com.nvidia.spark.rapids.shuffle.ucx.UCX@1eac41b5, peerExecutorId=7, peerClientId=1) , status=Success, errorMessage=None, totalMessages=2, pending=0)
	at com.nvidia.spark.rapids.shuffle.RapidsShuffleIterator.next(RapidsShuffleIterator.scala:304)

Buffer fetch failure:

FetchFailed(BlockManagerId(21, 192.168.1.3, 39549, Some(rapids=37109)), shuffleId=1, mapIndex=45, mapId=222, reduceId=111, message=
org.apache.spark.shuffle.RapidsShuffleFetchFailedException: There was a fetch error from the rapids shuffle: Unsuccessful buffer receive Transaction(txId=67, type=Receive, connection=UCXClientConnection(ucx=com.nvidia.spark.rapids.shuffle.ucx.UCX@37ae314f, peerExecutorId=21, peerClientId=1) , status=Success, errorMessage=None, totalMessages=1, pending=0)
	at com.nvidia.spark.rapids.shuffle.RapidsShuffleIterator.next(RapidsShuffleIterator.scala:304)

DAG scheduler example showing the handling:

20/07/04 10:57:33 WARN TaskSetManager: Lost task 39.0 in stage 22.0 (TID 859, 192.168.1.2, executor 1): FetchFailed(BlockManagerId(27, 192.168.1.2, 34867, Some(rapids=34825)), shuffleId=1, mapIndex=45, mapId=628, reduceId=39, message=
  org.apache.spark.shuffle.RapidsShuffleFetchFailedException: There was a fetch error from the rapids shuffle: Unsuccessful metadata request Transaction(txId=3, type=Request, connection=UCXClientConnection(ucx=com.nvidia.spark.rapids.shuffle.ucx.UCX@69d4caa0, peerExecutorId=27, peerClientId=1) , status=Success, errorMessage=None, totalMessages=2, pending=0)

20/07/04 10:57:33 INFO TaskSetManager: Task 39.0 in stage 22.0 (TID 859) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).

20/07/04 10:57:33 INFO DAGScheduler: Marking ShuffleMapStage 22 (mapPartitions at GpuShuffleExchangeExec.scala:159) as failed due to a fetch failure from ShuffleMapStage 21 (mapPartitions at GpuShuffleExchangeExec.scala:159)

20/07/04 10:57:33 INFO DAGScheduler: Resubmitting ShuffleMapStage 21 (mapPartitions at GpuShuffleExchangeExec.scala:159) and ShuffleMapStage 22 (mapPartitions at GpuShuffleExchangeExec.scala:159) due to fetch failure

DAG scheduler removes the executor output altogether:

20/07/04 10:57:33 INFO DAGScheduler: Executor lost: 27 (epoch 3)
20/07/04 10:57:33 INFO BlockManagerMasterEndpoint: Trying to remove executor 27 from BlockManagerMaster.
20/07/04 10:57:33 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(27, 192.168.1.2, 34867, None)
20/07/04 10:57:33 INFO BlockManagerMaster: Removed 27 successfully in removeExecutor
20/07/04 10:57:33 INFO DAGScheduler: Shuffle files lost for executor: 27 (epoch 3)

Later the executor is told to re-register

20/07/04 10:57:33 INFO BlockManagerMaster: Removed 27 successfully in removeExecutor
20/07/04 10:57:34 INFO Executor: Told to re-register on heartbeat
20/07/04 10:57:34 INFO BlockManager: BlockManager BlockManagerId(27, 192.168.1.2, 34867, None) re-registering with master
20/07/04 10:57:34 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(27, 192.168.1.2, 34867, None)
20/07/04 10:57:34 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(27, 192.168.1.2, 34867, None)

And it gets re-added

20/07/04 10:57:34 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.1.2:34867 with 16.9 GiB RAM, BlockManagerId(27, 192.168.1.2, 34867, None)

Executor 27 only shuts down when the job finishes:

20/07/04 11:00:01 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown

Note: this does resubmit stages 21 and 22 (the fetch error happened in 23), and then later 23 executes. It works well, unless there are other fetch failures that are not triggered by the rapids shuffle client (see below).

Bad case:

In this case, it’s not just the two failures (which were induced for executor id 1 and 2). There are other failures when a map status is missing. These lead to job failures, and don’t happen all the time.

FetchFailed(null, shuffleId=1, mapIndex=-1, mapId=-1, reduceId=101, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$2(MapOutputTracker.scala:1010)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$2$adapted(MapOutputTracker.scala:1006)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1006)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:811)
	at org.apache.spark.sql.rapids.RapidsShuffleInternalManager.getReader(RapidsShuffleInternalManager.scala:330)

A mapIndex of -1 normally means a metadata fetch failure, from the code I've seen. This last case needs to be looked at more.

--- UPDATE ---

Timeout handling

While fetching metadata, buffers, or connecting to another client, we would like to be able to time out and either fail the job or retry.

  • Connection timeouts: The shuffle creates its own set of connections, separate from Spark's built-in RPC. This means there are windows where we are attempting to connect to a peer, and that connection attempt needs a timeout around it.
  • Metadata fetch timeouts: After a connection is established, we fetch metadata about buffers we'll be asking to download later, this is yet another place where a timeout is necessary.
  • Buffer fetch timeouts: While transferring data, if the peer is not responding (could be due to a bug, or system/network issue), we need to timeout here as well.

As of this update, #732 handles metadata and buffer fetch timeouts in a basic way, but it serves the purpose of not making a user wait indefinitely. UCX is looking to improve its error handling features, and this is an area where we could get finer-grained errors.
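
To illustrate the basic approach, here is a minimal sketch of a bounded wait on a pending fetch; PendingFetch and the timeout plumbing are assumptions for illustration, not the implementation in #732.

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

class PendingFetch(timeoutMs: Long) {
  private val lock = new ReentrantLock()
  private val done = lock.newCondition()
  private var completed = false

  // Called by the transport callback when the metadata or buffer response arrives.
  def markCompleted(): Unit = {
    lock.lock()
    try { completed = true; done.signalAll() } finally { lock.unlock() }
  }

  // Blocks the reader for up to timeoutMs; returns false on timeout so the
  // caller can surface a fetch failure instead of waiting indefinitely.
  def await(): Boolean = {
    lock.lock()
    try {
      var remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
      while (!completed && remainingNs > 0) {
        remainingNs = done.awaitNanos(remainingNs)
      }
      completed
    } finally {
      lock.unlock()
    }
  }
}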

abellina added the documentation, feature request, ? - Needs Triage, and shuffle labels on Jul 6, 2020
revans2 commented Jul 7, 2020

Looks good to me.

abellina commented Sep 11, 2020

Filed this (#737) to handle the timeouts better than #732 does, once UCX exposes the errors to us.

abellina commented:

Filed this (#739) to handle zero-sized blocks, local block failures, and compression issues.

abellina commented Sep 11, 2020

Question: I believe our real issue is running out of memory. We will happily write the executor's blocks again to the catalog, but there is no callback from the MapOutputTracker saying we can forget those old blocks we are retrying. There may be enough hooks to do this, but I don't think we do now.

The worry here, I think, is somewhat unfounded. It's not that we'll run out of device memory; rather, we'll have to spill and track buffers that will never be used, so we'll ultimately cause more memory pressure and disk usage. We could also have pending transfers for these blocks, so we will want to handle them. Filed #742 to track.

abellina commented:

All standard partitioners are deterministic (that cudf supports). Except range? We should double check and test error cases for partitioning.

There is already an issue to try to add more tests for partitioning: #424. We could add some of the partitioning tests that Spark has for bound computation (as suggested by @kuhushukla). No issue to report, just something we should have tests for.

abellina commented:

I am closing this issue as I have branched off individual tasks and completed a couple of the items in release 0.2 (#331 and #742). The rest should be something we target for 0.3.
